In [1]:
import os
import kumoai.experimental.rfm as rfm
from pathlib import Path
import pandas as pd

In [2]:
home_api_key_file = Path.home() / "kumoai_key.txt"
with open(home_api_key_file, "r") as file:
    api_key = file.read().strip()
os.environ["KUMO_API_KEY"] = api_key

rfm.init()

[2025-08-08 17:07:09 - kumoai:196 - INFO] Successfully initialized the Kumo SDK against deployment https://kumorfm.ai/api, with log level INFO.


## Creation of synthetic data: 4 tables

Create a synthetic dataset to test how KumoRFM handles a temporal dependency that lives not in the transactional history but in a user-type variable that can change over time.
Essentially, there are 2 types of users (premium and free), and premium users can perform actions that free users cannot.
The goal is to test whether KumoRFM detects this dependency and incorporates it into its predictions.

### Dataset description
We create a synthetic dataset of users uploading files. Each user is on one of 2 tiers (free or premium), and the tier can change over time.
While on the premium tier, users can upload files of any size. While on the free tier, users can only upload files below 10GB.
For simplicity, there are only 2 file sizes: 5GB and 50GB.
Users can change their tier no earlier than 24 hours after their last change.
Within an hour of becoming premium, a user is more likely to upload a 50GB file.
There are 100 users, covering 5 cohorts:
- Always premium.
- Always free.
- Premium --> Free (once)
- Free --> Premium (once)
- Free --> Premium --> Free (twice, each change >= 24 hours apart)

The transaction history lasts 10 days, from March 1st to March 10th, 2025.
The prediction tasks are run for different users at different points in time, predicting each user's likelihood of uploading a 50GB file within the next hour.
The expectations are: for users who just became free, the likelihood should be 0; for users who just became premium, it should be high; and for users who have been premium for a while, it should be well above 0.
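
The prediction target can be made concrete by computing the ground-truth label directly from the tables. The sketch below is illustrative and not part of the original pipeline: the helper name `uploaded_50gb_within` and the window convention (anchor time exclusive, end of the hour inclusive) are assumptions, and the toy rows are invented for the example.

```python
import pandas as pd

def uploaded_50gb_within(uploads: pd.DataFrame, items: pd.DataFrame,
                         user_id: int, anchor: pd.Timestamp,
                         horizon: pd.Timedelta = pd.Timedelta(hours=1)) -> bool:
    """True if the user uploaded at least one 50 GB item in (anchor, anchor + horizon]."""
    # Resolve which item_ids correspond to 50 GB files.
    big_items = set(items.loc[items["size_gb"] == 50, "item_id"])
    ts = pd.to_datetime(uploads["datetime"], utc=True)
    mask = (
        (uploads["user_id"] == user_id)
        & uploads["item_id"].isin(big_items)
        & (ts > anchor)
        & (ts <= anchor + horizon)
    )
    return bool(mask.any())

# Toy rows (hypothetical), shaped like the items and uploads tables.
toy_items = pd.DataFrame({"item_id": [1, 2], "size_gb": [5, 50]})
toy_uploads = pd.DataFrame({
    "txn_id": [1, 2, 3],
    "user_id": [7, 7, 8],
    "item_id": [1, 2, 2],
    "datetime": ["2025-03-05T10:10:00Z", "2025-03-05T10:30:00Z", "2025-03-05T10:30:00Z"],
})
anchor = pd.Timestamp("2025-03-05T10:00:00Z")
label_user_7 = uploaded_50gb_within(toy_uploads, toy_items, 7, anchor)
```

Comparing such labels with the model's predicted probabilities at the same anchor times is one way to check the expectations listed above.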

### Tables

- users
    - user_id (PK)
    - name

- tiers (user tiers: free/premium, temporal, non-overlapping for each user)
    - user_id (FK -> users.user_id)
    - from_datetime
    - until_datetime (NULL means "still in effect")
    - tier in {'free', 'premium'}
    - Composite PK: (user_id, from_datetime)

- items
    - item_id (PK)
    - size_gb in {5, 50}

- uploads
    - txn_id (PK)
    - user_id (FK -> users.user_id)
    - item_id (FK -> items.item_id)
    - datetime
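
The non-overlapping property of the tiers table can be verified with a few lines of pandas. This is a minimal sketch under stated assumptions: the function name `find_overlapping_tiers` is hypothetical, the column names follow the schema above, and the toy rows are invented. Rows with a NULL `until_datetime` never trigger a flag here, because comparisons against a missing timestamp evaluate to False.

```python
import pandas as pd

def find_overlapping_tiers(tiers: pd.DataFrame) -> pd.DataFrame:
    """Return tier rows that start before the same user's previous interval ends."""
    t = tiers.copy()
    t["from_datetime"] = pd.to_datetime(t["from_datetime"], utc=True)
    t["until_datetime"] = pd.to_datetime(t["until_datetime"], utc=True)
    t = t.sort_values(["user_id", "from_datetime"])
    # End of the previous interval for the same user (NaT for the first one).
    prev_until = t.groupby("user_id")["until_datetime"].shift()
    return t[t["from_datetime"] < prev_until]

# Toy rows (hypothetical): user 1 has touching intervals, user 2 has an overlap.
toy_tiers = pd.DataFrame({
    "user_id": [1, 1, 2, 2],
    "from_datetime": ["2025-03-01T00:00:00Z", "2025-03-04T00:00:00Z",
                      "2025-03-01T00:00:00Z", "2025-03-03T00:00:00Z"],
    "until_datetime": ["2025-03-04T00:00:00Z", "2025-03-11T00:00:00Z",
                       "2025-03-05T00:00:00Z", "2025-03-11T00:00:00Z"],
    "tier": ["free", "premium", "premium", "free"],
})
overlaps = find_overlapping_tiers(toy_tiers)
```

Intervals that merely touch (one ends exactly when the next begins) are treated as valid, matching the end-exclusive convention.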

### Cohorts (20 users each)

- Always premium (unknown start) → a single interval that spans the whole window.
- Always free → a single interval that spans the whole window.
- Premium → Free once (≥24h after start).
- Free → Premium once (≥24h).
- Free → Premium → Free (each change ≥24h apart; all within window).


### Behavior rules:

- While free: upload only 5 GB items.
    - Per-user free-upload rate: draw from, say, Poisson(λ_free_user) per day with λ sampled from a user-level distribution (to inject heterogeneity).
- While premium: upload both 5 GB and 50 GB.
    - Immediately after becoming premium: within the first hour, probability of at least one 50 GB upload is very high (e.g., 85–95%).
    - After that first hour, keep premium uploads going with a higher base rate than free; 50 GB vs 5 GB split could be, e.g., 40–60%, tapering (optional) as time from upgrade increases.
- Hard rule: probability of 50 GB exactly at the moment of switching to free is 0 (and remains 0 while free).
- We’ll jitter timestamps to look realistic (uniform within hours/minutes), and we’ll keep all uploads inside the user’s current tier interval.

#### Notes:
- items is only 2 rows. We do not distinguish between items: just make it 

In [5]:
"""
Generate a small relational dataset (CSV) for graph + prediction testing.

Tables:
- users(user_id PK, name)
- tiers(user_id FK, from_datetime, until_datetime, tier)  # composite PK (user_id, from_datetime)
- items(item_id PK, size_gb)                              # exactly two rows: (1,5), (2,50)
- uploads(txn_id PK, user_id FK, item_id FK, datetime)

Cohorts (20 users each):
1) Always premium
2) Always free
3) Premium -> Free (once)
4) Free -> Premium (once)
5) Free -> Premium -> Free (twice; each change >= 24h apart)

Behavior:
- While free: uploads only 5GB (item_id=1).
- While premium: uploads both 5GB and 50GB (item_id ∈ {1,2}).
- Within 1 hour after becoming premium: very high chance (90%) of at least one 50GB upload.
- Exactly when/while free: 50GB uploads are impossible.
- Upload rates are user-specific and higher on premium than free.
"""

from __future__ import annotations
import csv
import math
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
import random
import numpy as np

# ------------------------
# Configuration
# ------------------------
SEED = 42
N_USERS = 100
START = datetime(2025, 3, 1, 0, 0, 0, tzinfo=timezone.utc)
END = datetime(2025, 3, 11, 0, 0, 0, tzinfo=timezone.utc)  # end-exclusive (covers Mar 1–10)
MIN_GAP_BETWEEN_TIER_CHANGES = timedelta(hours=24)

# Upload rate modeling: user-specific lognormal draws, parameterized per day and converted to per hour
FREE_RATE_DAY_MEAN = 1.0   # typical free uploads per day
FREE_RATE_DAY_STD = 0.7
PREM_RATE_DAY_MEAN = 3.0   # typical premium uploads per day
PREM_RATE_DAY_STD = 1.0

# Size selection while premium (outside the "first hour" spike)
BASE_P50_WHILE_PREMIUM = 0.4  # 40% chance of 50GB for non-spike premium uploads

# Spike rules when becoming premium
P_SPIKE_50GB_WITHIN_1H = 0.90
MAX_SPIKE_50GB = 2  # cap the number of spike 50GB uploads within the first hour

# Output files
USERS_CSV = "users.csv"
TIERS_CSV = "tiers.csv"
ITEMS_CSV = "items.csv"
UPLOADS_CSV = "uploads.csv"

# Fixed items table: exactly two rows
ITEMS = [
    {"item_id": 1, "size_gb": 5},
    {"item_id": 2, "size_gb": 50},
]
ITEM_SIZE_BY_ID = {1: 5, 2: 50}

random.seed(SEED)
np.random.seed(SEED)

@dataclass
class TierInterval:
    user_id: int
    from_dt: datetime
    until_dt: datetime  # end-exclusive
    tier: str           # 'free' | 'premium'

def iso(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def draw_user_rate(mean_per_day: float, std_per_day: float) -> float:
    """Draw a user-specific rate (per hour) using a lognormal around mean/std per day."""
    mean = max(0.01, mean_per_day)
    std = max(0.01, std_per_day)
    cv2 = (std / mean) ** 2
    sigma2 = math.log(cv2 + 1.0)
    sigma = math.sqrt(sigma2)
    mu = math.log(mean) - 0.5 * sigma2
    per_day = np.random.lognormal(mean=mu, sigma=sigma)
    return float(per_day / 24.0)

def bounded_random_time(start: datetime, end: datetime) -> datetime:
    """Uniform random time in [start, end)."""
    if end <= start:
        return start
    span = (end - start).total_seconds()
    offs = random.random() * span
    return start + timedelta(seconds=offs)

def generate_cohort_tiers(user_id: int, cohort: int) -> list[TierInterval]:
    """Create tier intervals for a single user according to cohort."""
    t0 = START
    tE = END

    def random_change_time(low: datetime, high: datetime) -> datetime:
        lo = low + MIN_GAP_BETWEEN_TIER_CHANGES
        hi = high - MIN_GAP_BETWEEN_TIER_CHANGES
        if hi <= lo:
            return low + (high - low) / 2
        return bounded_random_time(lo, hi)

    intervals: list[TierInterval] = []

    if cohort == 1:  # Always premium
        intervals.append(TierInterval(user_id, t0, tE, "premium"))

    elif cohort == 2:  # Always free
        intervals.append(TierInterval(user_id, t0, tE, "free"))

    elif cohort == 3:  # Premium -> Free (once)
        c1 = random_change_time(t0, tE)
        intervals.append(TierInterval(user_id, t0, c1, "premium"))
        intervals.append(TierInterval(user_id, c1, tE, "free"))

    elif cohort == 4:  # Free -> Premium (once)
        c1 = random_change_time(t0, tE)
        intervals.append(TierInterval(user_id, t0, c1, "free"))
        intervals.append(TierInterval(user_id, c1, tE, "premium"))

    elif cohort == 5:  # Free -> Premium -> Free
        c1 = random_change_time(t0, tE - MIN_GAP_BETWEEN_TIER_CHANGES)
        c2_low = c1 + MIN_GAP_BETWEEN_TIER_CHANGES
        c2_high = tE
        if c2_low + MIN_GAP_BETWEEN_TIER_CHANGES >= c2_high:
            c2 = c1 + (tE - c1) / 2
        else:
            c2 = random_change_time(c2_low, c2_high)
        intervals.append(TierInterval(user_id, t0, c1, "free"))
        intervals.append(TierInterval(user_id, c1, c2, "premium"))
        intervals.append(TierInterval(user_id, c2, tE, "free"))

    intervals.sort(key=lambda x: x.from_dt)
    fixed: list[TierInterval] = []
    prev_end = None
    for it in intervals:
        if prev_end and it.from_dt < prev_end:
            it = TierInterval(it.user_id, prev_end, it.until_dt, it.tier)
        prev_end = it.until_dt
        fixed.append(it)
    return fixed

def simulate_uploads_for_interval(user_id: int,
                                  interval: TierInterval,
                                  free_rate_per_hour: float,
                                  prem_rate_per_hour: float,
                                  next_txn_id: int) -> tuple[list[dict], int]:
    """
    Simulate uploads for a single interval.
    Returns uploads_rows, next_txn_id
    """
    uploads_rows: list[dict] = []
    tier = interval.tier
    rate_per_hour = free_rate_per_hour if tier == "free" else prem_rate_per_hour

    # Hourly grid across [from, until)
    cursor = interval.from_dt
    while cursor < interval.until_dt:
        hour_end = min(cursor + timedelta(hours=1), interval.until_dt)
        lam = rate_per_hour * (hour_end - cursor).total_seconds() / 3600.0
        n = np.random.poisson(lam=lam) if lam > 0 else 0
        for _ in range(n):
            ts = bounded_random_time(cursor, hour_end)
            if tier == "free":
                item_id = 1  # 5 GB only
            else:
                # Premium baseline (spike uploads are added separately below): choose size with base probability
                item_id = 2 if random.random() < BASE_P50_WHILE_PREMIUM else 1
            uploads_rows.append({
                "txn_id": next_txn_id,
                "user_id": user_id,
                "item_id": item_id,
                "datetime": iso(ts)
            })
            next_txn_id += 1
        cursor = hour_end

    # Spike: within first hour after becoming premium
    if tier == "premium":
        became_premium_now = interval.from_dt > START
        if became_premium_now and random.random() < P_SPIKE_50GB_WITHIN_1H:
            spike_window_end = min(interval.from_dt + timedelta(hours=1), interval.until_dt)
            if spike_window_end > interval.from_dt:
                n_spike = 1 + (1 if (MAX_SPIKE_50GB > 1 and random.random() < 0.15) else 0)
                for _ in range(n_spike):
                    ts = bounded_random_time(interval.from_dt, spike_window_end)
                    uploads_rows.append({
                        "txn_id": next_txn_id,
                        "user_id": user_id,
                        "item_id": 2,  # force 50 GB in spike
                        "datetime": iso(ts)
                    })
                    next_txn_id += 1

    return uploads_rows, next_txn_id

def main():
    # Users
    users = [{"user_id": uid, "name": f"User {uid:03d}"} for uid in range(1, N_USERS + 1)]

    # Cohorts by user_id blocks of 20
    def cohort_of(uid: int) -> int:
        return (uid - 1) // 20 + 1  # 1..5

    # User-specific rates
    free_rate_per_hour = {u["user_id"]: draw_user_rate(FREE_RATE_DAY_MEAN, FREE_RATE_DAY_STD) for u in users}
    prem_rate_per_hour = {u["user_id"]: draw_user_rate(PREM_RATE_DAY_MEAN, PREM_RATE_DAY_STD) for u in users}

    # Tier intervals
    tiers: list[TierInterval] = []
    for u in users:
        tiers.extend(generate_cohort_tiers(u["user_id"], cohort_of(u["user_id"])))
    tiers.sort(key=lambda t: (t.user_id, t.from_dt))

    # Simulate uploads
    uploads_rows: list[dict] = []
    next_txn_id = 1
    for t in tiers:
        ups, next_txn_id = simulate_uploads_for_interval(
            user_id=t.user_id,
            interval=t,
            free_rate_per_hour=free_rate_per_hour[t.user_id],
            prem_rate_per_hour=prem_rate_per_hour[t.user_id],
            next_txn_id=next_txn_id
        )
        uploads_rows.extend(ups)

    # Sort uploads by user, then by time (optional neatness)
    uploads_rows.sort(key=lambda r: (r["user_id"], r["datetime"]))
    # Re-assign txn_ids sequentially
    for i, r in enumerate(uploads_rows, start=1):
        r["txn_id"] = i

    # Write CSVs
    with open(USERS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["user_id", "name"])
        w.writeheader()
        w.writerows(users)

    with open(TIERS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["user_id", "from_datetime", "until_datetime", "tier"])
        w.writeheader()
        for t in tiers:
            w.writerow({
                "user_id": t.user_id,
                "from_datetime": iso(t.from_dt),
                "until_datetime": iso(t.until_dt),
                "tier": t.tier
            })

    with open(ITEMS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["item_id", "size_gb"])
        w.writeheader()
        w.writerows(ITEMS)

    with open(UPLOADS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["txn_id", "user_id", "item_id", "datetime"])
        w.writeheader()
        w.writerows(uploads_rows)

    # Integrity check: no 50GB while free
    by_user = {}
    for t in tiers:
        by_user.setdefault(t.user_id, []).append(t)

    def tier_at(user_id: int, dt_str: str) -> str:
        dt = datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        for it in by_user[user_id]:
            if it.from_dt <= dt < it.until_dt:
                return it.tier
        return "unknown"

    violations = []
    for up in uploads_rows:
        t = tier_at(up["user_id"], up["datetime"])
        size = ITEM_SIZE_BY_ID[up["item_id"]]
        if t == "free" and size == 50:
            violations.append(up)

    print(f"Generated: {USERS_CSV}, {TIERS_CSV}, {ITEMS_CSV}, {UPLOADS_CSV}")
    if violations:
        print(f"[WARNING] Found {len(violations)} violations (50GB while free).")
    else:
        print("Integrity check passed: No 50GB uploads while free.")

if __name__ == "__main__":
    main()


Generated: users.csv, tiers.csv, items.csv, uploads.csv
Integrity check passed: No 50GB uploads while free.
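
As a rough reference for the prediction tasks, the generator's constants imply an approximate probability of at least one 50 GB upload in a one-hour premium window. The sketch below is a back-of-the-envelope calculation, not part of the generator. It assumes a user whose premium rate equals `PREM_RATE_DAY_MEAN` (actual per-user rates are lognormal draws, so individual values differ) and relies on Poisson thinning: if uploads arrive at rate λ and each is 50 GB with probability p, the 50 GB uploads arrive at rate λ·p. The function name `p_50gb_in_hour` is hypothetical.

```python
import math

# Values copied from the generator's configuration above.
PREM_RATE_DAY_MEAN = 3.0
BASE_P50_WHILE_PREMIUM = 0.4
P_SPIKE_50GB_WITHIN_1H = 0.90

def p_50gb_in_hour(prem_rate_per_day: float, spike: bool) -> float:
    """P(at least one 50 GB upload in a one-hour premium window)."""
    # Thinned Poisson rate of baseline 50 GB uploads per hour.
    lam_50 = prem_rate_per_day / 24.0 * BASE_P50_WHILE_PREMIUM
    p_none_base = math.exp(-lam_50)
    # In the first hour after an upgrade, an independent spike coin is also flipped.
    p_none_spike = (1.0 - P_SPIKE_50GB_WITHIN_1H) if spike else 1.0
    return 1.0 - p_none_base * p_none_spike

steady = p_50gb_in_hour(PREM_RATE_DAY_MEAN, spike=False)
first_hour = p_50gb_in_hour(PREM_RATE_DAY_MEAN, spike=True)
print(round(steady, 3), round(first_hour, 3))  # 0.049 0.905
```

Under these assumptions, a long-standing premium user has roughly a 5% chance per hour, compared with about 90% right after an upgrade and exactly 0 while free.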


In [7]:
def get_user_tier_summary(user_id: int, tiers_csv: str = "tiers.csv"):
    """
    Return a list of (start_iso, end_iso, tier) covering the user’s tier evolution.
    - start/end are ISO-8601 UTC strings: 'YYYY-MM-DDTHH:MM:SSZ'
    - tier is 'free' or 'premium'
    """
    # Load and filter
    tiers = pd.read_csv(tiers_csv)
    tiers = tiers[tiers["user_id"] == user_id].copy()
    if tiers.empty:
        return []  # unknown user_id

    # Parse/normalize
    tiers["from_datetime"] = pd.to_datetime(tiers["from_datetime"], utc=True)
    tiers["until_datetime"] = pd.to_datetime(tiers["until_datetime"], utc=True)
    tiers = tiers.sort_values(["from_datetime", "until_datetime", "tier"])

    # Optional: coalesce adjacent intervals with the same tier
    merged = []
    for _, row in tiers.iterrows():
        start, end, tier = row["from_datetime"], row["until_datetime"], row["tier"]
        if not merged:
            merged.append([start, end, tier])
        else:
            last_start, last_end, last_tier = merged[-1]
            # If same tier and touching (no gap), merge
            if tier == last_tier and start == last_end:
                merged[-1][1] = end
            else:
                merged.append([start, end, tier])

    # Format to tuples with ISO-8601 Zulu
    out = [(s.strftime("%Y-%m-%dT%H:%M:%SZ"),
            e.strftime("%Y-%m-%dT%H:%M:%SZ"),
            t) for s, e, t in merged]

    return out

# --- examples ---
if __name__ == "__main__":
    # Always-Free or Always-Premium users will yield one tuple
    print("User 5:", get_user_tier_summary(5))
    # Mixed cohorts will yield 2 or 3 tuples
    print("User 25:", get_user_tier_summary(25))
    print("User 45:", get_user_tier_summary(45))
    print("User 85:", get_user_tier_summary(85))


User 5: [('2025-03-01T00:00:00Z', '2025-03-11T00:00:00Z', 'premium')]
User 25: [('2025-03-01T00:00:00Z', '2025-03-11T00:00:00Z', 'free')]
User 45: [('2025-03-01T00:00:00Z', '2025-03-07T21:24:08Z', 'premium'), ('2025-03-07T21:24:08Z', '2025-03-11T00:00:00Z', 'free')]
User 85: [('2025-03-01T00:00:00Z', '2025-03-06T10:47:41Z', 'free'), ('2025-03-06T10:47:41Z', '2025-03-09T00:22:06Z', 'premium'), ('2025-03-09T00:22:06Z', '2025-03-11T00:00:00Z', 'free')]
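
When choosing anchor times for predictions, it can help to look up which tier a user is on at a given moment and how long they have been on it. The sketch below is a minimal example, not part of the original notebook: the helper name `tier_at_time` is hypothetical, and it works on the tuple format returned by `get_user_tier_summary`, using the User 85 output above as sample input.

```python
from datetime import datetime, timezone

ISO_FMT = "%Y-%m-%dT%H:%M:%SZ"

def _parse(ts: str) -> datetime:
    """Parse an ISO-8601 'Z' timestamp into a timezone-aware UTC datetime."""
    return datetime.strptime(ts, ISO_FMT).replace(tzinfo=timezone.utc)

def tier_at_time(summary, when_iso: str):
    """Return (tier, seconds since that tier began), or None if outside all intervals."""
    when = _parse(when_iso)
    for start_iso, end_iso, tier in summary:
        start, end = _parse(start_iso), _parse(end_iso)
        if start <= when < end:  # intervals are end-exclusive
            return tier, (when - start).total_seconds()
    return None

# Summary for User 85, copied from the output above.
user_85 = [
    ("2025-03-01T00:00:00Z", "2025-03-06T10:47:41Z", "free"),
    ("2025-03-06T10:47:41Z", "2025-03-09T00:22:06Z", "premium"),
    ("2025-03-09T00:22:06Z", "2025-03-11T00:00:00Z", "free"),
]
just_upgraded = tier_at_time(user_85, "2025-03-06T11:00:00Z")
just_downgraded = tier_at_time(user_85, "2025-03-09T00:30:00Z")
```

Here `just_upgraded` falls inside the first premium hour and `just_downgraded` falls shortly after the return to free, which are the two situations where the expected predictions differ most.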
