In [None]:
#default_exp data.prepare_db

### Prepare Database

> This step uses the calculated features and extracts the most recent values for each `customer_id` and `email` in the dataset to initialize a real-time database (Redis) that can be used by the live fraud prediction service.

In [None]:
#export
from typing import Dict
from datetime import datetime, timezone, timedelta
import random
import math
import dask.dataframe as dd
import numpy as np
import redis

from hopeit.server.serialization import serialize, Serialization, deserialize
from hopeit.server.compression import Compression
from hopeit.app.context import EventContext
from hopeit.app.events import Spawn, SHUFFLE
from hopeit.app.api import event_api
from hopeit.app.logger import app_logger
from hopeit.toolkit.storage.redis import RedisStorage

from fraud_poc.jobs import get_client, FeatureCalcJob, PrepareDbJob

In [None]:
#export
# Module-level hopeit app logger, used by update_database below.
logger = app_logger()

# hopeit event steps: names of the step function(s) defined in this module.
__steps__ = ["update_database"]


In [None]:
#export
def _save_values_by_key(key, path, db_host, db_port):
    """
    Load the parquet features at ``path`` and persist the latest row per ``key`` value to Redis.

    Column ``key`` is copied into a column literally named ``key`` so that
    ``_foreach_partition`` can group on it regardless of the source column name.

    :param key: name of the column identifying an entity (e.g. ``customer_id``).
    :param path: parquet dataset location readable by ``dd.read_parquet``.
    :param db_host: Redis host.
    :param db_port: Redis port.
    :return: ``(key, count)`` where count is the number of results returned by
        ``_foreach_partition`` across all partitions.
    """
    features = dd.read_parquet(path, engine='fastparquet')
    features['key'] = features[key]
    persisted = features.map_partitions(
        _foreach_partition, db_host, db_port, meta=('value', object)
    )
    # .item() turns the computed numpy scalar into a plain Python int.
    saved_count = persisted.count().compute().item()
    return key, saved_count
            
def _foreach_partition(df, db_host, db_port):
    """
    Persist the latest row of every key found in one partition to Redis.

    Applied to each partition via ``map_partitions`` in ``_save_values_by_key``. Rows are
    grouped on the ``key`` column, ``_last_item`` selects one row per group, ``order_date``
    is converted with ``isoformat()``, and each resulting row is written with ``_persist``.

    :param df: pandas partition; must contain ``key`` and ``order_date`` columns.
    :param db_host: Redis host name.
    :param db_port: Redis port.
    :return: the row-wise results of ``_persist`` (one dict per written key).
    """
    db = redis.Redis(host=db_host, port=db_port, db=0)
    try:
        items = df.groupby(['key'])[df.columns].apply(_last_item)
        items['order_date'] = items['order_date'].apply(lambda x: x.isoformat())
        return items.apply(_persist, axis=1, args=(db,))
    finally:
        # Release the connection even when grouping, serialization or SET fails;
        # previously an exception skipped db.close() and leaked the connection.
        db.close()

def _last_item(group):
    group = group.sort_values('order_date')
    return group.tail(1)     

def _persist(item, db):
    """
    Serialize one row and store it in Redis under the row's ``key`` value.

    :param item: pandas row (Series, as passed by ``DataFrame.apply(axis=1)``) containing a
        ``key`` entry.
    :param db: Redis client; ``db.set`` is called exactly once.
    :return: the row as a plain dict (the object that was serialized).
    """
    record = item.to_dict()
    # PICKLE4 + LZ4: readers must deserialize with the same settings. Pickle payloads
    # should only ever be loaded from a trusted Redis instance.
    db.set(record['key'], serialize(record, Serialization.PICKLE4, Compression.LZ4))
    return record

