In [None]:
# get dataset: download the zipped actors table from Kaggle, unpack it in the
# working directory and remove the archive (the preprocessing cell below reads
# "actors.csv" from the working directory)
! curl -L -o actors.csv.zip "https://www.kaggle.com/api/v1/datasets/download/gsimonx37/letterboxd/actors.csv"
! unzip actors.csv.zip
! rm actors.csv.zip

# get whole repo if running in google colab, then install its pinned requirements
! git clone https://github.com/mattia01017/movie-actor-mb-analysis
! pip install -r movie-actor-mb-analysis/requirements.txt

# setup Spark (findspark.init() is called before the pyspark imports of the next cell)
import os
import findspark
findspark.init()

In [None]:
import json
import numpy as np
from pympler.asizeof import asizeof
from typing import Iterable
from collections import Counter
from itertools import combinations, count
from functools import reduce
from pyspark.sql import SparkSession
from pyspark import RDD
import gc

# Create (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("movie-actor-mb-analysis")
    .getOrCreate()
)

# Market-basket analysis of Letterboxd dataset

## Preprocessing

Starting from a CSV table that associates film identifiers with actors, we generate the list of baskets and save it to disk. The list is sorted by movie ID only to obtain a predictable order; the sorting is not strictly needed.

In [None]:
# Read the (movie id, actor name) table, group the actor names by movie and
# write one JSON-encoded basket per line under the "baskets" directory.
df = spark.read.csv("actors.csv", header=True, sep=",", mode="DROPMALFORMED")
(
    df.rdd
    .map(lambda row: (row["id"], row["name"]))
    .groupByKey()
    .sortByKey()
    .map(lambda movie: json.dumps(list(movie[1])))
    .saveAsTextFile("baskets")
)

We define an iterator that implements lazy loading of the baskets from the saved files. In this way, we can load one basket at a time into memory instead of the whole dataset.

In [2]:
class Baskets:
    """Re-iterable, lazy view over the basket part files.

    The files ``<parts_path>/part-00000``, ``<parts_path>/part-00001``, ... are
    read in order; every line must be a JSON array and is yielded as a tuple.
    Iteration stops at the first part number whose file does not exist.

    Each call to ``iter()`` starts an independent pass with its own file
    handle, so the same instance can be scanned several times (as the
    multi-pass algorithms below do), even concurrently, and files are closed
    as soon as a pass finishes or is abandoned.
    """

    def __init__(self, parts_path) -> None:
        self.parts_path = parts_path

    def _file_name(self, part: int) -> str:
        """Return the path of the part file with the given index."""
        return "{0}/part-{1:0>5}".format(self.parts_path, part)

    def __iter__(self):
        """Yield the baskets one at a time, part file after part file.

        Raises:
            FileNotFoundError: if not even the first part file exists.
        """
        part = 0
        while True:
            try:
                file = open(self._file_name(part), encoding="utf-8")
            except FileNotFoundError:
                if part == 0:
                    # A missing first part means a wrong path, not an empty
                    # dataset: keep reporting it as an error.
                    raise
                return
            with file:
                for line in file:
                    yield tuple(json.loads(line))
            part += 1
    

## Algorithms

For the analysis, the Savasere, Omiecinski and Navathe (SON) algorithm will be implemented using the Park, Cheng and Yu (PCY) algorithm for retrieving frequent itemsets in the chunks.

### PCY

First, a very simple class representing a bitmap useful for the PCY algorithm implementation is defined.

In [3]:
class Bitmap:
    """Fixed-size bit array packed eight bits per byte in a NumPy array."""

    def __init__(self, bits_arr: list) -> None:
        # "little" bit order: input element i is stored in byte i // 8 at bit
        # position i % 8, which is the layout get() and set() rely on.
        self.bytes = np.packbits(bits_arr, bitorder="little")

    def get(self, index: int) -> bool:
        """Return whether the bit at ``index`` is set."""
        byte_pos, bit_pos = divmod(index, 8)
        mask = 1 << bit_pos
        return bool(self.bytes[byte_pos] & mask)

    def set(self, index: int):
        """Set the bit at ``index`` to 1."""
        byte_pos, bit_pos = divmod(index, 8)
        self.bytes[byte_pos] |= 1 << bit_pos

    def __repr__(self) -> str:
        # One 8-digit binary group per byte, most significant bit first.
        return " ".join(f"{byte:08b}" for byte in self.bytes)

