In [1]:
# S3 Configuration Cell - Add this to your Jupyter notebook

import boto3
import pandas as pd

# Install required packages if needed (uncomment if needed)
# !pip install boto3 s3fs pyarrow

# S3 Configuration
AWS_PROFILE = "data228"
BUCKET_NAME = "data228-bigdata-nyc"
# STAGING_PREFIX = "staging/"
STAGING_PREFIX = "staging/2025/"

# Create boto3 session with profile
session = boto3.Session(profile_name=AWS_PROFILE)
s3_client = session.client('s3')
s3_resource = session.resource('s3')

# Test connection
try:
    response = s3_client.head_bucket(Bucket=BUCKET_NAME)
    print(f"‚úÖ Successfully connected to S3!")
    print(f"   Bucket: {BUCKET_NAME}")
    print(f"   Profile: {AWS_PROFILE}")
except Exception as e:
    print(f"‚ùå Error: {e}")

# List files in staging
bucket = s3_resource.Bucket(BUCKET_NAME)
parquet_files = [
    obj.key 
    for obj in bucket.objects.filter(Prefix=STAGING_PREFIX)
    if obj.key.endswith(".parquet")
]

print(f"\nüìÅ Found {len(parquet_files)} parquet files in staging/")
print("Sample files:")
for f in parquet_files[:5]:
    print(f"   - {f}")

‚úÖ Successfully connected to S3!
   Bucket: data228-bigdata-nyc
   Profile: data228

üìÅ Found 8 parquet files in staging/
Sample files:
   - staging/2025/fhv_tripdata_2025-03.parquet
   - staging/2025/fhv_tripdata_2025-04.parquet
   - staging/2025/fhvhv_tripdata_2025-01.parquet
   - staging/2025/fhvhv_tripdata_2025-03.parquet
   - staging/2025/green_tripdata_2025-05.parquet


In [3]:
# Inspect schema for 2025_06 Green Taxi dataset

import pandas as pd

target_file = "staging/2025/green_tripdata_2025-06.parquet"
s3_path = f"s3://{BUCKET_NAME}/{target_file}"
print(f"\nüîç Reading file: {s3_path}")

try:
    # Read Parquet file directly from S3
    df = pd.read_parquet(s3_path, storage_options={"profile": AWS_PROFILE})

    # Show schema (columns and types)
    print("\nüìã Columns and Data Types:")
    print(df.dtypes)

    # Optional: preview first few rows
    display(df.head())

except Exception as e:
    print(f"‚ùå Error reading file: {e}")



üîç Reading file: s3://data228-bigdata-nyc/staging/2025/green_tripdata_2025-06.parquet

üìã Columns and Data Types:
VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-06-01 00:33:43,2025-06-01 01:04:33,N,2.0,74,132,1.0,19.6,70.0,0.0,0.5,19.61,6.94,1.0,98.05,1.0,1.0,0.0,0.0
1,2,2025-06-01 00:07:45,2025-06-01 00:14:52,N,1.0,75,74,2.0,1.37,9.3,1.0,0.5,0.0,0.0,1.0,11.8,2.0,1.0,0.0,0.0
2,2,2025-06-01 00:24:07,2025-06-01 00:48:24,N,1.0,83,83,1.0,4.11,25.4,1.0,0.5,0.0,0.0,1.0,27.9,2.0,1.0,0.0,0.0
3,2,2025-06-01 00:00:14,2025-06-01 00:08:29,N,1.0,97,49,1.0,1.29,9.3,1.0,0.5,2.36,0.0,1.0,14.16,1.0,1.0,0.0,0.0
4,2,2025-06-01 00:31:15,2025-06-01 00:43:35,N,1.0,66,25,1.0,1.97,13.5,1.0,0.5,0.0,0.0,1.0,16.0,1.0,1.0,0.0,0.0


In [None]:
#!/usr/bin/env python3
"""
Create Bloom Filter from S3 Parquet Files (All Years Except Subfolder 2025/)
------------------------------------------------------------------------
This script builds a Bloom filter for all NYC taxi trip data in:
    s3://data228-bigdata-nyc/staging/
excluding any files under:
    s3://data228-bigdata-nyc/staging/2025/
"""

import os
import s3fs
import pyarrow.parquet as pq
import pandas as pd
from pybloom_live import BloomFilter
import pickle
from datetime import datetime

