In [321]:
import copy

class Item:
    def __init__(self, item: str, utility: int):
        self.item = item
        self.utility = utility
        self._twu = 0

    @property
    def twu(self) -> int:
        return self._twu

    @twu.setter
    def twu(self, value: int) -> None:
        self._twu = value

    def __repr__(self):
        # return f"({self.item},{self.twu})"
        return f"{self.item}"

    def __eq__(self, other):
        if isinstance(other, Item):
            return self.item == other.item and self.utility == other.utility
        return False

    def __hash__(self):
        return hash((self.item, self.utility))


class TransItem:
    def __init__(self, item: Item, quantity: int, probability: float):
        self.item = item
        self.quantity = quantity
        self.probability = probability

    def __repr__(self):
        return f"{self.item},{self.quantity},{self.probability})"

    def get_total_probability(self):
        return self.quantity * self.probability


def check_order_condition(a: Item, b: Item):
    if a.utility < 0 and b.utility > 0:
        return True
    elif a.utility * b.utility > 0:
        return a.twu >= b.twu
    return False


def check_order_item_and_set(ik: Item, X: set[Item]) -> bool:
    for i in X:
        if i != ik and check_order_condition(ik, i) == False:
            return False
    return True


class Transaction:
    def __init__(self, id: int, trans_items: set[TransItem]):
        self.id = id
        self.trans_items = trans_items

    def __repr__(self):
        return f"t{self.id}, {self.trans_items}"

    def contains_item_set(self, item_set: set[Item]) -> bool:
        transaction_items = {trans_item.item for trans_item in self.trans_items}
        return item_set.issubset(transaction_items)

    def get_quantity_of_item(self, item: Item) -> int:
        for trans_item in self.trans_items:
            if trans_item.item == item:
                return trans_item.quantity
        return 0

    def get_probability_of_item(self, item: Item) -> int:
        for trans_item in self.trans_items:
            if trans_item.item == item:
                return trans_item.probability
        return 0

    def get_items(self) -> set[Item]:
        return {trans_item.item for trans_item in self.trans_items}

    def get_probability_of_item_set(self, item_set: set[Item]) -> float:
        total_probability = 0.0
        if self.contains_item_set(item_set):
            total_probability = 1.0
            for trans_item in self.trans_items:
                if trans_item.item in item_set:
                    total_probability *= trans_item.probability
        return round(total_probability, 3)

    def get_positive_utility_of_item_set(self, items: set[Item]):
        pu = 0
        for item in items:
            utility = item.utility
            if utility > 0:
                quantity = self.get_quantity_of_item(item)
                pu += item.utility * quantity
        return pu

    def get_negative_utility_of_item_set(self, items: set[Item]):
        nu = 0
        for item in items:
            utility = item.utility
            if utility < 0:
                quantity = self.get_quantity_of_item(item)
                nu += item.utility * quantity
        return nu

    def get_remaining_utility_of_item_set(self, items: set[Item]):
        ru = 0
        trans_item_set = self.get_items()
        for item in trans_item_set:
            if item.utility > 0:
                if item not in items:
                    if check_order_item_and_set(item, items) == True:
                        ru += item.utility * self.get_quantity_of_item(item)
        return ru


from collections import namedtuple

Utilities = namedtuple("Utilities", ["tid", "pro", "pu", "nu", "ru"])


class AbstractList:
    def __init__(self, items: set[Item], utility_values: list[Utilities]):
        self.items = items
        self.utility_values = utility_values

    def get_ru(self):
        ru = 0
        for i in self.utility_values:
            ru += i.ru
        return ru

    def get_pu(self):
        pu = 0
        for i in self.utility_values:
            pu += i.pu
        return pu

    def get_nu(self):
        nu = 0
        for i in self.utility_values:
            nu += i.nu
        return nu

    def get_pro(self):
        pro = 0
        for i in self.utility_values:
            pro += i.pro
        return pro

    def __repr__(self):
        if not self.utility_values:
            return "Empty PNU-List"

        # Column titles
        titles = ["PRO", "PU", "NU", "RU"]

        # Get the number of columns from the first utility value
        if isinstance(self.utility_values[0], (list, tuple)):
            num_columns = len(self.utility_values[0])
        else:
            num_columns = 1

        # Create combined items string
        items_str = ",".join(str(item) for item in self.items)
        items_str = "(" + items_str + ")"

        # Calculate column widths based on utility values and titles
        value_widths = []
        for i in range(num_columns):
            max_width = max(
                (
                    len(str(round(row[i], 3)))
                    if isinstance(row, (list, tuple))
                    else len(str(row))
                )
                for row in self.utility_values
            )
            # Consider width of titles and combined items
            if i == 0:
                max_width = max(max_width, len(items_str))
            else:
                max_width = max(max_width, len(titles[i - 1]))
            value_widths.append(max_width)

        # Build the table string
        result = []

        # Add border
        total_width = sum(value_widths) + 3 * num_columns + 1
        result.append("-" * total_width)

        # Add single row with all items and titles
        row = "|"
        row += f" {items_str.rjust(value_widths[0])} |"
        for i in range(1, num_columns):
            row += f" {titles[i-1].center(value_widths[i])} |"
        result.append(row)

        # Add separator
        result.append("-" * total_width)

        # Add utility values
        for utility in self.utility_values:
            row = "|"
            if isinstance(utility, (list, tuple)):
                for i, value in enumerate(utility):
                    row += f" {str(round(value, 3)).rjust(value_widths[i])} |"
            else:
                row += f" {str(utility).rjust(value_widths[0])} |"
            result.append(row)

        # Add bottom border
        result.append("-" * total_width)
        return "\n".join(result)

