In [None]:
# ================================================================

# ---------------------------------------------------------------
# 1. Imports e Configurações
# ---------------------------------------------------------------
import os

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, year, split, explode, trim, upper

# MinIO / Delta settings.
# The endpoint and credentials are read from environment variables so that real
# secrets do not have to be stored in the notebook; when a variable is not set,
# the previous hard-coded lab value is used as the fallback.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "cursolab")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "cursolab")

# s3a URIs of the trusted (silver) and refined layers.
SILVER_PATH = "s3a://trusted/movies/"
REFINED_PATH = "s3a://refined/movies/"

In [2]:
# Build the Spark session step by step; each group of settings is applied to
# the same builder before the session is created.
builder = SparkSession.builder.appName("RefinedViewsDeltaLake")

# Extra jars handed to Spark through "spark.jars" (hadoop-aws and the AWS SDK bundle).
builder = builder.config(
    "spark.jars",
    "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
)

# S3A connection settings pointing at the MinIO endpoint, with path-style access enabled.
builder = builder.config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
builder = builder.config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
builder = builder.config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
builder = builder.config("spark.hadoop.fs.s3a.path.style.access", "true")
builder = builder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Delta Lake SQL extension and catalog.
builder = builder.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Let the delta helper finish configuring the builder, then get (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
def read_view(name: str):
    """Load a Delta table from the refined layer and register it as a temp view.

    Args:
        name: Name of the view; it is appended to ``REFINED_PATH`` to form the
            location to load and is also used as the temporary view name.

    Returns:
        The DataFrame loaded from that location.
    """
    view_location = "{}{}".format(REFINED_PATH, name)
    print(f"Lendo visão: {view_location}")

    delta_reader = spark.read.format("delta")
    view_df = delta_reader.load(view_location)

    # Register (or replace) a temp view under the same name as the loaded view.
    view_df.createOrReplaceTempView(name)
    return view_df

In [None]:
titles_by_country = read_view("v1_titles_by_type")
titles_by_type = read_view("v2_top10_countries")
titles_by_year = read_view("v3_avg_duration")
titles_by_genre = read_view("v4_titles_by_year")
titles_by_director = read_view("v5_longest_descriptions")


In [None]:
titles_by_country.show()

In [None]:
titles_by_type.show()

In [None]:
titles_by_year.show()

In [None]:
titles_by_genre.show()

In [None]:
titles_by_director.show()