In [1]:
import pandas as pd
from tqdm import tqdm
import pyarrow.parquet as pq
import pyarrow as pa
import psycopg2
import io
from sqlalchemy import create_engine

In [2]:
data1 = 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet'
data2 = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv'


In [7]:
df1 = pd.read_parquet('green_tripdata_2025-11.parquet')


In [8]:
df2 = pd.read_csv('taxi_zone_lookup.csv')

In [5]:
df1.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-11-01 00:34:48,2025-11-01 00:41:39,N,1.0,74,42,1.0,0.74,7.2,...,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0,0.0
1,2,2025-11-01 00:18:52,2025-11-01 00:24:27,N,1.0,74,42,2.0,0.95,7.2,...,0.5,0.0,0.0,,1.0,9.7,2.0,1.0,0.0,0.0
2,2,2025-11-01 01:03:14,2025-11-01 01:15:24,N,1.0,83,160,1.0,2.19,13.5,...,0.5,5.0,0.0,,1.0,21.0,1.0,1.0,0.0,0.0
3,2,2025-11-01 00:10:57,2025-11-01 00:24:53,N,1.0,166,127,1.0,5.44,24.7,...,0.5,0.5,0.0,,1.0,27.7,1.0,1.0,0.0,0.0
4,1,2025-11-01 00:03:48,2025-11-01 00:19:38,N,1.0,166,262,1.0,3.2,18.4,...,1.5,1.0,0.0,,1.0,24.65,1.0,1.0,2.75,0.0


In [6]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 46912 entries, 0 to 46911
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               46912 non-null  int32         
 1   lpep_pickup_datetime   46912 non-null  datetime64[us]
 2   lpep_dropoff_datetime  46912 non-null  datetime64[us]
 3   store_and_fwd_flag     41343 non-null  object        
 4   RatecodeID             41343 non-null  float64       
 5   PULocationID           46912 non-null  int32         
 6   DOLocationID           46912 non-null  int32         
 7   passenger_count        41343 non-null  float64       
 8   trip_distance          46912 non-null  float64       
 9   fare_amount            46912 non-null  float64       
 10  extra                  46912 non-null  float64       
 11  mta_tax                46912 non-null  float64       
 12  tip_amount             46912 non-null  float64       
 13  t

In [7]:
df2.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [8]:
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 265 entries, 0 to 264
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   LocationID    265 non-null    int64 
 1   Borough       265 non-null    object
 2   Zone          264 non-null    object
 3   service_zone  263 non-null    object
dtypes: int64(1), object(3)
memory usage: 8.4+ KB


simulate ingest to db

In [5]:
def arrow_to_postgres(arrow_type):
    if pa.types.is_int64(arrow_type):
        return "DOUBLE PRECISION"
    if pa.types.is_int32(arrow_type):
        return "DOUBLE PRECISION"
    if pa.types.is_float64(arrow_type):
        return "DOUBLE PRECISION"
    if pa.types.is_string(arrow_type):
        return "TEXT"
    if pa.types.is_timestamp(arrow_type):
        return "TIMESTAMP"
    return "TEXT"

def ingest_data1(user, password, host, port, db, table):

    PARQUET_FILE = "green_tripdata_2025-11.parquet"
    TABLE_NAME = "green_tripdata"

    conn = psycopg2.connect(
        host="localhost",
        port=5434,
        dbname="ny_taxi",
        user="root",
        password="root"
    )
    conn.autocommit = False
    cur = conn.cursor()

    pf = pq.ParquetFile(PARQUET_FILE)

    # create table
    schema = pf.schema_arrow

    cols = []
    for field in schema:
        col_name = field.name
        pg_type = arrow_to_postgres(field.type)
        cols.append(f'"{col_name}" {pg_type}')

    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
        {", ".join(cols)}
    );
    """

    cur.execute(create_table_sql)
    conn.commit()

    for rg in range(pf.num_row_groups):
        print(f"Inserting row group {rg+1}/{pf.num_row_groups}")

        table = pf.read_row_group(rg)
        df = table.to_pandas()
        #print(df.head(1))
        #break

        buffer = io.StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        cur.copy_expert(
            f"COPY {TABLE_NAME} FROM STDIN WITH CSV",
            buffer
        )

    conn.commit()
    cur.close()
    conn.close()


In [6]:
ingest_data1('root', 'root', 'localhost', 5434, 'ny_taxi', 'green_tripdata')

Inserting row group 1/1


In [8]:
def ingest_data2(user, password, host, port, db, table):
    # Ingestion logic here

    # Read a sample of the data
    dtype = {
        "LocationID": "Int64",
        "Borough": "string",
        "Zone": "string",
        "service_zone": "string"
    }


    # Create Database Connection
    engine = create_engine(
        f'postgresql://{user}:{password}@{host}:{port}/{db}'
    )

    df_iter = pd.read_csv(
        'taxi_zone_lookup.csv',
        nrows=100,
        dtype=dtype,
        iterator=True,
        chunksize=100000
    )

    first = True

    for df_chunk in tqdm(df_iter):

        if first:
            # Create table schema (no data)
            df_chunk.head(0).to_sql(
                name="taxi_zone_lookup",
                con=engine,
                if_exists="replace"
            )
            first = False
            print("Table created")

        # Insert chunk
        df_chunk.to_sql(
            name="taxi_zone_lookup",
            con=engine,
            if_exists="append"
        )

        print("Inserted:", len(df_chunk))

In [9]:
ingest_data2('root', 'root', 'localhost', 5434, 'ny_taxi', 'taxi_zone_lookup')

1it [00:00,  4.77it/s]

Table created
Inserted: 100



