In [1]:
!pip install -q pandas requests

In [3]:
import requests
import json
import pandas as pd

# The direct URL to the JSON file on your MinIO console
DATA_URL = 'http://127.0.0.1:9001/browser/spotify-raw-data/mpd.slice.114000-114999.json'

try:
    # 1. Fetch the data from the URL
    response = requests.get(DATA_URL)
    response.raise_for_status() # Check for HTTP errors (4xx or 5xx)

    # 2. Parse the JSON content
    data = response.json()
    
    print("Data successfully fetched and parsed!")
    
except requests.exceptions.RequestException as e:
    print(f"Error accessing the MinIO server or file: {e}")
    print("Ensure MinIO is running and accessible at http://127.0.0.1:9001.")
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")
    print("The file might not be valid JSON.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Error accessing the MinIO server or file: Expecting value: line 1 column 1 (char 0)
Ensure MinIO is running and accessible at http://127.0.0.1:9001.


In [4]:
!pip install minio




In [6]:
from minio import Minio
import json

# Kh·ªüi t·∫°o client
client = Minio("127.0.0.1:9000", access_key="minioadmin", secret_key="minioadmin", secure=False)

# T·∫£i file JSON t·ª´ bucket
for b in client.list_buckets():
    print(b.name)
data = client.get_object("spotify-raw-data", "mpd.slice.114000-114999.json")

# ƒê·ªçc n·ªôi dung JSON
playlist_data = json.load(data)
print(len(playlist_data["playlists"]))


spotify-processed-data
spotify-raw-data
1000


In [7]:
playlists = playlist_data['playlists']
records = []

for p in playlists:
    for t in p['tracks']:
        records.append({
            'playlist_id': p['pid'],
            'playlist_name': p.get('name', ''),
            'track_name': t.get('track_name', ''),
            'artist_name': t.get('artist_name', ''),
            'album_name': t.get('album_name', '')
        })

df = pd.DataFrame(records)

print("‚úÖ S·ªë l∆∞·ª£ng playlist:", df['playlist_id'].nunique())
print("‚úÖ T·ªïng s·ªë track entries:", len(df))
print("‚úÖ S·ªë l∆∞·ª£ng track duy nh·∫•t:", df['track_name'].nunique())
print(df.head())

‚úÖ S·ªë l∆∞·ª£ng playlist: 1000
‚úÖ T·ªïng s·ªë track entries: 67230
‚úÖ S·ªë l∆∞·ª£ng track duy nh·∫•t: 30369
   playlist_id playlist_name           track_name  artist_name  \
0       114000         chill               M√©tele   Buscabulla   
1       114000         chill                House     Kindness   
2       114000         chill          Polish Girl  Neon Indian   
3       114000         chill            New Slang    The Shins   
4       114000         chill  White Winter Hymnal  Fleet Foxes   

                         album_name  
0                        Buscabulla  
1  World, You Need a Change of Mind  
2                       Era Extra√±a  
3                Oh, Inverted World  
4                       Fleet Foxes  


In [8]:
avg_tracks = df.groupby('playlist_id')['track_name'].count().mean()
print(f"S·ªë b√†i h√°t trung b√¨nh m·ªói playlist: {avg_tracks:.2f}")

# Ngh·ªá sƒ© ph·ªï bi·∫øn nh·∫•t
top_artists = df['artist_name'].value_counts().head(10)
print("\nTop 10 ngh·ªá sƒ© ph·ªï bi·∫øn nh·∫•t:")
print(top_artists)

# Album ph·ªï bi·∫øn nh·∫•t
top_albums = df['album_name'].value_counts().head(10)
print("\nTop 10 album ph·ªï bi·∫øn nh·∫•t:")
print(top_albums)

S·ªë b√†i h√°t trung b√¨nh m·ªói playlist: 67.23

Top 10 ngh·ªá sƒ© ph·ªï bi·∫øn nh·∫•t:
artist_name
Drake             729
Ed Sheeran        440
Kendrick Lamar    418
The Weeknd        410
Beyonc√©           377
Kanye West        361
Rihanna           308
J. Cole           306
Justin Bieber     304
Eminem            280
Name: count, dtype: int64

Top 10 album ph·ªï bi·∫øn nh·∫•t:
album_name
Views                        191
Coloring Book                177
Stoney                       172
DAMN.                        171
Purpose                      168
Starboy                      165
x                            161
√∑                            157
Beauty Behind The Madness    147
Greatest Hits                147
Name: count, dtype: int64


In [16]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("Spotify Production Pipeline") \
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "600000")  \
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "600000")\
    .config("spark.hadoop.fs.s3a.attempts.maximum", "100")\
    .config("spark.hadoop.fs.s3a.retry.limit", "20")\
    .config("spark.sql.caseSensitive", "true") \
    .getOrCreate()


print("Spark Session Ready!")

Spark Session Ready!


