## Data Import ##

In [1]:
import numpy as np
import pandas as pd
import scipy
import sklearn
from imblearn.over_sampling import SMOTE

**file processing**

In [5]:
import bz2

file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\master-telemetry-distilled.bz2"
max_rows = 7_000_000  # number of rows to read

row_count = 0

with bz2.open(file_path, "rt", encoding="utf-8") as f:
    for line in f:
        if row_count >= max_rows:
            break

        # Strip trailing newline; keep empty trailing fields if present
        fields = line.rstrip("\n").split("\t")

        # Required fields
        try:
            timestamp, user, version, event = fields[:4]
        except ValueError:
            # malformed line; skip
            continue

        # Optional name field
        name = fields[4] if len(fields) > 4 else None

        # Use the values
        # ...

        row_count += 1

print(f"Processed {row_count} rows.")

Processed 7000000 rows.


**read & save subset of file**

In [2]:
import bz2
import pandas as pd

file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\master-telemetry-distilled-sorted.bz2"
max_rows = 7_000_000  # number of rows to read

rows = []  # list to store rows temporarily
row_count = 0

with bz2.open(file_path, "rt", encoding="utf-8") as f:
    for line in f:
        if row_count >= max_rows:
            break

        # Strip trailing newline; keep empty trailing fields if present
        fields = line.rstrip("\n").split("\t")

        # Required fields
        try:
            timestamp, user, version, event_full = fields[:4]
        except ValueError:
            # malformed line; skip
            continue

        # Optional name field
        name = fields[4] if len(fields) > 4 else None

        # Append row as a tuple/list
        rows.append([timestamp, user, version, event_full])

        row_count += 1

# Create DataFrame
df = pd.DataFrame(rows, columns=["timestamp", "user", "version", "event_full"])

# Split 'event_full' into 'command_type' and 'event'
df[['event', 'command_type']] = df['event_full'].str.strip().str.split(n=1, pat=' ', expand=True)

# Drop the original 'event_full' column if no longer needed
df.drop(columns=['event_full'], inplace=True)

print(f"Processed {row_count} rows into DataFrame.")
print(df.head())

Processed 7000000 rows into DataFrame.
   timestamp  user   version  event command_type
0  315522314  5129  7.2.0.50  Start         None
1  315527925  5129  7.2.0.50  Start         None
2  315777777  4103   7.4.0.4  Start         None
3  315777898  4103   7.4.0.4    End         None
4  315777899  4103   7.4.0.4  Start         None


In [3]:
df.shape

(7000000, 5)

**read, clean and save the file def**

In [19]:
import pandas as pd

def preprocess_telemetry(file_path, versions_file_path):
    # Read bz2 CSV
    df = pd.read_csv(file_path, compression='bz2', sep='\t', dtype=str)
    """
    Clean and preprocess raw telemetry DataFrame.
    
    Steps:
    1. Remove rows with missing or negative user_id
    2. Drop duplicate rows
    3. Keep only relevant software versions and add release_date
    4. Create datetime, year, and month columns
    5. Remove rows where year == 2036
    """
    
    # 1) Remove missing or negative user_id
    df = df[df["user_id"].notna()].copy()
    df["user_id"] = pd.to_numeric(df["user_id"], errors="coerce")  # convert to numeric
    df = df[df["user_id"] >= 0].reset_index(drop=True)  # keep only non-negative IDs

    # 2) Drop duplicate rows
    df = df.drop_duplicates().reset_index(drop=True)

    # 3) Filter dataset for valid software versions
    valid_versions = pd.read_csv(
        versions_file_path,
        sep='\s+',       # splits on whitespace
        header=None,     
        names=['version', 'release_date']
    )

    df = df.merge(valid_versions, on='version', how='inner').reset_index(drop=True)

    # 4) Create datetime column
    df = df[df['timestamp'] != 'timestamp'].copy()  # remove header if present
    df['timestamp'] = df['timestamp'].astype(int)
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')

    # Create year and month columns
    df['year'] = df['datetime'].dt.year
    df['month'] = df['datetime'].dt.month

    # 5) Remove rows where year == 2036
    df = df[df['year'] != 2036].reset_index(drop=True)

    return df

