In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Load this run's raw batches from the landing ("to_be_processed") folders.
# Parquet files embed their own schema, so no schema option is needed:
# `inferSchema` is a CSV/JSON reader option and the parquet source ignores it.
# NOTE(review): Spark raises an error if a folder is missing or holds no
# parquet files -- consider guarding if this job can run with no new data.
track_spark_df = (
    spark.read.format("parquet")
    .load("/mnt/spotifydata/to_be_processed/track")
)

artist_spark_df = (
    spark.read.format("parquet")
    .load("/mnt/spotifydata/to_be_processed/artist")
)

In [0]:
# Denormalise tracks and artists into one wide table ("one big table").
# A left join keeps tracks whose artist is not in this batch. Joining on the
# column *name* yields a single artist_id column taken from the left (track)
# side. Keeping the artist-side key instead would leave artist_id NULL for
# unmatched tracks, and those rows could never be updated by the later
# artist_id-based merge.
one_big_table = (
    track_spark_df
    .join(artist_spark_df, on="artist_id", how="left")
    # One row per track. Which duplicate survives is arbitrary; duplicates may
    # also come from repeated artist_id rows in the artist batch.
    .dropDuplicates(["track_id"])
)

In [0]:
# Upsert tracks into the reporting Delta table.
#   * No Delta table at the path yet -> write the whole batch to create it.
#   * Otherwise -> MERGE on track_id: existing tracks only get
#     track_popularity refreshed; unseen tracks are inserted with all columns.
delta_table_path = "/mnt/spotifydata/reporting_data/"
# True only when the MERGE branch ran; the artist-refresh cell checks it.
is_merge_track = False

if not DeltaTable.isDeltaTable(spark, delta_table_path):
    one_big_table.write.format("delta").save(delta_table_path)
    print("Delta table created successfully.")
else:
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    (
        delta_table.alias("target")
        .merge(one_big_table.alias("source"), "target.track_id = source.track_id")
        .whenMatchedUpdate(set={"track_popularity": "source.track_popularity"})
        .whenNotMatchedInsertAll()
        .execute()
    )
    print("Merge operation completed.")
    is_merge_track = True

In [0]:
# Refresh artist popularity / follower counts on rows already in the Delta table.
# Only needed after a MERGE: when the table is first created it is written
# directly from one_big_table, which already carries this batch's artist columns.
if is_merge_track:
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Delta MERGE fails if several source rows match the same target row, so
    # keep a single row per artist_id (an arbitrary one among duplicates).
    latest_artist_df = artist_spark_df.dropDuplicates(["artist_id"])

    # One artist row may update many target rows (one per track by that
    # artist). There is no insert clause, so artists with no tracks in the
    # table are ignored.
    (
        delta_table.alias("target")
        .merge(latest_artist_df.alias("source"), "target.artist_id = source.artist_id")
        .whenMatchedUpdate(set={
            "artist_popularity": "source.artist_popularity",
            "artist_follower": "source.artist_follower",
        })
        .execute()
    )
    print("Merge artist operation completed.")

In [0]:
# Register the Delta files at delta_table_path in the catalog so they can be
# queried by name. Skipped when the name already exists, so re-running is safe.
table_name = "spotify_track_table"

if not spark.catalog.tableExists(table_name):
    # CREATE TABLE ... LOCATION points the catalog entry at the existing files.
    spark.sql(f"CREATE TABLE {table_name} USING DELTA LOCATION '{delta_table_path}'")
    print(f"Table '{table_name}' created and registered.")

In [0]:
def move_file(source_dir, destination_dir, file_type, extension=".parquet"):
    """Move one entity's data files from ``source_dir/file_type/`` to ``destination_dir/file_type/``.

    Only entries listed directly under the source folder whose name ends with
    ``extension`` are moved. Anything else (e.g. Spark's ``_SUCCESS`` marker)
    is left in place.

    Args:
        source_dir: Root folder holding one sub-folder per entity type.
            A trailing ``/`` is optional.
        destination_dir: Root folder with the same layout to move files into.
            A trailing ``/`` is optional.
        file_type: Entity sub-folder name, e.g. ``"track"`` or ``"artist"``.
        extension: File-name suffix selecting which files to move
            (default ``".parquet"``).

    Errors raised by ``dbutils.fs`` (for example a missing source folder)
    propagate to the caller.
    """
    # Normalise separators so "/root" and "/root/" both give "/root/<type>/".
    source_folder = f"{source_dir.rstrip('/')}/{file_type}/"
    destination_folder = f"{destination_dir.rstrip('/')}/{file_type}/"

    for entry in dbutils.fs.ls(source_folder):
        if entry.name.endswith(extension):
            dbutils.fs.mv(entry.path, destination_folder + entry.name)

In [0]:
# Archive this run's raw inputs: move them out of the landing folder that the
# load cell reads from, so they are not picked up again on the next run.
source_dir = "/mnt/spotifydata/to_be_processed/"
destination_dir = "/mnt/spotifydata/processed_data/"
for entity in ("track", "artist"):
    move_file(source_dir, destination_dir, entity)