After that, the PCY algorithm can be implemented. The garbage collector is triggered manually so that the counters are freed from memory immediately after the bitmap is created.

In [12]:
def hash_t(itemset: tuple) -> int:
    """Hash a tuple ignoring the order of its elements.

    The result is the XOR of the elements' hashes (0 for an empty tuple), so
    any permutation of the same items yields the same value.
    """
    acc = 0
    for member in itemset:
        acc ^= hash(member)
    return acc


def _get_candidate_items(
    freq_itemset: frozenset, basket: tuple, last_freq_iset: list[tuple], s: int
):
    """This helper function take the frequent items in a basket, if those appear at least s-1 times
    in subsets of size s-1, then they must be in a candidate subset of size s"""
    basket_freq_items = freq_itemset.intersection(basket)
    items_in_freq_subset = (
        item for iset in last_freq_iset for item in iset if item in basket_freq_items
    )
    return [item for item, cnt in Counter(items_in_freq_subset).items() if cnt >= s - 1]


def apriori(
    baskets: Iterable[tuple[str]],
    threshold: int,
    freq_items: list[str],
    freq_couples: list[tuple],
    iset_len_limit: int | None = None,
) -> list[tuple]:
    """Apriori algorithm starting from frequent couples

    Args:
        baskets (Iterable[tuple[str]]): the baskets
        threshold (int): the threshold over which an itemset is frequent
        freq_items (list[str]): items found to be frequent, assumed to be non descending sorted
        freq_couples (set[tuple]): couples found to be frequent
        iset_len_limit (int | None, optional): the maximum cardinality of itemsets to consider. Defaults to None.

    Returns:
        list[tuple]: all the frequent itemsets, including the couples
    """
    freq_items_set = frozenset(freq_items)
    result = freq_couples
    last_freq_iset = freq_couples
    sizes = iset_len_limit and range(3, iset_len_limit + 1) or count(3)

    for s in sizes:
        counters = Counter()
        for basket in baskets:
            candidate_items = _get_candidate_items(
                freq_items_set, basket, last_freq_iset, s
            )
            for itemset in combinations(candidate_items, s):
                counters[tuple(sorted(itemset))] += 1
        new_frequent = [
            itemset for itemset, count in counters.items() if count > threshold
        ]
        if len(new_frequent) == 0:
            break
        result.extend(new_frequent)
        last_freq_iset = new_frequent

    return list(result)


def PCY(
    baskets: Iterable[tuple[str]],
    threshold: int,
    buckets: int,
    iset_len_limit: int | None = None,
) -> list[tuple]:
    """The PCY algorithm

    Args:
        baskets (Iterable[tuple[str]]): the baskets; scanned more than once, so
            it must be re-iterable (a list or a ``Baskets`` object)
        threshold (int): the threshold over which an itemset is frequent
            (counts must be strictly greater)
        buckets (int): the number of buckets to use for counting pairs
        iset_len_limit (int | None, optional): the maximum cardinality of itemsets to consider. Defaults to None.

    Returns:
        list[tuple]: candidate frequent itemsets of cardinality 2 or more. Couples
            are not counted exactly here, so they may include false positives
            (infrequent couples of frequent items that hash to a frequent bucket).
    """
    item_counts = Counter()
    itemset_counts = np.zeros(buckets, dtype=np.uint32)

    # First pass: exact counts for single items, hashed counts for pairs.
    for basket in baskets:
        item_counts.update(basket)
        for itemset in combinations(basket, 2):
            itemset_counts[hash_t(itemset) % buckets] += 1

    freq_items = [item for item, cnt in item_counts.items() if cnt > threshold]
    del item_counts
    gc.collect()

    # Vectorized comparison: produces a compact boolean array instead of a
    # Python list with one object reference per bucket, which for large
    # bucket counts would cost more memory than the counters themselves.
    bitmap = Bitmap(itemset_counts > threshold)
    del itemset_counts
    gc.collect()

    # A couple is a candidate when both items are frequent and the couple
    # hashes to a frequent bucket.
    freq_couples = [
        tuple(sorted(itemset))
        for itemset in combinations(freq_items, 2)
        if bitmap.get(hash_t(itemset) % buckets)
    ]

    return apriori(baskets, threshold, freq_items, freq_couples, iset_len_limit)