**Read the entire file (Slow)**

In [23]:
file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\master-telemetry-distilled.bz2"
versions_file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\Fespa & Tekton Versions.txt"

cleaned_df = preprocess_telemetry(file_path, versions_file_path)
print(cleaned_df.head())

**1) WindowAccumulator** (sliding window counter)

In [38]:
from collections import deque

class WindowAccumulator:
    def __init__(self, window_size):
        self.window = deque()  # (timestamp, units)
        self.window_size = window_size
        self.total = 0

    def _evict(self, now):
        cutoff = now - self.window_size
        while self.window and self.window[0][0] < cutoff:
            ts, units = self.window.popleft()
            self.total -= units

    def add(self, timestamp, units):
        self._evict(timestamp)
        self.window.append((timestamp, units))
        self.total += units

    def get(self, timestamp):
        self._evict(timestamp)
        return self.total

- Purpose: keep a rolling sum of numeric units for only the events whose timestamps are within the last window_size seconds.

- deque storage: events are stored as (timestamp, units) in time order (oldest at left). deque.popleft() is O(1) so eviction is cheap.

- _evict(now): computes cutoff = now - window_size and removes all events strictly older than cutoff. This keeps memory bounded to events in the window.

- add(timestamp, units): evicts stale events first, appends the new event, and updates total incrementally (so get() is O(1)).

- get(timestamp): evicts stale events and returns total — the current sum inside the window.

- Assumptions: timestamps you pass in are numeric and (ideally) non-decreasing per metric per user (amortized O(1) behavior). If timestamps jump backwards, behavior still works but may cause some extra evictions later.

**2) Metric Definition**

In [39]:
from enum import Enum, auto

class Metric(Enum):
    COMMAND_PARAM_COUNT_WEEKLY = auto()
    DAYS_ACTIVE_LAST_30 = auto()
    AVG_SESSIONS_PER_WEEK = auto()
    TOTAL_USAGE_TIME_HOURS = auto()
    ERROR_RATE = auto()
    # Non-rolling metrics:
    INTER_SESSION_GAP_HOURS = auto()
    CRASH_COUNT = auto()
    LAST_VERSION_USED = auto()
    COMMAND_ENTROPY = auto()
    UNIQUE_COMMAND_COUNT = auto()
    DAYS_SINCE_NEXT_RELEASE = auto()
    DAYS_SINCE_LAST_USE = auto()

**3) Assign window sizes**

In [40]:
WINDOW_SIZES = {
    Metric.COMMAND_PARAM_COUNT_WEEKLY: 7 * 86400, #weekly
    Metric.DAYS_ACTIVE_LAST_30:        30 * 86400, #monthly
    Metric.AVG_SESSIONS_PER_WEEK:      7 * 86400, #weekly
    Metric.TOTAL_USAGE_TIME_HOURS:     30 * 86400, #monthly
    Metric.ERROR_RATE:                 30 * 86400, #monthly
}

- Purpose: enumerate the metrics you want to compute and centrally define which metrics need rolling windows and how large those windows should be.

- Why Enum: avoids string typos and makes code self-documenting (use Metric.COMMAND_PARAM_COUNT_WEEKLY not "command_weekly").

- WINDOW_SIZES mapping: only metrics present here will get a WindowAccumulator. Metrics not in this dict are handled differently (snapshots, sets, frequency counters).

**4) Build UserStats container**

In [41]:
from collections import defaultdict, deque
from math import log2
from datetime import datetime, timezone

class UserStats:
    def __init__(self):
        # Rolling accumulators
        self.roll = {
            metric: WindowAccumulator(WINDOW_SIZES[metric])
            for metric in WINDOW_SIZES
        }

        # Non-rolling
        self.last_version_used = None
        self.last_use_timestamp = None
        self.crash_count = defaultdict(int)

        # Sessions
        self.last_start = None
        self.last_end = None
        self.total_gap = 0
        self.gap_count = 0

        # Command entropy (30-day window)
        self.cmd_window = deque()
        self.cmd_freq = defaultdict(int)
        self.CMD_WINDOW = 30 * 86400

        # Unique commands
        self.unique_cmds = defaultdict(set)

        # Active days (30-day window)
        self.active_days = deque()
        self.active_set = set()
        self.ACTIVE_WINDOW = 30 * 86400

