# General Information
This script downloads data from the web and uses it to populate our database.

We work with the NYC taxi trip data and clean it here (it is already very clean, but we still define the column types explicitly).
After cleaning, we load it into the database. However, the file has more than 1.3 million rows, and pushing all of them
into the database at once is neither practical nor wise.

Therefore the data is read in chunks: with `iterator=True` and a `chunksize`, pandas returns an iterator of DataFrames instead of a single one.
We then loop over that iterator chunk by chunk and push each chunk into the database.


In [1]:
import pandas as pd


# Source file: one month of yellow-taxi trips from the DataTalksClub release.
# pandas decompresses .csv.gz transparently, so no manual unzip step is needed.
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'
filename = 'yellow_tripdata_2021-01.csv.gz'
url = f"{prefix}{filename}"

# Explicit column types instead of letting pandas infer them.
dtypes = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Timestamp columns to parse into datetimes while reading.
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Read the whole month into memory for inspection (no row limit).
df = pd.read_csv(url, dtype=dtypes, parse_dates=parse_dates)

# Preview, column dtypes and shape, each separated by a dashed line.
print(df.head(), df.dtypes, df.shape, sep="\n-------------\n")


   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         1  2021-01-01 00:30:10   2021-01-01 00:36:12                1   
1         1  2021-01-01 00:51:20   2021-01-01 00:52:19                1   
2         1  2021-01-01 00:43:30   2021-01-01 01:11:06                1   
3         1  2021-01-01 00:15:48   2021-01-01 00:31:01                0   
4         2  2021-01-01 00:31:49   2021-01-01 00:48:21                1   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           2.10           1                  N           142            43   
1           0.20           1                  N           238           151   
2          14.70           1                  N           132           165   
3          10.60           1                  N           138           132   
4           4.94           1                  N            68            33   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \


In [2]:
from sqlalchemy import create_engine

# Connection to the local PostgreSQL database `ny_taxi` (user/password root/root).
engine = create_engine(
    "postgresql://root:root@localhost:5432/ny_taxi"
)

In [3]:
# Write a zero-row frame so only the table definition is (re)created.
df.head(0).to_sql(
    name="yellow_taxi_data",
    con=engine,
    if_exists="replace",
)

0

In [4]:
# Show the CREATE TABLE statement pandas generates for this DataFrame.
print(
    pd.io.sql.get_schema(df, name="yellow_taxi_data", con=engine)
)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [5]:
# Re-read the same source lazily: with iterator=True and chunksize, pandas
# yields DataFrames of up to 100000 rows instead of one large frame.
df_iter = pd.read_csv(
    url,
    dtype=dtypes,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=100000,
)

In [6]:
from tqdm.auto import tqdm

In [7]:

# Push the data chunk by chunk: the first chunk also recreates the empty
# table, then every chunk is appended in INSERT batches of 2000 rows.
for chunk_no, chunk in enumerate(tqdm(df_iter)):
    if chunk_no == 0:
        chunk.head(0).to_sql(name="yellow_taxi_data", con=engine, if_exists="replace")
        print("Table created")

    chunk.to_sql(
        name="yellow_taxi_data",
        con=engine,
        if_exists="append",
        chunksize=2000,
    )
    print("inserted: {0}".format(len(chunk)))

0it [00:00, ?it/s]

Table created
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 100000
inserted: 69765


In [3]:
import pandas as pd
import click
from sqlalchemy import create_engine

