In [27]:
import numpy as np

def independent_market_mechanism(n, m, P):
    """
    Implements the Independent Market Mechanism as proposed by Freeman et al. in 2019.

    Parameters:
    - n (int): Number of voters.
    - m (int): Number of alternatives.
    - P (np.array): Voting profile, a numpy array of shape (n, m), where each vote ranges from 0 to 1.

    Returns:
    - allocation (np.array): The final allocation using the Independent Market Mechanism, rounded to 3 decimal points.
    - detailed_info (list): A list containing detailed information for each alternative, rounded to 3 decimal points.
    """
    def compute_sum_medians(t):
        # Compute phantoms
        phantoms = [round(min(t * (n - k), 1), 3) for k in range(n + 1)]
        total_median = 0
        medians = []
        for i in range(m):
            votes_i = P[:, i].tolist()
            combined_list = votes_i + phantoms
            median_i = round(np.median(combined_list), 3)
            medians.append(median_i)
            total_median += median_i
        return total_median, medians

    # Function to find t in [0,1] using binary search, such that sum of medians equals 1
    def find_t():
        t_low = 0
        t_high = 1
        epsilon = 1e-8
        max_iterations = 10000

        sum_low, _ = compute_sum_medians(t_low)
        sum_high, _ = compute_sum_medians(t_high)
        
        # Extend t_high if necessary
        while sum_high < 1:
            t_high *= 2
            sum_high, _ = compute_sum_medians(t_high)
            if t_high > 1e6:
                raise ValueError("Unable to find t in a reasonable range.")

        for _ in range(max_iterations):
            t_mid = (t_low + t_high) / 2
            sum_mid, _ = compute_sum_medians(t_mid)
            if abs(sum_mid - 1) < epsilon:
                return round(t_mid, 3)
            elif sum_mid < 1:
                t_low = t_mid
            else:
                t_high = t_mid
        raise ValueError("Failed to converge to a solution for t.")

    # Find the appropriate t
    t = find_t()

    # Compute the final allocation and detailed info
    phantoms = [round(min(t * (n - k), 1), 3) for k in range(n + 1)]
    allocation = []
    detailed_info = []
    for i in range(m):
        votes_i = P[:, i].tolist()
        combined_list = votes_i + phantoms
        median_i = round(np.median(combined_list), 3)
        allocation.append(median_i)
        detailed_info.append({
            'alternative': i + 1,
            'votes': [round(v, 3) for v in votes_i],
            'phantoms': phantoms,
            'combined_list': sorted([round(x, 3) for x in combined_list]),
            'median': median_i
        })

    allocation = np.array(allocation)
    return allocation, detailed_info 


In [30]:
def welfare_maximizing_phantom_mechanism(n, m, P):
    """
    Implements the Welfare-Maximizing Phantom Mechanism as proposed by Freeman et al.

    Parameters:
    - n (int): Number of voters.
    - m (int): Number of alternatives.
    - P (np.array): Voting profile, a numpy array of shape (n, m), where each vote ranges from 0 to 1.

    Returns:
    - allocation (np.array): The final allocation using the Welfare-Maximizing Phantom Mechanism.
    - detailed_info (list): A list containing detailed information for each alternative.
    """
    def compute_phantoms(t):
        phantoms = []
        for k in range(n + 1):
            if t <= k / (n + 1):
                fk = 0
            elif t <= (k + 1) / (n + 1):
                fk = (n + 1) * t - k
            else:
                fk = 1
            phantoms.append(round(fk, 3))
        return phantoms

    def compute_sum_medians(t):
        phantoms = compute_phantoms(t)
        total_median = 0
        medians = []
        for i in range(m):
            votes_i = P[:, i].tolist()
            combined_list = votes_i + phantoms
            median_i = round(np.median(combined_list), 3)
            medians.append(median_i)
            total_median += median_i
        return total_median, medians, phantoms

    # Function to find t such that sum of medians equals 1
    def find_t():
        t_low = 0
        t_high = 1
        epsilon = 1e-8
        max_iterations = 100000

        for _ in range(max_iterations):
            t_mid = (t_low + t_high) / 2
            sum_mid, _, _ = compute_sum_medians(t_mid)
            if abs(sum_mid - 1) < epsilon:
                return t_mid
            elif sum_mid > 1:
                t_high = t_mid
            else:
                t_low = t_mid
        raise ValueError("Failed to converge to a solution for t.")

    # Find the appropriate t
    t = find_t()

    # Compute the final allocation and detailed info
    phantoms = compute_phantoms(t)
    allocation = []
    detailed_info = []
    for i in range(m):
        votes_i = P[:, i].tolist()
        combined_list = votes_i + phantoms
        median_i = round(np.median(combined_list), 3)
        allocation.append(median_i)
        detailed_info.append({
            'alternative': i + 1,
            'votes': [round(v, 3) for v in votes_i],
            'phantoms': phantoms,
            'combined_list': [round(x, 3) for x in sorted(combined_list)],
            'median': median_i
        })

    allocation = np.array(allocation)
    return allocation, detailed_info

