# Music Recommender with AudioScrobbler

In [1]:
import pyspark
spark = (pyspark
         .sql
         .SparkSession.builder
         .master("local[*]")
         .appName("MusicRecommender")
         .config("spark.driver.memory", "4g") # Main methods, organize lazy actions
         .config("spark.executor.memory", "8g") # Nodes, execute transformations
         .getOrCreate())
spark

## Dataset

Data can be obtained in: http://www-ens.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html

Audioscrobbler dataset with:
- user_artist_data.txt: 141K unique users, 1.6M unique artists, 24M datapoints.
- artist_data.txt: 1.8M datapoints
- artist_alias.txt: 193K datapoints.

#### Take raw datasets

In [2]:
rawUserArtistData = spark.read.text("ds/user_artist_data.txt")
rawUserArtistData.show(5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [3]:
rawArtistData = spark.read.text("ds/artist_data.txt")
rawArtistData.show(5, False)

+-------------------------------------+
|value                                |
+-------------------------------------+
|1134999	06Crazy Life                 |
|6821360	Pang Nakarin                 |
|10113088	Terfel, Bartoli- Mozart: Don|
|10151459	The Flaming Sidebur         |
|6826647	Bodenstandig 3000            |
+-------------------------------------+
only showing top 5 rows



In [4]:
rawArtistAlias = spark.read.text("ds/artist_alias.txt")
rawArtistAlias.show(5)

+----------------+
|           value|
+----------------+
| 1092764	1000311|
| 1095122	1000557|
| 6708070	1007267|
|10088054	1042317|
| 1195917	1042317|
+----------------+
only showing top 5 rows



#### Organize raw DataFrames intro structured data

In [5]:
from pyspark.sql import functions as f
split_col = f.split(rawUserArtistData.value, ' ')

userArtistDF = (rawUserArtistData.withColumn('userid', split_col.getItem(0))
.withColumn('artistid', split_col.getItem(1))
.withColumn('playcount', split_col.getItem(2))
.drop(rawUserArtistData.value)
               )

userArtistDF.show(5)

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
+-------+--------+---------+
only showing top 5 rows



In [6]:
split_col = f.split(rawArtistData.value, '\t')

artistData = (rawArtistData
.withColumn("artistid", split_col.getItem(0))
.withColumn("artist_name", split_col.getItem(1))
.drop("value"))

artistData.show(5, False)

+--------+----------------------------+
|artistid|artist_name                 |
+--------+----------------------------+
|1134999 |06Crazy Life                |
|6821360 |Pang Nakarin                |
|10113088|Terfel, Bartoli- Mozart: Don|
|10151459|The Flaming Sidebur         |
|6826647 |Bodenstandig 3000           |
+--------+----------------------------+
only showing top 5 rows



In [7]:
split_col = f.split(rawArtistAlias.value, '\t')

artistAlias = (rawArtistAlias
               .withColumn("badid", split_col.getItem(0))
               .withColumn("goodid", split_col.getItem(1))
               .drop("value")
)

artistAlias.show(5, False)

+--------+-------+
|badid   |goodid |
+--------+-------+
|1092764 |1000311|
|1095122 |1000557|
|6708070 |1007267|
|10088054|1042317|
|1195917 |1042317|
+--------+-------+
only showing top 5 rows



In [8]:
artistData.filter(artistData.artistid.isin("1092764", "1000311")).show()

+--------+--------------+
|artistid|   artist_name|
+--------+--------------+
| 1000311| Steve Winwood|
| 1092764|Winwood, Steve|
+--------+--------------+



### On average, how many artists an individual user listen to?

In [9]:
from pyspark.sql.functions import count,avg
userArtistDF.groupBy("userid").agg(count("artistid").alias("artistCount")).agg(avg("artistCount").
                                                                               alias("Average Artists per User")).show()

+------------------------+
|Average Artists per User|
+------------------------+
|      164.04492576513562|
+------------------------+



### Use Alias to create a canonical Id for artists

Use join to see single artist's names

In [10]:
from pyspark.sql.functions import col 
canArtistData = (artistData
 .alias("a1")
 .join(artistAlias.alias("id"), on=[col("a1.artistid")==col("id.badid")], how="left")
 .join(artistData.alias("a2"), on=col("a2.artistid")==col("id.goodid"), how="left")
 .select("a2.artist_name", "id.goodid")
 .distinct()
)
canArtistData.show(5)

+--------------+-------+
|   artist_name| goodid|
+--------------+-------+
|          Cher|1000280|
| Dead Or Alive|1000795|
|Rimini Project|1000839|
|  Stevie Nicks|1001866|
|        Digger|1002783|
+--------------+-------+
only showing top 5 rows



In [30]:
print("How many unique artists there are?")
canArtistData.count()

How many unique artists there are?


22475

In [33]:
print("Is there misspelled Id's on the userArtistDF?")
(userArtistDF
 .join(artistAlias, 
      on= (userArtistDF.artistid==artistAlias.badid),
      how="inner")

).show(10)

Is there misspelled Id's on the userArtistDF?
+-------+--------+---------+-------+-------+
| userid|artistid|playcount|  badid| goodid|
+-------+--------+---------+-------+-------+
|1000002| 1000434|       89|1000434|1000518|
|1000002| 1000762|        1|1000762|1001514|
|1000002| 1001220|        1|1001220|    721|
|1000002| 1001410|        5|1001410|1034635|
|1000002| 1002498|        1|1002498|   3066|
|1000002| 1003377|        1|1003377|6691692|
|1000002| 1003633|        1|1003633|1237611|
|1000002| 1006102|        4|1006102|1034635|
|1000002| 1007652|        1|1007652|1001172|
|1000002| 1010219|        2|1010219|1008391|
+-------+--------+---------+-------+-------+
only showing top 10 rows



In the above DF, the same *(userid, artistid)* pair may have multiple rows because of mispelled Id.

#### Create a cannonical userArtistDF which will be the trainData

Create a Python Dictionary which will map each Id (being *badid* or *goodid*) to the cannonical Id.

In [17]:
mapIds = {}
mapIds.update(
    {row['badid']:row['goodid'] for row in artistAlias.collect()}
)
mapIds.update(
    {row['goodid']:row['goodid'] for row in artistAlias.collect()}
)

In [20]:
canUserArtistDF = (userArtistDF
 .rdd
 .map(lambda x: (x[0], mapIds.get(x[1], x[1]), x[2]))
 .toDF(["userid", "artistid", "playcount"])
)

canUserArtistDF.show()

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
|1000002| 1000013|        8|
|1000002| 1000014|       42|
|1000002| 1000017|       69|
|1000002| 1000024|      329|
|1000002| 1000025|        1|
|1000002| 1000028|       17|
|1000002| 1000031|       47|
|1000002| 1000033|       15|
|1000002| 1000042|        1|
|1000002| 1000045|        1|
|1000002| 1000054|        2|
|1000002| 1000055|       25|
|1000002| 1000056|        4|
|1000002| 1000059|        2|
|1000002| 1000062|       71|
+-------+--------+---------+
only showing top 20 rows



In [21]:
from pyspark.sql.functions import concat, lit

In [22]:
canUserArtistDfToSave = (canUserArtistDF
                         .select(
                             concat(
                                 canUserArtistDF.userid, 
                                 lit(" "),
                                 canUserArtistDF.artistid, 
                                 lit(" "), 
                                 canUserArtistDF.playcount)
                             .alias("value")))

In [29]:
(canUserArtistDfToSave
 .repartition(10)
 .write
 .format("text")
 .option("header", "false")
 .mode("overwrite")
 .save("./testData2"))

In [50]:
testWith1Column.show()

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
|  1000002 1000013 8|
| 1000002 1000014 42|
| 1000002 1000017 69|
|1000002 1000024 329|
|  1000002 1000025 1|
| 1000002 1000028 17|
| 1000002 1000031 47|
| 1000002 1000033 15|
|  1000002 1000042 1|
|  1000002 1000045 1|
|  1000002 1000054 2|
| 1000002 1000055 25|
|  1000002 1000056 4|
|  1000002 1000059 2|
| 1000002 1000062 71|
+-------------------+
only showing top 20 rows



In [None]:
test.filter(col("_2")=="1000006").show()

In [135]:
(userArtistDF
 .withColumn("realid", map)
).show()

TypeError: unhashable type: 'Column'

In [13]:
from pyspark import broadcast

In [19]:
spark.sparkContext.broadcast(artistAlias)

Traceback (most recent call last):
  File "C:\apps\opt\spark-3.0.3-bin-hadoop2.7\python\pyspark\broadcast.py", line 111, in dump
    pickle.dump(value, f, pickle_protocol)
TypeError: cannot pickle '_thread.RLock' object


PicklingError: Could not serialize broadcast: TypeError: cannot pickle '_thread.RLock' object

In [16]:
broadcast(artistAlias)

TypeError: 'module' object is not callable

In [15]:
userArtistDF.join(broadcast(artistAlias))

TypeError: 'module' object is not callable

In [12]:
canUserArtistDF = (userArtistDF
 .join(artistAlias, 
      on= (userArtistDF.artistid==artistAlias.badid) | (userArtistDF.artistid==artistAlias.goodid),
      how="left")
 .select("userid", col("goodid").alias("artistid"),"playcount")
 #.distinct()
)

In [None]:
canUserArtistDF.count()

In [None]:
(userArtistDF
 .alias("a1")
 .join(artistAlias.alias("a2"),
       on=( (userArtistDF.artistid==artistAlias.badid) | (userArtistDF.artistid==artistAlias.goodid) ),
       how="inner")
 .select("userid", col("goodid").alias("artistid"), "playcount")
 .distinct()
).show(1000)

In [104]:
(userArtistDF
 .alias("a1")
 .join(artistAlias.alias("id"), on=[col("a1.artistid")==col("id.badid")], how="left")
 .join(userArtistDF.alias("a2"), on=col("a2.artistid")==col("id.goodid"), how="left")
).show(5)

+-------+--------+---------+-----+------+------+--------+---------+
| userid|artistid|playcount|badid|goodid|userid|artistid|playcount|
+-------+--------+---------+-----+------+------+--------+---------+
|1000002|       1|       55| null|  null|  null|    null|     null|
|1000002| 1000006|       33| null|  null|  null|    null|     null|
|1000002| 1000007|        8| null|  null|  null|    null|     null|
|1000002| 1000009|      144| null|  null|  null|    null|     null|
|1000002| 1000010|      314| null|  null|  null|    null|     null|
+-------+--------+---------+-----+------+------+--------+---------+
only showing top 5 rows



In [106]:
artistAlias.filter(col("goodid")=="1").show()

+--------+------+
|   badid|goodid|
+--------+------+
| 6892711|     1|
| 6768907|     1|
| 1330855|     1|
| 1330866|     1|
| 1330616|     1|
| 1329801|     1|
| 1178093|     1|
| 9912772|     1|
| 2061674|     1|
| 2172455|     1|
|10001148|     1|
| 1037407|     1|
|10285498|     1|
| 1332568|     1|
|10345209|     1|
| 1010803|     1|
| 1112830|     1|
| 1039289|     1|
| 1209643|     1|
| 1125695|     1|
+--------+------+
only showing top 20 rows