#############
## Methods ##
#############

    #evict old commands for entropy: removes commands older than 30 days from the command window.
    def _evict_commands(self, now):
        cutoff = now - self.CMD_WINDOW
        while self.cmd_window and self.cmd_window[0][0] < cutoff:
            ts, cmd = self.cmd_window.popleft()
            self.cmd_freq[cmd] -= 1
            if self.cmd_freq[cmd] == 0:
                del self.cmd_freq[cmd]

######################
## DYNAMIC FEATUTES ##
######################
    
    #add command event
    def add_command(self, ts, cmd, version):
        self._evict_commands(ts)
        self.cmd_window.append((ts, cmd))
        self.cmd_freq[cmd] += 1
        self.unique_cmds[version].add(cmd)
        self.roll[Metric.COMMAND_PARAM_COUNT_WEEKLY].add(ts, 1)

    #add active day
    def add_active_day(self, ts):
        day = datetime.fromtimestamp(ts, tz=timezone.utc).date()
        self.active_days.append((day, ts))
        self.active_set.add(day)

        cutoff = ts - self.ACTIVE_WINDOW
        while self.active_days and self.active_days[0][1] < cutoff:
            old_day, old_ts = self.active_days.popleft()
            if old_day not in [d for d, _ in self.active_days]:
                self.active_set.discard(old_day)

        self.roll[Metric.DAYS_ACTIVE_LAST_30].add(ts, 1)

    #sessions
    def start_session(self, ts):
        if self.last_end is not None:
            gap = (ts - self.last_end)/3600
            self.total_gap += gap
            self.gap_count += 1
        self.last_start = ts
        self.last_use_timestamp = ts
        self.roll[Metric.AVG_SESSIONS_PER_WEEK].add(ts, 1)

    def end_session(self, ts):
        if self.last_start is not None:
            dur = (ts - self.last_start)/3600
            self.roll[Metric.TOTAL_USAGE_TIME_HOURS].add(ts, dur)
            self.last_end = ts
            self.last_use_timestamp = ts
            self.last_start = None

    #crashes
    def add_crash(self, ts, version):
        self.crash_count[version] += 1
        self.roll[Metric.ERROR_RATE].add(ts, 1)

#####################
## STATIC FEATUTES ##
#####################
    
    def get(self, metric, now, version=None):
        if metric in WINDOW_SIZES:
            return self.roll[metric].get(now)

        if metric == Metric.INTER_SESSION_GAP_HOURS:
            return self.total_gap / self.gap_count if self.gap_count else 0

        if metric == Metric.CRASH_COUNT:
            return self.crash_count[version] if version else sum(self.crash_count.values())

        if metric == Metric.LAST_VERSION_USED:
            return self.last_version_used

        if metric == Metric.UNIQUE_COMMAND_COUNT:
            return len(self.unique_cmds.get(version, set()))

        if metric == Metric.DAYS_SINCE_LAST_USE:
            return (now - self.last_use_timestamp)/86400 if self.last_use_timestamp else None

        if metric == Metric.COMMAND_ENTROPY:
            total = sum(self.cmd_freq.values())
            if total == 0:
                return 0
            return -sum((c/total)*log2(c/total) for c in self.cmd_freq.values())

**5) per user dictionary**

In [42]:
user_stats = {}

def get_user(user_id):
    stats = user_stats.get(user_id)
    if stats is None:
        stats = UserStats()
        user_stats[user_id] = stats
    return stats

- Purpose: a dictionary mapping user_id → UserStats.
- Lazy initialization: create a UserStats only when you first see the user — simple and memory efficient for workloads with many users but many inactive ones.
- Scalability: if you have extremely many users, consider periodic pruning of _USER_STATS for inactive users (e.g., if no events seen for 90 days), or persist older user states to disk.