# -----------------------------------------------------------------------
# CONFIGURATION
# -----------------------------------------------------------------------
AWS_PROFILE = "data228"
BUCKET_NAME = "data228-bigdata-nyc"
INPUT_PREFIX = "staging/"     # base folder
EXCLUDE_PREFIX = "staging/2025/"  # subfolder to skip
OUTPUT_BLOOM_PATH = "bloom_filter/bloom_all_years.pkl"

FALSE_POSITIVE_RATE = 0.01
ESTIMATED_TRIPS = 400_000_000  # increased to safely hold historical + 2025 trips
BATCH_SIZE = 2_000_000         # process rows in chunks

os.environ["AWS_PROFILE"] = AWS_PROFILE

print("üöÄ Creating Bloom Filter for all NYC Taxi data (excluding /2025/)")
print(f"Bucket: s3://{BUCKET_NAME}/{INPUT_PREFIX}")
print(f"Exclude: s3://{BUCKET_NAME}/{EXCLUDE_PREFIX}")
print(f"Output Bloom Filter: s3://{BUCKET_NAME}/{OUTPUT_BLOOM_PATH}\n")

# -----------------------------------------------------------------------
# INITIALIZE FILESYSTEM AND BLOOM FILTER
# -----------------------------------------------------------------------
fs = s3fs.S3FileSystem(profile=AWS_PROFILE)
bloom = BloomFilter(capacity=ESTIMATED_TRIPS, error_rate=FALSE_POSITIVE_RATE)
print(f"‚úÖ Initialized Bloom Filter (capacity={ESTIMATED_TRIPS:,}, FPR={FALSE_POSITIVE_RATE:.2%})")

# -----------------------------------------------------------------------
# HELPER: BUILD trip_id STRING
# -----------------------------------------------------------------------
def build_trip_id(df: pd.DataFrame):
    pickup = next((c for c in ["tpep_pickup_datetime", "lpep_pickup_datetime", "pickup_datetime"] if c in df.columns), None)
    dropoff = next((c for c in ["tpep_dropoff_datetime", "lpep_dropoff_datetime", "dropoff_datetime", "dropOff_datetime"] if c in df.columns), None)
    puloc = next((c for c in ["PULocationID", "PUlocationID"] if c in df.columns), None)
    doloc = next((c for c in ["DOLocationID", "DOlocationID"] if c in df.columns), None)
    vendor = "VendorID" if "VendorID" in df.columns else None

    parts = []
    if vendor: parts.append(df[vendor].astype(str))
    if pickup: parts.append(df[pickup].astype(str))
    if dropoff: parts.append(df[dropoff].astype(str))
    if puloc: parts.append(df[puloc].astype(str))
    if doloc: parts.append(df[doloc].astype(str))

    return pd.Series(["_".join(x) for x in zip(*parts)], index=df.index)

# -----------------------------------------------------------------------
# GET ALL PARQUET FILES (EXCLUDING SUBFOLDER)
# -----------------------------------------------------------------------
print("\nüìÇ Listing all Parquet files from S3...")

all_files = fs.glob(f"{BUCKET_NAME}/{INPUT_PREFIX}*.parquet")
excluded_files = fs.glob(f"{BUCKET_NAME}/{EXCLUDE_PREFIX}*.parquet")

# Filter out excluded files
files = [f"s3://{f}" for f in all_files if f not in excluded_files]
print(f"‚úÖ Found {len(files)} parquet files (excluded {len(excluded_files)} from /2025/)\n")

# -----------------------------------------------------------------------
# PROCESS FILES AND POPULATE BLOOM FILTER
# -----------------------------------------------------------------------
total_rows = 0
added_rows = 0

for i, path in enumerate(files, 1):
    file_name = os.path.basename(path)
    print(f"üì¶ [{i}/{len(files)}] Processing {file_name} ...")

    try:
        pq_file = pq.ParquetFile(path, filesystem=fs)
        for batch in pq_file.iter_batches(batch_size=BATCH_SIZE):
            df = batch.to_pandas()
            trip_ids = build_trip_id(df)

            for tid in trip_ids:
                bloom.add(tid)

            total_rows += len(df)
            added_rows += len(df)
            print(f"   ‚Üí Added {len(df):,} rows (Total: {total_rows:,})")

    except Exception as e:
        print(f"‚ö†Ô∏è Skipped {path}: {e}")
        continue