25/12/11 14:11:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [17]:
# 1. ƒê·ªãnh nghƒ©a Schema chu·∫©n c·ªßa Spotify MPD
# Vi·ªác n√†y gi√∫p Spark ƒë·ªçc nhanh h∆°n v√† lo·∫°i b·ªè b·∫£n ghi sai format ngay t·ª´ ƒë·∫ßu
track_schema = StructType([
    StructField("pos", IntegerType(), True),
    StructField("artist_name", StringType(), True),
    StructField("track_uri", StringType(), True),
    StructField("artist_uri", StringType(), True),
    StructField("track_name", StringType(), True),
    StructField("album_uri", StringType(), True),
    StructField("duration_ms", LongType(), True),
    StructField("album_name", StringType(), True)
])

playlist_schema = StructType([
    StructField("name", StringType(), True),
    StructField("collaborative", StringType(), True),
    StructField("pid", LongType(), True),
    StructField("modified_at", LongType(), True),
    StructField("num_tracks", IntegerType(), True),
    StructField("num_albums", IntegerType(), True),
    StructField("num_followers", IntegerType(), True),
    StructField("tracks", ArrayType(track_schema), True), # Nested Array
    StructField("description", StringType(), True)
])
info_schema = StructType([
    StructField("tag", StringType(), True),
    StructField("generated_on", LongType(), True),
    StructField("slice", StringType(), True),
    StructField("version", StringType(), True)
])

root_schema = StructType([
    StructField("info", info_schema, True),
    StructField("playlists", ArrayType(playlist_schema), True)
])

print("Loading Raw Data from Data Lake...")
# ƒê·ªçc d·ªØ li·ªáu v·ªõi schema ƒë√£ ƒë·ªãnh nghƒ©a
raw_df = spark.read \
    .schema(root_schema) \
    .option("mode", "PERMISSIVE") \
    .option("multiline", "true") \
    .json("s3a://spotify-raw-data/mpd.slice.114000-114999.json") # Demo 1 file

print(f"Raw Data Loaded.")

Loading Raw Data from Data Lake...


25/12/11 14:11:59 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/12/11 14:11:59 WARN FileSystem: Failed to initialize filesystem s3a://spotify-raw-data/mpd.slice.114000-114999.json: java.lang.NumberFormatException: For input string: "60s"
25/12/11 14:11:59 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://spotify-raw-data/mpd.slice.114000-114999.json.
java.lang.NumberFormatException: For input string: "60s"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
	at java.base/java.lang.Long.parseLong(Long.java:709)
	at java.base/java.lang.Long.parseLong(Long.java:832)
	at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1607)
	at org.apache.hadoop.fs.s3a.S3AUtils.longOption(S3AUtils.java:1024)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initThreadPools(S3AFileSystem.java:719)
	at org.apache.hado

NumberFormatException: For input string: "60s"

In [None]:
print("Processing Silver Layer...")

# 1. Explode Playlists (T√°ch m·∫£ng playlist th√†nh t·ª´ng d√≤ng)
playlists_df = raw_df.select(explode(col("playlists")).alias("p"))

# 2. Explode Tracks (T√°ch m·∫£ng tracks l·ªìng b√™n trong playlist)
# ƒê√¢y l√† b∆∞·ªõc "Explode nested track arrays" trong y√™u c·∫ßu
flat_df = playlists_df.select(
    col("p.pid").alias("playlist_id"),
    col("p.name").alias("playlist_name"),
    col("p.num_tracks"),
    col("p.num_followers"),
    col("p.modified_at"),
    explode(col("p.tracks")).alias("t")
)

# 3. Parse & Normalize Metadata (L√†m s·∫°ch)
# - Trim: C·∫Øt kho·∫£ng tr·∫Øng th·ª´a
# - Lower: Chuy·ªÉn v·ªÅ ch·ªØ th∆∞·ªùng ƒë·ªÉ chu·∫©n h√≥a
# - Cast: ƒê·∫£m b·∫£o ƒë√∫ng ki·ªÉu d·ªØ li·ªáu
silver_df = flat_df.select(
    col("playlist_id"),
    trim(col("playlist_name")).alias("playlist_name"),
    col("num_tracks"),
    col("num_followers"),
    # Convert timestamp (modified_at l√† epoch seconds)
    to_timestamp(col("modified_at")).alias("modified_date"),
    
    # Track Metadata Parsing
    trim(col("t.track_uri")).alias("track_uri"),
    trim(col("t.track_name")).alias("track_name"),
    trim(col("t.artist_uri")).alias("artist_uri"),
    trim(col("t.artist_name")).alias("artist_name"),
    col("t.duration_ms"),
    
    # Th√™m c·ªôt partition key (V√≠ d·ª•: l·∫•y ch·ªØ c√°i ƒë·∫ßu c·ªßa artist ƒë·ªÉ partition cho ƒë·ªÅu)
    substring(col("t.artist_name"), 0, 1).alias("artist_partition")
).drop_duplicates() # Lo·∫°i b·ªè d√≤ng tr√πng l·∫∑p (Deduplication)

# L∆∞u b·∫£ng s·∫°ch n√†y xu·ªëng Data Lake (Silver)
silver_df.write.mode("overwrite").parquet("s3a://warehouse/silver/tracks")
print("Silver Layer (Cleaned & Normalized) Saved.")
silver_df.show(3)

