In [1]:
import findspark
findspark.init()

In [33]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType
from pyspark.sql.functions import * 

### Initialize SparkContext and SqlContext

In [3]:
sparkConf = SparkConf().setAppName("MovieLens").setMaster("local[4]")
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

### Define DataFrame schemas

In [5]:
movieSchema = StructType([StructField("movieId", IntegerType(), False),
                         StructField("title", StringType(), True),
                         StructField("genres", StringType(), True)])
ratingSchema = StructType([StructField("userId", IntegerType(), False),
                          StructField("movieId", IntegerType(), False),
                          StructField("rating", DoubleType(), False),
                          StructField("timestamp", LongType(), True)])
userSchema = StructType([StructField("userId", IntegerType(), False),
                        StructField("gender", StringType(), True),
                        StructField("age", IntegerType(), True),
                        StructField("occupation", IntegerType(), True),
                        StructField("zipcode", LongType(), True)])

### Read DataFrames

In [6]:
movieDF = sqlContext.read.option("delimiter", ":").schema(movieSchema).csv("/jupyter/data/ml-1m/movies.dat")
ratingDF = sqlContext.read.option("delimiter", ":").schema(ratingSchema).csv("/jupyter/data/ml-1m/ratings.dat")
userDF = sqlContext.read.option("delimiter", ":").schema(userSchema).csv("/jupyter/data/ml-1m/users.dat")
            

### Register DataFrame as tables

In [20]:
movieDF.registerTempTable("movieTable")
ratingDF.registerTempTable("ratingTable")
userDF.registerTempTable("userTable")

### Check DataFrame schemas

In [7]:
# check why nullable is not set
movieDF.printSchema()
# todo check for the others dataframes
ratingDF.printSchema()
userDF.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- zipcode: long (nullable = true)



### Visualize DataFrames/tables

##### using DataFrame API (DSL)

In [15]:
movieDF.show(10, False)

+-------+----------------------------------+----------------------------+
|movieId|title                             |genres                      |
+-------+----------------------------------+----------------------------+
|1      |Toy Story (1995)                  |Animation|Children's|Comedy |
|2      |Jumanji (1995)                    |Adventure|Children's|Fantasy|
|3      |Grumpier Old Men (1995)           |Comedy|Romance              |
|4      |Waiting to Exhale (1995)          |Comedy|Drama                |
|5      |Father of the Bride Part II (1995)|Comedy                      |
|6      |Heat (1995)                       |Action|Crime|Thriller       |
|7      |Sabrina (1995)                    |Comedy|Romance              |
|8      |Tom and Huck (1995)               |Adventure|Children's        |
|9      |Sudden Death (1995)               |Action                      |
|10     |GoldenEye (1995)                  |Action|Adventure|Thriller   |
+-------+-----------------------------

In [18]:
# show method docs 
movieDF.show?

##### using Spark SQL

In [23]:
sqlContext.sql("select * from movieTable limit 10").show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|Adventure|Children's|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+



### Number of users

In [49]:
userDF.count()

6040

In [51]:
sqlContext.sql("select count(*) from userTable").show()

+--------+
|count(1)|
+--------+
|    6040|
+--------+



##### TODO : same queries for the others dataframes

### Gender distribution

In [56]:
userDF.groupBy(col("gender")).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F| 1709|
|     M| 4331|
+------+-----+



#### TODO : sql version

### TODO : Age distribution using DSL and SQL

In [58]:
userDF.groupBy(col("age")).count().show()

+---+-----+
|age|count|
+---+-----+
|  1|  222|
| 35| 1193|
| 50|  496|
| 45|  550|
| 25| 2096|
| 56|  380|
| 18| 1103|
+---+-----+



### Number of users with age = 25 

In [39]:
userDF.where(col("age") == 25).count()

DataFrame[userId: int, gender: string, age: int, occupation: int, zipcode: bigint]

##### TODO : sql version

### Gender distribution for users = 25

In [41]:
userDF.where(col("age") == 25).groupBy(col("gender")).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|  558|
|     M| 1538|
+------+-----+



