In [1]:
import pandas as pd

In [2]:
df = pd.read_csv('dataset/yellow_tripdata_2021-01.csv', nrows=1000)

In [3]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [4]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


Note:  "tpep_pickup_datetime" and "tpep_dropoff_datetime" are TEXT
But we need it to be time-stamp fields

In [5]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])

In [6]:
# other way is dot notation
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

In [7]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [8]:
from sqlalchemy import create_engine

In [9]:
# postgressql://<user_name>:<password>@<host>:<port>/<database>
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [None]:
# pip install psycopg2

In [10]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fec306ef5e0>

In [11]:
# To get DDL for the connecting database (here postgres for our case)

print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [12]:
# To load data we will chunk the data using 'iterators'

df_iter = pd.read_csv('dataset/yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

In [13]:
df_iter # its an iterator

<pandas.io.parsers.readers.TextFileReader at 0x7fec3070b5e0>

In [14]:
df = next(df_iter) # Reading one chunk at a time

len(df)

100000

In [15]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [16]:
# updating the datatype of 2 fields
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

In [17]:
# To only see the header, i.e., column names
df.head(n=0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [19]:
# Lets create table (and no data loading)

# dataframe.to_sql() will add the data from dataframe to the table
# dataframe.to_sql(name=<database_name>, con=engine, if_exists='replace')

df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

Go to the pgcli and cross-check that table is created

command is \dt
+--------+------------------+-------+-------+
| Schema | Name             | Type  | Owner |
|--------+------------------+-------+-------|
| public | yellow_taxi_data | table | root  |
+--------+------------------+-------+-------+

describe yellow_taxi_data
or \d yellow_taxi_data


In [20]:
# Load data for this first chunk of data

# %time will tell us how much time it took
# Note: if_exists ='append' to load the data 

%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 2.89 s, sys: 58.2 ms, total: 2.95 s
Wall time: 4.97 s


1000

In [22]:
from time import time

In [23]:
# To load all the chunks

while True:
    start_ts = time()
    df = next(df_iter)
    
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    
    
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    end_ts = time()
    print('Next chunk data inserted/appended in %.3f seconds.' %(end_ts - start_ts))

Next chunk data inserted/appended in 5.002 seconds.
Next chunk data inserted/appended in 4.926 seconds.
Next chunk data inserted/appended in 4.995 seconds.
Next chunk data inserted/appended in 5.013 seconds.
Next chunk data inserted/appended in 4.936 seconds.
Next chunk data inserted/appended in 4.939 seconds.
Next chunk data inserted/appended in 4.957 seconds.
Next chunk data inserted/appended in 5.028 seconds.
Next chunk data inserted/appended in 4.914 seconds.
Next chunk data inserted/appended in 5.040 seconds.
Next chunk data inserted/appended in 5.015 seconds.


  df = next(df_iter)


Next chunk data inserted/appended in 4.796 seconds.
Next chunk data inserted/appended in 3.170 seconds.


StopIteration: 

In [None]:
# Gracefully handling of the 'StopIteration' exception, i.e, when there is no more data in the iterator
numbers_iterator = NumberIterator([1, 2, 3, 4, 5])

while True:
    try:
        num = next(numbers_iterator)
        print(num)
    except StopIteration:
        print("Reached the end of the iteration.")
        break

Let's read a parquet file

In [None]:
# pip install --upgrade cmake

In [None]:
# import cmake

In [None]:
# from platform import python_version

# print(python_version())

In [None]:
# pip install pyarrow

In [24]:
# One time installation
# This was done
# conda install -c conda-forge pyarrow

done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Solving environment: failed with repodata from current_repodata.json, will retry with next repodata source.
done
Solving environment: done


  current version: 22.9.0
  latest version: 25.3.1

Please update conda by running

    $ conda update -n base -c defaults conda



## Package Plan ##

  environment location: /opt/anaconda3

  added / updated specs:
    - pyarrow


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    arrow-cpp-9.0.0            |py39h04a14be_7_cpu        23.7 MB  conda-forge
    aws-c-cal-0.5.11           |       hd2e2f4b_0          30 KB  conda-forge
    aws-c-common-0.6.2         |       h0d85af4_0         150 KB  conda-forge
    aws-c-event-stream-0.2.7   |      hb9330a7_13          40 KB  conda-forge
    aws-c-io-0.10.5            |       h35aa462_0         109 KB  conda-forge
 

In [25]:
import pyarrow.parquet as pq

In [26]:
# Read metadata 
pq.read_metadata('dataset/yellow_tripdata_2025-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x7febd035af90>
  created_by: parquet-cpp-arrow version 14.0.2
  num_columns: 19
  num_rows: 3475226
  num_row_groups: 4
  format_version: 2.6
  serialized_size: 10625

In [27]:
# Read file, read the table from file and check schema
file = pq.ParquetFile('dataset/yellow_tripdata_2025-01.parquet')
table = file.read()
table.schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

In [28]:
# Convert to pandas and check data 
pq_df = table.to_pandas()
pq_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3475226 entries, 0 to 3475225
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int32         
 8   DOLocationID           int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  Airport_fee           

In [29]:
#This part is for testing

# Creating batches of 100,000 for the paraquet file
batches_iter = file.iter_batches(batch_size=100000)
batches_iter


<generator at 0x7febd035d720>

In [30]:
# Insert values into the table 
t_start = time()
count = 0

for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')   

inserting batch 1...
inserted! time taken      5.097 seconds.

inserting batch 2...
inserted! time taken      4.982 seconds.

inserting batch 3...
inserted! time taken      5.044 seconds.

inserting batch 4...
inserted! time taken      4.956 seconds.

inserting batch 5...
inserted! time taken      4.974 seconds.

inserting batch 6...
inserted! time taken      4.914 seconds.

inserting batch 7...
inserted! time taken      4.938 seconds.

inserting batch 8...
inserted! time taken      5.023 seconds.

inserting batch 9...
inserted! time taken      5.032 seconds.

inserting batch 10...
inserted! time taken      4.939 seconds.

inserting batch 11...
inserted! time taken      4.978 seconds.

inserting batch 12...
inserted! time taken      4.948 seconds.

inserting batch 13...
inserted! time taken      4.991 seconds.

inserting batch 14...
inserted! time taken      5.121 seconds.

inserting batch 15...
inserted! time taken      5.014 seconds.

inserting batch 16...
inserted! time taken      5