In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from pathlib import Path
import os


In [14]:
# GCS ÏÑ§Ï†ï
BUCKET_NAME = "real-module-4"
CREDENTIALS_FILE = "/home/pc/dev/homework/module-4/setup/dezoomcamp-sa.json"

client = storage.Client.from_service_account_json(CREDENTIALS_FILE)
bucket = client.bucket(BUCKET_NAME)

print("‚úÖ GCS client ready")


‚úÖ GCS client ready


In [15]:
FHV_SCHEMA = pa.schema([
    pa.field("dispatching_base_num", pa.string()),
    pa.field("pickup_datetime", pa.timestamp("us")),
    pa.field("dropOff_datetime", pa.timestamp("us")),
    pa.field("PUlocationID", pa.int64()),
    pa.field("DOlocationID", pa.int64()),
    pa.field("SR_Flag", pa.int64()),
])


In [6]:
YEAR = 2019
MONTHS = range(1, 13)

RAW_DIR = Path("./data/fhv_raw")
FIXED_DIR = Path("./data/fhv_fixed")

RAW_DIR.mkdir(parents=True, exist_ok=True)
FIXED_DIR.mkdir(parents=True, exist_ok=True)


In [7]:
for month in MONTHS:
    csv_file = f"fhv_tripdata_{YEAR}-{month:02d}.csv.gz"
    parquet_file = RAW_DIR / f"fhv_tripdata_{YEAR}-{month:02d}.parquet"

    url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/{csv_file}"

    print(f"‚¨áÔ∏è Downloading {csv_file}")
    df = pd.read_csv(url, compression="gzip")
    print(f"   rows: {len(df):,}")

    df.to_parquet(parquet_file, engine="pyarrow")
    print(f"üíæ Saved raw parquet: {parquet_file}\n")


‚¨áÔ∏è Downloading fhv_tripdata_2019-01.csv.gz
   rows: 23,143,222
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-01.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-02.csv.gz
   rows: 1,707,649
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-02.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-03.csv.gz
   rows: 1,475,564
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-03.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-04.csv.gz
   rows: 1,937,844
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-04.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-05.csv.gz
   rows: 2,073,045
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-05.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-06.csv.gz
   rows: 2,009,886
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-06.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-07.csv.gz
   rows: 1,947,739
üíæ Saved raw parquet: data/fhv_raw/fhv_tripdata_2019-07.parquet

‚¨áÔ∏è Downloading fhv_tripdata_2019-08.csv.gz
   rows: 1,880,407
ü

In [8]:
int_cols = ["PUlocationID", "DOlocationID", "SR_Flag"]
datetime_cols = ["pickup_datetime", "dropOff_datetime"]

for file in sorted(RAW_DIR.glob("*.parquet")):
    print(f"üîß Fixing schema: {file.name}")

    df = pq.read_table(file).to_pandas()

    # datetime Î≥µÍµ¨
    for c in datetime_cols:
        if c in df.columns:
            df[c] = pd.to_datetime(df[c])

    # int Î≥µÍµ¨
    for c in int_cols:
        if c in df.columns:
            df[c] = df[c].astype("Int64")

    # SR_Flag ÏóÜÎäî Í≤ΩÏö∞ Î∞©Ïñ¥
    if "SR_Flag" not in df.columns:
        df["SR_Flag"] = pd.NA

    table_fixed = pa.Table.from_pandas(
        df,
        schema=FHV_SCHEMA,
        preserve_index=False
    )

    out_file = FIXED_DIR / file.name
    pq.write_table(table_fixed, out_file, compression="snappy")

    print(f"‚úÖ Saved fixed parquet: {out_file}\n")


üîß Fixing schema: fhv_tripdata_2019-01.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-01.parquet

üîß Fixing schema: fhv_tripdata_2019-02.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-02.parquet

üîß Fixing schema: fhv_tripdata_2019-03.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-03.parquet

üîß Fixing schema: fhv_tripdata_2019-04.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-04.parquet

üîß Fixing schema: fhv_tripdata_2019-05.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-05.parquet

üîß Fixing schema: fhv_tripdata_2019-06.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-06.parquet

üîß Fixing schema: fhv_tripdata_2019-07.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-07.parquet

üîß Fixing schema: fhv_tripdata_2019-08.parquet
‚úÖ Saved fixed parquet: data/fhv_fixed/fhv_tripdata_2019-08.parquet

üîß Fixing schema: fhv_tripdata_2019-09.parquet

In [9]:
test_file = next(FIXED_DIR.glob("*.parquet"))

pq.read_table(test_file).schema


dispatching_base_num: string
pickup_datetime: timestamp[us]
dropOff_datetime: timestamp[us]
PUlocationID: int64
DOlocationID: int64
SR_Flag: int64
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 848

In [10]:
pq.read_table(test_file).to_pandas().head()


Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag
0,B00013,2019-04-01 00:15:00,2019-04-01 00:55:36,264,264,
1,B00013,2019-04-01 00:11:00,2019-04-01 00:31:39,264,264,
2,B00014,2019-04-01 00:25:00,2019-04-01 00:55:36,264,264,
3,B00014,2019-04-01 00:09:36,2019-04-01 00:40:12,264,264,
4,B00014,2019-04-01 00:00:57,2019-04-01 00:41:33,264,264,


In [16]:
for file in FIXED_DIR.glob("*.parquet"):
    blob = bucket.blob(f"raw/fhv_tripdata/{file.name}")
    blob.upload_from_filename(file)
    print(f"‚òÅÔ∏è Uploaded: gs://{BUCKET_NAME}/raw/fhv_tripdata/{file.name}")

print("üéâ All FHV data uploaded to GCS")


‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-04.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-08.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-07.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-12.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-11.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-02.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-10.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-03.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-06.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-01.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-05.parquet
‚òÅÔ∏è Uploaded: gs://real-module-4/raw/fhv_tripdata/fhv_tripdata_2019-09.parquet
üéâ All FHV dat