In [3]:
# ---------------------------
# Environment & Spark Init
# ---------------------------
!pip install -q pyspark duckdb prefect requests findspark

import os, json, requests
import findspark
from pyspark.sql import SparkSession

# Windows Spark stability
os.environ["HADOOP_HOME"] = os.path.join(os.getcwd(), "hadoop_home")
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")

findspark.init()

spark = SparkSession.builder \
    .appName("NYC Taxi ETL") \
    .config("spark.sql.session.timeZone", "America/New_York") \
    .getOrCreate()

print("‚úÖ Spark ready")


‚úÖ Spark ready


In [4]:
from pyspark.sql.functions import (
    col, when, count, avg, coalesce, lit, explode,
    to_timestamp, to_date, hour, dayofweek, lower,
    unix_timestamp, round
)


In [5]:
BASE_DIR = os.path.join(os.getcwd(), "data")

TAXI_PARQUET = os.path.join(BASE_DIR, "raw", "taxi_parquet")
ZONE_CSV = os.path.join(BASE_DIR, "raw", "taxi_zone_lookup.csv")
WEATHER_JSON = os.path.join(BASE_DIR, "raw", "weather", "weather_nyc.json")

os.makedirs(os.path.dirname(WEATHER_JSON), exist_ok=True)

API_KEY = os.environ.get("WEATHER_API_KEY")
if not API_KEY:
    raise ValueError("‚ùå WEATHER_API_KEY missing")

print("‚úÖ Paths + API ready")


‚úÖ Paths + API ready


In [6]:
df_taxi = spark.read.parquet(TAXI_PARQUET)
df_zone = spark.read.option("header", True).csv(ZONE_CSV)
df_weather_raw = spark.read.option("multiline", "true").json(WEATHER_JSON)

print("‚úÖ Raw datasets loaded")


‚úÖ Raw datasets loaded


In [7]:
# Define a small helper function
def inspect(df, name):
    print(f"\nüîé {name}")
    print(f"Rows: {df.count()} | Columns: {len(df.columns)}")
    df.show(5, truncate=False)  # show first 5 rows without truncating

# Example usage
inspect(df_taxi, "Taxi Data")
inspect(df_zone, "Zone Lookup Data")
inspect(df_weather_raw, "Weather Raw Data")



üîé Taxi Data
Rows: 2964624 | Columns: 19
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|2       |2024-01-01 00:57:55 |2024-01-01 01:17:43  |1              |1.72         |1         |N                 |186         |79          |2   

In [10]:
df_taxi_clean = (
    df_taxi
    .withColumn("pickup_datetime", to_timestamp("tpep_pickup_datetime"))
    .withColumn("dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))
    .filter(
        (col("trip_distance") > 0) &
        (col("fare_amount") > 0) &
        col("pickup_datetime").isNotNull() &
        col("dropoff_datetime").isNotNull()
    )
    .fillna({
        "passenger_count": 1,
        "tip_amount": 0.0,
        "congestion_surcharge": 0.0
    })
)

# Trip duration (ONCE)
df_taxi_clean = df_taxi_clean.withColumn(
    "trip_duration_min",
    round(
        (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60,
        2
    )
).filter(
    (col("trip_duration_min") > 0) &
    (col("trip_duration_min") <= 24 * 60)
)

print("‚úÖ Taxi cleaned")


‚úÖ Taxi cleaned


In [31]:
print(f"Rows: {df_taxi_clean.count()} | Columns: {len(df_taxi_clean.columns)}")

Rows: 2869591 | Columns: 22


In [11]:
df_taxi_clean.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_taxi_clean.columns
]).show(truncate=False)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+----------------+-----------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime|dropoff_datetime|trip_duration_min|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+----------------+-----------------+
|0       |0                   |0  

since the column we need(PULocationID) for the joining is not null we will fill null after all transformations,this ensures no unintentional overwriting, maintains traceability, and guarantees ready-to-use final datasets.