In this implementation, the itemsets with cardinality larger than 2 are obtained using the apriori algorithm, as only a small number of those itemsets is expected to be frequent.

PCY alone can be used to retrieve frequent itemsets using a single node for computation. To avoid long execution times, in this section only frequent couples will be computed. Unsetting the `iset_len_limit` optional parameter in the cell below (or setting it to a higher value) will force the search to look for frequent itemsets with higher cardinality. The next cell should take around 7 minutes in a Google Colab CPU runtime.

In [None]:
# Restrict the search to couples, as stated in the text above; pass
# iset_len_limit=None (or a larger value) to also look for larger itemsets.
candidates = PCY(Baskets("baskets"), 100, int(1e8), iset_len_limit=2)
len(candidates)

The result also contains false positive couples, that is, infrequent couples that hash to frequent buckets. To remove the false positives, another pass is needed: we count the occurrences of the pairs in the baskets and discard the ones that don't reach the threshold.

In [14]:
def count_occurrences(
    baskets: Iterable[tuple],
    candidates: Iterable[tuple],
) -> list[tuple[tuple, int]]:
    """Count occurrences of candidates in baskets

    Args:
        baskets (Iterable[tuple]): the baskets
        candidates (Iterable[tuple]): the candidate itemsets; scanned once per
            basket, so it must be re-iterable (e.g. a list)

    Returns:
        list[tuple[tuple, int]]: a list of (itemset, occurrences) pairs for the
            candidates contained in at least one basket, in order of first match
    """
    counts = Counter()
    for basket in baskets:
        # Build the set once per basket instead of once per candidate.
        basket_items = frozenset(basket)
        counts.update(
            itemset for itemset in candidates if basket_items.issuperset(itemset)
        )
    return list(counts.items())

In [None]:
# Exact second pass: keep the candidates seen more than 100 times, ordered by
# decreasing number of occurrences.
frequent = sorted(
    (
        entry
        for entry in count_occurrences(Baskets("baskets"), candidates)
        if entry[1] > 100
    ),
    key=lambda entry: entry[1],
    reverse=True,
)

with open("PCY_frequent_itemsets.json", "w") as f:
    json.dump(
        [{"set": itemset, "count": occurrences} for itemset, occurrences in frequent],
        f,
        indent=2,
    )

print("Number of frequent itemsets:", len(frequent))
print("List of frequent itemsets saved in 'PCY_frequent_itemsets.json'")

### SON

Execution times can be improved by using SON, parallelizing the execution of PCY on a number of chunks and combining the results. The Apache Spark framework is used for the implementation of the SON algorithm. 

We start by loading the basket files:

In [16]:
# Each line of the saved files is one JSON-encoded basket; decode it to a tuple.
df = spark.read.text("baskets")
baskets: RDD = df.rdd.map(lambda line_row: tuple(json.loads(line_row["value"])))

In [None]:
# Number of baskets (one per line of the saved files)
baskets.count()

### Memory usage

The objective is to have the maximum memory usage without swapping and thus thrashing. The main elements to store in memory are:
- The hash table of item counters
- The array of bucket counters
- The bitmap of frequent buckets

The memory usage of the bitmap and of the array of counters is easy to predict given their size; predicting it for the hash table of counters is trickier. For this purpose, we use a tool for observing the memory behaviour of Python objects, namely Pympler. Its `asizeof` function returns an approximation of the memory usage of an object.

We measure the size of the `Counter` object after counting all items.

In [None]:
# Count every item on the driver and report the size of the resulting Counter.
counter = Counter(baskets.flatMap(lambda basket: basket).collect())
print(f"{asizeof(counter) / 1e6:.3f} MB")

