Make sure that your Python env has `pandas` and `sqlalchemy` installed. I also had to install `psycopg2` manually.

In [18]:
import pandas as pd
import pyarrow as pa
pd.__version__

'2.2.3'

Set url variable and read parquet file using pandas setting engine

In [19]:
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
df = pd.read_parquet(url, engine="pyarrow")
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


We will now create the ***schema*** for the database. The _schema_ is the structure of the database; in this case it describes the columns of our table. Pandas can output the SQL ***DDL*** (Data definition language) instructions necessary to create the schema.

In [20]:
# We need to provide a name for the table; we will use 'yellow_taxi_data'
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "Airport_fee" REAL,
  "cbd_congestion_fee" REAL
)


Note that this only outputs the instructions, it hasn't actually created the table in the database yet.

Note that `tpep_pickup_datetime` and `tpep_dropoff_datetime` are text fields even though they should be timestamps. Let's change that.

In [21]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "Airport_fee" REAL,
  "cbd_congestion_fee" REAL
)


Even though we have the DDL instructions, we still need specific instructions for Postgres to connect to it and create the table. We will use `sqlalchemy` for this.

In [22]:
from sqlalchemy import create_engine

An ***engine*** specifies the database details in a URI. The structure of the URI is:

`database://user:password@host:port/database_name`

In [23]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [24]:
# run this cell when the Postgres Docker container is running
engine.connect()

<sqlalchemy.engine.base.Connection at 0x21560f27890>

In [25]:
# we can now use our engine to get the specific output for Postgres
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	"Airport_fee" FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




We will now create an ***iterator*** that will allow us to read the CSV file in chunks and send them to the database. Otherwise, we may run into problems trying to send too much data at once.

In [26]:
# read the CSV file in chunks
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x2156235e490>

We can use the `next()` function to get the chunks using the iterator.

In [27]:
df = next(df_iter)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.0,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.1,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.2,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.8,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2025-01-02 12:29:15,2025-01-02 12:46:16,2.0,2.44,1.0,N,186,50,1,17.7,0.0,0.5,4.34,0.0,1.0,26.04,2.5,0.0,0.0
99996,2,2025-01-02 12:54:47,2025-01-02 13:11:29,3.0,0.88,1.0,N,246,164,1,14.9,0.0,0.5,1.00,0.0,1.0,19.90,2.5,0.0,0.0
99997,2,2025-01-02 12:12:22,2025-01-02 12:39:41,1.0,5.33,1.0,N,230,88,1,29.6,0.0,0.5,6.72,0.0,1.0,40.32,2.5,0.0,0.0
99998,2,2025-01-02 12:31:07,2025-01-02 12:58:04,1.0,2.67,1.0,N,48,263,1,23.3,0.0,0.5,8.19,0.0,1.0,35.49,2.5,0.0,0.0



This is a brand new dataframe, so we need to convert the time columns to timestamp format.

In [28]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.0,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.1,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.2,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.8,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2025-01-02 12:29:15,2025-01-02 12:46:16,2.0,2.44,1.0,N,186,50,1,17.7,0.0,0.5,4.34,0.0,1.0,26.04,2.5,0.0,0.0
99996,2,2025-01-02 12:54:47,2025-01-02 13:11:29,3.0,0.88,1.0,N,246,164,1,14.9,0.0,0.5,1.00,0.0,1.0,19.90,2.5,0.0,0.0
99997,2,2025-01-02 12:12:22,2025-01-02 12:39:41,1.0,5.33,1.0,N,230,88,1,29.6,0.0,0.5,6.72,0.0,1.0,40.32,2.5,0.0,0.0
99998,2,2025-01-02 12:31:07,2025-01-02 12:58:04,1.0,2.67,1.0,N,48,263,1,23.3,0.0,0.5,8.19,0.0,1.0,35.49,2.5,0.0,0.0


We will now finally create the table in the database. With `df.head(n=0)` we can get the name of the columns only, without any additional data. We will use it to generate a SQL instruction to generate the table.

In [29]:
# we need to provide the table name, the connection and what to do if the table already exists
# we choose to replace everything in case you had already created something by accident before.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

You can now use `pgcli -h localhost -p 5432 -u root -d ny_taxi` on a separate terminal to look at the database:

* `\dt` for looking at available tables.
* `\d yellow_taxi_data` for describing the new table.

Let's include our current chunk to our database and time how long it takes.

In [15]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: total: 5.61 s
Wall time: 10.3 s


1000

Back on the terminal running `pgcli`, we can check how many lines were to the database with:

```sql
SELECT count(1) FROM yellow_taxi_data;
```

You should see 100,000 lines.


Let's write a loop to write all chunks to the database. Use the terminal with `pgcli` to check the database after the code finishes running.

In [30]:
from time import time

while True: 
    try:
        t_start = time()
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()

        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print('completed')
        break

inserted another chunk, took 9.967 second
inserted another chunk, took 9.860 second
inserted another chunk, took 9.780 second
inserted another chunk, took 9.840 second
inserted another chunk, took 9.906 second
inserted another chunk, took 10.772 second
inserted another chunk, took 9.986 second
inserted another chunk, took 10.459 second
inserted another chunk, took 10.154 second
inserted another chunk, took 10.293 second
inserted another chunk, took 11.226 second
inserted another chunk, took 10.364 second
inserted another chunk, took 10.154 second
inserted another chunk, took 10.726 second
inserted another chunk, took 10.677 second
inserted another chunk, took 11.457 second
inserted another chunk, took 11.162 second
inserted another chunk, took 10.650 second
inserted another chunk, took 10.885 second
inserted another chunk, took 10.818 second
inserted another chunk, took 10.720 second
inserted another chunk, took 11.158 second
inserted another chunk, took 11.482 second
inserted another 

  df = next(df_iter)


inserted another chunk, took 10.415 second
inserted another chunk, took 10.295 second
inserted another chunk, took 9.762 second
inserted another chunk, took 9.650 second
inserted another chunk, took 10.269 second
inserted another chunk, took 7.708 second
completed


And that's it! Feel free to go back to the [notes](../notes/1_intro.md#inserting-data-to-postgres-with-python)