In [None]:
#export
async def update_database(job: FeatureCalcJob, context: EventContext):
    """
    Initialize the Redis database with the most recent feature values for each key.

    For every ``key -> path`` entry in ``job.features``, a Dask task running
    ``_save_values_by_key`` is submitted. It reads the parquet features at ``path`` and
    writes the latest row per value of column ``key`` to Redis. All tasks are gathered
    before returning.

    :param job: FeatureCalcJob; its ``features`` dict maps a key column name to a parquet path.
    :param context: EventContext; ``context.env['db']['host']`` and ``['port']`` select the
        Redis server, and ``get_client(context)`` provides the Dask client.
    :return: PrepareDbJob whose ``saved`` maps each key to the count returned by
        ``_save_values_by_key`` (one per Redis write). Returns None if submitting or
        gathering failed; the exception is logged, not raised.
    :raises: a lookup error (e.g. KeyError) if ``context.env`` lacks the ``db`` host/port.
    """
    # Read configuration before creating the Dask client. Previously a missing ``db`` entry
    # raised outside the try/finally and left the client open.
    db_host = context.env['db']['host']
    db_port = context.env['db']['port']
    client = get_client(context)
    logger.info(context, f"Preparing to save to database {db_host}:{db_port}...")
    try:
        tasks = []
        for key, path in job.features.items():
            logger.info(context, f"Saving latest state for {key} features...")
            tasks.append(client.submit(_save_values_by_key, key, path, db_host, db_port))
        # NOTE(review): if get_client returns a synchronous dask Client, gather() blocks the
        # event loop until every task completes — confirm this is acceptable here.
        res = client.gather(tasks)
        return PrepareDbJob(
            features=job.features,
            db=f'{db_host}:{db_port}',
            saved=dict(res)
        )
    except Exception as e:
        # Best-effort step: log the failure and return None instead of propagating.
        logger.error(context, e)
        return None
    finally:
        client.close()
    

### Test from notebook

In [None]:
from hopeit.testing.apps import config, execute_event

app_config = config('config/training-pipeline.json')
job = FeatureCalcJob(sources={'customer_id': './data/partitioned/customer_id/', 'email': './data/partitioned/email'}, 
                     features={'customer_id': './data/features/customer_id/', 'email': './data/features/email/'})
result = await execute_event(app_config, 'data.prepare-db', job)
result

2020-07-08 14:47:30,084 | INFO | fraud-poc 0.0.1-training data.prepare-db leos13 27299 | Preparing to save to database localhost:6379... | track.operation_id=test_operation_id | track.request_id=test_request_id | track.request_ts=2020-07-08T14:47:28.743022+00:00 | stream.name= | stream.msg_id= | stream.consumer_group=
2020-07-08 14:47:30,085 | INFO | fraud-poc 0.0.1-training data.prepare-db leos13 27299 | Saving latest state for customer_id features... | track.operation_id=test_operation_id | track.request_id=test_request_id | track.request_ts=2020-07-08T14:47:28.743022+00:00 | stream.name= | stream.msg_id= | stream.consumer_group=
2020-07-08 14:47:30,087 | INFO | fraud-poc 0.0.1-training data.prepare-db leos13 27299 | Saving latest state for email features... | track.operation_id=test_operation_id | track.request_id=test_request_id | track.request_ts=2020-07-08T14:47:28.743022+00:00 | stream.name= | stream.msg_id= | stream.consumer_group=


PrepareDbJob(features={'customer_id': './data/features/customer_id/', 'email': './data/features/email/'}, db='localhost:6379', saved={'customer_id': 64, 'email': 100})

In [None]:
# Customer id aggregation: fetch and decode the stored state for one sample customer_id.
# db.get returns None when the key is absent (e.g. this id is not in the current dataset),
# so guard before deserializing instead of failing with a TypeError.
db = redis.Redis(host='localhost', port=6379, db=0)
customer_payload = db.get('d555b585-5511-4a16-9f22-819834110239')
customer_state = (
    None if customer_payload is None
    else deserialize(customer_payload, Serialization.PICKLE4, Compression.LZ4, dict)
)
customer_state

TypeError: a bytes-like object is required, not 'NoneType'

In [None]:
# Email aggregations: fetch and decode the stored state for one sample email key.
# Reuses the `db` connection opened in the customer id cell above, and returns None
# instead of raising when the key is absent.
email_payload = db.get('1f5d34b02ef1975d5a82dcfe2e53fad6182e118c')
email_state = (
    None if email_payload is None
    else deserialize(email_payload, Serialization.PICKLE4, Compression.LZ4, dict)
)
email_state