## Stream attributes from Web events using Signals

This notebook creates a new view using the SDK, tests it on the atomic events table and applies.

### Flow of data

```mermaid
flowchart LR
    sp(Snowplow Pipeline)
    stream[/Stream processing/]
    signals(Signals)

    sp --> stream
    stream --> signals
```

---

In [6]:
from snowplow_signals import Signals
from dotenv import load_dotenv
import os

load_dotenv()
# You will need valid BDP credentials (API key, API key ID and org ID) for this to work.
# Instructions on how to generate these here: https://docs.snowplow.io/docs/account-management/managing-console-api-authentication/#credentials-ui-v3
sp_signals = Signals(
    api_url="http://localhost:8008", # Local Signals API endpoint
    api_key=os.environ["SNOWPLOW_API_KEY"],
    api_key_id=os.environ["SNOWPLOW_API_KEY_ID"],
    org_id=os.environ["SNOWPLOW_ORG_ID"],
)

### Define a new attribute

This block creates a single attribute definition including the logic how it should be calculated (it's filters and aggregation).

The attribute calculates the number of add to cart ecommerce events.

In [24]:
from snowplow_signals import (
    Attribute,
    Criteria,
    Criterion,
    Event,
)

link_click_count = Attribute(
    name="link_click_count",
    type="int32",
    events=[
        Event(
            vendor="com.snowplowanalytics.snowplow",
            name="link_click",
            version="1-0-1",
        )
    ],
    aggregation="counter",
    property="unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl",
    criteria=Criteria(
        all=[
            Criterion(
                property="event_name",
                operator="=",
                value="link_click",
            ),
        ],
    ),
)

# a dummy event is defined for this attribute as we only use this for storing an intervention value so we don't want signals to write a value to it
intervention_feature = Attribute(
    name="intervention_example",
    type="string",
    events=[
        Event(
            vendor="doesnotexist.com",
            name="anything",
            version="1-0-0"
        )
    ],
    criteria=Criteria(
        all=[
            Criterion(
                property="page_urlpath",
                operator="=",
                value="foobar",
            ),
        ]
    ),
    aggregation="first"
)

### Wrapping the attribute in a view

All attributes need to be included in views that can be considered as "tables" of attributes.

Views are immutable and versioned.

In [25]:
from snowplow_signals import View, session_entity, Service

view = View(
    name="demo_attributes",
    version=2,
    entity=session_entity,
    attributes=[
        link_click_count, # a count of link click events
        intervention_feature # stores the result of our intervention once triggered
    ],
    online=True
)

service = Service(
    name="demo_service",
    description='Service for demoing signals and inteventions',
    views=[view]
)

### Applying the view to Signals

The following block pushes the view definition to the Signals API and makes it available for processing.

In [26]:
# Ensure Snowplow Local and Personalization API are running otherwise this will fail
applied = sp_signals.apply([view, service])
print(f"{len(applied)} objects applied")
# If successful you should see two objects applied (one for the view and one for the service).

2 objects applied


In [90]:
# Send in an example link click event
from snowplow_tracker import Snowplow
from snowplow_tracker import Subject, PageView, SelfDescribing, SelfDescribingJson
from time import time

tracker = Snowplow.create_tracker(
    namespace="ns1"+str(time()),
    endpoint="http://localhost:8080", # TODO: Snowplow Local collector
    app_id="ai_demo", # TODO: choose app id that your Signals streaming engine is listening to
)


duid = "1e4a8d2a-4b3b-4b6d-8b1e-6f4f4f4f4f4f"
sid = "5f3f7a2b-0e4b-4b6d-8b1e-6f4f4f4f4f4f"

subject = Subject()
subject.set_domain_user_id(duid)
subject.set_domain_session_id(sid)


link_click = SelfDescribing(
    SelfDescribingJson(
        "iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1",
        {
            "targetUrl": "http://test.net/123",
            "elementContent": "Snowplow"
        },
    ),
    event_subject=subject
)

link_click.payload.add("url", "https://snowplow.io/")

tracker.track(link_click)
tracker.flush()

INFO:snowplow_tracker.emitters:Emitter initialized with endpoint http://localhost:8080/com.snowplowanalytics.snowplow/tp2
INFO:snowplow_tracker.snowplow:Tracker with namespace: 'ns11747189129.248855' added to Snowplow
INFO:snowplow_tracker.emitters:Attempting to send 1 events
INFO:snowplow_tracker.emitters:Sending POST request to http://localhost:8080/com.snowplowanalytics.snowplow/tp2...
INFO:snowplow_tracker.emitters:Finished synchronous flush


<snowplow_tracker.tracker.Tracker at 0x11f0ea270>

In [91]:
# wait ~10-30 seconds after the event and check
import requests
r = requests.post(
    'http://localhost:6566/get-online-features', # Personalization API must be up. Auth not required!
    json = {
        'entities': {
            'session': [sid]
        },
        'feature_service': 'demo_service'
    }
)
# print out a prettier version
features = {}
feature_names = r.json()['metadata']['feature_names']
feature_values = r.json()['results']
for i, feature in enumerate(feature_names):
    features[feature] = feature_values[i]['values']
print(features)

{'domain_sessionid': ['5f3f7a2b-0e4b-4b6d-8b1e-6f4f4f4f4f4f'], 'link_click_count': [6], 'intervention_example': ['bob']}


In [None]:
# the value of link_click_count should now be 1 and intervention_example should be blank!
# now let's create an intervention using the API
# this intervention will set an attribute for `intervention_example` if the intervention is triggered
intervention = {
    'name': 'link_clicker5',
    'version': 1,
    'method': 'set_attribute',
    'context': {
        'attribute': 'demo_attributes:intervention_example',
        'value': 'hello world',
        'clear_history': True
    },
    'criteria': {'operator': '>=', 'value': 6, 'attribute': 'demo_attributes:link_click_count'},
}

r = requests.post(
    'http://localhost:8008/api/v1/registry/interventions',
    json=intervention,
    headers={
        'Content-Type': 'application/json',
        'Authorization': 'Bearer NA' # Auth not required
    }
)
print(r.status_code)
print(r.json())

200
{'name': 'link_clicker5', 'version': 1, 'method': 'set_attribute', 'target_agents': None, 'script_uri': None, 'context': {'attribute': 'demo_attributes_2:intervention_example', 'value': 'hello world', 'clear_history': True}, 'description': None, 'tags': None, 'owner': None, 'criteria': {'attribute': 'demo_attributes_2:link_click_count', 'operator': '>=', 'value': 6}}


In [96]:
# subscribe to interventions (in a notebook)
# this would ordinarily be done in some front end code, so this is for demonstration purposes only.
# interrupt this cell (with the stop / interrupt button) to stop listening for interventions
import json

def format_intervention(payload):
    print('New intervention received:')
    payload_json = json.loads(payload)
    print('Context:', json.dumps(payload_json['context'], indent=2))


with requests.get('http://localhost:8008/api/v1/interventions', stream=True, params={'session': sid}) as response:
    response.raise_for_status()
    buffer = ''
    for line in response.iter_lines(decode_unicode=True):
        if line is None:
            continue
        if line == '':
            if buffer.startswith("data:"):
                data = buffer[len("data:"):].strip()
                format_intervention(data)
            buffer = ''
        else:
            buffer += line + '\n'

New intervention received:
Context: {
  "attribute": "string",
  "value": "string",
  "path": "string",
  "clear_history": true
}


KeyboardInterrupt: 