# More psycopg2

## Init

In [2]:
import os
import psycopg2
import psycopg2.pool
from dotenv import load_dotenv
load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")

from prettyprinter import pprint

In [3]:
#connection pool hack
from contextlib import contextmanager
@contextmanager
def withconn(self, key=None):
    try:
        conn = self.getconn(key)
        with conn:
            yield conn
    except:
        raise
    finally:
        self.putconn(conn, key)
        
psycopg2.pool.ThreadedConnectionPool.withconn = withconn

In [4]:
#psycopg2.pool.ThreadedConnectionPool(min_connection, max_connection, database_url)
conn_pool = psycopg2.pool.ThreadedConnectionPool(0, 10, DATABASE_URL)

In [12]:
def fetch_table_names():
    with conn_pool.withconn() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT
                *
            FROM
                pg_catalog.pg_tables
            WHERE
                schemaname != 'pg_catalog'
            AND schemaname != 'information_schema';
        """)
        table_names = [table[1] for table in cur.fetchall()]
        return table_names

In [16]:
def get_table_structure(table_name):
    with conn_pool.withconn() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT column_name, data_type, character_maximum_length
            FROM INFORMATION_SCHEMA.COLUMNS 
            WHERE table_name = %s;
        """, (table_name,)
        )
        return cur.fetchall()

In [17]:
def fetch_table(table_name):
    with conn_pool.withconn() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT * FROM test
        """)
        return cur.fetchall()

## Execute Many/Batch/Bulk

In [10]:
def delete_all(table_name):
    with conn_pool.withconn() as conn, conn.cursor() as cur:
        cur.execute("""
            DELETE FROM %s
        """ % (table_name,))
        return cur.rowcount
    
print("Rowcount: " + str(delete_all("test")))
pprint(fetch_table("test"))

Rowcount: 1


NameError: name 'fetch_table' is not defined

### executemany

psycopg2 provides executemany to perform a query's execution with bulk data. However, executemany is not faster than a normal for loop.

In [30]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    cur.executemany("""
        INSERT INTO test(id, num, data) VALUES(%s, %s, %s)
    """, [
          (3, 4, "executemany1"),
          (4, 5, "executemany2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 2
[(3, 4, 'executemany1'), (4, 5, 'executemany2')]


2

### execute_batch

execute_batch is faster than execute_many because it actually combines the multiple executions. However, it doesn't return accurate number of rows.

In [31]:
from psycopg2.extras import execute_batch
with conn_pool.withconn() as conn, conn.cursor() as cur:
    execute_batch(cur, """
        INSERT INTO test(id, num, data) VALUES(%s, %s, %s)
    """, [
          (3, 4, "execute_batch1"),
          (4, 5, "execute_batch2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 1
[(3, 4, 'execute_batch1'), (4, 5, 'execute_batch2')]


2

### execute_values

execute_values is supposed to be the fastest of all. However, it's usage is quite different. This is mainly only for INSERT, but it can also be used for UPDATE with some changes. Unlike execute_batch, it returns correct number of rows.

In [48]:
from psycopg2.extras import execute_values
with conn_pool.withconn() as conn, conn.cursor() as cur:
    execute_values(cur, """
        INSERT INTO test(id, num, data) VALUES %s
    """, [
          (3, 4, "execute_values1"),
          (4, 5, "execute_values2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))

Rowcount: 2
[(3, 4, 'execute_values1'), (4, 5, 'execute_values2')]


In [36]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    execute_values(cur, """
        UPDATE test 
        SET data=newdata.data 
        FROM (VALUES %s) AS newdata(id, data)
        WHERE test.id=newdata.id
    """, [
          (3, "execute_values_update1"),
          (4, "execute_values_update2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))

Rowcount: 2
[(3, 4, 'execute_values_update1'), (4, 5, 'execute_values_update2')]


In [49]:
delete_all("test")

2

### mogrify

mogrify is a function to parametrize a query without executing it. The mogrify strategy takes advantage of SQL's ability to execute many inserts in one query. This is truly the fastest of all, but this is only applicable for insert. Aside that, the string returned from mogrify is a bytes string, so you need to decode it back. Due to using simple insert statement, it returns correct row count.

In [45]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    args = [
        (3, 4, "mogrify1"),
        (4, 5, "mogrify2")
    ]
    args_str = ','.join(cur.mogrify("(%s,%s,%s)", x).decode('utf-8') for x in args)
    cur.execute("""
        INSERT INTO test VALUES %s
    """ % (args_str,))
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 2
[(3, 4, 'mogrify1'), (4, 5, 'mogrify2')]


2

## Insert Returning

RETURNING clause can be used on INSERT query to return the auto increment PK.

In [47]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        RETURNING id
    """, (1, 2, "hello"))
    print("Rowcount: " + str(cur.rowcount))
    id, = cur.fetchone()
    print("Inserted ID: " + str(id))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 1
Inserted ID: 1
[(1, 2, 'hello')]


1

Now, does it work with bulk insert?

