In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Build (or reuse) the local Spark session that every later cell relies on.
# NOTE(review): getOrCreate() returns an already-running session if one exists in
# this kernel; JVM-level settings such as driver memory presumably won't change then.
spark: SparkSession = (
    SparkSession.builder
    .appName("StructuredStreaming")
    .config('spark.executor.memory', '16g')
    .config('spark.driver.memory', '16g')
    .getOrCreate()
)

# Keep notebook output readable: only ERROR-level Spark logs from here on.
spark.sparkContext.setLogLevel("ERROR")
    

25/01/21 01:06:21 WARN Utils: Your hostname, Maxims-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.102 instead (on interface en0)
25/01/21 01:06:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/21 01:06:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/21 01:06:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
'''
This cell sets up the input stream of recorded chords and audio features.
    1. CSV files in `streamed_chords/` are read as a stream, one file per trigger.
    2. The rows are grouped by record_name, user_name and a 5-second window.
    3. The chords and features of each group are aggregated by taking the last value in the window.
'''

basepath = '.'
timestamp_format = 'MM/dd/yy HH:mm:ss'

# Read one sample file eagerly: its header provides the schema for the stream
# (file-based streaming sources need a user-specified schema by default).
sample_df = spark.read.option("header", "true").csv(f"{basepath}/streamed_chords/test.csv")
schema = sample_df.schema

sample_df = sample_df.withColumn('timestamp', F.to_timestamp("timestamp", timestamp_format)).orderBy('timestamp')
sample_df.show(5)

input_stream = (
    spark.readStream
    .schema(schema)
    .option('header', True)
    .option("maxFilesPerTrigger", 1)
    .option("pathGlobFilter", "*.csv")
    .option("encoding", "UTF-8")
    .csv(f"{basepath}/streamed_chords/")
    .withColumn('timestamp', F.to_timestamp("timestamp", timestamp_format))
    .dropna()
    .withWatermark("timestamp", "10 seconds")
)

input_stream.printSchema()
# Debug sink: exposes the raw stream as the in-memory table `input_stream`.
# The handle is kept so the query can be stopped (input_stream_query.stop()).
input_stream_query = input_stream.writeStream.format("memory").queryName('input_stream').start()

# F.last() depends on row order, which Spark does not guarantee after a shuffle,
# so "the last value in the window" was non-deterministic. max() over a struct
# compares fields left to right, so leading with `timestamp` deterministically
# selects the most recent row of each (record, user, window) group.
latest_row = F.max(F.struct("timestamp", "chords", "features")).alias("latest_row")

windowed_records = (
    input_stream
    .groupBy(
        input_stream.record_name,
        input_stream.user_name,
        F.window(input_stream.timestamp, '5 second'),
    )
    .agg(latest_row)
    # Unpack the struct back into the same columns the previous F.last() version produced.
    .select(
        "record_name",
        "user_name",
        "window",
        F.col("latest_row.chords").alias("chords"),
        F.col("latest_row.features").alias("features"),
        F.col("latest_row.timestamp").alias("latest_timestamp"),
    )
)

