In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Spark session for the silver -> gold job, configured for Delta Lake:
# the delta-spark package is pulled via spark.jars.packages, and Delta's SQL
# session extension and spark_catalog implementation are registered.
# NOTE(review): the `_2.12` artifact suffix is the Scala binary version and
# must match the Spark build in use — confirm when upgrading Spark.
delta_settings = {
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.1",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

session_builder = SparkSession.builder.appName("SilverToGold")
for conf_key, conf_value in delta_settings.items():
    # Applied in dict insertion order, i.e. the same order as listed above.
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Input: the silver-layer movies Delta table on HDFS.
silver_path = "hdfs://master:8020/delta/silver/movies"
df_silver = spark.read.load(silver_path, format="delta")

# Lower bounds of each revenue tier, in the same unit as Revenue_Millions.
# Tiers are tested from highest to lowest; anything below the last bound
# (but with a known revenue) is a "Flop".
BLOCKBUSTER_MIN_REVENUE_M = 500
HIT_MIN_REVENUE_M = 100
AVERAGE_MIN_REVENUE_M = 10

# Gold layer = silver columns plus two derived columns:
#   Revenue_Per_Minute: revenue (Revenue_Millions * 1,000,000) per minute of
#       runtime; null when runtime or revenue is missing or runtime is not > 0.
#   Revenue_Category:   revenue tier label; "Unknown" when revenue is null.
df_gold = (
    df_silver
    .withColumn(
        "Revenue_Per_Minute",
        # Guard the divisor: under ANSI mode (spark.sql.ansi.enabled) a zero
        # runtime raises DIVIDE_BY_ZERO, and a zero/negative runtime is not a
        # meaningful denominator anyway. when() without otherwise() -> null.
        when(
            col("Runtime_Minutes") > 0,
            (col("Revenue_Millions") * 1_000_000) / col("Runtime_Minutes"),
        ),
    )
    .withColumn(
        "Revenue_Category",
        # Null revenue must be handled first: every `null >= x` comparison is
        # null (not true), so without this branch such rows would silently
        # fall through to otherwise("Flop").
        when(col("Revenue_Millions").isNull(), "Unknown")
        .when(col("Revenue_Millions") >= BLOCKBUSTER_MIN_REVENUE_M, "Blockbuster")
        .when(col("Revenue_Millions") >= HIT_MIN_REVENUE_M, "Hit")
        .when(col("Revenue_Millions") >= AVERAGE_MIN_REVENUE_M, "Average")
        .otherwise("Flop"),
    )
)

# Output: the gold-layer movies Delta table on HDFS. Overwrite mode replaces
# the existing table data on each run instead of appending to it.
gold_path = "hdfs://master:8020/delta/gold/movies"

df_gold.write.save(gold_path, format="delta", mode="overwrite")

print(f" Gold Layer saved to {gold_path}")


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jupyter/.ivy2/cache
The jars for the packages stored in: /home/jupyter/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-562b408f-a780-4677-abf4-4e9f70b2eb8c;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 149ms :: artifacts dl 10ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.1 from central in [default]
	io.delta#delta-storage;3.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |  

 Gold Layer saved to hdfs://master:8020/delta/gold/movies


In [2]:
# Sanity-check the gold schema: the derived columns Revenue_Per_Minute and
# Revenue_Category are appended by withColumn after all silver columns.
df_gold.printSchema()

root
 |-- Actors: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Metascore: integer (nullable = true)
 |-- Revenue_Millions: double (nullable = true)
 |-- Runtime_Minutes: integer (nullable = true)
 |-- Votes: integer (nullable = true)
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- language: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- Revenue_Per_Minute: double (nullable = true)
 |-- Revenue_Category: string (nullable = false)

