# Silver Data Transformation

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the bronze-layer Netflix titles table. A Delta table carries its own
# schema in the transaction log, so the CSV-only `header` / `inferSchema`
# reader options are not passed here.
df = (
    spark.read.format('delta')
    .load('abfss://bronze@storagenetflixde.dfs.core.windows.net/netflix_titles')
)

In [0]:
# Preview the freshly loaded bronze data in the notebook output.
display(df)

In [0]:
df = df.fillna({ 'duration_minutes': 0, 'duration_seasons': 1})

In [0]:
df = df.withColumn('duration_minutes', col('duration_minutes').cast(IntegerType()))\
        .withColumn('duration_seasons', col('duration_seasons').cast(IntegerType()))

In [0]:
# short_title is the part of the title before the first ':'; a title with no
# colon is kept whole (split then yields a single-element array).
df = df.withColumn(
    'short_title',
    split(col('title'), ':')[0],
)

In [0]:
# Overwrite rating with only the text before the first '-'
# (the original full rating value is not kept).
df = df.withColumn(
    'rating',
    split(col('rating'), '-')[0],
)

In [0]:
# Numeric encoding of the content type:
#   1 -> 'Movie', 2 -> 'TV Show', 0 -> anything else (including null).
df = df.withColumn(
    'movie_type_flag',
    when(col('type') == 'Movie', 1)
    .when(col('type') == 'TV Show', 2)
    .otherwise(0),
)

In [0]:
# List the distinct values present in the type column.
display(df.select('type').distinct())

In [0]:
# Show the total number of rows; count() is an action and runs a Spark job.
display(df.count())

In [0]:
from pyspark.sql.window import Window

In [0]:
# Dense rank of every row by duration_minutes, longest first (rank 1).
# The window has no partitionBy, so the ranking is global across the whole
# DataFrame and Spark evaluates it in a single partition.
df = df.withColumn(
    'duration_rank',
    dense_rank().over(Window.orderBy(desc('duration_minutes'))),
)

In [0]:
# Register the DataFrame as a temporary view named temp_view (replacing any
# existing view of that name) so it can be queried from SQL.
df.createOrReplaceTempView('temp_view')

In [0]:
%sql
-- Show every row and column of the temp_view temporary view.
SELECT * FROM temp_view

In [0]:
# Number of rows per content type; GroupedData.count() names its result
# column 'count'.
df_total_cnt = df.groupBy('type').count()

In [0]:
# Show the per-type row counts.
display(df_total_cnt)

In [0]:
# Persist the transformed data to the silver container as a Delta table,
# replacing whatever was previously stored at that path.
(
    df.write
    .format('delta')
    .mode('overwrite')
    .save('abfss://silver@storagenetflixde.dfs.core.windows.net/netflix_titles')
)