### Ingestión del archivo "movie.csv"

In [0]:
# Environment label for this run (widget "p_environment", defaults to an empty string).
# The value is later stamped onto every row as the "env" column.
dbutils.widgets.text("p_environment","")
v_environment = dbutils.widgets.get("p_environment")

In [0]:
# Batch date to ingest (widget "p_file_date", default "2024-12-30").
# It selects the bronze sub-folder that movie.csv is read from and is stored as the "file_date" column.
dbutils.widgets.text("p_file_date","2024-12-30")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

#### Paso 1 - Leer archivo CSV usando "DataFrame Reader" de Spark ####

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
# Explicit schema for movie.csv: one (name, type, nullable) entry per column, in file order.
# Only movieId is declared non-nullable; every other column may be missing in the source.
movie_schema = (
    StructType()
    .add("movieId", IntegerType(), False)
    .add("title", StringType(), True)
    .add("budget", DoubleType(), True)
    .add("homePage", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", DoubleType(), True)
    .add("yearReleaseDate", IntegerType(), True)
    .add("releaseDate", DateType(), True)
    .add("revenue", DoubleType(), True)
    .add("durationTime", IntegerType(), True)
    .add("movieStatus", StringType(), True)
    .add("tagline", StringType(), True)
    .add("voteAverage", DoubleType(), True)
    .add("voteCount", IntegerType(), True)
)

In [0]:
# Read movie.csv for the selected batch date from the bronze folder, applying the
# explicit movie_schema. Schema inference via option("inferSchema", True) is the
# alternative this replaces.
movie_df = spark.read.csv(
    f"{bronze_folder_path}/{v_file_date}/movie.csv",
    schema=movie_schema,
    header=True,
)

In [0]:
# Debug preview of the raw DataFrame read from bronze (this triggers a Spark action).
display(movie_df)

#### Paso 2 - Seleccionar sólo las columnas "requeridas"

In [0]:
#movies_selected_df = movie_df.select("movieId","title","budget","popularity","yearReleaseDate","releaseDate","revenue","durationTime","voteAverage","voteCount")

In [0]:
#movies_selected_df = movie_df.select(movie_df.movieId,movie_df.title,movie_df.budget, movie_df.popularity, movie_df.yearReleaseDate,movie_df.releaseDate,movie_df.revenue,movie_df.durationTime,movie_df.voteAverage,movie_df.voteCount)

In [0]:
#movies_selected_df = movie_df.select(movie_df["movieId"],movie_df["title"],movie_df["budget"],movie_df["popularity"],movie_df["yearReleaseDate"],movie_df["releaseDate"],movie_df["revenue"],movie_df["durationTime"],movie_df["voteAverage"],movie_df["voteCount"])

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep only the columns required downstream, in this order
# (drops homePage, overview, movieStatus and tagline).
movies_selected_df = movie_df.select(
    [
        col(column_name)
        for column_name in (
            "movieId",
            "title",
            "budget",
            "popularity",
            "yearReleaseDate",
            "releaseDate",
            "revenue",
            "durationTime",
            "voteAverage",
            "voteCount",
        )
    ]
)

#### Paso 3 - Cambiar el nombre de las columnas según lo "requerido"

In [0]:
# Rename the camelCase source columns to the snake_case names required downstream.
# withColumnRenamed is a silent no-op when the old column does not exist, so every
# source name must match the schema exactly. The previous "durantionTime" typo left
# durationTime un-renamed.
movies_renamed_df = (
    movies_selected_df
    .withColumnRenamed("movieId", "movie_id")
    .withColumnRenamed("yearReleaseDate", "year_release_date")
    .withColumnRenamed("releaseDate", "release_date")
    .withColumnRenamed("durationTime", "duration_time")
    .withColumnRenamed("voteAverage", "vote_average")
    .withColumnRenamed("voteCount", "vote_count")
)

# Fail fast if a rename silently did nothing. Reading .columns only inspects the
# schema and does not launch a Spark job.
missing_renamed_columns = {
    "movie_id",
    "year_release_date",
    "release_date",
    "duration_time",
    "vote_average",
    "vote_count",
} - set(movies_renamed_df.columns)
if missing_renamed_columns:
    raise ValueError(f"Columns were not renamed: {sorted(missing_renamed_columns)}")

In [0]:
# Alternative single-call rename with withColumnsRenamed (available from Spark 3.4).
# Note that the source column is "durationTime".
#movies_renamed_df = movies_selected_df \
#    .withColumnsRenamed({"movieId":"movie_id", "yearReleaseDate": "year_release_date", "releaseDate": "release_date", "durationTime": "duration_time", "voteAverage":"vote_average", "voteCount":"vote_count"})

#### Paso 4 - Agregar la columna "ingestion_date" al DataFrame

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# Add audit columns before writing to silver:
# - add_ingestion_date is presumably defined in ../includes/common_functions and adds
#   an ingestion_date timestamp (compare the commented-out inline version below).
# - env and file_date record this run's widget parameters as literal columns.
movies_final_df = (
    add_ingestion_date(movies_renamed_df)
    .withColumn("env", lit(v_environment))
    .withColumn("file_date", lit(v_file_date))
)

# Previous inline version, kept for reference:
#movies_final_df = movies_renamed_df.withColumn("ingestion_date", current_timestamp()) \
#               .withColumn("env", lit("production"))

In [0]:
#movies_final_df = movies_renamed_df.withColumns({"ingestion_date" :current_timestamp(),"env": lit("production")})

#### Paso 5 - Escribir datos en el datalake (merge en formato "Delta Lake")

In [0]:
# overwrite_partition(movies_final_df, "movie_silver","movies", "file_date")

In [0]:
# Merge this batch into movie_silver.movies using the shared merge_delta_lake helper
# (presumably from ../includes/common_functions).
# - Rows match when both movie_id and file_date agree.
# - The trailing "file_date" argument looks like the partition column; confirm
#   against the helper's signature.
merge_condition = 'tgt.movie_id = src.movie_id AND tgt.file_date = src.file_date'
merge_delta_lake(
    movies_final_df,
    "movie_silver",
    "movies",
    silver_folder_path,
    merge_condition,
    "file_date",
)

# Earlier write strategies, kept for reference:

# Append + partitioned by file_date
#movies_final_df.write.mode("append").partitionBy("file_date").format("parquet").saveAsTable("movie_silver.movies")

# Overwrite + not partitioned
#movies_final_df.write.mode("overwrite").parquet(f"{silver_folder_path}/movies") 

# Overwrite + partitioned by year_release_date
#movies_final_df.write.mode("overwrite").partitionBy("year_release_date").parquet("/mnt/moviehistoryjaag/silver/movies") # partitioned

In [0]:
%sql
-- Sanity check after the merge: number of rows per file_date batch in the silver table.
-- ORDER BY makes the displayed result deterministic across runs.
SELECT file_date, COUNT(1)
FROM movie_silver.movies
GROUP BY file_date
ORDER BY file_date;

In [0]:
# End the notebook and return "Success" as its exit value
# (presumably read by an orchestrating notebook or job).
dbutils.notebook.exit("Success")