## Part 2: Spark Streaming and Machine Learning for Real-Time Music Recommendation
This part focuses on streaming the data and applying machine learning to generate recommendations:

<li>Setting up Spark Streaming to process data in real time as it is received from Kafka.
<li>Implementing the machine learning models to make music recommendations based on user preferences and music features.
<li>Fine-tuning and deploying the recommendation system.</li>
<br>
By splitting the project this way, it allows the user to focus first on data ingestion and preprocessing (Part 1), and then on real-time streaming and machine learning tasks (Part 2). 
<br>
This division ensures a structured and logical progression from data handling to actionable insights.

In [None]:
!pip3.9 install pyspark

In [1]:
# Imports
import os
os.environ["PYSPARK_PYTHON"]="/opt/homebrew/opt/python@3.9/bin/python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"]="/opt/homebrew/opt/python@3.9/bin/python3.9"
import time
import random
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler

In [2]:
# Kafka server address
SERVER = 'localhost:9092'

In [3]:
# Kafka topic name
TOPIC = "music-recommendation"

In [4]:
# Conectores do Spark para o Apache Kafka
spark_jars =  ("{},{},{},{},{}".format(os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",  
                                       os.getcwd() + "/jars/kafka-clients-2.7.0.jar", 
                                       os.getcwd() + "/jars/spark-streaming-kafka-0-10-assembly_2.12-3.3.2.jar", 
                                       os.getcwd() + "/jars/commons-pool2-2.8.0.jar",  
                                       os.getcwd() + "/jars/spark-token-provider-kafka-0-10_2.12-3.1.2.jar"))

In [5]:
# Initialize Spark session
spark = SparkSession \
        .builder \
        .config("spark.jars", spark_jars) \
        .appName("Music_Recommendation") \
        .getOrCreate()


24/09/18 01:20:51 WARN Utils: Your hostname, Samuels-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.113 instead (on interface en0)
24/09/18 01:20:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/09/18 01:20:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
spark.sparkContext.setLogLevel("ERROR")

In [7]:
# Using Spark Streaming to read the Kafka data stream and save it in a dataframe
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", SERVER) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

In [8]:
# Select the timestamp column as a string and save it to a new dataframe
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp") 

In [9]:
# Define the schema with the name and type of each column
def_schema = "order_id INT, id STRING, name STRING, popularity INT, duration_ms DOUBLE, " \
             + "artists STRING, id_artists STRING, release_date STRING, " \
             + "danceability DOUBLE,energy DOUBLE, key INT, loudness DOUBLE, " \
             + "mode INT,speechiness DOUBLE," \
             + "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, " \
             + "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"

In [10]:
# Select the data stream according to the schema and save it in a new dataframe
df2 = df1.select(from_csv(col("value"), def_schema).alias("song"), "timestamp")

In [11]:
# Create an in-memory view in Spark and visualize the schema
df3 = df2.select("song.*", "timestamp")  
df3.createOrReplaceTempView("df3_View");
df3.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [12]:
# Select the data with the songs from the stream
music_stream = spark.sql("SELECT * FROM df3_View")

In [13]:
# We cannot visualize yet, as we need to generate the Spark Streaming stream
# music_stream.show()

