## Process Models in the Database and Elsewhere

The code examples here are taken from the Python Cookbook, freely available at http://chimera.labs.oreilly.com/books/1230000000393/index.html. It is an excellent book and will help you with your Python programming and with your project.

I first saw database systems characterized along the lines we have discussed in this course in http://db.cs.berkeley.edu/papers/fntdb07-architecture.pdf . This is an incredible review paper, if a little dated, and I strongly recommend you read it now: you will get a lot out of it. Between this paper, "Designing Data-Intensive Applications", and what you have learnt, you will be able to read all modern database papers.

We need concurrency for a multitude of reasons. You are writing a multi-user database: it must take more than one request at a time.

You must plan how to handle these requests. Let's talk in terms of a **unit of execution**: something used to handle one user or to do one transaction. Then the question arises: what does a unit of execution map to?

- an operating system process (multiprocessing). Scheduled by the OS kernel, with state maintained in a private, per-process address space and execution context. Really slow switching.
- an operating system thread (multithreading). Scheduled by the OS, but without its own address space and context; it shares them with the other threads in the same process. Fast switching.
- an in-process thread (lightweight thread or coroutine) with asynchronous IO: there is one process, so the address space and context are shared with the other "user" threads or coroutines. Any long-running IO can block the whole process, so it must be done asynchronously, perhaps in another OS thread. Super fast switching.

We have seen all three of these choices, the last one ad nauseam. We'll focus a bit more on the first two today, but hopefully you have seen more about them in cs205 or other courses as well.

We'll call the unit of execution in a DBMS the DBMS worker. In general, it maps onto one client request or one transaction.

### Process per DBMS worker model

This really is two models: spawn a process every time a new request comes in, or use a process pool from which a process is chosen to handle the request. Once the request is done, the process goes back to the pool.

This is easy to implement. Many databases were first written when OS thread support was poor, and so they used this model.

The complication here is sharing memory structures, especially the lock table and the buffer pool (a memory pool into which database pages are mapped for use: these must be flushed to disk on transactions, but they also serve as a cache for gets). This sharing uses POSIX shared memory or System V IPC (see http://semanchuk.com/philip/sysv_ipc/).
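As a rough illustration of a shared page, here is a minimal sketch that uses the standard library's `multiprocessing.shared_memory` module (Python 3.8+) rather than the `sysv_ipc` package linked above. The `PAGE_SIZE` value and the function name are assumptions made for the example, and both handles are opened in one process only to keep the sketch short; in a process-per-worker design the second handle would be opened in a different process.

```python
from multiprocessing import shared_memory

PAGE_SIZE = 4096  # assumed page size, for illustration only


def shared_page_roundtrip(payload: bytes) -> bytes:
    # "Owner" side: create a named shared block big enough for one page.
    owner = shared_memory.SharedMemory(create=True, size=PAGE_SIZE)
    try:
        # "Worker" side: attach to the same block by name. In a real
        # process-per-worker design this call would run in another process.
        worker = shared_memory.SharedMemory(name=owner.name)
        try:
            worker.buf[:len(payload)] = payload      # the worker writes...
            return bytes(owner.buf[:len(payload)])   # ...and the owner sees it
        finally:
            worker.close()
    finally:
        owner.close()
        owner.unlink()  # free the block once nobody needs it
```

The block is addressed by name, which is what lets unrelated processes find the same buffer pool; access to it would still need a lock.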

Also note that memory-mapping can be used for this purpose. Memory maps map part of a file to memory and let you address the file as if you were addressing a memory buffer. They take care of bringing parts of the file into memory under the hood. This implementation is used in lmdb and boltdb to provide fast transactions.
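A minimal sketch of the idea with Python's standard `mmap` module follows. The file name and contents are made up for the example; the point is only that the file is read and modified through slice operations, as if it were a byte buffer.

```python
import mmap
import os
import tempfile


def mmap_roundtrip():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "example.db")  # hypothetical file name
        with open(path, "wb") as f:
            f.write(b"hello world")

        with open(path, "r+b") as f:
            # Length 0 maps the whole (non-empty) file.
            with mmap.mmap(f.fileno(), 0) as mm:
                first = mm[:5]        # read through the mapping
                mm[0:5] = b"HELLO"    # write through the mapping
                mm.flush()            # push the change back to the file

        with open(path, "rb") as f:
            on_disk = f.read()
    return first, on_disk
```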

(Note that operating systems also provide buffer caches into which they load existing files. But this cache is not shareable, and many databases prefer to manage the mapping themselves.)

Because of the overhead of context switching, process-per-DBMS-worker models tend not to be very scalable. But in Python, with the GIL, this might be the preferred model if there is any substantial CPU-bound processing, such as a stored procedure. Still, one could run a thread pool with the ability to use a process as a coprocessor. See http://chimera.labs.oreilly.com/books/1230000000393/ch12.html#_dealing_with_the_gil_and_how_to_stop_worrying_about_it .

The buffer pool is stored in shared memory that is shared by all these processes. It's important to use shared memory. You might be tempted to create the database in the memory of the main process and, after forking, write to it in the children. This is a bad idea: memory semantics for children are copy-on-write, so you would be writing to the new process's memory space, and thus to a replica of the database, not to the main database.
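The copy-on-write pitfall can be seen in a few lines. This is a POSIX-only sketch (`os.fork` is not available on Windows); the module-level `db` dictionary is a stand-in for a database built in the parent's ordinary memory, and the function name is made up for the example.

```python
import os

db = {"a": 1}  # stand-in for a database held in the parent's private memory


def child_write_is_invisible():
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: it sees a copy of db, so this write stays private.
        try:
            os.close(read_fd)
            db["a"] = 99
            os.write(write_fd, str(db["a"]).encode())
            os.close(write_fd)
        finally:
            os._exit(0)  # leave the child without running parent cleanup

    # Parent process: read what the child saw, then check our own copy.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        child_value = int(reader.read().decode())
    os.waitpid(pid, 0)
    return db["a"], child_value
```

The child reports the value it wrote, while the parent's dictionary is unchanged, which is why the shared structures have to live in explicitly shared memory.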

Process per worker model is supported by IBM DB2, PostgreSQL, and Oracle.

### Worker per thread or threadpool model

Here the main thread listens for database connections, and each connection is allocated a new thread. This is a model that's really simple to start off with, but it has issues with deadlocks and other synchronization problems: you must be careful. These challenges are also present in the multiprocess model, though, because of the use of shared memory and thus the need for locking there.

The thread-per-worker model scales well to many concurrent connections, as long as you don't have the GIL, and even with the GIL if the computational part is small. Note, though, that if you gained overall speed by using an in-memory db and thus minimizing IO, most of your remaining overhead would be in the GIL, unless you used C code or Cython to access the memory and do some of the computation on it. IBM DB2 has a thread-per-worker model, as do MS SQL Server, MySQL, Informix, and Sybase.

In this case, the buffer pool is simply a heap-resident data structure, whereas in the process-based model it is allocated in shared memory so that it is available to all processes.

When a thread or process needs a page to be read from disk, it generates an IO request (read-into) with the disk address and the memory address. Doing this in fixed-size pages (where the size is optimized for the cache and the size of your data) makes the process fast.

The reverse is done to write (write-from). The lock table uses a similar implementation.
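The read-into and write-from requests can be sketched with a preallocated `bytearray` as the pool and `readinto` on a file object. The class name, `PAGE_SIZE`, and `N_FRAMES` are assumptions made for the illustration; a real buffer pool would also track dirty pages, pin counts, and an eviction policy, and would need locking.

```python
import os
import tempfile

PAGE_SIZE = 4096  # assumed page size
N_FRAMES = 4      # assumed number of frames in the pool


class TinyBufferPool:
    def __init__(self, path):
        self._file = open(path, "r+b")
        self._pool = bytearray(PAGE_SIZE * N_FRAMES)  # all frames, preallocated

    def frame(self, frame_no):
        # A writable, zero-copy view onto one frame of the pool.
        start = frame_no * PAGE_SIZE
        return memoryview(self._pool)[start:start + PAGE_SIZE]

    def read_into(self, page_no, frame_no):
        # Disk address = page_no * PAGE_SIZE, memory address = the frame.
        self._file.seek(page_no * PAGE_SIZE)
        return self._file.readinto(self.frame(frame_no))

    def write_from(self, frame_no, page_no):
        self._file.seek(page_no * PAGE_SIZE)
        self._file.write(self.frame(frame_no))
        self._file.flush()

    def close(self):
        self._file.close()


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "pages.db")  # hypothetical data file
        with open(path, "wb") as f:
            f.write(b"A" * PAGE_SIZE + b"B" * PAGE_SIZE)  # two pages

        pool = TinyBufferPool(path)
        try:
            pool.read_into(page_no=1, frame_no=0)   # bring page 1 into frame 0
            pool.frame(0)[:3] = b"xyz"              # modify it in memory
            pool.write_from(frame_no=0, page_no=0)  # write it out as page 0
        finally:
            pool.close()

        with open(path, "rb") as f:
            data = f.read()
    return data[:4], data[PAGE_SIZE:PAGE_SIZE + 4]
```

Because every page has the same size, the disk offset and the frame offset are each a single multiplication.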

### Light weight threads or coroutines

In the past, when OS thread support was not so good, and now again with the resurgence of interest in asynchronous programming, many widely used databases have implemented their own lightweight threads. To quote the Stonebraker paper:

> These lightweight threads, or DBMS threads, replace the role of the OS threads described in the previous section. Each DBMS thread is programmed to manage its own state, to perform all potentially blocking operations (e.g., I/Os) via non-blocking, asynchronous interfaces, and to frequently yield control to a scheduling routine that dispatches among these tasks.

Sybase and Informix are examples of this mode of usage.
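The "yield control to a scheduling routine" idea can be sketched with plain Python generators. This is a toy round-robin scheduler, not how any of the databases named above implement it; `dbms_thread` and `run_round_robin` are names made up for the example, and no real IO is performed.

```python
from collections import deque


def dbms_thread(name, steps, log):
    # Each lightweight thread keeps its own state (here, just i) and
    # yields after every step so that other tasks get a turn.
    for i in range(steps):
        log.append((name, i))
        yield


def run_round_robin(tasks):
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)            # run the task up to its next yield
        except StopIteration:
            continue              # the task has finished: drop it
        ready.append(task)        # otherwise put it back in the queue


log = []
run_round_robin([dbms_thread("t1", 2, log), dbms_thread("t2", 3, log)])
print(log)
```

All switching happens at `yield` points inside one OS thread, which is why a task that blocked in a synchronous IO call would stall every other task.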

### Threads

To avoid the potential for deadlock, programs that use locks should be written in a way such that each thread is only allowed to acquire one lock at a time.
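When a thread really does need more than one lock, a common alternative (a different technique from the one-lock-at-a-time rule above) is to make every thread acquire locks in the same global order. The sketch below orders locks by `id()`; `acquire_in_order` and `run_workers` are hypothetical names introduced for this illustration.

```python
import threading
from contextlib import contextmanager


@contextmanager
def acquire_in_order(*locks):
    # Sort by object id so every thread takes the locks in the same order,
    # whatever order the caller listed them in.
    ordered = sorted(locks, key=id)
    acquired = []
    try:
        for lock in ordered:
            lock.acquire()
            acquired.append(lock)
        yield
    finally:
        for lock in reversed(acquired):
            lock.release()


def run_workers(rounds=200):
    a, b = threading.Lock(), threading.Lock()
    counter = {"n": 0}

    def worker(first, second):
        for _ in range(rounds):
            with acquire_in_order(first, second):
                counter["n"] += 1

    # The two threads name the locks in opposite orders, which is the
    # classic recipe for deadlock if the locks were taken as listed.
    threads = [
        threading.Thread(target=worker, args=(a, b), daemon=True),
        threading.Thread(target=worker, args=(b, a), daemon=True),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=10)
    return counter["n"]
```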

Here is a locked dictionary, a proxy for a database if you like:

```python
import threading

class LockableDict:
    def __init__(self):
        self._d = {}       # the stored key/value pairs
        self._dlock = {}   # one lock per key

    def __getitem__(self, attr):
        return self._d[attr]

    def __setitem__(self, attr, val):
        # Create the per-key lock the first time the key is written.
        if attr not in self._d:
            self._dlock[attr] = threading.Lock()
        with self._dlock[attr]:
            self._d[attr] = val
```

```python
l = LockableDict()
l['a'] = 3
l['a']  # 3
```
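One caveat about the `LockableDict` above: the check for a missing key and the creation of its lock are two separate steps, so two threads writing a new key at the same moment could each install a different lock. A possible variant, sketched here under the name `GuardedDict` (an assumption for this example, not part of the original code), guards lock creation with one extra lock. The guard is released before the per-key lock is taken, so a thread still holds only one lock at a time.

```python
import threading


class GuardedDict:
    def __init__(self):
        self._d = {}
        self._dlock = {}
        self._guard = threading.Lock()  # protects the _dlock table itself

    def _lock_for(self, key):
        # Create the per-key lock at most once, even under contention.
        with self._guard:
            if key not in self._dlock:
                self._dlock[key] = threading.Lock()
            return self._dlock[key]

    def __getitem__(self, key):
        return self._d[key]

    def __setitem__(self, key, val):
        with self._lock_for(key):
            self._d[key] = val


def hammer(n_threads=8):
    d = GuardedDict()
    threads = [
        threading.Thread(target=d.__setitem__, args=("a", i))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return d
```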

Here is the usual echo server, written with a thread pool from the `concurrent.futures` package. We saw some other ways of writing this server in the sockets class, using the `socketserver` module.

```python
%%file threads0.py
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(12)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))
```


And here is a version closer to what we did last time: the thread pool is built by hand, so you can see how the pool is set up and fed through a queue.

```python
%%file threads1.py
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    # Each worker takes one connection off the queue and echoes
    # data back until the client disconnects.
    sock, client_addr = q.get()
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr, nworkers):
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()
    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)
```


So let's use this model to write a simple, in-memory database server.

```python
%%file dbserver.py
from socket import AF_INET, SOCK_STREAM, socket, SOL_SOCKET, SO_REUSEADDR
from concurrent.futures import ThreadPoolExecutor
import threading

class LockableDict:
    def __init__(self):
        self._d = {}
        self._dlock = {}

    def __getitem__(self, attr):
        return self._d[attr]

    def __setitem__(self, attr, val):
        if attr not in self._d:
            self._dlock[attr] = threading.Lock()
        print("LOCKING FOR", attr, val)
        with self._dlock[attr]:
            self._d[attr] = val
        print("UNLOCKED FOR", attr, val)

def db_client(sock, client_addr, ldict):
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        print("msg", msg)
        if not msg:
            break
        # Requests have the form "key=value"
        key, value = msg.decode().split('=')
        print("k,v", key, value)
        ldict[key] = value
        sock.sendall(value.encode())
    print('Client closed connection')
    sock.close()

def db_server(addr):
    print("creating lockable dict and pool")
    ldict = LockableDict()
    pool = ThreadPoolExecutor(50)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(addr)
    sock.listen(15)
    while True:
        print('connection')
        client_sock, client_addr = sock.accept()
        pool.submit(db_client, client_sock, client_addr, ldict)

db_server(('',15000))
```


Here's a client that uses a thread pool:

```python
%%file dbclient.py
import sys
from socket import socket, AF_INET, SOCK_STREAM
from concurrent.futures import ThreadPoolExecutor

def fetch(i):
    s = socket(AF_INET, SOCK_STREAM)
    s.connect(('localhost', 15000))
    print("sending, i",i)
    s.send("a={}".format(i).encode())
    print("sent")
    return s.recv(65536)

pool = ThreadPoolExecutor(20)
thrs = []
for i in range(40):
    t = pool.submit(fetch, i)
    thrs.append(t)
for i in range(40):
    print('i', i, thrs[i].result())
```


And another that uses a process pool:

```python
%%file dbclient2.py
import sys
from socket import socket, AF_INET, SOCK_STREAM
from concurrent.futures import ProcessPoolExecutor

def fetch(i):
    s = socket(AF_INET, SOCK_STREAM)
    s.connect(('localhost', 15000))
    print("sending, i",i)
    s.send("a={}".format(i).encode())
    print("sent")
    return s.recv(65536)

# The guard keeps worker processes from re-running this block when the
# module is re-imported (needed with the "spawn" start method).
if __name__ == '__main__':
    pool = ProcessPoolExecutor(20)
    thrs = []
    for i in range(40):
        t = pool.submit(fetch, i)
        thrs.append(t)
    for i in range(40):
        print('i', i, thrs[i].result())
```


And a third that uses the process pool through its built-in `map` method, just for illustration.

```python
%%file dbclient3.py
import sys
from socket import socket, AF_INET, SOCK_STREAM
from concurrent.futures import ProcessPoolExecutor

def fetch(i):
    s = socket(AF_INET, SOCK_STREAM)
    s.connect(('localhost', 15000))
    print("sending, i",i)
    s.send("a={}".format(i).encode())
    print("sent")
    return s.recv(65536)

# The guard keeps worker processes from re-running this block when the
# module is re-imported (needed with the "spawn" start method).
if __name__ == '__main__':
    with ProcessPoolExecutor(20) as pool:
        for result in pool.map(fetch, range(40)):
            print(result)
```


Instead of blocking, one can use callbacks. For example (from the cookbook):

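```python
from concurrent.futures import ProcessPoolExecutor

# `work` and `arg` are placeholders for your own function and its argument.
def when_done(r):
    # Called with the finished future; r.result() returns the value
    # (or re-raises the exception) produced by `work`.
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)
```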



We do not show how to create a multiprocessing-based db server, but it is doable. Three things to keep in mind:

- You can protect the database with a `multiprocessing.Lock`.
- The database itself, however, must live in shared memory.
- You cannot easily pass the socket to the client function as we did above. This is because function arguments are passed from one process to another by pickling, and sockets are not picklable. That leaves you with the choice of sending only the data to a child process, using `multiprocessing.reduction` to somehow pickle the socket, or preforking (what Apache does), in which `accept` is run in the worker processes themselves (see http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/ for an example).
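The pickling restriction is easy to check, and it suggests what the first option looks like in practice. In this sketch `socket_is_picklable` and `parse_request` are hypothetical helper names: the socket stays in the process that accepted it, and only the received bytes, which pickle without trouble, would be handed to a worker process.

```python
import pickle
from socket import socket, AF_INET, SOCK_STREAM


def socket_is_picklable():
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        pickle.dumps(sock)   # sockets refuse to be pickled
    except TypeError:
        return False
    finally:
        sock.close()
    return True


def parse_request(msg: bytes):
    # The kind of work that could run in a child process: it needs only
    # the bytes of the request ("key=value"), not the socket itself.
    key, value = msg.decode().split("=", 1)
    return key, value
```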

## Daemonization

You want your process to keep running after you kill your shell. See http://chimera.labs.oreilly.com/books/1230000000393/ch12.html#_problem_210 .