In [1]:
import numpy as np
import math
import random

In [2]:
class EpiInferContact:
    def __init__(self, population_ids, contacts_history, incubation_period=7, infection_duration=7):
        """
        Initialize the EPIINFER-CONTACT system[cite: 31, 113].

        Args:
            population_ids (list): List of unique IDs for all individuals.
            contacts_history (dict): Dictionary mapping day (int) to a list of tuples [(p1, p2), ...].
            incubation_period (int): 'inc' - days between Exposed (E) and Infected (I)[cite: 225].
            infection_duration (int): 'duration' - days someone remains Infected (I)[cite: 274].
        """
        self.population = set(population_ids)
        self.contacts = contacts_history
        self.inc = incubation_period
        self.duration = infection_duration

        # Pre-process contacts for faster lookup: adj[day][person] = {set of contacts}
        self.adj = {}
        for day, meets in self.contacts.items():
            self.adj[day] = {pid: set() for pid in self.population}
            for u, v in meets:
                if u in self.adj[day]: self.adj[day][u].add(v)
                if v in self.adj[day]: self.adj[day][v].add(u)

    def _get_contacts(self, person, day):
        """Helper to get contacts of a person on a specific day."""
        if day in self.adj and person in self.adj[day]:
            return self.adj[day][person]
        return set()

    # ==========================================
    # Algorithm 3: findprob [cite: 285, 287]
    # ==========================================
    def find_prob(self, bel_asymp, s, p1, d):
        """
        Calculates probability that susceptible person 's' becomes asymptomatic
        on day 'd' given the set of currently asymptomatic people 'BelAsymp'.
        """
        count_encounters = 0
        contacts_of_s = self._get_contacts(s, d)

        # Line 3-6: Count encounters with asymptomatic individuals
        for x in contacts_of_s:
            if x in bel_asymp:
                count_encounters += 1

        # Line 8: Return 1 - (1 - p1)^countEncounters
        return 1.0 - ((1.0 - p1) ** count_encounters)

    # ==========================================
    # Algorithm 2: predict [cite: 281, 282]
    # ==========================================
    def predict(self, bel_asymp, bel_susc, p1, d):
        """
        Predicts the set of people who become newly asymptomatic (Exposed) on day d+1.
        """
        believed_new_asymp = set() # Line 2

        # Line 3: Iterate through susceptible population
        for s in bel_susc:
            # Line 4: Calculate probability
            p = self.find_prob(bel_asymp, s, p1, d)

            # Line 5: Draw U uniformly
            u = random.random()

            # Line 6-7: Determine if infection occurs
            if u < p:
                believed_new_asymp.add(s)

        return believed_new_asymp # Line 10

    # ==========================================
    # Algorithm 1: EpiInfer-Core [cite: 272, 280]
    # ==========================================
    def contact_infer_core(self, p1, p2, initial_infected_data, start_day, end_day, known_immune=None):
        """
        Projects future infection rates using contact tracing data.
        """
        # History tracking
        hist_new_asymp = {}
        hist_new_inf = {}

        # --- Initialization Phase  ---
        # Estimate initial asymptomatic pool based on Infected(d) and p2.
        # "Infected(d)/p2 people in the asymptomatic status at d-inc"
        initial_infected_count = len(initial_infected_data)
        if initial_infected_count > 0 and p2 > 0:
            est_total_asymp = int(initial_infected_count / p2)
            # Logic to assign these preferentially to high-contact people (simplified implementation)
            # For this code, we assume 'initial_infected_data' passed in might already be the estimated set,
            # or we seed the simulation with a basic set if specific history isn't provided.
            # Here we seed simply for the simulation loop:
            hist_new_asymp[start_day] = set(initial_infected_data)
        else:
            hist_new_asymp[start_day] = set()

        hist_new_inf[start_day] = set()
        current_immune = set(known_immune) if known_immune else set()

        # Helper: BelievedAsymptomatic(e) = Union of NewAsymp from (e-inc) to e [cite: 280]
        def get_believed_asymp(curr_day):
            union_set = set()
            for d in range(curr_day - self.inc + 1, curr_day + 1):
                if d in hist_new_asymp:
                    union_set.update(hist_new_asymp[d])
            return union_set

        # Helper: BelievedInf(e) = Union of NewInf from (e-duration) to e [cite: 274]
        def get_believed_inf(curr_day):
            union_set = set()
            for d in range((curr_day - self.duration) + 1, curr_day + 1):
                if d in hist_new_inf:
                    union_set.update(hist_new_inf[d])
            return union_set

        # --- Main Simulation Loop (Line 7) ---
        for d in range(start_day, end_day):
            # Maintain Invariants [cite: 280]
            bel_asymp_d = get_believed_asymp(d)
            bel_inf_d = get_believed_inf(d)

            # BelievedSusc(d) calculation (Line 5)
            non_susc = bel_asymp_d.union(bel_inf_d).union(current_immune)
            bel_susc_d = self.population - non_susc

            # Line 8: Predict New Asymptomatic for d+1
            new_asymp_next = self.predict(bel_asymp_d, bel_susc_d, p1, d)
            hist_new_asymp[d+1] = new_asymp_next

            # Line 9: Calculate New Infected for d+1
            # "Choose each member of BelievedNewAsymp(d+1-inc) with probability p2"
            source_day = (d + 1) - self.inc
            new_inf_next = set()

            if source_day in hist_new_asymp:
                candidates = hist_new_asymp[source_day]
                for person in candidates:
                    if random.random() < p2:
                        new_inf_next.add(person)
                    # The rest return to susceptible pool (implicitly by not entering Inf state) [cite: 279]

            hist_new_inf[d+1] = new_inf_next

        # Return daily counts of new infections (Line 10)
        results = {d: len(s) for d, s in hist_new_inf.items()}
        return results

    # ==========================================
    # Algorithm 4: ContinuousCalibrate [cite: 324, 326]
    # ==========================================
    def continuous_calibrate(self, training_data, training_days_window, initial_asymp, start_day):
        """
        Systematic Search for p1 and p2 minimizing RMSE.
        """
        best_rmse = float('inf')
        best_p1, best_p2 = 0.5, 0.5

        max_day = max(training_data.keys())
        rmse_start_day = max_day - training_days_window + 1

        # Line 2: Iterate p2 from 0.1 to 1.0
        for p2 in [round(x * 0.1, 1) for x in range(1, 11)]:

            # Line 3: Binary Search on p1
            low = 0.0
            high = 1.0

            # Fixed iterations for binary search approximation
            for _ in range(10):
                curr_p1 = (low + high) / 2.0

                # Line 4: Run EpiInfer-core
                pred_counts = self.contact_infer_core(curr_p1, p2, initial_asymp, start_day, max_day)

                # Line 5: Calculate RMSE for last t days
                sq_err_sum = 0
                count = 0
                pred_sum = 0
                real_sum = 0

                for d in range(rmse_start_day, max_day + 1):
                    if d in pred_counts and d in training_data:
                        pred = pred_counts[d]
                        real = training_data[d]
                        sq_err_sum += (pred - real) ** 2
                        pred_sum += pred
                        real_sum += real
                        count += 1

                if count == 0: break

                rmse = math.sqrt(sq_err_sum / count)

                # Line 6: Update best p1, p2
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_p1 = curr_p1
                    best_p2 = p2

                # Line 7: Update interval for p1
                # Logic: If predicted > real, need lower p1. Else higher p1. [cite: 320]
                if pred_sum > real_sum:
                    high = curr_p1
                else:
                    low = curr_p1

        return best_p1, best_p2 # Line 10