print("\n‚úÖ Finished populating Bloom filter.")
print(f"   Total processed rows: {total_rows:,}")
print(f"   Current fill ratio: {bloom.count / bloom.capacity:.2%}\n")

# -----------------------------------------------------------------------
# SAVE BLOOM FILTER (LOCAL + S3)
# -----------------------------------------------------------------------
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
local_file = f"bloom_all_years_{timestamp}.pkl"

with open(local_file, "wb") as f:
    pickle.dump(bloom, f)
print(f"üíæ Saved locally: {local_file}")

# Upload to S3
fs.put(local_file, f"{BUCKET_NAME}/{OUTPUT_BLOOM_PATH}")
print(f"‚òÅÔ∏è Uploaded Bloom Filter to s3://{BUCKET_NAME}/{OUTPUT_BLOOM_PATH}")

print("\nüèÅ Bloom Filter creation complete!")


üöÄ Creating Bloom Filter for all NYC Taxi data (excluding /2025/)
Bucket: s3://data228-bigdata-nyc/staging/
Exclude: s3://data228-bigdata-nyc/staging/2025/
Output Bloom Filter: s3://data228-bigdata-nyc/bloom_filter/bloom_all_years.pkl

‚úÖ Initialized Bloom Filter (capacity=250,000,000, FPR=1.00%)

üìÇ Listing all Parquet files from S3...
‚úÖ Found 91 parquet files (excluded 8 from /2025/)

üì¶ [1/91] Processing fhv_tripdata_2015-02.parquet ...
   ‚Üí Added 2,000,000 rows (Total: 2,000,000)
   ‚Üí Added 1,053,183 rows (Total: 3,053,183)
üì¶ [2/91] Processing fhv_tripdata_2015-12.parquet ...
   ‚Üí Added 2,000,000 rows (Total: 5,053,183)
   ‚Üí Added 2,000,000 rows (Total: 7,053,183)
   ‚Üí Added 2,000,000 rows (Total: 9,053,183)
   ‚Üí Added 2,000,000 rows (Total: 11,053,183)
   ‚Üí Added 888,809 rows (Total: 11,941,992)
üì¶ [3/91] Processing fhv_tripdata_2019-02.parquet ...
   ‚Üí Added 1,707,650 rows (Total: 13,649,642)
üì¶ [4/91] Processing fhv_tripdata_2019-12.parquet ...
  

## bloom Filter check

In [4]:
import pickle
import s3fs

# Configuration
AWS_PROFILE = "data228"
BUCKET_NAME = "data228-bigdata-nyc"
BLOOM_PATH  = "bloom_filter/bloom_all_years.pkl"

# Load from S3 using s3fs
fs = s3fs.S3FileSystem(profile=AWS_PROFILE)

print(f"üì• Loading Bloom filter from s3://{BUCKET_NAME}/{BLOOM_PATH} ...")

with fs.open(f"{BUCKET_NAME}/{BLOOM_PATH}", "rb") as f:
    bloom = pickle.load(f)

print("‚úÖ Bloom filter loaded successfully!\n")

# --- Inspect key properties ---
print("üìä Bloom Filter Summary:")
print(f"   ‚Üí Capacity:         {bloom.capacity:,}")
print(f"   ‚Üí Elements stored:  {bloom.count:,}")
print(f"   ‚Üí Fill ratio:       {bloom.count / bloom.capacity:.4%}")
print(f"   ‚Üí Error rate:       {bloom.error_rate * 100:.2f}%")

# --- Optional: check membership manually ---
print("\nüîç Membership test (example):")
test_ids = [
    "2024-06-01 10:00:00_2024-06-01 10:45:00_132_256",
    "random_trip_not_existing"
]
for tid in test_ids:
    print(f"   {tid} ‚Üí {'Possibly in filter' if tid in bloom else 'Definitely not in filter'}")

# --- Optional: show internal details (bit array stats) ---
print("\nüß† Internal representation:")
print(f"   Bit array length:   {len(bloom.bitarray):,}")
print(f"   Bits set to 1:      {bloom.bitarray.count(1):,}")
print(f"   Bits still 0:       {len(bloom.bitarray) - bloom.bitarray.count(1):,}")