In [11]:
def compute_disutilities(P, allocation):
    """
    Computes the disutilities for each voter given their preferences and an allocation.
    Disutility is the sum of absolute differences between the voter's peak preference and the allocation.

    Parameters:
    - P (np.array): Voting profile, a numpy array of shape (n, m), representing voters' peak preferences.
    - allocation (np.array): Allocation vector of shape (m,).

    Returns:
    - disutilities (np.array): Disutilities for each voter, shape (n,).
    """
    disutilities = np.sum(np.abs(P - allocation), axis=1)
    return disutilities

def combined_allocation_and_loss(n, m, P):
    """
    Computes the combined allocation by splitting the budget equally between the
    Independent Market Mechanism and Welfare-Maximizing Phantom Mechanism.

    Calculates the welfare loss (total disutility) and fairness loss (maximum disutility)
    compared to both individual mechanisms.

    Parameters:
    - n (int): Number of voters.
    - m (int): Number of alternatives.
    - P (np.array): Voting profile.

    Returns:
    - allocation_C (np.array): The combined allocation.
    - welfare_losses (dict): Welfare loss compared to IMM and WMPM.
    - fairness_losses (dict): Fairness loss compared to IMM and WMPM.
    """
    # Compute allocations from both mechanisms
    # Assume the IMM and WMPM functions are defined elsewhere
    allocation_IMM, _ = independent_market_mechanism(n, m, P)
    allocation_WMPM, _ = welfare_maximizing_phantom_mechanism(n, m, P)

    # Combine the allocations equally
    allocation_C = np.round((allocation_IMM + allocation_WMPM) / 2, 3)

    # Compute disutilities for each voter under each allocation
    disutilities_IMM = compute_disutilities(P, allocation_IMM)
    disutilities_WMPM = compute_disutilities(P, allocation_WMPM)
    disutilities_C = compute_disutilities(P, allocation_C)

    # Calculate total disutilities (lower is better)
    total_disutility_IMM = np.sum(disutilities_IMM)
    total_disutility_WMPM = np.sum(disutilities_WMPM)
    total_disutility_C = np.sum(disutilities_C)

    # Calculate welfare loss (difference in total disutilities)
    welfare_loss_IMM = total_disutility_C - total_disutility_IMM
    welfare_loss_WMPM = total_disutility_C - total_disutility_WMPM

    # Calculate fairness loss (difference in maximum disutilities)
    max_disutility_IMM = np.max(disutilities_IMM)
    max_disutility_WMPM = np.max(disutilities_WMPM)
    max_disutility_C = np.max(disutilities_C)

    fairness_loss_IMM = max_disutility_C - max_disutility_IMM
    fairness_loss_WMPM = max_disutility_C - max_disutility_WMPM

    # Prepare the results
    welfare_losses = {
        'IMM': round(welfare_loss_IMM, 3),
        'WMPM': round(welfare_loss_WMPM, 3)
    }

    fairness_losses = {
        'IMM': round(fairness_loss_IMM, 3),
        'WMPM': round(fairness_loss_WMPM, 3)
    }

    return allocation_C, welfare_losses, fairness_losses

In [31]:
# Example usage with provided votes
n = 3
m = 3
P = np.array(
[[0.61, 0.33, 0.06],
 [0.23, 0.24, 0.53],
 [0.32, 0.44, 0.24]])

allocation_C, welfare_losses, fairness_losses = combined_allocation_and_loss(n, m, P)

print("Combined Allocation:", allocation_C)
print("\nWelfare Losses (Total Disutility):")
for key, value in welfare_losses.items():
    print(f"Compared to {key}: {value}")

print("\nFairness Losses (Maximum Disutility):")
for key, value in fairness_losses.items():
    print(f"Compared to {key}: {value}")


ValueError: Failed to converge to a solution for t.

In [16]:
from scipy.optimize import linprog

