In [0]:
# Echo the SparkSession object `spark` (provided by the notebook runtime; it is not defined in this notebook).
spark

In [0]:
# One-off batch (non-streaming) read of the bronze DimUser parquet files, for inspection.
bronze_dimuser_path = "abfss://bronze@strgs2099.dfs.core.windows.net/DimUser"
parquet_reader = spark.read.format("parquet")
df = parquet_reader.load(bronze_dimuser_path)

In [0]:
# Render the batch bronze DimUser DataFrame `df` with the notebook's `display` helper.
# NOTE(review): this is a user table; rendered output may contain personal data — clear before sharing.
display(df)

### DimUser (Auto Loader ingestion)

In [0]:
# Incremental (streaming) read of bronze DimUser with Auto Loader.
# - cloudFiles.schemaLocation: where Auto Loader persists the inferred schema. It lives under
#   silver/schema/ (the schema/<Table> convention used by the other streams in this notebook)
#   rather than inside the silver/DimUser Delta output directory, so schema files never mix
#   with table data.
# - Auto Loader options need the "cloudFiles." prefix; the bare "schemaEvolutionMode" key is
#   not recognised by Auto Loader.
df_user = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", "parquet")\
    .option("cloudFiles.schemaLocation", "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimUser")\
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")\
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimUser")

In [0]:
# display(df_user)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Normalise user names to upper case; withColumn replaces the existing user_name column.
user_name_upper = upper(col("user_name"))
df_user = df_user.withColumn("user_name", user_name_upper)


In [0]:
# display(df_user)

In [0]:
import os
import sys

# Make the shared workspace utils folder (expected to contain the `transformation` module)
# importable, appending it to sys.path at most once so re-running the cell adds no duplicates.
# NOTE(review): hardcoded per-user workspace path; consider a configurable location.
module_path = '/Workspace/Users/www.karanbhatt18@gmail.com/utils'
already_importable = module_path in sys.path
if not already_importable:
    sys.path.append(module_path)

from transformation import reuseable

In [0]:
# Helper object from utils/transformation; dropColumns(df, cols) is assumed to return `df`
# without the listed columns — confirm in transformation.py.
# NOTE(review): a later cell re-assigns `df_user` from a fresh Auto Loader read, so these
# steps do not reach the silver outputs written there.
df_user_obj = reuseable()

user_without_rescued = df_user_obj.dropColumns(df_user, ['_rescued_data'])
df_user = user_without_rescued.dropDuplicates(['user_id'])
# display(df_user)

In [0]:
# Silver DimUser: fresh Auto Loader read of bronze DimUser, cleaned and written to the
# path-based Delta location silver/DimUser.
# NOTE: this re-assigns `df_user`, so transformations applied to the earlier `df_user`
# stream do not carry over — every step this output needs is applied in this cell.
from pyspark.sql.functions import col, upper

df_user = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimUser") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimUser")

df_user = df_user.withColumn("user_name", upper(col("user_name")))

# Drop Auto Loader's `_rescued_data` column. The earlier DimUser cells dropped it, but this
# re-read brought it back; dropping it keeps silver DimUser consistent with the
# DimArtist/DimTrack/FactStream silver outputs.
df_user = df_user.drop("_rescued_data")

# NOTE(review): streaming dropDuplicates without a watermark keeps every user_id seen so far
# in the query state (persisted with the checkpoint), so that state grows without bound.
df_user = df_user.dropDuplicates(["user_id"])

df_user.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimUser") \
    .trigger(once=True) \
    .start("abfss://silver@strgs2099.dfs.core.windows.net/DimUser") \
    .awaitTermination()


In [0]:
# Write the DimUser stream from the previous cell to the catalog table db2099.silver.dimuser,
# tracked by its own checkpoint (DimUser_v2), separate from the path-based write's checkpoint.
dimuser_table_checkpoint = "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimUser_v2"

dimuser_table_query = (
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", dimuser_table_checkpoint)
    .trigger(once=True)
    .toTable("db2099.silver.dimuser")
)
dimuser_table_query.awaitTermination()

