# Notifications Event Workflow

Pacifica provides some basic event-processing libraries for running processes against data stored in Pacifica.

## Imports and Requirements

In [None]:
%pip install eventlet pacifica-metadata==0.12.4 pacifica-policy==0.8.2 pacifica-cli==0.5.0 pacifica-downloader==0.4.0 pacifica-uploader==0.3.0 pacifica-dispatcher==0.2.4

In [None]:
import os
import sys
import json
import contextlib
import hashlib
import tempfile
import threading
from io import StringIO

import requests
import cherrypy
import playhouse.db_url
from cloudevents.model import Event
from jsonpath2.path import Path
from celery.utils.log import mlevel

from pacifica.downloader import Downloader
from pacifica.uploader import Uploader

from pacifica.dispatcher.models import File, Transaction, TransactionKeyValue
from pacifica.dispatcher.event_handlers import EventHandler
from pacifica.dispatcher.receiver import create_peewee_model
from pacifica.dispatcher.router import Router
from pacifica.dispatcher.downloader_runners import DownloaderRunner, RemoteDownloaderRunner
from pacifica.dispatcher.uploader_runners import UploaderRunner, RemoteUploaderRunner

## Load the Test Data Set

The test data set is coupled with the metadata service and
can be loaded through Docker as shown below.

NOTE: If you have already loaded the test data by running the Examples notebook, skip this step.

In [None]:
# Load the test data set by calling the loader's entry point.
# NOTE: the name `main` is re-imported from pacifica.cli in later cells,
# so it only refers to the test data loader within this cell.
from test_data.loadit_test import main

# Set after the import and before the call, as in the original cell order.
# Presumably the loader reads METADATA_URL to find the metadata service -- confirm.
os.environ.update(METADATA_URL='http://metadataserver:8121')
main()

### The UniqueID Interface

The uniqueid interface needs to be updated to account for the files we
inserted in the previous step.

In [None]:
# Ask the uniqueid service for a range of 200 IDs in each mode and show the reply.
# Presumably this advances the counters past the IDs used by the test data -- confirm.
for id_mode in ('file', 'upload_job'):
    query = {'mode': id_mode, 'range': '200'}
    id_resp = requests.get('http://uniqueid:8051/getid', params=query)
    assert id_resp.status_code == 200
    print(id_resp.json())

## Get a Subscription

We need to register for events and tell the notifications service where to send the events.

In [None]:
# Event match definition: the JSONPath selects events whose cloudEventsVersion
# is "0.1" and whose eventType is "org.pacifica.metadata.ingest"; matching
# events are to be sent to target_url.
event_match = {
    "name": "My Event Match",
    "jsonpath": """
            $[?(
                @["cloudEventsVersion"] = "0.1" and
                @["eventType"] = "org.pacifica.metadata.ingest"
            )]
        """,
    "target_url": "http://jupyter:8080/receive"
}

# Register the event match with the notifications frontend as user dmlb2001.
resp = requests.post(
    'http://notifyfrontend:8070/eventmatch',
    headers={'Http-Remote-User': 'dmlb2001'},
    json=event_match
)
assert resp.status_code == 200

# Keep the subscription UUID; the verification cell uses it to look the subscription up again.
subscription = resp.json()
print(subscription)
subscription_uuid = subscription['uuid']

## Setup the Dispatcher

The dispatcher takes several steps to set up. First, we need to create the database. Then we'll set up a sample event handler class to work on the event. Next, we'll set up the CherryPy web service to receive events. Finally, we'll set up the Celery workers to handle the work.

### Setup Constants and Create Database

In [None]:
# The connection URL comes from DATABASE_URL when set, otherwise the jupyterdb default.
database_url = os.getenv('DATABASE_URL', 'postgres://jupyter:jupyter@jupyterdb/jupyter')
DB_ = playhouse.db_url.connect(database_url)

# Model class produced by the dispatcher for this database; later cells call
# its create_celery_app / create_cherrypy_app factories.
ReceiveTaskModel = create_peewee_model(DB_)

# Create the tables for every model the dispatcher needs.
MODELS_ = (ReceiveTaskModel, )
DB_.create_tables(MODELS_)

# Router that the event handler is attached to in a later cell.
ROUTER = Router()

### Create a Simple Event Handler

The event handler is a class that subclasses the `pacifica.dispatcher.event_handlers.EventHandler` class.

