In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import*

In [0]:
import os
import sys

# Make the project root (two directories above the notebook's working directory) importable
# so that `utils.transformations` can be imported in the next cell.
# The path is normalised, and it is appended only when missing, so re-running this cell does
# not keep growing sys.path with duplicate entries.
project_path = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
if project_path not in sys.path:
    sys.path.append(project_path)

In [0]:
from utils.transformations import reusable_function_name

## DimUser


In [0]:
# Auto Loader (cloudFiles) incremental read of the bronze DimUser parquet files.
# The inferred schema is persisted under the silver DimUser `Checkpoint` path.
df_user = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/Checkpoint")
    .load("abfss://bronze@spotifystoragejithu.dfs.core.windows.net/DimUser")
)

In [0]:
# Ad-hoc look at the raw DimUser stream. It gets its own checkpoint directory, separate from
# the silver write checkpoint.
display(df_user, checkpointLocation="abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/chk_displays")


In [0]:
# Normalise user names to upper case before they land in silver.
user_name_upper = upper(col("user_name"))
df_user = df_user.withColumn("user_name", user_name_upper)


In [0]:

# Preview up to 50 rows of the transformed DimUser stream without touching the silver table.
# A temp view created on `batch_df` inside foreachBatch is registered in the stream's own
# SparkSession, not in this notebook's session, so `spark.sql` could only ever see an empty
# placeholder view. Instead, the rows are collected on the driver and the view is registered
# here after the run.
# NOTE(review): assumes classic compute, where foreachBatch runs in this notebook's Python
# process. On Spark Connect (serverless / shared access mode) the callback runs server-side
# and `dimuser_preview_rows` would stay empty. Confirm for the target cluster.
# NOTE(review): this checkpoint persists between runs, so a re-run only previews files that
# arrived since the previous preview run.
dimuser_preview_rows = []

def preview_batch(batch_df, batch_id):
    """foreachBatch callback: keep up to 50 rows of the latest non-empty micro-batch."""
    rows = batch_df.limit(50).collect()
    if rows:
        dimuser_preview_rows[:] = rows

q = (df_user.writeStream
     .foreachBatch(preview_batch)
     .outputMode("append")
     .option("checkpointLocation",
             "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/_chk_preview_dev")
     .trigger(availableNow=True)
     .start())

# Blocks until all currently available files are processed; re-raises any stream failure.
q.awaitTermination()

spark.createDataFrame(dimuser_preview_rows, schema=df_user.schema) \
    .createOrReplaceTempView("dimuser_previews")
display(spark.sql("SELECT * FROM dimuser_previews"))


In [0]:
# Remove the `_rescued_data` column (added by Auto Loader) via the shared transformations helper.
df_user_obj = reusable_function_name()
columns_to_drop = ["_rescued_data"]
df_user = df_user_obj.dropColumns(df_user, columns_to_drop)

In [0]:
# Preview up to 50 rows of df_user after `_rescued_data` has been dropped.
# This query uses its OWN checkpoint. The previous preview cell already advanced
# `_chk_preview_dev` past every available file, so reusing that checkpoint made this query
# process nothing. Rows are collected on the driver because a temp view created on `batch_df`
# lives in the stream's own SparkSession and is not visible to this notebook's `spark.sql`.
# NOTE(review): assumes classic compute, where foreachBatch runs in this notebook's Python
# process. On Spark Connect the callback runs server-side and the list would stay empty.
df_user_preview_rows = []

def preview_clean_batch(batch_df, batch_id):
    """foreachBatch callback: keep up to 50 rows of the latest non-empty micro-batch."""
    rows = batch_df.limit(50).collect()
    if rows:
        df_user_preview_rows[:] = rows

q = (
    df_user.writeStream
    .foreachBatch(preview_clean_batch)
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/_chk_preview_clean_dev"
    )
    .trigger(availableNow=True)
    .start()
)

q.awaitTermination()

spark.createDataFrame(df_user_preview_rows, schema=df_user.schema) \
    .createOrReplaceTempView("df_user_preview")