In [0]:
%sql
-- Drops the db2099.silver.dimuser catalog table written by the previous cell.
-- IF EXISTS keeps the cell re-runnable (matches the dimdate drop later in the notebook).
-- WARNING(review): the table stream's checkpoint (checkpoints/DimUser_v2) is not removed here,
-- so re-running the table write afterwards will not re-load bronze files already recorded in
-- that checkpoint. Delete the checkpoint as well if a full rebuild is intended.
DROP TABLE IF EXISTS db2099.silver.dimuser;


### DimArtist

In [0]:
# DimArtist -> silver (path-based Delta output at silver/DimArtist).

# 1. Read bronze DimArtist incrementally with Auto Loader.
df_art = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimArtist") \
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimArtist")


# 2. Transformations
from pyspark.sql.functions import *

# Drop Auto Loader's `_rescued_data` column before writing, consistent with the DimTrack and
# FactStream silver outputs (previously it was only dropped for the DimArtist table write).
df_art = df_art.drop("_rescued_data")
# NOTE(review): streaming dedup without a watermark keeps every artist_id in query state.
df_art = df_art.dropDuplicates(["artist_id"])


# 3. Sanity check: this must be a streaming DataFrame for writeStream below.
print("Is Streaming:", df_art.isStreaming)   # MUST print True


# 4. Write to the silver Delta path, processing whatever bronze data is available once.
query = df_art.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimArtist_v2") \
    .trigger(once=True) \
    .start("abfss://silver@strgs2099.dfs.core.windows.net/DimArtist")

query.awaitTermination()


In [0]:
# DimArtist -> catalog table db2099.silver.dimartist.
# Uses its own checkpoint (DimArtist_v3). It previously shared checkpoints/DimArtist_v2 with the
# path-based DimArtist stream above. A checkpoint records which bronze files a query has already
# consumed, so when run after that stream this table stream found nothing new to write.
df_art = df_art.drop("_rescued_data")

df_art.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimArtist_v3") \
    .trigger(once=True) \
    .toTable("db2099.silver.dimartist") \
    .awaitTermination()



### DimTrack

In [0]:
from pyspark.sql.functions import col, lit, when, regexp_replace

# Read bronze DimTrack incrementally with Auto Loader.
df_track = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimTrack") \
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimTrack")


# Duration category from duration_sec:
#   < 150 -> "Low", 150 <= d < 300 -> "Medium", >= 300 -> "High".
# A NULL duration previously fell through every comparison to .otherwise() and was labelled
# "High"; it now stays NULL because no category can be determined.
df_track = df_track.withColumn(
    "durationFlag",
    when(col("duration_sec").isNull(), lit(None).cast("string"))
    .when(col("duration_sec") < 150, "Low")
    .when(col("duration_sec") < 300, "Medium")   # >= 150 already implied by the branch above
    .otherwise("High")
)

# Clean track name: replace every "- " sequence with a single space.
df_track = df_track.withColumn(
    "track_name",
    regexp_replace(col("track_name"), "- ", " ")
)

# Drop Auto Loader's rescued-data column.
df_track = df_track.drop("_rescued_data")


# Write to the path-based silver Delta location, processing available bronze data once.
df_track.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimTrack_v2") \
    .trigger(once=True) \
    .start("abfss://silver@strgs2099.dfs.core.windows.net/DimTrack") \
    .awaitTermination()


In [0]:
# DimTrack -> catalog table db2099.silver.dimtrack.
# Uses its own checkpoint (DimTrack_v3). It previously shared checkpoints/DimTrack_v2 with the
# path-based DimTrack stream above. A checkpoint records which bronze files a query has already
# consumed, so when run after that stream this table stream found nothing new to write.
df_track = df_track.drop("_rescued_data")   # no-op if already dropped; keeps the cell standalone

df_track.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimTrack_v3") \
    .trigger(once=True) \
    .toTable("db2099.silver.dimtrack") \
    .awaitTermination()


### DimDate

In [0]:
# DimDate -> silver (path-based Delta output at silver/DimDate).
df_date = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimDate") \
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimDate")

