# PySpark Exercises 2.2 
---
Özgün Yargı

## Install Dependencies

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 49.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=7798879ec5ceb68e155bfbf8a645beead8e3d8641d49beaf072ead988c07b6ab
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Import Libraries

In [140]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,StringType
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import DenseVector

from scipy.spatial.distance import cosine

## Get the Dataset

In [3]:
# Create (or reuse) the Spark session behind every DataFrame in this notebook.
spark = SparkSession.builder.appName('MovieLens').getOrCreate()

# Load the four MovieLens CSV files; each is read with its header row and
# without an explicit schema.
movies, ratings, links, tags = (
    spark.read.csv(csv_path, header=True)
    for csv_path in ('movies.csv', 'ratings.csv', 'links.csv', 'tags.csv')
)

In [4]:
# Preview the tags DataFrame (userId, movieId, tag, timestamp).
tags.show()

+------+-------+-----------------+----------+
|userId|movieId|              tag| timestamp|
+------+-------+-----------------+----------+
|     2|  60756|            funny|1445714994|
|     2|  60756|  Highly quotable|1445714996|
|     2|  60756|     will ferrell|1445714992|
|     2|  89774|     Boxing story|1445715207|
|     2|  89774|              MMA|1445715200|
|     2|  89774|        Tom Hardy|1445715205|
|     2| 106782|            drugs|1445715054|
|     2| 106782|Leonardo DiCaprio|1445715051|
|     2| 106782|  Martin Scorsese|1445715056|
|     7|  48516|     way too long|1169687325|
|    18|    431|        Al Pacino|1462138765|
|    18|    431|         gangster|1462138749|
|    18|    431|            mafia|1462138755|
|    18|   1221|        Al Pacino|1461699306|
|    18|   1221|            Mafia|1461699303|
|    18|   5995|        holocaust|1455735472|
|    18|   5995|       true story|1455735479|
|    18|  44665|     twist ending|1456948283|
|    18|  52604|  Anthony Hopkins|

In [6]:
# Preview the links DataFrame (movieId, imdbId, tmdbId).
links.show()

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
|      6|0113277|   949|
|      7|0114319| 11860|
|      8|0112302| 45325|
|      9|0114576|  9091|
|     10|0113189|   710|
|     11|0112346|  9087|
|     12|0112896| 12110|
|     13|0112453| 21032|
|     14|0113987| 10858|
|     15|0112760|  1408|
|     16|0112641|   524|
|     17|0114388|  4584|
|     18|0113101|     5|
|     19|0112281|  9273|
|     20|0113845| 11517|
+-------+-------+------+
only showing top 20 rows



## Extract Features

### Movies

Count how many times each genre appears among the movies a user has rated, and use these counts as features. To do this, the pipe-delimited genre string from movies.csv is split into a list with one element per genre. The movies are then merged with ratings.csv on the "movieId" column, and the genres are count-encoded per user (grouped by "userId").

In [32]:
def split_genres(mystring):
  """Split a pipe-delimited genre string into a list of genre names.

  Args:
    mystring: Genre string such as "Comedy|Romance". May be None, because
      Spark hands SQL nulls to Python UDFs as None.

  Returns:
    List of genre names with surrounding whitespace stripped. None yields an
    empty list instead of raising AttributeError.
  """
  if mystring is None:
    return []

  return [genre.strip() for genre in mystring.split("|")]

# Wrap split_genres as a Spark UDF whose declared return type is array<string>.
myfunction = F.udf(split_genres, ArrayType(StringType()))

In [41]:
# Add a "genres_list" column by applying the UDF to the raw "genres" string.
movies = movies.withColumn("genres_list", myfunction("genres"))
movies.show()