In [None]:
class SimpleEventHandler(EventHandler):
    """Example event handler that uppercases an event's files and uploads the result."""

    def __init__(self, downloader_runner: DownloaderRunner, uploader_runner: UploaderRunner) -> None:
        """Keep the download and upload runners so ``handle`` can use them."""
        super().__init__()
        self.downloader_runner = downloader_runner
        self.uploader_runner = uploader_runner

    def handle(self, event: Event) -> None:
        """
        Download the event's files, uppercase them and upload the copies.

        Each file is read in full, converted with ``str.upper`` and written
        into a temporary upload directory. That directory is then uploaded
        under a new transaction that copies the submitter, instrument and
        project of the originating transaction, and is tagged with
        ``uppercase_text = 'True'`` plus the ID of the originating transaction.
        """
        source_txn = Transaction.from_cloudevents_model(event)
        # Parsed but not used below; the call is kept so the event is still
        # processed by TransactionKeyValue.from_cloudevents_model as before.
        _source_key_values = TransactionKeyValue.from_cloudevents_model(event)
        source_files = File.from_cloudevents_model(event)

        # Both scratch directories are removed when the block exits.
        with tempfile.TemporaryDirectory() as download_dir, \
                tempfile.TemporaryDirectory() as upload_dir:
            openers = self.downloader_runner.download(download_dir, source_files)
            for open_source in openers:
                with open_source() as source_fd:
                    # NOTE(review): os.path.join ignores upload_dir when the
                    # second part is absolute -- confirm that source_fd.name
                    # is a relative name here.
                    target_path = os.path.join(upload_dir, source_fd.name)
                    with open(target_path, 'w') as target_fd:
                        target_fd.write(source_fd.read().upper())

            derived_txn = Transaction(
                submitter=source_txn.submitter,
                instrument=source_txn.instrument,
                project=source_txn.project
            )
            derived_key_values = [
                TransactionKeyValue(key='uppercase_text', value='True'),
                TransactionKeyValue(key='Transactions._id', value=source_txn._id)
            ]
            # The three-part result is unpacked but not used further.
            (_bundle, _job_id, _state) = self.uploader_runner.upload(
                upload_dir,
                transaction=derived_txn,
                transaction_key_values=derived_key_values
            )


### Link Event Handler

We need to link up the SimpleEventHandler with the remote downloader and uploader. After that, we set up the Celery worker and the CherryPy application.

In [None]:
# Host names of the cart, ingest and policy services.
# Presumably read by the Downloader/Uploader created below -- confirm.
os.environ.update({
    'CARTD_ADDR': 'cartfrontend',
    'INGEST_ADDR': 'ingestfrontend',
    'POLICY_ADDR': 'policyserver',
})

# Route only events that carry a TransactionKeyValue entry with
# uppercase_text = "False". The handler's own upload is tagged "True",
# so it does not match this route.
uppercase_route = Path.parse_str("""
        $["data"][*][?(
            @["destinationTable"] = "TransactionKeyValue" and
            @["key"] = "uppercase_text" and
            @["value"] = "False"
          )]
    """)
uppercase_handler = SimpleEventHandler(
    RemoteDownloaderRunner(Downloader()), RemoteUploaderRunner(Uploader())
)
ROUTER.add_route(uppercase_route, uppercase_handler)

# The same task name is used to create the Celery app and to look the task up again.
receive_task_name = 'pacifica.dispatcher.tasks.receive'
CELERY_APP = ReceiveTaskModel.create_celery_app(
    ROUTER,
    'pacifica.dispatcher.app',
    receive_task_name,
    backend='rpc://',
    broker='pyamqp://guest:guest@jupyteramqp:5672//'
)

# CherryPy application built around the registered Celery receive task.
APPLICATION = ReceiveTaskModel.create_cherrypy_app(CELERY_APP.tasks[receive_task_name])

### Start CherryPy Server

This should start the CherryPy server in a thread and give control back to the notebook.

In [None]:
# Mount the dispatcher application on CherryPy's application tree.
cherrypy.tree.mount(APPLICATION)
# Bind to all interfaces instead of loopback only; the subscription's
# target_url (http://jupyter:8080/receive) has to be reachable from other hosts.
cherrypy.config.update({'server.socket_host': '0.0.0.0'})
# Only start() is called (no engine.block()), so control returns to the notebook.
cherrypy.engine.start()

### Start the Celery Application

We are going to start the celery application in a separate thread in solo mode.

In [None]:
def run_celery_worker():
    """Start a solo-pool Celery worker at debug log level and return its exit code."""
    worker = CELERY_APP.Worker(pool_cls='solo', loglevel=mlevel('debug'))
    worker.start()
    return worker.exitcode


