# Step 1: Initializing with Majority Vote
We initialize the hidden (true) labels by majority vote: for each item, we pick the label that the annotators assigned most frequently.

# Step 2: E-step and M-step of the Dawid and Skene Algorithm
The E-step computes, for each item, the probability that it belongs to each class, given the current estimates of the annotators' confusion matrices and of the class probabilities. The M-step then re-estimates the confusion matrices and the class probabilities from the probabilities produced by the E-step.

# Step 3: Convergence Criteria
A reasonable convergence criterion is to stop when the estimates change by less than a chosen threshold between two successive iterations, or after a fixed number of iterations. The implementation below monitors the change in the estimated class probabilities and also caps the number of iterations.

In [None]:
import numpy as np
from scipy.stats import mode
import matplotlib.pyplot as plt

In [None]:
def majority_vote_initialization(annotation_matrix):
    """
    Pick, for every item, the label that its annotators assigned most often.

    Args:
    - annotation_matrix (np.ndarray): Matrix with items in rows and annotations in columns.

    Returns:
    - np.ndarray: 1-D array holding the most frequent annotation of each row.
    """
    # The result of scipy's mode unpacks as (modes, counts); only the modes are needed here.
    row_modes = mode(annotation_matrix, axis=1)[0]
    return row_modes.flatten()

In [None]:
# NOTE: accepted for now -- it reproduces the confusion matrices of the lecture example.
def calculate_confusion_matrices(annotation_matrix, label_probs_mat, num_classes):
    """
    Calculate a soft-count confusion matrix for each annotator.

    Entry [j, k, l] is the label-probability mass of class k carried by the
    items that annotator j labelled l, divided by the total mass of class k.

    Args:
    - annotation_matrix (np.ndarray): Matrix with items in rows and annotations in columns.
    - label_probs_mat (np.ndarray): Probability matrix for each item and class.
    - num_classes (int): Number of possible classes.

    Returns:
    - np.ndarray: Array of shape (num_annotators, num_classes, num_classes)
      holding one confusion matrix per annotator. The row of a class whose
      total probability mass is zero is set to the uniform distribution
      instead of NaN.
    """
    annotation_matrix = np.asarray(annotation_matrix)
    # Only the first num_classes columns are used.
    probs = np.asarray(label_probs_mat, dtype=float)[:, :num_classes]

    # one_hot[i, j, l] is 1.0 exactly when annotator j gave item i the label l.
    one_hot = (annotation_matrix[:, :, None] == np.arange(num_classes)).astype(float)

    # soft_counts[j, k, l] = sum_i probs[i, k] * one_hot[i, j, l]
    soft_counts = np.einsum('ik,ijl->jkl', probs, one_hot)

    class_mass = probs.sum(axis=0)
    empty_class = class_mass == 0
    # Divide by 1 for empty classes to avoid 0/0; those rows are overwritten below.
    safe_mass = np.where(empty_class, 1.0, class_mass)
    confusion_matrices = soft_counts / safe_mass[None, :, None]

    if empty_class.any():
        # No item carries any mass for these classes, so the data say nothing
        # about them: fall back to an uninformative uniform row.
        confusion_matrices[:, empty_class, :] = 1.0 / num_classes
    return confusion_matrices


In [None]:
# M and E step
def m_step(annotation_matrix, label_probs, num_classes):
    """
    M-step of the Dawid and Skene algorithm: re-estimate the model parameters
    from the current label probabilities.

    Args:
    - annotation_matrix (np.ndarray): Matrix with items in rows and annotations in columns.
    - label_probs (np.ndarray): Updated label probabilities for each item and class.
    - num_classes (int): Number of possible classes.

    Returns:
    - Tuple[np.ndarray, np.ndarray]: Updated class probabilities and confusion matrices.
    """
    # Annotator confusion matrices, weighted by the current label probabilities.
    updated_confusions = calculate_confusion_matrices(annotation_matrix, label_probs, num_classes)

    # Class probability = summed label probability of the class / number of items.
    item_count = annotation_matrix.shape[0]
    soft_class_counts = np.sum(label_probs, axis=0)
    updated_class_probs = soft_class_counts / item_count

    return updated_class_probs, updated_confusions