display(spark.sql("SELECT * FROM df_user_preview"))



In [0]:
# Append the cleaned DimUser stream to `spotifycatalog.silver.DimUser`, with its Delta files at
# the `data_v2` path given by the `path` option.
# availableNow processes every file available at start and then stops. It replaces the
# deprecated `once` trigger and matches the preview cells. awaitTermination() blocks until the
# write has finished and re-raises any stream failure. Without it, the read-back cell below
# could run before `data_v2` exists or is up to date, and a failed write would not fail the cell.
user_write_query = (
    df_user.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/data_v2")
    .toTable("spotifycatalog.silver.DimUser")
)
user_write_query.awaitTermination()

In [0]:
# Batch read of the Delta files at the silver DimUser `data_v2` path, for inspection.
df_read = spark.read.load(
    "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimUser/data_v2",
    format="delta",
)
display(df_read)

## DimArtist

### Auto Loader

In [0]:
# Auto Loader (cloudFiles) incremental read of the bronze DimArtist parquet files.
# The inferred schema is persisted under the silver DimArtist `Checkpoint` path.
df_artist = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimArtist/Checkpoint")
    .load("abfss://bronze@spotifystoragejithu.dfs.core.windows.net/DimArtist")
)

In [0]:
# Ad-hoc look at the raw DimArtist stream, with a checkpoint directory separate from the
# silver write checkpoint.
display(
    df_artist,
    checkpointLocation="abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimArtist/check_display",
)

In [0]:
# Silver cleanup for DimArtist: strip `_rescued_data`, then keep a single row per artist_id.
# NOTE(review): streaming dropDuplicates without a watermark keeps every artist_id it has seen
# in checkpoint state indefinitely. Later records for an already-seen artist_id are therefore
# dropped on every future run. Confirm that this is the intended dimension semantics.
df_artistobj = reusable_function_name()
df_artist = df_artistobj.dropColumns(df_artist, ["_rescued_data"]).dropDuplicates(["artist_id"])



In [0]:
# Append the cleaned DimArtist stream to `spotifycatalog.silver.DimArtist`, with its Delta files
# at the `data_v2` path given by the `path` option.
# availableNow processes everything available at start and then stops. It replaces the
# deprecated `once` trigger. awaitTermination() blocks until the write finishes and re-raises
# stream failures, so the read-back cell below does not race the write.
artist_write_query = (
    df_artist.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimArtist/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimArtist/data_v2")
    .toTable("spotifycatalog.silver.DimArtist")
)
artist_write_query.awaitTermination()


In [0]:
# Batch read of the Delta files at the silver DimArtist `data_v2` path, for inspection.
df_artist_read = spark.read.load(
    "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimArtist/data_v2",
    format="delta",
)
display(df_artist_read)

## DimTrack


In [0]:
# Auto Loader (cloudFiles) incremental read of the bronze DimTrack parquet files.
# The inferred schema is persisted under the silver DimTrack `Checkpoint` path.
dim_track = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimTrack/Checkpoint")
    .load("abfss://bronze@spotifystoragejithu.dfs.core.windows.net/DimTrack")
)

In [0]:
# Ad-hoc look at the raw DimTrack stream, with a checkpoint directory separate from the
# silver write checkpoint.
display(
    dim_track,
    checkpointLocation="abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimTrack/checkdisplay",
)


In [0]:
# Silver cleanup for DimTrack: drop `_rescued_data`, keep one row per track_id, bucket
# duration_sec into durationFlag, and replace hyphens in track_name with spaces.
df_trackobj = reusable_function_name()
df_track = df_trackobj.dropColumns(dim_track, ['_rescued_data'])
# NOTE(review): streaming dropDuplicates without a watermark keeps every seen track_id in
# checkpoint state indefinitely. Confirm that this is intended.
df_track = df_track.dropDuplicates(['track_id'])

