# Intro to SQLAlchemy and Gevent
The following code uses the `pgbench` database, which was initialized with `pgbench -i -s 400 pgbench` (scale factor 400, so `pgbench_accounts` holds 40 million rows). The timings in this notebook were measured on a 2017 Retina MacBook Pro with a 2.5 GHz Intel Core i5 and 8 GB of RAM.

In [1]:
from sqlalchemy import MetaData, create_engine
import os

metadata = MetaData()
# Connection URL for the pgbench database. Set the PGBENCH_URL environment
# variable to point the notebook at a different server or user; the default
# is the original author's local setup.
engine = create_engine(
    os.environ.get(
        'PGBENCH_URL',
        'postgresql+psycopg2://jasonmyers@localhost:5432/pgbench'))

In [2]:
from sqlalchemy import Table
# Reflect the three pgbench tables from the live database. Passing
# ``autoload_with`` on its own already enables reflection; the separate
# ``autoload=True`` flag is deprecated since SQLAlchemy 1.4 and rejected by 2.0.
pgbench_accounts = Table('pgbench_accounts', metadata, autoload_with=engine)
pgbench_tellers = Table('pgbench_tellers', metadata, autoload_with=engine)
pgbench_branches = Table('pgbench_branches', metadata, autoload_with=engine)

In [4]:
from sqlalchemy import select, func

In [5]:
# The benchmark workload: six read-only statements, executed in list order.
queries = [
    # every teller row
    select([pgbench_tellers]),
    # number of accounts per branch (aggregates all of pgbench_accounts)
    select([pgbench_accounts.c.bid, func.count(1)])
    .group_by(pgbench_accounts.c.bid),
    # every branch row
    select([pgbench_branches]),
    # distinct branch ids referenced by accounts
    select([pgbench_accounts.c.bid]).distinct(),
    # up to 1000 account rows (no ORDER BY, so which rows is unspecified)
    select([pgbench_accounts]).limit(1000),
    # accounts per branch again, capped at 5000 result rows
    select([pgbench_accounts.c.bid, func.count(1)])
    .group_by(pgbench_accounts.c.bid)
    .limit(5000),
]

# Individual handles for ad-hoc experimentation in later cells.
query1, query2, query3, query4, query5, query6 = queries

In [6]:
def execute_six_queries():
    """Run every statement in the module-level ``queries`` list sequentially.

    This is the blocking baseline for the gevent benchmarks. Each query gets
    its own connection from ``engine`` and all of its rows are fetched, so the
    timing includes the full result transfer. The fetched rows are collected
    locally and discarded; nothing is returned.
    """
    results = {}
    for idx, query in enumerate(queries):
        # ``with`` closes the connection (handing it back to the engine's
        # pool) right after the fetch instead of leaving it open until the
        # object is garbage-collected.
        with engine.connect() as conn:
            results[idx] = conn.execute(query).fetchall()

In [8]:
%%time 
# Sequential baseline: the queries run one after another. At this point in
# the notebook psycopg2 has not yet been made gevent-aware.
execute_six_queries()

CPU times: user 11.4 ms, sys: 3.65 ms, total: 15 ms
Wall time: 57 s


In [10]:
import psycopg2
from psycopg2 import extensions

from gevent.socket import wait_read, wait_write

def make_psycopg_green():
    """Register ``gevent_wait_callback`` as psycopg2's wait callback.

    After this call, psycopg2 routes its waits for socket readiness through
    gevent instead of blocking the process.

    Raises:
        ImportError: the installed psycopg2 lacks ``set_wait_callback``,
            i.e. it has no coroutine support.
    """
    if hasattr(extensions, 'set_wait_callback'):
        extensions.set_wait_callback(gevent_wait_callback)
        return

    raise ImportError(
        "support for coroutines not available in this Psycopg version (%s)"
        % psycopg2.__version__)

