In [18]:
# Take data from silver zone (clean zone) and apply aggregate, move to gold zone

In [19]:
# spark session is not initialized

from pyspark.sql import SparkSession
# in general, you dont hardcode the config in code, instead we will use spark-submit
spark = SparkSession.builder \
    .appName("GKSMovieRatingAggregate") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

25/02/06 05:59:10 INFO SparkEnv: Registering MapOutputTracker
25/02/06 05:59:10 INFO SparkEnv: Registering BlockManagerMaster
25/02/06 05:59:10 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/02/06 05:59:10 INFO SparkEnv: Registering OutputCommitCoordinator


In [20]:
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hadoop")
spark.conf.set("spark.sql.catalog.iceberg.warehouse", "gs://gks-tpch/iceberg_warehouse") # FIX the path

In [21]:
movieDf = spark.sql("SELECT * FROM iceberg.movielens.movies")
movieDf.printSchema()
movieDf.show(2)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



                                                                                

In [22]:
ratingDf = spark.table("iceberg.movielens.ratings")
ratingDf.printSchema()
ratingDf.show(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
+------+-------+------+---------+
only showing top 2 rows



                                                                                

In [27]:
# aggregation with groupBy
from pyspark.sql.functions import col, desc, avg, count

# find  the most popular movies, where as rated by many users, at least movies should be rated by 100 users
# and the average rating should be at least 3.5 and above
# and sort the movies by total_ratings
mostPopularMoviesDf = ratingDf\
     .groupBy("movieId")\
     .agg(avg("rating").alias("avg_rating"), count("userId").alias("total_ratings") )\
     .sort(desc("total_ratings"))\
     .filter( (col("total_ratings") >= 100) & (col("avg_rating") >=3.5) )
    
# mostPopularMoviesDf.cache() # MEMORY

mostPopularMoviesDf.printSchema()
mostPopularMoviesDf.show(20)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: long (nullable = false)



[Stage 5:>                                                          (0 + 1) / 1]

+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    356| 4.175304878048781|          328|
|    318| 4.429022082018927|          317|
|    296| 4.221311475409836|          305|
|    593| 4.201086956521739|          276|
|   2571|  4.26007326007326|          273|
|    260|             4.246|          250|
|    480|3.7637130801687766|          237|
|    110| 4.046610169491525|          236|
|    589| 4.018099547511312|          221|
|    527| 4.259174311926605|          218|
|   2959| 4.325581395348837|          215|
|      1|3.9369158878504673|          214|
|   1196| 4.233333333333333|          210|
|     50| 4.237745098039215|          204|
|   2858| 4.073891625615763|          203|
|    150| 3.845771144278607|          201|
|     47| 4.009950248756219|          201|
|   1198| 4.226130653266332|          199|
|   4993| 4.142857142857143|          196|
|   1210| 4.137755102040816|          196|
+-------+--

                                                                                

In [24]:
mostPopularMoviesDf.explain() # print physical plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [total_ratings#155L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(total_ratings#155L DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [plan_id=175]
      +- Filter (isnotnull(avg_rating#153) AND ((total_ratings#155L >= 100) AND (avg_rating#153 >= 3.5)))
         +- HashAggregate(keys=[movieId#115], functions=[avg(rating#116), count(userId#114)])
            +- Exchange hashpartitioning(movieId#115, 1000), ENSURE_REQUIREMENTS, [plan_id=171]
               +- HashAggregate(keys=[movieId#115], functions=[partial_avg(rating#116), partial_count(userId#114)])
                  +- BatchScan iceberg.movielens.ratings[userId#114, movieId#115, rating#116] iceberg.movielens.ratings (branch=null) [filters=, groupedBy=] RuntimeFilters: []




In [28]:
mostPopularMoviesDf.explain(extended = True) # print parsed logical plan, analysed locical plan, optimized logical plan, physical plan

# parsed logical plan - the code logic as we wrote.. not optimized, no schema/datatypes applied
# execution shall be from bottom up, bottom statement exuected first
# movieId#115, 115 is a internal spark reference for a column

== Parsed Logical Plan ==
'Filter (('total_ratings >= 100) AND ('avg_rating >= 3.5))
+- Sort [total_ratings#201L DESC NULLS LAST], true
   +- Aggregate [movieId#115], [movieId#115, avg(rating#116) AS avg_rating#199, count(userId#114) AS total_ratings#201L]
      +- SubqueryAlias iceberg.movielens.ratings
         +- RelationV2[userId#114, movieId#115, rating#116, timestamp#117L] iceberg.movielens.ratings iceberg.movielens.ratings

== Analyzed Logical Plan ==
movieId: int, avg_rating: double, total_ratings: bigint
Filter ((total_ratings#201L >= cast(100 as bigint)) AND (avg_rating#199 >= 3.5))
+- Sort [total_ratings#201L DESC NULLS LAST], true
   +- Aggregate [movieId#115], [movieId#115, avg(rating#116) AS avg_rating#199, count(userId#114) AS total_ratings#201L]
      +- SubqueryAlias iceberg.movielens.ratings
         +- RelationV2[userId#114, movieId#115, rating#116, timestamp#117L] iceberg.movielens.ratings iceberg.movielens.ratings

== Optimized Logical Plan ==
Sort [total_ratings#2

In [29]:
# join, inner join 
# get the movie title for the mostPopularMoviesDf
# join mostPopularMoviesDf with movieDf based on condition that mostPopularMoviesDf.movieId == movieDf.movieId

# left as big table, fact/transactional data, huge data
# right as small table, dimention data, small data set

# based on runtime, at the time of physical plans, spark will see how many movies, how much executor memory present
# based on that, it decide normal shuffle join (time consumng,expensive) or broadCast join
popularMoviesDf = mostPopularMoviesDf.join(movieDf, mostPopularMoviesDf.movieId == movieDf.movieId)\
                                     .select(movieDf.movieId, "title", "avg_rating", "total_ratings")\
                                     .sort(desc("total_ratings"))

# popularMoviesDf.cache()

popularMoviesDf.show(100)

+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.175304878048781|          328|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.221311475409836|          305|
|    593|Silence of the La...| 4.201086956521739|          276|
|   2571|  Matrix, The (1999)|  4.26007326007326|          273|
|    260|Star Wars: Episod...|             4.246|          250|
|    480|Jurassic Park (1993)|3.7637130801687766|          237|
|    110|   Braveheart (1995)| 4.046610169491525|          236|
|    589|Terminator 2: Jud...| 4.018099547511312|          221|
|    527|Schindler's List ...| 4.259174311926605|          218|
|   2959|   Fight Club (1999)| 4.325581395348837|          215|
|      1|    Toy Story (1995)|3.9369158878504673|          214|
|   1196|Star Wars: Episod...| 4.2333333

In [30]:
popularMoviesDf.explain(extended = True)

== Parsed Logical Plan ==
'Sort ['total_ratings DESC NULLS LAST], true
+- Project [movieId#87, title#88, avg_rating#199, total_ratings#201L]
   +- Join Inner, (movieId#115 = movieId#87)
      :- Filter ((total_ratings#201L >= cast(100 as bigint)) AND (avg_rating#199 >= 3.5))
      :  +- Sort [total_ratings#201L DESC NULLS LAST], true
      :     +- Aggregate [movieId#115], [movieId#115, avg(rating#116) AS avg_rating#199, count(userId#114) AS total_ratings#201L]
      :        +- SubqueryAlias iceberg.movielens.ratings
      :           +- RelationV2[userId#114, movieId#115, rating#116, timestamp#117L] iceberg.movielens.ratings iceberg.movielens.ratings
      +- Project [movieId#87, title#88, genres#89]
         +- SubqueryAlias iceberg.movielens.movies
            +- RelationV2[movieId#87, title#88, genres#89] iceberg.movielens.movies iceberg.movielens.movies

== Analyzed Logical Plan ==
movieId: int, title: string, avg_rating: double, total_ratings: bigint
Sort [total_ratings#201L DES

In [32]:
# so far, no cache
popularMoviesDf.write\
  .mode("overwrite")\
  .format("iceberg")\
  .saveAsTable("iceberg.movielens.popular_movies1")

25/02/06 06:56:53 WARN HadoopTableOperations: Error reading version hint file gs://gks-tpch/iceberg_warehouse/movielens/popular_movies1/metadata/version-hint.text
java.io.FileNotFoundException: Item not found: 'gs://gks-tpch/iceberg_warehouse/movielens/popular_movies1/metadata/version-hint.text'. Note, it is possible that the live version is still available but the requested generation is deleted.
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:47) ~[gcs-connector-3.0.4.jar:?]
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:678) ~[gcs-connector-3.0.4.jar:?]
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:669) ~[gcs-connector-3.0.4.jar:?]
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.Goo

In [36]:
import pyspark.sql.functions as F

broadcastedMoviesDf = F.broadcast(movieDf) # explict, instead of spark optimizer doing
# broadcastedMoviesDf used instead of movieDf
tempDf = mostPopularMoviesDf.join(broadcastedMoviesDf, mostPopularMoviesDf.movieId == broadcastedMoviesDf.movieId)\
                                     .select(broadcastedMoviesDf.movieId, "title", "avg_rating", "total_ratings")\
                                     .sort(desc("total_ratings"))

tempDf.explain(extended = True)

print (F.col("movieId"), broadcastedMoviesDf.movieId, movieDf.movieId)

== Parsed Logical Plan ==
'Sort ['total_ratings DESC NULLS LAST], true
+- Project [movieId#87, title#88, avg_rating#199, total_ratings#201L]
   +- Join Inner, (movieId#115 = movieId#87)
      :- Filter ((total_ratings#201L >= cast(100 as bigint)) AND (avg_rating#199 >= 3.5))
      :  +- Sort [total_ratings#201L DESC NULLS LAST], true
      :     +- Aggregate [movieId#115], [movieId#115, avg(rating#116) AS avg_rating#199, count(userId#114) AS total_ratings#201L]
      :        +- SubqueryAlias iceberg.movielens.ratings
      :           +- RelationV2[userId#114, movieId#115, rating#116, timestamp#117L] iceberg.movielens.ratings iceberg.movielens.ratings
      +- ResolvedHint (strategy=broadcast)
         +- Project [movieId#87, title#88, genres#89]
            +- SubqueryAlias iceberg.movielens.movies
               +- RelationV2[movieId#87, title#88, genres#89] iceberg.movielens.movies iceberg.movielens.movies

== Analyzed Logical Plan ==
movieId: int, title: string, avg_rating: double

In [37]:
mostPopularMoviesDf.rdd.take(2)

                                                                                

[Row(movieId=356, avg_rating=4.175304878048781, total_ratings=328),
 Row(movieId=318, avg_rating=4.429022082018927, total_ratings=317)]

In [39]:
movieDf.rdd.take(2)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')]

In [40]:
broadcastedMoviesDf.rdd.take(2)

25/02/06 07:25:16 WARN HintErrorLogger: A join hint (strategy=broadcast) is specified but it is not part of a join relation.


[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')]

In [41]:
# Write to iceberg table. ACTION, all operations done for a single action
# we are reusing dataframe upto the join, then write action different from each other
"""
  read movies - 100 GB    Input/IO, 10 Sec,    .1 $ bandwidth
  read ratings - 1000 GB   Input/IO, 100 Sec, .5 $ bandwidth
  analytics on ratings group by, count, avg - Compute - 60 Minutes $$ , Shuffling....
  join - shuffle, compute - 1 minute - $
   write (iceberg)
"""

# Write to parquet table. ACTION, all operations done for a single action
"""
  read movies - 100 GB    Input/IO, 10 Sec,    .1 $ bandwidth
  read ratings - 1000 GB   Input/IO, 100 Sec, .5 $ bandwidth
  analytics on ratings group by, count, avg - Compute - 60 Minutes $$ , Shuffling....
  join - shuffle, compute - 1 minute - $
   write (format(parquet)
"""

# Write to orc table. ACTION, all operations done for a single action
"""
  read movies - 100 GB    Input/IO, 10 Sec,    .1 $ bandwidth
  read ratings - 1000 GB   Input/IO, 100 Sec, .5 $ bandwidth
  analytics on ratings group by, count, avg - Compute - 60 Minutes $$ , Shuffling....
  join - shuffle, compute - 1 minute - $
   write (format(orc)
"""



# Write to oracle jdbc table. ACTION, all operations done for a single action
"""
  read movies - 100 GB    Input/IO, 10 Sec,    .1 $ bandwidth
  read ratings - 1000 GB   Input/IO, 100 Sec, .5 $ bandwidth
  analytics on ratings group by, count, avg - Compute - 60 Minutes $$ , Shuffling....
  join - shuffle, compute - 1 minute - $
   write (format(jdbc)
"""

'\n  read movies - 100 GB    Input/IO, 10 Sec,    .1 $ bandwidth\n  read ratings - 1000 GB   Input/IO, 100 Sec, .5 $ bandwidth\n  analytics on ratings group by, count, avg - Compute - 60 Minutes $$ , Shuffling....\n  join - shuffle, compute - 1 minute - $\n   write (format(jdbc)\n'

In [None]:
# if we you see rdd, datafrmae is reused for write, or other downstream operations.. you may cache the reusable dataframe
# cache shall be stored in executor MEMORY itself

# SPARK USES MEMORY
# IF USE MEMORY FOR CACHE, further downstream operations might need memory too, we may face insufficient memory
# CACHE - Many options
# CACHE - MEMORY, DISK or MEMORY_DISK
# CACHE - REPLICAS Available for CACHE

In [43]:
popularMoviesDf.cache() # MEMORY_AND_DISK
# cache is lazy operation, until an action performed, no cache is applied
print (popularMoviesDf.is_cached) # true

True


In [44]:
# so far, NOW IT WILL CACHE popularMoviesDf
popularMoviesDf.write\
  .format("iceberg")\
  .saveAsTable("iceberg.movielens.popular_movies2")

                                                                                

In [45]:
# so far, it will use the cache
popularMoviesDf.write\
  .format("iceberg")\
  .saveAsTable("iceberg.movielens.popular_movies3") # assume you write to other format

                                                                                

In [46]:
popularMoviesDf.is_cached

True

In [47]:
popularMoviesDf.unpersist() # when dataframe cache no longer, ie you may not reuse datafrmae again
popularMoviesDf.is_cached

False