In [69]:
import pandas as pd
import numpy as np
import random
import math
import itertools
import ast
from itertools import combinations
from itertools import combinations_with_replacement
from itertools import product
from itertools import chain
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.optimize import differential_evolution
from tqdm.notebook import tqdm
import seaborn as sns
from sklearn.linear_model import LinearRegression
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import csv
import ast
import numpy as np
from scipy.stats import beta, uniform

plt.rcParams['figure.dpi'] = 300

def bayesTheorem(agents, posGroups, negAgents):

  posGroups = list(posGroups)
  posGroups = sorted(posGroups, key=len)

  for posGroup in posGroups:
    for posGroupCompare in posGroups:
      if posGroup.issubset(posGroupCompare) and posGroup != posGroupCompare:
        posGroups.remove(posGroupCompare)

  newAgents = []

  for agent in agents:
    if agent[0] in set(negAgents):
      newAgents.append((agent[0], 0, 1))
    else:
      newAgents.append(agent)

  agentDict = {}
  for agent in newAgents:
    agentDict[agent[0]] = (agent[1], agent[2])


  def all_combinations(frozen_sets):

    def pick_at_least_one(agents_set):
      # Generate combinations of different sizes from the set
      combinations_list = []
      for r in range(1, len(agents_set) + 1):
          combinations_list.extend(combinations(agents_set, r))
      return combinations_list

    possibilities = [pick_at_least_one(group) for group in frozen_sets]
    product_result = product(*possibilities)

    # Combine the elements of each tuple into a single set
    combos = {frozenset(chain(*comb)) for comb in product_result}

    return combos

  def getProb(groups):
    involvedAgents = frozenset(chain(*groups))

    possiblePos = all_combinations(groups)

    prob = 0
    for possibility in possiblePos:
      currentOutcome = 1
      for id in involvedAgents:
        if id in possibility:
          currentOutcome *= 1 - agentDict[id][1]
        else:
          currentOutcome *= agentDict[id][1]
      prob += currentOutcome

    return prob

  probPos = {}

  probPos[tuple(posGroups)] = getProb(posGroups)
  probPos[()] = 1

  finalAgents = []

  for agent in newAgents:

    # P(agent is Pos | posGroups) = P(posGroups | agent is Pos) * P(agent in Pos) / P(posGroups)
    # = P(pos Groups agent not in) * P(agent is Pos) / P(posGroups)

    if agent[2] == 1 or agent[2] == 0:
      finalAgents.append(agent)
      # agentDict[agent[0]] = (agent[1], agent[2])

    else:

      posNotIn = [posGroup for posGroup in posGroups if agent[0] not in posGroup]

      if tuple(posNotIn) not in probPos:

        probPos[tuple(posNotIn)] = getProb(posNotIn)

      pAgentPos = probPos[tuple(posNotIn)] * (1 - agent[2]) / probPos[tuple(posGroups)]
      health = 1 - pAgentPos

      finalAgents.append((agent[0], agent[1], health))
      # agentDict[agent[0]] = (agent[1], health)

  for agent in finalAgents:
    agentDict[agent[0]] = (agent[1], agent[2])

  return agentDict

In [88]:
def printBayes(agentDict):

  print("Agent, P(Healthy), Utility of Newly Testing Negative")

  for ID in agentDict:
    print(f"Agent {ID}: {agentDict[ID][1]}, {agentDict[ID][0]}")

In [71]:
# Compute the likelihood based on whether at least one agent in each group is infected
def likelihood_binary(agents, posGroups, health_states):
    """Compute the likelihood of the current health states."""
    for group in posGroups:
        # Check if at least one agent in the group is infected (state = 1)
        if all(health_states[agent] == 0 for agent in group):
            return -np.inf  # If all agents in the group are healthy, return very low likelihood
    return 0  # If at least one agent is infected in every group, return 0 (neutral log-likelihood)

In [86]:
# Gibbs MCMC algorithm for binary agent states
def Gibbs(agents, posGroups, negAgents, iterations=20000, burn_in=5000):

    # Initialize agents with binary states (0: healthy, 1: infected), ensuring negAgents are always healthy
    def initialize_agents_binary(agents, negAgents):
        return {agent[0]: 0 if agent[0] in negAgents else np.random.choice([0, 1], p=[agent[2], 1-agent[2]]) for agent in agents}

    # Gibbs-like sampling: Update one agent's state given the rest of the current states
    def update_agent_state(agent, posGroups, health_states, agent_prob, negAgents):
        """Update the state of the agent (0: healthy, 1: infected) based on the group constraints."""
        if agent in negAgents:
            return 0  # If the agent is in negAgents, they must always be healthy
        
        relevant_groups = [group for group in posGroups if agent in group]

        # Compute the probability that the agent should be infected based on their group constraints
        must_be_infected = any(all(health_states[other] == 0 for other in group if other != agent) for group in relevant_groups)
        
        if must_be_infected:
            return 1  # The agent must be infected to satisfy the group constraint
        else:
            # Otherwise, use their probability of being healthy (agent_prob)
            return np.random.choice([0, 1], p=[agent_prob, 1 - agent_prob])


    """Perform MCMC to update health states (binary: 0 or 1) and calculate probabilities."""
    # Initialize binary health states (0: healthy, 1: infected), ensuring negAgents are always healthy
    health_states = initialize_agents_binary(agents, negAgents)
    
    # Store counts of healthy samples for each agent
    healthy_counts = {agent[0]: 0 for agent in agents}
    
    # Run MCMC iterations
    for iteration in range(iterations):
        for agent in agents:
            # Update the state of each agent given the current states, except for agents in negAgents
            health_states[agent[0]] = update_agent_state(agent[0], posGroups, health_states, agent[2], negAgents)
        
        # After burn-in, record the number of times each agent is healthy
        if iteration >= burn_in:
            for agent in agents:
                if health_states[agent[0]] == 0:
                    healthy_counts[agent[0]] += 1
    
    # Calculate the final probabilities of being healthy
    total_samples = iterations - burn_in
    final_probs = {agent[0]: healthy_counts[agent[0]] / total_samples for agent in agents}

    # Prepare the agentDict to return, setting utility to 0 for negAgents
    agentDict = {}
    for agent in agents:
        agent_id = agent[0]
        if agent_id in negAgents:
            agentDict[agent_id] = (0, 1)  # Utility is 0, always healthy
        else:
            agentDict[agent_id] = (agent[1], final_probs[agent_id])  # Utility remains the same, update health probability

    return agentDict

In [91]:
# Example usage
agents = [(1, 1, 0.1), (2, 1, 0.1), (3, 1, 0.5)]  # Agents with initial probabilities of being healthy
negAgents = {1}  # Agent 3 must always be healthy
posGroups = [frozenset({1, 3}), frozenset({3, 2}), frozenset({3})]  # Positive test groups, without negative agents

# Run MCMC with burn-in
final_probs = Gibbs(agents, posGroups, negAgents, iterations=20000, burn_in=5000)

# Output final mean estimates of health probabilities
printBayes(final_probs)


Agent, P(Healthy), Utility of Newly Testing Negative
Agent 1: 1, 0
Agent 2: 0.09986666666666667, 1
Agent 3: 0.0, 1


In [92]:
printBayes(bayesTheorem(agents, posGroups, negAgents))

Agent, P(Healthy), Utility of Newly Testing Negative
Agent 1: 1, 0
Agent 2: 0.09999999999999998, 1
Agent 3: 0.0, 1
