## 1. Instale as bibliotecas necessárias (se ainda não instalou)

In [None]:
!pip install pandas pyarrow pyspark

## 2. Importe as bibliotecas

In [None]:
import os

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, from_json
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Maven packages that spark-submit resolves when the JVM is launched, so this
# variable must be set before the SparkSession is created (section 5).
# The Kafka SQL connector must match the running Spark version, so its version
# is taken from the installed PySpark instead of being hard-coded.
# The Iceberg runtime artifact name states its target: Spark 3.5 / Scala 2.12.
# The DStream connector (spark-streaming-kafka-0-10) is not used by Structured
# Streaming and is therefore not requested.
SPARK_VERSION = pyspark.__version__
SPARK_PACKAGES = [
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{SPARK_VERSION}",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0",
    "org.apache.iceberg:iceberg-aws-bundle:1.7.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "org.apache.hadoop:hadoop-common:3.3.4",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

## 3. Definindo Variáveis Globais

In [None]:
import getpass

CATALOG_URL = "http://lakekeeper:8181/catalog"
CATALOG = "trusted"
WAREHOUSE = "trusted"
MINIO_ENDPOINT = "http://minio:9000"


def _secret(env_var: str) -> str:
    """Return a credential from the environment, prompting for it when unset.

    Credentials are never stored in the notebook, so they cannot leak through
    version control or shared copies.

    Args:
        env_var: Name of the environment variable holding the credential.

    Returns:
        The environment variable's value, or the text typed at the prompt.
    """
    value = os.environ.get(env_var)
    return value if value else getpass.getpass(f"{env_var}: ")


# NOTE(review): the MinIO keys that used to be hard-coded in this cell should be
# treated as exposed and rotated.
MINIO_ACCESS_KEY = _secret("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = _secret("MINIO_SECRET_KEY")
CHECKPOINT_LOCATION = "s3a://checkpoint/popular_critics"
KAFKA_BOOTSTRAP_SERVERS = 'kafka:9092'
KAFKA_TOPIC = "popular_critics"

## 4. Definindo o schema dos dados de críticas populares

In [None]:
# Expected JSON layout of each Kafka message value, as (column, Spark type)
# pairs. ingestion_timestamp is listed too, but section 8 overwrites it with
# current_timestamp().
# NOTE(review): from_json presumably treats these fields as nullable regardless
# of the False flag below — confirm before relying on it for validation.
_POPULAR_CRITICS_FIELDS = [
    ("name", StringType()),
    ("film", StringType()),
    ("rating", DoubleType()),
    ("review", StringType()),
    ("ingestion_timestamp", TimestampType()),
]

POPULAR_CRITICS_SCHEMA = StructType(
    [StructField(field_name, field_type, False) for field_name, field_type in _POPULAR_CRITICS_FIELDS]
)

## 5. Configuração do Spark

In [None]:
# Spark settings for this job:
# * the Iceberg catalog named CATALOG is a REST catalog served at CATALOG_URL
#   and becomes the session's default catalog;
# * its S3FileIO properties must be prefixed with the same catalog name,
#   otherwise Spark never hands them to that catalog;
# * the fs.s3a.* settings let Hadoop's S3A filesystem reach MinIO (used at
#   least by the s3a:// streaming checkpoint location).
config = {
    "spark.sql.defaultCatalog": CATALOG,
    f"spark.sql.catalog.{CATALOG}": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{CATALOG}.type": "rest",
    f"spark.sql.catalog.{CATALOG}.uri": CATALOG_URL,
    f"spark.sql.catalog.{CATALOG}.warehouse": WAREHOUSE,
    f"spark.sql.catalog.{CATALOG}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    f"spark.sql.catalog.{CATALOG}.s3.endpoint": MINIO_ENDPOINT,
    f"spark.sql.catalog.{CATALOG}.s3.path-style-access": "true",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.hadoop.fs.s3a.endpoint": MINIO_ENDPOINT,
    "spark.hadoop.fs.s3a.access.key": MINIO_ACCESS_KEY,
    "spark.hadoop.fs.s3a.secret.key": MINIO_SECRET_KEY,
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.endpoint.region": "local-01",
    # Default checkpoint directory for streaming queries that don't set one.
    "spark.sql.streaming.checkpointLocation": CHECKPOINT_LOCATION,
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "4"
}

spark_config = SparkConf().setMaster('local').setAppName("PopularCriticsStreamingIngestion")
for key, value in config.items():
    spark_config.set(key, value)  # SparkConf.set mutates and returns the same object

# getOrCreate() returns an already-running session if one exists, so edits to
# `config` only take effect after restarting the kernel.
spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

spark.sql(f"USE {CATALOG}")

## 6. Criar a tabela Iceberg (se ela não existir)

In [None]:
# Make sure the target namespace exists first so this cell also works against a
# brand-new catalog; both statements are no-ops when the objects already exist.
spark.sql("CREATE NAMESPACE IF NOT EXISTS pagila_db")

# Iceberg sink table for the stream; its columns mirror POPULAR_CRITICS_SCHEMA.
spark.sql("""
    CREATE TABLE IF NOT EXISTS pagila_db.popular_critics (
        name STRING,
        film STRING,
        rating DOUBLE,
        review STRING,
        ingestion_timestamp TIMESTAMP
    )
    USING iceberg
    """)

## 7. Ler dados do tópico Kafka em modo streaming.

In [None]:
# Unbounded streaming source subscribed to KAFKA_TOPIC.
#   startingOffsets=earliest -> a new query begins at the oldest retained offset
#   failOnDataLoss=false     -> missing offsets do not abort the query
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

## 8. Interpretar o JSON das mensagens e adicionar a coluna de timestamp

In [None]:
# Kafka delivers the payload as bytes in `value`: decode it to a JSON string,
# parse it with POPULAR_CRITICS_SCHEMA and flatten the struct into columns.
json_payload = col("value").cast("string")
parsed_df = (
    kafka_df
    .select(from_json(json_payload, POPULAR_CRITICS_SCHEMA).alias("data"))
    .select("data.*")
)

# Replace ingestion_timestamp with the time Spark processed the record.
transformed_df = parsed_df.withColumn("ingestion_timestamp", current_timestamp())

## 9. Escrever os dados na tabela Iceberg

In [None]:
# Append every 10-second micro-batch to the Iceberg table. toTable() starts the
# streaming query immediately and returns its StreamingQuery handle. No
# checkpointLocation option is given here, so the session default
# (spark.sql.streaming.checkpointLocation, set in section 5) applies.
query = (
    transformed_df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .option("fanout-enabled", "true")
    .toTable("pagila_db.popular_critics")
)

## 10. Aguarde o streaming (a consulta já foi iniciada por `toTable()` na etapa 9)

In [None]:
# The query is already running (toTable() started it); this cell only waits.
# STREAM_TIMEOUT_SECONDS = None blocks until the query stops, which means any
# cell below never runs during "Run All". Set a number of seconds to wait that
# long instead. The query keeps running in the background after a timeout.
# If the query stops because of an error, awaitTermination raises it here.
STREAM_TIMEOUT_SECONDS = None

streaming_query = query  # `query` is already a StreamingQuery
streaming_query.awaitTermination(STREAM_TIMEOUT_SECONDS)

In [None]:
# Debug: read the whole topic once as a batch to check that messages are
# arriving and that their JSON matches POPULAR_CRITICS_SCHEMA.
# NOTE: while the cell above blocks on awaitTermination, run this cell manually.
kafka_batch_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)
print("Kafka batch count:", kafka_batch_df.count())
kafka_batch_df.show(5)

# Same decoding as the streaming pipeline, applied to the batch snapshot.
raw_json = col("value").cast("string")
parsed_batch_df = (
    kafka_batch_df
    .select(from_json(raw_json, POPULAR_CRITICS_SCHEMA).alias("data"))
    .select("data.*")
)
parsed_batch_df.show(5)

print("Schema do DataFrame lido do Kafka:")
parsed_batch_df.printSchema()