üîÑ Processing Silver Layer...
‚úÖ Silver Layer (Cleaned & Normalized) Saved.
+-----------+----------------+----------+-------------+-------------------+--------------------+----------------+--------------------+-------------------+-----------+----------------+
|playlist_id|   playlist_name|num_tracks|num_followers|      modified_date|           track_uri|      track_name|          artist_uri|        artist_name|duration_ms|artist_partition|
+-----------+----------------+----------+-------------+-------------------+--------------------+----------------+--------------------+-------------------+-----------+----------------+
|          0|      Throwbacks|        52|            1|2017-04-29 00:00:00|spotify:track:2eJ...|            Baby|spotify:artist:1u...|      Justin Bieber|     213973|               J|
|          1|Awesome Playlist|        39|            1|2017-09-28 00:00:00|spotify:track:2HH...|Eye of the Tiger|spotify:artist:26...|           Survivor|     243773|               S|
|

In [None]:
print("Processing Gold Layer (Feature Engineering)...")

# Feature 1: Artist Frequency (ƒê·ªô ph·ªï bi·∫øn c·ªßa ngh·ªá sƒ©)
# Ngh·ªá sƒ© n√†y xu·∫•t hi·ªán trong bao nhi√™u playlist? T·ªïng th·ªùi l∆∞·ª£ng nghe l√† bao nhi√™u?
artist_features = silver_df.groupBy("artist_name") \
    .agg(
        countDistinct("playlist_id").alias("total_playlist_appearances"),
        count("track_uri").alias("total_tracks_count"),
        avg("duration_ms").alias("avg_track_duration"),
        sum("duration_ms").alias("total_listen_time_ms")
    ) \
    .withColumn("popularity_score", col("total_playlist_appearances") * 0.7 + col("total_tracks_count") * 0.3)

# Feature 2: Playlist Complexity (ƒê·ªô ph·ª©c t·∫°p c·ªßa Playlist)
# Playlist n√†y c√≥ bao nhi√™u ngh·ªá sƒ© kh√°c nhau? ƒê·ªô ƒëa d·∫°ng th·∫ø n√†o?
playlist_features = silver_df.groupBy("playlist_id", "playlist_name") \
    .agg(
        count("track_uri").alias("playlist_length"),
        countDistinct("artist_name").alias("unique_artists"),
        avg("duration_ms").alias("avg_track_duration")
    ) \
    .withColumn("diversity_ratio", col("unique_artists") / col("playlist_length"))

print("Sample Artist Features:")
artist_features.orderBy(desc("total_playlist_appearances")).show(5)

print("Sample Playlist Features:")
playlist_features.orderBy(desc("diversity_ratio")).show(5)

Processing Gold Layer (Feature Engineering)...
Sample Artist Features:
+----------------+--------------------------+------------------+------------------+--------------------+------------------+
|     artist_name|total_playlist_appearances|total_tracks_count|avg_track_duration|total_listen_time_ms|  popularity_score|
+----------------+--------------------------+------------------+------------------+--------------------+------------------+
|           Drake|                       202|               923|237832.82665222103|           219519699|418.29999999999995|
|         Rihanna|                       170|               348| 224016.5459770115|            77957758|223.39999999999998|
|      Kanye West|                       149|               412|248709.39077669903|           102468269|227.89999999999998|
|      The Weeknd|                       139|               291| 264382.6632302406|            76935355|             184.6|
|The Chainsmokers|                       121|               2

In [None]:
print("Saving Final Data (Storage Optimization)...")

# 1. L∆∞u Artist Features (Partition theo ch·ªØ c√°i ƒë·∫ßu ƒë·ªÉ truy v·∫•n nhanh)
# L∆∞u √Ω: Partition by artist_id/name tr·ª±c ti·∫øp s·∫Ω t·∫°o ra h√†ng tri·ªáu folder nh·ªè (Bad Practice).
# Chu·∫©n ch·ªâ l√† n√™n partition theo nh√≥m (Bucketing) ho·∫∑c ch·ªØ c√°i ƒë·∫ßu.
artist_features.write \
    .mode("overwrite") \
    .partitionBy("total_playlist_appearances") \
    .parquet("s3a://warehouse/gold/artist_features")

# 2. L∆∞u Playlist Features (Partition by Range v√≠ d·ª• theo ƒë·ªô d√†i playlist)
# T·∫°o bucket c·ªôt playlist_length ƒë·ªÉ partition cho g·ªçn
playlist_features_bucketed = playlist_features.withColumn(
    "length_bucket", 
    (col("playlist_length") / 50).cast("integer") * 50 # Gom nh√≥m 0-50, 50-100...
)

playlist_features_bucketed.write \
    .mode("overwrite") \
    .partitionBy("length_bucket") \
    .parquet("s3a://warehouse/gold/playlist_features")

print("All Data Saved & Partitioned Successfully!")

Saving Final Data (Storage Optimization)...
All Data Saved & Partitioned Successfully!