In [322]:
def calculate_positive_utility_of_transaction(trans: Transaction):
    pu = 0
    items: set[Item] = trans.get_items()
    for item in items:
        if item.utility > 0:
            pu += trans.get_quantity_of_item(item) * item.utility
    return pu

In [323]:
def calculate_transaction_weight_utility(items: set[Item], database: list[Transaction]):
    twu = 0
    for trans in database:
        if trans.contains_item_set(items):
            twu += calculate_positive_utility_of_transaction(trans)
    return twu

In [324]:
def calculate_positive_utility_of_item_set_in_trans(
    items: set[Item], trans: Transaction
):
    pu = 0
    for item in items:
        utility = item.utility
        if utility > 0:
            quantity = trans.get_quantity_of_item(item)
            pu += item.utility * quantity
    return pu

In [325]:
def calculate_negative_utility_of_item_set_in_trans(
    items: set[Item], trans: Transaction
):
    nu = 0
    for item in items:
        utility = item.utility
        if utility < 0:
            quantity = trans.get_quantity_of_item(item)
            nu += item.utility * quantity
    return nu

In [326]:
def calculate_remaining_utility_of_item_set_in_trans(
    items: set[Item], trans: Transaction
):
    ru = 0
    trans_items: set[Item] = trans.get_items()
    for item in trans_items:
        if item.utility > 0:
            if check_order_item_and_set(item, items) == True:
                if item not in items:
                    ru += item.utility * trans.get_quantity_of_item(item)
    return ru

In [327]:
def create_eucs_dict(
    arr: list[Item], database: list[Transaction]
) -> dict[frozenset[Item], int]:
    n = len(arr)
    eucs_dict = {}
    for i in range(n):
        for j in range(n):
            if i < j:
                item_pair = frozenset({arr[i], arr[j]})
                twu_value = calculate_transaction_weight_utility(item_pair, database)
                eucs_dict[item_pair] = twu_value
    return eucs_dict

In [328]:
def find_tuple_by_trans_id(P: AbstractList, target_trans_id: int) -> Utilities:
    utilities_list: list[Utilities] = P.utility_values
    for iTuple in utilities_list:
        if iTuple.tid == target_trans_id:
            return iTuple
    return None


def construct(P: AbstractList, Px: AbstractList, Py: AbstractList):
    x = Px.items
    y = Py.items
    xy = x | y
    utilities_list: list[Utilities] = list()
    Pxy = AbstractList(xy, utilities_list)
    utilities_list_of_px: list[Utilities] = Px.utility_values
    for xTuple in utilities_list_of_px:
        yTuple: Utilities = find_tuple_by_trans_id(Py, xTuple.tid)
        if yTuple is not None:
            if P.utility_values:
                pTuple: Utilities = find_tuple_by_trans_id(P, xTuple.tid)
                xyTuple: Utilities = Utilities(
                    xTuple.tid,
                    xTuple.pro * yTuple.pro / pTuple.pro,
                    xTuple.pu + yTuple.pu - pTuple.pu,
                    xTuple.nu + yTuple.nu - pTuple.nu,
                    yTuple.ru,
                )
                utilities_list.append(xyTuple)
            else:
                xyTuple: Utilities = Utilities(
                    xTuple.tid,
                    xTuple.pro * yTuple.pro,
                    xTuple.pu + yTuple.pu,
                    xTuple.nu + yTuple.nu,
                    yTuple.ru,
                )
                utilities_list.append(xyTuple)
    return Pxy