def check_pareto_optimality_single_peaked(allocation, P):
    """
    Checks whether the allocation is Pareto optimal under single-peaked preferences with L1 disutility.
    If not, provides a new allocation that Pareto dominates the current allocation.

    Parameters:
    - allocation (np.array): Current allocation, a numpy array of shape (m,)
    - P (np.array): Ideal allocations for each voter, shape (n, m)

    Returns:
    - is_pareto_optimal (bool): True if the allocation is Pareto optimal, False otherwise
    - new_allocation (np.array or None): A new allocation that Pareto dominates the current allocation,
      or None if the allocation is Pareto optimal
    """
    n, m = P.shape
    epsilon = 1e-6  # Small positive value to ensure strict improvement

    # Calculate current disutilities for each voter
    D_current = np.sum(np.abs(allocation - P), axis=1)

    # Decision variables:
    # - A'_j for allocations (m variables)
    # - d_{i,j} for disutilities (n*m variables)

    num_vars = m + n * m  # Total number of variables

    # Objective function: We can set it to zeros as we're only interested in feasibility
    c = np.zeros(num_vars)

    # Bounds for allocation variables A'_j: between 0 and 1
    bounds_A = [(0, 1) for _ in range(m)]

    # Bounds for disutility variables d_{i,j}: d_{i,j} >= 0
    bounds_d = [(0, None) for _ in range(n * m)]

    bounds = bounds_A + bounds_d

    # Constraint for sum of A'_j = 1
    A_eq = []
    b_eq = []
    
    A_eq_allocation = np.zeros(num_vars)
    A_eq_allocation[:m] = 1
    A_eq.append(A_eq_allocation)
    b_eq.append(1)

    # Constraints for disutility calculations and non-increasing disutility
    A_ub = []
    b_ub = []

    # For each voter i and alternative j
    for i in range(n):
        D_i_current = D_current[i]

        # Disutility sum for voter i
        A_d_sum = np.zeros(num_vars)
        for j in range(m):
            # Indices for A'_j and d_{i,j}
            idx_Aj = j
            idx_dij = m + i * m + j

            # Constraints for |A'_j - P_{i,j}| <= d_{i,j}
            # d_{i,j} >= A'_j - P_{i,j}
            A1 = np.zeros(num_vars)
            A1[idx_Aj] = -1
            A1[idx_dij] = 1
            A_ub.append(A1)
            b_ub.append(-P[i, j])

            # d_{i,j} >= -(A'_j - P_{i,j}) => d_{i,j} >= -A'_j + P_{i,j}
            A2 = np.zeros(num_vars)
            A2[idx_Aj] = 1
            A2[idx_dij] = 1
            A_ub.append(A2)
            b_ub.append(P[i, j])

            # Sum up disutilities for voter i
            A_d_sum[idx_dij] = 1

        # Constraint for non-increasing disutility
        A_ub.append(A_d_sum)
        b_ub.append(D_i_current)

    # Add strict improvement constraint for at least one voter
    pareto_improvement_found = False
    for k in range(n):
        # Copy the existing constraints
        A_ub_k = A_ub.copy()
        b_ub_k = b_ub.copy()

        # Modify the disutility constraint for voter k to ensure strict improvement
        D_k_current = D_current[k]
        # Indices for d_{k,j}
        idx_dk = [m + k * m + j for j in range(m)]

        A_dk_sum = np.zeros(num_vars)
        for idx in idx_dk:
            A_dk_sum[idx] = 1

        # Adjust the constraint for strict improvement
        A_ub_k.append(A_dk_sum)
        b_ub_k.append(D_k_current - epsilon)

        # Solve the LP
        res = linprog(c, A_ub=A_ub_k, b_ub=b_ub_k, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')

        if res.success:
            # Extract the allocation variables
            new_allocation = res.x[:m]
            # Adjust for numerical errors
            new_allocation = np.clip(new_allocation, 0, 1)
            new_allocation = new_allocation / np.sum(new_allocation)
            return False, new_allocation

    # If no Pareto improvement is found, the allocation is Pareto optimal
    return True, None

In [17]:
if __name__ == "__main__":
    n = 3
    m = 3
    P = np.array([
        [0, 0.5, 0.5],
        [1/3, 2/3, 0],
        [0.9, 0, 0.1]
    ])
    allocation = np.array([1/3, 4/9, 2/9])  # Current allocation

    is_optimal, new_alloc = check_pareto_optimality_single_peaked(allocation, P)

    if is_optimal:
        print("The allocation is Pareto optimal under single-peaked preferences with L1 disutility.")
    else:
        print("The allocation is NOT Pareto optimal under single-peaked preferences with L1 disutility.")
        print("A new allocation that Pareto dominates the current allocation is:")
        print(np.round(new_alloc, 3))

The allocation is Pareto optimal under single-peaked preferences with L1 disutility.


In [12]:
is_optimal = is_pareto_optimal(allocation, P)
print("Is the allocation Pareto optimal?", is_optimal)

Is the allocation Pareto optimal? True