In [14]:
# Create the data stream in Spark Streaming
music_stream_spark = music_stream \
        .writeStream \
        .trigger(processingTime = '5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("table_spark") \
        .start()

# Start the Spark Streaming stream
music_stream_spark.awaitTermination(1)

False

In [15]:
# "Select the songs from the Spark stream table."

spark_songs = spark.sql("SELECT * FROM table_spark")

                                                                                

In [17]:
# Now we can visualize the real-time stream as a Spark table
spark_songs.show(5)

+--------+--------------------+------------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|        name|popularity|duration_ms|     artists|          id_artists|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+------------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|    3047|4efr1TvCzFIRxBdjy...|Son of a Gun|         1|   238946.0|Summer Flake|2oPPuIRXZ5gCKBtbt...|  2016-04-08|       0.301| 0.674|  1|  -8.784|   0|     0.0341|     0.00711|           0.799|  0.0981|  0.361|115.776|           4.

In [18]:
# Visualize only a few columns
spark_songs.select('order_id', 'id', 'name', 'popularity', 'duration_ms', 'artists').show(5)

+--------+--------------------+------------+----------+-----------+------------+
|order_id|                  id|        name|popularity|duration_ms|     artists|
+--------+--------------------+------------+----------+-----------+------------+
|    3047|4efr1TvCzFIRxBdjy...|Son of a Gun|         1|   238946.0|Summer Flake|
|    3048|0ATr1Z7lOjpY6kDiN...|     Jizzney|         0|   196480.0| A Giant Dog|
|    3049|30lDfg1I6ytj9LUz2...|   Chow Chow|         0|   122206.0|        Faye|
|    3050|2k7MQPXJxVwgFksCo...|       Bloom|        27|   243072.0|       Denny|
|    3051|7BSROePF8jz3tVHba...|All Day Long|         0|   215386.0|   KAPPEKOFF|
+--------+--------------------+------------+----------+-----------+------------+
only showing top 5 rows



In [20]:
# Count the number of songs extracted in real-time
spark_songs.count()

36

Wait a few minutes before proceeding with execution to allow the data stream to be collected.

## Proceed with extracting data from Spotify.



In [21]:
# Install the Spotipy library for accessing the Spotify API
# https://pypi.org/project/spotipy/
!pip3.9 install -q spotipy ujson seaborn tqdm

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m

In [22]:
# Imports
import os

import ujson
import spotipy
import spotipy.util
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

In [23]:
# Here you input your Spotify API keys
os.environ["SPOTIPY_CLIENT_ID"] = 'your_client_id_here' ## your_client_id_here
os.environ["SPOTIPY_CLIENT_SECRET"] = 'your_client_secret_here' ## your_client_secret_here
os.environ["SPOTIPY_REDIRECT_URI"] = 'http://localhost:7777/callback'

https://developer.spotify.com/documentation/general/guides/authorization/scopes/

In [24]:
# Scope for extracting the user's library preferences
scope = 'user-library-read'

In [25]:
# Spotify username
username = 'your_spotify_email_here' ## your_spotify_email_here

In [26]:
# Create the access token
token = spotipy.util.prompt_for_user_token(username, scope)

In [27]:
# Create the authentication object
spotipy_obj = spotipy.Spotify(auth = token)

In [28]:
# Extract up to 50 songs from the user's favorite tracks
saved_tracks = spotipy_obj.current_user_saved_tracks(limit = 50) 

In [29]:
# Number of tracks extracted
n_tracks = saved_tracks['total']
print('Total de Tracks: %d ' % n_tracks)

Total de Tracks: 36 


In [30]:
# Function to extract attributes from the user's track list
def select_features(track_response):
    return {        
        'id': str(track_response['track']['id']),
        'name': str(track_response['track']['name']),
        'artists': [artist['name'] for artist in track_response['track']['artists']],
        'popularity': track_response['track']['popularity']
    }

In [31]:
# Apply the function
tracks = [select_features(track) for track in saved_tracks['items']]

In [32]:
# Function to extract attributes from the user's track list
while saved_tracks['next']:
    saved_tracks = spotipy_obj.next(saved_tracks)
    tracks.extend([select_features(track) for track in saved_tracks['items']])

In [33]:
# Create a pandas dataframe
df_tracks = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))
df_tracks['artists'] = df_tracks['artists'].apply(lambda artists: artists[0])

In [34]:
# Display the first 10 rows
df_tracks.head(10)

