# Script to extract nodes and edges from the Million Song Dataset

### Import the required libraries

In [1]:
import sqlite3
import pandas as pd
import numpy as np

In [2]:
import findspark
# findspark.init() runs before `import pyspark` so that a local Spark
# installation can be located and pyspark made importable in this kernel.
findspark.init()
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

### Create a Spark Session

In [4]:
# Local Spark session running with a single worker thread ("local[1]").
# getOrCreate() returns the already-active session if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("dataset_creation")
    .getOrCreate()
)

In [5]:
# sc.stop()
# spark.stop()

## Extract node data

### Read data

In [6]:
# Set the path to the 'track_metadata.db' SQLite file here.
# Raw string: backslashes in the Windows path are taken literally, with no
# escape-sequence processing (a plain "...\MSD..." literal is an invalid escape).
track_metadata_db_path = r"C:\Users\mgandhi39\Downloads\DVA Project\Dataset\MSD\track_metadata.db"

In [7]:
# Open the SQLite track-metadata database and a cursor for ad-hoc queries.
# The connection is not closed in this cell.
conn = sqlite3.connect(track_metadata_db_path)
cursor = conn.cursor()

In [8]:
# List the tables in the track-metadata database.
# 'table' is a single-quoted SQL string literal. Double quotes denote
# identifiers in SQL; SQLite only falls back to reading "table" as a string
# for legacy compatibility, and builds without that fallback reject it.
query = "SELECT name FROM sqlite_master WHERE type='table'"
cursor.execute(query)
print(cursor.fetchall())

[('songs',)]


In [9]:
# Read every row of the `songs` table into a pandas DataFrame.
query = "SELECT * FROM songs"
pandas_df = pd.read_sql_query(sql=query, con=conn)

In [10]:
# Optionally cap the data at the first `no_of_records` rows (positional slice)
# to speed up test runs.
no_of_records = 1_000_000
trimmed_pandas_df = pandas_df.iloc[:no_of_records]

In [11]:
# Convert the (optionally trimmed) pandas frame into a Spark DataFrame.
# Uses `trimmed_pandas_df` so the `no_of_records` limit set above takes effect.
# This step takes a while to run.
# TODO: find a better way to read data from the .db file into Spark.
spark_df = spark.createDataFrame(trimmed_pandas_df)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [12]:
# spark_df.limit(5).toPandas().head()

### Group by artist_id and aggregate fields

In [13]:
# Per-artist aggregates over all of the artist's tracks.
# Explicit, aliased column expressions give a fixed output schema:
#   artist_id, total_tracks, avg_duration, avg_hotttnesss, avg_familiarity
# A dict-style agg gives no control over output column order and needs the
# generated "count(track_id)" / "avg(...)" names renamed afterwards.
artists = spark_df.groupby("artist_id").agg(
    F.count("track_id").alias("total_tracks"),
    F.avg("duration").alias("avg_duration"),
    F.avg("artist_hotttnesss").alias("avg_hotttnesss"),
    F.avg("artist_familiarity").alias("avg_familiarity"),
)

In [14]:
# artists.limit(10).toPandas().head()

In [15]:
# artists.summary().show()

### Add artist_name to the extracted data
An artist_id may map to more than one artist_name, i.e. artist_name is not unique per artist_id. This happens because a track's artist can be credited as, for example, 'Abc ft. Xyz'. We therefore keep the shortest name for each artist_id.


In [16]:
# Count the rows for each distinct (artist_id, artist_name) pair.
artist_names = spark_df.groupBy("artist_id", "artist_name").count()

In [17]:
# artist_names.show()

In [18]:
# artist_names.summary().show()

In [19]:
# For each artist_id keep exactly one name: the shortest. Names of equal
# length are ordered alphabetically, so the chosen name is deterministic
# across runs instead of depending on partition ordering.
window = Window.partitionBy(artist_names["artist_id"]).orderBy(
    length(artist_names["artist_name"]), artist_names["artist_name"]
)
unique_artist_names = artist_names.select('*', row_number().over(window).alias('rank')).filter(col('rank') == 1)
# Drop the helper columns and rename the key to "id" for the join with `artists`.
unique_artist_names = unique_artist_names.drop("count").drop("rank").withColumnRenamed("artist_id", "id")

In [20]:
# unique_artist_names.limit(5).toPandas().head()

In [21]:
# Number of artist_ids with a chosen name (one row per artist_id after the rank == 1 filter).
unique_artist_names.count()

44745

In [22]:
# Attach the chosen artist_name to each artist's aggregates (inner join on the
# artist key), then discard the duplicate "id" key from the names table.
node_data = (
    artists
    .join(
        unique_artist_names,
        on=artists["artist_id"] == unique_artist_names["id"],
        how="inner",
    )
    .drop("id")
)

In [23]:
# node_data.limit(10).toPandas().head()

In [24]:
# node_data.summary().show()

In [25]:
# TO DO
# create links between collaborators

### Keep only one artist per duplicated artist_name

In [26]:
# nodes_to_remove = node_data.groupby("artist_name").count().where(col("count") > 1)

In [27]:
# nodes_to_remove.show()

In [28]:
# nodes_to_remove.count()