**6) adapt upgrade computation**

**read the first 5M rows**

In [49]:
import bz2
import pandas as pd
from datetime import datetime

# ------------------ File paths ------------------
file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\master-telemetry-distilled-sorted.bz2"
output_file = "upgrade_events.csv"

valid_versions = {
    "5.0.0.34","5.0.0.46","5.1.0.2","5.4.0.14","5.4.0.100",
    "5.6.0.6","5.6.0.14","7.0.0.16","7.1.0.2","7.1.0.28",
    "7.1.0.64","7.2.0.50","7.4.0.4","7.5.0.20","7.6.0.2",
    "7.6.0.4","7.6.0.24","7.7.0.100","8.0.0.8","8.1.0.2",
    "8.1.0.4","8.1.0.18","8.1.0.22","8.2.0.72","8.3.0.16",
    "9.1.0.16","9.1.0.46"
}

# ------------------ Version parser ------------------
def version_to_tuple(v: str):
    return tuple(int(x) for x in v.split(".") if x.isdigit())

# ------------------ Initialize CSV ------------------
FEATURES = [
    Metric.COMMAND_PARAM_COUNT_WEEKLY,
    Metric.DAYS_ACTIVE_LAST_30,
    Metric.AVG_SESSIONS_PER_WEEK,
    Metric.TOTAL_USAGE_TIME_HOURS,
    Metric.INTER_SESSION_GAP_HOURS,
    Metric.CRASH_COUNT,
    Metric.LAST_VERSION_USED,
    Metric.ERROR_RATE,
    Metric.COMMAND_ENTROPY,
    Metric.UNIQUE_COMMAND_COUNT,
    Metric.DAYS_SINCE_LAST_USE
]

header_row = {'timestamp':0,'user':'','version':'','upgrade':1, **{f.name:0 for f in FEATURES}}
pd.DataFrame([header_row]).to_csv(output_file, index=False)

# ------------------ Buffer ------------------
buffer, BUFFER_SIZE = [], 10_000

def flush_buffer():
    if buffer:
        pd.DataFrame(buffer).to_csv(output_file, mode='a', header=False, index=False)
        buffer.clear()

def process_upgrade(ts, user_id, version, user):
    row = {
        'timestamp': ts,
        'user': user_id,
        'version': version,
        'upgrade': 1,
        **{f.name: user.get(f, ts, version=version) if f in (Metric.CRASH_COUNT, Metric.UNIQUE_COMMAND_COUNT) else user.get(f, ts) for f in FEATURES}
    }
    buffer.append(row)
    if len(buffer) >= BUFFER_SIZE:
        flush_buffer()
        
# ------------------ Main Loop ------------------
user_stats, last_version_per_user = {}, {}
MAX_ROWS = 5_000_000
row_count = 0

def get_user(user_id):
    if user_id not in user_stats:
        user_stats[user_id] = UserStats()
    return user_stats[user_id]

with bz2.open(file_path, "rt", encoding="utf-8") as f:
    for line in f:
        row_count += 1
        if MAX_ROWS and row_count > MAX_ROWS:
            break
        if row_count % 1_000_000 == 0:
            print(f"Processed {row_count} rows, buffer size {len(buffer)}")

        fields = line.rstrip("\n").split("\t")
        if len(fields) < 4:
            continue

        ts_raw, user_id, version, event = fields[:4]

        # ------------------ CLEANING ------------------
        # (1) Validate timestamp
        try:
            ts = int(float(ts_raw))
        except ValueError:
            continue

        # (2) Validate user_id
        try:
            user_id_int = int(user_id)
            if user_id_int < 0:
                continue
        except ValueError:
            continue

        # (3) Validate version
        if version not in valid_versions:
            continue

        # (4) Reject timestamps >= 2025
        if ts >= 1735689600:
            continue

        # ------------------ Get user object ------------------
        user = get_user(user_id)

        # ------------------ Event handling ------------------
        if event == "Start":
            prev_version = last_version_per_user.get(user_id)
            is_upgrade = prev_version and version_to_tuple(version) > version_to_tuple(prev_version)
            last_version_per_user[user_id] = version
            
            user.start_session(ts)
            if is_upgrade:
                process_upgrade(ts, user_id, version, user)

        elif event in ("End", "Stop", "Exit"):
            user.end_session(ts)

        elif event == "Crash":
            user.add_crash(ts, version)

        elif event.startswith(("Command", "Param")):
            cmd_detail = event.split(" ", 1)[1] if " " in event else "UNKNOWN"
            user.add_command(ts, cmd_detail, version)

        # ------------------ Update general stats ------------------
        user.add_active_day(ts)
        user.last_version_used = version
        user.last_use_timestamp = ts

