In [33]:
from warnings import simplefilter
simplefilter('ignore')

In [None]:
import sys
import datetime
import psycopg2

import datetime
import pandas as pd
import sqlalchemy

import datetime
import psycopg2

from dataclasses import dataclass
import os
import pandas as pd

import io
import datetime
import psycopg2

In [1]:
def connect_to_postgres(user, password, host, port, database):
    """This function connects to a PostgreSQL database.
    Args:
        user (str): user name.
        password (str): password.
        host (str): host name.
        port (str | int): port number.
        database (str): database name.

    Returns:
        conn (object): pyscopg2 connection to the PostgreSQL database.
    """

    conn = None

    try:
        # connect to the PostgreSQL database
        print(
            f">> {datetime.datetime.now()} | [ CONN ] | Connecting to PostgreSQL database..."
        )

        conn = psycopg2.connect(
            dbname=database,
            user=user,
            password=password,
            host=host,
            port=port,
        )

        print(
            f">> {datetime.datetime.now()} | [ CONN ] | Connection to PostgreSQL database successful."
        )
        return conn

    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
        sys.exit(1)

In [2]:


def create_table(df, user, password, host, port, database, table):
    """This function creates a table in a PostgreSQL database.
    Args:
        conn (object): connection to the database.
        table (str): name of the table to be created.
    Returns:
        None
    """
    df = df.copy()

    engine = sqlalchemy.create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

    try:
        df.head(n=0).to_sql(name = table, con = engine, if_exists = 'replace', index = False)
        print(f">> {datetime.datetime.now()} | [ ETL ] | {table} table created.")

    except (Exception) as error:
        print(f"Error: {error}")
        return 1


In [3]:



def drop_table(conn, table):
    """This function drops a table from a PostgreSQL database.
    Args:
        conn (object): connection to the database.
        table (str): name of the table to be dropped.
    Returns:
        None
    """
    cursor = conn.cursor()

    try:
        cursor.execute(f"DROP TABLE IF EXISTS {table};")
        conn.commit()

    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
        conn.rollback()
        cursor.close()
        return 1

    print(f">> {datetime.datetime.now()} | [ ETL ] | {table} table dropped.")
    cursor.close()

In [4]:


@dataclass
class Extract:
    url: str
    parquet_file_name: str = "output.parquet"
    csv_file_name: str = "output.csv"
    df = None
    n_rows = None

    def download_parquet(self):
        """ Download the parquet file and converts and stores it to csv. """
        os.system(f"wget {self.url} -O {self.parquet_file_name}")

    def get_pandas_df(self):
        """ Load the pandas dataframe into memmory and update n_rows attribute. """
        self.df = pd.read_parquet(self.parquet_file_name)
        self.n_rows = self.df.shape[0]
        return self.df, self.n_rows

    def convert_parquet_to_csv(self):
        """ Convert the parquet file to csv. """
        self.df.to_csv(self.csv_file_name)

In [19]:


def copy_from_stringio(df, conn, table):
    """This function copies data from a pandas dataframe to a PostgreSQL table.
    Args:
        df (pandas.DataFrame): dataframe with the data to be copied.
        conn (object): connection to the database.
        table (str): name of the table to be populated.

    Returns:
        None
    """
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    cursor = conn.cursor()

    try:
        # copy_expert method allows to use COPY FROM STDIN
        cursor.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT CSV)", buffer)
        conn.commit()

    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
        conn.rollback()
        cursor.close()
        return 1
    print(f">> {datetime.datetime.now()} | [ ETL ] | {table} table populated.")

    cursor.close()

def copy_from_pandas(df, conn, table):
    """This function copies data from a pandas dataframe to a PostgreSQL table.
    Args:
        df (pandas.DataFrame): dataframe with the data to be copied.
        conn (object): connection to the database.
        table (str): name of the table to be populated.

    Returns:
        None
    """

    df.to_sql(name = table, con = conn, if_exists = 'append')
    print(f">> {datetime.datetime.now()} | [ ETL ] | {table} table populated.")


In [20]:
# from typing import Union
# import pandas as pd