Unnamed: 0,id,name,artists,popularity
0,31ZctoTXcPE0uqcsXfZdgI,Headphones,Christian McKinney,17
1,3QzPVAv3U2BoQVWcOInhFZ,You Found Me,Christian McKinney,24
2,79uxhWGPK9WnWf1Mcpq516,Elevate,10 Talents,24
3,3JNUHClcruc1O419mggHx4,Break Free,Coastside,17
4,6oOi6coHSGhCEkqwrdy224,I Know That's Right,Jimmy Dooley,41
5,4e8lXxOrEBJfvyx2zoZ0K3,Drive,NONAH,30
6,3ZEno9fORwMA1HPecdLi0R,HAPPY,NF,72
7,2f4nKv7hzbd9vWUaqJ4idM,Outside,Marc Vanparla,38
8,3TyXxUT1D3EiATkn4g5vJ6,Deeper Waters,Jeremy Camp,46
9,1JR7uAiV3UrpNwUDycbpAR,sun and skies,alli ward,29


In [35]:
# Dictionary for audio attributes
audio_features = {}

In [36]:
# Extract the audio attributes
for idd in df_tracks['id'].tolist():
    audio_features[idd] = spotipy_obj.audio_features(idd)[0]

In [37]:
# Add the audio attributes to the dataframe
df_tracks['acousticness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['acousticness'])
df_tracks['speechiness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['speechiness'])
df_tracks['key'] = df_tracks['id'].apply(lambda idd: str(audio_features[idd]['key']))
df_tracks['liveness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['liveness'])
df_tracks['instrumentalness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
df_tracks['energy'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['energy'])
df_tracks['tempo'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['tempo'])
df_tracks['loudness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['loudness'])
df_tracks['danceability'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['danceability'])
df_tracks['valence'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['valence'])

In [38]:
df_tracks.head()

Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
0,31ZctoTXcPE0uqcsXfZdgI,Headphones,Christian McKinney,17,0.424,0.0655,8,0.0955,0.0,0.611,149.92,-9.793,0.616,0.708
1,3QzPVAv3U2BoQVWcOInhFZ,You Found Me,Christian McKinney,24,0.661,0.0374,5,0.121,0.00211,0.632,143.983,-6.489,0.737,0.341
2,79uxhWGPK9WnWf1Mcpq516,Elevate,10 Talents,24,0.012,0.155,11,0.0662,5e-05,0.746,173.938,-6.155,0.663,0.677
3,3JNUHClcruc1O419mggHx4,Break Free,Coastside,17,0.215,0.332,4,0.113,0.0,0.738,155.08,-8.464,0.672,0.338
4,6oOi6coHSGhCEkqwrdy224,I Know That's Right,Jimmy Dooley,41,0.119,0.211,6,0.0921,0.0,0.592,113.055,-10.631,0.747,0.796


In [39]:
# Randomly selected a song.

random_music = random. randint(0,len(df_tracks)-1)
df_random = df_tracks.head(random_music)[-1:]
df_random

Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
26,5F7IJrXD6Fa3EmqoYE0gU4,Say Goodbye,Chris Brown,69,0.00167,0.0371,1,0.0784,0.0,0.502,115.091,-6.062,0.8,0.388


In [40]:
# Spark music stream
spark_songs.show(5)

+--------+--------------------+------------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|        name|popularity|duration_ms|     artists|          id_artists|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+------------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|    3047|4efr1TvCzFIRxBdjy...|Son of a Gun|         1|   238946.0|Summer Flake|2oPPuIRXZ5gCKBtbt...|  2016-04-08|       0.301| 0.674|  1|  -8.784|   0|     0.0341|     0.00711|           0.799|  0.0981|  0.361|115.776|           4.

In [41]:
# Drop the columns that are not needed
spark_songs = spark_songs.drop('order_id', 
                               'mode', 
                               'release_date', 
                               'id_artists',
                               'time_signature', 
                               'duration_ms',
                               'timestamp')

In [42]:
# Create a new column with the song's name and artist
df_sp = spark.createDataFrame(df_random)

In [43]:
# Concatenate songs from Spark streaming with Spotify music
df = spark_songs.union(df_sp)

In [44]:
df.show(5)

+--------------------+------------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+
|                  id|        name|popularity|     artists|danceability|energy|key|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|
+--------------------+------------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+
|4efr1TvCzFIRxBdjy...|Son of a Gun|         1|Summer Flake|       0.301| 0.674|  1|  -8.784|     0.0341|     0.00711|           0.799|  0.0981|  0.361|115.776|
|0ATr1Z7lOjpY6kDiN...|     Jizzney|         0| A Giant Dog|       0.419| 0.941|  9|  -5.686|     0.0699|     1.19E-4|         1.58E-5|   0.122|  0.602|125.416|
|30lDfg1I6ytj9LUz2...|   Chow Chow|         0|        Faye|       0.444| 0.941|  4|  -5.015|     0.0533|      0.0411|          0.0818|   0.103|  0.521|119.009|
|2k7MQPXJxVwgFksCo...|       Bloom|     

## Data Preprocessing

In [45]:
# Prepare the VectorAssembler
vetor = VectorAssembler(inputCols = ['danceability',
                                     'energy',
                                     'loudness',
                                     'speechiness',
                                     'acousticness',
                                     'instrumentalness',
                                     'liveness',
                                     'valence',
                                     'tempo'], 
                        outputCol = 'song_features')

In [46]:
# Discard invalid values
assembled = vetor.setHandleInvalid("skip").transform(df)

In [47]:
# Prepare the StandardScaler
std = StandardScaler(inputCol = 'song_features', outputCol = 'standardized')

In [48]:
# Train the standardizer
scale = std.fit(assembled)

In [49]:
# Dataframe with standardized data
df = scale.transform(assembled)

In [50]:
df.show(5)

+--------------------+------------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+
|                  id|        name|popularity|     artists|danceability|energy|key|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|       song_features|        standardized|
+--------------------+------------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+
|4efr1TvCzFIRxBdjy...|Son of a Gun|         1|Summer Flake|       0.301| 0.674|  1|  -8.784|     0.0341|     0.00711|           0.799|  0.0981|  0.361|115.776|[0.301,0.674,-8.7...|[1.49656377223971...|
|0ATr1Z7lOjpY6kDiN...|     Jizzney|         0| A Giant Dog|       0.419| 0.941|  9|  -5.686|     0.0699|     1.19E-4|         1.58E-5|   0.122|  0.602|125.416|[0.419,0.941,-5.6...|[2.083256546

## Machine Learning with Unsupervised Learning

In [51]:
# Create the model object
objeto_KMeans = KMeans(featuresCol = 'standardized', k = 3)

In [52]:
# Drop rows with null values
df_clean = df.na.drop()

# Train the model
modelo_KMeans = objeto_KMeans.fit(df_clean)


In [53]:
# Predictions of the model
df_output = modelo_KMeans.transform(df)

## Recommendation System

In [64]:
# Class
class RecoSystem():
    
    # Construtor
    def __init__(self, data):
        self.data_ = data
    
    # Receive the music name and the number of recommendations
    def Recomm(self, music_name, amount = 1):
        
        # List for distances
        distances = []
        
        # Select the music
        song = self.data_[(self.data_.name.str.lower() == music_name.lower())].head(1).values[0]
        res_dt = self.data_[self.data_.name.str.lower() != music_name.lower()]
        
        # Loop to calculate distances
        for i_song in tqdm(res_dt.values):
            
            # Initialize the distance
            distance = 0
            
            # Loop to calculate the distance
            for col in np.arange(len(res_dt.columns)):
                if not col in [0,1,2,14]:
                    distance = distance + np.absolute(float(song[col]) - float(i_song[col]))
            
            # Add to the list of distances
            distances.append(distance)
        
        res_dt['distance'] = distances
        res_dt = res_dt.sort_values('distance')
        
        columns = ['id','name', 
                   'artists', 
                   'acousticness', 
                   'liveness', 
                   'instrumentalness', 
                   'energy', 
                   'danceability', 
                   'valence']
        
        return res_dt[columns][:amount]

In [65]:
# Columns names
datalabel = df_output.select('id',
                             'name',
                             'artists',
                             'danceability',
                             'energy',
                             'key',
                             'loudness',
                             'speechiness',
                             'acousticness',
                             'instrumentalness',
                             'liveness',
                             'valence',
                             'tempo',
                             'prediction')

In [66]:
# Dataset with the labels
df_final = datalabel.toPandas()
df_final.drop(df_final[df_final['artists'] == '0'].index, inplace = True)
df_final.drop_duplicates(inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['liveness'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['instrumentalness'] == 0.000000].index, inplace = True)
df_final.drop(df_final[df_final['energy'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['valence'] == 0.000].index, inplace = True)

In [67]:
df_final.shape

(288, 14)

In [68]:
df_final.sample(5)

Unnamed: 0,id,name,artists,danceability,energy,key,loudness,speechiness,acousticness,instrumentalness,liveness,valence,tempo,prediction
28,0Z5v6Ng5AhScYxZuiiWEC7,Raindrops,APOGEE,0.51,0.791,5,-6.759,0.0395,0.000275,3.7e-05,0.106,0.518,159.892,0
298,4N9ghytYf41jio4x3gWLAu,House,Nicky Blitz,0.504,0.783,4,-4.205,0.0544,0.00453,0.705,0.389,0.833,158.035,0
47,6lY7moXPj3FGJ5VscgsU54,Hippo + 47,ICHI,0.921,0.519,0,-11.135,0.466,0.213,0.0253,0.0946,0.725,98.96,1
102,5betBtffCTwfvW64m0iiXG,4AM,Povi,0.732,0.702,10,-4.461,0.039,0.0292,0.000106,0.0906,0.55,123.033,0
314,2lNt62LH941Q5PmuU3XrQM,A Moment to Return,Why We Run,0.498,0.676,5,-8.599,0.0407,0.0152,0.154,0.167,0.482,143.234,0


In [69]:
# Create the object
reco_obj = RecoSystem(df_final)

In [70]:
music = df_random['name'].tolist()[0]

In [71]:
print(music)

Say Goodbye


In [72]:
# Execute the recommendation
recommendation = reco_obj.Recomm(music)


100%|██████████| 287/287 [00:00<00:00, 43043.88it/s]


In [73]:
# Extract the random music from the Spotify favorites list
y = df_random[['id','name', 
                         'artists',  
                         'acousticness', 
                         'liveness', 
                         'instrumentalness', 
                         'energy', 
                         'danceability', 
                         'valence']]

In [74]:
# Concatenate the recommendation with the random music from the Spotify favorites list
recommendation = pd.concat([recommendation, y])

In [75]:
# Save the recommendation to disk
recommendation.to_csv('output/recommendation.csv')

In [76]:
# Upload the file from disk
df_reco = (spark.read.format("csv").options(header = "true").load("output/recommendation.csv"))

In [77]:
# Visualize the recommendation
df_reco.show()

+---+--------------------+-----------+---------------+------------+--------+----------------+------+------------+-------+
|_c0|                  id|       name|        artists|acousticness|liveness|instrumentalness|energy|danceability|valence|
+---+--------------------+-----------+---------------+------------+--------+----------------+------+------------+-------+
|104|1veCEMe2GY4W8ECYE...|  Dixie Boy|Joseph Petersen|      0.0135|   0.162|          0.0254| 0.694|       0.199| 0.0832|
| 26|5F7IJrXD6Fa3EmqoY...|Say Goodbye|    Chris Brown|     0.00167|  0.0784|             0.0| 0.502|         0.8|  0.388|
+---+--------------------+-----------+---------------+------------+--------+----------------+------+------------+-------+



                                                                                