In [22]:
pip install snowplow-signals==0.1.1 python-dotenv

Collecting snowplow-signals==0.1.1
  Downloading snowplow_signals-0.1.1-py3-none-any.whl.metadata (3.2 kB)
Downloading snowplow_signals-0.1.1-py3-none-any.whl (58 kB)
Installing collected packages: snowplow-signals
  Attempting uninstall: snowplow-signals
    Found existing installation: snowplow-signals 0.1.0
    Uninstalling snowplow-signals-0.1.0:
      Successfully uninstalled snowplow-signals-0.1.0
Successfully installed snowplow-signals-0.1.1
Note: you may need to restart the kernel to use updated packages.


In [23]:
from dotenv import dotenv_values
from snowplow_signals import Signals

# Credentials are read from a local env file so they never appear in the notebook.
ENV_FILE = "../.env.local"
REQUIRED_SETTINGS = (
    "SIGNALS_API_URL",
    "SIGNALS_API_KEY",
    "SIGNALS_API_KEY_ID",
    "SIGNALS_ORG_ID",
)

config = dotenv_values(ENV_FILE)

# dotenv_values() yields an empty mapping when the file cannot be found and
# None for keys declared without a value, so check up front and report every
# missing setting at once instead of failing on the first bare key lookup.
missing_settings = [key for key in REQUIRED_SETTINGS if not config.get(key)]
if missing_settings:
    raise KeyError(
        f"Missing or empty settings in {ENV_FILE}: {', '.join(missing_settings)}"
    )

# Client used by every later cell to test, apply and query Signals objects.
signals = Signals(
    api_url=config["SIGNALS_API_URL"],
    api_key=config["SIGNALS_API_KEY"],
    api_key_id=config["SIGNALS_API_KEY_ID"],
    org_id=config["SIGNALS_ORG_ID"],
)

In [24]:
from snowplow_signals import Attribute, Event, Criteria, Criterion

# Event reference (vendor / name / version) that the attributes below are computed from.
event_article_details = Event(vendor="com.snplow.sales.aws", name="article_details", version="1-0-0")

def get_category_criteria(category):
    """Build the filter that matches article_details events of one category.

    Args:
        category: Value compared with ``=`` against the event's ``category``
            property.

    Returns:
        A ``Criteria`` whose ``all`` list holds a single ``Criterion``.
    """
    category_matches = Criterion(
        property="unstruct_event_com_snplow_sales_aws_article_details_1:category",
        operator="=",
        value=category,
    )
    return Criteria(all=[category_matches])


# Counter over the standard Snowplow page_view event. Defining it on
# article_details would make it an exact duplicate of article_read_count.
# NOTE(review): media_demo_view is published as version 2 — confirm whether
# changing this definition requires bumping the view version before re-applying.
page_view_count = Attribute(
    name="page_view_count",
    type="int32",
    events=[
        Event(
            vendor="com.snowplowanalytics.snowplow",
            name="page_view",
            version="1-0-0",
        )
    ],
    aggregation="counter",
)

# Property path of the article name inside the article_details event.
ARTICLE_NAME_PROPERTY = "unstruct_event_com_snplow_sales_aws_article_details_1:name"

# "counter" aggregation over article_details events, with no criteria.
article_read_count = Attribute(
    aggregation="counter",
    events=[event_article_details],
    type="int32",
    name="article_read_count",
)

# "unique_list" aggregation of the article name property.
articles_read = Attribute(
    aggregation="unique_list",
    property=ARTICLE_NAME_PROPERTY,
    events=[event_article_details],
    type="string_list",
    name="articles_read",
)

# "last" aggregation of the article name property.
article_last_read = Attribute(
    aggregation="last",
    property=ARTICLE_NAME_PROPERTY,
    events=[event_article_details],
    type="string",
    name="article_last_read",
)

def _category_read_counter(name, category):
    """Return an int32 "counter" attribute over article_details events,
    filtered by ``get_category_criteria(category)``."""
    return Attribute(
        name=name,
        type="int32",
        events=[event_article_details],
        criteria=get_category_criteria(category),
        aggregation="counter",
    )