def e_step(annotation_matrix, confusion_matrices, class_probs):
    """
    E-step of the Dawid and Skene algorithm.

    For every item i and class k this evaluates
    class_probs[k] * prod_j confusion_matrices[j, k, annotation_matrix[i, j]]
    and normalises each row to sum to one. The product is accumulated as a
    sum of logarithms so that it does not underflow to zero when there are
    many annotators.

    Args:
    - annotation_matrix (np.ndarray): Matrix with items in rows and annotations in columns.
    - confusion_matrices (np.ndarray): Confusion matrices for each annotator.
    - class_probs (np.ndarray): Probability of each class.

    Returns:
    - np.ndarray: Updated label probabilities for each item and class
      (denoted as Z in the lecture). An item for which every class has zero
      likelihood gets the uniform distribution instead of NaN.
    """
    annotation_matrix = np.asarray(annotation_matrix)
    num_items, num_annotators = annotation_matrix.shape
    num_classes = class_probs.shape[0]
    if num_classes == 0:
        return np.zeros((num_items, 0))

    # log(0) = -inf is intended here: it marks an impossible class.
    with np.errstate(divide="ignore"):
        log_prior = np.log(np.asarray(class_probs, dtype=float))
        log_confusion = np.log(np.asarray(confusion_matrices, dtype=float))

    # log_joint[i, k] = log class_probs[k] + sum_j log confusion[j, k, label_ij]
    log_joint = np.tile(log_prior, (num_items, 1))
    for j in range(num_annotators):
        observed = annotation_matrix[:, j]
        # Select, for every item, the column of its observed label: (classes, items) -> (items, classes).
        log_joint += log_confusion[j, :num_classes][:, observed].T

    # Subtract each row's maximum before exponentiating so the largest term is exp(0) = 1.
    row_max = log_joint.max(axis=1, keepdims=True)
    impossible = np.isneginf(row_max)  # every class has zero likelihood for this item
    shifted = log_joint - np.where(impossible, 0.0, row_max)
    unnormalised = np.exp(shifted)
    totals = unnormalised.sum(axis=1, keepdims=True)

    label_probs = unnormalised / np.where(impossible, 1.0, totals)
    # The evidence rules out every class, so no class can be preferred over another.
    label_probs[impossible[:, 0]] = 1.0 / num_classes
    return label_probs

In [None]:
def has_converged(old_probs, new_probs, tolerance):
    """
    Report whether two successive probability estimates are close enough to stop.

    Args:
    - old_probs (np.ndarray): Previous class probabilities.
    - new_probs (np.ndarray): New class probabilities.
    - tolerance (float): Convergence tolerance.

    Returns:
    - bool: True if the largest absolute element-wise change is strictly
      below the tolerance, otherwise False.
    """
    change = new_probs - old_probs
    largest_change = np.max(np.abs(change))
    return largest_change < tolerance


In [None]:
# Code to estimate labels distributions
def indiv_annotation_label_distribution(annotation_matrix, num_class:int, num_items:int):
    '''
    Turn raw annotations into per-item label proportions.

    annotation matrix (numpy array): Shape (num_items, num_annotators)
    num_class: number of columns of the returned proportion matrix
    num_items: number of rows of annotation_matrix to process

    Return: 1. label proportion matrix, shape (num_items, num_class); entry
               [i, c] is the fraction of annotators that gave item i label c
            2. estimated label proportions: the column means of that matrix
    '''
    annotator_count = annotation_matrix.shape[1]
    proportions = np.zeros(shape=(num_items, num_class))

    for row in range(num_items):
        # Distinct labels given to this item and how often each was given.
        seen_labels, tallies = np.unique(annotation_matrix[row, :], return_counts=True)
        fractions = tallies / annotator_count
        for seen_label, fraction in zip(seen_labels, fractions):
            proportions[row, seen_label] = fraction

    return proportions, np.mean(proportions, axis=0)


