In [1]:
import logging
import sys
import os

# Create (or fetch) this notebook's logger; it passes records at INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Re-running this cell must not stack duplicate handlers. Close each old handler
# as it is removed: only clearing the list would leave the previous
# FileHandler's file open for the lifetime of the kernel.
for old_handler in list(logger.handlers):
    logger.removeHandler(old_handler)
    old_handler.close()

# File handler - appends INFO+ records to <parent of cwd>/logs/eda.log
log_path = os.path.join(os.path.dirname(os.getcwd()), 'logs', 'eda.log')
# FileHandler does not create missing directories, so make sure logs/ exists.
os.makedirs(os.path.dirname(log_path), exist_ok=True)
# Explicit UTF-8: log messages contain non-ASCII markers such as "✓" / "✗",
# which must not depend on the platform's default locale encoding.
file_handler = logging.FileHandler(log_path, mode='a', encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)

# Console handler - only WARNING and above are echoed into the notebook output
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.WARNING)
console_formatter = logging.Formatter('%(levelname)s: %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)

# Helper: log a message through `logger`, then flush every attached handler.
def log_and_flush(message, level='info'):
    """Log ``message`` via the module-level ``logger`` and flush all its handlers.

    Args:
        message: Text to log.
        level: Logging level, given either as a standard level name
            (case-insensitive: 'debug', 'info', 'warning'/'warn', 'error',
            'critical') or as a numeric level such as ``logging.ERROR``.
            Defaults to 'info'.

    Raises:
        ValueError: If ``level`` is not a recognised logging level. Raising
            here means an unknown or mistyped level is reported instead of the
            message being silently dropped.
    """
    if isinstance(level, int):
        levelno = level
    else:
        # With a string argument, getLevelName returns the numeric level for a
        # known name, and a "Level <name>" string otherwise.
        levelno = logging.getLevelName(str(level).upper())
        if not isinstance(levelno, int):
            raise ValueError(f"Unknown log level: {level!r}")

    logger.log(levelno, message)

    for handler in logger.handlers:
        handler.flush()

# Report the resolved log file location (log_path) so this message cannot drift
# from where the FileHandler actually writes.
print(f"✓ Logger configured - logs will be written to {log_path}")

✓ Logger configured - logs will be written to logs/eda.log


In [2]:
from pyspark.sql import SparkSession
import importlib.util

# ========== LOAD CONFIG FIRST ==========
# src/ sits next to the notebook's working directory (one level up from cwd), so
# config.py is imported by file path rather than by module name. Its `Config`
# class is exposed as a notebook-level name for all later cells.
project_root = os.path.dirname(os.getcwd())
src_path = os.path.join(project_root, 'src')
config_file = os.path.join(src_path, 'config.py')

spec = importlib.util.spec_from_file_location("config", config_file)
config_module = importlib.util.module_from_spec(spec)
config_loader = spec.loader
config_loader.exec_module(config_module)   # runs config.py's top-level code

Config = getattr(config_module, 'Config')
print("✓ Config loaded")

# ========== HELPER FUNCTION ==========
# log_and_flush(message, level='info') is defined in the logging cell at the top
# of the notebook and is deliberately NOT redefined here: a second definition
# without the `level` parameter would silently shadow the first one and make any
# later log_and_flush(..., level=...) call raise TypeError.

# ========== STOP EXISTING SPARK ==========
# On a fresh kernel `spark` is not defined yet (NameError). On a re-run, stop the
# previous session before the builder below calls getOrCreate().
try:
    spark.stop()
except NameError:
    log_and_flush("No existing Spark session to stop")
except Exception as exc:
    # Best effort: a broken previous session should not block creating a new
    # one. Unlike the bare `except:` used before, this lets KeyboardInterrupt /
    # SystemExit through and records what actually went wrong.
    log_and_flush(f"Could not stop existing Spark session cleanly: {exc}", level='warning')
else:
    log_and_flush("Stopped existing Spark session")

# ========== CREATE SPARK SESSION ==========
log_and_flush(f"Creating Spark session: {Config.APP_NAME}")

# Builder options, applied in this order. The hadoop-aws / aws-java-sdk-bundle
# packages are presumably what provide the S3AFileSystem configured below.
spark_builder_options = [
    ("spark.driver.memory", Config.SPARK_DRIVER_MEMORY),
    ("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY),
    ("spark.executor.instances", Config.SPARK_EXECUTOR_INSTANCES),
    ("spark.executor.cores", Config.SPARK_EXECUTOR_CORES),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ("spark.jars.packages",
     "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262"),
]

session_builder = SparkSession.builder.appName(Config.APP_NAME)
for option_name, option_value in spark_builder_options:
    session_builder = session_builder.config(option_name, option_value)
spark = session_builder.getOrCreate()

log_and_flush(f"Spark session created successfully (version {spark.version})")

# ========== CONFIGURE HADOOP FOR MINIO ==========
log_and_flush("Configuring Hadoop for MinIO")

# S3A settings for MinIO: endpoint and credentials from Config, path-style bucket
# URLs, SSL disabled, and the S3A filesystem class for the s3a:// scheme.
s3a_settings = {
    "fs.s3a.endpoint": Config.MINIO_ENDPOINT,
    "fs.s3a.access.key": Config.MINIO_ACCESS_KEY,
    "fs.s3a.secret.key": Config.MINIO_SECRET_KEY,
    "fs.s3a.path.style.access": "true",
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}
hadoop_conf = spark._jsc.hadoopConfiguration()
for setting_name, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_name, setting_value)

for summary_line in (
    f"Spark configured for environment: {Config.ENVIRONMENT}",
    f"Using MinIO endpoint: {Config.MINIO_ENDPOINT}",
    f"Reading from bucket: {Config.S3_BUCKET_NAME}",
):
    log_and_flush(summary_line)

separator = "=" * 50
print("\n" + separator)
print("✓ Spark Session Ready")
print(separator)

# Finish by printing the active configuration values.
Config.display_config()

✓ Config loaded

✓ Spark Session Ready
Current Configuration:
App Name: NYC Taxi EDA
Environment: development
MinIO Endpoint: http://minio:9000
S3 Bucket: nyc-taxi
Spark Driver Memory: 3g
Spark Executor Memory: 3g
Spark Executor Instances: 3
Spark Executor Cores: 2
Log Level: INFO
Log File: eda.log


In [3]:
from minio import Minio

logger.info("="*60)
logger.info("CONNECTING TO MINIO")
logger.info("="*60)

try:
    # Connection settings come from Config (the same values handed to Spark's
    # S3A settings) instead of being hard-coded, so the credentials are not
    # duplicated in the notebook. The Minio client takes "host:port" without a
    # scheme, while Config.MINIO_ENDPOINT includes one (e.g. "http://minio:9000").
    minio_endpoint_url = Config.MINIO_ENDPOINT.rstrip("/")
    minio_secure = minio_endpoint_url.lower().startswith("https://")
    minio_host = minio_endpoint_url.split("://", 1)[-1]

    logger.info("Initializing MinIO client")
    logger.info(f"Endpoint: {minio_host}")

    minio_client = Minio(
        minio_host,
        access_key=Config.MINIO_ACCESS_KEY,
        secret_key=Config.MINIO_SECRET_KEY,
        secure=minio_secure,
    )
    logger.info("✓ MinIO client connected successfully")

    bucket_name = Config.S3_BUCKET_NAME
    logger.info(f"Listing objects in bucket: {bucket_name}")

    object_count = 0
    total_size = 0

    print("\n" + "="*60)
    print(f"Objects inside '{bucket_name}':")

    for obj in minio_client.list_objects(bucket_name, recursive=True):
        print(f" - {obj.object_name} ({obj.size} bytes)")
        object_count += 1
        total_size += obj.size

    logger.info("✓ Bucket listing complete")
    logger.info(f"  - Total objects: {object_count}")
    logger.info(f"  - Total size: {total_size:,} bytes ({total_size/(1024**3):.2f} GB)")

    # Summary banner: this cell only lists the bucket, it does not upload anything.
    print("="*60)
    print(f"Listed {object_count} objects in '{bucket_name}' ({total_size/(1024**3):.2f} GB)")
    print("="*60)

except Exception as e:
    logger.error(f"✗ Failed to connect to MinIO or list objects: {str(e)}")
    raise


Objects inside 'nyc-taxi':
 - 2023/yellow_tripdata_2023-01.parquet (47673370 bytes)
 - 2023/yellow_tripdata_2023-02.parquet (47748012 bytes)
 - 2023/yellow_tripdata_2023-03.parquet (56127762 bytes)
 - 2023/yellow_tripdata_2023-04.parquet (54222699 bytes)
 - 2023/yellow_tripdata_2023-05.parquet (58654627 bytes)
 - 2023/yellow_tripdata_2023-06.parquet (54999465 bytes)
 - 2023/yellow_tripdata_2023-07.parquet (48361828 bytes)
 - 2023/yellow_tripdata_2023-08.parquet (48152353 bytes)
 - 2023/yellow_tripdata_2023-09.parquet (47895515 bytes)
 - 2023/yellow_tripdata_2023-10.parquet (59009059 bytes)
 - 2023/yellow_tripdata_2023-11.parquet (56094653 bytes)
 - 2023/yellow_tripdata_2023-12.parquet (56804275 bytes)
 - 2024/yellow_tripdata_2024-01.parquet (49961641 bytes)
 - 2024/yellow_tripdata_2024-02.parquet (50349284 bytes)
 - 2024/yellow_tripdata_2024-03.parquet (60078280 bytes)
 - 2024/yellow_tripdata_2024-04.parquet (59133625 bytes)
 - 2024/yellow_tripdata_2024-05.parquet (62553128 bytes)
 - 

In [4]:
from pyspark.sql.functions import col

logger.info("="*60)
logger.info("READING 2023 DATA WITH SCHEMA ADJUSTMENTS")
logger.info("="*60)

# Root URI of the raw monthly files; the bucket name comes from Config so it is
# defined in one place.
raw_data_root = f"s3a://{Config.S3_BUCKET_NAME}"

try:
    # January 2023 is read separately because its columns are cast/renamed below
    # (presumably to match the Feb-Dec files; re-check if the source data changes).
    logger.info("Reading January 2023 data (special schema)")
    df_jan = spark.read.parquet(f"{raw_data_root}/2023/yellow_tripdata_2023-01.parquet")
    jan_count = df_jan.count()
    logger.info(f"✓ January data loaded: {jan_count:,} rows")

    logger.info("Applying schema transformations to January data")
    df_jan = (
        df_jan
        .withColumn("VendorID", col("VendorID").cast("integer"))
        .withColumn("passenger_count", col("passenger_count").cast("long"))
        .withColumn("RatecodeID", col("RatecodeID").cast("long"))
        .withColumn("PULocationID", col("PULocationID").cast("integer"))
        .withColumn("DOLocationID", col("DOLocationID").cast("integer"))
        .withColumnRenamed("airport_fee", "Airport_fee")
    )
    logger.info("✓ Schema transformations applied")

    # Feb-Dec: one glob for months 02-09, one for months 10-12.
    logger.info("Reading February-December 2023 data")
    df_rest = spark.read.parquet(
        f"{raw_data_root}/2023/yellow_tripdata_2023-0[2-9].parquet",
        f"{raw_data_root}/2023/yellow_tripdata_2023-1[0-2].parquet",
    )
    rest_count = df_rest.count()
    logger.info(f"✓ Feb-Dec data loaded: {rest_count:,} rows")

    # Match columns by name, not position, when merging.
    logger.info("Merging January with Feb-Dec data")
    df1 = df_jan.unionByName(df_rest)
    # unionByName keeps every row of both inputs (UNION ALL semantics), so the
    # merged count is the sum of the counts above - no extra Spark job needed.
    df1_count = jan_count + rest_count
    logger.info(f"✓ 2023 data merged: {df1_count:,} rows")

except Exception as e:
    logger.error(f"✗ Failed to read 2023 data: {str(e)}")
    raise

In [5]:
from pyspark.sql.functions import lit

logger.info("="*60)
logger.info("COMBINING DATA FROM ALL YEARS")
logger.info("="*60)

# Root URI of the raw yearly folders; bucket name comes from Config.
raw_data_root = f"s3a://{Config.S3_BUCKET_NAME}"

try:
    # Read 2024 data
    logger.info("Reading 2024 data")
    df2 = spark.read.parquet(f"{raw_data_root}/2024/*.parquet")
    df2_count = df2.count()
    logger.info(f"✓ 2024 data loaded: {df2_count:,} rows")

    # unionByName, not union: union() pairs columns by POSITION, so any column
    # order difference between years would silently put values in the wrong
    # column. unionByName matches by column name instead.
    logger.info("Merging 2023 and 2024 data")
    df_1_2 = df1.unionByName(df2)
    # Row-preserving union: the total is the sum of the inputs' counts.
    df_1_2_count = df1_count + df2_count
    logger.info(f"✓ 2023-2024 merged: {df_1_2_count:,} rows")

    # The 2023-2024 data has no cbd_congestion_fee column; add it as 0.0 so the
    # columns line up with the 2025 files for the final union.
    logger.info("Adding 'cbd_congestion_fee' column to 2023-2024 data")
    df_1_2 = df_1_2.withColumn('cbd_congestion_fee', lit(0.0))
    logger.info("✓ Column added")

    # Read 2025 data
    logger.info("Reading 2025 data")
    df3 = spark.read.parquet(f"{raw_data_root}/2025/*.parquet")
    df3_count = df3.count()
    logger.info(f"✓ 2025 data loaded: {df3_count:,} rows")

    # Final union, again by column name.
    logger.info("Merging all years (2023-2025)")
    df = df_1_2.unionByName(df3)
    final_count = df_1_2_count + df3_count

    logger.info("="*60)
    logger.info("✓ ALL DATA COMBINED SUCCESSFULLY")
    logger.info(f"  - 2023 rows: {df1_count:,}")
    logger.info(f"  - 2024 rows: {df2_count:,}")
    logger.info(f"  - 2025 rows: {df3_count:,}")
    logger.info(f"  - TOTAL rows: {final_count:,}")
    logger.info("="*60)

except Exception as e:
    logger.error(f"✗ Failed to combine yearly data: {str(e)}")
    raise

# Show the total as the cell's output. A bare expression inside the `try` body is
# not the cell's last top-level statement, so it would not be displayed.
final_count

In [6]:
# Reduce the combined DataFrame to a fixed number of partitions before writing.
NUM_OUTPUT_PARTITIONS = 6
logger.info(f"Optimizing DataFrame partitions (coalescing to {NUM_OUTPUT_PARTITIONS} partitions)")
df = df.coalesce(NUM_OUTPUT_PARTITIONS)
logger.info(f"✓ DataFrame coalesced to {NUM_OUTPUT_PARTITIONS} partitions")

In [7]:
logger.info("="*60)
logger.info("SAVING COMBINED DATASET TO S3")
logger.info("="*60)

try:
    # Written to the same bucket as the raw files (bucket name from Config),
    # under the Initial_DF prefix.
    output_path = f"s3a://{Config.S3_BUCKET_NAME}/Initial_DF"
    logger.info(f"Writing to: {output_path}")
    logger.info(f"Total rows to write: {final_count:,}")
    logger.info("Mode: overwrite")
    logger.info("Format: parquet")

    # mode("overwrite") replaces any existing data at output_path, so re-running
    # this cell rewrites the dataset rather than failing on an existing path.
    df.write.mode("overwrite").parquet(output_path)

    logger.info("="*60)
    logger.info("✓ DATASET SAVED SUCCESSFULLY")
    logger.info(f"  - Location: {output_path}")
    logger.info(f"  - Total rows: {final_count:,}")
    logger.info("  - Format: Parquet")
    logger.info("="*60)

except Exception as e:
    logger.error(f"✗ Failed to save dataset: {str(e)}")
    raise