In [12]:
# Define a small helper function
def inspect(df, name):
    print(f"\nüîé {name}")
    print(f"Rows: {df.count()} | Columns: {len(df.columns)}")
    df.show(5, truncate=False)  # show first 5 rows without truncating

# Example usage
inspect(df_taxi_clean, "Taxi Data")



üîé Taxi Data
Rows: 2869591 | Columns: 22
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime    |dropoff_datetime   |trip_duration_min|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+--------

In [13]:
df_zone = (
    df_zone
    .withColumnRenamed("LocationID", "PULocationID")
    .withColumnRenamed("Borough", "PU_Borough")
)

print("‚úÖ Zone lookup ready")


‚úÖ Zone lookup ready


In [14]:
df_zone.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_zone.columns
]).show(truncate=False)


+------------+----------+----+------------+
|PULocationID|PU_Borough|Zone|service_zone|
+------------+----------+----+------------+
|0           |0         |0   |0           |
+------------+----------+----+------------+



In [15]:
df_taxi_geo = (
    df_taxi_clean
    .join(df_zone, "PULocationID", "left")
    .withColumn("pickup_date", to_date("pickup_datetime"))
    .withColumn("pickup_hour", hour("pickup_datetime"))
    .withColumn("pickup_dayofweek", dayofweek("pickup_datetime"))
    .withColumn("is_weekend", dayofweek("pickup_datetime").isin([1, 7]))
)

print("‚úÖ Taxi geo + time features added")


‚úÖ Taxi geo + time features added


In [16]:
# Define a small helper function
def inspect(df, name):
    print(f"\nüîé {name}")
    print(f"Rows: {df.count()} | Columns: {len(df.columns)}")
    df.show(5, truncate=False)  # show first 5 rows without truncating

# Example usage
inspect(df_taxi_geo, "Taxi Data")


üîé Taxi Data
Rows: 2869591 | Columns: 29
+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------------+----------+----------------------------+------------+-----------+-----------+----------------+----------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime    |dropoff_datetime   |trip_duration_min|PU_Borough|Zone                        |service_zone|pickup_date|pickup_hour|pickup_dayofweek|is_weekend|
+------------+--------+--------------------+---------------------+---------------+------

In [17]:
df_weather = (
    df_weather_raw
    .select(explode("days").alias("day"))
    .select(
        to_date(to_timestamp(col("day.datetime"))).alias("weather_date"),
        explode(col("day.hours")).alias("hour")
    )
    .select(
        col("weather_date"),
        hour(to_timestamp(col("hour.datetime"))).alias("weather_hour"),
        col("hour.temp").alias("temperature"),
        col("hour.conditions")
    )
)

df_weather = df_weather.withColumn(
    "weather_category",
    when(lower(col("conditions")).like("%rain%"), "Rain")
    .when(lower(col("conditions")).like("%snow%"), "Snow")
    .when(lower(col("conditions")).like("%cloud%"), "Cloudy")
    .otherwise("Clear")
).withColumn(
    "weather_dayofweek", dayofweek("weather_date")
)

print("‚úÖ Weather parsed")


‚úÖ Weather parsed


In [18]:
stats = df_weather.agg(
    avg("temperature").alias("global_avg_temp")
).collect()[0]

global_avg_temp = stats["global_avg_temp"] or 10.0

weather_patterns = (
    df_weather
    .groupBy("weather_dayofweek", "weather_hour")
    .agg(avg("temperature").alias("avg_temp_dow_hour"))
    .withColumnRenamed("weather_dayofweek", "pattern_dow")
    .withColumnRenamed("weather_hour", "pattern_hour")
)

print("‚úÖ Weather fallback stats ready")


‚úÖ Weather fallback stats ready


In [19]:
df_weather.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_weather.columns
]).show(truncate=False)


+------------+------------+-----------+----------+----------------+-----------------+
|weather_date|weather_hour|temperature|conditions|weather_category|weather_dayofweek|
+------------+------------+-----------+----------+----------------+-----------------+
|0           |0           |0          |0         |0               |0                |
+------------+------------+-----------+----------+----------------+-----------------+



