In [2]:
import pandas as pd

In [2]:
# !pip install sqlalchemy psycopg2 

In [5]:
from sqlalchemy import create_engine

In [6]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [7]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f1ff690c040>

In [6]:
query = """
select 1 as number;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,number
0,1


In [7]:
#The following is the equivalent of `\\dt` in pgcli. Only, we're using sqlalchemy"
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' 
and schemaname != 'information_schema';
"""

pd.read_sql(query, con = engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


In [8]:
df = pd.read_csv('yellow_tripdata_2021-01.csv.gz', nrows = 100)

In [9]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [10]:
#schema that would be used to create table. Pandas would use this out put to create teh table.
print(pd.io.sql.get_schema(df, name = 'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [11]:
#now send df to sql

df.to_sql(name = 'yellow_taxi_data', con = engine, index = False)

100

In [12]:
#Now let's check tables we might have:
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' 
and schemaname != 'information_schema';
"""

pd.read_sql(query, con = engine)

#see tablename  we've just created? we can now also query this table

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,False,False,False,False


In [13]:
query = """
select * from yellow_taxi_data limit 10;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1,1.6,1,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5
6,1,2021-01-01 00:00:28,2021-01-01 00:17:28,1,4.1,1,N,95,157,2,16.0,0.5,0.5,0.0,0.0,0.3,17.3,0.0
7,1,2021-01-01 00:12:29,2021-01-01 00:30:34,1,5.7,1,N,90,40,2,18.0,3.0,0.5,0.0,0.0,0.3,21.8,2.5
8,1,2021-01-01 00:39:16,2021-01-01 01:00:13,1,9.1,1,N,97,129,4,27.5,0.5,0.5,0.0,0.0,0.3,28.8,0.0
9,1,2021-01-01 00:26:12,2021-01-01 00:39:46,2,2.7,1,N,263,142,1,12.0,3.0,0.5,3.15,0.0,0.3,18.95,2.5


In [14]:
#df is now not a dataframe, it's an iterator. We have to use this method as we can't add 1300000 rows to db all at once
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv.gz', iterator = True, chunksize=100000)

In [15]:
#see file type?
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7fe5190434f0>

In [16]:
#to make it a df. This will only return one iteration for now though. See len(df)
df = next(df_iter)
len(df)

100000

In [17]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [18]:
#we will now use the schema-creation code to create oour table. First we will just create column names df.head(n=0)

In [19]:
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [20]:
#note minor differences in creation of column names and inserting data to the table (chunks- update)
df.head(0).to_sql(name= 'yellow_taxi_data', con = engine, if_exists = 'replace')

0

In [21]:
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' 
and schemaname != 'information_schema';
"""

pd.read_sql(query, con = engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False


In [22]:
query = """
select * from yellow_taxi_data;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [23]:
#now we will update our table with information in the chunks. Removing df.head(0) - note `append`
%time df.to_sql(name= 'yellow_taxi_data', con = engine, if_exists = 'append')

CPU times: user 6.35 s, sys: 141 ms, total: 6.49 s
Wall time: 9.86 s


1000

In [24]:
query = """
select count(*) from yellow_taxi_data;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,count
0,100000


In [25]:
#appended one chunk of 100000. now we need to do it iteratively. 

In [30]:
from time import time

In [31]:
while True:
    try:
        t_start = time()

        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name= 'yellow_taxi_data', con = engine, if_exists = 'append') #adds data to the table as chunks because inside while loop

        t_end = time()

        duration = t_end - t_start

        print('inserted another chunk... this chunk took %.3f seconds' % (duration))
    
    except StopIteration:
        print('finished inserting all chunks.')
        break

inserted another chunk... this chunk took 9.861 seconds
inserted another chunk... this chunk took 9.942 seconds
inserted another chunk... this chunk took 10.143 seconds
inserted another chunk... this chunk took 9.926 seconds
inserted another chunk... this chunk took 10.112 seconds
inserted another chunk... this chunk took 9.891 seconds
inserted another chunk... this chunk took 10.482 seconds
inserted another chunk... this chunk took 10.020 seconds
inserted another chunk... this chunk took 9.873 seconds


  df = next(df_iter)


inserted another chunk... this chunk took 9.831 seconds
inserted another chunk... this chunk took 6.349 seconds
finished inserting all chunks.


In [9]:
#verify that we've added all rows to our database:
query = """
select count(*) from yellow_taxi_data;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,count
0,1369765


In [11]:
#we can also do a bit of eda
query = """
select max(tpep_pickup_datetime) as pick_up_max, min(tpep_pickup_datetime) as pickup_min, max(total_amount) as total_amount_max
from yellow_taxi_data;
"""

pd.read_sql(query, con = engine)

Unnamed: 0,pick_up_max,pickup_min,total_amount_max
0,2021-02-22 16:52:16,2008-12-31 23:05:14,7661.28


In [None]:
#We will now use PgAdmin to connect to Postgres and interact with it. 

 # docker run -it \
 # -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
 # -e PGADMIN_DEFAULT_PASSWORD="root" \
 # -p 8080:80 \
 # dpage/pgadmin4


#Netwrk create to have PgAdmin and Postgres containers running in one container

docker network create pg-network

docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
--network=pg-network \
--name pg-database \
  postgres:13

 docker run -it \
 -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
 -e PGADMIN_DEFAULT_PASSWORD="root" \
 -p 8080:80 \
 --network=pg-network \
 --name pgadmin-2
 dpage/pgadmin4

# on your local machine's browser, type localhost:8080 and put in in the credentials supplied when initialising pgAdmin
#... email: admin@admin.com; password: root 
#right click and register server

# Name: Docker localhost
# Port: prepopulated from postgres config (5432)
# Maintenance database: Prepopulated (postgres)
# Password: root
# Username: root