# def to_datetime(df, columns: Union[str, list[str]]) -> pd.DataFrame:
#     """This function converts a column to datetime.
#     Args:
#         df (pandas.DataFrame): dataframe with the data.
#         columns (str or list[str]): name of the column to be converted.
#     Returns:
#         df (pandas.DataFrame): dataframe with the data.
#     """
#     if isinstance(columns, str):
#         columns = [columns]

#     for column in columns:
#         df[column] = pd.to_datetime(df[column])

#     return df

In [21]:
conn = connect_to_postgres(user='root', password='root', host='localhost', database='nyc_taxi', port=5432)

>> 2022-08-12 16:53:16.959542 | [ CONN ] | Connecting to PostgreSQL database...
>> 2022-08-12 16:53:17.079940 | [ CONN ] | Connection to PostgreSQL database successful.


In [22]:
drop_table(conn, 'nyc_tripdata_2021_01')

>> 2022-08-12 16:53:17.115631 | [ ETL ] | nyc_tripdata_2021_01 table dropped.


In [23]:
URL="https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"
extract = Extract(URL)

In [24]:
extract.download_parquet() # output.parquet

--2022-08-12 16:53:17--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.171, 65.8.245.51, 65.8.245.50, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.171|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21686067 (21M) [application/x-www-form-urlencoded]
Saving to: ‘output.parquet’

     0K .......... .......... .......... .......... ..........  0%  345K 61s
    50K .......... .......... .......... .......... ..........  0%  428K 55s
   100K .......... .......... .......... .......... ..........  0% 1022K 44s
   150K .......... .......... .......... .......... ..........  0% 1.06M 37s
   200K .......... .......... .......... .......... ..........  1% 1002K 34s
   250K .......... .......... .......... .......... ..........  1% 1.58M 30s
   300K .......... .......... .......... .......... ..........  1% 1.71

  7750K .......... .......... .......... .......... .......... 36% 33.0M 2s
  7800K .......... .......... .......... .......... .......... 37% 13.5M 2s
  7850K .......... .......... .......... .......... .......... 37% 96.3M 2s
  7900K .......... .......... .......... .......... .......... 37% 50.1M 2s
  7950K .......... .......... .......... .......... .......... 37% 95.9M 2s
  8000K .......... .......... .......... .......... .......... 38% 59.3M 2s
  8050K .......... .......... .......... .......... .......... 38% 32.7M 2s
  8100K .......... .......... .......... .......... .......... 38% 49.7M 2s
  8150K .......... .......... .......... .......... .......... 38% 21.7M 2s
  8200K .......... .......... .......... .......... .......... 38% 16.1M 2s
  8250K .......... .......... .......... .......... .......... 39%  120M 2s
  8300K .......... .......... .......... .......... .......... 39% 35.6M 2s
  8350K .......... .......... .......... .......... .......... 39% 61.5M 2s
  8400K ....

 13650K .......... .......... .......... .......... .......... 64% 9.00M 1s
 13700K .......... .......... .......... .......... .......... 64% 6.98M 1s
 13750K .......... .......... .......... .......... .......... 65% 48.4M 1s
 13800K .......... .......... .......... .......... .......... 65% 30.1M 1s
 13850K .......... .......... .......... .......... .......... 65% 72.2M 1s
 13900K .......... .......... .......... .......... .......... 65% 98.6M 1s
 13950K .......... .......... .......... .......... .......... 66% 79.7M 1s
 14000K .......... .......... .......... .......... .......... 66% 73.4M 1s
 14050K .......... .......... .......... .......... .......... 66% 50.0M 1s
 14100K .......... .......... .......... .......... .......... 66% 6.32M 1s
 14150K .......... .......... .......... .......... .......... 67%  189M 1s
 14200K .......... .......... .......... .......... .......... 67% 18.5M 1s
 14250K .......... .......... .......... .......... .......... 67%  452M 1s
 14300K ....

 19400K .......... .......... .......... .......... .......... 91% 19.8M 0s
 19450K .......... .......... .......... .......... .......... 92% 96.3M 0s
 19500K .......... .......... .......... .......... .......... 92%  425M 0s
 19550K .......... .......... .......... .......... .......... 92% 6.87M 0s
 19600K .......... .......... .......... .......... .......... 92% 42.2M 0s
 19650K .......... .......... .......... .......... .......... 93%  483M 0s
 19700K .......... .......... .......... .......... .......... 93%  132M 0s
 19750K .......... .......... .......... .......... .......... 93% 58.0M 0s
 19800K .......... .......... .......... .......... .......... 93%  488M 0s
 19850K .......... .......... .......... .......... .......... 93% 58.7M 0s
 19900K .......... .......... .......... .......... .......... 94% 21.6M 0s
 19950K .......... .......... .......... .......... .......... 94% 56.5M 0s
 20000K .......... .......... .......... .......... .......... 94% 16.7M 0s
 20050K ....

