In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.conf import SparkConf
config = SparkConf()
# config.set("property", "value")
config.setMaster("local").setAppName("MovieLens")

from pyspark.sql import SparkSession
# spark Session, entry point for Spark SQL, DataFrame
spark = SparkSession.builder\
                    .config(conf=config)\
                    .getOrCreate()

sc = spark.sparkContext

22/05/11 22:35:40 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.174.129 instead (on interface ens33)
22/05/11 22:35:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/05/11 22:35:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# how to create schema programatically instead of using inferSchema
from pyspark.sql.types import StructType, LongType, StringType, IntegerType, DoubleType
# True is nullable, False is non nullable
movieSchema = StructType()\
                .add("movieId", IntegerType(), True)\
                .add("title", StringType(), True)\
                .add("genres", StringType(), True)

ratingSchema = StructType()\
                .add("userId", IntegerType(), True)\
                .add("movieId", IntegerType(), True)\
                .add("rating", DoubleType(), True)\
                .add("timestamp", LongType(), True)

In [5]:
# read movie data
# read using dataframe with defind schema
# we can use folder path - all csv in the folder read
# use file path, only that file read

# spark is session, entry point for data frame/sql

movieDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(movieSchema)\
                .load("hdfs://localhost:9000/ml-latest-small/movies.csv")

movieDf.printSchema()
movieDf.show(2)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



                                                                                

In [9]:
ratingDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(ratingSchema)\
                .load("hdfs://localhost:9000/ml-latest-small/ratings.csv")

ratingDf.printSchema()
ratingDf.show(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
+------+-------+------+---------+
only showing top 2 rows



In [10]:
print (movieDf.count())
print(ratingDf.count())

9742
100836


In [11]:
ratingDf.take(2)


[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247)]

In [12]:
# show the distinct ratings
ratingDf.select("rating").distinct().show()

                                                                                

+------+
|rating|
+------+
|   3.5|
|   4.5|
|   2.5|
|   1.0|
|   4.0|
|   0.5|
|   3.0|
|   2.0|
|   1.5|
|   5.0|
+------+



