In [0]:
dbutils.widgets.removeAll()

In [0]:
# DEMO using taskvalues.get, but not recommended as we may use different name for tasks in different workflow.

# read task values from downstream jobs
# movies_path = dbutils.jobs.taskValues.get(taskKey = "movies_to_silver", key = "movies_silver_path", debugValue = "movies-not-found")

# print ("MOVIES PATH FROM PREVIOUS ONE ", movies_path)

# BEST PRACTICE: use dynamic variables in the task parameters

# add parameter with expression {{tasks.<task_name>.values.<value_name>}}.
# like parameter_name = {{tasks.movies_to_silver.values.movies_silver_path}}
# then use widgets.get to get the value.

In [0]:

dbutils.widgets.text("ratingsPath", "abfss://silver@gksdatalake.dfs.core.windows.net/ratings/")

dbutils.widgets.text("moviesSilverPath", "abfss://silver@gksdatalake.dfs.core.windows.net/movies/")

dbutils.widgets.text("popularMoviesPath", "abfss://gold@gksdatalake.dfs.core.windows.net/popular-movies/")

In [0]:
# READ SILVER DATA WHICH IS PARQUET FORMAT
# DATA ANALYTICS , ENRICHMENT ON SILVER DATA, NOT CSV/BRONZE
# QUALITY OF DATA IMPROVED, LIKE RATING > 0.5
# Find Top rated movies
# At leasted rated by 100 users, avg rating should be 4 or above
# MUST Modify two things, storage account name, and the key
 

In [0]:
MOVIES_PATH = dbutils.widgets.get("moviesSilverPath")
RATINGS_PATH= dbutils.widgets.get("ratingsPath")
print (MOVIES_PATH)
print (RATINGS_PATH)

abfss://silver@gksdatalake.dfs.core.windows.net/movies/
abfss://silver@gksdatalake.dfs.core.windows.net/ratings/


In [0]:
# PARQUET HAS SCHEMA AT END OF THE FILE
# HERE IT IS NOT INFER DATA FOR SCHEMA
movieDf = spark.read.parquet(MOVIES_PATH) 
movieDf.printSchema()
movieDf.show(5, truncate = False) # does not truncate long strings

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [0]:
ratingDf = spark.read.parquet(RATINGS_PATH)
ratingDf.printSchema()
ratingDf.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
# dont write this code , 
def col(name):
    print ("hi col", name)
    return name.upper()

print (col("rating")) # our col defined above

# we have functions from std lib, 3rd party librares, pyspark etc, we import function etc, and use them

from pyspark.sql.functions import col  # col from pyspark will override your col function

# from pyspark.sql.functions import * # also override your functions

print (col("rating")) # this calls pyspark col

hi col rating
RATING
Column<'rating'>


In [0]:
import pyspark.sql.functions as F # F is alias name
# F.col 
# groupBy will cause wider transformation, shuffle operation

# print ("ratingDf partitions ", ratingDf.rdd.getNumPartitions())

# MISTAKE : SORTING DATA AT TOO EARLY, spark can optimize that for you.
# sort is wider transformation, shuffle

popularMoviesDf = ( ratingDf
                        .groupBy("movieId")
                        .agg(
                            F.count("userId").alias("userCount"),
                            F.avg("rating").alias("avgRating")
                        ) # agg
                        .filter (F.col("userCount") > 100)
                        
                        .sort (F.col("userCount").desc())
                    )

# print ("popularMoviesDf partitions ", popularMoviesDf.rdd.getNumPartitions())
popularMoviesDf.printSchema()
popularMoviesDf.show(20) # action

root
 |-- movieId: integer (nullable = true)
 |-- userCount: long (nullable = false)
 |-- avgRating: double (nullable = true)

+-------+---------+------------------+
|movieId|userCount|         avgRating|
+-------+---------+------------------+
|    356|      328| 4.175304878048781|
|    318|      317| 4.429022082018927|
|    296|      305| 4.221311475409836|
|    593|      276| 4.201086956521739|
|   2571|      273|  4.26007326007326|
|    260|      250|             4.246|
|    480|      237|3.7637130801687766|
|    110|      236| 4.046610169491525|
|    589|      221| 4.018099547511312|
|    527|      218| 4.259174311926605|
|   2959|      215| 4.325581395348837|
|      1|      214|3.9369158878504673|
|   1196|      210| 4.233333333333333|
|     50|      204| 4.237745098039215|
|   2858|      203| 4.073891625615763|
|     47|      201| 4.009950248756219|
|    150|      201| 3.845771144278607|
|    780|      200|             3.475|
|   1198|      199| 4.226130653266332|
|   1210|      

