# In [None]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, minute, second, date_format, sha2, concat_ws
from pyspark.sql.functions import months as months_transform

# -----------------------------
# DEFINE SENSITIVE VARIABLES
# -----------------------------
# Every setting is read from an environment variable so that real endpoints
# and credentials do not have to be committed with this script. The fallback
# values are the local development defaults this script has always used.
CATALOG_URI = os.environ.get("NESSIE_CATALOG_URI", "http://nessie:19120/api/v1")  # Nessie Server URI
WAREHOUSE = os.environ.get("ICEBERG_WAREHOUSE", "s3://lakehouse/")                # MinIO Iceberg warehouse
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")            # MinIO endpoint
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")                    # MinIO access key
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")                 # MinIO secret key
RAW_BUCKET = os.environ.get("RAW_BUCKET", "lakehouse/raw/")                       # Bucket/prefix where Parquet files are stored

# MONTHS = [f"2025-{str(m).zfill(2)}" for m in range(1, 4)] # adjust the range of the files if needed

# -----------------------------
# CONFIGURE SPARK
# -----------------------------
# Builds the SparkConf for a session with an Iceberg catalog named `nessie`
# (backed by the Nessie server at CATALOG_URI, table files in WAREHOUSE) plus
# the Hadoop S3A filesystem used for reading the raw Parquet files.
conf = (
    pyspark.SparkConf()
    .setAppName('yellow_tripdata_app')
    # NOTE(review): NessieSparkSessionExtensions is listed in
    # spark.sql.extensions below, but no nessie-spark-extensions artifact is
    # declared here — confirm that jar is already on the cluster classpath.
    .set(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
        "software.amazon.awssdk:bundle:2.24.8,"
        "software.amazon.awssdk:url-connection-client:2.24.8,"
        "org.apache.hadoop:hadoop-aws:3.3.4"
    )
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
         "org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.uri", CATALOG_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .set("spark.sql.catalog.nessie.warehouse", WAREHOUSE)
    .set("spark.sql.catalog.nessie.s3.endpoint", MINIO_ENDPOINT)
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # S3FileIO reads static credentials from `s3.access-key-id` and
    # `s3.secret-access-key`, and the region from `client.region`. The
    # previously used `s3.access-key`, `s3.secret-key` and `s3.region` keys
    # are not recognised, which left the catalog relying on the AWS default
    # credential/region lookup instead of the values defined in this script.
    .set("spark.sql.catalog.nessie.s3.access-key-id", MINIO_ACCESS_KEY)
    .set("spark.sql.catalog.nessie.s3.secret-access-key", MINIO_SECRET_KEY)
    .set("spark.sql.catalog.nessie.s3.path-style-access", "true")
    .set("spark.sql.catalog.nessie.client.region", "us-east-1")
    # NOTE(review): this does not appear among the documented S3FileIO
    # properties; TLS presumably follows the endpoint URL scheme. Kept as-is —
    # confirm before removing.
    .set("spark.sql.catalog.nessie.s3.connection-ssl-enabled", "false")

    # S3A driver for Spark
    .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .set("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.hadoop.fs.s3a.aws.credentials.provider",
         "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
)


# Start Spark session
# getOrCreate() is called on a builder configured with `conf`; the resulting
# session is bound to `spark` for the rest of the script.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()
print("ðŸ”¥ Spark Session Started")

# -----------------------------
# 1. CREATE ICEBERG NAMESPACE
# -----------------------------
# IF NOT EXISTS makes this statement safe to run repeatedly.
create_namespace_sql = "CREATE NAMESPACE IF NOT EXISTS nessie.nyc_bronze;"
spark.sql(create_namespace_sql)

# -----------------------------
# 2. Read Parquet files from S3
# -----------------------------
# Every *.parquet object directly under the raw prefix is loaded into a single
# DataFrame. The glob is derived from RAW_BUCKET so the raw location is
# configured in one place; stripping the trailing slash first means the
# constant works whether or not it ends in "/".
raw_parquet_glob = f"s3a://{RAW_BUCKET.rstrip('/')}/*.parquet"
df = spark.read.parquet(raw_parquet_glob)

# -----------------------------
# 3. Transform the DataFrame
# -----------------------------
pickup_ts = col("tpep_pickup_datetime")
dropoff_ts = col("tpep_dropoff_datetime")


def _seconds_of_day(ts):
    """Return a Column holding hour*3600 + minute*60 + second of *ts*."""
    return hour(ts) * 3600 + minute(ts) * 60 + second(ts)


# Adds five derived columns to the raw trips:
#   *_date_id  - the timestamp formatted as "yyyy-MM-dd"
#   *_time_id  - seconds elapsed since midnight of that timestamp
#   trip_id    - SHA-256 of pickup timestamp, vendorid and ratecodeid joined
#                with "_".
# NOTE(review): trips that share all three trip_id inputs get the same hash,
# so trip_id is not guaranteed to be unique — confirm this is acceptable.
df_transformed = (
    df
    .withColumn("pickup_date_id", date_format(pickup_ts, "yyyy-MM-dd"))
    .withColumn("dropoff_date_id", date_format(dropoff_ts, "yyyy-MM-dd"))
    .withColumn("pickup_time_id", _seconds_of_day(pickup_ts))
    .withColumn("dropoff_time_id", _seconds_of_day(dropoff_ts))
    .withColumn(
        "trip_id",
        sha2(concat_ws("_", pickup_ts, col("vendorid"), col("ratecodeid")), 256),
    )
)

# -----------------------------
# 4. Write month by month to Iceberg
# -----------------------------
# The table is written through the DataFrameWriterV2 API (`writeTo`). The
# previous `.option("partition-spec", ...)` on the v1 writer is not a write
# option that sets partitioning, so the table was created unpartitioned. Here
# the first write creates the table partitioned by months(tpep_pickup_datetime)
# and every later write appends to it.
# NOTE: a table that already exists (e.g. created unpartitioned by an earlier
# run) keeps its current partition spec; appends do not change it. Re-running
# the script still appends the same rows again, as before.
TARGET_TABLE = "nessie.nyc_bronze.yellow_tripdata_raw"

# "yyyy-MM" label of the pickup timestamp; reused for discovery and filtering.
pickup_month = date_format(col("tpep_pickup_datetime"), "yyyy-MM")

# Distinct month labels present in the data, ordered so that runs process
# months in a deterministic (chronological) sequence.
months = (
    df_transformed
    .select(pickup_month.alias("month"))
    .distinct()
    .orderBy("month")
    .collect()
)

table_exists = spark.catalog.tableExists(TARGET_TABLE)

for month_row in months:
    month = month_row["month"]
    if month is None:
        # Rows without a pickup timestamp have no month label. An equality
        # filter against NULL matches nothing, so skip the empty write.
        continue
    print(f"Processing month: {month}")

    # Filter only rows for this month
    df_month = df_transformed.filter(pickup_month == month)

    # Write to Iceberg table
    writer = (
        df_month.writeTo(TARGET_TABLE)
        .using("iceberg")
        .option("write-format", "parquet")
    )
    if table_exists:
        writer.append()
    else:
        writer.partitionedBy(months_transform(col("tpep_pickup_datetime"))).create()
        table_exists = True

    print(f"âœ… Finished writing month: {month}")

# -----------------------------
# 5. Verify
# -----------------------------
# Print a five-row sample of the target table as a quick sanity check.
sample_query = "SELECT * FROM nessie.nyc_bronze.yellow_tripdata_raw LIMIT 5"
sample_rows = spark.sql(sample_query)
sample_rows.show()
