#Notebook Description
**Author**: Slawomir Drzymala

**Description:**   
This notebook reads data from the curated layer of the data lake, prepares a dataset that can be used in further analysis, and writes it to the enriched layer.

#Set up connection to data lake on Azure

**Things to be noticed:**   
* **sensitive data alert** - storing keys or any other sensitive data in a notebook is not recommended; it is done here only to keep the demo code simple. For real work, use Azure Key Vault or Databricks secrets.
* **multiple ways to connect to Azure data lake** - we can connect to the Azure data lake with the access key or with a service principal, or mount the storage account so that it is visible in many notebooks. See the links below for more details.

In [0]:
# see https://docs.databricks.com/_static/notebooks/data-import/azure-data-lake-store.html
# see https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/adls-gen2/azure-datalake-gen2-get-started
spark.conf.set(
  "fs.azure.account.key.sdsalearnsthnew.dfs.core.windows.net", 
  "RJMELuc9ffZPf5D0gwcbxJp+hWTkQuW8lmWa1DRFSF59aDiatDsMJ6X/yC/dHZtB7kdGl3cJIrYry++6EnCb5g==" 
)
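
To keep the account key out of the notebook, the configuration name can be derived from the storage account name and the value read from a secret scope. The sketch below is only an illustration: the scope `demo-scope` and the key `storage-account-key` are hypothetical names, and because `dbutils.secrets.get` exists only inside Databricks, that call is shown as a comment.

```python
def account_key_conf_name(storage_account: str) -> str:
    """Return the Spark config name that holds the access key of an ADLS Gen2 account."""
    return f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"


conf_name = account_key_conf_name("sdsalearnsthnew")
print(conf_name)

# Inside a Databricks notebook (scope and key names are hypothetical):
# spark.conf.set(conf_name, dbutils.secrets.get(scope="demo-scope", key="storage-account-key"))
```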


#Read entire dataset from curated layer

In [0]:
# read all files from all radio stations
base_path = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/curated-initial/"
file_path = base_path + "radio_name=*/year=*/*.parquet"
# basePath keeps the radio_name and year partition columns in the resulting dataframe
df_playlist = spark.read.option("basePath", base_path) \
                        .parquet(file_path)
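
The `radio_name` and `year` columns are not stored in the files themselves; they come from the `key=value` directory names below the base path. The plain-Python sketch below only illustrates that idea. The file name `part-0000.parquet` is a hypothetical example, and unlike Spark, which by default also infers column types, the sketch keeps every value as a string.

```python
def partition_values(file_path: str, base_path: str) -> dict:
    """Extract the key=value directory segments found below base_path."""
    relative = file_path[len(base_path):] if file_path.startswith(base_path) else file_path
    values = {}
    for segment in relative.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


base = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/curated-initial/"
sample = base + "radio_name=Eska/year=2019/part-0000.parquet"  # hypothetical file name
print(partition_values(sample, base))
```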


#Display 5 sample rows

**Things to be noticed:**   
* **display** - display is a built-in Databricks notebook function that renders many different kinds of objects, including Spark and pandas dataframes

In [0]:
display(df_playlist.head(5))

datetime,artist,title,radio_name,year
2019-12-31T00:00:00.000+0000,Peja/slums Attack,Szacunek Ludzi Ulicy (Explicit),Eska,2019
2019-12-31T00:05:00.000+0000,Tymek/tede,Rainman (Explicit),Eska,2019
2019-12-31T00:08:00.000+0000,Taconafide,Metallica 808 (Explicit),Eska,2019
2019-12-31T00:12:00.000+0000,Nautilus,Blat,Eska,2019
2019-12-31T00:16:00.000+0000,Young Multi,Jeden Dzien (Explicit),Eska,2019


# Create playlist fact table

**New dataframe**
* **df_playlist_enriched** - copy of playlist dataframe with additional columns

**New columns**
* **artist_and_title** - concatenation of the artist and the title, used as the key for a single song
* **year** - year derived from the datetime time stamp of each row
* **date** - date without a time derived from the time stamp
* **month_name** - month derived from datetime time stamp of each row
* **played** - static value, indicator that the song was played in the given time

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import lit, split, reverse, regexp_replace, count, concat_ws, coalesce, desc, trim, lower
from pyspark.sql.functions import year, date_format, hour, to_date, col

df_playlist_enriched = df_playlist.select("radio_name", "artist", "datetime", "title") \
                                  .withColumn("artist_trim", lower(trim("artist"))) \
                                  .withColumn("title_trim", lower(trim("title"))) \
                                  .withColumn("artist_and_title", concat_ws(" - ", "artist_trim", "title_trim")) \
                                  .withColumn("date", to_date(df_playlist["datetime"])) \
                                  .withColumn("month_name", date_format("date", "MMMM")) \
                                  .withColumn("year", year(to_date("date"))) \
                                  .withColumn("played", lit(1)) \
                                  .select("radio_name", \
                                          col("artist_trim").alias("artist"), \
                                          "datetime", \
                                          col("title_trim").alias("title"), \
                                          "artist_and_title", \
                                          "date", \
                                          "year", \
                                          "month_name", \
                                          "played" \
                                   )

output_directory = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/playlist/"
df_playlist_enriched.write.mode('overwrite') \
                 .partitionBy("radio_name", "year") \
                 .parquet(output_directory)
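
To make the derived columns concrete, the sketch below applies the same transformations to a single sample row in plain Python. It is an illustration only, not the Spark code path: `enrich_row` is a hypothetical helper, `str.strip` removes all surrounding whitespace (Spark's `trim` removes spaces), and the English month name assumes the default locale.

```python
import calendar
from datetime import datetime


def enrich_row(radio_name, artist, title, played_at):
    """Build one row of the playlist fact table from the raw values."""
    artist_norm = artist.strip().lower()
    title_norm = title.strip().lower()
    return {
        "radio_name": radio_name,
        "artist": artist_norm,
        "datetime": played_at,
        "title": title_norm,
        "artist_and_title": f"{artist_norm} - {title_norm}",
        "date": played_at.date(),
        "year": played_at.year,
        "month_name": calendar.month_name[played_at.month],
        "played": 1,  # static indicator that the song was played at this time
    }


row = enrich_row("Eska", "Peja/slums Attack", "Szacunek Ludzi Ulicy (Explicit)",
                 datetime(2019, 12, 31, 0, 0))
print(row["artist_and_title"], row["date"], row["month_name"])
```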

#Create song dimension, get unique songs

**New dataframe**
* **df_unique_songs** - dataframe with list of all unique songs from entire playlist dataframe
* **df_songs** - dataframe with unique list of songs enriched with the attributes that will be used to keep information about the song from spotify

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import lit, split, reverse, regexp_replace, count, concat_ws, coalesce, desc, trim, lower
from pyspark.sql.functions import year, date_format, hour, to_date, col
from pyspark.sql.types import StringType, BooleanType

# add basic attributes
df_unique_songs = df_playlist.select(df_playlist["artist"], df_playlist["title"]) \
                                     .withColumn("artist_trim", lower(trim("artist"))) \
                                     .withColumn("title_trim", lower(trim("title"))) \
                                     .groupby("artist_trim", "title_trim") \
                                     .agg(count(lit(1)).alias("cnt")) \
                                     .select("artist_trim", "title_trim", "cnt") \
                                     .withColumn("artist_and_title", concat_ws(" - ", "artist_trim", "title_trim")) \
                                     .select("artist_and_title", col("artist_trim").alias("artist"), col("title_trim").alias("title"), "cnt") \
                                     .orderBy(desc("cnt"))

# add spotify columns
df_songs = df_unique_songs.withColumn('track_id', lit('#').cast(StringType())) \
                          .withColumn('track_href', lit('#').cast(StringType())) \
                          .withColumn('track_is_local', lit('#').cast(StringType())) \
                          .withColumn('track_is_playable', lit('#').cast(StringType())) \
                          .withColumn('track_name', lit('#').cast(StringType())) \
                          .withColumn('track_popularity', lit('#').cast(StringType())) \
                          .withColumn('track_track_number', lit('#').cast(StringType())) \
                          .withColumn('track_type', lit('#').cast(StringType())) \
                          .withColumn('track_uri', lit('#').cast(StringType())) \
                          .withColumn('track_duration_ms', lit('#').cast(StringType())) \
                          .withColumn('track_disc_number', lit('#').cast(StringType())) \
                          .withColumn('track_explicit', lit('#').cast(StringType())) \
                          .withColumn('track_external_ids_isrc', lit('#').cast(StringType())) \
                          .withColumn('album_album_type', lit('#').cast(StringType())) \
                          .withColumn('album_href', lit('#').cast(StringType())) \
                          .withColumn('album_id', lit('#').cast(StringType())) \
                          .withColumn('album_name', lit('#').cast(StringType())) \
                          .withColumn('album_release_date', lit('#').cast(StringType())) \
                          .withColumn('album_release_date_precision', lit('#').cast(StringType())) \
                          .withColumn('album_total_tracks', lit('#').cast(StringType())) \
                          .withColumn('album_type', lit('#').cast(StringType())) \
                          .withColumn('album_uri', lit('#').cast(StringType())) \
                          .withColumn('is_track_downloaded', lit(0).cast(BooleanType())) \
                          .withColumn('audio_features_danceability', lit('#').cast(StringType())) \
                          .withColumn('audio_features_energy', lit('#').cast(StringType())) \
                          .withColumn('audio_features_key', lit('#').cast(StringType())) \
                          .withColumn('audio_features_loudness', lit('#').cast(StringType())) \
                          .withColumn('audio_features_mode', lit('#').cast(StringType())) \
                          .withColumn('audio_features_speechiness', lit('#').cast(StringType())) \
                          .withColumn('audio_features_acousticness', lit('#').cast(StringType())) \
                          .withColumn('audio_features_instrumentalness', lit('#').cast(StringType())) \
                          .withColumn('audio_features_liveness', lit('#').cast(StringType())) \
                          .withColumn('audio_features_valence', lit('#').cast(StringType())) \
                          .withColumn('audio_features_tempo', lit('#').cast(StringType())) \
                          .withColumn('is_audio_features_downloaded', lit(0).cast(BooleanType()))
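
The logic of the song dimension (normalise, group, count, then attach placeholder attributes) can be sketched on a handful of rows in plain Python. The sample plays below are made up for illustration, and only two of the placeholder columns are shown.

```python
from collections import Counter

# made-up sample of (artist, title) plays
plays = [
    ("T.Love", "Warszawa"),
    (" t.love ", "warszawa"),
    ("Ewa Farna", "Ewakuacja"),
]


def normalise(text):
    """Trim and lower-case a value so that spelling variants collapse to one key."""
    return text.strip().lower()


# count plays per normalised (artist, title) pair
counts = Counter((normalise(artist), normalise(title)) for artist, title in plays)

# most_common() orders by count descending, like orderBy(desc("cnt"))
songs = [
    {
        "artist_and_title": f"{artist} - {title}",
        "artist": artist,
        "title": title,
        "cnt": cnt,
        "track_id": "#",               # placeholder until the Spotify lookup fills it
        "is_track_downloaded": False,
    }
    for (artist, title), cnt in counts.most_common()
]
print(songs[0]["artist_and_title"], songs[0]["cnt"])
```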

#Display 5 sample rows

**Things to be noticed:**   
* **display** - display is a built-in Databricks notebook function that renders many different kinds of objects, including Spark and pandas dataframes

In [0]:
display(df_songs.head(5))

artist_and_title,artist,title,cnt,track_id,track_href,track_is_local,track_is_playable,track_name,track_popularity,track_track_number,track_type,track_uri,track_duration_ms,track_disc_number,track_explicit,track_external_ids_isrc,album_album_type,album_href,album_id,album_name,album_release_date,album_release_date_precision,album_total_tracks,album_type,album_uri,is_track_downloaded,audio_features_danceability,audio_features_energy,audio_features_key,audio_features_loudness,audio_features_mode,audio_features_speechiness,audio_features_acousticness,audio_features_instrumentalness,audio_features_liveness,audio_features_valence,audio_features_tempo,is_audio_features_downloaded
antyradio - najlepszy rock na świecie,antyradio,najlepszy rock na świecie,18546,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,False,#,#,#,#,#,#,#,#,#,#,#,False
c - bool,c,bool,9290,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,False,#,#,#,#,#,#,#,#,#,#,#,False
ewa farna - ewakuacja,ewa farna,ewakuacja,4568,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,False,#,#,#,#,#,#,#,#,#,#,#,False
melanie fiona - monday morning,melanie fiona,monday morning,4545,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,False,#,#,#,#,#,#,#,#,#,#,#,False
t.love - warszawa,t.love,warszawa,4383,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,#,False,#,#,#,#,#,#,#,#,#,#,#,False


#Save song dataframe as delta

**Things to be noticed:**
* **idea** - in the next stages we are going to fill the empty columns with information from the Spotify API. To be able to update the data in the data lake easily and efficiently, we are going to use the "delta lake" functionality
* **delta lake** - is an open source storage layer that brings reliability to data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing. Delta Lake runs on top of your existing data lake and is fully compatible with Apache Spark APIs. Delta Lake on Databricks allows you to configure Delta Lake based on your workload patterns.

In [0]:
output_file = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet"
df_songs.write.format("delta").save(output_file)

#Create functions to retrieve data from Spotify API

**Things to be noticed:**
* **python** - we can easily write custom code inside the notebook and later use those functions together with the dataframes
* **python function vs udf** - the functions created here are executed row by row on the driver, primarily because each of them has to call an external API. In Databricks we can also create custom functions (udf) that are executed at the column level and get the most out of the spark engine
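
The search helper in this section assembles its query string by hand. As an alternative sketch, `urllib.parse.urlencode` can take care of the escaping. This version assumes the `q` parameter name from the Spotify Web API documentation (the notebook code uses `query`), and `build_search_url` is a hypothetical helper, not part of the notebook.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_search_url(base_url, track_artist, track_name, market="PL"):
    """Build a track search URL with artist and track field filters."""
    params = {
        "q": f"artist:{track_artist} track:{track_name}",
        "type": "track",
        "market": market,
    }
    # urlencode escapes every value, so special characters in titles stay intact
    return f"{base_url}search?{urlencode(params)}"


url = build_search_url("https://api.spotify.com/v1/", "t.love", "warszawa")
print(url)

# decoding the query string gives back the original search expression
decoded = parse_qs(urlsplit(url).query)
print(decoded["q"][0])
```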

In [0]:
import requests
import urllib.parse

# custom exception template
class MyException(Exception):
    pass

# function to get the spotify access token to be able to make the API call
def get_header_with_access_token(AUTH_URL, CLIENT_ID, CLIENT_SECRET):
  
  auth_response = requests.post(AUTH_URL, {
    'grant_type': 'client_credentials',
    'client_id': CLIENT_ID,
    'client_secret': CLIENT_SECRET,
  })

  # convert the response to JSON
  auth_response_data = auth_response.json()

  # save the access token
  access_token = auth_response_data['access_token']

  # specify request header
  headers = {
      'Authorization': 'Bearer {token}'.format(token=access_token)
  }
  
  return headers
  
# function to search for a song in spotify API and among the other basic info get the track ID
def get_spotify_track(AUTH_URL, CLIENT_ID, CLIENT_SECRET, BASE_URL, track_artist, track_name):
  
  headers = get_header_with_access_token(AUTH_URL, CLIENT_ID, CLIENT_SECRET)
  
  q = f'artist:{urllib.parse.quote(track_artist)}+track:{urllib.parse.quote(track_name)}&type=track&market=PL'
  full_url = BASE_URL + f'search?query={q}'
  #print (full_url)
  
  response = requests.get(full_url, headers=headers)

  # any non-200 status (for example 429 when the rate limit is reached) is treated as a failure
  if response.status_code != 200:
    raise MyException(f"request failed with HTTP status {response.status_code}, try again later")

  response_json = response.json()

  if int(response_json["tracks"]["total"]) > 0:
    # one track might exist on multiple albums, so we return the first item of the search results
    return response_json["tracks"]["items"][0]

  return None
  
# get the information about the song from spotify audio features API
def get_spotify_track_audio_features(AUTH_URL, CLIENT_ID, CLIENT_SECRET, BASE_URL, track_id):
  
  headers = get_header_with_access_token(AUTH_URL, CLIENT_ID, CLIENT_SECRET)
  url = BASE_URL + f'audio-features?ids={track_id}'
  response = requests.get(url, headers=headers)
  response_json = response.json()
   
  return response_json
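
Each helper above requests a fresh access token on every call. A possible refinement, sketched below, caches the token until shortly before it expires. It assumes the token response also carries an `expires_in` value in seconds, as OAuth 2.0 client-credentials responses usually do; `TokenCache` and `fake_fetch` are hypothetical names, and the demonstration uses a fake fetcher and a fake clock so that no network call is made.

```python
import time


class TokenCache:
    """Cache an access token until shortly before it expires."""

    def __init__(self, fetch_token, clock=time.monotonic, safety_margin=60):
        self._fetch_token = fetch_token      # callable returning (token, expires_in_seconds)
        self._clock = clock
        self._safety_margin = safety_margin  # refresh this many seconds before expiry
        self._token = None
        self._expires_at = 0.0

    def headers(self):
        """Return the Authorization header, fetching a new token only when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            token, expires_in = self._fetch_token()
            self._token = token
            self._expires_at = now + expires_in - self._safety_margin
        return {"Authorization": f"Bearer {self._token}"}


calls = []


def fake_fetch():
    """Stand-in for the real token request; counts how often it is called."""
    calls.append(1)
    return f"token-{len(calls)}", 3600


fake_now = [0.0]
cache = TokenCache(fake_fetch, clock=lambda: fake_now[0])
first = cache.headers()
second = cache.headers()   # served from the cache
fake_now[0] = 4000.0       # move the fake clock past the expiry time
third = cache.headers()    # triggers a refresh
print(len(calls))
```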
 

#Use spotify API to identify each song and get basic info

**Things to be noticed:**
* **collect()** - a Spark action that returns all rows of the dataframe to the driver as a list
* **dataframe_name.alias("source_table_alias").merge** - performs a merge operation on the delta table: rows that match the join condition are updated with values from the source
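
The effect of a merge with only a "when matched, update" clause can be illustrated on plain Python dictionaries. The sketch below is a conceptual model, not the Delta Lake API: target rows with a matching key get the listed columns overwritten, and update rows without a match are ignored. The track IDs are hypothetical values.

```python
def merge_when_matched_update(target_rows, update_rows, key, columns):
    """Return target rows with the given columns overwritten where the key matches."""
    updates_by_key = {row[key]: row for row in update_rows}
    merged = []
    for row in target_rows:
        update = updates_by_key.get(row[key])
        if update is None:
            merged.append(dict(row))  # no match: the row is kept unchanged
        else:
            merged.append({**row, **{column: update[column] for column in columns}})
    return merged


songs = [
    {"artist_and_title": "t.love - warszawa", "track_id": "#", "is_track_downloaded": False},
    {"artist_and_title": "ewa farna - ewakuacja", "track_id": "#", "is_track_downloaded": False},
]
updates = [
    {"artist_and_title": "t.love - warszawa", "track_id": "hypothetical-id", "is_track_downloaded": True},
    {"artist_and_title": "not in target", "track_id": "other-id", "is_track_downloaded": True},
]
result = merge_when_matched_update(songs, updates, "artist_and_title",
                                   ["track_id", "is_track_downloaded"])
print(result)
```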

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import lit, split, reverse, regexp_replace, count, concat_ws, coalesce, desc
from pyspark.sql.functions import year, date_format, hour, to_date, col
from pyspark.sql.types import StringType, BooleanType

import time
import pandas as pd
from delta.tables import *
from numpy import random

# read delta table
input_file = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet"
songs_delta_table = DeltaTable.forPath(spark, input_file)

# get the title and artist name of every song that was not downloaded yet
# (keys containing an apostrophe are skipped), then collect the rows on the driver
songs_to_be_donlowaded = songs_delta_table.toDF() \
                                          .where((col('is_track_downloaded') == 0) & (col('artist_and_title').like("%'%") == False)) \
                                          .orderBy(desc("cnt")) \
                                          .select("artist_and_title", "artist", "title") \
                                          .collect()

# spotify API secrets, please note that this should be taken from Databricks secrets
BASE_URL = 'https://api.spotify.com/v1/'
AUTH_URL = 'https://accounts.spotify.com/api/token'
CLIENT_ID = '88a4f7cc9a9940b19f468fca7bd0a0a8'
CLIENT_SECRET = '75168639ec5c4c869df3817a93c15cd9'

# iterate through each song and call the spotify API to get the song info
# if the temporary list of the enriched song list will reach 50 then save the results to the data lake
# for saving use the artist and title as a key and perform a merge operation
tracks = []
for song in songs_to_be_donlowaded:
  try:
    
    # sleep a random number of seconds to avoid flooding the api
    time.sleep(random.uniform(3, 15))
    
    # get the results from the API
    track = get_spotify_track(AUTH_URL, CLIENT_ID, CLIENT_SECRET, BASE_URL, song["artist"], song["title"])
      
    # the song might not be found by the spotify API; that case is handled in the else branch
    if track is not None:
    
      # if the API result is not empty then add the song to the temp list
      tracks.append(
        { 
            'artist_and_title': song["artist_and_title"],
            'album_album_type': track["album"]["album_type"],
            'album_href': track["album"]["href"],
            'album_id': track["album"]["id"],
            'album_name': track["album"]["name"],
            'album_release_date': track["album"]["release_date"],
            'album_release_date_precision': track["album"]["release_date_precision"],
            'album_total_tracks': track["album"]["total_tracks"],
            'album_type': track["album"]["type"],
            'album_uri': track["album"]["uri"],
            'track_disc_number': track["disc_number"],
            'track_duration_ms': track["duration_ms"],
            'track_explicit': track["explicit"],
            'track_external_ids_isrc': track["external_ids"]["isrc"],
            'track_href': track["href"],
            'track_id': track["id"],
            'track_is_local': track["is_local"],
            'track_is_playable': track["is_playable"],
            'track_name': track["name"],
            'track_popularity': track["popularity"],
            'track_track_number': track["track_number"],
            'track_type': track["type"],
            'track_uri': track["uri"]
         } 
      )
      
      # reports success of getting a single song
      print ("ok : " + song["artist_and_title"])

      # once the temp list holds more than 50 songs, save them to the delta lake
      if len(tracks) > 50:
          
          # reports saving to the data lake
          print ("start saving")
          
          # convert the temp list of songs to a pandas dataframe and then create a spark dataframe
          df_pd_tracks_to_save = pd.DataFrame(tracks)
          df_tracks_to_save=spark.createDataFrame(df_pd_tracks_to_save) 

          # perform a merge operation on the data lake
          songs_delta_table.alias("songs").merge(
              df_tracks_to_save.alias("updates"),
              "songs.artist_and_title = updates.artist_and_title") \
            .whenMatchedUpdate(set = 
                               { 
                                            'album_album_type': 'updates.album_album_type',
                                            'album_href': 'updates.album_href',
                                            'album_id': 'updates.album_id',
                                            'album_name': 'updates.album_name',
                                            'album_release_date': 'updates.album_release_date',
                                            'album_release_date_precision': 'updates.album_release_date_precision',
                                            'album_total_tracks': 'updates.album_total_tracks',
                                            'album_type': 'updates.album_type',
                                            'album_uri': 'updates.album_uri',
                                            'track_disc_number': 'updates.track_disc_number',
                                            'track_duration_ms': 'updates.track_duration_ms',
                                            'track_explicit': 'updates.track_explicit',
                                            'track_external_ids_isrc': 'updates.track_external_ids_isrc',
                                            'track_href': 'updates.track_href',
                                            'track_id': 'updates.track_id',
                                            'track_is_local': 'updates.track_is_local',
                                            'track_is_playable': 'updates.track_is_playable',
                                            'track_name': 'updates.track_name',
                                            'track_popularity': 'updates.track_popularity',
                                            'track_track_number': 'updates.track_track_number',
                                            'track_type': 'updates.track_type',
                                            'track_uri': 'updates.track_uri',
                                            'is_track_downloaded': '1',
                                       } 
                              ) \
            .execute()

          # report success of writing to the delta lake
          print ("finish saving, start clean up")
          
          # clean up the temporary objects, remembering how many songs were saved
          saved_count = len(tracks)
          tracks.clear()
          del(df_pd_tracks_to_save)
          del(df_tracks_to_save)
          print ("ok saved: " + str(saved_count))
          
    else:
      # it might be that the song won't be found in the spotify api, mark the song anyhow
      song_title_join = song['artist_and_title'].replace("'", '')
      songs_delta_table.update(f"artist_and_title = '{song_title_join}'", 
                               { 
                                    'is_track_downloaded': lit(1),
                               } 
                              )
      print ("empty results for : " + song["artist_and_title"])
      
  except TypeError:
      # it might be that the song won't be found in the spotify api, mark the song anyhow
      song_title_join = song['artist_and_title'].replace("'", '')
      songs_delta_table.update(f"artist_and_title = '{song_title_join}'", 
                               { 
                                    'is_track_downloaded': lit(1),
                               } 
                              )
      print ("ok but empty: " + song["artist_and_title"])
  except Exception as e:
      # for any other error, report the song and re-raise the exception
      print ("error: " + song["artist_and_title"])
      raise
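
As written, the loop above merges a batch only once the temporary list holds more than 50 songs, so songs still in the list when the loop ends are not written by the code shown. One way to handle that is a small buffer with an explicit final flush, sketched below. `BatchWriter` is a hypothetical helper, and its `flush` callable stands in for the merge into the delta table.

```python
class BatchWriter:
    """Collect items and hand them to a flush callable in fixed-size batches."""

    def __init__(self, flush, batch_size=50):
        self._flush = flush          # callable that persists one batch (e.g. a merge)
        self._batch_size = batch_size
        self._items = []

    def add(self, item):
        """Buffer one item and write the batch as soon as it is full."""
        self._items.append(item)
        if len(self._items) >= self._batch_size:
            self.flush()

    def flush(self):
        """Write whatever is buffered; safe to call when the buffer is empty."""
        if self._items:
            self._flush(list(self._items))
            self._items.clear()


saved_batches = []
writer = BatchWriter(saved_batches.append, batch_size=3)
for number in range(7):
    writer.add(number)
writer.flush()  # final flush writes the incomplete last batch
print(saved_batches)
```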

    

#Use spotify API to get more information (audio features) about each song

**Things to be noticed:**
* **collect()** - a Spark action that returns all rows of the dataframe to the driver as a list
* **dataframe_name.alias("source_table_alias").merge** - performs a merge operation on the delta table: rows that match the join condition are updated with values from the source

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import lit, split, reverse, regexp_replace, count, concat_ws, coalesce, desc
from pyspark.sql.functions import year, date_format, hour, to_date, col
from pyspark.sql.types import StringType, BooleanType

import time
import pandas as pd
from delta.tables import *
from numpy import random

# parameters
input_file = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet"
BASE_URL = 'https://api.spotify.com/v1/'
AUTH_URL = 'https://accounts.spotify.com/api/token'
CLIENT_ID = '88a4f7cc9a9940b19f468fca7bd0a0a8'
CLIENT_SECRET = '75168639ec5c4c869df3817a93c15cd9'

# iterate till all of the songs will be parsed
while True:
  
  # read 100 songs from the delta lake that were not enriched yet
  # the spotify API returns audio features for at most 100 songs in a single call
  songs_delta_table = DeltaTable.forPath(spark, input_file)
  track_ids_to_be_downloaded = songs_delta_table.toDF() \
                                                .where("track_id != '#' and is_audio_features_downloaded == False") \
                                                .select("track_id") \
                                                .limit(100) \
                                                .distinct() \
                                                .collect()
  
  # if there are no more songs to be enriched then break the loop
  if len(track_ids_to_be_downloaded) == 0:
    break
  
  # convert list of the track IDs to the comma separated list
  track_ids_list = ",".join([track_id["track_id"] for track_id in track_ids_to_be_downloaded])

  # wait a random number of seconds and then make a call to the API
  # the call gets the information about up to 100 songs at once
  time.sleep(random.uniform(3, 15))
  tracks_audio_features = get_spotify_track_audio_features(AUTH_URL, CLIENT_ID, CLIENT_SECRET, BASE_URL, track_ids_list)

  # create a temp table that will hold the information about multiple songs
  tracks = []
  
  # iterate through each song, get the information about each song and then add to the temp table
  for track_audio_features in tracks_audio_features["audio_features"]:
    try:
      tracks.append(
          { 
            'track_id': track_audio_features["id"],
            'audio_features_danceability': track_audio_features["danceability"],
            'audio_features_energy': track_audio_features["energy"],
            'audio_features_key': track_audio_features["key"],
            'audio_features_loudness': track_audio_features["loudness"],
            'audio_features_mode': track_audio_features["mode"],
            'audio_features_speechiness': track_audio_features["speechiness"],
            'audio_features_acousticness': track_audio_features["acousticness"],
            'audio_features_instrumentalness': track_audio_features["instrumentalness"],
            'audio_features_liveness': track_audio_features["liveness"],
            'audio_features_valence': track_audio_features["valence"],
            'audio_features_tempo': track_audio_features["tempo"],
            'is_audio_features_downloaded': 1,
           } 
      )
    except (TypeError, KeyError):
      # skip entries for which spotify returned no (or incomplete) audio features
      pass
    
  # it might be that the spotify won't return the results from each song 
  # therefore we will check if the temp table is not empty
  if len(tracks) > 0:
    
    # convert the temp list with enriched songs to the pandas dataframe and then to the spark dataframe
    df_pd_tracks_to_save = pd.DataFrame(tracks)
    df_tracks_to_save=spark.createDataFrame(df_pd_tracks_to_save) 

    # use merge to update the songs in the data lake
    songs_delta_table.alias("songs").merge(
        df_tracks_to_save.alias("updates"),
        "songs.track_id = updates.track_id") \
      .whenMatchedUpdate(set = 
                         { 
                            'audio_features_danceability': 'updates.audio_features_danceability',
                            'audio_features_energy': 'updates.audio_features_energy',
                            'audio_features_key': 'updates.audio_features_key',
                            'audio_features_loudness': 'updates.audio_features_loudness',
                            'audio_features_mode': 'updates.audio_features_mode',
                            'audio_features_speechiness': 'updates.audio_features_speechiness',
                            'audio_features_acousticness': 'updates.audio_features_acousticness',
                            'audio_features_instrumentalness': 'updates.audio_features_instrumentalness',
                            'audio_features_liveness': 'updates.audio_features_liveness',
                            'audio_features_valence': 'updates.audio_features_valence',
                            'audio_features_tempo': 'updates.audio_features_tempo',
                            'is_audio_features_downloaded': 'updates.is_audio_features_downloaded'
                         } 
                      ) \
      .execute()

    # report success and clean the temp variables
    print ("finish saving another tracks chunk, start clean up")
    tracks.clear()
    del(df_pd_tracks_to_save)
    del(df_tracks_to_save)
  else:
    break
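
The loop above re-reads the delta table to obtain the next 100 track IDs. An alternative, sketched below under the assumption that all pending IDs fit in driver memory, is to collect them once and split the list into chunks of at most 100 before joining each chunk with commas. `chunked` is a hypothetical helper and the IDs are made up.

```python
def chunked(items, size):
    """Yield consecutive slices of items with at most size elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


track_ids = [f"id{number}" for number in range(250)]  # made-up track IDs

# one comma separated string per API call
batches = [",".join(chunk) for chunk in chunked(track_ids, 100)]
print(len(batches), [len(batch.split(",")) for batch in batches])
```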

#Optimize the delta lake table (optional)

In [0]:
# read more about coalesce vs repartition

display(spark.sql("DROP TABLE IF EXISTS songs"))

display(spark.sql("CREATE TABLE songs USING DELTA LOCATION 'abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet'"))

display(spark.sql("ALTER TABLE songs SET TBLPROPERTIES ( delta.targetFileSize = '100mb')"))
  
display(spark.sql("OPTIMIZE songs"))

display(spark.sql("VACUUM songs"))

#Show list of files in enriched layer

In [0]:
# list the contents of the enriched layer
file_path = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/"

display(
  dbutils.fs.ls(file_path)
)

path,name,size
abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/playlist/,playlist/,0
abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet/,songs.parquet/,0


# Reload from songs_copy

In [0]:
from delta.tables import *
from pyspark.sql.functions import concat_ws, lower, trim

# target table that will receive the values
output_file = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs.parquet"
output_songs_delta_table = DeltaTable.forPath(spark, output_file)

# source table with the songs that were already enriched
input_file = "abfss://learnsthnew@sdsalearnsthnew.dfs.core.windows.net/enriched/songs_copy.parquet"
input_songs_delta_table = DeltaTable.forPath(spark, input_file)
input_songs = input_songs_delta_table.toDF() \
                                     .where("track_id != '#'") \
                                     .withColumn("artist_trim", lower(trim("artist"))) \
                                     .withColumn("title_trim", lower(trim("title"))) \
                                     .withColumn("artist_and_title", concat_ws(" - ", "artist_trim", "title_trim")) \
                                     .select(
                                        'artist_and_title',
                                        'track_id',
                                        'track_href',
                                        'track_is_local',
                                        'track_is_playable',
                                        'track_name',
                                        'track_popularity',
                                        'track_track_number',
                                        'track_type',
                                        'track_uri',
                                        'track_duration_ms',
                                        'track_disc_number',
                                        'track_explicit',
                                        'track_external_ids_isrc',
                                        'album_album_type',
                                        'album_href',
                                        'album_id',
                                        'album_name',
                                        'album_release_date',
                                        'album_release_date_precision',
                                        'album_total_tracks',
                                        'album_type',
                                        'album_uri',
                                        'audio_features_danceability',
                                        'audio_features_energy',
                                        'audio_features_key',
                                        'audio_features_loudness',
                                        'audio_features_mode',
                                        'audio_features_speechiness',
                                        'audio_features_acousticness',
                                        'audio_features_instrumentalness',
                                        'audio_features_liveness',
                                        'audio_features_valence',
                                        'audio_features_tempo'
                                     ) \
                                  .distinct()

# perform a merge operation on the data lake
output_songs_delta_table.alias("songs").merge( \
    input_songs.alias("updates"), \
    "songs.artist_and_title = updates.artist_and_title") \
  .whenMatchedUpdate(set = 
                     {   
                        'track_id': 'updates.track_id',
                        'track_href': 'updates.track_href',
                        'track_is_local': 'updates.track_is_local',
                        'track_is_playable': 'updates.track_is_playable',
                        'track_name': 'updates.track_name',
                        'track_popularity': 'updates.track_popularity',
                        'track_track_number': 'updates.track_track_number',
                        'track_type': 'updates.track_type',
                        'track_uri': 'updates.track_uri',
                        'track_duration_ms': 'updates.track_duration_ms',
                        'track_disc_number': 'updates.track_disc_number',
                        'track_explicit': 'updates.track_explicit',
                        'track_external_ids_isrc': 'updates.track_external_ids_isrc',
                        'album_album_type': 'updates.album_album_type',
                        'album_href': 'updates.album_href',
                        'album_id': 'updates.album_id',
                        'album_name': 'updates.album_name',
                        'album_release_date': 'updates.album_release_date',
                        'album_release_date_precision': 'updates.album_release_date_precision',
                        'album_total_tracks': 'updates.album_total_tracks',
                        'album_type': 'updates.album_type',
                        'album_uri': 'updates.album_uri',
                        'is_track_downloaded': '1',
                        'audio_features_danceability': 'updates.audio_features_danceability',
                        'audio_features_energy': 'updates.audio_features_energy',
                        'audio_features_key': 'updates.audio_features_key',
                        'audio_features_loudness': 'updates.audio_features_loudness',
                        'audio_features_mode': 'updates.audio_features_mode',
                        'audio_features_speechiness': 'updates.audio_features_speechiness',
                        'audio_features_acousticness': 'updates.audio_features_acousticness',
                        'audio_features_instrumentalness': 'updates.audio_features_instrumentalness',
                        'audio_features_liveness': 'updates.audio_features_liveness',
                        'audio_features_valence': 'updates.audio_features_valence',
                        'audio_features_tempo': 'updates.audio_features_tempo',
                        'is_audio_features_downloaded': '1'
                     } 
                    ) \
  .execute()