In [329]:
def calculate_utility_of_item_set_in_database(
    items: set[Item], database: list[Transaction]
):
    u = 0
    for trans in database:
        if trans.contains_item_set(items):
            for item in items:
                u += item.utility * trans.get_quantity_of_item(item)
    return u

In [330]:
def calculate_probability_of_item_set(items: set[Item], database: list[Transaction]):
    prob = 0.0
    for trans in database:
        prob += trans.get_probability_of_item_set(items)
    return prob

In [331]:
def sort_items_by_twu_and_utility(items: list[Item]) -> list[Item]:
    def sort_key(item: Item) -> tuple:
        return (0 if item.utility > 0 else 1, item.twu)

    return sorted(items, key=sort_key)

DFS - SEARCH PROCEDURE ALGORITHM


In [332]:
def searching_procedure(
    PList: AbstractList,
    lists: list[AbstractList],
    minUtil,
    minPro,
    database: list[Transaction],
    eucs_dict: dict[frozenset[Item], int],
    output: list[set[Item]],
):
    for i in range(0, len(lists)):
        XList: AbstractList = lists[i]
        # print("X: " + str(XList.items))
        utility = XList.get_pu() + XList.get_nu()
        pro = XList.get_pro()
        # print(str(XList.items) + ", " + str(utility) + ", " + str(pro))
        if pro >= minPro * len(database) and utility >= minUtil:
            output.append(XList.items)
        if pro >= minPro * len(database) and XList.get_pu() + XList.get_ru() >= minUtil:
            new_lists: list[AbstractList] = list()
            for j in range(i + 1, len(lists)):
                YList: AbstractList = lists[j]
                # print("Y: " + str(YList.items))
                x = XList.items.difference(PList.items)
                y = YList.items.difference(PList.items)
                key = frozenset(x | y)
                twu_value = eucs_dict.get(key)
                if twu_value >= minUtil:
                    ZList = construct(PList, XList, YList)
                    new_lists.append(ZList)
            # print("candidate_count: " + str(len(new_lists)))
            searching_procedure(
                XList, new_lists, minUtil, minPro, database, eucs_dict, output
            )

In [333]:
def preparation_procedure(
    db: list[Transaction], item_list: list[Item], minUtility, minPro
):
    removed_list: list[Item] = list()
    for item in item_list:
        twu = calculate_transaction_weight_utility({item}, db)
        pro = calculate_probability_of_item_set({item}, db)
        if twu < minUtility or pro < minPro * len(db):
            removed_list.append(item)
        else:
            item.twu = twu

    # Remove unqualified item
    new_distinct_items = [item for item in item_list if item not in removed_list]
    new_distinct_items = sort_items_by_twu_and_utility(new_distinct_items)
    newDb = copy.deepcopy(db)

    # Remove unqualified item from transaction
    for trans in newDb:
        for item in removed_list:
            trans.get_items().discard(item)

    # Create list[AbstractList], prepare for algorithm 2
    lists: list[AbstractList] = list()

    for item in new_distinct_items:
        utility_values_list: list[tuple] = list()
        pnu_list = AbstractList({item}, utility_values_list)
        for trans in newDb:
            if trans.contains_item_set({item}):
                pro = trans.get_probability_of_item_set({item})
                pu = trans.get_positive_utility_of_item_set({item})
                nu = trans.get_negative_utility_of_item_set({item})
                ru = trans.get_remaining_utility_of_item_set({item})
                utility_values: Utilities = Utilities(trans.id, pro, pu, nu, ru)
                utility_values_list.append(utility_values)
        # print(pnu_list)
        lists.append(pnu_list)

    # Create EUCS
    eucs_dict: dict[frozenset[Item], int] = create_eucs_dict(new_distinct_items, newDb)

    root = AbstractList({}, list())
    # Call algorithm 2
    output = []
    searching_procedure(root, lists, minUtility, minPro, newDb, eucs_dict, output)
    return output

In [334]:
import random

item_list = [Item(f"i{i+1}", random.randint(-10, 10)) for i in range(10)]

database = list()
for transaction_id in range(1, 21):
    trans_items = set()
    selected_items = random.sample(item_list, random.randint(1, 5))
    for item in selected_items:
        quantity = random.randint(1, 10)
        probability = round(random.uniform(0.1, 1.0), 2)
        trans_item = TransItem(item, quantity, probability)
        trans_items.add(trans_item)

    transaction = Transaction(transaction_id, trans_items)
    database.append(transaction)

database

