In [0]:
%load_ext autoreload
%autoreload 2

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Put the project root (two directories above the notebook's working directory,
# e.g. spotifyaz_dab) on sys.path so that project packages such as `utils` can be
# imported without raising ModuleNotFoundError.

import os
import sys

# abspath() collapses the '..' segments so the entry is a clean, canonical path.
project_path = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))

# Add the project path to the system path only once, so re-running this cell
# does not keep appending duplicate entries.
if project_path not in sys.path:
    sys.path.append(project_path)

from utils.transformations import reusable

# Show the project root that was added to sys.path, as a quick sanity check.
print(project_path)

## **DimUser**

In [0]:
# df = spark.read.format("parquet") \
#                 .load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/DimUser")

In [0]:
# display(df)

### **AUTOLOADER**

In [0]:
# Incrementally read the DimUser parquet files from the bronze container with
# Auto Loader (the "cloudFiles" source). The inferred schema is tracked under the
# silver DimUser checkpoint folder, and new columns are added as they appear.
user_stream_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimUser/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
)
df_user = user_stream_reader.load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/DimUser")

In [0]:
# Preview the DimUser stream.
# NOTE(review): df_user is created with spark.readStream, so this presumably starts a
# live streaming display that keeps running until it is stopped — confirm this is
# wanted when the notebook runs non-interactively.
display(df_user)

In [0]:
# Helper object from the project's utils.transformations module.
df_user_obj = reusable()

# Pass the `_rescued_data` column to the helper's dropColumns (presumably it returns
# the frame without the listed columns — verify in utils.transformations), then preview.
rescued_cols = ["_rescued_data"]
df_user = df_user_obj.dropColumns(df_user, rescued_cols)
display(df_user)

### **Tip: If the list of columns is supplied as a string, first convert it into a list using the eval function**

col_lst = "['name','age']" <br>
print(eval(col_lst))
print(type(eval(col_lst)))

In [0]:
# De-duplicate on the user key so each user_id appears only once, then preview.
user_key_cols = ['user_id']
df_user = df_user.dropDuplicates(user_key_cols)
display(df_user)

In [0]:
# Write the DimUser stream to the silver-layer Delta table.
#
# The stream appends to the Delta files under .../DimUser/data, keeps its progress
# under .../DimUser/checkpoint, and registers the result as spotify_cata.silver.DimUser.
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimUser/checkpoint")
    .option("path", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimUser/data")
    # availableNow processes everything that is currently pending and then stops,
    # like the deprecated once=True trigger it replaces, but it may split the
    # backlog across several micro-batches instead of forcing one large batch.
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimUser")
)


## **DimArtist**

In [0]:
# Incrementally read the DimArtist parquet files from the bronze container with
# Auto Loader; schema tracking lives under the silver DimArtist checkpoint folder.
artist_stream_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimArtist/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
)
df_artist = artist_stream_reader.load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/DimArtist")

In [0]:
# Preview the DimArtist stream.
# NOTE(review): df_artist is created with spark.readStream, so this presumably starts
# a live streaming display that keeps running until it is stopped — confirm this is
# wanted when the notebook runs non-interactively.
display(df_artist)

In [0]:
# Helper object from the project's utils.transformations module.
df_art_obj = reusable()

# First hand `_rescued_data` to the helper's dropColumns, then keep a single row
# per artist_id, and finally preview the cleaned frame.
artist_without_rescued = df_art_obj.dropColumns(df_artist, ["_rescued_data"])
df_artist = artist_without_rescued.dropDuplicates(['artist_id'])
display(df_artist)

In [0]:
# Write the DimArtist stream to the silver container in ADLS Gen2.
#
# The stream appends to the Delta files under .../DimArtist/data, keeps its progress
# under .../DimArtist/checkpoint, and registers the result as spotify_cata.silver.DimArtist.
(
    df_artist.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimArtist/checkpoint")
    .option("path", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimArtist/data")
    # availableNow processes everything that is currently pending and then stops,
    # like the deprecated once=True trigger it replaces, but it may split the
    # backlog across several micro-batches instead of forcing one large batch.
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimArtist")
)


## **DimTrack**

In [0]:
# Incrementally read the DimTrack parquet files from the bronze container with
# Auto Loader; schema tracking lives under the silver DimTrack checkpoint folder.
track_stream_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimTrack/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
)
df_track = track_stream_reader.load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/DimTrack")

