## Case Study: Queued Pipeline with Dask Distributed + Schema Validation

In this case study, we will look at a pipeline that takes in router environment readings (such as temperature and fan RPM) and determines whether they fall outside normal ranges.

We will define a schema in [Voluptuous](https://github.com/alecthomas/voluptuous) to set the thresholds we expect to see, and use [Dask Distributed](http://distributed.readthedocs.io/en/latest/index.html) to schedule and distribute the work across several workers. We use [dataset](https://dataset.readthedocs.io/en/latest/) to create a quick `sqlite3` database to store our output.

In [None]:
import logging
import random
import dataset
import sys
from time import sleep
from datetime import datetime
from queue import Queue
from queue_example import generate_example, generate_machine_db
from distributed import Client
from voluptuous import Schema, Required, Range, All, ALLOW_EXTRA
from voluptuous.error import MultipleInvalid

In [None]:
logger = logging.getLogger()  # the root logger
logger.setLevel(logging.WARNING)

### We set up a Queue and start adding events via another thread. This will keep running until we stop the notebook.

In [None]:
queue = Queue()
db = dataset.connect('sqlite:///output_db.db')
table = db['readings']

In [None]:
def load_data(input_q):
    # Put a new simulated reading on the queue once per second, forever.
    while True:
        input_q.put(generate_example())
        sleep(1)

In [None]:
from threading import Thread

# daemon=True lets the kernel shut down even though load_data loops forever
load_thread = Thread(target=load_data, args=(queue,), daemon=True)
load_thread.start()
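The producer thread above can be sketched with the standard library alone. This is a minimal, self-contained version under a few assumptions: `fake_reading()` is a hypothetical stand-in for `generate_example()` (whose real fields live in `queue_example.py`), and a `threading.Event` is added so the loop can be stopped cleanly instead of running forever. The consumer side uses `get(timeout=...)` so it never blocks indefinitely.

```python
import random
import threading
import time
from queue import Empty, Queue


def fake_reading():
    """Hypothetical stand-in for queue_example.generate_example()."""
    return {
        "MachineId": random.randint(1, 5),
        "AmbientTemp": random.uniform(0.0, 45.0),
        "Fan": random.randint(50, 2500),
        "CpuTemp": random.uniform(0.0, 60.0),
    }


def load_data_until(input_q, stop, interval=0.01):
    # Keep producing readings until the stop event is set.
    while not stop.is_set():
        input_q.put(fake_reading())
        time.sleep(interval)


q = Queue()
stop = threading.Event()
producer = threading.Thread(target=load_data_until, args=(q, stop), daemon=True)
producer.start()

# Consume a few readings, waiting at most 1 second for each one.
received = []
for _ in range(5):
    try:
        received.append(q.get(timeout=1))
    except Empty:
        break

stop.set()       # ask the producer to leave its loop
producer.join()  # wait until it has actually finished
```

The stop event is the main design difference from the notebook's `while True` loop: it lets the producer be shut down without killing the kernel.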

In [None]:
generate_example()

## Then, we define our schema in Voluptuous

In [None]:
schema = Schema({
    Required('AmbientTemp'): All(float, Range(min=3, max=40)),
    Required('Fan'): All(int, Range(min=100, max=2000)),
    Required('CpuTemp'): All(float, Range(min=5, max=50)),
}, extra=ALLOW_EXTRA)
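To make the thresholds concrete, here is a plain-Python approximation of what this schema checks. It is not Voluptuous, only a stdlib sketch that assumes `All(type, Range(...))` means "an `isinstance` type check, then an inclusive min/max check" (Voluptuous's `Range` is inclusive by default). One consequence worth noticing: an integer `AmbientTemp` such as `25` fails the `float` check even though it is within range.

```python
# Hypothetical plain-Python mirror of the Voluptuous schema: key -> (type, min, max).
RULES = {
    "AmbientTemp": (float, 3, 40),
    "Fan": (int, 100, 2000),
    "CpuTemp": (float, 5, 50),
}


def check_reading(reading):
    """Return a list of problems; an empty list means the reading passes."""
    problems = []
    for key, (expected_type, low, high) in RULES.items():
        if key not in reading:
            problems.append(f"{key}: required key missing")
            continue
        value = reading[key]
        if not isinstance(value, expected_type):
            problems.append(f"{key}: expected {expected_type.__name__}")
        elif not low <= value <= high:
            problems.append(f"{key}: {value} outside [{low}, {high}]")
    # Keys not in RULES (e.g. MachineId) are ignored, like extra=ALLOW_EXTRA.
    return problems


ok = {"MachineId": 1, "AmbientTemp": 21.5, "Fan": 900, "CpuTemp": 35.0}
too_hot = dict(ok, CpuTemp=72.0)
int_temp = dict(ok, AmbientTemp=25)

print(check_reading(ok))        # []
print(check_reading(too_hot))   # ['CpuTemp: 72.0 outside [5, 50]']
print(check_reading(int_temp))  # ['AmbientTemp: expected float']
```

Like Voluptuous's `MultipleInvalid`, the sketch collects every failing key rather than stopping at the first one.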

### Now, we need to start our scheduler and workers.

### Commands (run these from the environment where you installed distributed, inside the `validation-notebooks` folder):

- To start the scheduler, run `dask-scheduler` in a terminal.
- Then, in a second terminal, navigate to `validation-notebooks` and add it to your `PYTHONPATH`, e.g. `export PYTHONPATH=$PYTHONPATH:/path/to/validation-notebooks`
- In that same terminal, start a worker: `dask-worker SCHEDULER_IP:SCHEDULER_PORT` (most often `127.0.0.1:8786`)

To view the Bokeh application: click on the Web UI link (usually: http://127.0.0.1:8787/status/ )

#### Note: you need to start your workers in *this* directory, or copy the `queue_example.py` file somewhere the workers can import it from (e.g. a directory on their `PYTHONPATH`).

In [None]:
client = Client('127.0.0.1:8786')
client

## Now we can define our pipeline functions:

1. Validate each reading against the schema, flagging it with a warning if validation fails.
2. Add extra machine information from our machine database (just a dict, but use your imagination).
3. Insert the reading into our database of readings.

In [None]:
def test_schema(reading):
    try:
        schema(reading)
        reading['warning'] = False
    except MultipleInvalid as e:
        logger.warning('SCHEMA: Issue with router %s (%s)', 
                       reading.get('MachineId'), e)
        reading['warning'] = True
    return reading

In [None]:
def add_machine_info(reading):
    mdb = generate_machine_db()
    reading['brand'] = mdb[reading['MachineId']]
    return reading

In [None]:
def add_reading(reading):
    db = dataset.connect('sqlite:///output_db.db',
                        engine_kwargs={'connect_args': 
                                       {'check_same_thread':False}})
    table = db['readings']
    reading['processed_at'] = datetime.now()
    table.insert(reading)
    return reading
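`add_reading` opens a new connection on every call, presumably because a database connection cannot be shipped to the workers and the workers run tasks on a pool of threads, while SQLite connections are by default tied to the thread that created them (hence `check_same_thread=False`). A stdlib-only sketch of the same per-call pattern with `sqlite3` might look like this. The four-column table and the temporary database file are assumptions; `dataset` creates columns automatically, so the real table will contain every key of the reading.

```python
import os
import sqlite3
import tempfile
from datetime import datetime

# Hypothetical database file; the notebook writes to output_db.db via dataset.
DB_PATH = os.path.join(tempfile.mkdtemp(), "output_db.db")


def add_reading(reading, db_path=DB_PATH):
    # Open a fresh connection per call so no connection is shared across threads.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    try:
        with conn:  # commits on success, rolls back on an exception
            conn.execute(
                "CREATE TABLE IF NOT EXISTS readings "
                "(MachineId INTEGER, CpuTemp REAL, warning INTEGER, processed_at TEXT)"
            )
            reading["processed_at"] = datetime.now().isoformat()
            conn.execute(
                "INSERT INTO readings "
                "VALUES (:MachineId, :CpuTemp, :warning, :processed_at)",
                reading,
            )
    finally:
        conn.close()  # the connection's context manager does not close it
    return reading


add_reading({"MachineId": 1, "CpuTemp": 40.0, "warning": False})
add_reading({"MachineId": 2, "CpuTemp": 61.0, "warning": True})
```

Storing `processed_at` as an ISO string sidesteps `sqlite3`'s default datetime adapter, which is deprecated in recent Python versions.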

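## To begin, we scatter the data from the queue to our workers:

(Passing a `Queue` to `scatter`, `map` and `gather` relies on an older `distributed` API; recent releases no longer accept queues in these methods.)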

In [None]:
remote_q = client.scatter(queue)

### Then, we chain a series of `map` calls, passing the futures from each step to the next step of the pipeline. At the end, we `gather` the results into one queue.

In [None]:
schema_q = client.map(test_schema, remote_q)
info_q = client.map(add_machine_info, schema_q)
insert_q = client.map(add_reading, info_q)
final = client.gather(insert_q)
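The queue-in, queue-out behaviour of `scatter`/`map`/`gather` can be approximated locally with the standard library's `concurrent.futures`. This is a swapped-in technique, not Dask: a thread pool stands in for the workers, each reading from the input queue is submitted as one task that runs all three stages, and results are put on an output queue in the order they finish. The stage functions are trivial hypothetical placeholders for `test_schema`, `add_machine_info` and `add_reading`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue


# Hypothetical placeholder stages.
def stage_validate(r):
    r["warning"] = r["CpuTemp"] > 50
    return r


def stage_enrich(r):
    r["brand"] = f"brand-{r['MachineId']}"
    return r


def stage_store(r):
    r["stored"] = True
    return r


def process(reading):
    # Run the stages in order, feeding each output into the next step.
    return stage_store(stage_enrich(stage_validate(reading)))


input_q = Queue()
for i in range(10):
    input_q.put({"MachineId": i % 3, "CpuTemp": 30.0 + 5 * i})

output_q = Queue()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(process, input_q.get()) for _ in range(input_q.qsize())]
    for fut in as_completed(futures):
        output_q.put(fut.result())  # results arrive in completion order

print(output_q.qsize())  # 10
```

The design choice mirrors the Dask version: the caller never waits on a specific item, it just drains whatever has completed.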

### Then, we can collect the data using `get`

#### Note: you can watch errors and logging in the worker processes. And make sure to check the Bokeh Status Web UI!

In [None]:
# Pull 40 processed readings off the result queue (get() blocks until one is ready)
for _ in range(40):
    item = final.get()
    print(item)
    print('Queue size: ', queue.qsize())

In [None]:
db = dataset.connect('sqlite:///output_db.db')
table = db['readings']
table.count()

In [None]:
warnings = db.query('''SELECT COUNT(*) AS cnt FROM readings
                       WHERE warning = 1''')

In [None]:
list(warnings)[0].get('cnt')
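Both counts can also be taken in a single query by grouping on the `warning` column. Below is a self-contained `sqlite3` sketch against a throwaway in-memory table with hypothetical rows; against the notebook's database, the same SQL could be passed to `db.query(...)`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (MachineId INTEGER, warning INTEGER)")
# Hypothetical rows: three normal readings and two flagged ones.
conn.executemany(
    "INSERT INTO readings VALUES (?, ?)",
    [(1, 0), (2, 0), (3, 1), (1, 0), (2, 1)],
)

# One row per warning value with its count, ordered 0 then 1.
counts = dict(conn.execute(
    "SELECT warning, COUNT(*) AS cnt FROM readings GROUP BY warning ORDER BY warning"
).fetchall())
print(counts)  # {0: 3, 1: 2}
conn.close()
```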

### Exercise: did you see any other errors (or can you spot a potential error when checking the `queue_example.py` file)? How might we prevent the error?