[t1, {i8,7,0.53)},
 t2, {i6,8,0.35), i10,4,0.4)},
 t3, {i10,4,0.11), i3,9,0.19), i4,5,0.9), i8,1,0.2)},
 t4, {i1,3,0.84)},
 t5, {i2,7,0.68)},
 t6, {i1,5,0.97)},
 t7, {i2,5,0.23), i3,1,0.61), i9,5,0.26), i1,4,0.85)},
 t8, {i6,1,0.28), i9,4,0.17), i5,6,0.62), i2,3,0.92)},
 t9, {i1,1,0.4), i7,8,0.88), i10,3,0.83)},
 t10, {i9,8,0.48), i1,7,0.84), i3,5,0.23)},
 t11, {i3,10,0.55), i9,3,0.95), i8,9,0.61), i4,1,0.44), i10,2,0.2)},
 t12, {i3,7,0.98), i8,6,0.24)},
 t13, {i1,10,0.57), i9,2,0.4)},
 t14, {i2,8,0.52)},
 t15, {i3,4,0.16), i6,3,0.67), i8,6,0.56)},
 t16, {i6,3,0.23)},
 t17, {i10,10,0.53), i7,3,0.19), i9,9,0.35), i5,8,0.82)},
 t18, {i3,2,0.58), i5,3,0.94), i2,1,0.54)},
 t19, {i8,2,0.73), i4,10,0.92)},
 t20, {i2,7,0.76), i7,9,0.35), i6,1,0.93), i10,5,0.26)}]

BASED ON APRIORI - BFS


In [335]:
def is_phui(
    item_set: set[Item], database: list[Transaction], min_util: int, min_pro: float
):
    utility = 0
    probability = 0
    for trans in database:
        if trans.contains_item_set(item_set):
            utility += trans.get_positive_utility_of_item_set(
                item_set
            ) + trans.get_negative_utility_of_item_set(item_set)
            probability += trans.get_probability_of_item_set(item_set)
    
    # print(str(item_set) + ": " + str(probability))
    return utility >= min_util and round(probability, 3) >= min_pro * len(database)

In [336]:
def calculate_utilities_of_item_set(
    item_set: set[Item], database: list[Transaction]
) -> tuple[float, int, int]:
    p, u, ru = 0, 0, 0
    for trans in database:
        if trans.contains_item_set(item_set):
            p += trans.get_probability_of_item_set(item_set)
            u += trans.get_positive_utility_of_item_set(item_set)
            +trans.get_negative_utility_of_item_set(item_set)
            ru += trans.get_remaining_utility_of_item_set(item_set)
    return round(p, 3), u, ru

In [337]:
def is_potential_candidate(
    item_set: set[Item], database: list[Transaction], minUtil: int, minPro: float
) -> bool:
    p, u, ru = calculate_utilities_of_item_set(item_set, database)
    return u + ru >= minUtil and p >= minPro * len(database)

In [338]:
def find_size_one_phui(
    database: list[Transaction], item_list: set[Item], min_util: int, min_pro: float
) -> list[set[Item]]:
    phui_size_one_list: list[set[Item]] = list()
    for item in item_list:
        if is_phui({item}, database, min_util, min_pro):
            phui_size_one_list.append({item})
    return phui_size_one_list

In [339]:
def find_size_k_phui(
    database: list[Transaction],
    size_k_candidates: list[set[Item]],
    min_util: int,
    min_pro: float,
) -> list[set[Item]]:
    phui_size_k_list: list[set[Item]] = list()
    for item_set in size_k_candidates:
        if is_phui(item_set, database, min_util, min_pro):
            phui_size_k_list.append(item_set)
    return phui_size_k_list

In [340]:
def generate_size_k_candidates(
    prev_phui: list[set[Item]], item_list: set[Item], min_util: int, min_pro: float
) -> list[set[Item]]:
    distinct_items = set().union(*prev_phui)
    size_k_candidate: list[set[Item]] = list()
    for phui in prev_phui:
        for item in distinct_items.difference(phui):
            potential_candidate = set(phui)
            potential_candidate.add(item)
            if is_potential_candidate(potential_candidate, database, min_util, min_pro):
                if potential_candidate not in size_k_candidate:
                    size_k_candidate.append(potential_candidate)
        for item in set(item_list).difference(distinct_items):
            potential_candidate = set(phui)
            potential_candidate.add(item)
            if is_potential_candidate(potential_candidate, database, min_util, min_pro):
                if potential_candidate not in size_k_candidate:
                    size_k_candidate.append(potential_candidate)
    return size_k_candidate