# Drop Auto Loader's `_rescued_data` column, as the other silver outputs do. The previous
# (commented-out) line called a `drop_columns` helper that is not defined in this notebook,
# so the column was being written to silver.
df_date = df_date.drop("_rescued_data")

print("Is Streaming:", df_date.isStreaming)

df_date.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimDate_v2") \
    .trigger(once=True) \
    .start("abfss://silver@strgs2099.dfs.core.windows.net/DimDate") \
    .awaitTermination()


In [0]:
# Write the DimDate stream from the previous cell to the catalog table db2099.silver.dimdate,
# tracked by checkpoint DimDate_v3.
dimdate_table_query = (
    df_date.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimDate_v3",
    )
    .trigger(once=True)
    .toTable("db2099.silver.dimdate")
)
dimdate_table_query.awaitTermination()

In [0]:
# Testing re-run of the DimDate path write from a clean checkpoint, with `_rescued_data` dropped.
# Uses its own checkpoint (DimDate_v4). It previously removed and reused checkpoints/DimDate_v3,
# which is the checkpoint of the db2099.silver.dimdate table stream in the cell above. That wiped
# the table stream's progress and rebound the checkpoint to a different sink.
# WARNING: the checkpoint is deleted on every run, so this cell re-processes the bronze DimDate
# files each time and appends them again to silver/DimDate (may duplicate rows already there).
dimdate_test_checkpoint = "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/DimDate_v4"

dbutils.fs.rm(dimdate_test_checkpoint, True)

df_date = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@strgs2099.dfs.core.windows.net/schema/DimDate") \
    .load("abfss://bronze@strgs2099.dfs.core.windows.net/DimDate") \
    .drop("_rescued_data")

df_date.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", dimdate_test_checkpoint) \
    .trigger(once=True) \
    .start("abfss://silver@strgs2099.dfs.core.windows.net/DimDate") \
    .awaitTermination()


In [0]:
%sql
-- Drops the db2099.silver.dimdate catalog table (no error if it does not exist).
-- WARNING(review): this cell does not touch any streaming checkpoint. If the table stream's
-- checkpoint still exists, a later re-run of the table write will not re-load bronze files
-- already recorded there. Confirm this drop is intended before running the notebook end to end.
DROP TABLE IF EXISTS db2099.silver.dimdate;


### FactStream

In [0]:
# FactStream -> catalog table db2099.silver.factstreams via Auto Loader.
bronze_factstream_path = "abfss://bronze@strgs2099.dfs.core.windows.net/FactStream"
factstreams_schema_path = "abfss://silver@strgs2099.dfs.core.windows.net/schema/FactStreams"
factstreams_table_checkpoint = "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/FactStreams_v2"

# Read incrementally and drop Auto Loader's rescued-data column in one chain.
df_fact = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", factstreams_schema_path)
    .load(bronze_factstream_path)
    .drop("_rescued_data")
)

print("Is Streaming:", df_fact.isStreaming)

factstreams_table_query = (
    df_fact.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", factstreams_table_checkpoint)
    .trigger(once=True)
    .toTable("db2099.silver.factstreams")
)
factstreams_table_query.awaitTermination()


In [0]:
# Also write the FactStream data to the path-based Delta location silver/FactStreams,
# using a checkpoint (FactStreams_v3) distinct from the table stream's FactStreams_v2.
factstreams_path_query = (
    df_fact.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@strgs2099.dfs.core.windows.net/checkpoints/FactStreams_v3",
    )
    .trigger(once=True)
    .start("abfss://silver@strgs2099.dfs.core.windows.net/FactStreams")
)
factstreams_path_query.awaitTermination()


In [0]:
%sql
-- Rows of gold DimTrack whose _END_AT is set (presumably closed/historical SCD versions — confirm).
-- The column must be referenced as an identifier (backticks), not a string literal:
-- '_END_AT' IS NOT NULL tests a constant string and was true for every row.
-- NOTE(review): Delta Live Tables SCD type 2 targets name this column `__END_AT` (two
-- underscores); confirm the actual column name in db2099.gold.dimtrack.
SELECT * FROM db2099.gold.dimtrack
WHERE `_END_AT` IS NOT NULL