#Using PostgreSQL in Python (Psycopg2) to build data pipelines

###What is Psycopg2?
A library that lets Python open a connection to an existing Postgres database and run SQL against it.

###Why
- Useful for scalable data pipelines, data pre-cleaning, and data exploration
- Allows for dynamic query generation

###Documentation
* http://initd.org/psycopg/docs/install.html

#Objectives

- Learn how to connect to and run Postgres queries from Python
- Understand cursors, execute(), and commit()
- Learn how to generate dynamic queries

#Key Things to Know

* Connections require an existing database name, a username, the database host (IP/URL), and possibly a password
* If you have not created any databases yet, connect using the default dbname 'postgres' so you can run database-level commands such as CREATE DATABASE
* Data changes are not saved until you commit them, either by calling conn.commit() or by setting conn.autocommit = True.  Until then, changes live only inside the current transaction and are discarded by a rollback, or if the connection is closed without committing.
* SQL databases use cursors to traverse and retrieve data.  A cursor is somewhat like an iterator in Python.
* Cursor operations typically go like this:
    - execute a query
    - fetch rows from the result if it is a SELECT query
    - rows are consumed as they are fetched, so to get them again you rerun the query (psycopg2's default client-side cursors can also rewind with cur.scroll(0, mode='absolute'))
    - close the cursor with .close()
* Close cursors and connections with .close().  An open connection, especially one with an uncommitted transaction, can hold locks that block other operations on the database or its tables until the connection ends.
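The cursor steps above can be sketched without a Postgres server. This minimal example uses Python's built-in `sqlite3` module as a stand-in, on the assumption that what matters here is the Python DB-API 2.0 (PEP 249) interface that psycopg2 also implements; only the SQL dialect and the placeholder style (`?` in sqlite3, `%s` in psycopg2) differ. The table and rows are illustrative.

```python
import sqlite3

# sqlite3 stands in for psycopg2: both follow the Python DB-API 2.0 (PEP 249).
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE logins (userid INTEGER, type TEXT)')
# sqlite3 uses ? placeholders; psycopg2 would use %s.
cur.executemany('INSERT INTO logins VALUES (?, ?)',
                [(579, 'mobile'), (823, 'web'), (953, 'web')])
conn.commit()

# 1. Execute a query.
cur.execute('SELECT userid FROM logins ORDER BY userid')
# 2. Fetch rows: each call continues where the previous one stopped.
first = cur.fetchone()       # one row
remaining = cur.fetchall()   # every row not fetched yet
leftover = cur.fetchall()    # empty: the result set is already consumed

# 3. Rerun the query to read the same rows again.
cur.execute('SELECT userid FROM logins ORDER BY userid')
again = cur.fetchall()

# 4. Close the cursor, then the connection.
cur.close()
conn.close()
```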

#Fun facts

* If you ever need to build similar pipelines for other databases, libraries such as pyodbc work in essentially the same way, since they also follow the Python DB-API 2.0 (PEP 249)
* autocommit = True is required for commands like CREATE DATABASE.  Postgres does not allow these commands inside a transaction block, and without autocommit psycopg2 opens a transaction before the first command it executes.

#Installation

In [1]:
!pip install psycopg2



#Creating a connection with Postgres

###Import

In [2]:
import psycopg2 as pg2

###Create connection with Postgres

In [3]:
conn = pg2.connect(dbname='postgres', user='minghuang', host='localhost')

# Autocommit is required for database-level commands such as CREATE DATABASE,
# which Postgres will not run inside a transaction block
conn.autocommit = True

###Retrieve the Cursor

* A cursor is a control structure that enables traversal over the records in a database.  You can think of it as an iterator or pointer for SQL data retrieval.

In [4]:
cur = conn.cursor()

#Create a database

In [5]:
# Commented out in case students run into permission issues when creating databases
#cur.execute('ALTER USER minghuang CREATEDB')

In [7]:
# Deletes database if it exists.  This is useful for iterative testing.
cur.execute('DROP DATABASE IF EXISTS test;')

In [8]:
cur.execute('CREATE DATABASE test;')

#Close the cursor and the connection

In [9]:
cur.close() # Optional since you can just close the connection
conn.close()
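Closing can also be made automatic, so it still happens when a query raises an exception. A minimal sketch, again using `sqlite3` as a stand-in and `contextlib.closing` to guarantee `.close()` is called; `run_query` is a hypothetical helper, not part of either library. With psycopg2 itself, `with conn.cursor() as cur:` closes the cursor on exit, while `with conn:` only commits or rolls back the transaction and leaves the connection open, so `closing()` or an explicit `conn.close()` is still needed for the connection.

```python
import sqlite3
from contextlib import closing


def run_query(db_path, query, params=()):
    """Hypothetical helper: open a connection, run one query, return all rows, always close."""
    # closing() calls .close() on exit, even if execute() raises.
    with closing(sqlite3.connect(db_path)) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute(query, params)
            return cur.fetchall()


rows = run_query(':memory:', 'SELECT ? + ?', (1, 2))
```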

#Let's use our new database

In [10]:
conn = pg2.connect(dbname='test', user='minghuang', host='localhost')
cur = conn.cursor()

In [11]:
# Deletes table if it exists.  This is useful for iterative testing.
cur.execute('DROP TABLE IF EXISTS logins;')

###Create a new table

In [12]:
query = '''
        CREATE TABLE logins (
            userid integer
            , tmstmp timestamp
            , type varchar(10)
        );
        '''
cur.execute(query)

###Load a CSV into the new table

In [13]:
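# COPY ... FROM '<file>' is executed by the Postgres server, so the path must exist on the
# database server's machine and the user needs superuser (or pg_read_server_files) privileges
query = '''
        COPY logins
        FROM '/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins01.csv'
        DELIMITER ','
        CSV;
        '''
cur.execute(query)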
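Because `COPY ... FROM '<file>'` is read by the server, it fails when the CSV lives on your machine but the database runs elsewhere. A hedged alternative is to stream the file from the client with psycopg2's `cursor.copy_expert()` and `COPY ... FROM STDIN`. The helper name `copy_csv_from_client` is hypothetical, and the sketch assumes the `logins` table defined above and a CSV without a header row.

```python
# SQL for a client-side load: the server reads the CSV data from the connection, not from its own disk.
COPY_LOGINS_SQL = "COPY logins FROM STDIN WITH (FORMAT csv)"


def copy_csv_from_client(cur, csv_path):
    """Hypothetical helper: stream a local, headerless CSV into the logins table.

    `cur` is assumed to be a psycopg2 cursor; copy_expert(sql, file) sends the
    file object's contents to the server as the COPY input.
    """
    with open(csv_path) as f:
        cur.copy_expert(COPY_LOGINS_SQL, f)
```

Usage would be `copy_csv_from_client(cur, 'data/logins01.csv')` (a hypothetical relative path) followed by `conn.commit()`, since the loaded rows are part of the open transaction like any other change.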

###Let's take a look at the data

In [14]:
query = '''
        SELECT *
        FROM logins
        LIMIT 30;
        '''
cur.execute(query)

###One line at a time

In [15]:
cur.fetchone()

(579, datetime.datetime(2013, 11, 20, 3, 20, 6), 'mobile')

###Many lines at a time

In [16]:
cur.fetchmany(10)

[(823, datetime.datetime(2013, 11, 20, 3, 20, 49), 'web'),
 (953, datetime.datetime(2013, 11, 20, 3, 28, 49), 'web'),
 (612, datetime.datetime(2013, 11, 20, 3, 36, 55), 'web'),
 (269, datetime.datetime(2013, 11, 20, 3, 43, 13), 'web'),
 (799, datetime.datetime(2013, 11, 20, 3, 56, 55), 'web'),
 (890, datetime.datetime(2013, 11, 20, 4, 2, 33), 'mobile'),
 (330, datetime.datetime(2013, 11, 20, 4, 54, 59), 'mobile'),
 (628, datetime.datetime(2013, 11, 20, 4, 57, 22), 'mobile'),
 (398, datetime.datetime(2013, 11, 20, 5, 3, 19), 'mobile'),
 (482, datetime.datetime(2013, 11, 20, 5, 4, 43), 'mobile')]

###Or everything at once

In [17]:
cur.fetchall()

[(581, datetime.datetime(2013, 11, 20, 5, 12, 3), 'mobile'),
 (370, datetime.datetime(2013, 11, 20, 5, 26, 46), 'mobile'),
 (230, datetime.datetime(2013, 11, 20, 5, 28, 29), 'web'),
 (596, datetime.datetime(2013, 11, 20, 5, 28, 36), 'web'),
 (274, datetime.datetime(2013, 11, 20, 5, 43, 8), 'mobile'),
 (581, datetime.datetime(2013, 11, 20, 5, 47, 10), 'web'),
 (417, datetime.datetime(2013, 11, 20, 5, 54, 37), 'mobile'),
 (185, datetime.datetime(2013, 11, 20, 5, 56, 22), 'mobile'),
 (371, datetime.datetime(2013, 11, 20, 5, 58, 35), 'mobile'),
 (133, datetime.datetime(2013, 11, 20, 5, 59, 7), 'web'),
 (621, datetime.datetime(2013, 11, 20, 6, 1, 46), 'web'),
 (306, datetime.datetime(2013, 11, 20, 6, 3, 23), 'mobile'),
 (509, datetime.datetime(2013, 11, 20, 6, 4, 43), 'web'),
 (505, datetime.datetime(2013, 11, 20, 6, 9, 52), 'web'),
 (678, datetime.datetime(2013, 11, 20, 6, 34, 18), 'web'),
 (889, datetime.datetime(2013, 11, 20, 6, 36, 32), 'mobile'),
 (202, datetime.datetime(2013, 11, 20, 
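`fetchall()` loads the whole result into memory, which can become a problem for large tables in a pipeline. A common pattern is to pull rows in fixed-size batches with `fetchmany()` until it returns an empty list. This sketch uses `sqlite3` as a stand-in (the loop is the same with a psycopg2 cursor); `iter_batches` and the batch size of 3 are illustrative choices. For very large results, psycopg2 also offers named (server-side) cursors via `conn.cursor(name=...)`, so rows are not all transferred to the client at once.

```python
import sqlite3


def iter_batches(cur, batch_size):
    """Yield lists of up to batch_size rows until the cursor's result set is exhausted."""
    while True:
        rows = cur.fetchmany(batch_size)
        if not rows:  # an empty list means every row has been fetched
            break
        yield rows


conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE logins (userid INTEGER)')
cur.executemany('INSERT INTO logins VALUES (?)', [(i,) for i in range(7)])

cur.execute('SELECT userid FROM logins ORDER BY userid')
batch_sizes = [len(batch) for batch in iter_batches(cur, 3)]
conn.close()
```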

#Dynamic Queries

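We have 8 login CSV files to load into the logins table, and logins01.csv is already in.  Instead of writing a COPY FROM query by hand for each remaining file, we can use Python (or any other language with a Postgres driver) to generate the queries.  This works because the query is a template string with a {file_path} placeholder that we fill in for each file.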

In [18]:
# os is needed because we want to dynamically identify the files we need to insert.
import os

In [19]:
query = '''
        COPY logins
        FROM {file_path}
        DELIMITER ','
        CSV;
        '''

folder_path = '/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/'

# sorted() because os.listdir returns files in arbitrary order
for f in sorted(os.listdir(folder_path)):
    if f.endswith('.csv') and f != 'logins01.csv':  # logins01.csv was loaded above
        # Wrap the path in single quotes so it becomes a SQL string literal
        file_path = "'{0}'".format(os.path.join(folder_path, f))
        cur.execute(query.format(file_path=file_path))
        print(file_path + ' inserted')

'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins02.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins03.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins04.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins05.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins06.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins07.csv' inserted
'/Users/minghuang/Documents/git/zipfian-dsr/sql-python/mh-lecture/data/logins08.csv' inserted
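Building SQL with `str.format()` breaks if a path contains a single quote, and is unsafe with untrusted input. psycopg2 can substitute values itself: pass `%s` placeholders plus a parameter tuple to `cur.execute()`, and it quotes each value. Because psycopg2 merges parameters into the query on the client side, this also works for the file name in `COPY ... FROM`. The sketch below only builds the `(query, params)` pairs so it can run without a database; `copy_jobs` is a hypothetical helper, and the temporary folder stands in for the real data directory.

```python
import os
import tempfile

# %s is psycopg2's placeholder; psycopg2 quotes the value when it is passed as a parameter.
COPY_QUERY = "COPY logins FROM %s DELIMITER ',' CSV;"


def copy_jobs(folder_path, skip=('logins01.csv',)):
    """Hypothetical helper: return (query, params) pairs, one per CSV, in a stable order."""
    jobs = []
    for name in sorted(os.listdir(folder_path)):  # os.listdir order is arbitrary
        if name.endswith('.csv') and name not in skip:
            jobs.append((COPY_QUERY, (os.path.join(folder_path, name),)))
    return jobs


# Demo with empty placeholder files; with a real psycopg2 cursor you would run
# `for q, params in jobs: cur.execute(q, params)`.
with tempfile.TemporaryDirectory() as folder:
    for name in ('logins02.csv', 'logins01.csv', 'notes.txt'):
        open(os.path.join(folder, name), 'w').close()
    jobs = copy_jobs(folder)
    job_files = [os.path.basename(params[0]) for _, params in jobs]
```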


In [20]:
cur.execute('SELECT count(*) FROM logins;')
cur.fetchall()

[(78588L,)]

###Don't forget to commit your changes

In [21]:
conn.commit()
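An explicit `commit()` at the end works, but if something fails midway you also need to remember to `rollback()`. One hedged pattern is to wrap each unit of work in `with conn:`, which in both psycopg2 and `sqlite3` commits if the block succeeds and rolls back if it raises (it does not close the connection). The sketch uses `sqlite3` as a stand-in with a deliberately failing batch; the `load` helper is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.cursor().execute('CREATE TABLE logins (userid INTEGER NOT NULL, type TEXT)')


def load(conn, rows):
    """Hypothetical helper: insert rows as one transaction (all or nothing)."""
    with conn:  # commits on success, rolls back if an exception escapes the block
        cur = conn.cursor()
        cur.executemany('INSERT INTO logins VALUES (?, ?)', rows)


load(conn, [(579, 'mobile'), (823, 'web')])  # succeeds and is committed

try:
    # The second row violates NOT NULL, so the whole batch (including 953) is rolled back.
    load(conn, [(953, 'web'), (None, 'web')])
except sqlite3.IntegrityError:
    pass

cur = conn.cursor()
cur.execute('SELECT count(*) FROM logins')
count = cur.fetchone()[0]
conn.close()
```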

#All done!  Time for morning exercise!

In [22]:
conn.close()