In [29]:
# Several artist_ids can share the same artist_name; keep only the one with the
# most tracks for each name. artist_id is a secondary sort key so ties on
# total_tracks are resolved deterministically. The helper "rank" column is
# dropped afterwards so it is not carried into the exported node table.
window2 = Window.partitionBy(col("artist_name")).orderBy(desc(col("total_tracks")), col("artist_id"))
corrected_nodes_data = (
    node_data.select("*", row_number().over(window2).alias("rank"))
    .filter(col("rank") == 1)
    .drop("rank")
)

In [30]:
# corrected_nodes_data.limit(5).toPandas().head()

In [31]:
# Number of nodes after keeping a single artist per artist_name.
corrected_nodes_data.count()

42916

### Output node data to CSV

In [32]:
# Where nodes.csv is written — edit this path as needed.
# Raw string: the Windows backslashes are kept literally.
node_data_output_path = r"C:\Users\mgandhi39\Downloads\DVA Project\Dataset\nodes.csv"

In [None]:
# Collect the node table to the driver and write it with pandas. With default
# arguments, to_csv also writes the pandas index as an unnamed first column.
corrected_nodes_data.toPandas().to_csv(node_data_output_path)

## Extract edge data

### Read Data and extract edges

In [33]:
# Set the path to the 'artist_similarity.db' SQLite file here.
# Raw string: backslashes in the Windows path are taken literally, with no
# escape-sequence processing (a plain "...\MSD..." literal is an invalid escape).
artist_similarity_db_path = r"C:\Users\mgandhi39\Downloads\DVA Project\Dataset\MSD\artist_similarity.db"

In [34]:
# Open the SQLite artist-similarity database and a cursor for ad-hoc queries.
# The connection is not closed in this cell.
conn2 = sqlite3.connect(artist_similarity_db_path)
cursor2 = conn2.cursor()

In [35]:
# List the tables in the artist-similarity database.
# 'table' is a single-quoted SQL string literal. Double quotes denote
# identifiers in SQL; SQLite only falls back to reading "table" as a string
# for legacy compatibility, and builds without that fallback reject it.
query = "SELECT name FROM sqlite_master WHERE type='table'"
cursor2.execute(query)
print(cursor2.fetchall())

[('artists',), ('similarity',)]


In [36]:
# Load every (target, similar) pair from the similarity table.
# The 1-based surrogate "index" column records the row order, and the
# per-artist "priority" is later derived from it. Without an ORDER BY, SQLite
# does not guarantee result order (the planner may, e.g., scan an index), so
# ORDER BY rowid pins the rows to the table's stored order.
# NOTE(review): assumes `similarity` is an ordinary rowid table — confirm.
pd_edges = pd.read_sql_query('SELECT * FROM similarity ORDER BY rowid', conn2)
pd_edges["index"] = range(1, len(pd_edges) + 1)

In [37]:
# Move the edge list into Spark and give the endpoints explicit names:
# "target" becomes source_artist_id and "similar" becomes target_artist_id.
spark_edges = (
    spark.createDataFrame(pd_edges)
    .withColumnRenamed("target", "source_artist_id")
    .withColumnRenamed("similar", "target_artist_id")
)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [38]:
# Number each source artist's edges 1, 2, 3, ... in order of the surrogate
# "index" column; that number is kept as "priority" and "index" is discarded.
window3 = Window.partitionBy("source_artist_id").orderBy("index")
edge_data = (
    spark_edges
    .select("*", row_number().over(window3).alias("priority"))
    .drop("index")
)

In [39]:
# edge_data.limit(5).toPandas().head()

In [40]:
# Total number of edges (one per similarity-table row) before any filtering.
edge_data.count()

2201916

### Keep only edges whose endpoints are both retained (deduplicated) artists

In [48]:
# Left-semi join: keep only edges whose source artist is among the retained
# nodes; the result has edge_data's columns only.
corrected_edge_data = edge_data.join(
    corrected_nodes_data,
    on=edge_data["source_artist_id"] == corrected_nodes_data["artist_id"],
    how="leftsemi",
)

In [49]:
# corrected_edge_data.limit(5).toPandas().head()

In [50]:
# Edges left after dropping those whose source artist is not a retained node.
corrected_edge_data.count()

2107199

In [51]:
# Left-semi join again, now on the target endpoint, so that only edges between
# two retained nodes remain. The condition uses corrected_edge_data's own
# column; referencing the parent `edge_data` DataFrame only resolves through
# Spark lineage and breaks easily if the pipeline is rearranged.
corrected_edge_data = corrected_edge_data.join(
    corrected_nodes_data,
    on=corrected_edge_data["target_artist_id"] == corrected_nodes_data["artist_id"],
    how="leftsemi",
)

In [52]:
# Edges left after also dropping those whose target artist is not a retained node.
corrected_edge_data.count()

2002865

### Output edge data to CSV

In [53]:
# Where edges.csv is written — edit this path as needed.
# Raw string: backslashes are taken literally (a plain "...\edges.csv" literal
# contains the invalid escape sequence "\e").
edge_data_output_path = r"C:\Users\mgandhi39\Downloads\DVA Project\Dataset\edges.csv"

In [54]:
# Collect the filtered edge table to the driver and write it with pandas. With
# default arguments, to_csv also writes the pandas index as an unnamed first column.
corrected_edge_data.toPandas().to_csv(edge_data_output_path)