# One counter per article category; only the attribute name and the category
# value passed to the criteria differ between them.
article_category_business_read_count = _category_read_counter(
    "article_category_business_read_count", "Business"
)
article_category_ai_read_count = _category_read_counter(
    "article_category_ai_read_count", "AI"
)
article_category_data_read_count = _category_read_counter(
    "article_category_data_read_count", "Data"
)
article_category_technology_read_count = _category_read_counter(
    "article_category_technology_read_count", "Technology"
)

In [25]:
from snowplow_signals import View, Service, domain_userid

# Attributes exposed by the view, listed in the order they are handed to View.
media_demo_attributes = [
    page_view_count,
    article_last_read,
    article_read_count,
    articles_read,
    article_category_business_read_count,
    article_category_ai_read_count,
    article_category_data_read_count,
    article_category_technology_read_count,
]

# Version 2 of the view, with domain_userid as its entity and online=True.
media_demo_view = View(
    name="media_demo_view",
    version=2,
    entity=domain_userid,
    online=True,
    owner="james.borlase@snowplow.io",
    attributes=media_demo_attributes,
)

# Service wrapping the single media demo view; it is what the attribute lookup
# cell passes as `source`.
media_demo_service = Service(
    owner="james.borlase@snowplow.io",
    views=[media_demo_view],
    description="Media Demo Service",
    name="media_demo_service",
)

In [26]:
# Test the view definition for the "website" app id before applying it.
data = signals.test(
    view=media_demo_view,
    app_ids=["website"],
)

# End on the bare expression so the notebook uses the object's rich display.
# print() renders it as plain text, which wraps wide tables across column groups.
# NOTE(review): the recorded output looks like a pandas DataFrame — confirm.
data

                          domain_userid  page_view_count article_last_read  \
0  89c2d702-343f-46e7-b00d-8080aa83dc99                0              None   
1  f241bcd2-ad57-4a29-9372-768aa2c3c319                0              None   
2  745cd761-4195-4875-996c-0d096a72077e                0              None   
3  83634b3a-22df-4b52-9905-6a634fc74f09                0              None   
4  ba111ad7-846f-442e-bd99-35bbea8d1fea                0              None   
5  4d4ce2ee-dbcf-41e2-8d0a-d41c5bc50237                0              None   
6  e4816664-e1b6-4016-a597-a102a9539866                0              None   
7  36f95db5-a2a9-4085-ad46-ef308c3f5781                0              None   
8  629c3095-fa56-4e0c-b3fc-b75192439cca                0              None   
9  041c7ff4-7775-4f21-aa92-385706ffe34a                0              None   

   article_read_count articles_read  article_category_business_read_count  \
0                   0            []                             

In [27]:
# Apply the view first, then the service that references it, one call each,
# reporting how many objects each call returned.
for signals_object in (media_demo_view, media_demo_service):
    applied = signals.apply([signals_object])
    print(f"{len(applied)} objects applied")

1 objects applied
1 objects applied


In [None]:
# Fetch the attribute values served by media_demo_service for one identifier.
# NOTE(review): the recorded output of this cell is a 422 from the Signals API
# ("Either domain_userid or domain_sessionid or network_userid or user_id must
# be provided", with request input {'session': None, 'user': None}). Presumably
# this SDK version does not map a bare identifier string onto an entity the API
# accepts when the source is a Service — confirm against the SDK/API docs.
lookup_identifier = "614587d9-000b-4177-b362-2aaf98a54209"
response = signals.get_online_attributes(source=media_demo_service, identifiers=lookup_identifier)

response.model_dump()['data']

SignalsAPIError: [Signals API] 422: {'detail': [{'type': 'value_error', 'loc': ['body', 'entities'], 'msg': 'Value error, Either domain_userid or domain_sessionid or network_userid or user_id must be provided.', 'input': {'session': None, 'user': None}, 'ctx': {'error': {}}}]}