### executemany

It doesn't work with executemany even though it returns the correct row count.

In [59]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    cur.executemany("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        RETURNING id
    """, [
          (3, 4, "executemany1"),
          (4, 5, "executemany2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    pprint(cur.fetchall())
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 2


ProgrammingError: no results to fetch

### execute_batch

execute_batch doesn't even return the correct row count, so of course it won't work.

In [51]:
from psycopg2.extras import execute_batch
with conn_pool.withconn() as conn, conn.cursor() as cur:
    execute_batch(cur, """
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        RETURNING id
    """, [
          (3, 4, "execute_batch1"),
          (4, 5, "execute_batch2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    pprint(cur.fetchall())
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 1
[(4,)]
[
    (1, 2, 'hello'),
    (2, 3, 'bye'),
    (3, 4, 'execute_batch1'),
    (4, 5, 'execute_batch2')
]


4

### execute_values

It works with execute_values.

In [57]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    execute_values(cur, """
        INSERT INTO test(id, num, data) 
        VALUES %s
        RETURNING id
    """, [
          (3, 4, "execute_values1"),
          (4, 5, "execute_values2")
    ])
    print("Rowcount: " + str(cur.rowcount))
    pprint(cur.fetchall())
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 2
[(3,), (4,)]
[(3, 4, 'execute_values1'), (4, 5, 'execute_values2')]


2

### mogrify

Obviously it works with mogrify, since it's just a simple insert.

In [58]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    args = [
        (3, 4, "mogrify1"),
        (4, 5, "mogrify2")
    ]
    args_str = ','.join(cur.mogrify("(%s,%s,%s)", x).decode('utf-8') for x in args)
    cur.execute("""
        INSERT INTO test 
        VALUES %s
        RETURNING id
    """ % (args_str,))
    print("Rowcount: " + str(cur.rowcount))
    pprint(cur.fetchall())
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 2
[(3,), (4,)]
[(3, 4, 'mogrify1'), (4, 5, 'mogrify2')]


2

## Insert On Conflict

### INSERT ON CONFLICT DO NOTHING

This is just like normal insert, except it doesn't throw any error when having conflicts, for example when PK value already exists. I will be doing two identical inserts. Notice the row count.

In [60]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (2, 3, "bye"))
    print("Rowcount: " + str(cur.rowcount))
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (2, 3, "bye"))
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 1
Rowcount: 0
[(2, 3, 'bye')]


1

### INSERT ON CONFLICT DO UPDATE

This is like an upsert, but with more control. When having a conflict, for example inserting a value which PK already exists in a row, you can update that row instead with this feature. The difference with upsert is that you can have more control on what to update and how to update it. With ON CONFLICT DO UDPATE, you must specify what conflict you want to handle. Also, you can access the data you passed and wanted to insert in the EXCLUDED object.

In [63]:
with conn_pool.withconn() as conn, conn.cursor() as cur:
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
    """, (2, 3, "bye"))
    print("Rowcount: " + str(cur.rowcount))
    
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            num=EXCLUDED.num+1
    """, (2, 5, "byeee"))
    print("Rowcount: " + str(cur.rowcount))
    
pprint(fetch_table("test"))
delete_all("test")

Rowcount: 1
Rowcount: 1
[(2, 6, 'bye')]


1

## Accessing Row as Dictionary

In [20]:
def print_row(data, id, num):
    print("id: " + str(id))
    print("num: " + str(num))
    print("data: " + data)

### DictCursor

DictCursor is a dictionary-like cursor. It's not actually a dictionary, but it provides dictionary-like interface (key based acceess). You can still access the values with indexes. You can also use this as dictionary for keyword args.

In [23]:
from psycopg2.extras import DictCursor

with conn_pool.withconn() as conn, conn.cursor(cursor_factory=DictCursor) as cur:
    cur.execute("""
        INSERT INTO test(id, num, data) 
        VALUES(%s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (2, 3, "bye"))
    
    cur.execute("""
        SELECT * FROM test
    """)
    
    for row in cur.fetchall():
        print(row["id"])
        print(row["num"])
        print(row["data"])
        print(row)
        
        try:
            print_row(**row)
        except Exception:
            print("Can't use as keyword args")
            

2
3
bye
[2, 3, 'bye']
id: 2
num: 3
data: bye


### RealDictCursor

RealDictCursor is really a dictionary based cursor. You can no longer access value by index. 

In [24]:
from psycopg2.extras import RealDictCursor

with conn_pool.withconn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
    
    cur.execute("""
        SELECT * FROM test
    """)
    
    for row in cur.fetchall():
        print(row["id"])
        print(row["num"])
        print(row["data"])
        print(row)
        
        try:
            print_row(**row)
        except Exception:
            print("Can't use as keyword args")

2
3
bye
RealDictRow([('id', 2), ('num', 3), ('data', 'bye')])
id: 2
num: 3
data: bye


### Cleanup

In [25]:
delete_all("test")

1

This concludes this notebook.