In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.conf import SparkConf
config = SparkConf()
# config.set("property", "value")
config.setMaster("local[4]").setAppName("MovieLens")

from pyspark.sql import SparkSession
# spark Session, entry point for Spark SQL, DataFrame
spark = SparkSession.builder\
                    .config(conf=config)\
                    .getOrCreate()

sc = spark.sparkContext

22/06/06 23:40:57 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.174.129 instead (on interface ens33)
22/06/06 23:40:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3811e9a6-e8c3-49aa-a1b1-b10ef4971596;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 2105ms :: artifacts dl 11ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from centra

In [3]:
# how to create schema programatically instead of using inferSchema
from pyspark.sql.types import StructType, LongType, StringType, IntegerType, DoubleType
# True is nullable, False is non nullable
movieSchema = StructType()\
                .add("movieId", IntegerType(), True)\
                .add("title", StringType(), True)\
                .add("genres", StringType(), True)

ratingSchema = StructType()\
                .add("userId", IntegerType(), True)\
                .add("movieId", IntegerType(), True)\
                .add("rating", DoubleType(), True)\
                .add("timestamp", LongType(), True)

In [7]:
# read movie data
# read using dataframe with defind schema
# we can use folder path - all csv in the folder read
# use file path, only that file read

# spark is session, entry point for data frame/sql

movieDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(movieSchema)\
                .load("hdfs://localhost:9000/ml-latest-small/movies.csv")

movieDf.cache()
movieDf.printSchema()
movieDf.show(2) # action


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



                                                                                

In [12]:
ratingDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(ratingSchema)\
                .load("hdfs://localhost:9000/ml-latest-small/ratings.csv")