In [83]:
def dawid_skene(annotation_matrix, num_classes, tol=1e-4, max_iter=100):
    """
    Run the Dawid and Skene algorithm.

    The EM loop is seeded with each item's label proportions and the confusion
    matrices derived from them; the majority-vote labels are computed and
    printed for reference only. Iteration stops as soon as no class
    probability changes by `tol` or more between two successive iterations,
    or after `max_iter` iterations.

    Args:
    - annotation_matrix (np.ndarray): Matrix with items in rows and annotations in columns.
    - num_classes (int): Number of possible classes.
    - tol (float): Convergence tolerance.
    - max_iter (int): Maximum number of iterations.

    Returns:
    - Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: Final class
      probabilities, confusion matrices, label probabilities, and the hard
      labels obtained as the argmax of the label probabilities.
    """
    hidden_labels = majority_vote_initialization(annotation_matrix)
    print("hidden labels are:")
    print(hidden_labels)

    num_items = annotation_matrix.shape[0]
    label_probs, class_probs = indiv_annotation_label_distribution(
        annotation_matrix, num_classes, num_items=num_items
    )
    print("label probs (matrix) are:")
    print(label_probs)
    print("class_probs (averaged estimate from label prob matrix):")
    print(class_probs)

    confusion_matrices = calculate_confusion_matrices(annotation_matrix, label_probs, num_classes)
    print("confusion matrices are:")
    print(confusion_matrices)

    # Start from the initial proportions so the return value is defined even
    # when the loop body never runs (max_iter <= 0).
    new_label_probs = label_probs

    for iteration in range(max_iter):
        new_label_probs = e_step(annotation_matrix, confusion_matrices, class_probs)
        if iteration == 0:
            print("after first E step:, new labels probs are:")
            print(new_label_probs)
        new_class_probs, new_confusion_matrices = m_step(annotation_matrix, new_label_probs, num_classes)

        converged = has_converged(class_probs, new_class_probs, tol)

        # Adopt the new estimates before a possible break, so the returned
        # parameters are the ones computed from the returned label probabilities.
        class_probs = new_class_probs
        confusion_matrices = new_confusion_matrices

        if converged:
            print(f"---converged at the iteration count---: {iteration}")
            break

    final_labels = np.argmax(new_label_probs, axis=1)
    return class_probs, confusion_matrices, new_label_probs, final_labels



In [84]:
# Lecture example: five items (rows), each labelled by three annotators (columns).
annotation_matrix = np.array([
    [0, 1, 0],
    [0, 1, 1],
    [1, 1, 0],
    [0, 0, 1],
    [0, 0, 1],
])
num_classes = 2

class_probs, confusion_matrices, label_probs, final_labels = dawid_skene(
    annotation_matrix, num_classes
)

print("---FINAL RESULTS---:")
print("1. Final class probabilities:", class_probs)
print("2. Final label probabilities:", label_probs)
print("3. Final labels estimated:")
print(final_labels)

hidden labels are:
[0 1 1 0 0]
label probs (matrix) are:
[[0.66666667 0.33333333]
 [0.33333333 0.66666667]
 [0.33333333 0.66666667]
 [0.66666667 0.33333333]
 [0.66666667 0.33333333]]
class_probs (averaged estimate from label prob matrix):
[0.53333333 0.46666667]
confusion matrices are:
[[[0.875      0.125     ]
  [0.71428571 0.28571429]]

 [[0.5        0.5       ]
  [0.28571429 0.71428571]]

 [[0.375      0.625     ]
  [0.42857143 0.57142857]]]
after first E step:, new labels probs are:
[[0.46164199 0.53835801]
 [0.5173454  0.4826546 ]
 [0.23444976 0.76555024]
 [0.72823779 0.27176221]
 [0.72823779 0.27176221]]
