In [2]:
!pip install pyspark
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,LongType
from pyspark.sql.functions import col

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 62 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=ea2a9221661019d31d0e9667d0e12dc1536851456cdf364249c2c21865a7261a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [3]:
# import libraries
from pyspark import SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession ,Row
appName="Collaborative Filtering with PySpark"
# initialize the spark session
spark = SparkSession.builder.appName(appName).getOrCreate()
# get sparkcontext from the sparksession
sc = spark.sparkContext


In [None]:
'''def parser(s, delimeters=" ", to_int=None):
    s = s.split(delimeters)
    if to_int:
        return tuple([int(s[i]) if i in to_int else s[i] for i in range(len(s))])
    return tuple(s)

artistData = sc.textFile("artist_data_small.txt").map(lambda x: parser(x,'\t',[0]))

artistAlias = sc.textFile("artist_alias_small.txt").map(lambda x: parser(x,'\t', [0,1]))

artistAliasMap = artistAlias.collectAsMap()

userArtistData = sc.textFile("user_artist_data_small.txt").map(lambda x: parser(x,' ',[0,1,2]))

userArtistData = userArtistData.map(lambda x: (x[0], artistAliasMap.get(x[1], x[1]), x[2]))'''

'def parser(s, delimeters=" ", to_int=None):\n    s = s.split(delimeters)\n    if to_int:\n        return tuple([int(s[i]) if i in to_int else s[i] for i in range(len(s))])\n    return tuple(s)\n\nartistData = sc.textFile("artist_data_small.txt").map(lambda x: parser(x,\'\t\',[0]))\n\nartistAlias = sc.textFile("artist_alias_small.txt").map(lambda x: parser(x,\'\t\', [0,1]))\n\nartistAliasMap = artistAlias.collectAsMap()\n\nuserArtistData = sc.textFile("user_artist_data_small.txt").map(lambda x: parser(x,\' \',[0,1,2]))\n\nuserArtistData = userArtistData.map(lambda x: (x[0], artistAliasMap.get(x[1], x[1]), x[2]))'

In [4]:
#Define the schema for the datasets
schema_artist = StructType([StructField("artistId",StringType(),True),StructField("artistName",StringType(),True)])
schema_user_artist = StructType([StructField("userId",StringType(),True),StructField("artistId",StringType(),True),StructField("playCount",StringType(),True)])
schema_alias = StructType([StructField("badId",StringType(),True),StructField("goodId",StringType(),True)])

In [5]:
#Convert RDDs into Dataframes
from pyspark.sql.types import DoubleType
artistRDD = sc.textFile("artist_data_small.txt").map(lambda k: k.split("\t"))
#artistRDD.collect()
artist_df = spark.createDataFrame(artistRDD,schema_artist)
artist_df = artist_df.withColumn("artistId", artist_df["artistId"].cast(DoubleType()))

userArtistRDD = sc.textFile("user_artist_data_small.txt").map(lambda k: k.split())
user_artist_df = spark.createDataFrame(userArtistRDD,schema_user_artist)

user_artist_df = user_artist_df.withColumn("userId", user_artist_df["userId"].cast(DoubleType()))
user_artist_df = user_artist_df.withColumn("artistId", user_artist_df["artistId"].cast(DoubleType()))
user_artist_df = user_artist_df.withColumn("playCount", user_artist_df["playCount"].cast(DoubleType()))
#user_artist_df.printSchema()

aliasRDD = sc.textFile("artist_alias_small.txt").map(lambda k: k.split())
#aliasRDD.collect()
alias_df = spark.createDataFrame(aliasRDD,schema_alias)
#alias_df.show()

In [6]:
# #First for convenience, we can create aliases for each dataframes
ua = user_artist_df.alias('ua')
ub = artist_df.alias('ub')

In [7]:
# dataset split into training and testing set
(training, test) = user_artist_df.randomSplit([0.8, 0.2])

In [8]:
training.show()