Thus, we can assume that a single node won't use more than 200 MB for storing the item counters. The remaining space can be used to store the bucket counters. Assuming we want to use up to 2 GB of memory for each computing node, we can use a number of buckets with 32-bit unsigned integer counters equal to:
$$
\frac{2 \cdot 10^9 \text{ B} - 2 \cdot 10^8 \text{ B}}{4 \text{ B}} = 4.5 \cdot 10^8 \text{ buckets}
$$

while the bitmap will occupy $1/32$ of the space, that is $56 \text{ MB}$.

The last parameters to tune are the thresholds for labelling an itemset as frequent. These will be chosen experimentally so as to obtain an output of reasonable size, say around 50 itemsets in the whole basket list.

### Map-reduce implementation

To avoid long execution times in resource-constrained environments, only a randomly sampled subset of the baskets is used during the analysis. For the same reason, the number of buckets used will be smaller than the one computed above.

The result of the analysis on the original dataset using $10^7$ buckets and a threshold of 100 is reported in the `frequent_itemset.json` file in the repository. The next cell shows an execution on a much smaller set of baskets, considering only couples. To search a larger set of baskets and to retrieve itemsets with larger cardinality, simply increase the `SAMPLE_FRACTION` and `ISET_LEN_LIMIT` constants respectively, possibly setting the latter to `None` to remove the cardinality limit.

The cell below takes around 8 minutes to generate a result in a Google Colab CPU runtime.

In [None]:
BUCKETS = int(1e7)
THRESHOLD = 30
ISET_LEN_LIMIT = 2
SAMPLE_FRACTION = .2
CHUNKS = 5

if SAMPLE_FRACTION < 1:
    # Sample without replacement, with a fixed seed (2).
    # NOTE: `baskets` is overwritten, so re-running this cell samples again
    # from the already sampled RDD; re-run the loading cell first.
    baskets = baskets.sample(False, SAMPLE_FRACTION, 2)

# RDDs are immutable: repartition() returns a new RDD, so its result must be
# kept, otherwise CHUNKS has no effect on the number of partitions.
baskets = baskets.repartition(CHUNKS)

def SON(baskets: RDD, threshold: int, buckets: int) -> list[tuple]:
    """The SON algorithm

    Runs PCY on every partition of ``baskets`` with a proportionally lowered
    threshold to collect candidate itemsets, then counts the candidates over
    all the partitions and keeps those above the global threshold.

    The maximum itemset cardinality is read from the module-level
    ``ISET_LEN_LIMIT`` constant.

    Args:
        baskets (RDD): The Spark RDD containing baskets
        threshold (int): the threshold over which an itemset is frequent
            (counts must be strictly greater)
        buckets (int): the number of buckets to use for counting pairs

    Returns:
        list[tuple]: the frequent itemsets as (itemset, occurrences) pairs,
            sorted by decreasing number of occurrences
    """
    num_chunks = baskets.getNumPartitions()
    # First phase: local candidates, found on each chunk with the threshold
    # scaled down by the number of chunks.
    candidates = (
        baskets.mapPartitions(
            lambda chunk: PCY(
                list(chunk), threshold // num_chunks, buckets, ISET_LEN_LIMIT
            )
        )
        .distinct()
        .collect()
    )
    # Second phase: exact global counts of the candidates. The filter uses the
    # `threshold` argument, so the function honours the value it is called with.
    frequent_itemsets = (
        baskets.mapPartitions(
            lambda chunk: count_occurrences(list(chunk), candidates)
        )
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda x: x[1] > threshold)
        .collect()
    )
    frequent_itemsets.sort(key=lambda x: -x[1])
    return frequent_itemsets


frequent_itemsets = SON(baskets, THRESHOLD, BUCKETS)

# Show the five most frequent itemsets, then save the whole list as JSON.
for x in frequent_itemsets[:5]:
    print(x)
with open("SON_frequent_itemsets.json", "w") as f:
    json.dump(
        [
            {"set": itemset, "count": occurrences}
            for itemset, occurrences in frequent_itemsets
        ],
        f,
        indent=2,
    )

print("Number of frequent itemsets:", len(frequent_itemsets))
print("List of frequent itemsets saved in 'SON_frequent_itemsets.json'")