+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|         genres_list|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|            [Comedy]|
|      6|         Heat (1995)|Action|Crime|Thri...|[Action, Crime, T...|
|      7|      Sabrina (1995)|      Comedy|Romance|   [Comedy, Romance]|
|      8| Tom and Huck (1995)|  Adventure|Children|[Adventure, Child...|
|      9| Sudden Death (1995)|              Action|            [Action]|
|     10|    GoldenEye (1995)|Action|Adventure|...|[Action, Adventur...|
|     11|American Presiden...|Comedy|Drama|Romance|

In [64]:
# Attach the movie columns (title, genres, genres_list) to each rating via
# movieId; the left join keeps every rating row.
merged_df = ratings.join(movies, on="movieId", how="left")
merged_df.show()

+-------+------+------+---------+--------------------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|         genres_list|
+-------+------+------+---------+--------------------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|[Action, Crime, T...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller| [Mystery, Thriller]|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|[Crime, Mystery, ...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|[Action, Comedy, ...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|[Adventure, Comed...|
|    110|     1|   4.0|9649821

In [67]:
# Produce one row per (rating, genre) pair. The frame is rebuilt from the join
# rather than overwriting `merged_df` with itself: exploding an already-exploded
# frame on a cell re-run would multiply the rows and inflate every genre count.
merged_df = (
    ratings.join(movies, on="movieId", how="left")
    .withColumn("genres_expanded", F.explode("genres_list"))
)
merged_df.show()

+-------+------+------+---------+--------------------+--------------------+--------------------+---------------+
|movieId|userId|rating|timestamp|               title|              genres|         genres_list|genres_expanded|
+-------+------+------+---------+--------------------+--------------------+--------------------+---------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|      Adventure|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|      Animation|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|       Children|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|         Comedy|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|        Fantasy|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|

In [69]:
# Gather every exploded genre value into a single list per user.
genres_per_user = F.collect_list("genres_expanded").alias("genres_list_by_id")
genres_df = merged_df.groupby("userId").agg(genres_per_user)
genres_df.show()

+------+--------------------+
|userId|   genres_list_by_id|
+------+--------------------+
|     1|[Adventure, Anima...|
|    10|[Comedy, Crime, D...|
|   100|[Comedy, Romance,...|
|   101|[Comedy, Drama, D...|
|   102|[Comedy, Romance,...|
|   103|[Adventure, Anima...|
|   104|[Adventure, Child...|
|   105|[Action, Crime, T...|
|   106|[Crime, Drama, Ac...|
|   107|[Adventure, Anima...|
|   108|[Drama, Romance, ...|
|   109|[Action, Crime, T...|
|    11|[Action, Crime, T...|
|   110|[Mystery, Thrille...|
|   111|[Comedy, Crime, D...|
|   112|[Adventure, Anima...|
|   113|[Comedy, Drama, R...|
|   114|[Action, Adventur...|
|   115|[Comedy, Crime, T...|
|   116|[Comedy, Romance,...|
+------+--------------------+
only showing top 20 rows



In [73]:
# Fit a vocabulary over the per-user genre lists, then encode each list as a
# vector of per-genre counts and drop the raw list column.
cv = CountVectorizer(inputCol='genres_list_by_id', outputCol='genresVec')
cv_model = cv.fit(genres_df)
genres_fea = cv_model.transform(genres_df).drop('genres_list_by_id')
genres_fea.show(truncate=False)

+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|genresVec                                                                                                                                                          |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |(20,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,15,16,18],[68.0,83.0,90.0,55.0,85.0,26.0,40.0,45.0,47.0,42.0,18.0,17.0,29.0,22.0,22.0,7.0,1.0])                               |
|10    |(20,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[72.0,79.0,26.0,13.0,31.0,78.0,5.0,13.0,17.0,14.0,3.0,2.0,15.0,4.0,18.0,9.0])                                          |
|100   |(20,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],[74.0,85.0,26.0,19.0,17.0,79.0,7.0,12.0,10.0,6.0,5.0,5.0,2.0,6.0,1.0,12.0,5.0]) 

### Ratings

Add two additional features:

* Average rating given by each user
* Number of ratings submitted by each user

In [74]:
# Preview the ratings DataFrame (userId, movieId, rating, timestamp).
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [76]:
# Count the rating rows per user; the result column is named "count".
user_count_df = ratings.groupBy("userId").count()
user_count_df.show()

+------+-----+
|userId|count|
+------+-----+
|   296|   27|
|   467|   22|
|   125|  360|
|   451|   34|
|     7|  152|
|    51|  359|
|   124|   50|
|   447|   78|
|   591|   54|
|   307|  975|
|   475|  155|
|   574|   23|
|   169|  269|
|   205|   27|
|   334|  154|
|   544|   22|
|   577|  161|
|   581|   40|
|   272|   31|
|   442|   20|
+------+-----+
only showing top 20 rows



In [78]:
# Cast "rating" to float, then average it per user; the result column is
# named "avg(rating)".
ratings = ratings.withColumn("rating", ratings["rating"].cast("float"))
ratings_ave_fea = ratings.groupby("userId").mean("rating")
ratings_ave_fea.show()

+------+------------------+
|userId|       avg(rating)|
+------+------------------+
|   296| 4.166666666666667|
|   467| 3.409090909090909|
|   125| 3.859722222222222|
|   451|3.7941176470588234|
|     7|3.2302631578947367|
|    51|3.7757660167130918|
|   124|              3.99|
|   447| 3.871794871794872|
|   591|3.2777777777777777|
|   307|2.6656410256410257|
|   475| 4.409677419354839|
|   574|3.9565217391304346|
|   169|  4.24907063197026|
|   205|3.8703703703703702|
|   334| 3.418831168831169|
|   544| 4.363636363636363|
|   577|3.5652173913043477|
|   581|             4.375|
|   272| 3.629032258064516|
|   442|             1.275|
+------+------------------+
only showing top 20 rows



## Merge The Features

Merge all features into a single DataFrame.

In [84]:
# Outer-join the genre count vectors with the per-user rating counts on userId.
user_df = genres_fea.join(user_count_df, on="userId", how="outer")
user_df.show()

+------+--------------------+-----+
|userId|           genresVec|count|
+------+--------------------+-----+
|     1|(20,[0,1,2,3,4,5,...|  232|
|    10|(20,[0,1,2,3,4,5,...|  140|
|   100|(20,[0,1,2,3,4,5,...|  148|
|   101|(20,[0,1,2,3,4,5,...|   61|
|   102|(20,[0,1,2,3,4,5,...|   56|
|   103|(20,[0,1,2,3,4,5,...|  377|
|   104|(20,[0,1,2,3,4,5,...|  273|
|   105|(20,[0,1,2,3,4,5,...|  722|
|   106|(20,[0,1,2,3,4,5,...|   33|
|   107|(20,[0,1,2,3,4,5,...|   34|
|   108|(20,[0,1,2,3,4,5,...|   76|
|   109|(20,[0,1,2,3,4,5,...|  127|
|    11|(20,[0,1,2,3,4,5,...|   64|
|   110|(20,[0,1,2,3,4,5,...|   51|
|   111|(20,[0,1,2,3,4,5,...|  646|
|   112|(20,[0,1,2,3,4,5,...|   65|
|   113|(20,[0,1,2,3,4,5,...|  150|
|   114|(20,[0,1,2,3,4,5,...|   31|
|   115|(20,[0,1,2,3,4,5,...|  112|
|   116|(20,[0,1,2,3,4,5,...|   87|
+------+--------------------+-----+
only showing top 20 rows



In [87]:
# Build the feature table from its three source frames instead of joining
# `user_df` onto itself, so re-running this cell cannot append a second
# "avg(rating)" column. Column order stays userId, genresVec, count, avg(rating).
user_df = (
    genres_fea
    .join(user_count_df, on="userId", how="outer")
    .join(ratings_ave_fea, on="userId", how="outer")
)
user_df.show()

+------+--------------------+-----+------------------+
|userId|           genresVec|count|       avg(rating)|
+------+--------------------+-----+------------------+
|     1|(20,[0,1,2,3,4,5,...|  232| 4.366379310344827|
|    10|(20,[0,1,2,3,4,5,...|  140|3.2785714285714285|
|   100|(20,[0,1,2,3,4,5,...|  148| 3.945945945945946|
|   101|(20,[0,1,2,3,4,5,...|   61| 3.557377049180328|
|   102|(20,[0,1,2,3,4,5,...|   56| 3.357142857142857|
|   103|(20,[0,1,2,3,4,5,...|  377| 3.907161803713528|
|   104|(20,[0,1,2,3,4,5,...|  273|3.5073260073260073|
|   105|(20,[0,1,2,3,4,5,...|  722| 4.116343490304709|
|   106|(20,[0,1,2,3,4,5,...|   33|4.4393939393939394|
|   107|(20,[0,1,2,3,4,5,...|   34| 3.911764705882353|
|   108|(20,[0,1,2,3,4,5,...|   76| 3.986842105263158|
|   109|(20,[0,1,2,3,4,5,...|  127| 3.220472440944882|
|    11|(20,[0,1,2,3,4,5,...|   64|           3.78125|
|   110|(20,[0,1,2,3,4,5,...|   51|3.7254901960784315|
|   111|(20,[0,1,2,3,4,5,...|  646|3.3397832817337463|
|   112|(2

## Similarity Scores

Calculate the cosine similarity score for every pair of users and print the 10 most similar users for each user.

In [126]:
# Switch to the RDD API; each element is a Row of
# (userId, genresVec, count, avg(rating)).
similarity_rdd = user_df.rdd

In [127]:
# Peek at one row; first() fetches a single element instead of collecting the
# entire RDD to the driver just to index element 0.
similarity_rdd.first()

Row(userId='1', genresVec=SparseVector(20, {0: 68.0, 1: 83.0, 2: 90.0, 3: 55.0, 4: 85.0, 5: 26.0, 6: 40.0, 7: 45.0, 8: 47.0, 9: 42.0, 10: 18.0, 11: 17.0, 12: 29.0, 13: 22.0, 15: 22.0, 16: 7.0, 18: 1.0}), count=232, avg(rating)=4.366379310344827)

In [139]:
def toDense(v_):
    """Flatten one user row into a plain list of numbers.

    Args:
        v_: Row-like sequence (userId, genresVec, count, avg_rating), where
            genresVec is a Spark ML vector exposing ``toArray()``.

    Returns:
        ``[userId, genre_count_0, ..., genre_count_k, count, avg_rating]``:
        every entry is an int except the trailing float average.
    """
    # toArray() densifies sparse and dense vectors alike, without routing the
    # sparse vector through the DenseVector constructor element by element.
    genre_counts = [int(x) for x in v_[1].toArray()]

    return [int(v_[0]), *genre_counts, int(v_[2]), float(v_[3])]

# Start from user_df.rdd rather than from similarity_rdd itself so the cell can
# be re-run: toDense expects the original Row layout, not the
# (userId, features) pairs this cell produces.
similarity_rdd = user_df.rdd.map(toDense).map(lambda x: (x[0], x[1:]))
# first() returns one element without collecting the whole RDD.
similarity_rdd.first()

(1,
 [68,
  83,
  90,
  55,
  85,
  26,
  40,
  45,
  47,
  42,
  18,
  17,
  29,
  22,
  0,
  22,
  7,
  0,
  1,
  0,
  232,
  4.366379310344827])

In [141]:
  # Pair every user with every user, including itself.
  similarity_cartesian = similarity_rdd.cartesian(similarity_rdd)
  # first() fetches a single pair; collect()[0] would ship the whole n x n
  # product to the driver just to display one element.
  similarity_cartesian.first()

((1,
  [68,
   83,
   90,
   55,
   85,
   26,
   40,
   45,
   47,
   42,
   18,
   17,
   29,
   22,
   0,
   22,
   7,
   0,
   1,
   0,
   232,
   4.366379310344827]),
 (1,
  [68,
   83,
   90,
   55,
   85,
   26,
   40,
   45,
   47,
   42,
   18,
   17,
   29,
   22,
   0,
   22,
   7,
   0,
   1,
   0,
   232,
   4.366379310344827]))

In [146]:
def find_similarity(mytuple):
  """Compute the cosine similarity for one pair of users.

  Args:
    mytuple: ((user_1, features_1), (user_2, features_2)).

  Returns:
    (user_1, user_2, similarity), where similarity is one minus the SciPy
    cosine distance between the two feature sequences.
  """
  first_user, second_user = mytuple[0], mytuple[1]

  # scipy's cosine() is a distance, so subtract it from 1 to get a similarity.
  score = 1 - cosine(first_user[1], second_user[1])

  return (first_user[0], second_user[0], score)

# Score every user pair; the function is passed directly, with no lambda wrapper.
similarities_rdd = similarity_cartesian.map(find_similarity)
# first() avoids collecting every pairwise score just to display one.
similarities_rdd.first()

(1, 1, 1.0)

In [157]:
# For each user, report the ten other users with the highest cosine similarity.
for unique_user in similarities_rdd.map(lambda x: x[0]).distinct().collect():
  print_str = f"For userId {unique_user}, following 10 are the most similar users:"
  print(print_str + "\n" + "-"*len(print_str))

  # Exclude the user's own pair explicitly instead of assuming it sorts first:
  # another user can tie at 1.0, and rounding can leave the self-pair just
  # below 1.0. top() returns only the 10 highest-scoring rows in descending
  # order, so the full per-user list is never sorted or collected.
  similars = (
      similarities_rdd
      .filter(lambda x: x[0] == unique_user and x[1] != unique_user)
      .top(10, key=lambda x: x[2])
  )
  for similar in similars:
    print(f"userId {similar[1]} with {similar[2]} similarity score")

  print("\n")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m


For userId 327, following 10 are the most similar users:
--------------------------------------------------------
userId 484 with 0.9859571766231472 similarity score
userId 543 with 0.9851609220299536 similarity score
userId 307 with 0.9824026090307231 similarity score
userId 412 with 0.9823364840982074 similarity score
userId 575 with 0.9822347237602064 similarity score
userId 273 with 0.9809531042382362 similarity score
userId 409 with 0.9803029519955414 similarity score
userId 310 with 0.9793260657645331 similarity score
userId 283 with 0.9789799975832416 similarity score
userId 226 with 0.9787148290982162 similarity score


For userId 328, following 10 are the most similar users:
--------------------------------------------------------
userId 308 with 0.9944709853064448 similarity score
userId 381 with 0.9928815546432783 similarity score
userId 1 with 0.990840080317499 similarity score
userId 98 with 0.9900336773425