In [1]:
# Initialise findspark first so that the pyspark imports in the later cells can be resolved.
import findspark
findspark.init()

In [2]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Spark configuration: run against a local master under the application name "MovieLens".
# Further settings can be added with config.set("property", "value").
config = SparkConf().setMaster("local").setAppName("MovieLens")

# SparkSession is the entry point for Spark SQL and the DataFrame API.
spark = (
    SparkSession.builder
    .config(conf=config)
    .getOrCreate()
)

# The underlying SparkContext, kept around for RDD-level work.
sc = spark.sparkContext

In [3]:
# Check from a shell that the input data is present in HDFS:
#   hdfs dfs -ls /movies
#   hdfs dfs -ls /ratings

In [4]:
# Define the schemas programmatically instead of relying on inferSchema.
from pyspark.sql.types import StructType, LongType, StringType, IntegerType, DoubleType

# nullable=True allows nulls in the column; nullable=False would forbid them.
movieSchema = (
    StructType()
    .add("movieId", IntegerType(), nullable=True)
    .add("title", StringType(), nullable=True)
    .add("genres", StringType(), nullable=True)
)

ratingSchema = (
    StructType()
    .add("userId", IntegerType(), nullable=True)
    .add("movieId", IntegerType(), nullable=True)
    .add("rating", DoubleType(), nullable=True)
    .add("timestamp", LongType(), nullable=True)
)



In [5]:
# Read the movie data into a DataFrame using the schema defined above.
# The load path may be a folder (every CSV inside it is read)
# or a single file (only that file is read).
# `spark` is the session, the entry point for DataFrame/SQL operations.
movieDf = (
    spark.read
    .format("csv")
    .schema(movieSchema)
    .option("header", True)
    .load("hdfs://localhost:9000/movies")
)

movieDf.printSchema()
movieDf.show(2)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



In [7]:

# Read the ratings data with the explicit ratingSchema; the first line of each CSV is a header.
ratingDf = (
    spark.read
    .format("csv")
    .schema(ratingSchema)
    .option("header", True)
    .load("hdfs://localhost:9000/ratings")
)

ratingDf.printSchema()
ratingDf.show(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
+------+-------+------+---------+
only showing top 2 rows



In [8]:
# Row counts: movies first, then ratings.
print(movieDf.count())
print(ratingDf.count())

9742
100836


In [9]:
# Pull the first two rating rows back to the driver as a list of Row objects.
ratingDf.take(2)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247)]

In [12]:
# List every distinct rating value that occurs in the data set.
(
    ratingDf
    .select("rating")
    .distinct()
    .show()
)

+------+
|rating|
+------+
|   3.5|
|   4.5|
|   2.5|
|   1.0|
|   4.0|
|   0.5|
|   3.0|
|   2.0|
|   1.5|
|   5.0|
+------+



In [15]:
# Aggregation with groupBy: how many ratings did each movie receive?
from pyspark.sql.functions import col, desc, avg, count

# Count the userId entries per movieId and list the most-rated movies first.
df = (
    ratingDf
    .groupBy("movieId")
    .agg(count("userId").alias("total_ratings"))
    .orderBy(desc("total_ratings"))
)

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- total_ratings: long (nullable = false)

+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|     50|          204|
|   2858|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
+-------+-------------+
only showing top 20 rows



In [16]:
# Aggregation with groupBy: average rating of each movie.
from pyspark.sql.functions import col, desc, avg, count

# Average the user ratings per movieId and list the highest averages first.
df = (
    ratingDf
    .groupBy("movieId")
    .agg(avg("rating").alias("avg_rating"))
    .orderBy(desc("avg_rating"))
)

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|  33138|       5.0|
|    876|       5.0|
| 147300|       5.0|
|  27373|       5.0|
|     53|       5.0|
|  25887|       5.0|
|  84273|       5.0|
| 113829|       5.0|
| 173963|       5.0|
|  26350|       5.0|
|  67618|       5.0|
|    148|       5.0|
| 157775|       5.0|
| 142444|       5.0|
|    633|       5.0|
|    496|       5.0|
|   8911|       5.0|
|   5513|       5.0|
| 152711|       5.0|
| 150554|       5.0|
+-------+----------+
only showing top 20 rows



In [19]:
# Aggregation with groupBy: the most popular movies.
from pyspark.sql.functions import col, desc, avg, count