ratingDf = ratingDf.repartition(10)
ratingDf.cache()
ratingDf.printSchema()
ratingDf.show(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|   249|   5803|   3.0|1354225800|
|   610|  84772|   3.5|1493846852|
+------+-------+------+----------+
only showing top 2 rows



In [13]:
print (movieDf.count())
print(ratingDf.count())

9742
100836


                                                                                

In [10]:
ratingDf.take(2) # same as ratingDf.rdd.take(2)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247)]

In [15]:
# show the distinct ratings
ratingDf.select("rating").distinct().show()

+------+
|rating|
+------+
|   3.5|
|   4.5|
|   2.5|
|   1.0|
|   4.0|
|   0.5|
|   3.0|
|   2.0|
|   1.5|
|   5.0|
+------+



In [16]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find the movies by total ratings by userId
df = ratingDf\
     .groupBy("movieId")\
     .agg(count("userId"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- count(userId): long (nullable = false)





+-------+-------------+
|movieId|count(userId)|
+-------+-------------+
|   5803|            7|
|   1645|           51|
|  44022|           23|
|   3918|            9|
|   4900|            5|
|   1580|          165|
|   1088|           42|
|   8638|           15|
|   3175|           75|
|   2659|            1|
|    471|           40|
|   1591|           26|
|   6620|           18|
|   1959|           15|
|   2142|           10|
|   1238|            9|
| 140541|            1|
|  68135|           10|
| 175197|            1|
|  32460|            4|
+-------+-------------+
only showing top 20 rows



                                                                                

In [17]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find the movies by total ratings by userId
df = ratingDf\
     .groupBy("movieId")\
     .agg(count("userId").alias("total_ratings"))\
     .sort(desc("total_ratings"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- total_ratings: long (nullable = false)





+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|     50|          204|
|   2858|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
+-------+-------------+
only showing top 20 rows



                                                                                

In [12]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  average rating by users sorted by desc
df = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"))\
     .sort(desc("avg_rating"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)





+-------+----------+
|movieId|avg_rating|
+-------+----------+
|  33138|       5.0|
|    876|       5.0|
| 147300|       5.0|
|  27373|       5.0|
|     53|       5.0|
|  25887|       5.0|
|  84273|       5.0|
| 113829|       5.0|
| 173963|       5.0|
|  26350|       5.0|
|  67618|       5.0|
|    148|       5.0|
| 157775|       5.0|
| 142444|       5.0|
|    633|       5.0|
|    496|       5.0|
|   8911|       5.0|
|   5513|       5.0|
| 152711|       5.0|
| 150554|       5.0|
+-------+----------+
only showing top 20 rows



                                                                                

In [19]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  the most popular movies, where as rated by many users, at least movies should be rated by 100 users
# and the average rating should be at least 3.5 and above
# and sort the movies by total_ratings
mostPopularMoviesDf = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"), count("userId").alias("total_ratings") )\
     .sort(desc("total_ratings"))\
     .filter( (col("total_ratings") >= 100) & (col("avg_rating") >=3.5) )\

mostPopularMoviesDf.cache() # MEMORY

mostPopularMoviesDf.printSchema()
mostPopularMoviesDf.show(20)

22/06/07 00:38:25 WARN CacheManager: Asked to cache already cached data.


root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: long (nullable = false)

+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    356| 4.164133738601824|          329|
|    318| 4.429022082018927|          317|
|    296| 4.197068403908795|          307|
|    593| 4.161290322580645|          279|
|   2571| 4.192446043165468|          278|
|    260| 4.231075697211155|          251|
|    480|              3.75|          238|
|    110| 4.031645569620253|          237|
|    589| 3.970982142857143|          224|
|    527|             4.225|          220|
|   2959| 4.272935779816514|          218|
|      1|3.9209302325581397|          215|
|   1196|4.2156398104265405|          211|
|     50| 4.237745098039215|          204|
|   2858| 4.056372549019608|          204|
|     47|3.9753694581280787|          203|
|    150| 3.845771144278607|          201|
|   1198

In [14]:
mostPopularMoviesDf.count()

                                                                                

110

In [15]:
mostPopularMoviesDf.count()

                                                                                

110

In [20]:
mostPopularMoviesDf.explain() # print physical plan

== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [movieId#23, avg_rating#441, total_ratings#443L]
      +- InMemoryRelation [movieId#23, avg_rating#441, total_ratings#443L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(3) Sort [total_ratings#443L DESC NULLS LAST], true, 0
               +- Exchange rangepartitioning(total_ratings#443L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#303]
                  +- *(2) Filter ((isnotnull(avg_rating#441) AND (total_ratings#443L >= 100)) AND (avg_rating#441 >= 3.5))
                     +- *(2) HashAggregate(keys=[movieId#23], functions=[avg(rating#24), count(userId#22)])
                        +- Exchange hashpartitioning(movieId#23, 200), ENSURE_REQUIREMENTS, [id=#298]
                           +- *(1) HashAggregate(keys=[movieId#23], functions=[partial_avg(rating#24), partial_count(userId#22)])
                              +- FileScan csv [userId#22,movieId#23,rating#24] Batched: false, DataFilters: []

In [21]:
# Plan should be read bottom .. to up  [bottom action is the first.. top action is last]
mostPopularMoviesDf.explain(extended=True) # print parsed plan, logical and optimized and physical plan

# == Parsed Logical Plan == 
# the code as is parsed or deconded by spark catalyst, not yet validated with table, or columns or data types, 
# no optimization

# == Analyzed Logical Plan ==
# catalog /hive, schema and validate columns and data types

# == Optimized Logical Plan ==
# spark shall optmimize analyised plan 

== Parsed Logical Plan ==
'Filter (('total_ratings >= 100) AND ('avg_rating >= 3.5))
+- Sort [total_ratings#443L DESC NULLS LAST], true
   +- Aggregate [movieId#23], [movieId#23, avg(rating#24) AS avg_rating#441, count(userId#22) AS total_ratings#443L]
      +- Relation[userId#22,movieId#23,rating#24,timestamp#25L] csv

== Analyzed Logical Plan ==
movieId: int, avg_rating: double, total_ratings: bigint
Filter ((total_ratings#443L >= cast(100 as bigint)) AND (avg_rating#441 >= 3.5))
+- Sort [total_ratings#443L DESC NULLS LAST], true
   +- Aggregate [movieId#23], [movieId#23, avg(rating#24) AS avg_rating#441, count(userId#22) AS total_ratings#443L]
      +- Relation[userId#22,movieId#23,rating#24,timestamp#25L] csv

== Optimized Logical Plan ==
InMemoryRelation [movieId#23, avg_rating#441, total_ratings#443L], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(3) Sort [total_ratings#443L DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(total_ratings#443L DESC NULLS

In [22]:
# join, inner join 
# get the movie title for the mostPopularMoviesDf
# join mostPopularMoviesDf with movieDf based on condition that mostPopularMoviesDf.movieId == movieDf.movieId

popularMoviesDf = mostPopularMoviesDf.join(movieDf, mostPopularMoviesDf.movieId == movieDf.movieId)\
                                     .select(movieDf.movieId, "title", "avg_rating", "total_ratings")\
                                     .sort(desc("total_ratings"))

popularMoviesDf.cache()

popularMoviesDf.show(100)



+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225|          220|
|   2959|   Fight Club (1999)| 4.272935779816514|          218|
|      1|    Toy Story (1995)|3.9209302325581397|          215|
|   1196|Star Wars: Episod...|4.21563981

                                                                                

In [23]:
popularMoviesDf.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total_ratings DESC NULLS LAST], true
+- Project [movieId#0, title#1, avg_rating#441, total_ratings#443L]
   +- Join Inner, (movieId#23 = movieId#0)
      :- Filter ((total_ratings#443L >= cast(100 as bigint)) AND (avg_rating#441 >= 3.5))
      :  +- Sort [total_ratings#443L DESC NULLS LAST], true
      :     +- Aggregate [movieId#23], [movieId#23, avg(rating#24) AS avg_rating#441, count(userId#22) AS total_ratings#443L]
      :        +- Relation[userId#22,movieId#23,rating#24,timestamp#25L] csv
      +- Relation[movieId#0,title#1,genres#2] csv

== Analyzed Logical Plan ==
movieId: int, title: string, avg_rating: double, total_ratings: bigint
Sort [total_ratings#443L DESC NULLS LAST], true
+- Project [movieId#0, title#1, avg_rating#441, total_ratings#443L]
   +- Join Inner, (movieId#23 = movieId#0)
      :- Filter ((total_ratings#443L >= cast(100 as bigint)) AND (avg_rating#441 >= 3.5))
      :  +- Sort [total_ratings#443L DESC NULLS LAST], true
      

In [24]:
popularMoviesDf.rdd.getNumPartitions()
#popularMoviesDf.rdd.glom().collect()

72

In [25]:
popularMoviesDf.count()

                                                                                

110

In [26]:
popularMoviesDf.rdd.glom().collect()

                                                                                

[[Row(movieId=356, title='Forrest Gump (1994)', avg_rating=4.164133738601824, total_ratings=329)],
 [Row(movieId=318, title='Shawshank Redemption, The (1994)', avg_rating=4.429022082018927, total_ratings=317)],
 [Row(movieId=296, title='Pulp Fiction (1994)', avg_rating=4.197068403908795, total_ratings=307)],
 [Row(movieId=593, title='Silence of the Lambs, The (1991)', avg_rating=4.161290322580645, total_ratings=279)],
 [Row(movieId=2571, title='Matrix, The (1999)', avg_rating=4.192446043165468, total_ratings=278)],
 [Row(movieId=260, title='Star Wars: Episode IV - A New Hope (1977)', avg_rating=4.231075697211155, total_ratings=251)],
 [Row(movieId=480, title='Jurassic Park (1993)', avg_rating=3.75, total_ratings=238)],
 [Row(movieId=110, title='Braveheart (1995)', avg_rating=4.031645569620253, total_ratings=237)],
 [Row(movieId=589, title='Terminator 2: Judgment Day (1991)', avg_rating=3.970982142857143, total_ratings=224)],
 [Row(movieId=527, title="Schindler's List (1993)", avg_ratin

In [27]:
# create a dattarframe with popularMoviesDf where we reduce the parititons to 1

df = popularMoviesDf.coalesce(1)
print (df.rdd.getNumPartitions())
df.rdd.glom().collect()

1


[[Row(movieId=356, title='Forrest Gump (1994)', avg_rating=4.164133738601824, total_ratings=329),
  Row(movieId=318, title='Shawshank Redemption, The (1994)', avg_rating=4.429022082018927, total_ratings=317),
  Row(movieId=296, title='Pulp Fiction (1994)', avg_rating=4.197068403908795, total_ratings=307),
  Row(movieId=593, title='Silence of the Lambs, The (1991)', avg_rating=4.161290322580645, total_ratings=279),
  Row(movieId=2571, title='Matrix, The (1999)', avg_rating=4.192446043165468, total_ratings=278),
  Row(movieId=260, title='Star Wars: Episode IV - A New Hope (1977)', avg_rating=4.231075697211155, total_ratings=251),
  Row(movieId=480, title='Jurassic Park (1993)', avg_rating=3.75, total_ratings=238),
  Row(movieId=110, title='Braveheart (1995)', avg_rating=4.031645569620253, total_ratings=237),
  Row(movieId=589, title='Terminator 2: Judgment Day (1991)', avg_rating=3.970982142857143, total_ratings=224),
  Row(movieId=527, title="Schindler's List (1993)", avg_rating=4.225, 

In [29]:
# write popularMoviesDf to hadoop with header [by default headers shall not be written]
# overwrite existing files
# 70 plus partitions having approx total of 100 plus records
# write 70 plus files into hadoop
# write opeartions runs parallel on executors on each paritions
#Write to HDFS 
popularMoviesDf.write.mode("overwrite")\
                .option("header", True)\
                .csv("hdfs://localhost:9000/most-popular-movies-many-files")

                                                                                

In [30]:
# write popularMoviesDf into single file
# coalesce(1) to reduce partitions
popularMoviesDf.coalesce(1).write.mode("overwrite")\
                .option("header", True)\
                .csv("hdfs://localhost:9000/most-popular-movies")

In [31]:
# now  read the files back from hdfs 
# for schema, we try to use inferSchema, let spark to build schema itself
# use inferSchema for small data set

In [32]:
# inferSchema will scan csvs and define data types for  youy schema
# spark can load all the csv files from directory
popularMovies = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/most-popular-movies-many-files")

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

                                                                                

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 3
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|   1206|Clockwork Orange,...| 3.995833333333333|          120|
|   2716|Ghostbusters (a.k...|             3.775|          120|
|   4973|Amelie (Fabuleux ...| 4.183333333333334|          120|
|   5445|Minority Report (...|            3.6375|          120|
|   1089|Reservoir Dogs (1...| 4.202290076335878|          131|
|   1240|Terminator, The (...|3.8969465648854964|          131|
|   6874|Kill Bill: Vol. 1...|3.9618320610687023|          131|
|   7361|Eternal Sunshine ...|4.1603053435114505|          131|
|   1208|Apocalypse Now (1...| 4.219626168224299|          107|
|   4896|Harry Potter and ...|3.7616822429906542

In [33]:
# inferSchema will scan csvs and define data types for  youy schema
popularMovies = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/most-popular-movies")

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 1
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225