In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
import os
import sys

# Make the project root importable so the shared `utils` package resolves.
# NOTE(review): assumes the current working directory is this notebook's folder
# and that the project root sits two levels above it — confirm for the runtime.
project_pth = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))

# Only add the path once; re-running this cell would otherwise keep appending
# duplicate entries to sys.path.
if project_pth not in sys.path:
    sys.path.append(project_pth)

# `reusable` provides the dropColumns(df, columns) helper used by the cells below.
from utils.transformations import reusable


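# **AUTO LOADER**

Incrementally ingest each bronze Parquet dataset (DimUser, DimArtist, DimTrack, DimDate, FactStream) with Databricks Auto Loader, apply light cleansing, and append the results to the matching `spotify_cata.silver` Delta tables.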

## **DimUser**

In [0]:
# Incrementally pick up new Parquet files from bronze/DimUser with Auto Loader.
# The inferred schema is tracked under the silver DimUser checkpoint folder; data
# that does not fit that schema is captured in the `rescued_data` column.
df_user = (
    spark.readStream
    .format("cloudfiles")
    .options(**{
        "cloudFiles.format": "parquet",
        "cloudFiles.schemalocation": "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimUser/checkpoint",
        "cloudFiles.rescuedDataColumn": "rescued_data",
    })
    .load("abfss://bronze@spotifydlstorage.dfs.core.windows.net/DimUser")
)

In [0]:
# Normalise user names to upper case (the column keeps its name and position).
df_user = df_user.withColumn("user_name", upper("user_name"))

In [0]:
df_user_obj = reusable()

# Remove Auto Loader's rescued-data column, then keep a single row per user_id.
# NOTE(review): a streaming dropDuplicates without a watermark keeps every seen
# user_id in checkpointed state, so state grows without bound and later records
# for an already-seen user_id are discarded — confirm this is intended for a
# dimension that may receive updates.
df_user = df_user_obj.dropColumns(df_user, ['rescued_data']).dropDuplicates(['user_id'])

In [0]:
# Append the cleansed users to the silver DimUser Delta table, stored at the
# given ADLS path.
# `availableNow` (Spark 3.3+ / DBR 10.4+) processes everything that arrived since
# the last run, possibly across several micro-batches, and then stops. It replaces
# the deprecated `trigger(once=True)`, which forces all pending files into a
# single batch.
# Awaiting the query makes the cell block until the load finishes and re-raises
# any streaming failure here, instead of letting later cells run ahead of it.
query_user = (
    df_user.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimUser/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimUser/data")
    .toTable('spotify_cata.silver.DimUser')
)
query_user.awaitTermination()

## **DimArtist**

In [0]:
# Incrementally pick up new Parquet files from bronze/DimArtist with Auto Loader.
# The inferred schema is tracked under the silver DimArtist checkpoint folder;
# data that does not fit that schema is captured in the `rescued_data` column.
df_artist = (
    spark.readStream
    .format("cloudfiles")
    .options(**{
        "cloudFiles.format": "parquet",
        "cloudFiles.schemalocation": "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimArtist/checkpoint",
        "cloudFiles.rescuedDataColumn": "rescued_data",
    })
    .load("abfss://bronze@spotifydlstorage.dfs.core.windows.net/DimArtist")
)

In [0]:
df_artist_obj = reusable()

# Remove Auto Loader's rescued-data column, then keep a single row per artist_id.
# NOTE(review): a streaming dropDuplicates without a watermark keeps every seen
# artist_id in checkpointed state, so state grows without bound and later
# records for an already-seen artist_id are discarded — confirm this is intended.
df_artist = df_artist_obj.dropColumns(df_artist, ['rescued_data']).dropDuplicates(['artist_id'])


In [0]:
# Append the cleansed artists to the silver DimArtist Delta table, stored at the
# given ADLS path.
# `availableNow` (Spark 3.3+ / DBR 10.4+) processes everything that arrived since
# the last run, possibly across several micro-batches, and then stops. It replaces
# the deprecated `trigger(once=True)`, which forces all pending files into a
# single batch.
# Awaiting the query makes the cell block until the load finishes and re-raises
# any streaming failure here.
query_artist = (
    df_artist.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimArtist/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimArtist/data")
    .toTable('spotify_cata.silver.DimArtist')
)
query_artist.awaitTermination()

## **DimTrack**

In [0]:
# Incrementally pick up new Parquet files from bronze/DimTrack with Auto Loader.
# The inferred schema is tracked under the silver DimTrack checkpoint folder;
# data that does not fit that schema is captured in the `rescued_data` column.
df_tracks = (
    spark.readStream
    .format("cloudfiles")
    .options(**{
        "cloudFiles.format": "parquet",
        "cloudFiles.schemalocation": "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimTrack/checkpoint",
        "cloudFiles.rescuedDataColumn": "rescued_data",
    })
    .load("abfss://bronze@spotifydlstorage.dfs.core.windows.net/DimTrack")
)

In [0]:
# Bucket each track by length:
#   duration_sec < 150          -> "Low"
#   150 <= duration_sec <= 300  -> "Medium"
#   duration_sec > 300          -> "High"
# The upper two buckets were previously swapped: > 300 was labelled "Medium" and
# the 150-300 range fell through to "High". With no `otherwise`, rows whose
# duration_sec is null now get a null flag instead of silently landing in "High".
df_tracks = df_tracks.withColumn(
    "durationFlag",
    when(col("duration_sec") < 150, "Low")
    .when(col("duration_sec") <= 300, "Medium")
    .when(col("duration_sec") > 300, "High"),
)

# Replace hyphens in track names with spaces.
df_tracks = df_tracks.withColumn("track_name", regexp_replace(col("track_name"), '-', ' '))

# Drop Auto Loader's rescued-data column before writing to silver.
df_tracks = reusable().dropColumns(df_tracks, ['rescued_data'])
                                      

In [0]:
# Append the cleansed tracks to the silver DimTrack Delta table, stored at the
# given ADLS path.
# `availableNow` (Spark 3.3+ / DBR 10.4+) processes everything that arrived since
# the last run, possibly across several micro-batches, and then stops. It replaces
# the deprecated `trigger(once=True)`, which forces all pending files into a
# single batch.
# Awaiting the query makes the cell block until the load finishes and re-raises
# any streaming failure here.
query_tracks = (
    df_tracks.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimTrack/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimTrack/data")
    .toTable('spotify_cata.silver.DimTrack')
)
query_tracks.awaitTermination()

## **DimDate**

In [0]:
# Incrementally pick up new Parquet files from bronze/DimDate with Auto Loader.
# The inferred schema is tracked under the silver DimDate checkpoint folder; data
# that does not fit that schema is captured in the `rescued_data` column.
df_date = (
    spark.readStream
    .format("cloudfiles")
    .options(**{
        "cloudFiles.format": "parquet",
        "cloudFiles.schemalocation": "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimDate/checkpoint",
        "cloudFiles.rescuedDataColumn": "rescued_data",
    })
    .load("abfss://bronze@spotifydlstorage.dfs.core.windows.net/DimDate")
)

In [0]:
# Drop Auto Loader's rescued-data column before writing to silver.
df_date = reusable().dropColumns(df_date, ['rescued_data'])

# Append the dates to the silver DimDate Delta table, stored at the given ADLS
# path.
# `availableNow` (Spark 3.3+ / DBR 10.4+) processes everything that arrived since
# the last run, possibly across several micro-batches, and then stops. It replaces
# the deprecated `trigger(once=True)`, which forces all pending files into a
# single batch.
# Awaiting the query makes the cell block until the load finishes and re-raises
# any streaming failure here.
query_date = (
    df_date.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimDate/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifydlstorage.dfs.core.windows.net/DimDate/data")
    .toTable('spotify_cata.silver.DimDate')
)
query_date.awaitTermination()

## **FactStream**

In [0]:
# Incrementally pick up new Parquet files from bronze/FactStream with Auto Loader.
# The inferred schema is tracked under the silver FactStream checkpoint folder;
# data that does not fit that schema is captured in the `rescued_data` column.
df_fact = (
    spark.readStream
    .format("cloudfiles")
    .options(**{
        "cloudFiles.format": "parquet",
        "cloudFiles.schemalocation": "abfss://silver@spotifydlstorage.dfs.core.windows.net/FactStream/checkpoint",
        "cloudFiles.rescuedDataColumn": "rescued_data",
    })
    .load("abfss://bronze@spotifydlstorage.dfs.core.windows.net/FactStream")
)

In [0]:
# Drop Auto Loader's rescued-data column before writing to silver.
df_fact = reusable().dropColumns(df_fact, ['rescued_data'])

# Append the stream facts to the silver FactStream Delta table, stored at the
# given ADLS path.
# `availableNow` (Spark 3.3+ / DBR 10.4+) processes everything that arrived since
# the last run, possibly across several micro-batches, and then stops. It replaces
# the deprecated `trigger(once=True)`, which forces all pending files into a
# single batch.
# Awaiting the query makes the cell block until the load finishes and re-raises
# any streaming failure here.
query_fact = (
    df_fact.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@spotifydlstorage.dfs.core.windows.net/FactStream/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@spotifydlstorage.dfs.core.windows.net/FactStream/data")
    .toTable('spotify_cata.silver.FactStream')
)
query_fact.awaitTermination()