In [20]:
df_final = (
    df_taxi_geo
    .join(
        df_weather,
        (df_taxi_geo.pickup_date == df_weather.weather_date) &
        (df_taxi_geo.pickup_hour == df_weather.weather_hour),
        "left"
    )
    .join(
        weather_patterns,
        (df_taxi_geo.pickup_dayofweek == weather_patterns.pattern_dow) &
        (df_taxi_geo.pickup_hour == weather_patterns.pattern_hour),
        "left"
    )
    .withColumn(
        "temperature",
        coalesce(col("temperature"), col("avg_temp_dow_hour"), lit(global_avg_temp))
    )
    .withColumn(
        "weather_date",
        coalesce(col("weather_date"), col("pickup_date"))
    )
    .withColumn(
        "weather_hour",
        coalesce(col("weather_hour"), col("pickup_hour"))
    )
)

print("‚úÖ Taxi + weather joined")


‚úÖ Taxi + weather joined


In [21]:
# Define a small helper function
def inspect(df, name):
    print(f"\nüîé {name}")
    print(f"Rows: {df.count()} | Columns: {len(df.columns)}")
    df.show(5, truncate=False)  # show first 5 rows without truncating

# Example usage
inspect(df_final, "Taxi Data")



üîé Taxi Data
Rows: 2869591 | Columns: 38
+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------------+----------+----------------------------+------------+-----------+-----------+----------------+----------+------------+------------+-----------+----------+----------------+-----------------+-----------+------------+-----------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime    |dropoff_datetime   |trip_duration_min|PU_Borough|Zone                        |service_zone|pickup_date|pi

In [22]:
df_final = df_final.drop(
    "avg_temp_dow_hour",
    "pattern_dow",
    "pattern_hour",
    "weather_dayofweek",
    "pickup_dayofweek"
)

df_final.printSchema()


root
 |-- PULocationID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = false)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = false)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = false)
 |-- Airport_fee: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- trip_duration_min

In [23]:
df_final.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_final.columns
]).show(truncate=False)

+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+----------------+-----------------+----------+----+------------+-----------+-----------+----------+------------+------------+-----------+----------+----------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime|dropoff_datetime|trip_duration_min|PU_Borough|Zone|service_zone|pickup_date|pickup_hour|is_weekend|weather_date|weather_hour|temperature|conditions|weather_category|
+------------+--------+--------------------+---------------------+---------------+-------------+

In [24]:
from pyspark.sql.functions import col, coalesce, lit, when, lower

df_final = (
    df_final
    # RatecodeID ‚Üí Standard rate
    .withColumn("RatecodeID", coalesce(col("RatecodeID"), lit(1)))

    # Store & forward flag ‚Üí No
    .withColumn("store_and_fwd_flag", coalesce(col("store_and_fwd_flag"), lit("N")))

    # Airport fee ‚Üí 0.0
    .withColumn("Airport_fee", coalesce(col("Airport_fee"), lit(0.0)))

    # Weather condition fallback
    .withColumn("conditions", coalesce(col("conditions"), lit("Unknown")))

    # Weather category derived safely
    .withColumn(
        "weather_category",
        when(col("weather_category").isNotNull(), col("weather_category"))
        .when(lower(col("conditions")).like("%rain%"), "Rain")
        .when(lower(col("conditions")).like("%snow%"), "Snow")
        .when(lower(col("conditions")).like("%cloud%"), "Cloudy")
        .when(col("conditions") == "Clear", "Clear")
        .otherwise("Unknown")
    )
)


In [25]:
from pyspark.sql.functions import count, when

# List of columns to check
critical_cols = [
    "RatecodeID",
    "store_and_fwd_flag",
    "Airport_fee",
    "conditions",
    "weather_category"
]

# Show null counts
df_final.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in critical_cols
]).show(truncate=False)


+----------+------------------+-----------+----------+----------------+
|RatecodeID|store_and_fwd_flag|Airport_fee|conditions|weather_category|
+----------+------------------+-----------+----------+----------------+
|0         |0                 |0          |0         |0               |
+----------+------------------+-----------+----------+----------------+