In [13]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find the movies by total ratings by userId
df = ratingDf\
     .groupBy("movieId")\
     .agg(count("userId").alias("total_ratings"))\
     .sort(desc("total_ratings"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- total_ratings: long (nullable = false)



                                                                                

+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|     50|          204|
|   2858|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
+-------+-------------+
only showing top 20 rows



In [14]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  average rating by users sorted by desc
df = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"))\
     .sort(desc("avg_rating"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)





+-------+----------+
|movieId|avg_rating|
+-------+----------+
|  33138|       5.0|
|    876|       5.0|
| 147300|       5.0|
|  27373|       5.0|
|     53|       5.0|
|  25887|       5.0|
|  84273|       5.0|
| 113829|       5.0|
| 173963|       5.0|
|  26350|       5.0|
|  67618|       5.0|
|    148|       5.0|
| 157775|       5.0|
| 142444|       5.0|
|    633|       5.0|
|    496|       5.0|
|   8911|       5.0|
|   5513|       5.0|
| 152711|       5.0|
| 150554|       5.0|
+-------+----------+
only showing top 20 rows



                                                                                

In [15]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find the movies by total ratings by userId
df = ratingDf\
     .groupBy("movieId")\
     .agg(count("userId").alias("total_ratings"))\
     .sort(desc("total_ratings"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- total_ratings: long (nullable = false)





+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|     50|          204|
|   2858|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
+-------+-------------+
only showing top 20 rows



                                                                                

In [16]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  average rating by users sorted by desc
df = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"))\
     .sort(desc("avg_rating"))

df.printSchema()
df.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)





+-------+----------+
|movieId|avg_rating|
+-------+----------+
|  33138|       5.0|
|    876|       5.0|
| 147300|       5.0|
|  27373|       5.0|
|     53|       5.0|
|  25887|       5.0|
|  84273|       5.0|
| 113829|       5.0|
| 173963|       5.0|
|  26350|       5.0|
|  67618|       5.0|
|    148|       5.0|
| 157775|       5.0|
| 142444|       5.0|
|    633|       5.0|
|    496|       5.0|
|   8911|       5.0|
|   5513|       5.0|
| 152711|       5.0|
| 150554|       5.0|
+-------+----------+
only showing top 20 rows



                                                                                

In [17]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  the most popular movies, where as rated by many users, at least movies should be rated by 100 users
# and the average rating should be at least 3.5 and above
# and sort the movies by total_ratings
mostPopularMoviesDf = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"), count("userId").alias("total_ratings") )\
     .filter( (col("total_ratings") >= 100) & (col("avg_rating") >=3.5) )\
     .sort(desc("total_ratings"))

mostPopularMoviesDf.cache() # MEMORY

mostPopularMoviesDf.printSchema()
mostPopularMoviesDf.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: long (nullable = false)



                                                                                

+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    356| 4.164133738601824|          329|
|    318| 4.429022082018927|          317|
|    296| 4.197068403908795|          307|
|    593| 4.161290322580645|          279|
|   2571| 4.192446043165468|          278|
|    260| 4.231075697211155|          251|
|    480|              3.75|          238|
|    110| 4.031645569620253|          237|
|    589| 3.970982142857143|          224|
|    527|             4.225|          220|
|   2959| 4.272935779816514|          218|
|      1|3.9209302325581397|          215|
|   1196|4.2156398104265405|          211|
|     50| 4.237745098039215|          204|
|   2858| 4.056372549019608|          204|
|     47|3.9753694581280787|          203|
|    150| 3.845771144278607|          201|
|   1198|            4.2075|          200|
|   4993| 4.106060606060606|          198|
|   1210| 4.137755102040816|          196|
+-------+--

In [18]:
# join, inner join 
# get the movie title for the mostPopularMoviesDf
# join mostPopularMoviesDf with movieDf based on condition that mostPopularMoviesDf.movieId == movieDf.movieId

popularMoviesDf = mostPopularMoviesDf.join(movieDf, mostPopularMoviesDf.movieId == movieDf.movieId)\
                                     .select(movieDf.movieId, "title", "avg_rating", "total_ratings")\
                                     .sort(desc("total_ratings"))

popularMoviesDf.cache()

popularMoviesDf.show(100)

+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225|          220|
|   2959|   Fight Club (1999)| 4.272935779816514|          218|
|      1|    Toy Story (1995)|3.9209302325581397|          215|
|   1196|Star Wars: Episod...|4.21563981

In [21]:
mostPopularMoviesDf.count()

110

In [24]:
mostPopularMoviesDf.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total_ratings DESC NULLS LAST], true
+- Filter ((total_ratings#203L >= cast(100 as bigint)) AND (avg_rating#201 >= 3.5))
   +- Aggregate [movieId#52], [movieId#52, avg(rating#53) AS avg_rating#201, count(userId#51) AS total_ratings#203L]
      +- Relation[userId#51,movieId#52,rating#53,timestamp#54L] csv

== Analyzed Logical Plan ==
movieId: int, avg_rating: double, total_ratings: bigint
Sort [total_ratings#203L DESC NULLS LAST], true
+- Filter ((total_ratings#203L >= cast(100 as bigint)) AND (avg_rating#201 >= 3.5))
   +- Aggregate [movieId#52], [movieId#52, avg(rating#53) AS avg_rating#201, count(userId#51) AS total_ratings#203L]
      +- Relation[userId#51,movieId#52,rating#53,timestamp#54L] csv

== Optimized Logical Plan ==
InMemoryRelation [movieId#52, avg_rating#201, total_ratings#203L], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(3) Sort [total_ratings#203L DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(total_rati

In [25]:
# join, inner join 
# get the movie title for the mostPopularMoviesDf
# join mostPopularMoviesDf with movieDf based on condition that mostPopularMoviesDf.movieId == movieDf.movieId

popularMoviesDf = mostPopularMoviesDf.join(movieDf, mostPopularMoviesDf.movieId == movieDf.movieId)\
                                     .select(movieDf.movieId, "title", "avg_rating", "total_ratings")\
                                     .sort(desc("total_ratings"))

popularMoviesDf.cache()

popularMoviesDf.show(100)

22/05/11 21:00:17 WARN CacheManager: Asked to cache already cached data.


+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225|          220|
|   2959|   Fight Club (1999)| 4.272935779816514|          218|
|      1|    Toy Story (1995)|3.9209302325581397|          215|
|   1196|Star Wars: Episod...|4.21563981

In [26]:
popularMoviesDf.rdd.getNumPartitions()
#popularMoviesDf.rdd.glom().collect()

72

In [27]:
df = popularMoviesDf.coalesce(1)

In [29]:
print(df.rdd.getNumPartitions())

1


In [30]:
df.rdd.glom().collect()

                                                                                

[[Row(movieId=356, title='Forrest Gump (1994)', avg_rating=4.164133738601824, total_ratings=329),
  Row(movieId=318, title='Shawshank Redemption, The (1994)', avg_rating=4.429022082018927, total_ratings=317),
  Row(movieId=296, title='Pulp Fiction (1994)', avg_rating=4.197068403908795, total_ratings=307),
  Row(movieId=593, title='Silence of the Lambs, The (1991)', avg_rating=4.161290322580645, total_ratings=279),
  Row(movieId=2571, title='Matrix, The (1999)', avg_rating=4.192446043165468, total_ratings=278),
  Row(movieId=260, title='Star Wars: Episode IV - A New Hope (1977)', avg_rating=4.231075697211155, total_ratings=251),
  Row(movieId=480, title='Jurassic Park (1993)', avg_rating=3.75, total_ratings=238),
  Row(movieId=110, title='Braveheart (1995)', avg_rating=4.031645569620253, total_ratings=237),
  Row(movieId=589, title='Terminator 2: Judgment Day (1991)', avg_rating=3.970982142857143, total_ratings=224),
  Row(movieId=527, title="Schindler's List (1993)", avg_rating=4.225, 

In [31]:
# write popularMoviesDf to hadoop with header [by default headers shall not be written]
# overwrite existing files
# 70 plus partitions having approx total of 100 plus records
# write 70 plus files into hadoop
popularMoviesDf.write.mode("overwrite")\
                .option("header", True)\
                .csv("hdfs://localhost:9000/most-popular-movies-many-files")

                                                                                

In [32]:
# write popularMoviesDf into single file
# coalesce(1) to reduce partitions
popularMoviesDf.coalesce(1).write.mode("overwrite")\
                .option("header", True)\
                .csv("hdfs://localhost:9000/most-popular-movies")

In [33]:
# inferSchema will scan csvs and define data types for  youy schema
popularMovies = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/most-popular-movies-many-files")

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

                                                                                

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 3
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|   1206|Clockwork Orange,...| 3.995833333333333|          120|
|   2716|Ghostbusters (a.k...|             3.775|          120|
|   4973|Amelie (Fabuleux ...| 4.183333333333334|          120|
|   5445|Minority Report (...|            3.6375|          120|
|   1089|Reservoir Dogs (1...| 4.202290076335878|          131|
|   1240|Terminator, The (...|3.8969465648854964|          131|
|   6874|Kill Bill: Vol. 1...|3.9618320610687023|          131|
|   7361|Eternal Sunshine ...|4.1603053435114505|          131|
|   1208|Apocalypse Now (1...| 4.219626168224299|          107|
|   4896|Harry Potter and ...|3.7616822429906542

In [6]:
# inferSchema will scan csvs and define data types for  youy schema
popularMovies = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/most-popular-movies")

popularMovies.printSchema()
print("Partitions", popularMovies.rdd.getNumPartitions())
popularMovies.show()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: integer (nullable = true)

Partitions 1
+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.164133738601824|          329|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.197068403908795|          307|
|    593|Silence of the La...| 4.161290322580645|          279|
|   2571|  Matrix, The (1999)| 4.192446043165468|          278|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    480|Jurassic Park (1993)|              3.75|          238|
|    110|   Braveheart (1995)| 4.031645569620253|          237|
|    589|Terminator 2: Jud...| 3.970982142857143|          224|
|    527|Schindler's List ...|             4.225

In [7]:
# inferSchema will scan csvs and define data types for  youy schema
stocksDaily = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/stocks-main/daily")

stocksDaily.printSchema()
print("Partitions", stocksDaily.rdd.getNumPartitions())
stocksDaily.show()

root
 |-- SYMBOL: string (nullable = true)
 |-- SERIES: string (nullable = true)
 |-- OPEN: double (nullable = true)
 |-- HIGH: double (nullable = true)
 |-- LOW: double (nullable = true)
 |-- CLOSE: double (nullable = true)
 |-- LAST: double (nullable = true)
 |-- PREVCLOSE: double (nullable = true)
 |-- TOTTRDQTY: integer (nullable = true)
 |-- TOTTRDVAL: double (nullable = true)
 |-- TIMESTAMP: string (nullable = true)
 |-- TOTALTRADES: integer (nullable = true)
 |-- ISIN: string (nullable = true)
 |-- _c13: string (nullable = true)

Partitions 1
+----------+------+-------+-------+-------+-------+-------+---------+---------+-------------+-----------+-----------+------------+----+
|    SYMBOL|SERIES|   OPEN|   HIGH|    LOW|  CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|    TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|_c13|
+----------+------+-------+-------+-------+-------+-------+---------+---------+-------------+-----------+-----------+------------+----+
| 20MICRONS|    EQ|   70.1|   73.6| 

22/05/11 22:37:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: SYMBOL, SERIES, OPEN, HIGH, LOW, CLOSE, LAST, PREVCLOSE, TOTTRDQTY, TOTTRDVAL, TIMESTAMP, TOTALTRADES, ISIN, 
 Schema: SYMBOL, SERIES, OPEN, HIGH, LOW, CLOSE, LAST, PREVCLOSE, TOTTRDQTY, TOTTRDVAL, TIMESTAMP, TOTALTRADES, ISIN, _c13
Expected: _c13 but found: 
CSV file: hdfs://localhost:9000/stocks-main/daily/cm02MAR2022bhav.csv


In [9]:
# inferSchema will scan csvs and define data types for  youy schema
stocksSector = spark.read.format("csv")\
                .option("header", True)\
                .option("inferSchema", True)\
                .load("hdfs://localhost:9000/stocks-main/sectors")

stocksSector.printSchema()
print("Partitions", stocksSector.rdd.getNumPartitions())
stocksSector.show()

root
 |-- Company Name: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Series: string (nullable = true)
 |-- ISIN Code: string (nullable = true)

Partitions 1
+--------------------+------------------+----------+------+------------+
|        Company Name|          Industry|    Symbol|Series|   ISIN Code|
+--------------------+------------------+----------+------+------------+
|      Axis Bank Ltd.|FINANCIAL SERVICES|  AXISBANK|    EQ|INE238A01034|
|  Bajaj Finance Ltd.|FINANCIAL SERVICES|BAJFINANCE|    EQ|INE296A01024|
|  Bajaj Finserv Ltd.|FINANCIAL SERVICES|BAJAJFINSV|    EQ|INE918I01018|
|Cholamandalam Inv...|FINANCIAL SERVICES|  CHOLAFIN|    EQ|INE121A01024|
|HDFC Asset Manage...|FINANCIAL SERVICES|   HDFCAMC|    EQ|INE127D01025|
|      HDFC Bank Ltd.|FINANCIAL SERVICES|  HDFCBANK|    EQ|INE040A01034|
|HDFC Life Insuran...|FINANCIAL SERVICES|  HDFCLIFE|    EQ|INE795G01014|
|Housing Developme...|FINANCIAL SERVICES|      HDF

In [11]:
stocksDaily.join(stocksSector, stocksDaily["Symbol"] ==  stocksSector["Symbol"], "inner").show()


+----------+------+-------+--------+-------+--------+-------+---------+---------+---------------+-----------+-----------+------------+----+--------------------+-------------------+----------+------+------------+
|    SYMBOL|SERIES|   OPEN|    HIGH|    LOW|   CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|_c13|        Company Name|           Industry|    Symbol|Series|   ISIN Code|
+----------+------+-------+--------+-------+--------+-------+---------+---------+---------------+-----------+-----------+------------+----+--------------------+-------------------+----------+------+------------+
|ABBOTINDIA|    EQ|17500.0| 17500.0|16900.1|17200.25|17176.2|  17556.5|    29193| 5.0080170615E8|02-MAR-2022|      10804|INE358A01014|null|   Abbott India Ltd.|             PHARMA|ABBOTINDIA|    EQ|INE358A01014|
|  ADANIENT|    EQ| 1638.0| 1664.25| 1615.5|  1641.8| 1647.0|  1644.45|  1249817| 2.0597624144E9|02-MAR-2022|      30797|INE423A01024|null|Adani Enterpr

22/05/11 22:42:57 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: SYMBOL, SERIES, OPEN, HIGH, LOW, CLOSE, LAST, PREVCLOSE, TOTTRDQTY, TOTTRDVAL, TIMESTAMP, TOTALTRADES, ISIN, 
 Schema: SYMBOL, SERIES, OPEN, HIGH, LOW, CLOSE, LAST, PREVCLOSE, TOTTRDQTY, TOTTRDVAL, TIMESTAMP, TOTALTRADES, ISIN, _c13
Expected: _c13 but found: 
CSV file: hdfs://localhost:9000/stocks-main/daily/cm02MAR2022bhav.csv