# A movie counts as popular when
#   * it has at least 100 ratings, and
#   * its average rating is 3.5 or higher.
# The result is ordered by the number of ratings, largest first.
mostPopularMoviesDf = (
    ratingDf
    .groupBy("movieId")
    .agg(
        avg("rating").alias("avg_rating"),
        count("userId").alias("total_ratings"),
    )
    .where((col("total_ratings") >= 100) & (col("avg_rating") >= 3.5))
    .orderBy(desc("total_ratings"))
)

# Keep the aggregated result cached so later cells reuse it instead of recomputing it.
# NOTE: DataFrame.cache() uses Spark's default DataFrame storage level
# (memory, spilling to disk), not memory-only.
mostPopularMoviesDf.cache()

mostPopularMoviesDf.printSchema()
mostPopularMoviesDf.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: long (nullable = false)

+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    356| 4.164133738601824|          329|
|    318| 4.429022082018927|          317|
|    296| 4.197068403908795|          307|
|    593| 4.161290322580645|          279|
|   2571| 4.192446043165468|          278|
|    260| 4.231075697211155|          251|
|    480|              3.75|          238|
|    110| 4.031645569620253|          237|
|    589| 3.970982142857143|          224|
|    527|             4.225|          220|
|   2959| 4.272935779816514|          218|
|      1|3.9209302325581397|          215|
|   1196|4.2156398104265405|          211|
|     50| 4.237745098039215|          204|
|   2858| 4.056372549019608|          204|
|     47|3.9753694581280787|          203|
|    150| 3.845771144278607|          201|
|   1198

In [25]:
# Inner join: attach the movie title to every row of mostPopularMoviesDf.
# Rows are matched where mostPopularMoviesDf.movieId == movieDf.movieId;
# the movieId kept in the result is the one coming from movieDf.
popularMoviesDf = (
    mostPopularMoviesDf
    .join(movieDf, on=mostPopularMoviesDf.movieId == movieDf.movieId, how="inner")
    .select(movieDf.movieId, "title", "avg_rating", "total_ratings")
    .orderBy(desc("total_ratings"))
)

# Cache the joined result; it is shown and written out several times below.
popularMoviesDf.cache()

popularMoviesDf.show(100)

+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225|          220|
|   2959|   Fight Club (1999)| 4.272935779816514|          218|
|      1|    Toy Story (1995)|3.9209302325581397|          215|
|   1196|Star Wars: Episod...|4.21563981

In [27]:
# Number of partitions backing popularMoviesDf.
popularMoviesDf.rdd.getNumPartitions()
# Optional: inspect the rows held by each partition (collects everything to the driver).
#popularMoviesDf.rdd.glom().collect()

72

In [28]:
# Write popularMoviesDf to Hadoop as CSV.
#   * header=True: column names are not written unless asked for.
#   * mode "overwrite": replace any files already at the target path.
# The DataFrame has 70+ partitions holding roughly 100+ records in total,
# so this produces 70+ output files in Hadoop.
(
    popularMoviesDf.write
    .option("header", True)
    .mode("overwrite")
    .csv("hdfs://localhost:9000/most-popular-movies-many-files")
)

In [29]:
# Write popularMoviesDf as a single CSV file:
# coalesce(1) reduces the DataFrame to one partition before writing.
(
    popularMoviesDf
    .coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("hdfs://localhost:9000/most-popular-movies")
)


In [30]:
# Now read the files back from HDFS.
# Instead of a hand-written schema, use inferSchema and let Spark build the schema itself.
# inferSchema has to scan the data, so use it only for small data sets.

In [35]:
# Read back the multi-file output; inferSchema makes Spark scan the CSVs
# and work out the data type of every column for your schema.
popularMovies = (
    spark.read
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load("hdfs://localhost:9000/most-popular-movies-many-files")
)

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 3
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|   1206|Clockwork Orange,...| 3.995833333333333|          120|
|   2716|Ghostbusters (a.k...|             3.775|          120|
|   4973|Amelie (Fabuleux ...| 4.183333333333334|          120|
|   5445|Minority Report (...|            3.6375|          120|
|   1089|Reservoir Dogs (1...| 4.202290076335878|          131|
|   1240|Terminator, The (...|3.8969465648854964|          131|
|   6874|Kill Bill: Vol. 1...|3.9618320610687023|          131|
|   7361|Eternal Sunshine ...|4.1603053435114505|          131|
|   1208|Apocalypse Now (1...| 4.219626168224299|          107|
|   4896|Harry Potter and ...|3.7616822429906542

In [34]:
# Read back the single-file output; inferSchema makes Spark scan the CSV
# and work out the data type of every column for your schema.
popularMovies = (
    spark.read
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load("hdfs://localhost:9000/most-popular-movies")
)

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 1
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225