In [3]:
# ==========================================
# Testing Block
# ==========================================
def test_algorithms():
    print("--- Setting up Synthetic Data ---")
    pop_ids = list(range(1, 51)) # 50 people

    # Create random contacts
    contacts = {}
    for d in range(1, 25):
        daily_meets = []
        for _ in range(40):
            u = random.choice(pop_ids)
            v = random.choice(pop_ids)
            if u != v: daily_meets.append((u, v))
        contacts[d] = daily_meets

    # Instantiate System with inc=5, duration=7
    epi_system = EpiInferContact(pop_ids, contacts, incubation_period=5, infection_duration=7)

    # Initial Asymptomatic People (Day 1)
    init_asymp = {1, 2, 3}

    print("\n--- Testing Algorithm 1 & 2: EpiInfer-Core & Predict ---")
    results = epi_system.contact_infer_core(p1=0.2, p2=0.7, initial_infected_data=init_asymp, start_day=1, end_day=20)

    print("Projected New Infections:")
    for d in sorted(results.keys()):
        print(f"Day {d}: {results[d]}")

    print("\n--- Testing Algorithm 4: Continuous Calibrate ---")
    # Synthetic ground truth
    ground_truth = {
        10: 1, 11: 2, 12: 3, 13: 4, 14: 3
    }

    best_p1, best_p2 = epi_system.continuous_calibrate(
        training_data=ground_truth,
        training_days_window=5,
        initial_asymp=init_asymp,
        start_day=1
    )

    print(f"Calibration Result -> p1: {best_p1:.4f}, p2: {best_p2:.1f}")