In [26]:
df_final.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_final.columns
]).show(truncate=False)


+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+----------------+-----------------+----------+----+------------+-----------+-----------+----------+------------+------------+-----------+----------+----------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime|dropoff_datetime|trip_duration_min|PU_Borough|Zone|service_zone|pickup_date|pickup_hour|is_weekend|weather_date|weather_hour|temperature|conditions|weather_category|
+------------+--------+--------------------+---------------------+---------------+-------------+

In [27]:
# Define a small helper function
def inspect(df, name):
    print(f"\nüîé {name}")
    print(f"Rows: {df.count()} | Columns: {len(df.columns)}")
    df.show(5, truncate=False)  # show first 5 rows without truncating

# Example usage
inspect(df_final, "Taxi Data")


üîé Taxi Data
Rows: 2869591 | Columns: 33
+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------------+----------+----------------------------+------------+-----------+-----------+----------+------------+------------+-----------+----------+----------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_datetime    |dropoff_datetime   |trip_duration_min|PU_Borough|Zone                        |service_zone|pickup_date|pickup_hour|is_weekend|weather_date|weather_hour|temperature|conditions|weather_

In [None]:
import duckdb
import os
import shutil

# -----------------------------
# Paths
# -----------------------------
processed_dir = os.path.join(BASE_DIR, "processed")
os.makedirs(processed_dir, exist_ok=True)

processed_parquet = os.path.join(processed_dir, "taxi_weather.parquet")
duckdb_db_path = os.path.join(processed_dir, "taxi_weather.duckdb")

# -----------------------------
# 1Ô∏è‚É£ Save df_final as Parquet
# -----------------------------
# Optional: clean old parquet
if os.path.exists(processed_parquet):
    shutil.rmtree(processed_parquet)

df_final.write.mode("overwrite").parquet(processed_parquet)
print(f"‚úÖ Saved df_final to {processed_parquet}")

# -----------------------------
# 2Ô∏è‚É£ Ensure old DuckDB connection/file is closed
# -----------------------------
# On Windows, delete the old DB file if it exists
if os.path.exists(duckdb_db_path):
    try:
        os.remove(duckdb_db_path)
        print(f"üóë Old DuckDB file removed: {duckdb_db_path}")
    except PermissionError:
        raise PermissionError(
            f"Cannot delete {duckdb_db_path}. Make sure no other process is using it."
        )

# -----------------------------
# 3Ô∏è‚É£ Connect and sync Parquet to DuckDB
# -----------------------------
def sync_parquet_to_duckdb(parquet_path, db_path, table_name="taxi_weather"):
    """
    Load/update DuckDB table from Parquet efficiently.
    Automatically creates/replaces table for dynamic updates.
    """
    # Use context manager to safely close connection
    with duckdb.connect(database=db_path) as con:
        con.execute(f"""
            CREATE OR REPLACE TABLE {table_name} AS
            SELECT * FROM read_parquet('{parquet_path}')
        """)
        print(f"‚úÖ DuckDB table '{table_name}' synced from {parquet_path}")

        # Optional verification
        result = con.execute(f"""
            SELECT COUNT(*) AS row_count, COUNT(DISTINCT pickup_date) AS distinct_dates
            FROM {table_name}
        """).fetchall()
        print("‚úÖ DuckDB verification:", result)

        sample_rows = con.execute(f"SELECT * FROM {table_name} LIMIT 5").fetchall()
        print("Sample rows:", sample_rows)

# -----------------------------
# 4Ô∏è‚É£ Run sync
# -----------------------------
sync_parquet_to_duckdb(processed_parquet, duckdb_db_path)


