# Event volume anomaly detection

Welcome to the event volume anomaly detection solutions accelerator.

This accelerator provides a quick and practical introduction to using anomaly detection with **BigQuery** and **Google Cloud Platform (GCP)** to identify anomalies in Snowplow data.

In this example, you will learn how to apply anomaly detection in a time series, focusing on identifying dips in warehouse data that could signal tracking issues or an increase in failed events.

### Prerequisites
To run this notebook, make sure you have:
- **Access to a GCP project** (including the project ID).
- **Permissions to read, write, and query objects in BigQuery**.
- **Execution permissions** for this notebook.

You do not need a full Snowplow pipeline setup, as this accelerator includes sample data for you to work with.

### Estimated time
This notebook should take approximately **10-15 minutes** to complete. It requires minimal compute resources, allowing you to run everything within the notebook environment. Initial connection may take around **30 seconds**.

### Getting started
You only need to make minimal modifications to this notebook—look out for the `MODIFY ME` text for specific areas that need your input.

By the end of this accelerator, you will be equipped to detect anomalies in your Snowplow event volume. These fundamentals can be extended to detect different types of anomalies in event data.

In [None]:
!pip3 install google-cloud-bigquery python-dotenv ipywidgets pandas matplotlib seaborn bigquery_magics
import os
from google.colab import auth
from google.cloud.bigquery import magics

PROJECT_ID = 'YOUR_PROJECT_ID'  # MODIFY ME: the ID of a GCP project in which your logged-in Google user has BigQuery permissions
auth.authenticate_user()
os.environ['GCP_PROJECT_ID'] = PROJECT_ID
magics.context.project = PROJECT_ID

First, let's look at our time series data for June 2024.

We'll be looking at a table with three columns:
- `collector_tstamp`: A UTC timestamp, truncated to the hour, of when an event was received.
- `event_count`: The number of enriched events in that period.
- `app_id`: The application identifier.

The goal of this exercise is to detect statistically significant drops in hourly event counts so that we can generate alerts when there may be a problem with our data collection.
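
For context, a table of this shape can be derived from raw events by truncating each timestamp to the hour and counting events per `app_id`. The following is a minimal sketch in plain Python, not the way the sample dataset was produced; the `hourly_event_counts` helper and the raw events are hypothetical and for illustration only.

```python
from collections import Counter
from datetime import datetime, timezone


def hourly_event_counts(events):
    """Count events per (hour, app_id), truncating each timestamp to the hour in UTC."""
    counts = Counter()
    for tstamp, app_id in events:
        hour = tstamp.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
        counts[(hour, app_id)] += 1
    return [
        {"collector_tstamp": hour, "app_id": app_id, "event_count": n}
        for (hour, app_id), n in sorted(counts.items())
    ]


# Hypothetical raw events (made-up values, not taken from the sample dataset)
raw_events = [
    (datetime(2024, 6, 1, 9, 5, tzinfo=timezone.utc), "website"),
    (datetime(2024, 6, 1, 9, 40, tzinfo=timezone.utc), "website"),
    (datetime(2024, 6, 1, 10, 2, tzinfo=timezone.utc), "website"),
    (datetime(2024, 6, 1, 9, 15, tzinfo=timezone.utc), "mobile"),
]

hourly_rows = hourly_event_counts(raw_events)
for row in hourly_rows:
    print(row)
```

Each output row corresponds to one hour and one `app_id`, which is the granularity the model is trained on later in this notebook.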

To start, we'll plot the data, which makes these dips easier to spot.

In [None]:
import pandas as pd
test_df = pd.read_csv('https://raw.githubusercontent.com/snowplow-industry-solutions/event-volume-anomaly-detection/refs/heads/main/datasets/anomaly_events_test.csv?token=GHSAT0AAAAAAC2DXSNSS76W6IWTDVJISP42Z5V6FMQ', parse_dates=['collector_tstamp'])

import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
sns.lineplot(x='collector_tstamp', y='event_count', data=test_df, marker='o')
plt.xlabel('Timestamp')
plt.ylabel('Event Count')
plt.title('Event Count over Time')
plt.grid(True)
plt.show()


Now that we've plotted our data we are going to load it into our GCP project (in the scratch.sample_events table) so that we can train our anomaly detection model.

In [None]:
# First, we'll load our local data into BigQuery to begin training
%load_ext bigquery_magics
from google.cloud import bigquery
client = bigquery.Client(project=PROJECT_ID)

# Optionally customise the dataset and table name.
# The load will fail with a 404 if the dataset does not already exist.
table_id = f'{PROJECT_ID}.scratch.sample_events'
schema = [
    bigquery.SchemaField("collector_tstamp", bigquery.enums.SqlTypeNames.TIMESTAMP),
    bigquery.SchemaField("event_count", bigquery.enums.SqlTypeNames.INTEGER),
    bigquery.SchemaField("app_id", bigquery.enums.SqlTypeNames.STRING),
    bigquery.SchemaField("event_name", bigquery.enums.SqlTypeNames.STRING)
]

# create the load configuration
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition="WRITE_TRUNCATE"
)

# Load the dataframe into the table and wait for the load job to finish
job = client.load_table_from_dataframe(test_df, table_id, job_config=job_config)
job.result()
test_table = client.get_table(table_id)
print(
    "Loaded {} test rows and {} columns to {}".format(
        test_table.num_rows, len(test_table.schema), table_id
    )
)
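
If the `scratch` dataset does not exist yet, the load above fails with a 404. One way to avoid this is to create the dataset before loading. The sketch below assumes `client` is the `bigquery.Client` created above; `ensure_dataset` is a hypothetical helper name, and the dataset is created in the client's default location.

```python
def ensure_dataset(client, dataset_id):
    """Create the dataset if it is missing and return it.

    `client` is expected to be a google.cloud.bigquery.Client. With
    exists_ok=True an "already exists" error is ignored and the existing
    dataset is returned instead.
    """
    return client.create_dataset(dataset_id, exists_ok=True)


# Usage in this notebook, run before the load step
# (assumes `client` and PROJECT_ID from the cells above):
# ensure_dataset(client, f"{PROJECT_ID}.scratch")
```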

Now we will create an [ARIMA+](https://cloud.google.com/vertex-ai/docs/tabular-data/forecasting-arima/overview) model based on this data. ARIMA+ is a statistical model that can be used for both time series forecasting (predicting the future) and anomaly detection.

ARIMA models aim to decompose a time series into its individual components, such as trend, seasonality, and holiday effects, and incorporate these cyclical elements into the model. This is far more useful than a fixed threshold (e.g., anything above 100) or a basic statistical rule (e.g., any value 2 standard deviations above the rolling mean), and it makes it easier to account for holiday periods such as Black Friday than more complex methods like Holt-Winters.
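
To make the comparison concrete, here is a small sketch of the basic statistical rule mentioned above (a rolling mean with a band of 2 standard deviations) applied to made-up hourly counts with a daily cycle. The second function is a crude seasonal comparison against the same hour on the previous day; it is only a stand-in to show why seasonality matters and is not how ARIMA+ works internally. The function names, window size, and data are all assumptions for illustration.

```python
from statistics import mean, stdev


def rolling_anomalies(values, window=24, num_std=2.0):
    """Return indices whose value is more than num_std standard deviations
    away from the mean of the preceding `window` values."""
    flagged = []
    for i in range(window, len(values)):
        history = values[i - window:i]
        mu, sigma = mean(history), stdev(history)
        if sigma > 0 and abs(values[i] - mu) > num_std * sigma:
            flagged.append(i)
    return flagged


def seasonal_dips(values, period=24, drop_ratio=0.5):
    """Return indices whose value is below drop_ratio times the value one period earlier."""
    return [
        i for i in range(period, len(values))
        if values[i] < drop_ratio * values[i - period]
    ]


# Made-up hourly counts: three identical days with a strong daily pattern
daily_pattern = [100, 90, 80, 80, 90, 110, 150, 200, 260, 300, 320, 330,
                 340, 330, 320, 310, 300, 280, 250, 220, 180, 150, 130, 110]
series = daily_pattern * 3
series[60] = 100  # simulated outage: the peak hour drops from 340 to 100

print(rolling_anomalies(series))  # [] because 100 is a normal night-time value
print(seasonal_dips(series))      # [60]
```

Because the daily cycle inflates the rolling standard deviation, a 70% drop at the peak hour stays inside the 2-standard-deviation band, while a comparison that knows about the daily pattern picks it up.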

We'll optionally include holidays for a given country (here, the US) to account for spikes or dips in traffic on public holidays. You can change the `holiday_region` value below if you would like to use holidays for a different country.

Running the cell below will create and train the model. This ordinarily takes a few minutes to complete.

In [None]:
%%bigquery --project $PROJECT_ID
create or replace model scratch.app_id_event_forecast
    options(
        model_type="ARIMA_PLUS",
        time_series_timestamp_col="collector_tstamp",
        time_series_data_col="event_count", -- the thing we are attempting to forecast
        time_series_id_col=["app_id"],

        holiday_region="US"
    )
    AS (
SELECT
    collector_tstamp,
    app_id,
    event_count
FROM
    scratch.sample_events
    )

Now that we've trained a forecasting model, we'll use it to detect anomalies in the 2024 data set we loaded above. We can do this by writing the anomalies that the model detects to a table.

We will write the high-confidence anomalies (probability of at least 0.99) to a table that we can query later. You can adjust the confidence level below by moving the slider left or right. Decreasing the confidence increases the number of anomalies detected (and, as a result, the number of potential false positives); increasing the confidence reduces the number of anomalies detected.
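
As a rough illustration of this trade-off, the sketch below applies a threshold filter on `anomaly_probability` to a handful of hypothetical scored rows. The `high_confidence` helper and all values are made up for illustration and do not come from the model.

```python
def high_confidence(rows, level):
    """Keep rows whose anomaly_probability is at least level / 100."""
    threshold = level / 100
    return [row for row in rows if row["anomaly_probability"] >= threshold]


# Hypothetical scored rows (illustrative values only)
scored = [
    {"event_count": 12, "anomaly_probability": 0.999},
    {"event_count": 40, "anomaly_probability": 0.97},
    {"event_count": 55, "anomaly_probability": 0.91},
    {"event_count": 70, "anomaly_probability": 0.62},
]

for level in (99, 95, 90):
    kept = high_confidence(scored, level)
    print(f"confidence {level}: {len(kept)} anomalies kept")
```

Lowering the level from 99 to 90 keeps more rows, which is the same effect you would see when moving the slider to the left.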

In [None]:
import ipywidgets as widgets
confidence = widgets.IntSlider(
    value=99,
    min=0,
    max=100,
    step=1,  # a step of 1 keeps the default of 99 reachable after the slider is moved
    description='Confidence (higher is more confident):',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
display(confidence)
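params = {'confidence': confidence.value}

def _update_params(change):
    # Keep the query parameter in sync whenever the slider is moved
    params['confidence'] = change['new']

confidence.observe(_update_params, names='value')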

In [None]:
%%bigquery edf --params $params --project $PROJECT_ID
  create or replace table scratch.anomaly_predictions
  OPTIONS()
  as SELECT
    collector_tstamp,
    app_id,
    event_count,
    is_anomaly,
    lower_bound,
    upper_bound,
    anomaly_probability,
    CASE
        WHEN event_count > upper_bound THEN 'above'
        WHEN event_count < lower_bound THEN 'below'
    END AS direction
    FROM
        ML.DETECT_ANOMALIES(MODEL scratch.app_id_event_forecast)
where
is_anomaly = true
and
anomaly_probability >= @confidence / 100;
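
The `CASE` expression above labels each anomaly by which side of the expected range it falls on. For readers more comfortable with Python, the same logic can be sketched as follows; the `direction` function and the sample bounds are hypothetical and only mirror the SQL.

```python
def direction(event_count, lower_bound, upper_bound):
    """Return 'above', 'below', or None when the count is within the bounds."""
    if event_count > upper_bound:
        return "above"
    if event_count < lower_bound:
        return "below"
    return None  # a SQL CASE without an ELSE branch yields NULL here


# Made-up bounds for illustration
print(direction(20, lower_bound=150, upper_bound=400))   # below
print(direction(900, lower_bound=150, upper_bound=400))  # above
print(direction(250, lower_bound=150, upper_bound=400))  # None
```

For this accelerator, the `'below'` rows are the interesting ones, since they correspond to dips in event volume.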

Now let's select the anomalous values out into a dataframe from the table that we have just created above (scratch.anomaly_predictions).

In [None]:
%%bigquery anomalies_df --project $PROJECT_ID
SELECT * FROM scratch.anomaly_predictions

In [None]:
# show a limited view of 10 anomalies
anomalies_df.head(10)

Now that you are a certified data scientist, let's turn this frankly unappealing table into something a little more explanatory.

To do that, we will take the anomalous values above and plot them on top of our original time series plot of our traffic.

We will mark each anomalous dip (an event count below the model's lower bound) with a red cross.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 6))
sns.lineplot(x='collector_tstamp', y='event_count', data=test_df, marker='o')
plt.xlabel('Timestamp')
plt.ylabel('Event Count')
plt.title('Event Count over Time')
plt.grid(True)
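# Keep only the anomalies where the count fell below the expected range
dips = anomalies_df[anomalies_df['direction'] == 'below']

plt.scatter(dips['collector_tstamp'], dips['event_count'], color='r', marker='x', label='Anomaly', zorder=2)
plt.legend()  # show the 'Anomaly' label defined above
plt.show()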

Congratulations on completing this event volume anomaly detection accelerator!

You've learned how to use Snowplow data in combination with GCP and BigQuery to detect anomalies in a time series, specifically identifying dips in event counts that could indicate tracking issues or increases in failed events.

With the skills you've gained, you can:
- Set up automated alerts for data collection issues before they impact your analytics.
- Apply anomaly detection to monitor the health of your Snowplow pipeline.
- Experiment with forecasting to anticipate traffic fluctuations and optimize your data strategy.

The possibilities with Snowplow and anomaly detection are vast - whether you're improving ecommerce tracking, enhancing product analytics, or maintaining robust data quality.

Keep exploring, keep experimenting, and let Snowplow empower your data-driven decisions!