+---------+--------+---------+
|   userId|artistId|playCount|
+---------+--------+---------+
|1001440.0|     2.0|   1541.0|
|1001440.0|     3.0|      6.0|
|1001440.0|     4.0|     24.0|
|1001440.0|    16.0|      6.0|
|1001440.0|    17.0|    601.0|
|1001440.0|    18.0|    185.0|
|1001440.0|    23.0|      2.0|
|1001440.0|    28.0|      5.0|
|1001440.0|    33.0|    263.0|
|1001440.0|    34.0|      5.0|
|1001440.0|    40.0|     35.0|
|1001440.0|    53.0|     40.0|
|1001440.0|    59.0|      6.0|
|1001440.0|    61.0|      1.0|
|1001440.0|    67.0|      4.0|
|1001440.0|    69.0|      1.0|
|1001440.0|    70.0|      1.0|
|1001440.0|    78.0|     34.0|
|1001440.0|    82.0|    189.0|
|1001440.0|    83.0|   1241.0|
+---------+--------+---------+
only showing top 20 rows



In [9]:
# training the model
als = ALS(maxIter=5, implicitPrefs=True,userCol="userId", itemCol="artistId", ratingCol="playCount",coldStartStrategy="drop")
model = als.fit(training)

In [10]:
# predict using the testing datatset
predictions = model.transform(test)
predictions.show()

+---------+---------+---------+------------+
|   userId| artistId|playCount|  prediction|
+---------+---------+---------+------------+
|1041919.0|    463.0|     50.0|   1.5923305|
|2010008.0|    463.0|    308.0|   1.3008757|
|1058890.0|    463.0|    934.0|  0.27522913|
|1031009.0|    463.0|      4.0|   1.2485688|
|1046559.0|    833.0|    126.0|   0.5753089|
|1072684.0|   3175.0|     45.0|-0.041159123|
|1024631.0|1001129.0|    199.0|   0.8766138|
|1058890.0|1001129.0|     88.0| -0.36416078|
|1059334.0|1004552.0|      1.0| -0.35229862|
|2023686.0|1007972.0|      1.0|  0.64261746|
|2010008.0|1007972.0|      3.0|-0.117314674|
|1026084.0|1007972.0|    103.0| -0.65539455|
|1024631.0|1009031.0|      2.0|  0.06666491|
|2010008.0|1009031.0|      4.0|  0.04065971|
|1063644.0|1009031.0|    100.0|  0.35704744|
|2023686.0|1014191.0|      3.0|  0.39541107|
|1017610.0|1019303.0|     68.0| 0.036941003|
|1024631.0|1028228.0|      1.0|  0.14603704|
|1070641.0|1029443.0|      5.0|  0.35517526|
|1059637.0

In [11]:
def currentLikes(ua,ub,userId,limit):
 df = ua.join(ub,ua.artistId==ub.artistId)\
 .filter(ua.userId==userId)\
 .sort(ua.playCount.desc())\
 .select(ua.userId,ua.playCount,ub.artistName)\
 .limit(limit)
 return df

In [42]:
def recommendedArtists(userId,limit):
  test =  model.recommendForAllUsers(limit).filter(col('userId')==userId).select("recommendations").collect()
  topArtists = []
  for item in test[0][0]:
    topArtists.append(item.artistId)

    schema = StructType([StructField("artistId",IntegerType(),True)])
    artists = spark.createDataFrame(topArtists,IntegerType())
  final=artists.join(ub,artists.value==ub.artistId).select(ub.artistId,ub.artistName)
  
  return final
# display top 10 recommended artists for user 2062243
recommendedArtists(2062243,10).show(truncate=False)

+---------+-------------------+
|artistId |artistName         |
+---------+-------------------+
|1005975.0|Dimmu Borgir       |
|4061.0   |Marilyn Manson     |
|2823.0   |Alanis Morissette  |
|1000639.0|Alice in Chains    |
|1295531.0|菅野よう子         |
|1002128.0|Taking Back Sunday |
|1010512.0|Shivaree           |
|1004421.0|KMFDM              |
|1004303.0|Front Line Assembly|
|1001828.0|A Flock Of Seagulls|
+---------+-------------------+



