In [0]:
import requests
import xml.etree.ElementTree as ET
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Setup
# Reuse whatever Spark session the notebook runtime has already started.
spark = SparkSession.getActiveSession()

# NOTE(review): the Last.fm credential is inlined in source — consider loading
# it from a secret store instead of committing it with the notebook.
api_key = "**********************"

# Child elements copied from each <artist> / <track> node of the API response.
artist_tags = [
    "name",
    "listeners",
    "mbid",
    "url",
]
track_tags = [
    "name",
    "duration",
    "listeners",
    "url",
]
# Fields copied from the <artist> element nested inside each <track>.
artist_nested_tags = [
    "name",
    "mbid",
    "url",
]

# Countries queried, spelled exactly as they are sent to the API.
countries = [
    "India",
    "United states",
    "Canada",
    "Spain",
    "Germany",
    "China",
    "Mexico",
]

# Run date (YYYYMMDD, local time) used as the suffix of the output paths.
today = f"{datetime.today():%Y%m%d}"


# Parse and return all artist rows
def fetch_all_artists(timeout=30):
    """Fetch Last.fm's top artists for every country in ``countries``.

    Calls the ``geo.gettopartists`` method once per country and flattens each
    ``<artist>`` element of the XML response into a dict.

    Args:
        timeout: Seconds to wait for the server on each request before
            ``requests`` gives up. Without it a stalled connection would
            block the run indefinitely.

    Returns:
        A list of dicts, one per artist per country, keyed by every name in
        ``artist_tags`` plus ``"country"``. A tag that is absent from the
        response is stored as the string ``"NULL"``; a tag that is present
        but empty is stored as ``None`` (the element's ``text``).

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    endpoint = "https://ws.audioscrobbler.com/2.0/"
    all_rows = []
    for country in countries:
        # Let requests build the query string so values containing spaces or
        # reserved characters (e.g. "United states") are encoded correctly.
        query = {
            "method": "geo.gettopartists",
            "api_key": api_key,
            "country": country,
        }
        response = requests.get(endpoint, params=query, timeout=timeout)
        response.raise_for_status()
        # Parse the raw bytes so the XML declaration, not the HTTP charset
        # guess behind ``response.text``, decides how names are decoded.
        root = ET.fromstring(response.content)
        for artist in root.findall(".//artist"):
            row = {}
            for tag in artist_tags:
                # Direct-child lookup: never pick up a same-named element
                # nested deeper inside the artist node.
                el = artist.find(tag)
                row[tag] = el.text if el is not None else "NULL"
            row["country"] = country
            all_rows.append(row)
    return all_rows


# Parse and return all track rows
def fetch_all_tracks(timeout=30):
    """Fetch Last.fm's top tracks for every country in ``countries``.

    Calls the ``geo.gettoptracks`` method once per country and flattens each
    ``<track>`` element of the XML response, together with selected fields of
    its nested ``<artist>`` element, into a dict.

    Args:
        timeout: Seconds to wait for the server on each request before
            ``requests`` gives up. Without it a stalled connection would
            block the run indefinitely.

    Returns:
        A list of dicts, one per track per country, keyed by every name in
        ``track_tags``, ``"artist_<tag>"`` for every name in
        ``artist_nested_tags``, and ``"country"``. A tag that is absent from
        the response is stored as the string ``"NULL"``; a tag that is
        present but empty is stored as ``None`` (the element's ``text``).

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    endpoint = "https://ws.audioscrobbler.com/2.0/"
    all_rows = []
    for country in countries:
        # Let requests build the query string so values containing spaces or
        # reserved characters (e.g. "United states") are encoded correctly.
        query = {
            "method": "geo.gettoptracks",
            "api_key": api_key,
            "country": country,
        }
        response = requests.get(endpoint, params=query, timeout=timeout)
        response.raise_for_status()
        # Parse the raw bytes so the XML declaration, not the HTTP charset
        # guess behind ``response.text``, decides how names are decoded.
        root = ET.fromstring(response.content)
        for track in root.findall(".//track"):
            row = {}
            for tag in track_tags:
                # Direct-child lookup: a descendant search (".//name",
                # ".//url") would silently fall through to the nested
                # <artist> values whenever the track's own field is missing.
                el = track.find(tag)
                row[tag] = el.text if el is not None else "NULL"
            for tag in artist_nested_tags:
                el = track.find(f"artist/{tag}")
                row[f"artist_{tag}"] = el.text if el is not None else "NULL"
            row["country"] = country
            all_rows.append(row)
    return all_rows


# Save DataFrame as a single-partition CSV (a directory holding one part file) in a Unity Catalog Volume
def write_df_to_volume(df, path):
    """Write ``df`` as headered CSV to ``path``, replacing existing output.

    Args:
        df: Spark DataFrame to persist.
        path: Destination path handed to Spark's CSV writer.
    """
    # Collapse to one partition so the writer emits a single part file.
    single_partition = df.coalesce(1)
    writer = single_partition.write.option("header", True).mode("overwrite")
    writer.csv(path)
    print(f"Data written to: {path}")


# Main Execution
# Each dataset is fetched, wrapped in an all-string DataFrame and written out
# before the next one is started.

# --- Top Artists ---
artist_rows = fetch_all_artists()
artist_schema = StructType(
    [
        StructField(column, StringType(), True)
        for column in [*artist_tags, "country"]
    ]
)
df_artists = spark.createDataFrame(artist_rows, artist_schema)
artist_volume_path = f"/Volumes/lastfmdata/bronze/csv_uploads/top_artists_{today}"
write_df_to_volume(df_artists, artist_volume_path)

# --- Top Tracks ---
track_rows = fetch_all_tracks()
# Column order: the track's own fields, then the nested artist fields with an
# "artist_" prefix, then the country the row was fetched for.
all_tags = [
    *track_tags,
    *(f"artist_{t}" for t in artist_nested_tags),
    "country",
]
track_schema = StructType(
    [StructField(column, StringType(), True) for column in all_tags]
)
df_tracks = spark.createDataFrame(track_rows, track_schema)
track_volume_path = f"/Volumes/lastfmdata/bronze/csv_uploads/top_tracks_{today}"
write_df_to_volume(df_tracks, track_volume_path)

Data written to: /Volumes/lastfmdata/bronze/csv_uploads/top_artists_20250709
Data written to: /Volumes/lastfmdata/bronze/csv_uploads/top_tracks_20250709
