# Configuração inicial

In [0]:
import json
import requests
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [0]:
# Base folder of the movies data volume; the inbound TSV files are read from below it.
path = "/Volumes/workspace/default/movies_data/"

# The notebook's own name is the last "/"-separated segment of its workspace path.
# It is reused at the end of the notebook as the name of the Delta table.
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
notebook_name = notebook_context.notebookPath().get().rsplit("/", 1)[-1]

# Inicialização dos dados

In [0]:
def _read_inbound_tsv(file_name):
    """Read one tab-separated file from the inbound folder into a DataFrame.

    Both inbound files share the same layout conventions, so the reader
    configuration lives here instead of being repeated per file.

    Args:
        file_name: File name inside ``<path>/inbound`` (for example
            ``"title.basics.tsv"``).

    Returns:
        A Spark DataFrame whose first line was used as the header and whose
        backslash-N markers were loaded as nulls. No schema and no
        ``inferSchema`` option are supplied, so column types are whatever the
        CSV reader produces by default.
    """
    # `path` is configured with a trailing slash; strip it so the resulting
    # location contains a single separator rather than ".../movies_data//inbound".
    inbound_dir = f"{path.rstrip('/')}/inbound"
    return (
        spark.read
        .option("header", True)
        .option("sep", "\t")
        .option("nullValue", "\\N")
        .csv(f"{inbound_dir}/{file_name}")
    )


# Title metadata (one row per title) and the ratings keyed by the same title id.
df_titles = _read_inbound_tsv("title.basics.tsv")
df_ratings = _read_inbound_tsv("title.ratings.tsv")

# Unindo tabelas de notas e dados do filme

In [0]:
# Inner join on the shared title identifier: rows without a match on the
# other side (titles with no rating, ratings with no title) are dropped.
join_key = "tconst"
df_movies = df_titles.join(df_ratings, on=[join_key], how="inner")

# Filtragem dos dados

## Seleção apenas de filmes e remoção de valores nulos

In [0]:
# Keep a row only when all three conditions hold: it is typed as "movie",
# and neither its runtime nor its genre list is null.
is_movie = col("titleType") == "movie"
has_runtime = col("runtimeMinutes").isNotNull()
has_genres = col("genres").isNotNull()

df_movies = df_movies.where(is_movie & has_runtime & has_genres)

## Remoção de colunas desnecessárias

In [0]:
# "titleType" is constant after the movie filter above; "endYear" is dropped
# as well because it is not part of the output table.
unused_columns = ["titleType", "endYear"]
df_movies = df_movies.drop(*unused_columns)

# Padronização do schema das colunas

## Renomeação das colunas

In [0]:
# Source column -> standardized name. The prefix encodes the kind of value:
# id_ (identifier), nm_ (text), bool_ (flag), dt_ (date part), nr_ (numeric).
column_renames = dict([
    ("tconst", "id_tconst"),
    ("primaryTitle", "nm_primary_title"),
    ("originalTitle", "nm_original_title"),
    ("isAdult", "bool_is_adult"),
    ("startYear", "dt_start_year"),
    ("runtimeMinutes", "nr_runtime_minutes"),
    ("genres", "nm_genres"),
    ("averageRating", "nr_average_rating"),
    ("numVotes", "nr_num_votes"),
])

df_movies = df_movies.withColumnsRenamed(column_renames)

## Conversão dos tipos das colunas

In [0]:
# Target Spark SQL type for each column that must not stay as loaded.
# Columns not listed here keep their current type.
column_types = {
    "bool_is_adult": "boolean",
    "dt_start_year": "int",
    "nr_runtime_minutes": "int",
    "nr_average_rating": "double",
    "nr_num_votes": "int",
}

# Each entry replaces the column of the same name with its cast version,
# so column order and names are unchanged.
df_movies = df_movies.withColumns(
    {name: col(name).cast(dtype) for name, dtype in column_types.items()}
)

# Salvando os dados como Delta table

In [0]:
# The table is named after this notebook and fully replaced on every run.
# NOTE(review): assumes the notebook name is a valid table identifier
# (no spaces or hyphens) - confirm.
target_table = f"workspace.default.{notebook_name}"

(
    df_movies.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(target_table)
)