In [26]:
test =  model.recommendForAllUsers(5).filter(col('userId')==2023686).select("recommendations").collect()
topArtists = []
for item in test[0][0]:
  topArtists.append(item.artistId)

  schema = StructType([StructField("artistId",IntegerType(),True)])
  artists = spark.createDataFrame(topArtists,IntegerType())
  final=artists.join(ub,artists.value==ub.artistId).select(ub.artistId,ub.artistName)

In [33]:
model.recommendForAllUsers(10).filter(col('userId')==2007381).select("recommendations").collect()

[Row(recommendations=[Row(artistId=1004296, rating=2.880728006362915), Row(artistId=1001487, rating=1.8751674890518188), Row(artistId=1002370, rating=1.7512781620025635), Row(artistId=1854, rating=1.6419439315795898), Row(artistId=1000175, rating=1.635480523109436), Row(artistId=1002704, rating=1.6224923133850098), Row(artistId=1004294, rating=1.602113962173462), Row(artistId=1002404, rating=1.5717946290969849), Row(artistId=1319, rating=1.4135123491287231), Row(artistId=1006034, rating=1.401442527770996)])]

In [34]:
test=model.recommendForAllUsers(5).filter(col('userId')==2007381).select("recommendations").collect()

In [71]:
test

[Row(recommendations=[Row(artistId=1004296, rating=2.880728006362915), Row(artistId=1001487, rating=1.8751674890518188), Row(artistId=1002370, rating=1.7512781620025635), Row(artistId=1854, rating=1.6419439315795898), Row(artistId=1000175, rating=1.635480523109436)])]

In [69]:
test[0]

Row(recommendations=[Row(artistId=1004296, rating=2.880728006362915), Row(artistId=1001487, rating=1.8751674890518188), Row(artistId=1002370, rating=1.7512781620025635), Row(artistId=1854, rating=1.6419439315795898), Row(artistId=1000175, rating=1.635480523109436)])

In [74]:
test[0][0]

[Row(artistId=1004296, rating=2.880728006362915),
 Row(artistId=1001487, rating=1.8751674890518188),
 Row(artistId=1002370, rating=1.7512781620025635),
 Row(artistId=1854, rating=1.6419439315795898),
 Row(artistId=1000175, rating=1.635480523109436)]

In [44]:
for i in test[0][0]:
  topArtists.append(item.artistId)
  schema = StructType([StructField("artistId",IntegerType(),True)])
  artists = spark.createDataFrame(topArtists,IntegerType())

In [48]:
final=artists.join(ub,artists.value==ub.artistId).select(ub.artistId,ub.artistName)

In [49]:
final.show()

+---------+-------------------+
| artistId|         artistName|
+---------+-------------------+
|1001530.0|  The Starting Line|
|   1319.0|             Zero 7|
|1002095.0|Something Corporate|
|   1205.0|                 U2|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
+---------+-------------------+



In [41]:
final.show()

+---------+-------------------+
| artistId|         artistName|
+---------+-------------------+
|1001530.0|  The Starting Line|
|   1319.0|             Zero 7|
|1002095.0|Something Corporate|
|   1205.0|                 U2|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
|1000112.0|      Alkaline Trio|
+---------+-------------------+



In [57]:
def add_two(a,b):
  total = a+b
  print(a)
  print(b)
  return total

In [53]:
add_two(9,10)

9
10
19


In [59]:
o = add_two(9,10)

9
10


In [60]:
o

19

In [62]:
l = [((1,2),3),(3,4,5),(5,67,8)]

In [63]:
l[0][0]

(1, 2)

In [70]:
n = [[1,2],[3,4],[5,[6,7]]]
n

[[1, 2], [3, 4], [5, [6, 7]]]

In [67]:
n[2][1][1]

7