üì• Loading Bloom filter from s3://data228-bigdata-nyc/bloom_filter/bloom_all_years.pkl ...


KeyboardInterrupt: 

## Creating streaming and apllyong bloom filter

In [2]:
import os, gc, time, pickle
import numpy as np
import pandas as pd
import pyarrow.dataset as ds
import s3fs
from pyspark.sql import SparkSession
from pybloom_live import BloomFilter

# -----------------------------------------------------------------------
# CONFIGURATION
# -----------------------------------------------------------------------
AWS_PROFILE   = "data228"
BUCKET_NAME   = "data228-bigdata-nyc"
STAGING_PREFIX = "staging/2025/"     # STREAMING DATA LOCATION
OUTPUT_PREFIX = "dedup/2025_stream_dedup.parquet"
BLOOM_PATH = "bloom_filter/bloom_all_years.pkl"  # Historical Bloom filter

os.environ["AWS_PROFILE"] = AWS_PROFILE

# -----------------------------------------------------------------------
# LOAD BLOOM FILTER FROM S3
# -----------------------------------------------------------------------
fs = s3fs.S3FileSystem(profile=AWS_PROFILE)
print(f"üì• Loading Bloom filter from s3://{BUCKET_NAME}/{BLOOM_PATH} ...")

with fs.open(f"{BUCKET_NAME}/{BLOOM_PATH}", "rb") as f:
    bloom = pickle.load(f)

print("‚úÖ Bloom filter loaded!")
print(f"   Capacity: {bloom.capacity:,}")
print(f"   Existing elements: {bloom.count:,}")
print(f"   Fill ratio: {bloom.count / bloom.capacity:.2%}\n")

# -----------------------------------------------------------------------
# INITIALIZE SPARK (for writing deduplicated data)
# -----------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("StreamDedup-Bloom-2025")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","16")
    .config("spark.driver.memory","8g")
    .config("spark.executor.memory","8g")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# -----------------------------------------------------------------------
# DISCOVER 2025 FILES FROM S3
# -----------------------------------------------------------------------
print(f"üìÇ Scanning s3://{BUCKET_NAME}/{STAGING_PREFIX} ...")
files = fs.glob(f"{BUCKET_NAME}/{STAGING_PREFIX}*.parquet")
print(f"‚úÖ Found {len(files)} 2025 files.\n")

# -----------------------------------------------------------------------
# HELPER: BUILD TRIP_ID (vectorized)
# -----------------------------------------------------------------------
def build_trip_id_vectorized(df: pd.DataFrame):
    cols = df.columns
    pickup = next((c for c in ["tpep_pickup_datetime","lpep_pickup_datetime","pickup_datetime"] if c in cols), None)
    dropoff = next((c for c in ["tpep_dropoff_datetime","lpep_dropoff_datetime","dropoff_datetime","dropOff_datetime"] if c in cols), None)
    puloc = next((c for c in ["PULocationID","PUlocationID"] if c in cols), None)
    doloc = next((c for c in ["DOLocationID","DOlocationID"] if c in cols), None)
    
    parts = []
    if pickup:  parts.append(df[pickup].astype(str))
    if dropoff: parts.append(df[dropoff].astype(str))
    if puloc:   parts.append(df[puloc].astype(str))
    if doloc:   parts.append(df[doloc].astype(str))
    return pd.Series(["_".join(x) for x in zip(*parts)], index=df.index)

# -----------------------------------------------------------------------
# STREAM THROUGH 2025 FILES
# -----------------------------------------------------------------------
local_tmp = "/tmp/stream_2025_dedup"
os.system(f"rm -rf {local_tmp}")
total_rows, unique_rows, dup_rows = 0, 0, 0
batch_no = 0