In [0]:
# runs on driver, not on worker
popularMoviesDf.explain () # PHYSICAL PLAN

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Sort [userCount#3039L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(userCount#3039L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=3891]
      +- Filter (userCount#3039L > 100)
         +- HashAggregate(keys=[movieId#3002], functions=[finalmerge_count(merge count#3090L) AS count(userId#3001)#3084L, finalmerge_avg(merge sum#3093, count#3094L) AS avg(rating#3003)#3085])
            +- Exchange hashpartitioning(movieId#3002, 200), ENSURE_REQUIREMENTS, [plan_id=3887]
               +- HashAggregate(keys=[movieId#3002], functions=[partial_count(userId#3001) AS count#3090L, partial_avg(rating#3003) AS (sum#3093, count#3094L)])
                  +- FileScan parquet [userId#3001,movieId#3002,rating#3003] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[abfss://silver@gksdatalake.dfs.core.windows.net/ratings], PartitionFilters: [], PushedFilters: [], ReadSchem

In [0]:
popularMoviesDf.explain (extended= True ) # Print all plans

== Parsed Logical Plan ==
'Sort ['userCount DESC NULLS LAST], true
+- 'Filter '`>`('userCount, 100)
   +- 'Aggregate ['movieId], ['movieId, 'count('userId) AS userCount#3039, 'avg('rating) AS avgRating#3040]
      +- Relation [userId#3001,movieId#3002,rating#3003,timestamp#3004L] parquet

== Analyzed Logical Plan ==
movieId: int, userCount: bigint, avgRating: double
Sort [userCount#3039L DESC NULLS LAST], true
+- Filter (userCount#3039L > cast(100 as bigint))
   +- Aggregate [movieId#3002], [movieId#3002, count(userId#3001) AS userCount#3039L, avg(rating#3003) AS avgRating#3040]
      +- Relation [userId#3001,movieId#3002,rating#3003,timestamp#3004L] parquet

== Optimized Logical Plan ==
Sort [userCount#3039L DESC NULLS LAST], true
+- Filter (userCount#3039L > 100)
   +- Aggregate [movieId#3002], [movieId#3002, count(userId#3001) AS userCount#3039L, avg(rating#3003) AS avgRating#3040]
      +- Project [userId#3001, movieId#3002, rating#3003]
         +- Relation [userId#3001,movieId#30

In [0]:
# we have to to join with movieDf to know movie title
# you may see that sort, what we have in previous cell is not needd, it was not respected here
# join will shuffle data, sorted data again disordered.
mostPopularMoviesDf =  popularMoviesDf.join (movieDf, popularMoviesDf.movieId == movieDf.movieId, "inner" )\
                                      .filter (F.col("avgRating") > 4.0)\
                                      .sort (F.col("userCount").desc())\
                                      .drop(popularMoviesDf.movieId)\
                                      .select("movieId", "title", "userCount", "avgRating" )

mostPopularMoviesDf.printSchema()
mostPopularMoviesDf.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- userCount: long (nullable = false)
 |-- avgRating: double (nullable = true)

+-------+--------------------+---------+-----------------+
|movieId|               title|userCount|        avgRating|
+-------+--------------------+---------+-----------------+
|    356| Forrest Gump (1994)|      328|4.175304878048781|
|    318|Shawshank Redempt...|      317|4.429022082018927|
|    296| Pulp Fiction (1994)|      305|4.221311475409836|
|    593|Silence of the La...|      276|4.201086956521739|
|   2571|  Matrix, The (1999)|      273| 4.26007326007326|
|    260|Star Wars: Episod...|      250|            4.246|
|    110|   Braveheart (1995)|      236|4.046610169491525|
|    589|Terminator 2: Jud...|      221|4.018099547511312|
|    527|Schindler's List ...|      218|4.259174311926605|
|   2959|   Fight Club (1999)|      215|4.325581395348837|
|   1196|Star Wars: Episod...|      210|4.233333333333333|
|     50|U

In [0]:
mostPopularMoviesDf.explain(extended = True)

== Parsed Logical Plan ==
'Project ['movieId, 'title, 'userCount, 'avgRating]
+- Project [userCount#3039L, avgRating#3040, movieId#2971, title#2972, genres#2973]
   +- Sort [userCount#3039L DESC NULLS LAST], true
      +- Filter (avgRating#3040 > 4.0)
         +- Join Inner, (movieId#3002 = movieId#2971)
            :- Sort [userCount#3039L DESC NULLS LAST], true
            :  +- Filter (userCount#3039L > cast(100 as bigint))
            :     +- Aggregate [movieId#3002], [movieId#3002, count(userId#3001) AS userCount#3039L, avg(rating#3003) AS avgRating#3040]
            :        +- Relation [userId#3001,movieId#3002,rating#3003,timestamp#3004L] parquet
            +- Relation [movieId#2971,title#2972,genres#2973] parquet

== Analyzed Logical Plan ==
movieId: int, title: string, userCount: bigint, avgRating: double
Project [movieId#2971, title#2972, userCount#3039L, avgRating#3040]
+- Project [userCount#3039L, avgRating#3040, movieId#2971, title#2972, genres#2973]
   +- Sort [userCou

In [0]:
mostPopularMoviesDf.is_cached # whether cached or not
# DATA FRAME IS NOT CACHED
# ASSUME WE DO Anotehr action
# df.write # ACTION, read movies AGAIN , ratings AGAIN, aggreration AGAIN, join AGAIN
# df.write # JOB AGAIN, STAGES AGAIN, TASKS/PARTITION AGAIN, SHUFFLEING AGAIN
#mostPopularMoviesDf.write.mode("overwrite").json('json-file-path')

False

In [0]:
# CAche will happen when we apply first action
mostPopularMoviesDf.cache () # LAZY # MEMORY_AND_DISK
from pyspark import StorageLevel
#mostPopularMoviesDf.persist(StorageLevel.DISK_ONLY)
mostPopularMoviesDf.is_cached # true

True

In [0]:
# now write the results to gold zone in parquet format
POPULAR_MOVIES_TARGET_PATH = dbutils.widgets.get("popularMoviesPath")
# Wer GOT OUTPUT, write result to jdbc, json, orc, parquet, ...
# df.write # ACTION, read movies, ratings, aggreration, join
# df.write # JOB, STAGES, TASKS/PARTITION, SHUFFLEING
mostPopularMoviesDf.coalesce(1).write.mode("overwrite").parquet(POPULAR_MOVIES_TARGET_PATH)