# Real Time User Segmentation
This notebook shows how to build a streaming complex event processing on a sliding time window for <br>
tagging and untagging users based on programmatic rules of user behavior.

Example of such rule: User who did 200 spins in the last two hours and had more than 10$ purchases should be tagged

In this notebook we will generate simulated slot machine application data and write it to iguazio stream (V3IO-Stream). <br> 
Then we'll create a Nuclio function that tracks the in-app purchases and slot machine spins per user.  

### You will need to include your password in the Nuclio configuration later in this notebook.
[You will need to provide your password for spec.triggers.slotStream.password here](#nuclioconfig)

This notebook creates a V3IO Stream called slots_stream and a KV table called slots_users in your /examples folder.

## Overview of Pipeline
Key points:
- Slot machine spins or in-app purchase events get written to a V3IO Stream.
- A Nuclio function instance handles the incoming event.
- One Nuclio function instance exists per shard.
- A sliding window of the last two hours of user activity is kept in memory.
- User activity is persisted in a KV table. Once a minute, to increase throughput.

In order to keep throughput high, it is necessary to batch the latest user data before persisting in the KV store.  

This increases throughput because there are less network calls and disk reads and writes.
The last two hours of user activity are kept in 

In [1]:
!pip install params



## Define workflow constants


In [2]:
STREAM = 'slots_stream'
CONTAINER = 'users'
KV_TABLE = 'slots_users'
EVENT_SPIN = 'spin'
EVENT_PURCHASE = 'in_app_purchase'
PATH="/User/demos/slots-stream"
EVENTS = [EVENT_SPIN, EVENT_PURCHASE]
# Total number of users to test
MAX_USERS = 5000
# Upper limit of generated events
MAX_RECORDS = 10000000
# Limits for users actions: top 20% (1/5) have more than 200 spins, top 2% (1/50) have more than 2 purchases
USER_ID_LIMITS = {EVENT_SPIN: MAX_USERS // 5, EVENT_PURCHASE: MAX_USERS // 50} 
# Limits for all low-grade users
EVENTS_LIMITS = {EVENT_SPIN: 199, EVENT_PURCHASE: 1}
# Total duration of the run
DURATION_HOURS = 3
# Parallel processing
N_PARTITIONS = 5
# Spin tracking
TRACK_MINUTES = 120
# Time to keep user activity after it lost active events, minutes
KEEP_ACTIVITY = 3
# Aggregation delay
CHECK_SECONDS = 60
# Spin limit to toggle the activity (set low to increase the toggle rate)
SPINS_THRESHOLD = 10
# Purchases threshold to toggle the activity
PURCHASES_THRESHOLD = 2

## Generate Stream Data

In [3]:
# nuclio: ignore
from random import choices, randint, uniform
import datetime
import tqdm
import time
import random
import math
import v3io_frames as v3f
import pandas as pd
import os
from params import *

LAST_TIME = datetime.datetime.now()

event_records = {user_id: {EVENT_SPIN: 0, EVENT_PURCHASE: 0} for user_id in range(MAX_USERS+1)}

purchase_sequence = []
spin_sequence = []

event = EVENT_PURCHASE
for _ in range(MAX_USERS * 2):
    user_id = randint(1, MAX_USERS)
    if user_id > USER_ID_LIMITS[event]:
        user_limit = EVENTS_LIMITS[event]
    else:
        user_limit = math.inf
    if event_records[user_id][event] < user_limit:
        event_records[user_id][event] += 1
        purchase_sequence.append({user_id: event})

event = EVENT_SPIN
for _ in range(MAX_RECORDS):
    user_id = randint(1, MAX_USERS)
    if user_id > USER_ID_LIMITS[event]:
        user_limit = EVENTS_LIMITS[event]
    else:
        user_limit = math.inf
    if event_records[user_id][event] < user_limit:
        event_records[user_id][event] += 1
        spin_sequence.append({user_id: event})

len_purchases = len(purchase_sequence)
len_spins = len(spin_sequence)
print('Events generated:', len_purchases + len_spins)

Events generated: 2802511


## Create the stream
Each shard will each be consumed by an instance of a Nuclio function which uses the stream as a trigger  
5 shards will spawn 5 instances of a Nuclio function when data is available

In [4]:
# nuclio: ignore
client = v3f.Client("framesd:8081", container="users")

TABLE = os.getenv('V3IO_USERNAME') + '/examples/' + STREAM

client.delete(backend="stream", table=TABLE)
client.create(backend="stream", table=TABLE, shards=N_PARTITIONS, retention_hours=DURATION_HOURS)

## Write the data to the stream
### Bulk writing in batches of 1000 to improve performance

In [5]:
# nuclio: ignore
timestamps_list = []
for i in range(len_spins+len_purchases):
    timestamps_list.append((LAST_TIME - datetime.timedelta(hours = random.random() * DURATION_HOURS)).strftime('%Y-%m-%d %H:%M:%S.%f UTC'))

# Populate stream with purchases

In [6]:
# nuclio: ignore

data = []
keys = []
for i in tqdm.tqdm(range(len_purchases)):
    key = list(purchase_sequence[i].keys())[0]
    keys.append(str(key))
    data.append(f'{purchase_sequence[i][key]},{timestamps_list[i]},{key}')
    if i % 1000 == 0:
        data = []
        keys = []
df = pd.DataFrame(data=data, columns=["data"])
client.write(backend="stream", table=TABLE, dfs=df, partition_keys=keys)        

100%|██████████| 4462/4462 [00:00<00:00, 429065.63it/s]


# Populate stream with spins

In [7]:
# nuclio: ignore

data = []
keys = []
for i in tqdm.tqdm(range(len_spins)):
    key = list(spin_sequence[i].keys())[0]
    data.append(f'{spin_sequence[i][key]},{timestamps_list[i]},{key}')
    keys.append(str(key))
    if i % 1000 == 0:
        df = pd.DataFrame(data=data, columns=["data"])
        client.write(backend="stream", table=TABLE, dfs=df, partition_keys=keys)
        data = []
        keys = []
df = pd.DataFrame(data=data, columns=["data"])
client.write(backend="stream", table=TABLE, dfs=df, partition_keys=keys)

100%|██████████| 2798049/2798049 [01:04<00:00, 43185.62it/s]


## Sample output of the stream, just making sure data is writen

In [8]:
# nuclio: ignore
client.read(backend="stream", table=TABLE, seek="earliest", shard_id="0")

Unnamed: 0_level_0,data,stream_time
seq_number,Unnamed: 1_level_1,Unnamed: 2_level_1
1,"in_app_purchase,2020-01-14 07:36:28.082548 UTC...",2020-01-14 08:38:37.336345897
2,"in_app_purchase,2020-01-14 08:35:58.539310 UTC...",2020-01-14 08:38:37.336345897
3,"in_app_purchase,2020-01-14 07:09:53.958808 UTC...",2020-01-14 08:38:37.336345897
4,"in_app_purchase,2020-01-14 08:19:04.456987 UTC...",2020-01-14 08:38:37.336345897
5,"in_app_purchase,2020-01-14 07:10:40.941940 UTC...",2020-01-14 08:38:37.336345897
6,"in_app_purchase,2020-01-14 06:17:30.567858 UTC...",2020-01-14 08:38:37.336345897
7,"in_app_purchase,2020-01-14 08:20:21.625474 UTC...",2020-01-14 08:38:37.336345897
8,"in_app_purchase,2020-01-14 06:10:56.360445 UTC,50",2020-01-14 08:38:37.336345897
9,"in_app_purchase,2020-01-14 06:01:46.902076 UTC...",2020-01-14 08:38:37.336345897
10,"in_app_purchase,2020-01-14 08:23:36.829636 UTC...",2020-01-14 08:38:37.336345897


## Delete output KV store to clean up the old data

In [9]:
!rm -rf /User/examples/slots_users

In [10]:
# nuclio: ignore
import nuclio

## Install necessary packages

In [11]:
%%nuclio cmd -c
pip install --upgrade pip
pip install v3io_frames

<a id="nuclioconfig"></a>
## Nuclio configuration

In [12]:
%%nuclio config
spec.build.baseImage = "python:3.6-jessie"
spec.resources.requests.memory = "4G"
# ERROR: {"error":"json: cannot unmarshal string into Go struct field Build.noCache of type bool","errorStackTrace":"\nError - json: cannot unmarshal string into Go struct field Build.noCache of type bool\n    .../nuclio/pkg/dashboard/resource/function.go:344\n\nCall stack:\nFailed to parse JSON body\n    .../nuclio/pkg/dashboard/resource/function.go:344\n"}
# spec.build.noCache = "true"
spec.readinessTimeoutSeconds = 10
spec.loggerLevel = "info"
spec.triggers.slotStream.kind = "v3ioStream"
spec.triggers.slotStream.url = "http://${V3IO_API}/users/${V3IO_USERNAME}/examples/slots_stream"
spec.triggers.slotStream.username = "${V3IO_USERNAME}"
###### SET PASSWORD HERE ######
spec.triggers.slotStream.password = "<SET PASSWORD>"
###############################
spec.triggers.slotStream.attributes.seekTo = "earliest"
spec.triggers.slotStream.attributes.partitions = [0, 1, 2, 3, 4]

%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'
%nuclio: setting spec.resources.requests.memory to '4G'
%nuclio: setting spec.readinessTimeoutSeconds to 10
%nuclio: setting spec.loggerLevel to 'info'
%nuclio: setting spec.triggers.slotStream.kind to 'v3ioStream'
%nuclio: setting spec.triggers.slotStream.url to 'http://v3io-webapi.default-tenant.svc:8081/users/admin/examples/slots_stream'
%nuclio: setting spec.triggers.slotStream.username to 'admin'
%nuclio: setting spec.triggers.slotStream.password to 'data123'
%nuclio: setting spec.triggers.slotStream.attributes.seekTo to 'earliest'
%nuclio: setting spec.triggers.slotStream.attributes.partitions to [0, 1, 2, 3, 4]


## Create Nuclio environment variables

In [13]:
%nuclio env -c V3IO_USERNAME=${V3IO_USERNAME}
%nuclio env -c CONTAINER_NAME=users
%nuclio env -c TABLE_NAME=slot_data
%nuclio env -c WEB_API_HOST_AND_PORT=v3io-webapi:8081
%nuclio env -c SESSION_KEY=${V3IO_ACCESS_KEY}
%nuclio env -c FRAMESD_URL=framesd:8081

## Imports for the Function code

In [14]:
import os
import json
import time
import http.client
import uuid
import v3io_frames as v3f
import pandas as pd
from urllib3 import HTTPConnectionPool

## Nuclio function code
This function will spawn an instance for each shard of the stream with available data.  
At the end of the notebook you will see the metrics for the throughput.  
You will see how easily you can get high throughput with just a simple Stream + Nuclio configuration like in this demo!

In [15]:
# Initializes the context on instance creation
# Setup user_data which is persisted between function invocations
def init_context(context):
    context.user_data.TABLE_NAME = os.getenv('V3IO_USERNAME') + '/examples/' + KV_TABLE
    context.user_data.CONTAINER_NAME = os.getenv('CONTAINER_NAME')
    context.user_data.WEB_API_HOST_AND_PORT = os.getenv('WEB_API_HOST_AND_PORT')
    context.user_data.SESSION_KEY = os.getenv('SESSION_KEY')
    context.user_data.FRAMESD_URL = os.getenv('FRAMESD_URL')  
    
    # V3F client for packet write (KV table)
    context.user_data.client = v3f.Client(context.user_data.FRAMESD_URL, container=context.user_data.CONTAINER_NAME, token=context.user_data.SESSION_KEY)

    # Nuclio instance identifier
    context.user_data.worker_id = uuid.uuid4()

    # Records of active users (within the aggregation delay)
    context.user_data.active_users = {}

    # Users with tagged state
    context.user_data.tagged_users = {}

    # Time of last aggregation, seconds of epoch
    context.user_data.last_aggregation = -1

    # Aggregation lock for the active thread
    context.user_data.aggregation_lock = False

    # Instance start time
    context.user_data.start_time = -1

    # Number of processed messages
    context.user_data.processed_events = 0
    
    headers = {'Content-Type': 'application/json',
           'X-v3io-function': 'GetItem',
           'cache-control': 'no-cache',
           'X-v3io-session-key': context.user_data.SESSION_KEY}
    context.user_data.pool = HTTPConnectionPool(context.user_data.WEB_API_HOST_AND_PORT, maxsize=100, headers = headers)


# Handles the stream events
#This nuclio functions pulls data from kafka of number of slots per user. Each user data is aggregated in memory and ones a minute is stored to KV
#Only active users data is stored as user_id, array of by minute aggregated slots, and the timestamp it was updated the last time
#each time data is stored old data is read for that user to make sure a tag condi
def handler(context, event):
    try:
        TABLE_NAME = context.user_data.TABLE_NAME
        CONTAINER_NAME = context.user_data.CONTAINER_NAME
        WEB_API_HOST_AND_PORT = context.user_data.WEB_API_HOST_AND_PORT
        SESSION_KEY = context.user_data.SESSION_KEY
        FRAMESD_URL = context.user_data.FRAMESD_URL  
        # Account the fresh event
        context.user_data.processed_events += 1

        # Initialize start time during the first run
        if context.user_data.start_time < 0:
            context.user_data.start_time = time.time()

        # Extract action parameters
        raw = event.body.decode().strip()
        msg = json.loads(raw)
        action, timestamp, user_id = msg['data'].split(',')

        if context.user_data.processed_events % 10000 == 0:
            context.logger.debug("sample event: " + raw)
            context.logger.debug("context.user_data.processed_events=" + str(context.user_data.processed_events) + "\n" + str(context))

        # Current time in seconds of epoch
        current_time_seconds = int(time.time())

        # Current time in minutes of epoch
        current_time_minutes = current_time_seconds // 60

        # Check for a new user
        if user_id not in context.user_data.active_users:
            context.user_data.active_users[user_id] = {}
            context.user_data.active_users[user_id]['spin_array'] = [0] * TRACK_MINUTES
            context.user_data.active_users[user_id]['purchases'] = 0

        # Process the action
        if action == 'in_app_purchase':
            context.user_data.active_users[user_id]['purchases'] += 1
        elif action == 'spin':
            event_time = int(time.mktime(time.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f UTC'))) // 60
            time_lag = current_time_minutes - event_time
            time_lag = max(time_lag, 0)
            if time_lag < TRACK_MINUTES:
                context.user_data.active_users[user_id]['spin_array'][time_lag] += 1

        # Exit if aggregation is already in progress
        if context.user_data.aggregation_lock:
            return event.body
        # Start data aggregation during the first run or after the predefined period
        if (context.user_data.last_aggregation == -1) or (current_time_seconds - context.user_data.last_aggregation > CHECK_SECONDS):
            # Set the aggregation trigger to avoid overlays
            context.user_data.aggregation_lock = True

            # Reset aggregation time
            context.user_data.last_aggregation = time.time()

            # Copy all active users and purge the previous records
            active_users_local = dict(context.user_data.active_users)
            context.user_data.active_users = {}

            # Lists of fresh tagged and untagged users for the report
            fresh_tagged = set()
            fresh_untagged = set()

            # Global payload for KV table update
            global_payload = []

            # Process list of recent users
            for user_id in active_users_local:

                # Set aggregation structure subset
                aggregation = {}

                # Read stored user data
                response_status, response_data = read_item(WEB_API_HOST_AND_PORT, CONTAINER_NAME, TABLE_NAME, user_id, context.user_data.pool, SESSION_KEY)
                if response_status == 404:

                    # Initialize user not found in KV table
                    if user_id in active_users_local:
                        aggregation['spin_array'] = active_users_local[user_id]['spin_array']
                        aggregation['purchases'] = active_users_local[user_id]['purchases']
                    else:
                        aggregation['spin_array'] = [0] * TRACK_MINUTES
                        aggregation['purchases'] = 0
                    aggregation['updated'] = current_time_minutes
                else:

                    # Load existing data
                    loaded = json.loads(response_data.decode())
                    aggregation['updated'] = int(loaded['Item']['updated']['N'])
                    aggregation['spin_array'] = list(map(int, loaded['Item']['spin_array']['S'].split()))
                    aggregation['purchases'] = int(loaded['Item']['purchases']['N'])

                    # Update existing data
                    time_delta = int(current_time_minutes - aggregation['updated'])
                    time_delta = min(time_delta, TRACK_MINUTES)
                    if time_delta > 0:
                        aggregation['updated'] = current_time_minutes
                        for _ in range(time_delta):
                            aggregation['spin_array'] = [0] + aggregation['spin_array']
                            aggregation['spin_array'].pop()

                # Account fresh data
                if user_id in active_users_local:
                    aggregation['purchases'] += active_users_local[user_id]['purchases']
                    aggregation['spin_array'] = [sum(x) for x in zip(aggregation['spin_array'], active_users_local[user_id]['spin_array'])]
                    aggregation['updated'] = current_time_minutes

                # Check activity conditions and tag active user
                if (sum(aggregation['spin_array']) >= SPINS_THRESHOLD) and (aggregation['purchases'] >= PURCHASES_THRESHOLD):
                    if user_id not in context.user_data.tagged_users:
                        context.user_data.tagged_users[user_id] = {}
                    context.user_data.tagged_users[user_id]['expire'] = current_time_minutes + KEEP_ACTIVITY
                    fresh_tagged.add(user_id)

                # Prepare combined payload
                if aggregation['updated'] == current_time_minutes:
                    global_payload.append({'user_id': user_id,
                                           'spin_array': ' '.join(list(map(str, aggregation['spin_array']))),
                                           'purchases': aggregation['purchases'],
                                           'updated': aggregation['updated']})

            # Untag expired tagged users
            for user_id in context.user_data.tagged_users:
                if context.user_data.tagged_users[user_id]['expire'] <= current_time_minutes:
                    fresh_untagged.add(user_id)
            for user_id in fresh_untagged:
                del context.user_data.tagged_users[user_id]

            tagged_users_ids = list(dict(context.user_data.tagged_users).keys())
            runtime = time.time() - context.user_data.start_time
            context.logger.debug(f'Worker ID: {context.user_data.worker_id}\n',
                  f'Runtime: {runtime} s\n',
                  f'Processed events: {context.user_data.processed_events} \n',
                  f'Events per second: {int(context.user_data.processed_events / runtime)}\n',
                  f'Toggled users: {len(fresh_tagged) + len(fresh_untagged)}\n',
                #  f'Tagged users ({len(fresh_tagged)}): {fresh_tagged} \n',
                  f'Active users ({len(active_users_local)}) \n',
                  f'Tagged users ({len(context.user_data.tagged_users)}): {str(tagged_users_ids)} \n',
                  f'Fresh Tagged users ({len(fresh_tagged)}): {fresh_tagged}  \n',
                  f'Fresh Untagged users ({len(fresh_untagged)}): {fresh_untagged}')

            # Write combined payload in KV table
            df = pd.DataFrame(global_payload)
            df.set_index('user_id', inplace=True)
            context.user_data.client.write(backend='kv', table=TABLE_NAME, dfs=df, save_mode="overwriteItem")
            
            context.user_data.aggregation_lock = False
        return event.body
    except BaseException as e:
        context.logger.debug("EXCEPTION: " + str(e))
        return event.body


# Read single item from KV table
def read_item(url, container, table, key, pool, session_key):
    headers = {'Content-Type': 'application/json',
               'X-v3io-function': 'GetItem',
               'cache-control': 'no-cache',
               'X-v3io-session-key': session_key}
    payload = {'Key': {'user_id': {'S': key}}, 'AttributesToGet': '*'}
    #conn = http.client.HTTPConnection(url)
    #conn.request('POST', f'/{container}/{table}/', bytes(json.dumps(payload), encoding='utf-8'), headers=headers)

    response = pool.urlopen('POST', f'/{container}/{table}/', body=bytes(json.dumps(payload), encoding='utf-8'), headers=headers)
    #response = conn.getresponse()
    data = response.data#.decode('utf-8')
    # data = response.read()
    #conn.close()
    return response.status, data



## Deploy job

In [16]:
%nuclio deploy -n slot-machine-v3iostream -p examples -v

[nuclio] 2020-01-14 08:40:24,565 updating slot-machine-v3iostream
[nuclio] 2020-01-14 08:40:24,617 deploying ...
[nuclio] 2020-01-14 08:40:31,707 (info) Build complete
[nuclio] 2020-01-14 08:40:31,708 {'level': 'info', 'message': 'Build complete', 'name': 'deployer', 'result': {'Image': 'nuclio/processor-slot-machine-v3iostream:latest', 'UpdatedFunctionConfig': {'metadata': {'annotations': {'nuclio.io/generated_by': 'function generated at 14-01-2020 by admin from /User/demos/slots-stream/real-time-user-segmentation.ipynb'}, 'labels': {'nuclio.io/project-name': 'examples'}, 'name': 'slot-machine-v3iostream', 'namespace': 'default-tenant'}, 'spec': {'build': {'baseImage': 'python:3.6-jessie', 'codeEntryType': 'sourceCode', 'commands': ['pip install --upgrade pip', 'pip install v3io_frames'], 'functionSourceCode': 'IyBHZW5lcmF0ZWQgYnkgbnVjbGlvLmV4cG9ydC5OdWNsaW9FeHBvcnRlciBvbiAyMDIwLTAxLTE0IDA4OjQwCgpTVFJFQU0gPSAnc2xvdHNfc3RyZWFtJwpDT05UQUlORVIgPSAndXNlcnMnCktWX1RBQkxFID0gJ3Nsb3RzX3VzZXJz

## Wait for job to start

In [17]:
# nuclio: ignore
import pandas as pd
import v3io_frames as v3f
import os
from sqlalchemy.engine import create_engine

In [18]:
# nuclio: ignore
while True:
    if os.path.isdir("/User/examples/" + KV_TABLE): 
        break
    time.sleep(5)

## Wait for job to finish

In [19]:
# nuclio: ignore
client = v3f.Client("framesd:8081", container="users")

In [20]:
# nuclio: ignore
TABLE_NAME = os.getenv('V3IO_USERNAME') + '/examples/' + KV_TABLE

In [21]:
# nuclio: ignore
engine = create_engine(os.getenv('DATABASE_URL'))
table_path = os.path.join('v3io.users."'+str(os.getenv('V3IO_USERNAME'))+'/examples/slots_users"')
query = 'select count(1) count from '+table_path

def count():
    df = pd.read_sql(query,engine)
    return df.loc[0,'count']

timeStart = time.time()
while count() < MAX_USERS:
    time.sleep(1)
totalTime = int(round((time.time() - timeStart) * 1000))
print("Total run time: " + str(totalTime) + " ms")

Total run time: 56732 ms


# Sample output of the data that is stored in KV table.
## We use table to store arrays of spins, purchase and last update time stamp

In [22]:
# nuclio: ignore
client = v3f.Client("framesd:8081", container="users")
no_spins = ' '.join(list(map(str, TRACK_MINUTES * [0])))
df = client.read(backend="kv", table=TABLE_NAME, filter="purchases>0 AND spin_array!='" + no_spins + "'")
print(df.head(20).to_string())

         purchases                                         spin_array   updated
user_id                                                                        
2855             2  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0 2 0 0 0 ...  26316521
3045             2  0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 ...  26316521
1930             2  0 0 0 0 0 0 0 2 0 0 1 0 0 0 0 0 0 2 1 0 0 3 0 ...  26316521
529              2  0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 1 0 1 0 ...  26316521
1714             2  0 0 0 0 0 0 0 0 0 0 2 0 0 0 2 2 2 1 0 0 0 0 0 ...  26316521
4239             2  0 0 0 0 0 0 1 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 ...  26316521
1654             2  0 0 0 0 3 0 0 2 0 0 3 2 0 1 0 0 0 0 2 1 2 0 0 ...  26316521
691              2  0 0 0 0 0 0 1 1 0 1 0 0 0 1 1 1 1 0 0 0 0 2 0 ...  26316521


## Print logs
Metrics such as events per second here.  
Total throughput is each worker's events per second all added together.  
With 5 shards on a m5.8xlarge EC2 instance you should get about 20,000 events per second.

In [23]:
!kubectl logs $(kubectl get pods | grep slot | cut -d ' ' -f 1) | grep -e Runtime -e Worker