In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

##### read the data from bronze (titles)

In [0]:
df=spark.read \
        .format("delta") \
        .option("header",True) \
        .option("inferschema",True) \
        .load("abfss://bronze@netflixmainstorage.dfs.core.windows.net/titles")

In [0]:
display(df)

##### replace null values

In [0]:
df=df.fillna({'duration_minutes':0,"duration_seasons":1})

In [0]:
if df is not None: df.display()

#####  changing column types


In [0]:
df=df.withColumn('duration_minutes',col("duration_minutes").cast(IntegerType())) \
    .withColumn('duration_seasons',col("duration_seasons").cast(IntegerType())) 

In [0]:
df.printSchema()

##### split title column at :

In [0]:
df = df.withColumn('shorttitle', split(col('title'), ":").getItem(0))

In [0]:
df.display()

In [0]:
df = df.withColumn('rating', split(col('rating'), "-").getItem(0))

In [0]:
df.display()

##### flag to type column

In [0]:
df=df.withColumn("type_flag",when(col('type')=='Movie',1) \
                 .when(col('type')=='TV Show',2)
     .otherwise(0))

In [0]:
df.display()

#### window functions ranking based on duration

In [0]:
from pyspark.sql.window import Window


In [0]:
df=df.withColumn('rank_duration_minutes',dense_rank().over(Window.orderBy(col("duration_minutes").desc())))

In [0]:
df.display()


#### we can also spark sql but we need to create temp table

In [0]:
df.createOrReplaceTempView("my_table")

In [0]:
df=spark.sql("""
             select *,dense_rank() over (order by duration_minutes asc) as sql_rank from my_table""")

In [0]:
df.display()

## writing df into silver 

In [0]:
df.write \
  .mode("overwrite") \
  .format("delta") \
  .option("path","abfss://silver@netflixmainstorage.dfs.core.windows.net/titles") \
  .save()


##### group by


In [0]:
df_vis=df.groupBy("type").agg(count(col("type").alias("total_count")))
df_vis.display()