# Command-line entry point: every option mirrors one parameter of
# ingest_data_from_link. (No docstring on purpose: click would show it as --help.)
@click.command()
@click.option(
    "--user",
    default="root",
    help="PostgreSQL user",
)
@click.option(
    "--password",
    type=str,
    default="root",
    help="PostgreSQL password",
)
@click.option(
    "--host",
    type=str,
    default="localhost",
    help="PostgreSQL host",
)
@click.option(
    "--port",
    type=int,
    default=5432,
    help="PostgreSQL port",
)
@click.option(
    "--db",
    type=str,
    default="ny_taxi",
    help="PostgreSQL database name",
)
@click.option(
    "--table",
    type=str,
    default="yellow_taxi_data",
    help="Target table name",
)
@click.option(
    "--schema",
    type=bool,
    default=False,
    help="Want to see the schema printed that had been ingested?",
)
@click.option(
    "--url",
    type=str,
    default="https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet",
    help="Full url to data source",
)
@click.option(
    "--dtype",
    type=click.Choice(["parquet", "csv"], case_sensitive=False),
    default="parquet",
    help="choose if data source is parquet or csv",
)
def ingest_data_from_link_cli(user, password, host, port, db, table, schema, url, dtype):
    # Forward by keyword so the option -> parameter mapping is explicit.
    ingest_data_from_link(
        user=user,
        password=password,
        host=host,
        port=port,
        db=db,
        table=table,
        schema=schema,
        url=url,
        dtype=dtype,
    )



def ingest_data_from_link(
    user: str,
    password: str,
    host: str,
    port: int,
    db: str,
    table: str,
    schema: bool,
    url: str,
    dtype: str,
) -> None:
    """Download a taxi data file and load it into a PostgreSQL table.

    The target table is dropped/recreated from the data's columns and then
    filled with all rows of the source.

    Args:
        user: PostgreSQL user name.
        password: PostgreSQL password; URL-escaped before use, so special
            characters such as '@', ':' or '/' are safe.
        host: PostgreSQL host.
        port: PostgreSQL port.
        db: Name of the database to connect to.
        table: Name of the target table; an existing table is replaced.
        schema: If true, print the CREATE TABLE statement of the ingested data.
        url: Location of the source file (any path/URL pandas can read).
        dtype: Source format, ``"parquet"`` or ``"csv"`` (case-insensitive).

    Raises:
        ValueError: If ``dtype`` is neither ``"parquet"`` nor ``"csv"``.
    """
    from urllib.parse import quote

    # Validate before touching the database; previously an unknown format
    # silently did nothing.
    file_format = dtype.lower()
    if file_format not in ("parquet", "csv"):
        raise ValueError(
            f"Unsupported dtype {dtype!r}; expected 'parquet' or 'csv'"
        )

    # Percent-encode the credentials so reserved URL characters in them
    # cannot corrupt the connection string.
    engine = create_engine(
        "postgresql://{0}:{1}@{2}:{3}/{4}".format(
            quote(user, safe=""), quote(password, safe=""), host, port, db
        )
    )

    if file_format == "parquet":
        _ingest_parquet(engine, table, schema, url)
    else:
        _ingest_csv(engine, table, schema, url)


def _ingest_parquet(engine, table: str, schema: bool, url: str) -> None:
    """Load a parquet file into ``table``, inserting rows in batches."""
    # pd.read_parquet has no chunked mode, so the whole file is loaded into
    # memory; the INSERTs are still batched (same settings as the CSV path)
    # instead of sending every row in one call.
    df = pd.read_parquet(url)

    df.head(0).to_sql(name=table, con=engine, if_exists="replace")
    print("Table created")

    df.to_sql(
        name=table, con=engine, if_exists="append", chunksize=2000, method="multi"
    )
    print("Data inserted successfully")

    if schema:
        print(pd.io.sql.get_schema(df.head(0), name=table, con=engine))