start = time.time()
for fpath in files:
    batch_no += 1
    fname = fpath.split("/")[-1]
    print(f"\nüì¶ [{batch_no}/{len(files)}] Processing {fname} ...")

    df = pd.read_parquet(f"s3://{fpath}", storage_options={"profile": AWS_PROFILE})
    trip_ids = build_trip_id_vectorized(df)

    # Membership test (True if already in Bloom ‚Üí duplicate)
    seen_mask = np.fromiter((tid in bloom for tid in trip_ids), bool, len(trip_ids))
    keep_mask = ~seen_mask
    unique_df = df.loc[keep_mask].copy()

    # Add new unique trip IDs to Bloom filter (protect against capacity overflow)
    if bloom.count < bloom.capacity:
        for tid in trip_ids[keep_mask]:
            try:
                bloom.add(tid)
            except IndexError:
                print("   ‚ö†Ô∏è Bloom filter reached capacity; stopping further additions.")
                break
    else:
        print("   ‚ö†Ô∏è Bloom filter already at capacity; skipping additions for this batch.")

    total_rows += len(df)
    unique_rows += len(unique_df)
    dup_rows += seen_mask.sum()

    print(f"   Rows: {len(df):,} | Unique: {len(unique_df):,} | Dups: {seen_mask.sum():,}")
    
    # Write unique batch to local parquet (repartition to avoid huge tasks / OOM)
    if not unique_df.empty:
        sdf = spark.createDataFrame(unique_df).repartition(200)
        sdf.write.mode("append").parquet(local_tmp)
        del sdf

    del df, trip_ids, unique_df, seen_mask, keep_mask
    gc.collect()

print("\n‚úÖ STREAMING DEDUP COMPLETE")
print(f"   Total processed: {total_rows:,}")
print(f"   Unique kept:     {unique_rows:,}")
print(f"   Duplicates:      {dup_rows:,} ({dup_rows/total_rows*100:.2f}%)")
print(f"   Time elapsed:    {time.time()-start:.2f}s")

# -----------------------------------------------------------------------
# WRITE MERGED RESULT BACK TO S3
# -----------------------------------------------------------------------
output_s3 = f"s3://{BUCKET_NAME}/{OUTPUT_PREFIX}"
print(f"\nüíæ Writing deduplicated data to {output_s3} ...")

df_final = spark.read.parquet(local_tmp)
df_final.write.mode("overwrite").option("compression","snappy").parquet(output_s3)

print(f"‚úÖ Wrote deduplicated stream data to: {output_s3}")

# -----------------------------------------------------------------------
# SAVE UPDATED BLOOM FILTER (now includes 2025 trips)
# -----------------------------------------------------------------------
updated_path = f"bloom_filter/bloom_all_years_plus2025.pkl"
with fs.open(f"{BUCKET_NAME}/{updated_path}", "wb") as f:
    pickle.dump(bloom, f)
print(f"‚úÖ Updated Bloom filter uploaded to s3://{BUCKET_NAME}/{updated_path}")

spark.stop()
print("üèÅ Spark session stopped")


#check stream

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("check-dedup-2025").getOrCreate()

df = spark.read.parquet("s3a://data228-bigdata-nyc/dedup/2025_stream_dedup.parquet")
print(df.count())
df.show(5)

# spark.stop()

25/11/18 02:06:33 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://data228-bigdata-nyc/dedup/2025_stream_dedup.parquet.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2737)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3569)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:55)
	at org.apache.spark.sql.execution.datasou

Py4JJavaError: An error occurred while calling o35.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2737)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3569)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
	at scala.collection.immutable.List.map(List.scala:247)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$1(Dataset.scala:115)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:113)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:109)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:58)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:457)
	at org.apache.spark.sql.classic.DataFrameReader.parquet(DataFrameReader.scala:306)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2737)
		at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3569)
		at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
		at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
		at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
		at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
		at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
		at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
		at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
		at scala.collection.immutable.List.map(List.scala:247)
		at scala.collection.immutable.List.map(List.scala:79)
		at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
		at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
		at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
		at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
		at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
		at scala.collection.immutable.List.foldLeft(List.scala:79)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 22 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2641)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2735)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3569)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
	at scala.collection.immutable.List.map(List.scala:247)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
	... 22 more


In [5]:
import pandas as pd

AWS_PROFILE = "data228"
path = "s3://data228-bigdata-nyc/dedup/2025_stream_dedup.parquet"

df = pd.read_parquet(path, storage_options={"profile": AWS_PROFILE})
print("Rows in deduped 2025 dataset:", len(df))
print(df.head())

severe performance issues, see also https://github.com/dask/dask/issues/10276

To fix, you should specify a lower version bound on s3fs, or
update the current installation.



FileNotFoundError: data228-bigdata-nyc/dedup/2025_stream_dedup.parquet