In [13]:
# Importing necessary libraries and settings
import json
from random import randint
from functools import reduce
import spotipy
import spotipy.util as util
from spotipy.oauth2 import SpotifyClientCredentials
from spotipy import oauth2

from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    ArrayType,
)
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

In [2]:
packages = {
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,org.neo4j:neo4j-connector-apache-spark_2.12:5.0.2_for_spark_3"
}

In [3]:
cid = '6546e0f0348341a88d86457d98b3bc9b'
secret = 'cfeabd91de824bcfa8f2c8dd8a2191e0'
redirect_uri='http://localhost:7777/callback'
username = 'vass.zora'

In [4]:
# Once the Authorisation is complete, we just need to `sp` to call the APIs
scope = 'user-top-read user-read-private playlist-modify-private playlist-modify-public user-read-currently-playing'
token = util.prompt_for_user_token(username, scope, client_id=cid, client_secret=secret, redirect_uri=redirect_uri)

if token:
    sp = spotipy.Spotify(auth=token)
else:
    print("Can't get token for", username)

In [5]:
# Creating a function to get tracks IDs from a playlist
def get_playlist_tracks(username,playlist_id):
    results = sp.user_playlist_tracks(username,playlist_id)
    tracks = results['items']
    while results['next']:
        results = sp.next(results)
        tracks.extend(results['items'])
    return tracks

In [6]:
playlist_id_mood_dict = {
    'sad': '37i9dQZF1DX3rxVfibe1L0',
    'happy': '37i9dQZF1DX4uPi2roRUwU',
    #'chill': '37i9dQZF1DWWQRwui0ExPn',
    #'angry': '37i9dQZF1DX3ND264N08pv',
    #'romantic': '37i9dQZF1DX7rOY2tZUw1k',
}

In [14]:
moods = ['sad', 
         'happy', 
         #'chill', 
         #'angry', 
         #'romantic'
         ]

# Getting tracks from each playlist
tracks = []
audio_features = []
for mood in moods:
    current_tracks = get_playlist_tracks(username, playlist_id_mood_dict[mood])
    tracks.extend(current_tracks)
    # for track in current_tracks: #uncomment if spotify api works
    #     current_audio = sp.audio_features(track['track']['id'])[0]
    #     current_audio['mood'] = moods.index(mood)
    #     audio_features.append(current_audio)

In [8]:
spark = (
    SparkSession.builder.appName("Final assignment")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

23/06/01 14:31:02 WARN Utils: Your hostname, HP-Elite830 resolves to a loopback address: 127.0.1.1; using 192.168.1.18 instead (on interface wlp1s0)
23/06/01 14:31:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/vaszo/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vaszo/.ivy2/cache
The jars for the packages stored in: /home/vaszo/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.neo4j#neo4j-connector-apache-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a3b21036-093d-415e-b5a9-f45104794319;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in local-m

23/06/01 14:31:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [16]:
producer = KafkaProducer(bootstrap_servers="localhost:9092")
spark.sparkContext.setLogLevel("ERROR")

In [17]:
producer.send("tracks_topic", b"")
producer.send("mood_prediction", b"")
producer.send("audio_features_topic", b"")

<kafka.producer.future.FutureRecordMetadata at 0x7faea2d6b1f0>

In [18]:
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "tracks_topic, mood_prediction, audio_features_topic")
    .option("startingOffsets", "latest")
    .load()
)

In [19]:
tracks_schema = StructType(
    [
        StructField("id", StringType()),
        StructField("name", StringType()),
        StructField("artists", ArrayType(
            StructType([
                StructField("id", StringType()),
                StructField("name", StringType())
            ])
        )),
        StructField("duration_ms", StringType())
    ]
)       


mood_prediction_schema = StructType(
    [
        StructField("id", StringType()),
        StructField("mood", StringType())
    ]
)

audio_features_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("danceability", FloatType(), True),
        StructField("energy", FloatType(), True),
        StructField("key", IntegerType(), True),
        StructField("loudness", FloatType(), True),
        StructField("mode", IntegerType(), True),
        StructField("speechiness", FloatType(), True),
        StructField("acousticness", FloatType(), True),
        StructField("instrumentalness", FloatType(), True),
        StructField("liveness", FloatType(), True),
        StructField("valence", FloatType(), True),
        StructField("tempo", FloatType(), True),
        StructField("mood", IntegerType(), True),
    ]
)


In [20]:
tracks_stream = (
    df.selectExpr("CAST(value AS STRING)")
    .filter("topic = 'tracks_topic'")
    .select(from_json("value", tracks_schema).alias("data"))
    .select("data.*")
)

mood_prediction_stream = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json("value", mood_prediction_schema).alias("data"))
    .select("data.*")
)