duration = col("duration_sec")
# Buckets: x < 150 -> "Short", 150 <= x < 300 -> "Medium", x >= 300 -> "High".
# A null duration matches no branch and yields a null flag. The previous `otherwise("High")`
# put tracks with a missing duration into the longest bucket.
df_track = df_track.withColumn(
    "durationFlag",
    when(duration < 150, "Short")
    .when(duration < 300, "Medium")
    .when(duration >= 300, "High"),
)
df_track = df_track.withColumn("track_name", regexp_replace(col("track_name"), "-", " "))

In [0]:
# Append the cleaned DimTrack stream to `spotifycatalog.silver.DimTrack`, with its Delta files at
# the `data_v2` path given by the `path` option.
# Note: this checkpoint directory is the same path the DimTrack Auto Loader read uses as
# cloudFiles.schemaLocation. It is kept unchanged so the existing stream progress is reused.
# availableNow replaces the deprecated `once` trigger. awaitTermination() makes the cell wait
# for the write and re-raise any stream failure instead of leaving it running in the background.
track_write_query = (
    df_track.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimTrack/Checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimTrack/data_v2")
    .toTable("spotifycatalog.silver.DimTrack")
)
track_write_query.awaitTermination()

## DimDate

In [0]:
# Auto Loader (cloudFiles) incremental read of the bronze DimDate parquet files.
# The inferred schema is persisted under the silver DimDate `Checkpoint` path.
dim_date = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimDate/Checkpoint")
    .load("abfss://bronze@spotifystoragejithu.dfs.core.windows.net/DimDate")
)
                     

In [0]:
# Ad-hoc look at the raw DimDate stream, with a checkpoint directory separate from the
# silver write checkpoint.
display(
    dim_date,
    checkpointLocation="abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimDate/checkdisplay",
)

In [0]:
# DimDate gets no transformation beyond dropping the `_rescued_data` column.
df_dateobj = reusable_function_name()
rescued_columns = ["_rescued_data"]
df_date = df_dateobj.dropColumns(dim_date, rescued_columns)


In [0]:
# Append the cleaned DimDate stream to `spotifycatalog.silver.DimDate`, with its Delta files at
# the `data_v2` path given by the `path` option.
# Note: this checkpoint directory is the same path the DimDate Auto Loader read uses as
# cloudFiles.schemaLocation. It is kept unchanged so the existing stream progress is reused.
# availableNow replaces the deprecated `once` trigger. awaitTermination() makes the cell wait
# for the write and re-raise any stream failure.
date_write_query = (
    df_date.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimDate/Checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/DimDate/data_v2")
    .toTable("spotifycatalog.silver.DimDate")
)
date_write_query.awaitTermination()

## FactStream

In [0]:
# Auto Loader (cloudFiles) incremental read of the bronze FactStream parquet files.
# The inferred schema is persisted under the silver FactStream `Checkpoint` path.
df_fact = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/FactStream/Checkpoint")
    .load("abfss://bronze@spotifystoragejithu.dfs.core.windows.net/FactStream")
)

In [0]:
# Ad-hoc look at the raw FactStream stream, with a checkpoint directory separate from the
# silver write checkpoint.
display(
    df_fact,
    checkpointLocation="abfss://silver@spotifystoragejithu.dfs.core.windows.net/FactStream/checkdisplay",
)

In [0]:
# FactStream gets no transformation beyond dropping the `_rescued_data` column.
df_factobj = reusable_function_name()
rescued_fact_columns = ["_rescued_data"]
df_fact = df_factobj.dropColumns(df_fact, rescued_fact_columns)


In [0]:
# Append the cleaned FactStream stream to `spotifycatalog.silver.FactStream`, with its Delta
# files at the `data_v2` path given by the `path` option.
# Note: this checkpoint directory is the same path the FactStream Auto Loader read uses as
# cloudFiles.schemaLocation. It is kept unchanged so the existing stream progress is reused.
# availableNow replaces the deprecated `once` trigger. awaitTermination() makes the cell wait
# for the write and re-raise any stream failure.
fact_write_query = (
    df_fact.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/FactStream/Checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifystoragejithu.dfs.core.windows.net/FactStream/data_v2")
    .toTable("spotifycatalog.silver.FactStream")
)
fact_write_query.awaitTermination()