# Teacher only: server

In [1]:
import random
import re
import socket
import threading

import numpy as np
import pandas as pd

## Loading the database

In [2]:
# Path of the CSV file loaded into the `pdb` DataFrame below (relative to the notebook's working directory).
database_filename = 'ADULT.csv'

In [3]:
# Labels given to the CSV columns, in positional order (passed as `names=` to
# `pd.read_csv`). The query parser also uses this list to validate column names.
column_names = [
    'age', 'workclass', 'fnlwgt',
    'education', 'education_num',
    'marital_status', 'occupation', 'relationship',
    'race', 'sex',
    'capital_gain', 'capital_loss',
    'hours_per_week', 'native_country',
    'salaryclass',
]

In [4]:
# Load the whole database into memory. Column labels come from `column_names`
# (the `names=` argument), so the file is read as having no header row.
pdb = pd.read_csv(
    database_filename, 
    delimiter=',', 
    names=column_names
)

In [5]:
# Normalise: replace every object-dtype (text) column by integer category codes,
# so that all columns can be compared against numbers in queries.
# NOTE: this overwrites the columns of `pdb` in place; the original labels are
# not kept anywhere in this notebook.
cols = pdb.select_dtypes(include=['O']).columns
for cname in cols:
    pdb[cname] = pdb[cname].astype('category').cat.codes # one integer code per distinct value

In [6]:
def user_count(pdb, query):
    '''Return how many rows of `pdb` satisfy `query`.

       `pdb` is a pandas DataFrame and `query` an expression string in the
       syntax understood by `DataFrame.query`.'''
    return len(pdb.query(query).index)

# Server

In this portion of the Notebook we define the basic server function, and retrieve the **IP address** to show to students.

In [7]:
# Interface the servers bind to.
# NOTE(review): 'localhost' only accepts connections from this machine; if students
# connect from other computers this presumably needs the machine's LAN address
# (or '0.0.0.0') instead — confirm against the classroom setup.
HOST = 'localhost'

In [8]:
def query_server(port, serve, stop_event=None, poll_interval=0.5):
    '''Run a query-based server on port `port`.

       Opens a TCP socket on (HOST, port) and answers exactly one query per
       connection: the received text is passed to `serve` and the string form
       of its result is sent back. If anything goes wrong while reading or
       serving the query, the client receives 'ERROR <message>' instead.

       `serve`: text --> value (or raises an exception).
       `stop_event`: optional `threading.Event`; once it is set the server
        stops, at the latest `poll_interval` seconds after it finished the
        connection in progress. Without it the server runs forever.
       `poll_interval`: how often (in seconds) the stop event is checked
        while the server is idle.'''
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((HOST, port))
        print('STARTED SERVER AT %s:%d' % (HOST, port))
        s.listen(10) # backlog of pending connections
        if stop_event is not None:
            # Without a timeout accept() blocks indefinitely, and the stop
            # event would only be noticed after one more client connects.
            s.settimeout(poll_interval)
        # accept connections to the database until asked to stop
        while stop_event is None or not stop_event.is_set():
            try:
                conn, addr = s.accept()
            except socket.timeout:
                # nobody connected during this interval: re-check the stop event
                continue
            with conn:
                # the polling timeout is for the listening socket only
                conn.settimeout(None)
                print('Connected by', addr)
                try:
                    # receive a query and test it on the database; a failed
                    # read or decode is reported like any other query error
                    query = conn.recv(1024).decode('utf-8')
                    res = str(serve(query))
                except Exception as err:
                    res = 'ERROR ' + str(err)
                try:
                    # sendall: send() may transmit only part of the answer
                    conn.sendall(res.encode('utf-8'))
                except OSError as err:
                    # a client that vanished must not take the server down
                    print('Could not answer', addr, ':', err)
    print('SERVER STOPPED')

In [9]:
def start_server_threaded(port, serve):
    '''Launch `query_server(port, serve, ...)` in a background thread.

       Returns the `threading.Event` that was handed to the server as its
       stop event; set it to ask the server to stop.'''
    halt = threading.Event()

    def _run():
        query_server(port, serve, halt)

    worker = threading.Thread(target=_run)
    worker.start()
    return halt

## Query parsing

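We use a syntax similar to SQL, but restricted to conditions joined by `AND`, where each condition uses one of the operators `=`, `>=`, `<=`, `<>`, `<` and `>`. Note that equality is written `=` as in SQL: `==` and `!=` are rejected.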

In [10]:
# Operators accepted in a query. The parser takes the FIRST entry of this list
# that occurs in a condition, so the two-character operators must come before
# the one-character ones (otherwise '>=' would be read as '>').
sql_operators = ['>=', '<=', '<>', '<', '>', '=']
# Translation of each SQL operator into the operator used in the pandas query string.
pandas_operators = {'=':'==', '>=':'>=', '<=':'<=', '<>':'!=', '<':'<', '>':'>'}

In [11]:
def parse_query( query ):
    '''Parse a restricted SQL-like query into a list of conditions.

       The query is a sequence of `column operator value` conditions joined
       by `and` (case-insensitive). Returns a list of tuples
       (column, pandas_operator, value), all strings.

       The value must be a plain number or another column name. Nothing else
       is accepted, because the parsed conditions are later pasted into a
       `DataFrame.query` expression: an unchecked value could smuggle in extra
       logic (`or ...`) or `@name` references to variables of the server.

       Raises SyntaxError when a condition has no valid operator, has more
       than one, names an unknown column, or has an invalid value.'''
    # lower case only (all column names are in lc)
    query = query.strip().lower()
    query = query.replace(';', '') # just in case
    # numeric literal: optional sign, digits with optional fraction, optional exponent
    number = re.compile(r'[+-]?(\d+\.?\d*|\.\d+)(e[+-]?\d+)?')
    # split on the word `and`, whatever whitespace surrounds it
    conditions = re.split(r'\s+and\s+', query)
    # parse the conditions
    parsed_conditions = []
    for c in conditions:
        # 0. prevent common error
        if '==' in c:
            raise SyntaxError("'==' is not a valid SQL operator")
        if '!=' in c:
            raise SyntaxError("'!=' is not a valid SQL operator")
        # 1. find the operator to use (first match in priority order)
        operator = next((op for op in sql_operators if op in c), None)
        if operator is None:
            raise SyntaxError('No valid operator found in "%s".' % c)
        # 2. split according to operator; there must be exactly two sides
        parts = [word.strip() for word in c.split(operator)]
        if len(parts) != 2:
            raise SyntaxError('Expected exactly one operator in "%s".' % c)
        column, value = parts
        if column not in column_names:
            raise SyntaxError('Column "%s" does not exist.' % column)
        # 3. only numbers and column names may appear on the right-hand side
        if value not in column_names and number.fullmatch(value) is None:
            raise SyntaxError('Value "%s" is neither a number nor a column.' % value)
        # add to the final result
        parsed_conditions.append( (column, pandas_operators[operator], value) )
    return parsed_conditions

In [12]:
def conditions_to_pandas(conditions):
    '''Build a pandas query string from parsed conditions.

       Each condition is a (column, operator, value) tuple; the rendered
       conditions are joined with ` and `.'''
    clauses = []
    for triple in conditions:
        clauses.append('%s %s %s' % triple)
    return ' and '.join(clauses)

In [13]:
def convert_to_pandas( query ):
    '''Translate a SQL-like query string into a pandas query string.

       Parsing errors raised by `parse_query` propagate to the caller.'''
    return conditions_to_pandas(parse_query( query ))

# Exercise 1

The first exercise is divided in three parts:
1. a noise of $\sim \mathcal{N}(0, 5)$ is added to the results;
2. queries with query set size < 5 are suppressed (the server answers `-1`);
3. a combination of both.

The first character of the query is either 'a', 'b' or 'c' to distinguish what exercise to serve.

In [14]:
# TCP port of the exercise 1 server.
PORT_exercise_1    = 42420
# Variance of the normal noise added to counts (the standard deviation is its square root).
ex1_noise_variance = 5
# Minimum query set size: counts strictly below this value are answered with -1.
query_set_size     = 5

In [15]:
def simple_protection(query):
    '''Answer a count query for exercise 1.

       The first character of `query` selects the mechanism:
       (a) adds independent normal noise, (b) suppresses small query sets
       and (c) does both. The rest of the string is the SQL-like query.

       Returns the (possibly noisy) count, or -1 when the query is suppressed.
       Raises Exception for an empty query or an unknown exercise letter;
       parsing and evaluation errors of the query itself propagate.'''
    # query[:1] instead of query[0]: an empty query then gets the same
    # "Unknown exercise" error as any other bad prefix, not an IndexError
    exercise = query[:1]
    if exercise not in ('a', 'b', 'c'):
        raise Exception('Unknown exercise: "%s"' % exercise)
    # parse the query and apply mechanism
    query = query[1:]
    pandas_query = convert_to_pandas( query )
    res = user_count(pdb, pandas_query)
    # bucket suppression
    if exercise in ('b', 'c') and res < query_set_size:
        return -1
    # noise addition
    noise = 0
    if exercise in ('a', 'c'):
        noise = np.random.normal(loc=0, scale=np.sqrt(ex1_noise_variance))
    return res + noise

### Running the server

**Run this line of code to start the server on the specified port:**

In [16]:
# Starts the exercise 1 server in a background thread.
# Call `stop_event_ex1.set()` to ask the server to stop.
stop_event_ex1 = start_server_threaded(PORT_exercise_1, simple_protection)

STARTED SERVER AT localhost:42420


# Exercise 2

In this exercise we add static noise, to prevent simple averaging attacks.

In [17]:
# TCP port of the exercise 2 server.
PORT_exercise_2 = 42422
# Variance of the static noise drawn for each condition of a query.
ex2_noise_variance = 2
# Secret offset mixed into the per-condition seed, so that the noise cannot be
# recomputed from the condition text alone.
salt = 21003 # this salt makes our attack bad, at the very least

In [18]:
def static_noise(condition):
    '''Return the normal noise attached to one parsed condition.

       `condition` is a (column, operator, value) tuple of strings. The noise
       is drawn from a generator seeded with the salt and the hash of the
       condition text, so asking for the same condition again yields the same
       value.

       A private `RandomState` is used rather than `np.random.seed`: reseeding
       the process-wide generator would also make the noise of any other code
       using `np.random` (e.g. the exercise 1 server thread) predictable, and
       would race with it.

       NOTE: Python salts `str` hashes per interpreter process (unless
       PYTHONHASHSEED is fixed), so the noise is stable while the kernel runs
       but changes after a restart.'''
    seed = salt + hash(''.join(condition))
    # the seed must fit in 32 bits
    rng = np.random.RandomState(seed % (2**32-1))
    return rng.normal(loc=0, scale=np.sqrt(ex2_noise_variance))

In [19]:
def static_noise_protection(query):
    '''Answer a count query for exercise 2.

       Returns -1 when the true count is below `query_set_size`; otherwise the
       count plus one static noise term per condition of the query.'''
    conditions = parse_query(query)
    count = user_count(pdb, conditions_to_pandas(conditions))
    # small query sets are suppressed before any noise is computed
    if count < query_set_size:
        return -1
    noise_total = 0
    for condition in conditions:
        noise_total += static_noise(condition)
    return count + noise_total

### Running the server

**Run this line of code to start the server on the specified port:**

**Stopping the server:** set the stop event (`stop_event_ex2.set()`). If the server is blocked waiting for a connection at that moment, it may only stop after one more query has been served. In that case, another TA should run a client and send _any_ query.

In [20]:
# Starts the exercise 2 server in a background thread.
# Call `stop_event_ex2.set()` to ask the server to stop.
stop_event_ex2 = start_server_threaded(PORT_exercise_2, static_noise_protection)

STARTED SERVER AT localhost:42422
Connected by ('127.0.0.1', 50501)
Connected by ('127.0.0.1', 50502)
Connected by ('127.0.0.1', 50504)
Connected by ('127.0.0.1', 50505)
Connected by ('127.0.0.1', 50506)
Connected by ('127.0.0.1', 50507)
