In [1]:
#Install required libraries
pip install pandas numpy psycopg2-binary sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [2]:
#Importing all required libraries
import pandas as pd
import numpy as np
import time
import psycopg2
from psycopg2 import OperationalError, errorcodes, errors
import psycopg2.extras as extras
from sqlalchemy import create_engine

In [19]:
#Create a dataframe of 15 columns and 10 million rows with random numbers and strings. Export it to CSV format which comes around ~1 GB in size.
df = pd.DataFrame(data=np.random.randint(99999, 99999999, size=(10000000,14)),columns=['C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14'])
df['C15'] = pd.util.testing.rands_array(5,10000000)
df.to_csv("huge_data.csv")

AttributeError: module 'pandas.testing' has no attribute 'rands_array'

In [4]:
start = time.time()
#read data in chunks of 10000 rows at a time. This will reduce time and ram. 
#The chunksize can be increased if there is more ram to improve performance time or decreased if the ram restrictions still apply.
df = pd.read_csv('huge_data.csv',chunksize=10000)
end = time.time()
print("Time taken to read in chunks: ",(end-start),"sec")
pd_df = pd.concat(df)

Time taken to read in chunks:  0.26674938201904297 sec


In [5]:
# Define a function that handles and parses psycopg2 exceptions
def show_psycopg2_exception(err):
    # get details about the exception
    err_type, err_obj, traceback = sys.exc_info()    
    # get the line number when exception occured
    line_n = traceback.tb_lineno    
    # print the connect() error
    print ("\npsycopg2 ERROR:", err, "on line number:", line_n)
    print ("psycopg2 traceback:", traceback, "-- type:", err_type) 
    # psycopg2 extensions.Diagnostics object attribute
    print ("\nextensions.Diagnostics:", err.diag)    
    # print the pgcode and pgerror exceptions
    print ("pgerror:", err.pgerror)
    print ("pgcode:", err.pgcode, "\n")

In [20]:
# Postgres username, password, and database name
POSTGRES_ADDRESS = 'localhost'
POSTGRES_PORT = '5432'
POSTGRES_USERNAME = 'postgres'
POSTGRES_PASSWORD = 'kevin' 
POSTGRES_DBNAME = 'postgres'
# A long string that contains the necessary Postgres login information
postgres_str = ('postgresql://{username}:{password}@{ipaddress}:{port}/{dbname}'
                .format(username=POSTGRES_USERNAME,
                        password=POSTGRES_PASSWORD,
                        ipaddress=POSTGRES_ADDRESS,
                        port=POSTGRES_PORT,
                        dbname=POSTGRES_DBNAME))
# Create the connection to postgres db
try:
    print('Connecting to the PostgreSQL...........')
    cnx = create_engine(postgres_str)
    print("Connection successfully..................")
except OperationalError as err:
    # passing exception to function
    show_psycopg2_exception(err)        
    # set the connection to 'None' in case of error

Connecting to the PostgreSQL...........
Connection successfully..................


In [7]:
start_time = time.time()
try:
    #using connection created, push the data in public.test_od in chunksizes of 10000. 
    #This can be adjusted too similar to the import from csv to improve performance time or to adjust for ram restrictions
    pd_df.to_sql('public.test_od', con=cnx, if_exists='replace', index=False, chunksize=100000)
    print("to_sql duration: {} seconds".format(time.time() - start_time))
except OperationalError as err:
    # passing exception to function
    show_psycopg2_exception(err)

to_sql duration: 647.5341262817383 seconds


In [22]:
conn = psycopg2.connect(
   database=POSTGRES_DBNAME, user=POSTGRES_USERNAME, password=POSTGRES_PASSWORD, host=POSTGRES_ADDRESS, port= POSTGRES_PORT
)
#1) Cities frequently visited?

sql='''select v.city_id_visited,
            trim(c.city_name) as city_name,
            count(distinct customer_id) as count 
            from visits v 
            join city c on c.city_id=v.city_id_visited 
            group by v.city_id_visited,c.city_name 
            having count(distinct customer_id)>1'''
cursor.execute(sql)
output1=cursor.fetchall()
print("Cities with more than 1 visits:")
for row in output1:
    print(row[1])

Cities with more than 1 visits:
SFO