# ------------------ Final flush ------------------
flush_buffer()

Processed 1000000 rows, buffer size 12
Processed 2000000 rows, buffer size 12
Processed 3000000 rows, buffer size 12
Processed 4000000 rows, buffer size 13
Processed 5000000 rows, buffer size 13


In [50]:
df = pd.read_csv("upgrade_events.csv")

In [52]:
df.head()

Unnamed: 0,timestamp,user,version,upgrade,COMMAND_PARAM_COUNT_WEEKLY,DAYS_ACTIVE_LAST_30,AVG_SESSIONS_PER_WEEK,TOTAL_USAGE_TIME_HOURS,INTER_SESSION_GAP_HOURS,CRASH_COUNT,LAST_VERSION_USED,ERROR_RATE,COMMAND_ENTROPY,UNIQUE_COMMAND_COUNT,DAYS_SINCE_LAST_USE
0,0,,,1,0,0,0,0.0,0.0,0,0,0,0.0,0,0.0
1,1009844676,5299.0,7.1.0.64,1,0,2,2,0.004444444,1.737222,0,7.0.0.16,0,0.0,0,0.0
2,1102094567,5299.0,7.2.0.50,1,0,0,1,0.0,6406.299653,0,7.1.0.64,0,1.584963,0,0.0
3,1136066749,5511.0,7.6.0.4,1,1,5,3,0.006944444,0.0125,0,7.4.0.4,0,-0.0,0,0.0
4,1199134901,4450.0,7.6.0.4,1,0,0,1,3.747003e-16,1530.776037,0,7.1.0.64,0,3.856365,0,0.0


In [53]:
df.shape

(14, 15)

### Read 5M of Raw File ###

In [7]:
import bz2
import pandas as pd

file_path = r"C:\Users\katsi\OneDrive\Business_Analytics\Thesis\Data\master-telemetry-distilled-sorted.bz2"
max_rows = 5_000_000  # number of rows to read

rows = []  # list to store rows temporarily
row_count = 0

with bz2.open(file_path, "rt", encoding="utf-8") as f:
    for line in f:
        if row_count >= max_rows:
            break

        # Strip trailing newline; keep empty trailing fields if present
        fields = line.rstrip("\n").split("\t")

        # Required fields
        try:
            timestamp, user, version, event_full = fields[:4]
        except ValueError:
            # malformed line; skip
            continue

        # Optional name field
        name = fields[4] if len(fields) > 4 else None

        # Append row as a tuple/list
        rows.append([timestamp, user, version, event_full])

        row_count += 1

# Create DataFrame
df = pd.DataFrame(rows, columns=["timestamp", "user", "version", "event_full"])

# Split 'event_full' into 'command_type' and 'event'
df[['event', 'command_type']] = df['event_full'].str.strip().str.split(n=1, pat=' ', expand=True)

# Drop the original 'event_full' column if no longer needed
df.drop(columns=['event_full'], inplace=True)

print(f"Processed {row_count} rows into DataFrame.")
print(df.head())

Processed 5000000 rows into DataFrame.
   timestamp  user   version  event command_type
0  315522314  5129  7.2.0.50  Start         None
1  315527925  5129  7.2.0.50  Start         None
2  315777777  4103   7.4.0.4  Start         None
3  315777898  4103   7.4.0.4    End         None
4  315777899  4103   7.4.0.4  Start         None