def _ingest_csv(engine, table: str, schema: bool, url: str) -> None:
    """Stream a CSV source into ``table`` in chunks of 100000 rows."""
    # Imported here so the parquet path does not depend on tqdm being loaded.
    from tqdm.auto import tqdm

    # Column types declared after inspecting the yellow-taxi CSV.
    # NOTE(review): parse_dates uses the yellow-taxi (tpep_*) column names;
    # read_csv will fail for sources without them (e.g. green-taxi lpep_*).
    dtypes = {
        "VendorID": "Int64",
        "passenger_count": "Int64",
        "trip_distance": "float64",
        "RatecodeID": "Int64",
        "store_and_fwd_flag": "string",
        "PULocationID": "Int64",
        "DOLocationID": "Int64",
        "payment_type": "Int64",
        "fare_amount": "float64",
        "extra": "float64",
        "mta_tax": "float64",
        "tip_amount": "float64",
        "tolls_amount": "float64",
        "improvement_surcharge": "float64",
        "total_amount": "float64",
        "congestion_surcharge": "float64",
    }
    parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

    df_iter = pd.read_csv(
        url, dtype=dtypes, parse_dates=parse_dates, iterator=True, chunksize=100000
    )

    # Initialised so the schema print below is safe if no chunk is produced.
    chunk = None
    for chunk_no, chunk in enumerate(tqdm(df_iter)):
        if chunk_no == 0:
            # First push the fresh (empty) table definition, then the data.
            chunk.head(0).to_sql(name=table, con=engine, if_exists="replace")
            print("Table created")

        chunk.to_sql(
            name=table, con=engine, if_exists="append", chunksize=2000, method="multi"
        )
        print("inserted: {0}".format(len(chunk)))

    if schema and chunk is not None:
        print(pd.io.sql.get_schema(chunk.head(0), name=table, con=engine))

In [6]:
# Smoke test: load the green-taxi parquet file into a scratch table and
# print the resulting schema.
ingest_data_from_link(
    user="root",
    password="root",
    host="localhost",
    port=5432,
    db="ny_taxi",
    table="testing",
    schema=True,
    url="https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet",
    dtype="parquet",
)

Table created
Data inserted successfully

CREATE TABLE testing (
	"VendorID" INTEGER, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




In [1]:
import pandas as pd


# Taxi zone lookup table (LocationID, Borough, Zone, service_zone).
df = pd.read_csv(
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
)

In [6]:
# Display the zone lookup DataFrame as this cell's output.
df

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
...,...,...,...,...
260,261,Manhattan,World Trade Center,Yellow Zone
261,262,Manhattan,Yorkville East,Yellow Zone
262,263,Manhattan,Yorkville West,Yellow Zone
263,264,Unknown,NV,


In [5]:
# Inspect the inferred column types; the text columns come back as generic `object`.
df.dtypes

LocationID       int64
Borough         object
Zone            object
service_zone    object
dtype: object

In [None]:
# Intended column types for the zone lookup CSV: a nullable integer ID and
# dedicated string columns instead of the inferred `object` dtype.
dtypes = dict(
    LocationID="Int64",
    Borough="string",
    Zone="string",
    service_zone="string",
)

In [31]:
import yaml

def load_schema(file_path, table_name=None):
    """Load a YAML schema configuration file.

    Args:
        file_path: Path to the YAML file.
        table_name: Currently unused; the whole configuration is returned and
            callers select the table section themselves (e.g.
            ``config.get("green_taxi")``). Kept for backward compatibility.

    Returns:
        The parsed YAML document (for the file used here, presumably a mapping
        of table names to schema sections), or ``{}`` if the file is empty.
    """
    # Explicit encoding so the result does not depend on the platform locale.
    with open(file_path, "r", encoding="utf-8") as file:
        # safe_load: never construct arbitrary Python objects from the file.
        config = yaml.safe_load(file)
    # safe_load returns None for an empty document; return {} so callers can
    # use .get() without a None check.
    return config if config is not None else {}



# Load the schema config and pick the green-taxi section. Fall back to {} so
# the membership check below cannot fail on None (empty file or missing section).
sch = load_schema("./loadSchema.yaml", "I dont know") or {}
sch = sch.get("green_taxi") or {}
if "date_columns" in sch:
    print("TRUE")
sch

TRUE


{'columns': {'VendorID': 'Int64',
  'passenger_count': 'Int64',
  'trip_distance': 'float64',
  'store_and_fwd_flag': 'string',
  'fare_amount': 'float64'},
 'date_columns': ['lpep_pickup_datetime', 'lpep_dropoff_datetime']}