In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
# Declare the notebook parameters as text widgets: (widget name, default value).
for widget_name, widget_default in (
    ("catalogo", "catalog_dev"),
    ("esquema_source", "bronze"),
    ("esquema_sink", "silver"),
):
    dbutils.widgets.text(widget_name, widget_default)

In [0]:
# Read the current widget values; they drive every table name used below.
catalogo, esquema_source, esquema_sink = (
    dbutils.widgets.get(widget_key)
    for widget_key in ("catalogo", "esquema_source", "esquema_sink")
)

In [0]:
# Remove any previous version of the sink table before it is rebuilt.
# The name is built from the widget values so the drop always targets the
# same table that the final write creates, whichever catalog/schema the
# notebook is run against (a hard-coded name would drop the dev table even
# when the notebook is pointed at another environment).
spark.sql(f"DROP TABLE IF EXISTS {catalogo}.{esquema_sink}.movies_transformed")

In [0]:
def userscore_categoria(score):
    """Bucket a numeric user score into a category label.

    Args:
        score: Numeric user score, or None when the source value is missing.

    Returns:
        "Excelente" for scores >= 8, "Buena" for scores in [5, 8),
        "Regular" for anything lower, and None when ``score`` is None.
    """
    # A SQL NULL reaches a Python UDF as None; comparing None with a number
    # raises TypeError in Python 3, so propagate the missing value instead.
    if score is None:
        return None
    if score >= 8:
        return "Excelente"
    if score >= 5:
        return "Buena"
    return "Regular"

In [0]:
# Wrap the Python categoriser as a Spark UDF that yields a string column.
score_udf = F.udf(f=userscore_categoria, returnType=StringType())

In [0]:
# Load the three input tables from the catalog/schema selected via widgets.
source_schema = f"{catalogo}.{esquema_source}"
df_movie = spark.table(f"{source_schema}.movies")
df_filmdetails = spark.table(f"{source_schema}.filmdetails")
df_posterpath = spark.table(f"{source_schema}.posterpath")

In [0]:
# Movies: discard fully-empty rows, then require a non-null id and title.
df_movie = (
    df_movie
    .dropna(how="all")
    .where(F.col("id_pelicula").isNotNull() & F.col("titulo").isNotNull())
)

# Film details: discard fully-empty rows, then require a non-null id and cast list.
df_filmdetails = (
    df_filmdetails
    .dropna(how="all")
    .where(F.col("id_pelicula").isNotNull() & F.col("actores").isNotNull())
)

In [0]:
# Preview the cleaned movies (first 20 rows, long values truncated).
df_movie.show(n=20, truncate=True)

In [0]:
# Derive the score category from record_usuario through the Python UDF.
# NOTE(review): a native F.when(...) expression would avoid the Python UDF
# round-trip; not changed here because NULL/NaN handling would differ.
df_movie = df_movie.withColumn(
    colName="userscore_categoria",
    col=score_udf(F.col("record_usuario")),
)

In [0]:
# Preview the movies with the new userscore_categoria column.
df_movie.show(n=20, truncate=True)

In [0]:
# Inner-join movies ("x") with film details ("y") on id_pelicula, keeping
# every movie column plus the selected detail columns.
df_joined_1 = (
    df_movie.alias("x")
    .join(
        df_filmdetails.alias("y"),
        on=F.col("x.id_pelicula") == F.col("y.id_pelicula"),
        how="inner",
    )
    .select(
        F.col("x.*"),
        F.col("y.director"),
        F.col("y.actores"),
        F.col("y.presupuesto_usd"),
        F.col("y.ingresos_usd"),
    )
)

In [0]:
# Preview the movies + film details join.
df_joined_1.show(n=20, truncate=True)

In [0]:
# Inner-join the previous result ("x") with the poster paths ("y") on
# id_pelicula, adding the poster and background image columns.
df_joined_2 = (
    df_joined_1.alias("x")
    .join(
        df_posterpath.alias("y"),
        on=F.col("x.id_pelicula") == F.col("y.id_pelicula"),
        how="inner",
    )
    .select(
        F.col("x.*"),
        F.col("y.imagen_poster"),
        F.col("y.imagen_fondo"),
    )
)

In [0]:
# Preview the result after adding the image columns.
df_joined_2.show(n=20, truncate=True)

In [0]:
# Years elapsed since release: current calendar year minus release year.
df_movies_1 = df_joined_2.withColumn(
    colName="anios_lanzamiento",
    col=F.year(F.current_date()) - F.year(F.col("fecha_lanzamiento")),
)

In [0]:
# Release year extracted from the release date.
df_movies_2 = df_movies_1.withColumn(
    colName="anio",
    col=F.year(F.col("fecha_lanzamiento")),
)

In [0]:
# Total runtime in minutes: hours converted to minutes plus the minutes part.
df_movies_3 = df_movies_2.withColumn(
    colName="duraciontotal_min",
    col=F.col("duracion_hora") * 60 + F.col("duracion_minuto"),
)

In [0]:
# Profit per movie: revenue minus budget (both *_usd columns).
# - Cast to LongType rather than IntegerType: a 32-bit integer tops out at
#   2,147,483,647, so a larger difference would overflow (a wrong value or an
#   error, depending on the input type and ANSI mode).
# - Columns are resolved by name on the frame being transformed instead of
#   through the older df_movies_1 handle, so the expression does not depend
#   on an unrelated intermediate DataFrame.
df_movies_with_ganancia = df_movies_3.withColumn(
    "ganancia_usd",
    (F.col("ingresos_usd") - F.col("presupuesto_usd")).cast(LongType()),
)

In [0]:
# Preview the final dataset before it is persisted.
df_movies_with_ganancia.show(n=20, truncate=True)

In [0]:
# Persist the result as a managed table in the sink schema, replacing any
# existing contents.
target_table = f"{catalogo}.{esquema_sink}.movies_transformed"
df_movies_with_ganancia.write.saveAsTable(target_table, mode="overwrite")