##### TODO : sql version

### Number of ratings of each user displayed in desc order

In [44]:
ratingDF.groupBy(col("userId")).count().orderBy(col("count").desc()).show()

+------+-----+
|userId|count|
+------+-----+
|  4169| 2314|
|  1680| 1850|
|  4277| 1743|
|  1941| 1595|
|  1181| 1521|
|   889| 1518|
|  3618| 1344|
|  2063| 1323|
|  1150| 1302|
|  1015| 1286|
|  5795| 1277|
|  4344| 1271|
|  1980| 1260|
|  2909| 1258|
|  1449| 1243|
|  4510| 1240|
|   424| 1226|
|  4227| 1222|
|  5831| 1220|
|  3841| 1216|
+------+-----+
only showing top 20 rows



##### TODO : sql version

### Avg rating for each movie

In [57]:
ratingDF.groupBy(col("movieId")).agg(avg(col("rating")).alias("avg_rating")).show()

+-------+------------------+
|movieId|        avg_rating|
+-------+------------------+
|   1580| 3.739952718676123|
|   2366|3.6560846560846563|
|   1088|3.3114992721979624|
|   1959|3.6533546325878596|
|   3175| 3.771412037037037|
|   1645|3.4358353510895885|
|    496|3.2162162162162162|
|   2142|2.8308457711442787|
|   1591|2.6210526315789475|
|   2122|2.4463519313304722|
|    833|2.1794871794871793|
|    463|  2.74468085106383|
|    471| 3.631051752921536|
|   1342| 2.904580152671756|
|    148| 2.782608695652174|
|   3918| 2.802395209580838|
|   3794|  3.28099173553719|
|   1238|               4.0|
|   2866|3.6884422110552766|
|   3749|3.1363636363636362|
+-------+------------------+
only showing top 20 rows



##### TODO : sort movies by avg rating value

In [74]:
ratingDF.groupBy(col("movieId")).agg(avg(col("rating")).alias("avg_rating")).orderBy(col("avg_rating").desc()).show()

+-------+-----------------+
|movieId|       avg_rating|
+-------+-----------------+
|   3881|              5.0|
|   3382|              5.0|
|   3280|              5.0|
|   3172|              5.0|
|   1830|              5.0|
|    787|              5.0|
|   3233|              5.0|
|   3656|              5.0|
|    989|              5.0|
|   3607|              5.0|
|   3245|              4.8|
|     53|             4.75|
|   2503|4.666666666666667|
|   2905|4.608695652173913|
|   2019|4.560509554140127|
|    318|4.554557700942973|
|    858|4.524966261808367|
|    745| 4.52054794520548|
|     50|4.517106001121705|
|    527|4.510416666666667|
+-------+-----------------+
only showing top 20 rows



### Join between rating and movie DataFrames on movieId column

In [81]:
ratingDF.join(movieDF, ratingDF.movieId == movieDF.movieId).show()

+------+-------+------+---------+-------+--------------------+--------------------+
|userId|movieId|rating|timestamp|movieId|               title|              genres|
+------+-------+------+---------+-------+--------------------+--------------------+
|     1|   1193|   5.0|978300760|   1193|One Flew Over the...|               Drama|
|     1|    661|   3.0|978302109|    661|James and the Gia...|Animation|Childre...|
|     1|    914|   3.0|978301968|    914| My Fair Lady (1964)|     Musical|Romance|
|     1|   3408|   4.0|978300275|   3408|Erin Brockovich (...|               Drama|
|     1|   2355|   5.0|978824291|   2355|Bug's Life, A (1998)|Animation|Childre...|
|     1|   1197|   3.0|978302268|   1197|Princess Bride, T...|Action|Adventure|...|
|     1|   1287|   5.0|978302039|   1287|      Ben-Hur (1959)|Action|Adventure|...|
|     1|   2804|   5.0|978300719|   2804|Christmas Story, ...|        Comedy|Drama|
|     1|    594|   4.0|978302268|    594|Snow White and th...|Animation|Chil

In [12]:
sc.stop()a