audio_features_stream = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json("value", audio_features_schema).alias("data"))
    .select("data.*")
)

In [48]:
tracks_query = (
    tracks_stream.writeStream.format("memory")
    .queryName("tracks")
    .start()
)

mood_prediction_query = (
    mood_prediction_stream.writeStream.format("memory")
    .queryName("mood_prediction")
    .start()
)

audio_features_query = (
    audio_features_stream.writeStream.format("memory")
    .queryName("audio_features")
    .start()
)

In [81]:
# tracks_query.stop()
# number_of_clusters_query.stop()
# audio_features_query.stop()

In [22]:
for track in tracks:
    producer.send("tracks_topic", json.dumps(track['track']).encode("utf-8"))

for audio in audio_features:
    producer.send("audio_features_topic", json.dumps(audio).encode("utf-8"))

                                                                                

                                                                                

In [23]:
tracks_spark = spark.sql("select * from tracks")

In [24]:
tracks_spark.show(10, False)

+----------------------+---------------------------------------+--------------------------------------------------------------------------------------+-----------+
|id                    |name                                   |artists                                                                               |duration_ms|
+----------------------+---------------------------------------+--------------------------------------------------------------------------------------+-----------+
|0rzaRSujxA0bKyjJl6vHYq|Satellite                              |[{6KImCVD70vtIoJWnq6nGn3, Harry Styles}]                                              |218577     |
|5jQI2r1RdgtuT8S3iG8zFC|Lavender Haze                          |[{06HL4z0CvFAxyc27GXpf02, Taylor Swift}]                                              |202395     |
|1l4iQsOZ5sOXZPMQLvouaB|Coast (feat. Anderson .Paak)           |[{5p7f24Rk5HkUZsaS3BLG5F, Hailee Steinfeld}, {3jK9MiCrA42lLAdMGUZpwa, Anderson .Paak}]|166720     |
|6maTPqynTmrkWIr

In [25]:
audio_features_spark = spark.sql("select * from audio_features")

In [26]:
audio_features_spark.show(10, False)

+----------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+----+
|id                    |danceability|energy|key |loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|mood|
+----------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+----+
|0rzaRSujxA0bKyjJl6vHYq|null        |null  |null|null    |null|null       |null        |null            |null    |null   |null |null|
|5jQI2r1RdgtuT8S3iG8zFC|null        |null  |null|null    |null|null       |null        |null            |null    |null   |null |null|
|1l4iQsOZ5sOXZPMQLvouaB|null        |null  |null|null    |null|null       |null        |null            |null    |null   |null |null|
|6maTPqynTmrkWIralgGaoP|null        |null  |null|null    |null|null       |null        |null            |null    |null   |null |null|
|6dgUya35uo964z7GZXM07g|null        |null  |null|null    |null

In [31]:
def transform_data(df, assembler, scaler):
    df = df.dropna()
    df = df.drop('id')
    assembled_data = assembler.transform(df)
    scaler_model = scaler.fit(assembled_data)
    scaled_data = scaler_model.transform(assembled_data)
    return scaled_data

In [None]:
vectorAssembler = VectorAssembler(inputCols = ['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo'], outputCol = 'features')
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaled_train = transform_data(audio_features_spark, vectorAssembler, scaler)

In [29]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="mood", featuresCol="scaledFeatures")

In [103]:
model = nb.fit(scaled_train)

In [42]:
def predict_mood(model, track):
    # audio = sp.audio_features(track['id'])[0] #uncomment if spotify api is working
    # scaled_audio = transform_data(spark.createDataFrame([audio]), vectorAssembler, scaler)
    # prediction = model.predict(scaled_audio.select('scaledFeatures').collect()[0][0])
    # return {'id': track['id'], 'mood': moods[int(prediction)]}
    prediction_dict = {'id': track['id'], 'mood': moods[randint(0, 1)]} #comment this if spotify api is working
    return prediction_dict

In [33]:
model = 1 #comment this if spotify api is working

In [None]:
prev_song = None
while True:
    current_track = sp.current_user_playing_track()
    if current_track is not None:
        if prev_song != current_track['item']['id']:
            prediction = predict_mood(model, current_track['item'])
            producer.send("mood_prediction", json.dumps(prediction).encode("utf-8"))
            prev_song = current_track['item']['id']
    else:
        prev_song = None

In [52]:
mood_predictions_spark = spark.sql("select * from mood_prediction") 

In [53]:
mood_predictions_spark.show(10, False)

+----------------------+-----+
|id                    |mood |
+----------------------+-----+
|0BaFtki3CupqritofvTRSj|happy|
+----------------------+-----+

