*Data management course*

In [10]:
from typing import List
import math
import heapq

In [7]:
class Item:
    """A scored candidate that belongs to one category.

    Attributes:
        name:     Label of the item.
        category: Integer category id of the item.
        score:    Numeric score; the only field used when ordering items.
    """

    def __init__(self, name: str, category: int, score: float):
        self.name = name
        self.category = category
        self.score = score

    def __str__(self):
        return f"name={self.name}, category={self.category}, val={self.score}"

    def __lt__(self, other):
        """Order items by score; needed by ``heapq`` and ``sorted``."""
        if not isinstance(other, Item):
            return NotImplemented
        return self.score < other.score

    def __le__(self, other):
        """Return True when this item's score does not exceed ``other``'s."""
        if not isinstance(other, Item):
            return NotImplemented
        # The attribute is ``score``; the previous ``other.val`` never existed.
        return self.score <= other.score



In [8]:
def algo1(I: List["Item"], K: int, d: int, floors: List[int], ceilings: List[int]) -> List["Item"]:
    """
    Diverse top-k selection from a list that is already ordered best-first.

    Items are scanned in the given order and scores are never compared, so
    the caller is expected to pass ``I`` sorted from best to worst.

    Args:
        I:          Items in selection order. ``x.category`` is used directly
                    as an index, so categories must be integers in ``[0, d)``.
        K:          Number of items to select.
        d:          Number of categories.
        floors:     ``floors[i]`` is the minimum number of items wanted from
                    category ``i``.
        ceilings:   ``ceilings[i]`` is the maximum number of items allowed
                    from category ``i``.

    Returns:
        The selected items in the order they were picked. The list holds K
        items when the constraints can be met from ``I``; if ``I`` is
        exhausted first, the items selected so far are returned.
    """
    L = []
    if K <= 0:
        return L

    C = [0] * d  # C[i] = number of items already taken from category i
    # Picks that remain once every floor has been reserved.
    slack = K - sum(floors)

    for x in I:
        i = x.category
        if C[i] < floors[i]:
            # Still below the floor: take it without spending slack.
            L.append(x)
            C[i] += 1
        elif C[i] < ceilings[i] and slack > 0:
            # Floor already met: an extra pick is paid for with slack.
            L.append(x)
            C[i] += 1
            slack -= 1
        if len(L) >= K:
            break
    return L


In [None]:
def _push_bounded(heap, score, capacity):
    """Keep the ``capacity`` largest scores seen so far in the min-heap ``heap``."""
    if capacity <= 0:
        return
    if len(heap) < capacity:
        heapq.heappush(heap, score)
    else:
        # Pushes ``score`` and drops the smallest entry, so the size stays fixed.
        heapq.heappushpop(heap, score)


def _heap_min(heap):
    """Return the smallest score in ``heap``, or -inf when no threshold exists."""
    return heap[0] if heap else -math.inf


def algo2(I: List["Item"], K: int, d: int, floors: List[int], ceilings: List[int], items_per_category: List[int]) -> List["Item"]:
    """
    Diverse top-k selection from items that arrive in arbitrary order.

    The first ``floor(n / e)`` items (overall, and separately per category)
    form a warm-up sample whose best scores become acceptance thresholds.
    Later items are selected when they beat a threshold, or when skipping
    them would make the floors or K unreachable.

    Args:
        I:                  Items in arrival order. ``len(I)`` is read, so a
                            sized sequence is required. ``x.category`` is used
                            directly as an index, so categories must be
                            integers in ``[0, d)``.
        K:                  Number of items to select.
        d:                  Number of categories.
        floors:             ``floors[i]`` is the minimum number of items
                            wanted from category ``i``.
        ceilings:           ``ceilings[i]`` is the maximum number of items
                            allowed from category ``i``.
        items_per_category: ``items_per_category[i]`` is the number of items
                            of category ``i`` in ``I``.

    Returns:
        The selected items in the order they were picked. If ``I`` is
        exhausted before K items qualify, the items selected so far are
        returned.
    """
    L = []
    if K <= 0:
        return L

    N = len(I)
    C = [0] * d  # C[i] = items already selected from category i
    M = [0] * d  # M[i] = items already seen from category i
    # Warm-up length per category.
    R = [math.floor(n / math.e) for n in items_per_category]
    # Per-category thresholds: the floors[i] best scores of that warm-up.
    T_i = [[] for _ in range(d)]
    slack = K - sum(floors)
    # Global warm-up length and thresholds: the `slack` best warm-up scores.
    r = math.floor(N / math.e)
    T = []
    T_capacity = slack  # slack shrinks while selecting; the heap bound is fixed
    total_items = sum(items_per_category)
    seen = 0  # always equals sum(M)

    for x in I:
        i = x.category
        if seen < r:
            _push_bounded(T, x.score, T_capacity)
        if M[i] < R[i]:
            _push_bounded(T_i[i], x.score, floors[i])
        elif (C[i] < floors[i] and x.score > _heap_min(T_i[i])) or (
            items_per_category[i] - M[i] == floors[i] - C[i]
        ):
            # Beats the category threshold, or every remaining item of this
            # category is needed to reach its floor.
            L.append(x)
            C[i] += 1
        elif seen >= r and x.score > _heap_min(T) and C[i] < ceilings[i] and slack > 0:
            # Beats the global threshold: spend one unit of slack and retire
            # the threshold that was just used.
            if T:
                heapq.heappop(T)
            L.append(x)
            C[i] += 1
            slack -= 1
        elif C[i] < ceilings[i] and total_items - seen == K - len(L):
            # Every remaining item is needed to reach K.
            L.append(x)
            C[i] += 1
            slack -= 1
        M[i] += 1
        seen += 1
        if len(L) >= K:
            break
    return L