# Run the worker in its own thread so the notebook cell returns immediately.
# The function's return value is not collected by threading.Thread.
celery_worker = threading.Thread(target=run_celery_worker)
celery_worker.start()

## Trigger Workflow with Upload

To trigger the workflow we need to start everything off with an upload. First, we configure the uploader by reading the original uploader configuration and adding our trigger key-value pair.

In [None]:
# Read the stock uploader configuration (a JSON list of metadata entries).
cli_config_dir = 'pacifica-cli'
with open(os.path.join(cli_config_dir, 'uploader.json'), 'r') as base_config_fd:
    uploader_data = json.load(base_config_fd)

# Add the key-value pair that the dispatcher route matches on
# (TransactionKeyValue uppercase_text = "False").
uploader_data.append({
    "destinationTable": "TransactionKeyValue",
    "key": "uppercase_text",
    "metaID": "uppercase-tkv",
    "query_results": [],
    "value": "False"
})

# Write the extended configuration next to the original, under a new name.
with open(os.path.join(cli_config_dir, 'uploader-notify.json'), 'w') as notify_config_fd:
    json.dump(uploader_data, notify_config_fd)

Then we configure the rest of the uploader environment variables.

In [None]:
# Service endpoints and settings for the CLI, set before the CLI is imported and run.
os.environ.update({
    'UPLOAD_URL': 'http://ingestfrontend:8066/upload',
    'UPLOAD_STATUS_URL': 'http://ingestfrontend:8066/get_state',
    'UPLOAD_POLICY_URL': 'http://policyserver:8181/uploader',
    'UPLOAD_VALIDATION_URL': 'http://policyserver:8181/ingest',
    'DOWNLOAD_URL': 'http://cartfrontend:8081',
    'DOWNLOAD_POLICY_URL': 'http://policyserver:8181/status/transactions/by_id',
    'AUTHENTICATION_TYPE': 'None',
    # Use the configuration that contains the extra uppercase_text entry.
    'UPLOADER_CONFIG': os.path.join('pacifica-cli', 'uploader-notify.json'),
})

# The CLI entry point reads its arguments from sys.argv, so emulate
# running `cli configure` from a shell.
from pacifica.cli.__main__ import main
sys.argv = ['cli', 'configure']
main()

### Actually Upload Data

In [None]:
# Run `cli upload --logon=10 test_data` in-process and extract the job ID
# from the JSON object the CLI prints on stdout.
from pacifica.cli.__main__ import main

stdout_buf = StringIO()
stderr_buf = StringIO()
# The CLI entry point reads its arguments from sys.argv.
sys.argv = ['cli', 'upload', '--logon=10', 'test_data']
with contextlib.redirect_stderr(stderr_buf), contextlib.redirect_stdout(stdout_buf):
    main()

# Read the captured output once instead of re-reading the buffer for every lookup.
captured_stdout = stdout_buf.getvalue()
json_obj_start = captured_stdout.find('{')
if json_obj_start < 0:
    # Fail with the captured text rather than an opaque decode error on ''.
    raise ValueError(
        'no JSON object found in the upload output: {!r}'.format(captured_stdout)
    )

# raw_decode parses one complete JSON document and reports where it ended.
# Unlike slicing up to the first '}', this also copes with nested objects
# and with trailing text after the document.
upload_result, consumed = json.JSONDecoder().raw_decode(captured_stdout[json_obj_start:])
# Index of the closing brace, kept inclusive as before.
json_obj_end = json_obj_start + consumed - 1
print(captured_stdout[json_obj_start:json_obj_end + 1])
job_id = upload_result['job_id']
print(job_id)

### Verification

Make sure the notification service did not record an error for our subscription.

In [None]:
# Fetch our subscription again and check that no error is recorded on it.
subscription_url = 'http://notifyfrontend:8070/eventmatch/{}'.format(subscription_uuid)
resp = requests.get(subscription_url, headers={'Http-Remote-User': 'dmlb2001'})
assert resp.status_code == 200
subscription_state = resp.json()
assert subscription_state['error'] is None

### Dispatcher Checks

We should also make sure the dispatcher ran successfully.

In [None]:
# List the tasks the dispatcher has received; the list must not be empty.
resp = requests.get('http://127.0.0.1:8080')
assert resp.status_code == 200
task_list = resp.json()
assert task_list

# Index 0 is the more recent task and represents a second event from the
# uppercase upload, so the task for our own upload is at index 1.
original_task_id = task_list[1]['taskID']
resp = requests.get('http://127.0.0.1:8080/get/{}'.format(original_task_id))
assert resp.status_code == 200
assert resp.json()['taskStatus'] == '200 OK'