In [2]:
import os
import sys

import pandas as pd
import pyarrow
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

In [3]:
# Load only the first 100 rows of the taxi zone lookup CSV as a quick sample.
# `prefix` is the base URL of the release assets and is reused further down.
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/'
zone_lookup_url = prefix + 'taxi_zone_lookup.csv'
df = pd.read_csv(zone_lookup_url, nrows=100)

In [4]:
# Preview the first five rows of the zone lookup sample.
df.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [5]:
# URL of the green taxi trip records for November 2025 (Parquet file).
taxi_data_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet"

In [6]:
# Read the whole trip Parquet file from the URL into a single DataFrame.
taxi = pd.read_parquet(taxi_data_url)

In [7]:
# Preview the first five trip records.
taxi.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-11-01 00:34:48,2025-11-01 00:41:39,N,1.0,74,42,1.0,0.74,7.2,...,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0,0.0
1,2,2025-11-01 00:18:52,2025-11-01 00:24:27,N,1.0,74,42,2.0,0.95,7.2,...,0.5,0.0,0.0,,1.0,9.7,2.0,1.0,0.0,0.0
2,2,2025-11-01 01:03:14,2025-11-01 01:15:24,N,1.0,83,160,1.0,2.19,13.5,...,0.5,5.0,0.0,,1.0,21.0,1.0,1.0,0.0,0.0
3,2,2025-11-01 00:10:57,2025-11-01 00:24:53,N,1.0,166,127,1.0,5.44,24.7,...,0.5,0.5,0.0,,1.0,27.7,1.0,1.0,0.0,0.0
4,1,2025-11-01 00:03:48,2025-11-01 00:19:38,N,1.0,166,262,1.0,3.2,18.4,...,1.5,1.0,0.0,,1.0,24.65,1.0,1.0,2.75,0.0


In [8]:
# Column dtypes pandas inferred for the zone lookup sample.
df.dtypes

LocationID      int64
Borough           str
Zone              str
service_zone      str
dtype: object

In [9]:
# (rows, columns) of the zone sample; the row count is capped by nrows=100 in the read above.
df.shape

(100, 4)

In [10]:
# Column dtypes of the trip DataFrame; these drive the SQL column types generated later.
taxi.dtypes

VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag                  str
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [11]:
# (rows, columns) of the trip DataFrame.
taxi.shape

(46912, 21)

In [12]:
# Print the path of the Python interpreter this kernel runs on, to confirm the
# expected virtual environment is active. Requires `sys` from the import cell.
print(sys.executable)

/workspaces/datatalks-data-engineering/chapter_1/homework/.venv/bin/python3


In [13]:

# NOTE(review): redundant re-import; create_engine is already imported with the
# other imports in the notebook's first cell. Harmless, but could be dropped.
from sqlalchemy import create_engine

In [14]:
# Build the Postgres connection URL from the standard PG* environment variables
# so credentials do not have to live in the notebook. When a variable is unset
# the default reproduces the original local-development connection
# (root:root@localhost:5432/ny_taxi). URL.create escapes special characters in
# the username/password, which a hand-built connection string would not.
pg_url = URL.create(
    drivername="postgresql+psycopg",
    username=os.environ.get("PGUSER", "root"),
    password=os.environ.get("PGPASSWORD", "root"),
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432")),
    database=os.environ.get("PGDATABASE", "ny_taxi"),
)
engine = create_engine(pg_url)

In [15]:
# Show the CREATE TABLE statement pandas would generate for the zone sample
# on this engine's SQL dialect (nothing is written to the database here).
taxi_zone_ddl = pd.io.sql.get_schema(df, name='taxi_zone', con=engine)
print(taxi_zone_ddl)



CREATE TABLE taxi_zone (
	"LocationID" BIGINT, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)




In [16]:
# Show the CREATE TABLE statement pandas would generate for the trip data
# on this engine's SQL dialect (nothing is written to the database here).
taxi_trips_ddl = pd.io.sql.get_schema(taxi, name='taxi_trips', con=engine)
print(taxi_trips_ddl)



CREATE TABLE taxi_trips (
	"VendorID" INTEGER, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




In [17]:
# Stream the full zone lookup CSV into the `taxi_zone` table chunk by chunk.
# The chunk reader is used as a context manager so its underlying handle is
# closed even if one of the inserts raises part-way through the loop.
with pd.read_csv(prefix + "taxi_zone_lookup.csv", chunksize=100000) as df_iter:
    for chunk_no, df_chunk in enumerate(df_iter):
        if chunk_no == 0:
            # Recreate the table empty from the first chunk's columns/dtypes;
            # "replace" makes re-running the cell start from a clean table
            # instead of appending duplicate rows.
            df_chunk.head(0).to_sql(
                name="taxi_zone",
                con=engine,
                if_exists="replace",
                index=False
            )
            print("Table taxi_zone created")

        # Append this chunk's rows to the table created above.
        df_chunk.to_sql(
            name="taxi_zone",
            con=engine,
            if_exists="append",
            index=False
        )
        print("Inserted:", len(df_chunk))


Table taxi_zone created
Inserted: 265


In [18]:
# Load the trip data into `taxi_trips`: first recreate the table empty from the
# DataFrame's columns, then append all rows in batches of 100000.
trips_sql_kwargs = dict(name="taxi_trips", con=engine, index=False)
taxi.head(0).to_sql(if_exists="replace", **trips_sql_kwargs)
taxi.to_sql(if_exists="append", chunksize=100000, **trips_sql_kwargs)

-1