In [0]:
# Print the dbutils.widgets help text (reference for the widget API used in the next cells).
dbutils.widgets.help()

In [0]:
# Declare the "p_environment" text widget with an empty default value.
# Its value is read in the next cell and stored in the "environment" column of the output.
dbutils.widgets.text("p_environment", "")

In [0]:
# Read the "p_environment" widget once and keep the value in v_environment.
# It is later written to the "environment" column of the silver table.
v_environment = dbutils.widgets.get("p_environment")
print(v_environment)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions" 

In [0]:
# Show the bronze folder path. It is not defined in this notebook, so presumably it comes
# from ../includes/configuration via %run — verify there.
bronze_folder_path

Paso 1 - Leer el archivo CSV usando el "DataFrameReader" de Spark

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

In [0]:
# Explicit schema for movie.csv, applied when reading so column types are fixed up front.
# Each .add(name, type, nullable) call appends one StructField to the schema.
movie_schema = (
    StructType()
    .add("movieId", IntegerType(), False)
    .add("title", StringType(), True)
    .add("budget", DoubleType(), True)
    .add("homePage", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", DoubleType(), True)
    .add("yearReleaseDate", StringType(), True)
    .add("releaseDate", DateType(), True)
    .add("revenue", DoubleType(), True)
    .add("durationTime", IntegerType(), True)
    .add("movieStatus", StringType(), True)
    .add("tagline", StringType(), True)
    .add("voteAverage", DoubleType(), True)
    .add("voteCount", IntegerType(), True)
)
                          

In [0]:
# Load the raw movie CSV from the bronze folder with the explicit schema above.
# header=True: the first line of the file holds column names, not data.
movie_df = spark.read.csv(
    f"{bronze_folder_path}/movie.csv",
    schema=movie_schema,
    header=True,
)

In [0]:
# List the contents of the bronze folder that movie_df is read from.
# Uses the configured bronze_folder_path so the listing always matches the folder the
# CSV read uses, rather than a hardcoded mount path that can drift from configuration.
display(dbutils.fs.ls(bronze_folder_path))

In [0]:
# Print the schema of movie_df to confirm movie_schema was applied.
movie_df.printSchema()

In [0]:
# Basic summary statistics (count, mean, stddev, min, max) for the columns of movie_df.
display(movie_df.describe())

### Seleccionar solo las columnas requeridas

In [0]:
# movies_selected_df = movie_df.select("movieId","title", "budget","popularity", "yearReleaseDate","ReleaseDate", "revenue","durationTime","voteAverage","voteCount")

In [0]:
# movies_selected_df = movie_df.select(movie_df["movieId"], movie_df["title"], movie_df["budget"],movie_df["popularity"], movie_df["yearReleaseDate"],movie_df["ReleaseDate"], movie_df["revenue"],movie_df["durationTime"], movie_df["voteAverage"],movie_df["voteCount"])

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep only the columns needed in the silver layer.
# Names are spelled exactly as declared in movie_schema, so the select does not depend on
# Spark's case-insensitive column resolution (it would fail with spark.sql.caseSensitive=true).
# Renaming to snake_case is done entirely in the next step (its mapping includes voteCount).
movies_selected_df = movie_df.select(
    col("movieId"),
    col("title"),
    col("budget"),
    col("popularity"),
    col("yearReleaseDate"),
    col("releaseDate"),
    col("revenue"),
    col("durationTime"),
    col("voteAverage"),
    col("voteCount"),
)

### Cambiar el nombre de las columnas según lo requerido

In [0]:
# Mapping from the camelCase source column names to the snake_case names used in silver.
rename_dict = {
    "movieId": "movie_id",
    "yearReleaseDate": "year_release_date",
    "releaseDate": "release_date",
    "durationTime": "duration_time",
    "voteAverage": "vote_average",
    "voteCount": "vote_count"
}

# Apply every rename in a single place (previously the same renames were also done in a
# separate withColumnRenamed chain whose result was immediately overwritten here).
# withColumnRenamed is a no-op when the source column is absent, so a key that is already
# renamed upstream is harmless.
movies_renamed_df = movies_selected_df
for old_name, new_name in rename_dict.items():
    movies_renamed_df = movies_renamed_df.withColumnRenamed(old_name, new_name)

In [0]:
# Preview the renamed DataFrame to check the snake_case column names.
display(movies_renamed_df)

### Agregar las columnas "ingestion_date" y "environment" al DataFrame

In [0]:
from pyspark.sql.functions import current_timestamp, lit


In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from the
# ../includes/common_functions %run — verify there.
movies_with_ingestion_df = add_ingestion_date(movies_renamed_df)

# Tag every row with the environment received through the p_environment widget.
movies_final_df = movies_with_ingestion_df.withColumn("environment", lit(v_environment))

In [0]:
# movies_final_df = movies_renamed_df \
# .withColumns({"ingestion_date" : current_timestamp(), \
# "env": lit("production")})

### Escribir los datos en formato Parquet

In [0]:
# Persist the final DataFrame as the parquet-backed table movie_silver.movies,
# replacing any previous contents.
(
    movies_final_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("movie_silver.movies")
)
   


In [0]:
%sql 
-- Query the table written in the previous step to verify its contents.
SELECT * FROM movie_silver.movies;

In [0]:
# List the files under the silver movies folder.
# NOTE(review): hardcoded path; assumes movie_silver.movies is stored here — confirm.
display(dbutils.fs.ls("/mnt/moviehistoryl/silver/movies"))

In [0]:
# Read the parquet files straight from storage to check what saveAsTable wrote.
# NOTE(review): assumes the movie_silver database is located under
# /mnt/moviehistoryl/silver — confirm against the database definition.
df = spark.read.format("parquet").load("/mnt/moviehistoryl/silver/movies")

In [0]:
# Display the data read back from the silver parquet files.
display(df)

In [0]:
# End the notebook and return "success" as its exit value (visible to a caller using dbutils.notebook.run).
dbutils.notebook.exit("success")