---converged at the iteration count---: 13
---FINAL RESULTS---:
1. Final class probabilities: [0.59988622 0.40011378]
2. Final label probabilities: [[4.52934089e-07 9.99999547e-01]
 [9.99714697e-01 2.85303313e-04]
 [7.38580305e-51 1.00000000e+00]
 [1.00000000e+00 1.78630045e-28]
 [1.00000000e+00 1.78630045e-28]]
3. Final labels estimated:
[1 0 1 0 0]


In [77]:
import numpy as np

# Toy label probability matrix: one row per item, one column per class.
label_prob_matrix = np.array([
    [0.1, 0.9],
    [0.4, 0.6],
    [0.89, 0.11],
])

# The predicted label of an item is the column that holds its largest probability.
final_labels = label_prob_matrix.argmax(axis=1)

print(final_labels)

[1 1 0]


In [None]:
def plot_annotation_matrix(annotation_matrix):
    """Show the annotation matrix as a heat map with each cell's value written on top."""
    figure, axis = plt.subplots(figsize=(8, 6))

    # Heat map of the raw annotations.
    heatmap = axis.imshow(annotation_matrix, cmap='Blues')

    # Write every cell's value at its centre (x = column index, y = row index).
    n_rows, n_cols = annotation_matrix.shape[0], annotation_matrix.shape[1]
    for row, col in np.ndindex(n_rows, n_cols):
        axis.text(col, row, annotation_matrix[row, col],
                  ha='center', va='center', color='black')

    figure.colorbar(heatmap)

    axis.set_title('Annotation Matrix')
    axis.set_xlabel('X Axis')
    axis.set_ylabel('Y Axis')

    plt.show()
# Draw the annotation matrix of the lecture example.
plot_annotation_matrix(annotation_matrix)

In [None]:
def plot_confusion_matrices(confusion_matrices, n_classes):
    """
    Draw one annotated heat map per annotator, side by side.

    Args:
    - confusion_matrices: Sequence of (n_classes, n_classes) matrices, one per
      annotator. The first index of each matrix is the true class and the
      second is the label the annotator assigned.
    - n_classes (int): Number of classes; sets the ticks and the annotated cells.
    """
    n_annotators = len(confusion_matrices)
    # squeeze=False always yields a 2-D array of axes, so a single annotator
    # needs no special case.
    fig, axes = plt.subplots(1, n_annotators, figsize=(n_annotators * 10, 10), squeeze=False)
    ticks = np.arange(n_classes)

    for j, ax in enumerate(axes[0]):
        cax = ax.matshow(confusion_matrices[j], cmap='Blues')
        fig.colorbar(cax, ax=ax, fraction=0.046, pad=0.04)

        ax.set_title(f'Annotator {j+1}', fontsize=24, pad=20)
        # matshow puts the first index (true class) on the vertical axis and
        # the second index (assigned label) on the horizontal axis.
        ax.set_xlabel('Assigned Label', fontsize=20, labelpad=15)
        ax.set_ylabel('True Label', fontsize=20, labelpad=15)

        ax.tick_params(axis='both', which='major', labelsize=16)

        ax.set_xticks(ticks)
        ax.set_yticks(ticks)
        ax.set_xticklabels(ticks)
        ax.set_yticklabels(ticks)

        # Annotate each cell with its value; white text on the darker cells.
        for i in range(n_classes):
            for k in range(n_classes):
                value = confusion_matrices[j][i, k]
                text_color = 'white' if value > 0.5 else 'black'
                ax.text(k, i, f'{value:.2f}', ha='center', va='center',
                        color=text_color, fontsize=16, fontweight='bold')

    plt.tight_layout(pad=3.0)
    plt.show()

In [None]:
# Draw one heat map per annotator from the confusion matrices returned by dawid_skene.
plot_confusion_matrices(confusion_matrices, num_classes)