In [1]:
import os
import pandas as pd

def read_parquet_files(directory):
    # Get a list of all files in the directory
    all_files = os.listdir(directory)
    
    # Filter out only the parquet files
    parquet_files = [file for file in all_files if file.endswith('.parquet')]

    # Iterate through each parquet file
    for file in parquet_files:
        # Construct the full path to the file
        file_path = os.path.join(directory, file)
        
        # Read the parquet file into a DataFrame and yield it
        yield pd.read_parquet(file_path)

# Now you can use pd.concat() to combine all DataFrames
combined_df = pd.concat(read_parquet_files(r'D:\data_zoomcamp_2024\week-3\green_data_2022'), ignore_index=True)


In [2]:
combined_df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2022-01-01 00:14:21,2022-01-01 00:15:33,N,1.0,42,42,1.0,0.44,3.50,0.50,0.5,0.00,0.0,,0.3,4.80,2.0,1.0,0.00
1,1,2022-01-01 00:20:55,2022-01-01 00:29:38,N,1.0,116,41,1.0,2.10,9.50,0.50,0.5,0.00,0.0,,0.3,10.80,2.0,1.0,0.00
2,1,2022-01-01 00:57:02,2022-01-01 01:13:14,N,1.0,41,140,1.0,3.70,14.50,3.25,0.5,4.60,0.0,,0.3,23.15,1.0,1.0,2.75
3,2,2022-01-01 00:07:42,2022-01-01 00:15:57,N,1.0,181,181,1.0,1.69,8.00,0.50,0.5,0.00,0.0,,0.3,9.30,2.0,1.0,0.00
4,2,2022-01-01 00:07:50,2022-01-01 00:28:52,N,1.0,33,170,1.0,6.26,22.00,0.50,0.5,5.21,0.0,,0.3,31.26,1.0,1.0,2.75
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
840397,2,2022-12-31 23:33:00,2022-12-31 23:49:00,,,33,79,,3.53,20.72,0.00,0.0,4.89,0.0,,1.0,29.36,,,
840398,2,2022-12-31 23:29:00,2023-01-01 00:01:00,,,49,197,,8.94,39.01,0.00,0.0,8.00,0.0,,1.0,48.01,,,
840399,2,2022-12-31 23:05:00,2022-12-31 23:12:00,,,54,181,,1.27,12.11,0.00,0.0,2.62,0.0,,1.0,15.73,,,
840400,2,2022-12-31 23:03:00,2022-12-31 23:18:00,,,7,129,,2.23,15.51,0.00,0.0,3.30,0.0,,1.0,19.81,,,


In [32]:
print(len(combined_df))
print(combined_df.shape[0]) #for incresing performance use this


840402
840402


In [25]:
print(pd.io.sql.get_schema(combined_df,name='green_taxi_data'))

CREATE TABLE "green_taxi_data" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" TEXT,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" REAL,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


In [26]:
combined_df.head(0)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge


In [23]:
#connect to sql
db_url = 'postgresql://postgres:root@127.0.0.1:5432/ny_taxi'

In [24]:
from sqlalchemy import create_engine
engine =  create_engine(db_url)
engine.connect()

<sqlalchemy.engine.base.Connection at 0x23ff84e9d50>

In [27]:
combined_df.head(0).to_sql(name = 'green_taxi_data_2022',con = engine,if_exists='replace')

0

In [28]:
# Insert data from combined_df into the 'green_taxi_data' table
import logging

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Log a message before uploading data
logging.info("Starting data upload to SQL database...")

# Upload the DataFrame to SQL database
combined_df.to_sql('green_taxi_data_2022', con=engine, index=False, if_exists='append')

# Log a message after uploading data
logging.info("Data upload completed successfully.")

2024-02-10 18:15:50,277 - INFO - Starting data upload to SQL database...
2024-02-10 18:17:12,135 - INFO - Data upload completed successfully.


In [34]:
# dropping the tabe and again add by proper log info and upload the data by chunks

# Define the table name to drop
table_name = 'green_taxi_data_2022'

# Construct the SQL DROP TABLE statement
drop_table_sql = f"DROP TABLE IF EXISTS {table_name}"

# Execute the DROP TABLE statement
engine.connect().execute(drop_table_sql)

  engine.connect().execute(drop_table_sql)


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x23faf5aa550>

In [None]:
#again create the table schema
combined_df.head(0).to_sql(name = 'green_taxi_data_2022',con = engine,if_exists='replace')

In [None]:
import logging

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Log a message before uploading data
logging.info("Starting data upload to SQL database...")

# Define chunk size
chunk_size = 1000

# Upload the DataFrame to SQL database in chunks
for i in range(0, len(combined_df), chunk_size):
    chunk = combined_df.iloc[i:i+chunk_size]
    chunk.to_sql('green_taxi_data_2022', con=engine, index=False, if_exists='append')
    
    # Log information about the current chunk being uploaded
    logging.info(f"Uploaded chunk {i}-{i+chunk_size-1} out of {len(combined_df)}")

# Log a message after uploading data
logging.info("Data upload completed successfully.")
