<a href="https://colab.research.google.com/github/ksokoll/Clickstream_Project_Exploration/blob/main/clickstream_project_static_anomaly_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Clickstream Anomaly Detection - Static Analysis

This script analyzes the complete historical dataset to identify conversion rate anomalies.

Each step of the analysis is wrapped in a function rather than written directly in a code cell, because the last step (main()) loops over several combinations of device, OS and browser and reruns the whole analysis for each of them.

Each step is explained in detail, especially the parts where I use a different metric than usual.

In [None]:
import pandas as pd
from pathlib import Path
import os

In [None]:
INPUT_DIR = '/content/files'
SIGMA_THRESHOLD = 2  # +/-2 sigma
CONSECUTIVE_DAYS = 3  # Alert after 3 consecutive anomalies
BASELINE_WEEKS = 4   # Use last 4 weeks for baseline calculation

I chose 2 sigma after manually testing thresholds from 1 to 3 sigma. +/- 2 sigma gave the best balance: it caught the true anomalies without missing any or raising too many false alarms.

First, let's read the CSV into a dataframe:

# load input data

In [None]:
df = pd.read_csv(INPUT_DIR+"/df_contaminated.csv")
print(df.head())

       timestamp  visitorid event  itemid  transactionid   device browser  \
0  1433221332117     257597  view  355908            NaN  desktop  chrome   
1  1433224214164     992329  view  248676            NaN   mobile  safari   
2  1433221999827     111016  view  318965            NaN   mobile  chrome   
3  1433221955914     483717  view  253185            NaN  desktop  chrome   
4  1433221337106     951259  view  367447            NaN   mobile  chrome   

        os       timestamp_readable  
0  windows  2015-06-02 05:02:12.117  
1      ios  2015-06-02 05:50:14.164  
2  android  2015-06-02 05:13:19.827  
3    macos  2015-06-02 05:12:35.914  
4  android  2015-06-02 05:02:17.106  


# calculate_daily_conversion

Next, we use **calculate_daily_conversion** for two important things:

1. Aggregating the dataframe: As you can see above, the raw data contains one row per event, so each day is spread across many rows. Since we are interested in daily totals, we aggregate first.

2. Calculating the conversion rate ourselves as transactions / views * 100, since it is not part of the original dataset.
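
To make the two steps concrete, here is a minimal sketch on a handful of made-up events. The toy values and the use of `pd.crosstab` are assumptions for illustration only; the function below does the same thing with two `groupby` calls on the real data.

```python
import pandas as pd

# Toy event log (made-up values) with the two columns the aggregation needs
events = pd.DataFrame({
    "timestamp_readable": [
        "2015-06-02 05:02:12", "2015-06-02 06:10:00",
        "2015-06-02 07:30:00", "2015-06-02 09:15:00",
        "2015-06-03 08:00:00", "2015-06-03 10:45:00",
    ],
    "event": ["view", "view", "view", "transaction", "view", "view"],
})
events["date"] = pd.to_datetime(events["timestamp_readable"]).dt.date

# Step 1: one row per day, one column per event type
counts = pd.crosstab(events["date"], events["event"])

# Step 2: conversion rate in percent
counts["conversion_rate"] = counts["transaction"] / counts["view"] * 100

print(counts)
```

In this toy log the first day has 3 views and 1 transaction, the second day 2 views and none. Note that `pd.crosstab` only creates a column for event types that actually occur, so this shortcut would need extra handling for a segment without any transactions.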

In [None]:
def calculate_daily_conversion(df, browser, os, device):
    """
    Calculate the daily conversion rate for a specific browser/OS/device combination.
    Conversion rate (in %) = transactions / views * 100
    """
    print(f"Calculating daily conversion for {device}/{os}/{browser}...")

    # Keep only the events of the requested combination
    filtered = df[
        (df['browser'] == browser) &
        (df['os'] == os) &
        (df['device'] == device)
    ].copy()

    # Derive the calendar date from the readable timestamp
    filtered['date'] = pd.to_datetime(filtered['timestamp_readable']).dt.date

    # Count views per day
    views_per_day = filtered[filtered['event'] == 'view'].groupby('date').size()

    # Count transactions per day
    trans_per_day = filtered[filtered['event'] == 'transaction'].groupby('date').size()

    # Combine into one DataFrame; a day missing from one of the two counts gets 0
    daily = pd.DataFrame({
        'views': views_per_day,
        'transactions': trans_per_day
    }).fillna(0).reset_index()

    # Replace 0 views with 1 to avoid a division by zero
    daily['conversion_rate'] = (
        daily['transactions'] / daily['views'].replace(0, 1)
    ) * 100

    print(f"Calculated conversion for {len(daily)} days")
    return daily

You can see how aggregating the event line items into days shrank the whole data table from ~2 million rows to only 107:

In [None]:
browser = 'chrome'
os = 'windows'
device = 'desktop'
daily_conversion = calculate_daily_conversion(df, browser, os, device)
print(daily_conversion.head())

Calculating daily conversion for desktop/windows/chrome...
Calculated conversion for 107 days
         date  views  transactions  conversion_rate
0  2015-05-03   2946            17         0.577054
1  2015-05-04   4084            34         0.832517
2  2015-05-05   4825            47         0.974093
3  2015-05-06   5295            65         1.227573
4  2015-05-07   5118            56         1.094177


In [None]:
daily_conversion.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 107 entries, 0 to 106
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   date             107 non-null    object 
 1   views            107 non-null    int64  
 2   transactions     107 non-null    int64  
 3   conversion_rate  107 non-null    float64
dtypes: float64(1), int64(2), object(1)
memory usage: 3.5+ KB


# calculate_baseline_and_sigma

We continue now by adding two new calculated columns to our dataframe: **baseline** and **sigma**:

**baseline**: The baseline uses the conversion rates of the same weekday over the last n weeks to give us something to measure potential anomalies against. After a bit of back and forth between different statistics (mean() and median()), some of which were too sensitive or threw too many false positives, I settled on the median for the baseline and the **Median Absolute Deviation (MAD)** for sigma. MAD is better at handling outliers (extreme values). As an example:

If the last 4 Sundays have the conversion rates [0.8%, 0.75%, 0.8%, 0.1%], the "0.1%" data point poses a problem, as the mean() will be 0.6125% even though three of the four values are quite high (>=0.75%)!
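
As a quick check of this example, the following sketch compares mean and median for the four Sundays, using only Python's standard `statistics` module:

```python
from statistics import mean, median

sundays = [0.8, 0.75, 0.8, 0.1]  # conversion rates in %, taken from the example

print(f"mean:   {mean(sundays):.4f}")    # 0.6125, pulled down by the single 0.1
print(f"median: {median(sundays):.4f}")  # 0.7750, close to the three typical values
```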

**With MAD, the calculation is as follows:**

**Step 1:** Calculate the median (our baseline):
```
median([0.1, 0.75, 0.8, 0.8]) = (0.75 + 0.8) / 2 = 0.775%
```

**Step 2:** For each value, calculate the absolute deviation from the median:
```
|0.1 - 0.775| = 0.675
|0.75 - 0.775| = 0.025
|0.8 - 0.775| = 0.025
|0.8 - 0.775| = 0.025
```

This gives us the list of absolute deviations:
```
[0.025, 0.025, 0.025, 0.675]
```

**Step 3:** Take the median of these deviations (this is MAD):
```
MAD = median([0.025, 0.025, 0.025, 0.675]) = (0.025 + 0.025) / 2 = 0.025
```

**Step 4:** Scale MAD to be comparable with standard deviation:
```
sigma = MAD × 1.4826 = 0.025 × 1.4826 = 0.037
```

The scaling factor **1.4826** comes from the mathematical relationship between MAD and standard deviation in a normal distribution. This allows us to continue using "±2 sigma" threshold logic while benefiting from MAD's robustness.
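
For readers who want to see where the constant comes from: for normally distributed data, MAD equals sigma times the 75th percentile of the standard normal distribution, so the scaling factor is the reciprocal of that percentile. A small sketch with the standard library (`NormalDist` requires Python 3.8 or newer):

```python
from statistics import NormalDist

# 75th percentile of the standard normal distribution
z_75 = NormalDist().inv_cdf(0.75)

# For normal data: MAD = sigma * z_75, hence sigma = MAD * (1 / z_75)
scale = 1 / z_75

print(f"z_0.75 = {z_75:.4f}")   # 0.6745
print(f"scale  = {scale:.4f}")  # 1.4826
```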

**The big advantage:** MAD is robust against extreme deviations because it uses absolute values and medians, whereas standard deviation squares deviations (making outliers disproportionately large). This prevents our threshold from becoming unrealistically wide when contaminated data points exist in the lookback window, which is crucial for accurate anomaly detection.
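
The effect on the threshold can be sketched with the same four Sundays. This is only an illustration, assuming a "classic" variant that uses mean and sample standard deviation as the point of comparison:

```python
from statistics import mean, median, stdev

window = [0.8, 0.75, 0.8, 0.1]  # lookback window from the example, in %

# Classic variant: mean and sample standard deviation
threshold_std = mean(window) - 2 * stdev(window)

# Robust variant: median and scaled MAD
baseline = median(window)
mad = median([abs(x - baseline) for x in window])
sigma_mad = mad * 1.4826
threshold_mad = baseline - 2 * sigma_mad

print(f"std-based sigma: {stdev(window):.4f}, threshold: {threshold_std:.4f}")
print(f"MAD-based sigma: {sigma_mad:.4f}, threshold: {threshold_mad:.4f}")
```

With the standard deviation (about 0.34) the lower threshold becomes negative, so no conversion rate could ever be flagged. With the MAD-based sigma (about 0.037) the threshold stays near 0.70%, just below the three typical values.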

Other methods for handling these outliers would have been trimming or winsorizing our observations. That would have been more complex and would have required importing additional dependencies (for example SciPy's `scipy.stats.mstats.winsorize`), whereas MAD fits our purpose here well.

In [None]:
def calculate_baseline_and_sigma(daily_data, weeks=BASELINE_WEEKS):
    """
    Calculate the baseline conversion rate as the same-weekday median of the last N weeks.
    Also calculates sigma for the anomaly threshold as the scaled MAD (MAD * 1.4826).
    """
    print(f"Calculating baseline (last {weeks} weeks, same weekday)...")

    # Add weekday column (0=Monday, 6=Sunday)
    daily_data = daily_data.copy()
    daily_data['date'] = pd.to_datetime(daily_data['date'])
    daily_data['weekday'] = daily_data['date'].dt.weekday

    # Sort by date
    daily_data = daily_data.sort_values('date').reset_index(drop=True)

    # Calculate baseline for each day
    baselines = []
    sigmas = []

    for idx, row in daily_data.iterrows():
        current_date = row['date']
        current_weekday = row['weekday']

        # Get dates from last N weeks with same weekday
        lookback_start = current_date - pd.Timedelta(days=weeks*7)

        # Find same weekdays in lookback period
        same_weekdays = daily_data[
            (daily_data['weekday'] == current_weekday) &
            (daily_data['date'] < current_date) &
            (daily_data['date'] >= lookback_start)
        ]

        # Calculate baseline (median) and sigma (scaled MAD)
        if len(same_weekdays) >= 2:  # Need at least 2 data points
            baseline = same_weekdays['conversion_rate'].median()
            mad = (same_weekdays['conversion_rate'] - baseline).abs().median()
            sigma = mad * 1.4826
        else:
            # Not enough data in the beginning - use overall stats
            baseline = daily_data['conversion_rate'].median()
            mad = (daily_data['conversion_rate'] - baseline).abs().median()
            sigma = mad * 1.4826

        baselines.append(baseline)
        sigmas.append(sigma)

    # Add to dataframe
    daily_data['baseline'] = baselines
    daily_data['sigma'] = sigmas

    print(f"Calculated baselines for {len(daily_data)} days")

    return daily_data

In [None]:
baseline_data = calculate_baseline_and_sigma(daily_conversion, BASELINE_WEEKS)
print(baseline_data.head())

Calculating baseline (last 4 weeks, same weekday)...
Calculated baselines for 107 days
        date  views  transactions  conversion_rate  weekday  baseline  \
0 2015-05-03   2946            17         0.577054        6  0.938158   
1 2015-05-04   4084            34         0.832517        0  0.938158   
2 2015-05-05   4825            47         0.974093        1  0.938158   
3 2015-05-06   5295            65         1.227573        2  0.938158   
4 2015-05-07   5118            56         1.094177        3  0.938158   

      sigma  
0  0.365477  
1  0.365477  
2  0.365477  
3  0.365477  
4  0.365477  


# detect_anomalies

In this step, the "conversion_rate" of the current day is simply compared to the baseline to check whether it is distinctively low. Note: "threshold" refers only to the lower threshold, as the stakeholders are particularly interested in drops in conversion rate. A higher conversion rate than usual might be interesting for other purposes, but is out of scope here.

Here is a detailed explanation based on a single data point:

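**date**: 2015-05-03 (which is a Sunday)

**conversion rate:** 0.577% (the conversion rate measured on that day)

**baseline:** 0.938% (the "regular" conversion rate, normally based on the last X Sundays; this is the first day of the dataset, so no earlier Sundays exist and the overall median is used as a fallback)

**sigma**: 0.365 (typical variability)

**threshold**: baseline - 2 × sigma = 0.938 - 2 × 0.365 = 0.208

**is_anomaly**: False (0.577 lies above the threshold of 0.208)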
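
The same arithmetic as a minimal sketch, using the unrounded values of this data point from the table above. The variable names are chosen for illustration; the function below applies the same logic to whole columns.

```python
conversion_rate = 0.577054  # measured on 2015-05-03
baseline = 0.938158
sigma = 0.365477
sigma_threshold = 2

# Lower threshold, clipped at 0 so it can never become negative
threshold = max(baseline - sigma_threshold * sigma, 0)

is_anomaly = conversion_rate <= threshold
deviation_sigma = (conversion_rate - baseline) / sigma

print(f"threshold:       {threshold:.6f}")       # 0.207204
print(f"is_anomaly:      {is_anomaly}")          # False
print(f"deviation_sigma: {deviation_sigma:.3f}")  # -0.988
```

The day is roughly one sigma below the baseline, which is not enough to cross the 2-sigma threshold.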



In [None]:
def detect_anomalies(baseline_data, sigma_threshold=SIGMA_THRESHOLD):
    """
    Detect anomalies where conversion < (baseline - sigma_threshold * sigma)
    """
    print(f"Detecting anomalies (threshold: {sigma_threshold} sigma)...")

    df = baseline_data.copy()

    # Calculate threshold
    df['threshold'] = df['baseline'] - (sigma_threshold * df['sigma'])

    # Clip to avoid negative thresholds
    df['threshold'] = df['threshold'].clip(lower=0)

    # Flag anomalies
    df['is_anomaly'] = df['conversion_rate'] <= df['threshold']

    # Calculate deviation in sigma units
    df['deviation_sigma'] = (df['conversion_rate'] - df['baseline']) / df['sigma']

    # Count anomalies
    anomaly_count = df['is_anomaly'].sum()
    print(f"Found {anomaly_count} anomalous days")

    return df

In [None]:
print(detect_anomalies(baseline_data, sigma_threshold=SIGMA_THRESHOLD).head())

Detecting anomalies (threshold: 2 sigma)...
Found 7 anomalous days
        date  views  transactions  conversion_rate  weekday  baseline  \
0 2015-05-03   2946            17         0.577054        6  0.938158   
1 2015-05-04   4084            34         0.832517        0  0.938158   
2 2015-05-05   4825            47         0.974093        1  0.938158   
3 2015-05-06   5295            65         1.227573        2  0.938158   
4 2015-05-07   5118            56         1.094177        3  0.938158   

      sigma  threshold  is_anomaly  deviation_sigma  
0  0.365477   0.207204       False        -0.988036  
1  0.365477   0.207204       False        -0.289050  
2  0.365477   0.207204       False         0.098324  
3  0.365477   0.207204       False         0.791883  
4  0.365477   0.207204       False         0.426892  


# find_consecutive_alerts

Now that we have our calculated column "is_anomaly", we can look for streaks of consecutive anomalies. The logic is straightforward: if there are X consecutive anomalies, the period is flagged as critical.

In [None]:
def find_consecutive_alerts(anomalies_df, consecutive_days=3):
    """Find periods where anomalies occur N consecutive days"""
    print(f"Finding {consecutive_days}-day consecutive anomalies...")

    df = anomalies_df.sort_values('date').reset_index(drop=True)

    critical_periods = []
    i = 0

    while i < len(df):
        if df.iloc[i]['is_anomaly']:
            # Start of potential streak
            start_idx = i
            count = 1

            # Count consecutive anomalies
            while i + 1 < len(df) and df.iloc[i + 1]['is_anomaly']:
                count += 1
                i += 1

            # Check if streak is long enough
            if count >= consecutive_days:
                start_date = df.iloc[start_idx]['date']
                end_date = df.iloc[i]['date']
                critical_periods.append((start_date, end_date, count))

        i += 1

    print(f"Found {len(critical_periods)} critical periods (≥{consecutive_days} days)")

    return critical_periods
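
For comparison, the same streak logic can be sketched without an explicit loop. This is an alternative formulation on made-up flags, not the approach used in this notebook: every change of the flag starts a new run, and runs of anomalies are then filtered by length.

```python
import pandas as pd

# Toy data (made-up flags): one 3-day streak and one 2-day streak
toy = pd.DataFrame({
    "date": pd.date_range("2015-09-01", periods=8, freq="D"),
    "is_anomaly": [False, True, True, True, False, True, True, False],
})

# A new run starts whenever the flag differs from the previous row
run_id = (toy["is_anomaly"] != toy["is_anomaly"].shift()).cumsum()

# Summarize only the anomalous runs
runs = (
    toy[toy["is_anomaly"]]
    .groupby(run_id)["date"]
    .agg(start="min", end="max", length="count")
)

critical = runs[runs["length"] >= 3]
print(critical)
```

Only the streak from 2015-09-02 to 2015-09-04 survives the filter. Like the loop version, this counts consecutive rows, so it assumes the data contains one row per calendar day without gaps.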

# print_report

Now we get a report of our results.

In [None]:
def print_report(anomalies_df, critical_periods, browser, os, device):
    """Print formatted terminal report"""
    print("\n" + "="*80)
    print(f"ANOMALY DETECTION REPORT -  {browser} {os} {device}")
    print("="*80)

    # Summary Statistics
    total_days = len(anomalies_df)
    anomaly_days = anomalies_df['is_anomaly'].sum()

    print(f"\nSummary:")
    print(f"   Total days:       {total_days}")
    print(f"   Anomalous days:   {anomaly_days} ({anomaly_days/total_days*100:.1f}%)")
    print(f"   Critical periods: {len(critical_periods)}")

    # Critical Periods
    if critical_periods:
        print(f"\nCritical Periods (≥{CONSECUTIVE_DAYS} consecutive days):")
        for i, (start, end, length) in enumerate(critical_periods, 1):
            print(f"   {i}. {start} → {end} ({length} days)")

# export_csv

Finally, the results can be exported to a CSV file, so they are saved and can be loaded into Power BI.

In [None]:
def export_csv(anomalies_df, output_path, browser, os, device):
    """Export results to CSV for Power BI integration"""
    print(f"\nExporting results to {output_path}...")

    # Select relevant columns for Power BI
    export_df = anomalies_df[[
        'date',
        'views',
        'transactions',
        'conversion_rate',
        'baseline',
        'sigma',
        'threshold',
        'deviation_sigma',
        'is_anomaly'
    ]].copy()

    # Convert date to string for CSV
    export_df['date'] = export_df['date'].astype(str)

    # Save to CSV
    export_df.to_csv(output_path, index=False)

    print(f"Exported {len(export_df)} rows to {output_path}")


# main()

main() now combines all of the above steps to run through the whole analysis.

Note that we iterate over each relevant combination of os x browser x device to make sure that nothing slips through the net.

We did not run every possible combination (which would be nine), as some of them do not make sense. The static "COMBINATIONS_TO_CHECK" variable allows later changes, as the customer might decide that a combination can be dropped or that new devices or browsers should be monitored. This could later be managed via environment variables.
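
One possible way to manage the combinations via an environment variable is sketched below. The variable name `COMBINATIONS_TO_CHECK`, the `browser:os:device` format and the helper `load_combinations` are all hypothetical and not part of this project.

```python
# Import only `environ`, since this notebook reuses the name `os` as a variable
from os import environ

# Hypothetical default, used when the environment variable is not set
DEFAULT_COMBINATIONS = "safari:ios:mobile,chrome:windows:desktop"

def load_combinations(env_var="COMBINATIONS_TO_CHECK"):
    """Parse 'browser:os:device,browser:os:device' into a list of tuples."""
    raw = environ.get(env_var, DEFAULT_COMBINATIONS)
    combos = []
    for item in raw.split(","):
        parts = [p.strip().lower() for p in item.split(":")]
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"Expected browser:os:device, got {item!r}")
        combos.append(tuple(parts))
    return combos

print(load_combinations())
```

The resulting list has the same shape as the static list in main(), so the loop itself would not need to change.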

In [None]:
def main():
    COMBINATIONS_TO_CHECK = [
        ('safari', 'ios', 'mobile'),
        ('safari', 'macos', 'desktop'),
        ('chrome', 'android', 'mobile'),
        ('chrome', 'windows', 'desktop'),
        ('firefox', 'windows', 'desktop'),
    ]

    for browser, os, device in COMBINATIONS_TO_CHECK:
        output_path = f"data/output_{device}_{os}_{browser}.csv"

        # Step 2: Calculate daily conversion
        daily_conversion = calculate_daily_conversion(df, browser, os, device)

        # Step 3: Calculate baseline & sigma
        baseline_data = calculate_baseline_and_sigma(daily_conversion, BASELINE_WEEKS)

        # Step 4: Detect anomalies
        anomalies = detect_anomalies(baseline_data, SIGMA_THRESHOLD)

        # Step 5: Find consecutive streaks
        critical_periods = find_consecutive_alerts(anomalies, CONSECUTIVE_DAYS)

        # Step 6: Print report
        print_report(anomalies, critical_periods, browser, os, device)

        # Step 7: Export CSV
        #export_csv(anomalies, output_path, browser, os, device)

    print("\nAnalysis complete!")

if __name__ == "__main__":
    main()

Calculating daily conversion for mobile/ios/safari...
Calculated conversion for 107 days
Calculating baseline (last 4 weeks, same weekday)...
Calculated baselines for 107 days
Detecting anomalies (threshold: 2 sigma)...
Found 24 anomalous days
Finding 3-day consecutive anomalies...
Found 2 critical periods (≥3 days)

ANOMALY DETECTION REPORT -  safari ios mobile

Summary:
   Total days:       107
   Anomalous days:   24 (22.4%)
   Critical periods: 2

Critical Periods (≥3 consecutive days):
   1. 2015-09-01 00:00:00 → 2015-09-12 00:00:00 (12 days)
   2. 2015-09-16 00:00:00 → 2015-09-18 00:00:00 (3 days)
Calculating daily conversion for desktop/macos/safari...
Calculated conversion for 107 days
Calculating baseline (last 4 weeks, same weekday)...
Calculated baselines for 107 days
Detecting anomalies (threshold: 2 sigma)...
Found 16 anomalous days
Finding 3-day consecutive anomalies...
Found 0 critical periods (≥3 days)

ANOMALY DETECTION REPORT -  safari macos desktop

Summary:
   Total