In [0]:
# Preview the DimTrack stream.
# NOTE(review): df_track is created with spark.readStream, so this presumably starts
# a live streaming display that keeps running until it is stopped — confirm this is
# wanted when the notebook runs non-interactively.
display(df_track)

In [0]:
# Business transformation 1: bucket every track by its duration in seconds.
#   below 150        -> "low"
#   150 through 300  -> "medium"
#   above 300        -> "high"
# The upper bound of the medium bucket is inclusive, so a track of exactly 300
# seconds is "medium"; the rule reserves "high" for durations above 300.
# NOTE(review): rows whose duration_sec is NULL satisfy neither when() and therefore
# fall through to otherwise("high") — confirm that this is the intended treatment.
duration_sec = col("duration_sec")
duration_flag = (
    when(duration_sec < 150, "low")
    .when(duration_sec <= 300, "medium")
    .otherwise("high")
)
df_track = df_track.withColumn("durationFlag", duration_flag)
display(df_track)


In [0]:
# Business transformation 2: the native application's search engine cannot handle
# special characters such as '-' in track_name, so every hyphen becomes a space.
cleaned_track_name = regexp_replace(col('track_name'), '-', ' ')
df_track = df_track.withColumn("track_name", cleaned_track_name)

# Hand the `_rescued_data` column to the project helper's dropColumns as well.
df_track_obj = reusable()
df_track = df_track_obj.dropColumns(df_track, ["_rescued_data"])

display(df_track)

In [0]:
# Write the DimTrack stream to the silver container in ADLS Gen2.
#
# The stream appends to the Delta files under .../DimTrack/data, keeps its progress
# under .../DimTrack/checkpoint, and registers the result as spotify_cata.silver.DimTrack.
(
    df_track.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimTrack/checkpoint")
    .option("path", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimTrack/data")
    # availableNow processes everything that is currently pending and then stops,
    # like the deprecated once=True trigger it replaces, but it may split the
    # backlog across several micro-batches instead of forcing one large batch.
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimTrack")
)


## **DimDate**

In [0]:
# Incrementally read the DimDate parquet files from the bronze container with
# Auto Loader; schema tracking lives under the silver DimDate checkpoint folder.
date_stream_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimDate/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
)
df_date = date_stream_reader.load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/DimDate")

In [0]:
# Hand the `_rescued_data` column to the project helper's dropColumns, then preview.
date_helper = reusable()
df_date = date_helper.dropColumns(df_date, ["_rescued_data"])
display(df_date)

In [0]:
# Write the DimDate stream to the silver container in ADLS Gen2.
#
# The stream appends to the Delta files under .../DimDate/data, keeps its progress
# under .../DimDate/checkpoint, and registers the result as spotify_cata.silver.DimDate.
(
    df_date.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimDate/checkpoint")
    .option("path", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/DimDate/data")
    # availableNow processes everything that is currently pending and then stops,
    # like the deprecated once=True trigger it replaces, but it may split the
    # backlog across several micro-batches instead of forcing one large batch.
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimDate")
)


## **FactStream**

In [0]:
# Incrementally read the FactStream parquet files from the bronze container with
# Auto Loader; schema tracking lives under the silver FactStream checkpoint folder.
fact_stream_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/FactStream/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
)
df_fact = fact_stream_reader.load("abfss://bronze@azureprojectstorageaccnt.dfs.core.windows.net/FactStream")

In [0]:
# Preview the FactStream stream.
# NOTE(review): df_fact is created with spark.readStream, so this presumably starts
# a live streaming display that keeps running until it is stopped — confirm this is
# wanted when the notebook runs non-interactively.
display(df_fact)

In [0]:
# Hand the `_rescued_data` column to the project helper's dropColumns, then preview.
fact_helper = reusable()
df_fact = fact_helper.dropColumns(df_fact, ["_rescued_data"])
display(df_fact)

In [0]:
# Write the FactStream stream to the silver container in ADLS Gen2.
#
# The stream appends to the Delta files under .../FactStream/data, keeps its progress
# under .../FactStream/checkpoint, and registers the result as spotify_cata.silver.FactStream.
(
    df_fact.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/FactStream/checkpoint")
    .option("path", "abfss://silver@azureprojectstorageaccnt.dfs.core.windows.net/FactStream/data")
    # availableNow processes everything that is currently pending and then stops,
    # like the deprecated once=True trigger it replaces, but it may split the
    # backlog across several micro-batches instead of forcing one large batch.
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.FactStream")
)
