# Data loading 

In [33]:
import pandas as pd 
import pyarrow.parquet as pq
from time import time

from sqlalchemy import create_engine


#### Read metadata 

In [34]:
pq.read_metadata('yellow_tripdata_2021-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x177fe9d60>
  created_by: parquet-cpp-arrow version 7.0.0
  num_columns: 19
  num_rows: 1369769
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 10382

#### Read file, read the table from file and check schema

In [35]:
file = pq.ParquetFile('yellow_tripdata_2021-01.parquet')
table = file.read()
table.schema

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

#### Convert to pandas and check data

In [36]:
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369769 entries, 0 to 1369768
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               1369769 non-null  int64         
 1   tpep_pickup_datetime   1369769 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  1369769 non-null  datetime64[ns]
 3   passenger_count        1271417 non-null  float64       
 4   trip_distance          1369769 non-null  float64       
 5   RatecodeID             1271417 non-null  float64       
 6   store_and_fwd_flag     1271417 non-null  object        
 7   PULocationID           1369769 non-null  int64         
 8   DOLocationID           1369769 non-null  int64         
 9   payment_type           1369769 non-null  int64         
 10  fare_amount            1369769 non-null  float64       
 11  extra                  1369769 non-null  float64       
 12  mta_tax                13697

We need to first create the connection to our postgres database. We can feed the connection information to generate the CREATE SQL query for the specific server. SQLAlchemy supports a variety of servers.

In [37]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x30e3adf10>

#### Generate CREATE SQL statement from schema for validation

In [13]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




Datatypes for the table looks good! Since we used paraquet file the datasets seem to have been preserved. You may have to convert some datatypes so it is always good to do this check.

## Finally inserting data

There are 1,369,765 rows in our dataset. We are going to use the ```parquet_file.iter_batches()``` function to create batches of 100,000, convert them into pandas and then load it into the postgres database.

#### Creating batches of 100,000 for the paraquet file

In [18]:
batches_iter = file.iter_batches(batch_size=100000)
batches_iter

df = next(batches_iter).to_pandas()
df

# Creating just the table in postgres
# df.head(0).to_sql(name='ny_taxi_data',con=engine, if_exists='replace')

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-01-04 14:04:31,2021-01-04 14:08:52,3.0,0.70,1.0,N,234,224,2,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5,
99996,1,2021-01-04 14:18:46,2021-01-04 14:35:45,2.0,3.30,1.0,N,234,236,1,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5,
99997,1,2021-01-04 14:42:41,2021-01-04 14:59:22,2.0,4.70,1.0,N,236,79,1,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5,
99998,2,2021-01-04 14:39:02,2021-01-04 15:09:37,2.0,17.95,2.0,N,132,148,1,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5,


#### Insert values into the table

In [19]:
t_start = time()
count = 0

for batch in file.iter_batches(batch_size=100000):
    count += 1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')    

inserting batch 1...
inserted! time taken      5.578 seconds.

inserting batch 2...
inserted! time taken      5.313 seconds.

inserting batch 3...
inserted! time taken      5.546 seconds.

inserting batch 4...
inserted! time taken      5.315 seconds.

inserting batch 5...
inserted! time taken      5.394 seconds.

inserting batch 6...
inserted! time taken      5.455 seconds.

inserting batch 7...
inserted! time taken      5.259 seconds.

inserting batch 8...
inserted! time taken      5.214 seconds.

inserting batch 9...
inserted! time taken      5.214 seconds.

inserting batch 10...
inserted! time taken      5.302 seconds.

inserting batch 11...
inserted! time taken      5.673 seconds.

inserting batch 12...
inserted! time taken      5.406 seconds.

inserting batch 13...
inserted! time taken      5.515 seconds.

inserting batch 14...
inserted! time taken      3.564 seconds.

Completed! Total time taken was     73.860 seconds for 14 batches.


## Extra bit

While trying to do the SQL Refresher, there was a need to add a lookup zones table but the file is in ```.csv``` format. 

Let's code to handle both ```.csv``` and ```.paraquet``` files!

In [38]:
from time import time
import pandas as pd 
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import os

In [39]:
url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv'
# url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet'

file_name = url.rsplit('/', 1)[-1].strip()
file_name

'taxi+_zone_lookup.csv'

In [28]:
os.system(f"wget {url} -O {file_name}")

--2024-08-05 13:24:19--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:263a:3a00:b:20a5:b140:21, 2600:9000:263a:1800:b:20a5:b140:21, 2600:9000:263a:7600:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:263a:3a00:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: 'taxi+_zone_lookup.csv'

     0K .......... ..                                         100% 12.1M=0.001s

2024-08-05 13:24:19 (12.1 MB/s) - 'taxi+_zone_lookup.csv' saved [12331/12331]



0

In [29]:
if '.csv' in file_name:
    print('yay') 
    df = pd.read_csv(file_name, nrows=10)
    df_iter = pd.read_csv(file_name, iterator=True, chunksize=100000)
elif '.parquet' in file_name:
    print('oh yea')
    file = pq.ParquetFile(file_name)
    df = next(file.iter_batches(batch_size=10)).to_pandas()
    df_iter = file.iter_batches(batch_size=100000)
else: 
    print('Error. Only .csv or .parquet files allowed.')
    sys.exit() 

yay


In [40]:
df_zones = pd.read_csv('taxi+_zone_lookup.csv')
df_zones

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
...,...,...,...,...
260,261,Manhattan,World Trade Center,Yellow Zone
261,262,Manhattan,Yorkville East,Yellow Zone
262,263,Manhattan,Yorkville West,Yellow Zone
263,264,Unknown,,


In [41]:
df_zones.to_sql(name='zones', con=engine, if_exists='replace')

This code is a rough code and seems to be working. The cleaned up version will be in `data-loading-parquet.py` file.