In [341]:
def high_utility_mining_apriori(
    database: list[Transaction], item_list: set[Item], min_util: int, min_pro: float
):
    result: list[list[set[Item]]] = list()
    # create PHUI size 1
    size_1_phui: list[set[Item]] = find_size_one_phui(
        database, item_list, min_util, min_pro
    )
    result.append(size_1_phui)
    while result[-1]:
        candidate_size_k: list[set[Item]] = generate_size_k_candidates(
            result[-1], item_list, min_util, min_pro
        )
        # print("candidate_count: " + str(len(candidate_size_k)))
        size_k_phui: list[set[Item]] = find_size_k_phui(
            database, candidate_size_k, min_util, min_pro
        )
        result.append(size_k_phui)

    return [item_set for sublist in result for item_set in sublist]

In [342]:
class PHUNode:
    def __init__(
        self,
        item_set: set[Item] = None,
        utility: int = 0,
        ru: int = 0,
        prob: float = 0.0,
        children: list["PHUNode"] = None,
        parent: "PHUNode" = None,
    ):
        self.item_set = item_set if item_set is not None else set()
        self.utility = utility
        self.ru = ru
        self.prob = prob
        self.children = children if children is not None else []
        self.parent = parent

    def add_child(self, child: "PHUNode") -> None:
        self.children.append(child)

    def is_leaf(self) -> bool:
        return len(self.children) == 0

    def get_total_utility(self) -> int:
        total = self.utility
        for child in self.children:
            total += child.get_total_utility()
        return total

    def __repr__(self) -> str:
        return f"{self.item_set}"

    def print_children(self, level=0):
        print("  " * level + f"{self.item_set}")
        for child in self.children:
            child.print_children(level + 1)

In [343]:
def sort_children_by_twu_and_utility(items: list[PHUNode]) -> list[PHUNode]:
    def sort_key(node: PHUNode) -> tuple:
        item = next(iter(node.item_set))
        return (0 if item.twu > 0 else 1, item.twu)
    return sorted(items, key=sort_key)

In [344]:
def get_right_sibling(current_node: PHUNode):
    if current_node.parent is None:
        return []

    siblings = current_node.parent.children

    if not siblings or current_node not in siblings:
        return []

    current_index = siblings.index(current_node)
    if current_index < len(siblings) - 1:
        return siblings[
            current_index + 1 :
        ]  
    else:
        return []  

In [345]:
def build_subtree(
    node_x: PHUNode,
    database: list[Transaction],
    min_util: int,
    min_prob: int,
    collection: list[set[Item]],
):
    if not is_potential_candidate(node_x.item_set, database, min_util, min_prob):
        return
    right_siblings = get_right_sibling(node_x)

    generates: list[PHUNode] = list()
    for node_y in right_siblings:
        xy = (node_x.item_set - node_y.item_set).union(
            node_y.item_set - node_x.item_set
        )
        xy_twu = calculate_transaction_weight_utility(xy, database)
        if xy_twu >= min_util:
            z_item_set = node_x.item_set.union(node_y.item_set)
            z_p, z_u, z_ru = calculate_utilities_of_item_set(z_item_set, database)
            node_z = PHUNode(z_item_set, z_u, z_ru, z_p, [], node_x)
            node_x.add_child(node_z)
            generates.append(node_z)
            if is_phui(z_item_set, database, min_util, min_prob):
                collection.append(z_item_set)
    for gen_node in generates:
        build_subtree(gen_node, database, min_util, min_prob, collection)

In [346]:
def build_tree(
    database: list[Transaction], item_list: set[Item], min_util: int, min_prob: int
):
    root: PHUNode = PHUNode()
    collection: list[set[Item]] = list()
    for item in item_list:
        p, u, ru = calculate_utilities_of_item_set({item}, database)
        twu = calculate_transaction_weight_utility({item}, database)
        if p >= min_prob * len(database) and twu >= min_util:
            new_node = PHUNode({item}, u, ru, p, [], root)
            root.add_child(new_node)
            item.twu = twu
            if is_phui({item}, database, min_util, min_prob):
                collection.append({item})
        root.children = sort_children_by_twu_and_utility(root.children)
    for child in root.children:
        build_subtree(child, database, min_util, min_prob, collection)
    return collection

In [347]:
x = build_tree(database, item_list, 25, 0.05)
print(x)
len(x)

[{i2}, {i5}, {i5, i2}]


3

In [348]:
print(item_list)
y = high_utility_mining_apriori(database, item_list, 40, 0.05)
print(y)
len(y)

[i1, i2, i3, i4, i5, i6, i7, i8, i9, i10]
[{i5}, {i5, i2}]


2

In [349]:
z = preparation_procedure(database, item_list, 40, 0.05)
print(z)
len(z)

[{i5, i2}, {i5}]


2