**1) Handle Missing Values**

In [9]:
#check for missing values
df.isnull().sum()

timestamp            0
user                 0
version              0
event                0
command_type    107100
dtype: int64

**2) Remove rows where user_id is missing or is negative**

In [13]:
df["user"] = pd.to_numeric(df["user"], errors="coerce")
df1 = df[df["user"].notna() & (df["user"] >= 0)]
df1 = df1.reset_index(drop=True)

**3) Handle duplicate rows**

In [14]:
#count duplicate rows
df1.duplicated().sum()

156513

In [15]:
#drop duplicate rows
df2 = df1.drop_duplicates().copy()

#reset index
df2 = df2.reset_index(drop=True)

**4) Filtering Dataset by Relevant Software Versions**

In [16]:
print(df2.head())

   timestamp  user   version  event command_type
0  315522314  5129  7.2.0.50  Start         None
1  315527925  5129  7.2.0.50  Start         None
2  315777777  4103   7.4.0.4  Start         None
3  315777898  4103   7.4.0.4    End         None
4  315777899  4103   7.4.0.4  Start         None


In [17]:
df3 = df2.copy()

# Read valid versions and their release dates from the txt file
valid_versions = pd.read_csv(
    'C:/Users/katsi/OneDrive/Business_Analytics/Thesis/Data/Fespa & Tekton Versions.txt', 
    sep='\s+',       # splits on whitespace
    header=None,     # no header in txt
    names=['version', 'release_date']
)

# keep only valid versions and add release_date
df3 = df3.merge(valid_versions, on='version', how='inner')

# Reset index correctly
df3.reset_index(drop=True, inplace=True)

**5) Remove row where year is equal to '2036'**

In [20]:
# Convert timestamps to numeric (invalid ones become NaN)
df3['timestamp'] = pd.to_numeric(df3['timestamp'], errors='coerce')

# Remove rows where timestamp is missing or from 2025+
df4 = df3[df3['timestamp'] < 1735689600].reset_index(drop=True)

**6) Compute Upgrade Variable**

In [21]:
df4.dtypes

timestamp        int64
user             int64
version         object
event           object
command_type    object
release_date    object
dtype: object

In [24]:
# Make a working copy
df5 = df4.copy()

# Ensure version is string
df5['version'] = df5['version'].astype(str)

# Convert version string to tuple
def version_to_tuple(v):
    return tuple(map(int, v.split('.')))
df5['version_tuple'] = df5['version'].apply(version_to_tuple)

# Sort by user and timestamp
df5 = df5.sort_values(['user_id', 'timestamp']).reset_index(drop=True)

# Initialize upgrade column
df5['upgrade'] = 0


def mark_first_upgrades(group):
    """
    For each user:
    - Identify only the FIRST Start event of each version
    - Mark upgrade = 1 ONLY if that version is newer than any previous version
    """

    starts = group[group['event'] == 'Start'].copy()
    if starts.empty:
        return pd.Series(0, index=group.index)

    # Keep only the first Start per version
    first_starts = (
        starts.sort_values('timestamp')
              .groupby('version_tuple')
              .head(1)
              .copy()
    )

    # Determine upgrade: version > previous max_version
    first_starts = first_starts.sort_values('timestamp')
    prev_version = (-1, -1, -1)  # always smaller than any real version

    upgrade_flags = []

    for idx, row in first_starts.iterrows():
        if row['version_tuple'] > prev_version:
            upgrade_flags.append((idx, 1))
            prev_version = row['version_tuple']
        else:
            upgrade_flags.append((idx, 0))

    # Create mapping back to original DataFrame
    upgrade_series = pd.Series(0, index=group.index)
    for idx, flag in upgrade_flags:
        upgrade_series.loc[idx] = flag

    return upgrade_series


# Apply per user
df5['upgrade'] = df5.groupby('user').apply(mark_first_upgrades).reset_index(level=0, drop=True)

# Remove temporary column
df5.drop(columns='version_tuple', inplace=True)

**verification steps**

In [26]:
df5['upgrade'].sum()

729