‚úÖ Saved df_final to c:\Users\HP\Documents\etl-project\data\processed\taxi_weather.parquet
üóë Old DuckDB file removed: c:\Users\HP\Documents\etl-project\data\processed\taxi_weather.duckdb


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ DuckDB table 'taxi_weather' synced from c:\Users\HP\Documents\etl-project\data\processed\taxi_weather.parquet
‚úÖ DuckDB verification: [(2869591, 35)]
Sample rows: [(186, 2, datetime.datetime(2024, 1, 1, 0, 57, 55), datetime.datetime(2024, 1, 1, 1, 17, 43), 1, 1.72, 1, 'N', 79, 2, 17.7, 1.0, 0.5, 0.0, 0.0, 1.0, 22.7, 2.5, 0.0, datetime.datetime(2024, 1, 1, 5, 57, 55), datetime.datetime(2024, 1, 1, 6, 17, 43), 19.8, 'Manhattan', 'Penn Station/Madison Sq West', 'Yellow Zone', datetime.date(2024, 1, 1), 0, False, datetime.date(2024, 1, 1), 0, 6.2, 'Overcast', 'Clear'), (140, 1, datetime.datetime(2024, 1, 1, 0, 3), datetime.datetime(2024, 1, 1, 0, 9, 36), 1, 1.8, 1, 'N', 236, 1, 10.0, 3.5, 0.5, 3.75, 0.0, 1.0, 18.75, 2.5, 0.0, datetime.datetime(2024, 1, 1, 5, 3), datetime.datetime(2024, 1, 1, 5, 9, 36), 6.6, 'Manhattan', 'Lenox Hill East', 'Yellow Zone', datetime.date(2024, 1, 1), 0, False, datetime.date(2024, 1, 1), 0, 6.2, 'Overcast', 'Clear'), (236, 1, datetime.datetime(2024, 1, 1, 

In [27]:
import os
import shutil
from pyspark.sql import SparkSession

# -----------------------------
# Paths
# -----------------------------
processed_dir = os.path.join(BASE_DIR, "processed")
final_parquet_folder = os.path.join(processed_dir, "taxi_weather.parquet")
single_parquet_file = os.path.join(processed_dir, "taxi_weather_single.parquet")

# -----------------------------
# 1Ô∏è‚É£ Clean old outputs
# -----------------------------
if os.path.exists(final_parquet_folder):
    shutil.rmtree(final_parquet_folder)
if os.path.exists(single_parquet_file):
    os.remove(single_parquet_file)

# -----------------------------
# 2Ô∏è‚É£ Save df_final as single Parquet
# -----------------------------
df_final.coalesce(1).write.mode("overwrite").parquet(final_parquet_folder)
print(f"‚úÖ df_final saved as single Parquet folder: {final_parquet_folder}")

# -----------------------------
# 3Ô∏è‚É£ Move the single part file
# -----------------------------
parquet_files = [f for f in os.listdir(final_parquet_folder) if f.endswith(".parquet")]

if parquet_files:
    source_file = os.path.join(final_parquet_folder, parquet_files[0])
    shutil.move(source_file, single_parquet_file)
    shutil.rmtree(final_parquet_folder)
    print(f"‚úÖ Single Parquet file ready: {single_parquet_file}")
else:
    raise FileNotFoundError("‚ùå No Parquet file found in the folder!")

# -----------------------------
# 4Ô∏è‚É£ Verify the export
# -----------------------------
# Re-load the Parquet file to check
df_check = spark.read.parquet(single_parquet_file)

# Compare row count
if df_check.count() != df_final.count():
    raise ValueError("‚ùå Row count mismatch! Export may be incomplete.")

# Compare columns
if set(df_check.columns) != set(df_final.columns):
    raise ValueError("‚ùå Column mismatch! Some columns may be missing.")

print(f"‚úÖ Verification passed: {df_check.count()} rows, {len(df_check.columns)} columns")


‚úÖ df_final saved as single Parquet folder: c:\Users\HP\Documents\etl-project\data\processed\taxi_weather.parquet
‚úÖ Single Parquet file ready: c:\Users\HP\Documents\etl-project\data\processed\taxi_weather_single.parquet
‚úÖ Verification passed: 2869591 rows, 33 columns