In [25]:
df, n_rows = extract.get_pandas_df()

In [26]:
display(df.head(5))
display(df.tail(5))

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1369764,2,2021-01-31 23:03:00,2021-01-31 23:33:00,,8.89,,,229,181,0,27.78,0.0,0.5,7.46,0.0,0.3,38.54,,
1369765,2,2021-01-31 23:29:00,2021-01-31 23:51:00,,7.43,,,41,70,0,32.58,0.0,0.5,0.0,6.12,0.3,39.5,,
1369766,2,2021-01-31 23:25:00,2021-01-31 23:38:00,,6.26,,,74,137,0,16.85,0.0,0.5,3.9,0.0,0.3,24.05,,
1369767,6,2021-01-31 23:01:06,2021-02-01 00:02:03,,19.7,,,265,188,0,53.68,0.0,0.5,0.0,0.0,0.3,54.48,,
1369768,2,2021-01-31 23:08:29,2021-01-31 23:31:22,,4.68,,,89,61,0,25.45,2.75,0.5,0.0,0.0,0.3,29.0,,


In [27]:
n_rows

1369769

In [28]:
extract.convert_parquet_to_csv() # output.csv

In [29]:
!ls -la

total 338312
drwxr-xr-x   8 renan  staff        256 Aug 12 16:53 [1m[36m.[m[m
drwxr-xr-x  18 renan  staff        576 Aug 12 15:50 [1m[36m..[m[m
drwxr-xr-x   3 renan  staff         96 Aug 12 16:48 [1m[36m.ipynb_checkpoints[m[m
-rw-r--r--   1 renan  staff      95245 Aug 12 15:30 class_Extract_sandbox.ipynb
-rw-r--r--   1 renan  staff      14759 Aug 12 13:24 ingest_data.ipynb
-rw-r--r--   1 renan  staff  148065274 Aug 12 16:53 output.csv
-rw-r--r--   1 renan  staff   21686067 Jun 30 00:16 output.parquet
-rw-r--r--   1 renan  staff      69857 Aug 12 16:53 run_etl_sandbox.ipynb


In [30]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [31]:
create_table(df, 'root', 'root', 'localhost', 5432, 'nyc_taxi', 'yellow_tripdata_2021_01')

>> 2022-08-12 16:53:34.701948 | [ ETL ] | yellow_tripdata_2021_01 table created.


In [32]:
copy_from_stringio(df = df, conn = conn, table='yellow_tripdata_2021_01')

>> 2022-08-12 16:55:28.219197 | [ ETL ] | yellow_tripdata_2021_01 table populated.


In [None]:
# load = LoadData(conn=conn, df=df, table='yellow_tripdata_2021_01')

In [None]:
# load.copy_from_stringio()

In [None]:
from typing import Union
import pandas as pd

def to_datetime(df, columns: Union[str, list]) -> pd.DataFrame:
    """This function converts a column to datetime.
    Args:
        df (pandas.DataFrame): dataframe with the data.
        columns (str or list[str]): name of the column to be converted.
    Returns:
        df (pandas.DataFrame): dataframe with the data.
    """
    if isinstance(columns, str):
        columns = [columns]

    for column in columns:
        df[column] = pd.to_datetime(df[column])

    return df

In [None]:
df.head()

In [None]:
to_datetime(pd.read_csv('../output.csv', nrows=10), columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime']).info()