# Running PySpark with Cassandra using spark-cassandra-connector in Jupyter Notebook

> This notebook uses the MovieLens dataset, which was loaded into Cassandra beforehand. The latest MovieLens data is available [here](https://grouplens.org/datasets/movielens/latest/).

In [1]:
# Configuration for the Cassandra connector package and the Cassandra host,
# handed to PySpark through the PYSPARK_SUBMIT_ARGS environment variable
import os
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell',
})

In [2]:
# Creating the PySpark context (local master, app name "movie lens app")
from pyspark import SparkConf, SparkContext
# Calling SparkContext(...) directly fails when a context is already active in
# this kernel, which makes the cell impossible to re-run. getOrCreate returns
# the active context if there is one and only builds a new one from `conf`
# otherwise.
conf = SparkConf().setMaster("local").setAppName("movie lens app")
sc = SparkContext.getOrCreate(conf)

In [3]:
# Wrapping the Spark context in a SQL context, used below to read data frames
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# Reads one Cassandra table, identified by key space and table name, into a data frame
def load_and_get_table_df(keys_space_name, table_name):
    """Load a Cassandra table through the notebook-level `sqlContext`.

    keys_space_name -- value passed as the reader's `keyspace` option
    table_name      -- value passed as the reader's `table` option

    Returns whatever the reader's `load()` call produces (the table's data frame).
    """
    cassandra_reader = sqlContext.read.format("org.apache.spark.sql.cassandra")
    cassandra_reader = cassandra_reader.options(keyspace=keys_space_name, table=table_name)
    return cassandra_reader.load()

In [5]:
# Data frames for the movies and ratings tables of the movie_lens key space
movies = load_and_get_table_df(keys_space_name="movie_lens", table_name="movies")
ratings = load_and_get_table_df(keys_space_name="movie_lens", table_name="ratings")

In [6]:
# Display the first 20 rows of the movies table
movies.show(n=20)

+--------+--------------------+--------------------+
|movie_id|              genres|               title|
+--------+--------------------+--------------------+
|  166048|       Drama|Romance|Happy Birthday (2...|
|   99572|               Drama|     Cornelis (2010)|
|   96638|         Documentary|    Klitschko (2011)|
|  146455|       Drama|Fantasy|      Hallway (2015)|
|   87258|Crime|Drama|Film-...|    The Thief (1952)|
|   69227|     Children|Comedy|Ernest Rides Agai...|
|     678|      Drama|Thriller|Some Folks Call I...|
|  154214|    Children|Fantasy|Barbara the Fair ...|
|   71675|              Comedy|    Hiroshima (2009)|
|  139918|     Documentary|War|The Ghost Army (2...|
|  164753|             Romance|Anything for Love...|
|  168310|  (no genres listed)|Μαριχουάνα Στοπ !...|
|   93952|     Horror|Thriller|Silent House, The...|
|  118204|Adventure|Documen...|     McConkey (2013)|
|  168678|              Comedy|     Devolved (2010)|
|  111852|         Documentary|Generation Iron

In [7]:
# Display the first 20 rows of the ratings table
ratings.show(n=20)

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 117752|     340|   5.0|1004221564|
| 117752|    1275|   5.0|1004221564|
| 117752|    1760|   5.0|1004221564|
| 117752|    2028|   5.0|1004221564|
| 117752|    2268|   5.0|1004221564|
| 117752|    2643|   5.0|1004221564|
| 117752|    3204|   5.0|1004221564|
| 117752|    3677|   5.0|1004221564|
| 117752|    4143|   4.0|1004221564|
| 117752|    4565|   5.0|1004221564|
| 122430|       1|   4.0| 832302134|
| 122430|      25|   5.0| 850195697|
| 122430|      34|   4.0| 832302184|
| 122430|      50|   5.0| 832302207|
| 122430|      52|   3.0| 850195768|
| 122430|      69|   4.0| 832303477|
| 122430|     110|   3.0| 832302172|
| 122430|     111|   3.0| 832302229|
| 122430|     122|   4.0| 832302922|
| 122430|     145|   3.0| 832302767|
+-------+--------+------+----------+
only showing top 20 rows



In [8]:
# Top 20 reviewers by number of ratings given:
# count ratings per user, then list the largest counts first
ratingsPerUser = ratings.groupBy("user_id").count()
ratingsPerUser.sort("count", ascending=False).show()

+-------+-----+
|user_id|count|
+-------+-----+
|  45811|18276|
|   8659| 9279|
| 270123| 7638|
| 179792| 7515|
| 228291| 7410|
| 243443| 6320|
|  98415| 6094|
| 229879| 6024|
|  98787| 5814|
| 172224| 5701|
| 230417| 5619|
|  70648| 5356|
| 194690| 5206|
| 107720| 5169|
|  24025| 4946|
| 165352| 4921|
| 101276| 4834|
| 243331| 4834|
|  74275| 4815|
|  41190| 4785|
+-------+-----+
only showing top 20 rows



In [9]:
# Print the schema (column names, types and nullability) of the ratings data frame
ratings.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [10]:
# Movie ids rated by each of two chosen users (user ids 45811 and 98415)
firstUserMovies = ratings.filter(ratings.user_id == 45811).select("movie_id")
secondUserMovies = ratings.filter(ratings.user_id == 98415).select("movie_id")

In [11]:
# Mark both users' data frames for caching, since they are reused by the cells below
firstUserMovies, secondUserMovies = firstUserMovies.cache(), secondUserMovies.cache()

In [12]:
# Show 20 of the movies that both users reviewed
commonMovies = firstUserMovies.intersect(secondUserMovies)
commonMovies.show(n=20)

+--------+
|movie_id|
+--------+
|     471|
|    1088|
|    1238|
|    1342|
|    1580|
|    1645|
|    1959|
|    2122|
|    2366|
|    2866|
|    3175|
|    3918|
|    5300|
|    6357|
|    6466|
|    6620|
|    6658|
|    7253|
|    7982|
|    8638|
+--------+
only showing top 20 rows



In [13]:
# Verifying whether the first user reviewed movie 3918
firstUserMovies.filter(firstUserMovies.movie_id == 3918).show()

+--------+
|movie_id|
+--------+
|    3918|
+--------+



In [14]:
# Verifying whether the second user reviewed movie 3918
secondUserMovies.filter(secondUserMovies.movie_id == 3918).show()

+--------+
|movie_id|
+--------+
|    3918|
+--------+



In [15]:
# Number of movies that both users reviewed
commonMovies = firstUserMovies.intersect(secondUserMovies)
commonMovies.count()

4488

In [16]:
# Total number of distinct movies reviewed by either user
allReviewedMovies = firstUserMovies.union(secondUserMovies).distinct()
allReviewedMovies.count()

19882

In [17]:
# Release both users' data frames from the cache without waiting for the blocks to be freed
firstUserMovies.unpersist(blocking=False)
secondUserMovies.unpersist(blocking=False)

DataFrame[movie_id: int]

In [18]:
# Shutting down the PySpark context
sc.stop()