In [4]:
if __name__ == "__main__":
    test_algorithms()

--- Setting up Synthetic Data ---

--- Testing Algorithm 1 & 2: EpiInfer-Core & Predict ---
Projected New Infections:
Day 1: 0
Day 2: 0
Day 3: 0
Day 4: 0
Day 5: 0
Day 6: 3
Day 7: 0
Day 8: 0
Day 9: 0
Day 10: 1
Day 11: 0
Day 12: 1
Day 13: 0
Day 14: 0
Day 15: 2
Day 16: 2
Day 17: 1
Day 18: 2
Day 19: 2
Day 20: 0

--- Testing Algorithm 4: Continuous Calibrate ---
Calibration Result -> p1: 0.2588, p2: 1.0


# `EpiInferContact` Class Documentation

## Overview

The `EpiInferContact` class implements the **EPIINFER-CONTACT** system, a non-Markovian, agent-based epidemic forecasting model. Unlike standard differential equation models (e.g., SIR), this system utilizes specific **contact tracing information** (who met whom) to predict future infection rates.

It models individuals transitioning through four states:

* **S (Susceptible):** Healthy and non-immune.
* **E (Exposed/Asymptomatic):** Contagious but not yet symptomatic.
* **I (Infected):** Symptomatic and quarantined.
* **R (Recovered):** Healthy and immune.

## Algorithm Mapping

This class implements the four primary algorithms defined in the EPIINFER-CONTACT methodology:

| Method Name | Algorithm # | Description | Source |
| --- | --- | --- | --- |
| `contact_infer_core` | **Algorithm 1** | Projects future infections by tracking state transitions () over time.
| `predict` | **Algorithm 2** | Determines which susceptible individuals become exposed () on a given day.
| `find_prob` | **Algorithm 3** | Calculates the specific probability of infection for a susceptible person based on their contacts.
| `continuous_calibrate` | **Algorithm 4** | Optimizes parameters  and  by minimizing RMSE against historical training data.

---

## Class Initialization

### `__init__(self, population_ids, contacts_history, incubation_period=7, infection_duration=7)`

Initializes the agent-based model and pre-processes the contact graph.

* **`population_ids`** *(list[int/str])*: A list of unique identifiers for every individual in the simulation.
* **`contacts_history`** *(dict[int, list[tuple]])*: A dictionary mapping a day number (integer) to a list of contact pairs.
  * *Format:* `{1: [(id1, id2), (id3, id4)], 2: [...]}`
  * *Note:* Represents the distribution of meetings among people each day.
