In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/commom_functions"

In [0]:
# Show the silver-layer folder path as this cell's output.
# NOTE(review): presumably defined by the ../includes/configuration notebook — confirm.
# The value is not referenced again by any visible cell.
silver_folder_path

# Ingesta de archivo

In [0]:
# File date of the batch being ingested: it selects the bronze sub-folder that is
# read and is stored in the `file_date` column used to partition the output table.
date = "2024-12-16"

## Paso 1: Lectura de archivo CSV

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, StringType
from pyspark.sql.functions import col, current_timestamp, lit

In [0]:
from pathlib import Path

In [0]:
# Read the movie CSV for the selected file date from the bronze layer.
# The first row supplies the column names and the column types are inferred.
# Quoted fields may span several lines, and empty strings are read as nulls.
movie_df = (
    spark.read
    .options(
        header="True",
        sep=",",
        quote='"',
        inferSchema="True",
        escape="\\",
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        nullValue="",
        mode="PERMISSIVE",
    )
    .csv(f"{bronze_folder_path}/{date}/movie.csv")
)

## Paso 2: Seleccionar columnas requeridas

### Opción 1

In [0]:
# Option 1: pick the required columns by passing their names as plain strings.
movies_selected_df = movie_df.select(
    "movieId",
    "title",
    "budget",
    "popularity",
    "yearReleaseDate",
    "releaseDate",
    "revenue",
    "durationTime",
    "voteAverage",
    "voteCount",
)

### Opción 2

In [0]:
# Option 2: the first three columns are referenced as DataFrame attributes;
# the remaining ones are still given by name.
movies_selected_df = movie_df.select(
    movie_df.movieId,
    movie_df.title,
    movie_df.budget,
    "popularity",
    "yearReleaseDate",
    "releaseDate",
    "revenue",
    "durationTime",
    "voteAverage",
    "voteCount",
)

### Opción 3

In [0]:
# Option 3: the first two columns are referenced with bracket indexing on the
# DataFrame; the remaining ones are still given by name.
movies_selected_df = movie_df.select(
    movie_df["movieId"],
    movie_df["title"],
    "budget",
    "popularity",
    "yearReleaseDate",
    "releaseDate",
    "revenue",
    "durationTime",
    "voteAverage",
    "voteCount",
)

In [0]:
# NOTE(review): looks like leftover exploration — this evaluates the parent of the
# current working directory and shows it as the cell output; the value is not
# assigned or used by any visible cell.
Path.cwd().parent

### Opción 4

In [0]:
# Option 4: every column is referenced through the `col` function.
movies_selected_df = movie_df.select(
    col("movieId"),
    col("title"),
    col("budget"),
    col("popularity"),
    col("yearReleaseDate"),
    col("releaseDate"),
    col("revenue"),
    col("durationTime"),
    col("voteAverage"),
    col("voteCount"),
)

In [0]:
# Render the selected columns for visual inspection.
display(movies_selected_df)

## Paso 3: Renombrar columnas

### Opción 1

In [0]:
# Rename the camelCase columns to snake_case; columns whose names are already
# lowercase single words (title, budget, popularity, revenue) are left untouched.
movies_renamed_df = (
    movies_selected_df
    .withColumnRenamed("movieId", "movie_id")
    .withColumnRenamed("releaseDate", "release_date")
    .withColumnRenamed("yearReleaseDate", "year_release_date")
    .withColumnRenamed("durationTime", "duration_time")
    .withColumnRenamed("voteAverage", "vote_average")
    .withColumnRenamed("voteCount", "vote_count")
)

## Paso 4: Agregar Columnas

### Opción 1

In [0]:
# Add the audit columns: a constant `env` tag and the `file_date` of this batch.
# NOTE(review): `add_ingestion_date` is not defined in this notebook — presumably it
# comes from the included common functions and adds an ingestion timestamp; confirm.
movies_with_ingestion_df = add_ingestion_date(movies_renamed_df)
movies_final_df = (
    movies_with_ingestion_df
    .withColumn("env", lit("python"))
    .withColumn("file_date", lit(date))
)

### Opción 2

## Paso 5: Escribir datos en el datalake en formato Parquet

def overwrite_partition(input_df, db_name, table_name, column_partition):
    """Drop from an existing table every partition whose value appears in ``input_df``.

    Intended to run right before an append so that reloading the same partition
    values replaces the previous rows instead of duplicating them.

    Args:
        input_df: DataFrame about to be written; its distinct values of
            ``column_partition`` decide which partitions are dropped.
        db_name: Database (schema) that holds the target table.
        table_name: Name of the target table inside ``db_name``.
        column_partition: Name of the partition column, present both in
            ``input_df`` and in the target table.

    Returns:
        None. When the table does not exist nothing is dropped and no Spark
        job is triggered on ``input_df``.
    """
    full_table_name = f"{db_name}.{table_name}"

    # The existence check does not depend on the partition value, so it is done
    # once up front instead of once per distinct value. When the table is missing
    # (first load) the distinct/collect job is skipped entirely.
    if not spark._jsparkSession.catalog().tableExists(full_table_name):
        return

    for partition_row in input_df.select(column_partition).distinct().collect():
        # Partition values are embedded as quoted literals, as in the original statement.
        spark.sql(
            f"alter table {full_table_name} drop if exists partition "
            f"({column_partition} = '{partition_row[column_partition]}')"
        )

In [0]:
# Drop the `file_date` partitions of movie_silver.movies that this batch is about
# to load, so the append in the next cell does not duplicate rows on a re-run.
overwrite_partition(movies_final_df, "movie_silver", "movies", "file_date")

In [0]:
# Append this batch to the movie_silver.movies table, stored as Parquet and
# partitioned by `file_date`.
(
    movies_final_df.write
    .mode("append")
    .partitionBy("file_date")
    .format("parquet")
    .saveAsTable("movie_silver.movies")
)

## Paso 6: Consultar Datalake

In [0]:
%sql
-- Inspect the rows of the silver movies table.
SELECT *
FROM movie_silver.movies

In [0]:
%sql
-- Row count for each file_date partition of the silver movies table.
SELECT
    file_date,
    COUNT(1) AS registros
FROM movie_silver.movies
GROUP BY file_date;

In [0]:
# End the notebook run, passing this message as the notebook's exit value.
dbutils.notebook.exit("Ejecucion exitosa!!!")