In [88]:
import pandas as pd
from google.cloud import storage
import os, datetime
from sqlalchemy import create_engine

### I) Load data from GCS

In [35]:
def load_data(bucket, key_path, year=2021, months=range(1, 4), data_dir='./data'):
    """Download monthly green-taxi parquet files from GCS and concatenate them.

    Args:
        bucket: Name of the GCS bucket holding the parquet files.
        key_path: Path to the service-account key file; exported through the
            GOOGLE_APPLICATION_CREDENTIALS environment variable before the
            storage client is created.
        year: Year used in the object name (default 2021).
        months: Iterable of month numbers to download (default 1..3).
        data_dir: Local directory the files are downloaded into; it is
            created when missing.

    Returns:
        A single DataFrame with the rows of every downloaded month and a
        fresh 0..n-1 index.

    Raises:
        ValueError: from ``pd.concat`` when ``months`` is empty.
    """
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path
    # download_to_filename opens the target path for writing and does not
    # create parent directories, so make sure the download folder exists.
    os.makedirs(data_dir, exist_ok=True)

    client = storage.Client()
    # Separate name so the `bucket` argument (a string) is not rebound to a
    # bucket object halfway through the function.
    gcs_bucket = client.get_bucket(bucket)

    dfs = []
    for month in months:
        gcs_path = f'green_taxi/green_tripdata_{year}-{month:02d}.parquet'
        temp_path = os.path.join(data_dir, f'{month:02d}.parquet')
        gcs_bucket.blob(gcs_path).download_to_filename(temp_path)
        dfs.append(pd.read_parquet(temp_path))
    return pd.concat(dfs, ignore_index=True)

In [41]:
# Pull the monthly green-taxi files from the bucket into one DataFrame.
df = load_data(
    bucket='mle-batch-and-stream-processing-bucket',
    key_path='./service_account_key.json',
)

### II) Calculate revenue per day

In [82]:
def transform_revenue_per_day(df):
    """Sum the revenue columns per drop-off calendar day.

    The input frame is left untouched: the drop-off timestamps are converted
    to dates in a separate key Series rather than written back into ``df``.

    Args:
        df: Trip-level DataFrame containing ``lpep_dropoff_datetime`` and the
            revenue columns listed in ``columns_to_sum_up``.

    Returns:
        A DataFrame with one row per drop-off date: the
        ``lpep_dropoff_datetime`` column (as ``datetime.date`` values)
        followed by the summed revenue columns.
    """
    columns_to_sum_up = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']
    # The derived Series keeps the column's name, so after reset_index() the
    # key column is still called 'lpep_dropoff_datetime'.
    dropoff_date = pd.to_datetime(df['lpep_dropoff_datetime']).dt.date
    df_grouped = df.groupby(dropoff_date)[columns_to_sum_up].sum().reset_index()
    return df_grouped

In [83]:
# Aggregate the trip-level rows into one revenue row per drop-off date.
df_grouped = transform_revenue_per_day(df=df)

In [84]:
# Show the per-day aggregates; the rendered table below elides the middle rows.
df_grouped

Unnamed: 0,lpep_dropoff_datetime,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,2009-01-01,3.00,0.00,0.5,0.00,0.00,0.3,3.80,0.00
1,2021-01-01,21411.12,135.75,286.0,1992.31,621.34,301.8,25072.02,324.50
2,2021-01-02,35064.80,192.00,579.0,3502.09,847.83,565.5,41263.72,566.50
3,2021-01-03,25854.78,146.00,374.5,2650.61,685.39,395.1,30488.63,453.75
4,2021-01-04,58393.96,619.75,699.0,5952.27,1679.51,870.3,68902.04,778.00
...,...,...,...,...,...,...,...,...,...
87,2021-03-28,28411.23,1438.55,419.5,1725.55,800.61,430.5,33792.44,558.25
88,2021-03-29,57334.06,4479.90,616.0,1759.69,1856.02,817.2,67478.62,767.00
89,2021-03-30,61681.80,4605.57,726.0,2250.15,1577.31,893.7,72608.03,959.50
90,2021-03-31,60069.14,4554.60,760.0,2112.80,1581.66,914.7,70792.35,918.50


### III) Write to database

docker run -d -e POSTGRES_USER='postgres' \
    --network=ny-taxi \
    -e POSTGRES_PASSWORD='postgres' \
    -e POSTGRES_DB='ny_taxi' \
    -v $(pwd)/db-data:/var/lib/postgresql/data \
    -p 5432:5432 \
    --name ny-taxi-db \
    postgres

docker exec -it ny-taxi-db psql -U postgres 

In [86]:
def data_ingestion(df, username, password, host, port, db_name):
    """Write ``df`` to the ``revenue_per_day`` table of a PostgreSQL database.

    An existing ``revenue_per_day`` table is replaced and the DataFrame index
    is not written.

    Args:
        df: DataFrame to store.
        username: Database user (plain text; percent-encoded here).
        password: Database password (plain text; percent-encoded here).
        host: Database host name.
        port: Database port.
        db_name: Name of the target database.
    """
    from urllib.parse import quote

    # Credentials are percent-encoded so characters such as '@', '/' or ':'
    # cannot be mistaken for URL delimiters in the connection string.
    user = quote(str(username), safe='')
    secret = quote(str(password), safe='')
    engine = create_engine(f'postgresql://{user}:{secret}@{host}:{port}/{db_name}')
    try:
        df.to_sql('revenue_per_day', engine, if_exists='replace', index=False)
    finally:
        # Release the engine's pooled connections once the write is done.
        engine.dispose()

In [92]:
# Connection settings are read from the environment when present, so real
# credentials never need to be written into the notebook; the fallbacks are
# the values this cell used originally.
data_ingestion(
    df_grouped,
    username=os.environ.get('POSTGRES_USER', 'postgres'),
    password=os.environ.get('POSTGRES_PASSWORD', 'postgres'),
    host=os.environ.get('POSTGRES_HOST', 'localhost'),
    port=os.environ.get('POSTGRES_PORT', '5432'),
    db_name=os.environ.get('POSTGRES_DB', 'ny_taxi'),
)