def gevent_wait_callback(conn, timeout=None):
    """Drive *conn* to completion while letting gevent wait on its socket.

    Polls the connection repeatedly. When psycopg2 reports it needs the socket
    to become readable or writable, the current greenlet waits for that via
    gevent's ``wait_read`` / ``wait_write`` (passing *timeout* through), and
    polling resumes. Returns once ``poll()`` reports ``POLL_OK``.

    Raises:
        psycopg2.OperationalError: ``poll()`` returned an unexpected state.
    """
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            return

        if state == extensions.POLL_READ:
            waiter = wait_read
        elif state == extensions.POLL_WRITE:
            waiter = wait_write
        else:
            raise psycopg2.OperationalError(
                "Bad result from poll: %r" % state)

        waiter(conn.fileno(), timeout=timeout)

In [12]:
import gevent
from gevent.queue import JoinableQueue, Queue

class QueryPool(object):
    """Run a batch of queries on a fixed number of gevent greenlets.

    ``run()`` loads every query into a ``JoinableQueue``. ``pool_size``
    executor greenlets then drain that queue, each running one query at a time
    on its own connection from the module-level ``engine``. The rows of each
    successful query are pushed onto ``output_queue`` in completion order.

    The queries only overlap when psycopg2 has been given a gevent wait
    callback (``make_psycopg_green``). Without it, the libpq calls block and
    the executors effectively run one after another.
    """

    def __init__(self, queries, pool_size=5):
        # With zero executors nothing would ever drain the queue, and
        # ``tasks.join()`` in ``run()`` could never return.
        if pool_size < 1:
            raise ValueError(
                "pool_size must be at least 1, got %r" % (pool_size,))
        self.queries = queries
        self.POOL_MAX = pool_size
        self.tasks = JoinableQueue()
        self.output_queue = Queue()
        # Greenlets spawned by the most recent ``run()`` call.
        self.running = []

    def __query(self, query):
        """Execute *query* on a fresh connection and return all fetched rows."""
        # Close the connection deterministically so it is handed back to the
        # engine's pool as soon as the rows have been fetched.
        with engine.connect() as conn:
            return conn.execute(query).fetchall()

    def executor(self, number):
        """Worker loop: execute queued queries until the task queue is empty.

        ``number`` is the worker's index from ``run()``; it is not used here.
        A failing query is printed and skipped, and the worker moves on to
        the next one.
        """
        while not self.tasks.empty():
            query = self.tasks.get()
            try:
                self.output_queue.put(self.__query(query))
            except Exception as exc_info:
                print(exc_info)
            finally:
                # Always mark the task done, even if the query raised;
                # otherwise ``tasks.join()`` in ``run()`` would wait forever.
                self.tasks.task_done()

    def overseer(self):
        """Enqueue every query for the executors."""
        for query in self.queries:
            self.tasks.put(query)

    def run(self):
        """Queue all queries, start ``POOL_MAX`` executors and wait for them.

        Returns once every queued query has been marked done.
        """
        self.running = []
        gevent.spawn(self.overseer).join()
        for i in range(self.POOL_MAX):
            # gevent.spawn() both creates and schedules the greenlet, so no
            # separate start() call is needed.
            self.running.append(gevent.spawn(self.executor, i))

        self.tasks.join()
        # The executors leave their loop once the queue is drained; killing
        # them just guarantees none is left running.
        for runner in self.running:
            runner.kill()

In [13]:
# Install the gevent wait callback in psycopg2 before the pooled runs below,
# so queries issued from different greenlets no longer block one another.
make_psycopg_green()

In [14]:
%%time
# Concurrent run with the default pool_size of 5 executor greenlets.
QueryPool(queries).run()

CPU times: user 15.9 ms, sys: 5.24 ms, total: 21.1 ms
Wall time: 30 s


In [15]:
%%time
# Same workload with 3 executor greenlets.
QueryPool(queries, 3).run()

CPU times: user 15.2 ms, sys: 4.79 ms, total: 20 ms
Wall time: 31.4 s


In [16]:
%%time
# Same workload with only 2 executor greenlets.
QueryPool(queries, 2).run()

CPU times: user 13.3 ms, sys: 3.76 ms, total: 17 ms
Wall time: 49.7 s


In [17]:
%%time
# Same workload with 6 executor greenlets, one per query in ``queries``.
QueryPool(queries, 6).run()

CPU times: user 14.7 ms, sys: 4.75 ms, total: 19.5 ms
Wall time: 27.5 s