+-------------------+----------------+--------------------+---------+-----------+--------------------+
|          timestamp|          chords|              frames|user_name|record_name|            features|
+-------------------+----------------+--------------------+---------+-----------+--------------------+
|2025-01-20 10:26:46|['Am', 'F', 'C']|['1.67', '4.09', ...|test_user|       test|{'tempo': 99.3840...|
|2025-01-20 10:26:47|['Am', 'F', 'C']|['1.67', '4.09', ...|test_user|       test|{'tempo': 99.3840...|
+-------------------+----------------+--------------------+---------+-----------+--------------------+

root
 |-- timestamp: timestamp (nullable = true)
 |-- chords: string (nullable = true)
 |-- frames: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- record_name: string (nullable = true)
 |-- features: string (nullable = true)



In [3]:
''' This cell is for basic data cleanup and transformation. '''
import re
import pandas as pd
from pyspark.sql import functions as F
import warnings

warnings.filterwarnings("ignore")

def data_cleanup(csv_path, output_path=None):
    '''
    Load the songs dataset and reduce each song's chords to plain major/minor triads.

    Args:
        csv_path: Path to the source CSV. It must contain the columns
            song_name, artist_name, chords and genres.
        output_path: Where to write the cleaned data (';'-separated CSV). Defaults to
            `chords.csv` in the same directory as `csv_path`.
    Returns:
        A DataFrame with columns song_name, artist_name, chords, genres, where `chords`
        holds a Python list of chord names matching C..B with an optional 'm' suffix.
        Rows whose chords value is missing get an empty list.
    '''
    from pathlib import Path

    raw = pd.read_csv(csv_path)
    print(raw.columns)

    # Sharp-notated major/minor triads only; anything else (7ths, flats, slash chords) is dropped.
    chord_pattern = re.compile(r"^(C|C#|D|D#|E|F|F#|G|G#|A|A#|B)(m)?$")

    def _clean(text):
        # Missing values arrive as float NaN, which has no .replace(); treat them as "no chords".
        if not isinstance(text, str):
            return []
        # Brackets are stripped too, so a list-style string like "['Am', 'F']" does not
        # leave '[Am' / 'F]' tokens that the regex would then silently discard.
        tokens = (
            text.replace("'", "")
            .replace(",", " ")
            .replace("[", " ")
            .replace("]", " ")
            .split()
        )
        return [token for token in tokens if chord_pattern.match(token)]

    # Build the new column in one pass and assign it to an explicit copy; the previous
    # chained `df['chords'][i] = ...` wrote through a possibly-copied view (and never
    # updates the frame under pandas Copy-on-Write).
    df = raw[['song_name', 'artist_name', 'chords', 'genres']].copy()
    df['chords'] = pd.Series([_clean(text) for text in df['chords']], index=df.index, dtype=object)

    if output_path is None:
        output_path = Path(csv_path).with_name('chords.csv')
    df.to_csv(output_path, sep=';')
    return df

# Clean the raw dataset with pandas, then hand it to Spark for the similarity join later on.
clean_chords_df = data_cleanup(csv_path='./data/chords_and_lyrics.csv')
chords_and_lyrics = spark.createDataFrame(data=clean_chords_df)
chords_and_lyrics.printSchema()
# Display the number of rows without any null column; chords_and_lyrics itself keeps all rows.
chords_and_lyrics.dropna(how='any').count()

Index(['Unnamed: 0', 'artist_name', 'song_name', 'chords&lyrics', 'chords',
       'lyrics', 'tabs', 'lang', 'artist_id', 'followers', 'genres',
       'popularity', 'name_e_chords'],
      dtype='object')
root
 |-- song_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- chords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres: string (nullable = true)



                                                                                

135783

In [4]:
import ast
from pyspark.ml import PipelineModel

def parse_recorded_data(chords_data, model_path='./models/genres_model/', genre_labels=('rock', 'pop')):
    '''
    Parse the recorded chords and features data, then classify the song's genre with a pre-trained model.

    Args:
        chords_data: DataFrame (batch or streaming) with `chords`, `features`,
            `record_name` and `user_name` columns. `features` is a string holding a
            Python-literal dict (e.g. "{'tempo': 99.38, ...}") parsed with ast.literal_eval.
        model_path: Directory of the saved Spark ML PipelineModel used for classification.
        genre_labels: Genre name for each integer prediction index, in index order.
            NOTE(review): must match the label encoding used when the model was trained.
    Returns:
        A DataFrame with columns record_name, user_name, compared (the recorded chords)
        and genre.
    '''
    feature_names = ('tempo', 'energy', 'acousticness', 'danceability', 'liveness', 'valence')
    labels = list(genre_labels)

    @F.udf('float')
    def parse_feature(features, feature):
        if features is None:
            return None
        value = ast.literal_eval(features)[feature]
        # float() matters: a FloatType UDF that returns a Python int
        # (e.g. "{'energy': 1}") produces null instead of the number.
        return None if value is None else float(value)

    # The recorded chords are renamed so they do not collide with the reference
    # dataset's `chords` column in the later join.
    users_chords = chords_data.withColumnRenamed('chords', 'compared')
    for name in feature_names:
        users_chords = users_chords.withColumn(name, parse_feature(F.col('features'), F.lit(name)))
    users_chords = users_chords.drop('features')

    users_chords.printSchema()

    genres_classifier = PipelineModel.load(model_path)
    predictions = genres_classifier.transform(users_chords)

    @F.udf
    def decoded(pred):
        # The model's `prediction` value is converted with int() to index genre_labels.
        if pred is None:
            return None
        return labels[int(pred)]

    recorded_classified = (
        predictions
        .withColumn('genre', decoded(F.col('prediction')))
        .select('record_name', 'user_name', 'compared', 'genre')
    )
    recorded_classified.printSchema()

    return recorded_classified

In [5]:
'''
This cell finds songs similar to the recordings in the input stream of recorded chords and features.
It creates an output stream of songs whose chord similarity score exceeds a threshold and whose genre matches the recording's predicted genre.
This enables real-time analysis of the input stream for all users simultaneously.
'''
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import ast

@F.udf(returnType=DoubleType())
def jac_similarity(v1, v2):
    '''
    Jaccard similarity of two sets of chords.

    The explicit DoubleType return type matters. Without it the UDF defaults to
    StringType, so `similarity > threshold` and ordering by `similarity` would
    operate on strings (lexicographic order) instead of numbers.

    Args:
        v1: Sequence of chord names (the array `chords` column of the reference data).
        v2: String holding a Python-literal list of chords, e.g. "['Am', 'F', 'C']",
            parsed with ast.literal_eval (evaluates literals only, never code).
    Returns:
        len(intersection) / len(union) as a float. Returns 0.0 when both sets are
        empty, and None (SQL null) when either input is null.
    '''
    if v1 is None or v2 is None:
        return None
    set1 = set(v1)
    set2 = set(ast.literal_eval(v2))
    union = len(set1 | set2)
    if union == 0:
        return 0.0
    return len(set1 & set2) / union

@F.udf(returnType='boolean')
def check_genre(genre, genres_list):
    '''
    Tell whether the predicted genre appears in a song's genres.

    `genres_list` is the raw `genres` string column, so this is a substring test
    (e.g. 'pop' also matches 'dance pop').

    Args:
        genre: Predicted genre name for the recording.
        genres_list: The reference song's genres, as a string.
    Returns:
        True if `genre` occurs in `genres_list`. Returns False when either is null,
        where the previous version raised TypeError inside the Python worker.
    '''
    if genre is None or genres_list is None:
        return False
    return genre in genres_list

def find_similar(original_df, input_stream, threshold):
    '''
    Pair every reference song with every streamed recording and keep the close matches.

    Args:
        original_df: DataFrame with a `chords` column (the reference songs).
        input_stream: DataFrame with a `compared` column (the recorded chords).
        threshold: Only rows whose `similarity` is strictly greater are kept.
    Returns:
        The cross-joined rows plus a `similarity` column computed by jac_similarity,
        filtered by the threshold.
    '''
    similarity = jac_similarity(F.col("chords"), F.col("compared"))
    return (
        original_df
        .crossJoin(input_stream)
        .withColumn("similarity", similarity)
        .where(F.col("similarity") > threshold)
    )

   
# Minimum Jaccard similarity for a reference song to count as a match.
SIMILARITY_THRESHOLD = 0.8

recorded_classified = parse_recorded_data(windowed_records)
filtered = (
    find_similar(chords_and_lyrics, recorded_classified, SIMILARITY_THRESHOLD)
    .withColumn('genres_match', check_genre(F.col('genre'), F.col('genres')))
)
final = filtered.filter('genres_match == true').orderBy(F.asc('similarity'))

# Keep the handle so the query can be monitored or stopped (analyzer_query.stop());
# "complete" mode re-emits the whole result table into `analyzer_results` on each trigger.
analyzer_query = (
    final.writeStream
    .format("memory")
    .queryName('analyzer_results')
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .start()
)
analyzer_query

root
 |-- record_name: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- compared: string (nullable = true)
 |-- latest_timestamp: timestamp (nullable = true)
 |-- tempo: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- danceability: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: float (nullable = true)

root
 |-- record_name: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- compared: string (nullable = true)
 |-- genre: string (nullable = true)



<pyspark.sql.streaming.query.StreamingQuery at 0x29adc5ee0>

In [8]:
def get_prediticons_for_user(user_id):
    '''
    Get the predictions for a specific user from the `analyzer_results` in-memory table.

    Args:
        user_id: Value matched exactly against the `user_name` column.
    Returns:
        A DataFrame with record_name, song_name, artist_name, chords, compared and genre.
    '''
    columns = ['record_name', 'song_name', 'artist_name', 'chords', 'compared', 'genre']
    # Filter with a column expression instead of interpolating the value into SQL text:
    # user_id comes from input(), and a quote in it would break (or inject into) the query.
    return (
        spark.table('analyzer_results')
        .where(F.col('user_name') == F.lit(user_id))
        .select(*columns)
    )


# Correctly spelled alias; the original name is kept for existing callers.
get_predictions_for_user = get_prediticons_for_user

# Ask which user to inspect, then print that user's matches as a table.
user_id = input('Enter user name: ')
get_prediticons_for_user(user_id=user_id).show()

+---------+-----------+-----------------+-------------+--------------------+----------------+-----+
|user_name|record_name|        song_name|  artist_name|              chords|        recorded|genre|
+---------+-----------+-----------------+-------------+--------------------+----------------+-----+
|test_user|      test2|     Runaway Love|Justin Bieber|[Am, C, G, Am, C,...|['C', 'G', 'Am']|  pop|
|test_user|     test23|     Runaway Love|Justin Bieber|[Am, C, G, Am, C,...|['C', 'G', 'Am']|  pop|
|test_user|      test2|       Better Man| Taylor Swift|[C, C, G, C, G, G...|['C', 'G', 'Am']|  pop|
|test_user|     test23|       Better Man| Taylor Swift|[C, C, G, C, G, G...|['C', 'G', 'Am']|  pop|
|test_user|      test2|     Shake It Off| Taylor Swift|[Am, C, G, Am, C,...|['C', 'G', 'Am']|  pop|
|test_user|     test23|     Shake It Off| Taylor Swift|[Am, C, G, Am, C,...|['C', 'G', 'Am']|  pop|
|test_user|       test|     Sweet Escape| Taylor Swift|[Am, C, F, Am, C,...|['Am', 'F', 'C']|  pop|
