<div style="width: 100%; background-color: #ef7d22; text-align: center">
<br><br>

<h1 style="color: white; font-weight: bold;">
    PostgreSQL adapters for Python
</h1>

<br><br> 
</div>

Most Python developers who access PostgreSQL databases use the `psycopg2` driver/adapter.  It is well tested, reliable, and conforms well to the DB-API 2.0 standard.  The successor `psycopg3` adapter is under development as of late 2020 and will probably become the default in the future.  Most user-facing behaviors will remain the same.

However, there are three other notable adapters that you may want to consider for your particular use case.

* `pg8000` is a pure-Python implementation that complies with DB-API 2.0 and has no dependencies outside the standard library.  From a user's perspective, very little differs from using `psycopg2`, but where institutional or technical constraints rule out C-compiled third-party libraries, `pg8000` is an option.

* `aiopg` is an `asyncio`-friendly wrapper around `psycopg2` that lets you use PostgreSQL in asynchronous programs.  General programming styles and benefits of async are discussed in another INE course.  In brief, though, for high-performance, I/O-bound programs an asynchronous approach can be *vastly* faster than a thread-based or single-threaded one.  `aiopg` is *mostly* similar to the DB-API, but the nature of asynchronous programming introduces some differences.

* `asyncpg` is another `asyncio`-friendly driver, but one that aims to be *very* fast and, as a consequence, is much less concerned with DB-API compatibility.  `asyncpg` also has more complete PostgreSQL-specific support, for example directly supporting a full range of rich PostgreSQL datatypes such as arrays and composite types.  By contrast, `psycopg2` is limited by its use of the text-based communication protocol rather than the binary I/O protocol that PostgreSQL offers as an option.
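One concrete consequence of `asyncpg` stepping away from the DB-API is its parameter syntax: it uses PostgreSQL's native numbered placeholders (`$1`, `$2`, ...) rather than a DB-API `paramstyle` such as the `format` style (`%s`) that `pg8000` reports in this lesson. The hypothetical helper `format_to_numeric()` below is only a sketch for porting simple query strings, and it is deliberately naive: it does not skip `%s` sequences that appear inside quoted SQL literals.

```python
import re

def format_to_numeric(sql):
    """Rewrite DB-API 'format' placeholders (%s) as PostgreSQL's $1, $2, ...

    '%%' (an escaped literal percent sign) becomes a single '%'.
    Naive sketch: %s inside quoted SQL literals is NOT skipped.
    """
    counter = 0

    def replace(match):
        nonlocal counter
        if match.group(0) == "%%":
            return "%"
        counter += 1
        return f"${counter}"

    # Left-to-right scan; '%%' is tried first, so '%%s' stays a literal '%s'.
    return re.sub(r"%%|%s", replace, sql)

print(format_to_numeric("INSERT INTO t (a, b) VALUES (%s, %s)"))
# INSERT INTO t (a, b) VALUES ($1, $2)
```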

## Object-relational mapping
![orange-divider](https://user-images.githubusercontent.com/7065401/98619088-44ab6000-22e1-11eb-8f6d-5532e68ab274.png)

Object-relational mapping (ORM) libraries, which translate SQL and tuple-level interfaces with PostgreSQL into Python method calls, are not covered in this course.  In particular, `SQLAlchemy` is very popular among Python developers.  Personally, I find the extra layer between code and database a distraction rather than a benefit.

In any event though, `SQLAlchemy` relies on the actual PostgreSQL adapters we discuss in this lesson.  The abstractions it provides are not PostgreSQL specific, but generic patterns for accessing RDBMS data.

## Pure Python
![orange-divider](https://user-images.githubusercontent.com/7065401/98619088-44ab6000-22e1-11eb-8f6d-5532e68ab274.png)

Let us use the `pg8000` adapter to load some data into our database.  We will need a different parameter style than the one we saw with `psycopg2` in the first lesson, but most of the API is the same.  The thread safety level of `pg8000` is also lower; if we use this adapter from multiple threads, each thread needs its own connection.

```python
import pg8000
print(f"API level       | {pg8000.apilevel}")
print(f"Parameter style | {pg8000.paramstyle}")
print(f"Thread safety   | {pg8000.threadsafety}")
```

```
API level       | 2.0
Parameter style | format
Thread safety   | 1
```
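In DB-API terms, a `threadsafety` of 1 means threads may share the module but not connections. A minimal sketch for giving each thread its own lazily created connection uses `threading.local`. The class name `PerThreadConnections` is hypothetical, and the demonstration passes `object` as a stand-in factory so it runs without a database; with a real database the factory could be something like `functools.partial(pg8000.connect, **login._asdict())`, using the login details defined below (an assumption, not code from this lesson).

```python
import threading

class PerThreadConnections:
    """Hand each thread its own connection, created on first use."""

    def __init__(self, connect):
        self._connect = connect          # zero-argument factory returning a new connection
        self._local = threading.local()  # attributes stored here are private to each thread

    def get(self):
        conn = getattr(self._local, "conn", None)
        if conn is None:                 # first request from this thread
            conn = self._connect()
            self._local.conn = conn
        return conn

# Demonstration with a stand-in factory: each object() plays the role of a connection.
conns = PerThreadConnections(object)
collected = []
lock = threading.Lock()

def worker():
    first, second = conns.get(), conns.get()
    with lock:
        collected.append((first, second))

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread gets the same object on repeated calls and a different object from every other thread. Closing those connections when a thread finishes remains the caller's job in this sketch.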


Any of these adapters needs credentials and host/port settings, of course.

```python
from collections import namedtuple
Login = namedtuple("Login", "user host database port password")
login = Login('ine_student', 'localhost', 'ine', '5432', 'ine-password')
login
```

```
Login(user='ine_student', host='localhost', database='ine', port='5432', password='ine-password')
```

```python
conn_pg8000 = pg8000.connect(*login)
cur_pg8k = conn_pg8000.cursor()
```

We create a connection from this (named) tuple of information.  For convenience, the field order matches the positional argument order of `pg8000.connect()`; we could use named arguments instead, if we preferred.
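Passing named arguments amounts to unpacking the named tuple as a dictionary, which is what the asynchronous examples later in this lesson do with `**login._asdict()`. The small sketch below uses a hypothetical helper, `connect_kwargs()`, that also converts the port to an integer; the string `'5432'` did not cause an error here, but an integer is the conventional type for a port. The block repeats the `Login` definition so that it runs on its own.

```python
from collections import namedtuple

Login = namedtuple("Login", "user host database port password")
login = Login('ine_student', 'localhost', 'ine', '5432', 'ine-password')

def connect_kwargs(login):
    """Turn a Login named tuple into keyword arguments for a .connect() call."""
    kwargs = dict(login._asdict())          # field name -> value
    kwargs["port"] = int(kwargs["port"])    # '5432' -> 5432
    return kwargs

# With the real driver this would be used as: pg8000.connect(**connect_kwargs(login))
print(connect_kwargs(login))
```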

## Deciding on schemata
![orange-divider](https://user-images.githubusercontent.com/7065401/98619088-44ab6000-22e1-11eb-8f6d-5532e68ab274.png)

As some data to load into the database, let us take some information on United States zip codes published by the U.S. Census Bureau.  We have two source files available: a tab-separated file that explains the column names, and another that gives information per zip code.

> The data associated with this notebook can be found in the files associated with this course.

```
!head -5 ../data/census-zipcodes-2018.fields
```

```
USPS	United States Postal Service State Abbreviation
GEOID	Geographic Identifier - fully concatenated geographic code (State FIPS and district number)
ALAND	Land Area (square meters) - Created for statistical purposes only
AWATER	Water Area (square meters) - Created for statistical purposes only
ALAND_SQMI	Land Area (square miles) - Created for statistical purposes only
```


```
!head -5 ../data/census-zipcodes-2018.tsv
```

```
USPS	ALAND	AWATER	ALAND_SQMI	AWATER_SQMI	INTPTLAT	INTPTLONG
00601	166659749	799292	64.348	0.309	18.180555	-66.749961
00602	79307535	4428429	30.621	1.71	18.361945	-67.175597
00603	81887185	181411	31.617	0.07	18.455183	-67.119887
00606	109579993	12487	42.309	0.005	18.158327	-66.932928
```


Before putting the data into tables, we should decide on good table layouts.  The field key is relatively straightforward.

```python
cur_pg8k.execute('DROP TABLE IF EXISTS census_zipcode_fields;')

sql_census_fields = """
CREATE TABLE census_zipcode_fields (
  key VARCHAR(15) PRIMARY KEY,  -- by implication, UNIQUE NOT NULL
  description VARCHAR
);
"""
cur_pg8k.execute(sql_census_fields)
```

```
<pg8000.core.Cursor at 0x7f04a4361f70>
```

The data set describing fields is small, and can easily be read into memory.

```python
# Read the small key file fully; the context manager closes it afterward
with open('../data/census-zipcodes-2018.fields') as fh:
    fields = [tuple(line.strip().split('\t')) for line in fh]

sql = """
INSERT INTO census_zipcode_fields (key, description)
VALUES (%s, %s)
"""
cur_pg8k.executemany(sql, fields)
```

```
<pg8000.core.Cursor at 0x7f04a4361f70>
```

The main data table gives us a chance to use PostgreSQL's data types more flexibly.

```python
cur_pg8k.execute('DROP TABLE IF EXISTS census_zipcode_geography;')

sql_geography = """
CREATE TABLE census_zipcode_geography (
  USPS CHAR(5) PRIMARY KEY,  -- by implication, UNIQUE NOT NULL
  ALAND BIGINT,              -- some zips are larger than 2e9 m^2
  AWATER BIGINT,
  ALAND_SQMI NUMERIC(8, 3),  -- largest zips need 5 to left of decimal
  AWATER_SQMI NUMERIC(8, 3), -- sizes with 3 digits of precision
  INTPTLAT REAL,             -- keep fields from key, although duplicative
  INTPTLONG REAL,
  location POINT             -- use geometric type for lat/lon
);
"""
cur_pg8k.execute(sql_geography)
```

```
<pg8000.core.Cursor at 0x7f04a4361f70>
```
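The size comments in this schema follow from PostgreSQL's documented ranges: `INTEGER` tops out at 2**31 - 1 = 2,147,483,647, while `BIGINT` goes up to 2**63 - 1, and `NUMERIC(p, s)` keeps `p` significant digits with `s` of them after the decimal point, so `NUMERIC(8, 3)` holds at most 99999.999. The hypothetical helpers below just make that arithmetic explicit; they are a checking aid, not part of the lesson's loading code.

```python
from decimal import Decimal

INT4_MAX = 2**31 - 1   # PostgreSQL INTEGER upper bound: 2,147,483,647
INT8_MAX = 2**63 - 1   # PostgreSQL BIGINT upper bound

def numeric_max(precision, scale):
    """Largest value a NUMERIC(precision, scale) column can store."""
    return Decimal(10) ** (precision - scale) - Decimal(10) ** -scale

def smallest_int_type(value):
    """Choose between INTEGER and BIGINT, falling back to NUMERIC."""
    if -(INT4_MAX + 1) <= value <= INT4_MAX:
        return "INTEGER"
    if -(INT8_MAX + 1) <= value <= INT8_MAX:
        return "BIGINT"
    return "NUMERIC"

print(numeric_max(8, 3))                 # 99999.999
print(smallest_int_type(166659749))      # INTEGER (ALAND of zip 00601)
print(smallest_int_type(3_000_000_000))  # BIGINT
```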

We stipulate that this data is too large to load into memory all at once (really it is not), so we insert it one row at a time as we read the file.

```python
fields = ('USPS', 'ALAND', 'AWATER', 'ALAND_SQMI', 
          'AWATER_SQMI', 'INTPTLAT', 'INTPTLONG', 'location')

sql_insert_geo = f"""
INSERT into census_zipcode_geography ({','.join(fields)})
VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
"""
with open('../data/census-zipcodes-2018.tsv') as fh:
    next(fh)   # discard header line
    for line in fh:
        row = line.strip().split('\t')
        row.append(f"({row[-2]}, {row[-1]})")
        cur_pg8k.execute(sql_insert_geo, tuple(row))

conn_pg8000.commit()
```
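If the file really were too large for memory, a middle ground between one `execute()` per row and a single huge `executemany()` is to send rows in fixed-size batches while still reading the file lazily. This is a sketch with hypothetical helper names (`batches`, `parse_geo_lines`, `load_in_batches`); how much batching speeds things up depends on the driver, since some implement `executemany()` as a loop over `execute()` internally, so the main guaranteed benefit here is bounded memory use.

```python
from itertools import islice

def batches(rows, size):
    """Yield lists of at most `size` items, consuming `rows` lazily."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

def parse_geo_lines(lines):
    """Turn census TSV lines (header already skipped) into insert-ready tuples."""
    for line in lines:
        row = line.strip().split('\t')
        row.append(f"({row[-2]}, {row[-1]})")   # POINT text built from lat/lon
        yield tuple(row)

def load_in_batches(cur, sql, lines, size=1000):
    """Insert parsed rows with one executemany() call per batch."""
    for batch in batches(parse_geo_lines(lines), size):
        cur.executemany(sql, batch)

# Usage sketch with the objects from the cell above (not executed here):
# with open('../data/census-zipcodes-2018.tsv') as fh:
#     next(fh)   # discard header line
#     load_in_batches(cur_pg8k, sql_insert_geo, fh)
# conn_pg8000.commit()
```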

Just to confirm that our data is in the database, and as a preview of the POINT data type, let us query the land area of the zip codes close to where I live.  The driver's `%s` placeholders cannot be used inside a quoted SQL literal such as `circle '((...), 0.15)'`, so here we interpolate the values manually with Python's `%` operator.  That is acceptable only because the values are our own numbers; never interpolate untrusted input this way.

```python
sql_near = """
SELECT USPS, ALAND_SQMI, AWATER_SQMI, location 
FROM census_zipcode_geography 
WHERE location <@ circle '((%f, %f), 0.15)';
"""
cur_pg8k.execute(sql_near % (45.1, -69.3))
cur_pg8k.fetchall()
```

```
(['04443', Decimal('137.384'), Decimal('8.674'), '(45.227773,-69.353261)'],
 ['04479', Decimal('38.441'), Decimal('1.359'), '(45.124208,-69.287058)'],
 ['04930', Decimal('53.969'), Decimal('2.373'), '(45.027572,-69.31797)'],
 ['04939', Decimal('37.670'), Decimal('0.269'), '(45.077258,-69.158771)'])
```
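A parameterized alternative avoids string interpolation by building the circle with PostgreSQL's `point()` and `circle()` functions instead of a quoted literal, so each value can be an ordinary `%s` placeholder that the driver fills in. This is a sketch that I have not run against the lesson's database; `SQL_NEAR_PARAM` and `near_params()` are hypothetical names.

```python
SQL_NEAR_PARAM = """
SELECT USPS, ALAND_SQMI, AWATER_SQMI, location
FROM census_zipcode_geography
WHERE location <@ circle(point(%s, %s), %s);
"""

def near_params(lat, lon, radius=0.15):
    """Values for the three placeholders, coerced to float for the driver."""
    return (float(lat), float(lon), float(radius))

# Usage with the pg8000 cursor from above (not executed here):
# cur_pg8k.execute(SQL_NEAR_PARAM, near_params(45.1, -69.3))
# cur_pg8k.fetchall()
```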

In preparation for working with another adapter, let us close the connection we created with `pg8000`.

```python
conn_pg8000.close()
```

## Asynchronous access
![orange-divider](https://user-images.githubusercontent.com/7065401/98619088-44ab6000-22e1-11eb-8f6d-5532e68ab274.png)

On modern computers, I/O is by far the slowest component.  Thread switches, let alone process switches, are relatively expensive.  Simply checking whether a given I/O operation is ready to provide more data is one or two orders of magnitude cheaper, and costs almost no memory compared with allocating a thread, which needs its own stack.

Using `aiopg` or `asyncpg` allows your program to perform other work while waiting for the results of a query to arrive.  Doing so *does*, however, require becoming familiar with the `async` and `await` keywords, and generally shifting your thinking towards the programming style required by `asyncio` in the standard library.  If the speed of many simultaneous operations becomes even more important, the third-party `uvloop` package can replace the default `asyncio` event loop and speed things up still more.

The simple examples in this lesson will not come remotely close to those where any of this matters.  But for much larger datasets, and for multi-tenancy of RDBMS access, the differences can be huge.
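A tiny, database-free illustration of why waiting can overlap: here `asyncio.sleep()` stands in for a query round trip (an assumption made so the sketch runs anywhere). Four coroutines that each wait 0.2 seconds finish together in roughly 0.2 seconds rather than 0.8, because the event loop switches to another coroutine whenever one is waiting. Outside a notebook, `asyncio.run()` (Python 3.7+) is the usual entry point; inside Jupyter it needs the `nest_asyncio` patch discussed in this lesson.

```python
import asyncio
import time

async def fake_query(name, delay):
    """Stand-in for a database round trip that mostly waits on I/O."""
    await asyncio.sleep(delay)
    return name

async def run_four():
    start = time.perf_counter()
    names = await asyncio.gather(*(fake_query(f"q{i}", 0.2) for i in range(4)))
    return names, time.perf_counter() - start

names, elapsed = asyncio.run(run_four())
print(names, f"{elapsed:.2f}s")   # e.g. ['q0', 'q1', 'q2', 'q3'] 0.20s
```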

We first import the `asyncio` scaffolding along with the `aiopg` and `asyncpg` modules.  Because `aiopg` does not claim to follow the DB-API, module attributes like `.apilevel` and `.paramstyle` do not exist.  However, *most* of the DB-API is still consistent; e.g. `.connect()`, `.cursor()`, `.execute()`, and `.fetchall()` still have their familiar meanings.

```python
import asyncio
from asyncio import get_event_loop, gather, as_completed
import aiopg, asyncpg
```

Because this code runs inside a Jupyter notebook, which already has its own `asyncio` event loop, we need a third-party module called `nest_asyncio` to patch the event loop so that async code can run in cells.  Outside of environments that may create their own event loops (Jupyter, web servers, GUI applications), this is not necessary.

```python
import nest_asyncio
nest_asyncio.apply()
```

We might want to check the zip codes near certain latitude/longitude locations.

```python
Location = namedtuple("Location", "latitude longitude")
locs = [Location(40.0, -105.3), Location(45.1, -69.3), 
        Location(34.9, -82.4), Location(42.6, -72.5)]
```

---
### Aiopg

For an asynchronous adapter, we need to wrap our operation in a special coroutine function defined with `async def` rather than plain `def`.  Each step carries an extra `await` keyword to indicate that the event loop is free to do other work between such lines.  The logic, however, is very much the same as we have seen with other adapters.

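```python
async def near_location(loc):
    # Open a fresh connection for this coroutine; the context managers
    # close the cursor and the connection when the block exits.
    async with aiopg.connect(**login._asdict()) as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql_near % loc)
            results = await cur.fetchall()
    return (loc, results)
```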

We cannot simply call this function and get results: calling it only creates a coroutine object, which the event loop must then run.  In fact, let us have the event loop handle several such coroutines, each for a different reference location.

```python
aws = [near_location(loc) for loc in locs]

loop = get_event_loop()
for ret in loop.run_until_complete(gather(*aws)):
    loc, results = ret
    print(loc, "...", len(results), "tuples\n", "-"*70)
    for tup in results[:2]:
        print(tup)
    print()
```

```
Location(latitude=40.0, longitude=-105.3) ... 10 tuples
 ----------------------------------------------------------------------
('80025', Decimal('11.722'), Decimal('0.004'), '(39.939848,-105.283942)')
('80027', Decimal('19.462'), Decimal('0.196'), '(39.950796,-105.159688)')

Location(latitude=45.1, longitude=-69.3) ... 4 tuples
 ----------------------------------------------------------------------
('04443', Decimal('137.384'), Decimal('8.674'), '(45.227773,-69.353261)')
('04479', Decimal('38.441'), Decimal('1.359'), '(45.124208,-69.287058)')

Location(latitude=34.9, longitude=-82.4) ... 12 tuples
 ----------------------------------------------------------------------
('29601', Decimal('4.280'), Decimal('0.024'), '(34.847112,-82.402264)')
('29605', Decimal('25.579'), Decimal('0.175'), '(34.774425,-82.37661)')

Location(latitude=42.6, longitude=-72.5) ... 13 tuples
 ----------------------------------------------------------------------
('01054', Decimal('22.790'), Decimal('0.164'), '(4
```

The above code has two limitations.  

* Each time a new coroutine is created, it makes a new connection.  A more efficient approach is to create a *connection pool* and share connections as they are requested (but not close them implicitly at function end).
* We wait for all the results from the various coroutines to be complete in `loop.run_until_complete()`.  If the 4th query is ready early, we cannot process it while waiting for the 1st query to complete.

As a secondary concern, by calling `.fetchall()` we cannot process each result tuple as soon as it arrives.
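The difference between waiting for everything and handling results as they finish can be seen without a database. In this sketch, `asyncio.sleep()` again stands in for the queries, with delays chosen so that the last-submitted query finishes first: `gather()` returns results in submission order once all are done, while `as_completed()` hands each one over as soon as it finishes.

```python
import asyncio

async def fake_query(label, delay):
    await asyncio.sleep(delay)
    return label

def make_queries():
    # query 0 waits 0.20s, query 1 0.15s, query 2 0.10s, query 3 0.05s
    return [fake_query(f"query {i}", 0.05 * (4 - i)) for i in range(4)]

async def compare():
    submitted = await asyncio.gather(*make_queries())
    completed = []
    for future in asyncio.as_completed(make_queries()):
        completed.append(await future)   # each result could be processed right here
    return submitted, completed

submitted, completed = asyncio.run(compare())
print(submitted)   # ['query 0', 'query 1', 'query 2', 'query 3']
print(completed)   # ['query 3', 'query 2', 'query 1', 'query 0']
```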

```python
async def one_location(pool, loc):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql_near % loc)
            results = []
            async for row in cur:
                # might process each tuple as soon as received
                results.append(row)
    return (loc, results)
```

```python
async def near_locations(locs):
    async with aiopg.create_pool(**login._asdict(), maxsize=10) as pool:
        queries = [one_location(pool, loc) for loc in locs]
        for future in as_completed(queries):
            loc, results = await future
            print(loc, "...", len(results), "tuples\n", "-"*70)
            for tup in results[:2]:
                print(tup)
            print()
```

```python
loop = asyncio.get_event_loop()
loop.run_until_complete(near_locations(locs))
```

```
Location(latitude=42.6, longitude=-72.5) ... 13 tuples
 ----------------------------------------------------------------------
('01054', Decimal('22.790'), Decimal('0.164'), '(42.468898,-72.484579)')
('01301', Decimal('25.494'), Decimal('0.541'), '(42.626761,-72.60153)')

Location(latitude=34.9, longitude=-82.4) ... 12 tuples
 ----------------------------------------------------------------------
('29601', Decimal('4.280'), Decimal('0.024'), '(34.847112,-82.402264)')
('29605', Decimal('25.579'), Decimal('0.175'), '(34.774425,-82.37661)')

Location(latitude=45.1, longitude=-69.3) ... 4 tuples
 ----------------------------------------------------------------------
('04443', Decimal('137.384'), Decimal('8.674'), '(45.227773,-69.353261)')
('04479', Decimal('38.441'), Decimal('1.359'), '(45.124208,-69.287058)')

Location(latitude=40.0, longitude=-105.3) ... 10 tuples
 ----------------------------------------------------------------------
('80025', Decimal('11.722'), Decimal('0.004'), '(39.9
```

An important issue to keep in mind with `aiopg` is that it always runs with AUTOCOMMIT set to ON.  This means that every time an INSERT, DELETE, or UPDATE command is executed, it is as if `.commit()` were called immediately afterward.  Some transactional options are lost with this limitation; grouping several statements into one transaction requires sending `BEGIN` and `COMMIT` (or `ROLLBACK`) explicitly.
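psycopg2's documentation for asynchronous connections, which `aiopg` builds on, suggests controlling transactions by executing `BEGIN`, `COMMIT`, and `ROLLBACK` yourself. A minimal sketch of such a helper follows. `run_in_transaction()` is a hypothetical name, and `RecordingCursor` is a stand-in for an `aiopg` cursor (anything whose `execute()` is a coroutine) so the example runs without a database.

```python
import asyncio

async def run_in_transaction(cur, statements):
    """Run (sql, params) pairs as one transaction on an autocommit cursor."""
    await cur.execute("BEGIN")
    try:
        for sql, params in statements:
            await cur.execute(sql, params)
    except Exception:
        await cur.execute("ROLLBACK")    # undo everything since BEGIN
        raise
    await cur.execute("COMMIT")

class RecordingCursor:
    """Stand-in for an aiopg cursor: records SQL, fails on the text 'FAIL'."""
    def __init__(self):
        self.log = []

    async def execute(self, sql, params=None):
        if sql == "FAIL":
            raise RuntimeError("simulated database error")
        self.log.append(sql)

good, bad = RecordingCursor(), RecordingCursor()
asyncio.run(run_in_transaction(good, [("UPDATE t SET x = %s", (1,))]))
try:
    asyncio.run(run_in_transaction(bad, [("FAIL", None)]))
except RuntimeError:
    pass
print(good.log)   # ['BEGIN', 'UPDATE t SET x = %s', 'COMMIT']
print(bad.log)    # ['BEGIN', 'ROLLBACK']
```

With a real pool, the cursor would come from `async with pool.acquire() as conn:` and `async with conn.cursor() as cur:`, exactly as in `one_location()`.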

---
### Asyncpg

Using `asyncpg` can be much faster than other adapters under high-volume, highly concurrent access, and it also captures PostgreSQL features more accurately.  It *does* require adopting the asynchronous style of programming, which is often less familiar.  Moreover, `asyncpg` generally *simplifies* the DB-API rather than following it.

Let us write another short program to find records *near* certain latitude and longitude points, as we did above.  This time we push the asynchronous style slightly further: rather than returning entire query results, we add each individual row to a shared collection that serves as a queue.  Here we use a set, but a `collections.deque` or an `asyncio.Queue` would probably be a better choice.

```python
import sys
zipcodes = set()

async def print_from_queue(q, early_stop=sys.maxsize):
    # Drain whatever is in the shared collection, then pause briefly for more
    while True:
        while q:
            row = q.pop()
            print(f"{row['usps']} | {row['location']} | {row['aland_sqmi']}")
            early_stop -= 1
            if early_stop == 0:
                return
        await asyncio.sleep(0.1)
        if not q:   # nothing new arrived during the pause, so stop
            break
```

This coroutine `print_from_queue` can be managed by an event loop.  It terminates if it receives no new data for 1/10th of a second, or possibly earlier if a limit is set (the latter mostly to limit output for this lesson).  Notice that a record returned by `asyncpg` is a custom data type that lets us index by column name.  This setup becomes more interesting if we perform actual processing on each record rather than only printing it.

We also have to populate the queue.  Here we make a connection (with an `await`), then run each query within a transaction.  Since these are SELECTs rather than UPDATE/INSERT/DELETE statements, the transaction makes no difference other than offering an opportunity to show the usage.  In `asyncpg` the transaction can be wrapped in a context manager, but the connection cannot.

```python
async def near_many(locs, zipcodes):
    conn = await asyncpg.connect(**login._asdict())
    try:
        for loc in locs:
            async with conn.transaction():
                for row in await conn.fetch(sql_near % loc):
                    zipcodes.add(row)
    finally:
        await conn.close()   # close even if a query fails

loop = get_event_loop()
loop.run_until_complete(near_many(locs, zipcodes))
loop.run_until_complete(print_from_queue(zipcodes, early_stop=10))
```

```
29661 | asyncpg.pgproto.types.Point((35.016915, -82.491086)) | 68.860
29609 | asyncpg.pgproto.types.Point((34.912592, -82.38817)) | 24.159
01360 | asyncpg.pgproto.types.Point((42.677091, -72.453876)) | 34.263
01347 | asyncpg.pgproto.types.Point((42.559294, -72.518753)) | 0.579
80027 | asyncpg.pgproto.types.Point((39.950796, -105.159688)) | 19.462
29601 | asyncpg.pgproto.types.Point((34.847112, -82.402264)) | 4.280
04939 | asyncpg.pgproto.types.Point((45.077258, -69.158771)) | 37.670
29615 | asyncpg.pgproto.types.Point((34.856825, -82.296139)) | 19.189
04479 | asyncpg.pgproto.types.Point((45.124208, -69.287058)) | 38.441
01375 | asyncpg.pgproto.types.Point((42.466691, -72.546751)) | 14.221
```
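As noted above, an `asyncio.Queue` is probably a better fit than a set: the consumer can `await` new rows instead of polling every 0.1 seconds, a sentinel value marks the end of the data, and producer and consumer can genuinely run at the same time (in the cell above, `near_many()` finishes before printing starts). This sketch uses stand-in rows, plain dicts indexed the same way as `asyncpg` records, and hypothetical `producer`/`consumer`/`pipeline` names; a real producer would `await queue.put(row)` for each row returned by `conn.fetch(...)`.

```python
import asyncio

DONE = object()   # sentinel: tells the consumer that no more rows are coming

async def producer(queue, batches):
    """Stand-in for the query side: put rows on the queue as they 'arrive'."""
    for batch in batches:
        await asyncio.sleep(0.01)        # pretend this is a query round trip
        for row in batch:
            await queue.put(row)
    await queue.put(DONE)

async def consumer(queue, handle):
    """Process rows as soon as they are available; stop at the sentinel."""
    while True:
        row = await queue.get()
        if row is DONE:
            break
        handle(row)

async def pipeline(batches):
    queue = asyncio.Queue()
    handled = []
    # Both coroutines run concurrently; the consumer waits instead of polling.
    await asyncio.gather(producer(queue, batches), consumer(queue, handled.append))
    return handled

rows = asyncio.run(pipeline([[{"usps": "04443"}, {"usps": "04479"}],
                             [{"usps": "80025"}]]))
print([row["usps"] for row in rows])   # ['04443', '04479', '80025']
```

With several producers, the sentinel should be sent only after all of them have finished, for example after awaiting `gather()` over the producers.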


Let us take a look at one of those Record objects we get from the adapter.

```python
zipcodes.pop()
```

```
<Record usps='01301' aland_sqmi=Decimal('25.494') awater_sqmi=Decimal('0.541') location=asyncpg.pgproto.types.Point((42.626761, -72.60153))>
```
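`near_many()` still interpolates values into the SQL text. With `asyncpg`, values can instead be passed as extra arguments to `conn.fetch()`, matching PostgreSQL's `$1`, `$2`, ... placeholders. The sketch below, which I have not run against the lesson's database, builds the circle with the `point()` and `circle()` functions so that the placeholders do not sit inside a quoted literal; `SQL_NEAR_ASYNCPG` and `near_many_params()` are hypothetical names, and the function expects an already open `asyncpg` connection.

```python
SQL_NEAR_ASYNCPG = """
SELECT USPS, ALAND_SQMI, AWATER_SQMI, location
FROM census_zipcode_geography
WHERE location <@ circle(point($1, $2), $3);
"""

async def near_many_params(conn, locs, zipcodes, radius=0.15):
    """Like near_many(), but binds the values instead of formatting them in."""
    for loc in locs:
        # asyncpg sends the arguments separately from the SQL text
        rows = await conn.fetch(SQL_NEAR_ASYNCPG, loc.latitude, loc.longitude, radius)
        for row in rows:
            zipcodes.add(row)

# Usage sketch (not executed here):
# conn = await asyncpg.connect(**login._asdict())
# try:
#     await near_many_params(conn, locs, zipcodes)
# finally:
#     await conn.close()
```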

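Another flexible feature of `asyncpg` is choosing how values of a given PostgreSQL type are decoded, by registering a custom type codec.  Here, `json` values are decoded into Python dicts.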

```python
from json import dumps, loads

async def get_users():
    sql = "SELECT row_to_json(t) basic FROM (SELECT username, created_on FROM users) t;"
    conn = await asyncpg.connect(**login._asdict())
    try:
        await conn.set_type_codec('json', 
                                  encoder=dumps, decoder=loads, 
                                  schema='pg_catalog')
        for row in await conn.fetch(sql):
            print(type(row['basic']), row['basic'])
    finally:
        await conn.close()

asyncio.get_event_loop().run_until_complete(get_users())
```

```
<class 'dict'> {'username': 'Alice', 'created_on': '2020-11-30T16:27:30.115556'}
<class 'dict'> {'username': 'Bob', 'created_on': '2020-11-30T16:27:30.116392'}
<class 'dict'> {'username': 'Carlos', 'created_on': '2020-11-30T16:27:30.11666'}
<class 'dict'> {'username': 'Sybil', 'created_on': '2020-11-30T16:27:30.207111'}
<class 'dict'> {'username': 'Trudy', 'created_on': '2020-11-30T16:27:30.207111'}
<class 'dict'> {'username': 'Vanna', 'created_on': '2020-11-30T16:27:30.207111'}
```


In this particular example, we are probably better off simply creating a Python dict from the dictionary-like `Record` object used by `asyncpg` though.  For example, this will preserve the datetime data type, whereas round-tripping through JSON will lose it.

```python
async def get_users2():
    sql = "SELECT username, created_on FROM users;"
    conn = await asyncpg.connect(**login._asdict())
    try:
        for row in await conn.fetch(sql):
            print(dict(row))
    finally:
        await conn.close()

asyncio.get_event_loop().run_until_complete(get_users2())
```

```
{'username': 'Alice', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 115556)}
{'username': 'Bob', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 116392)}
{'username': 'Carlos', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 116660)}
{'username': 'Sybil', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 207111)}
{'username': 'Trudy', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 207111)}
{'username': 'Vanna', 'created_on': datetime.datetime(2020, 11, 30, 16, 27, 30, 207111)}
```
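The type loss from a JSON round trip is easy to see with a plain dict shaped like the rows above (used here instead of a live `Record`). Serializing the datetime for JSON turns it into a string, and getting a datetime back requires knowing which keys to convert. The `default=` argument of `json.dumps()` and `datetime.fromisoformat()` are standard-library features; the round trip itself is only an illustration.

```python
import json
from datetime import datetime

row = {'username': 'Alice',
       'created_on': datetime(2020, 11, 30, 16, 27, 30, 115556)}

# json cannot encode datetime by itself; default= converts it to ISO 8601 text
as_json = json.dumps(row, default=lambda value: value.isoformat())
back = json.loads(as_json)
print(type(back['created_on']))        # <class 'str'>: the datetime type is gone

# Restoring it requires knowing which keys held datetimes
restored = datetime.fromisoformat(back['created_on'])
print(restored == row['created_on'])   # True
```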


## Summary
![orange-divider](https://user-images.githubusercontent.com/7065401/98619088-44ab6000-22e1-11eb-8f6d-5532e68ab274.png)

For users who want to venture beyond the standard `psycopg2` (or in the future `psycopg3`) adapter, several other good options are available.  For heavy workloads, using one of the asynchronous adapters can be a big win.  The speedup with `asyncpg` is greater than with `aiopg`, but at the cost of more distance from DB-API 2.0 conventions.

In the future, async support is also planned for `psycopg3`, but at the time of this lesson, it is too early to guess how performance will compare between future options.