# Sessionize an Event Log using Ibis

Guides used to create this example:
* https://www.pola.rs/posts/the-expressions-api-in-polars-is-amazing/
* https://knowledge.dataiku.com/latest/courses/advanced-code/python/sessionization.html

### Imports

In [1]:
# We really only need Ibis and the underscore API for this:
import ibis
from ibis import _ as c

### Setup

Here we will point to where our data lives and set up some variables.

Our data should contain at least two columns:
* `entity_col`: a column noting an entity identifier, e.g. an event or user
* `timestamp_col`: a column noting the timestamp for a row, e.g. an event log or a timestamp when an entity is polled

In this example, we are polling all of the entities that are active on a server.
This poll occurs on a cadence of around 10 minutes.

We want to be able to estimate how long an entity is on a server without a break.
To determine if an entity takes a break from being on the server, we check to see if it was absent from the poll results for 30 minutes (about 3 polls).

#### Read our data

Here we define where our data lives.

`ibis.read` is a new function in 4.0 that lets users quickly read files in as table expressions using the default backend (currently DuckDB, but this can be changed through `ibis.options.default_backend`).

`ibis.read` accepts paths on the local machine (Parquet, CSV, text, and globs) and can even accept HTTP paths (files hosted remotely on a server).

You can either download the file from `https://storage.googleapis.com/ibis-tutorial-data/wowah_data/wowah_data.csv` or read the file directly using `ibis.read`'s http functionality:

In [2]:
# str path to data
## Reading from remote using ibis.read's read_http functionality:
path = "https://storage.googleapis.com/ibis-tutorial-data/wowah_data/wowah_data.csv"

## It's faster to read locally though
# path = '../../data/wowah_data.csv'

# Read files into table expressions with ibis.read:
# Extra keyword arguments are passed through to the underlying reader,
# e.g. timestampformat sets the format used to parse timestamp columns
data = ibis.read(path, timestampformat='%m/%d/%y %H:%M:%S')
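To make the format string concrete, here is a small standard-library sketch that parses one value with the same specifiers. It assumes the reader interprets `%m/%d/%y %H:%M:%S` the way Python's `strptime` does, and `raw_value` is a hypothetical example of how a timestamp might appear in the file, not a row copied from the dataset.

```python
from datetime import datetime

# Same format string that is passed as `timestampformat` above
TIMESTAMP_FORMAT = "%m/%d/%y %H:%M:%S"

# Hypothetical raw value in month/day/two-digit-year layout (an assumption
# about the file's contents, used only for illustration)
raw_value = "01/15/08 21:47:09"

# Parse the string into a datetime using the format specifiers
parsed = datetime.strptime(raw_value, TIMESTAMP_FORMAT)
print(parsed.isoformat(sep=" "))
```

If the format did not match the file, the timestamp column would most likely fail to parse or come back as text, which would break the timestamp arithmetic used later.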

#### Additional Variables

Here we're defining how our columns are labeled.

In [3]:
# name of a column noting an entity identifier, e.g. an event or user
entity_col = 'char'
# name of a column noting the timestamp for a row, e.g. when entity polled
timestamp_col = 'timestamp'

And now we define what constitutes a break in seconds:

In [4]:
# maximum gap in seconds between consecutive rows of an entity; a larger gap starts a new session
session_boundary_threshold = 30 * 60

### Separating Entities and Sessions

We need to take our data and partition it by entity.
After we've partitioned our data, we should order each set by timestamp so that we have a chronological log for each entity.

We'll use this order of timestamps in each set to find the distance between a timestamp and its previous timestamp.

We can partition our dataset using a window.
Our window should have the following attributes:
* `group_by` our entity since sessions should contain one and only one entity
* `order_by` our timestamp so that each entity's rows are in chronological order. For each row, the previous row should either not exist (a new character, and therefore a new session) or be the last poll result in which the entity was present
* Don't look at rows following (`following=0`). As we move through the window, each row should take into account only the current row and the rows preceding it, never the rows that follow

We can take each row within a window and determine the timestamp of the preceding row.

A shortcut for windows with `following=0` is `ibis.cumulative_window`.

In [5]:
# Window for finding session ids per character
entity_window = ibis.cumulative_window(group_by=[entity_col], order_by=[timestamp_col])
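As a rough mental model of what this window describes, the following plain-Python sketch (no Ibis involved) partitions rows by entity, orders each partition by timestamp, and shows which rows a cumulative frame would cover. The `rows` data, the entity names, and the `cumulative_frame` helper are all hypothetical and exist only for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (entity, timestamp) rows, deliberately out of order
rows = [
    ("b", datetime(2008, 1, 1, 9, 10)),
    ("a", datetime(2008, 1, 1, 9, 20)),
    ("a", datetime(2008, 1, 1, 9, 0)),
    ("b", datetime(2008, 1, 1, 9, 0)),
    ("a", datetime(2008, 1, 1, 9, 10)),
]

# group_by: one partition per entity
partitions = defaultdict(list)
for entity, ts in rows:
    partitions[entity].append(ts)

# order_by: chronological order inside each partition
for timestamps in partitions.values():
    timestamps.sort()


def cumulative_frame(timestamps, i):
    """Rows visible to the row at position i: the partition's first row
    through row i, and nothing after it (the `following=0` idea)."""
    return timestamps[: i + 1]


# The second row of entity "a" sees only itself and the row before it
print(cumulative_frame(partitions["a"], 1))
```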

We can use this window to find the previous timestamp for each row:

In [6]:
# Take the previous timestamp within a window (by character ordered by timestamp):
# Note: the first value in a window will be null
ts_lag = c[timestamp_col].lag().over(entity_window)

Once we have the previous timestamp for each row, we can find the delta between the current timestamp and the previous timestamp by simply subtracting them:

In [7]:
# Subtract the lag from the current timestamp to get a timedelta
ts_delta = c[timestamp_col] - ts_lag
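The lag and the subtraction can be pictured with a short plain-Python sketch for a single entity. The timestamps are hypothetical, and `None` stands in for the null that the first row of each partition receives; this mirrors the idea of the expressions above rather than how Ibis evaluates them.

```python
from datetime import datetime

# Hypothetical, already sorted timestamps for a single entity
timestamps = [
    datetime(2008, 1, 15, 21, 47),
    datetime(2008, 1, 15, 21, 57),
    datetime(2008, 1, 15, 23, 0),
]

# lag(): shift everything down by one row; the first row has no predecessor
lagged = [None] + timestamps[:-1]

# delta: current minus previous, left as None where there is no previous row
deltas = [
    None if previous is None else current - previous
    for current, previous in zip(timestamps, lagged)
]

for current, delta in zip(timestamps, deltas):
    print(current, delta)
```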

This time delta can be compared to our `session_boundary_threshold` to figure out whether the gap between the current row and the previous row is large enough to label the current row as the start of a new session.

To build the truth table, we subtract the lag from the current timestamp to get a timedelta and then compare this value to our session threshold in seconds to determine whether the current timestamp falls outside of the previous session.
Basically:
* new character, new session should be True (first timestamp in the char partition)
  * Note: the comparison is null here, so we will coalesce it to True since it is a new session
* same character, new session should be True
* same character, same session should be False
* new character, same session should not exist since our window is partitioned by char

Our truth table looks like this:

```
| Case                         | True/False       | Explanation                                      |
| ============================ | ================ | ================================================ |
| New Character, New Session   | Coalesce to True | no previous timestamp, ts_delta is null          |
| Same Character, New Session  | True             | ts_delta > 30 min, new session                   |
| Same Character, Same Session | False            | ts_delta ≤ 30 min, same session                  |
| New Character, Same Session  | N/A              | should not happen; our window groups by entity   |
```

In [8]:
# Compare timedelta to our session delay in seconds to determine if the
# current timestamp falls outside of the session.
is_new_session = (ts_delta > ibis.interval(seconds=session_boundary_threshold))

We will coalesce this to `True` in our final call to capture our first case.
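The three reachable cases can be checked with a plain-Python sketch. It assumes the comparison propagates null (modelled here as `None`) when there is no previous timestamp; the `deltas` values and the `exceeds_threshold` helper are hypothetical.

```python
from datetime import timedelta

# Threshold from the setup section: 30 minutes, expressed in seconds
threshold = timedelta(seconds=30 * 60)

# Hypothetical deltas for one entity; None is the null on the first row
deltas = [
    None,
    timedelta(minutes=10),
    timedelta(minutes=63),
    timedelta(minutes=30),
]


def exceeds_threshold(delta):
    """Null-propagating comparison: None in, None out."""
    return None if delta is None else delta > threshold


raw_flags = [exceeds_threshold(delta) for delta in deltas]

# Coalesce null to True: a row with no predecessor starts a session
new_session = [True if flag is None else flag for flag in raw_flags]

print(raw_flags)
print(new_session)
```

Note that a gap of exactly 30 minutes does not start a new session here, because the comparison is strictly greater than the threshold.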

We can then take `is_new_session` and compute a rolling sum.

Since new characters and same character/new session rows will be `True` after coalescing, those will count as 1, and all other rows will be `False` (0).

We can then sum over our `entity_window` defined above to create this rolling sum.
This creates, for each entity ordered by timestamp, an integer that increments with each new session.

We will compute this in our final call and name it `session_id`, which will be a session identifier for a given entity.

Note that this number will not be unique, but will be unique when paired with an entity.
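A running sum over boolean flags can be sketched in plain Python with `itertools.accumulate`. The flags below are hypothetical and are assumed to be already ordered by timestamp within each entity.

```python
from itertools import accumulate

# Hypothetical new-session flags per entity, ordered by timestamp
new_session = {
    "a": [True, False, False, True, False],
    "b": [True, True],
}

# Running sum of the flags: the value increases by one at every row
# that starts a new session
session_ids = {
    entity: list(accumulate(int(flag) for flag in flags))
    for entity, flags in new_session.items()
}

print(session_ids)
```

Both entities end up with a session numbered 1, which is why the identifier is only meaningful together with the entity.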

### Creating a window over our sessions

Our last window is over our `session_id`.
We do this so we can compute the max and min timestamp for each entity's session.
We can take those values, find the difference, and that will be the `session_duration`.

We create our window using `ibis.window` and `group_by` both `entity_col` and `session_id`:

In [11]:
# Window for finding session min/max
session_window = ibis.window(group_by=[entity_col, 'session_id'])
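The effect of this window can be sketched in plain Python: group rows by the `(entity, session_id)` pair, take the maximum minus the minimum timestamp of each group, and attach that value to every row in the group. The `rows` data is hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (entity, session_id, timestamp) rows
rows = [
    ("a", 1, datetime(2008, 1, 15, 21, 47)),
    ("a", 1, datetime(2008, 1, 15, 21, 57)),
    ("a", 2, datetime(2008, 1, 15, 23, 0)),
    ("b", 1, datetime(2008, 1, 15, 22, 0)),
    ("b", 1, datetime(2008, 1, 15, 22, 30)),
]

# Group timestamps by the (entity, session_id) pair
by_session = defaultdict(list)
for entity, session_id, ts in rows:
    by_session[(entity, session_id)].append(ts)

# Duration of each session: latest timestamp minus earliest timestamp
durations = {
    key: max(stamps) - min(stamps) for key, stamps in by_session.items()
}

# Like a window function, keep every row and attach its session's duration
with_duration = [
    (entity, session_id, ts, durations[(entity, session_id)])
    for entity, session_id, ts in rows
]

for row in with_duration:
    print(row)
```

A session seen in only one poll gets a duration of zero, since its minimum and maximum timestamps coincide.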

### Compute

Finally, we will:
1. Create a boolean column, `new_session`, that tells us if a row marks a new session, using `is_new_session` with nulls filled as `True`
2. Create session ids by calculating a rolling sum on `new_session` over our `entity_window`
3. Drop our `new_session` column since it is no longer needed
4. Get the session duration by subtracting the `min` over our sessions from the `max` over our sessions.
5. Finally, order by our entity and timestamp columns for convenience.

In [12]:
# Generate all of the data we need to analyze sessions:
sessionized = (
    data
    # Flag rows that start a new session; the first row for each character
    # has a null delta, so fill those nulls with True
    .mutate(new_session=is_new_session.fillna(True))
    # Create a session id for each character by using a cumulative sum
    # over the `new_session` column
    .mutate(session_id=c.new_session.sum().over(entity_window))
    # Drop `new_session` because it is no longer needed
    .drop("new_session")
    .mutate(
        # Get session duration using max(timestamp) - min(timestamp) over our window
        session_duration=c[timestamp_col].max().over(session_window) - c[timestamp_col].min().over(session_window)
    )
    # Sort for convenience
    .order_by([entity_col, timestamp_col])
)

In [13]:
result = sessionized.limit(15).execute()

In [14]:
result.head(15)

Unnamed: 0,char,level,race,charclass,zone,guild,timestamp,session_id,session_duration
0,2,18,Orc,Shaman,The Barrens,6,2008-12-03 10:41:47,1,0 days 00:00:00
1,7,54,Orc,Hunter,Feralas,-1,2008-01-15 21:47:09,1,0 days 02:39:47
2,7,54,Orc,Hunter,Un'Goro Crater,-1,2008-01-15 21:56:54,1,0 days 02:39:47
3,7,54,Orc,Hunter,The Barrens,-1,2008-01-15 22:07:23,1,0 days 02:39:47
4,7,54,Orc,Hunter,Badlands,-1,2008-01-15 22:17:08,1,0 days 02:39:47
5,7,54,Orc,Hunter,Badlands,-1,2008-01-15 22:26:52,1,0 days 02:39:47
6,7,54,Orc,Hunter,Badlands,-1,2008-01-15 22:37:25,1,0 days 02:39:47
7,7,54,Orc,Hunter,Swamp of Sorrows,282,2008-01-15 22:47:10,1,0 days 02:39:47
8,7,54,Orc,Hunter,The Temple of Atal'Hakkar,282,2008-01-15 22:56:53,1,0 days 02:39:47
9,7,54,Orc,Hunter,The Temple of Atal'Hakkar,282,2008-01-15 23:07:25,1,0 days 02:39:47