* **`incubation_period`** *(int)*: The number of days an individual remains in the **Exposed (Asymptomatic)** state before becoming Infected (`inc`). Defaults to 7.
* **`infection_duration`** *(int)*: The number of days an individual remains in the **Infected** state (`duration`) before recovering. Defaults to 7.

---

## Core Methods

### 1. `contact_infer_core` (Algorithm 1)

```python
def contact_infer_core(self, p1, p2, initial_infected_data, start_day, end_day, known_immune=None)

```

Simulates the epidemic evolution from `start_day` to `end_day` to predict daily new infections.

* **`p1`** *(float)*: Probability a Susceptible person transitions to Exposed after meeting an Asymptomatic person.

* **`p2`** *(float)*: Probability an Exposed person transitions to Infected after the incubation period.

* **`initial_infected_data`** *(set)*: A set of IDs representing people who are initially asymptomatic/infected at the start of the simulation.
  * *Logic:* Used to seed the `BelievedAsymp` sets for the first few days.
* **Returns**: `dict[int, int]` mapping Day  Count of New Infections.

**Key Logic:**

* Maintains "Believed" sets for Asymptomatic and Infected states based on sliding windows of `inc` and `duration`.


* Calculates new infections by applying `p2` to the set of people who became asymptomatic `inc` days ago.


### 2. `predict` (Algorithm 2)

```python
def predict(self, bel_asymp, bel_susc, p1, d)

```

Identifies which susceptible individuals transition to the **Exposed** state on day .

* **`bel_asymp`** *(set)*: IDs of currently asymptomatic (infectious) individuals.
* **`bel_susc`** *(set)*: IDs of currently susceptible individuals.
* **Returns**: `set` of IDs representing newly exposed individuals.

### 3. `find_prob` (Algorithm 3)

```python
def find_prob(self, bel_asymp, s, p1, d)

```

Calculates the probability that a specific susceptible individual `s` becomes infected on day `d`.

* **Logic**: Iterates through the contacts of `s` on day `d`. If `s` met  asymptomatic people, the probability is .


### 4. `continuous_calibrate` (Algorithm 4)

```python
def continuous_calibrate(self, training_data, training_days_window, initial_asymp, start_day)

```

Performs a systematic search to find the optimal  and  values that match historical data.

* **`training_data`** *(dict[int, int])*: Ground truth data mapping Day  Actual New Infections.
* **`training_days_window`** *(int)*: The number of recent days to calculate RMSE against (e.g., last 5 days).
* **Returns**: `(best_p1, best_p2)` tuple.
* **Search Strategy**:
  * Iterates `p2` from 0.1 to 1.0.
  * Uses **Binary Search** to find the optimal `p1` for each `p2`.
  * Minimizes Root Mean Squared Error (RMSE) between projected and actual infections.

---

## Example Usage Snippet

```python
# 1. Setup Data
population = [101, 102, 103, 104, 105]
contacts = {
    1: [(101, 102), (102, 103)],  # Day 1: 101 met 102, 102 met 103
    2: [(101, 104), (103, 105)]   # Day 2: 101 met 104, 103 met 105
}

# 2. Initialize System (inc=2 days, duration=3 days)
model = EpiInferContact(population, contacts, incubation_period=2, infection_duration=3)

# 3. Define Initial State (Patient Zero)
initial_cases = {102} # Person 102 is asymptomatic at start

# 4. Run Projection (Manual Parameters)
# p1 = 0.5 (50% chance to infect on contact)
# p2 = 0.8 (80% chance for asymptomatic to become symptomatic)
forecast = model.contact_infer_core(p1=0.5, p2=0.8,
                                    initial_infected_data=initial_cases,
                                    start_day=1, end_day=5)
print("Forecast:", forecast)

# 5. Calibrate against Real Data
real_data = {3: 1, 4: 2} # Real infections observed on days 3 and 4
best_p1, best_p2 = model.continuous_calibrate(real_data, training_days_window=2,
                                              initial_asymp=initial_cases,
                                              start_day=1)
print(f"Optimized Parameters -> p1: {best_p1}, p2: {best_p2}")

```