# Exercise 1

For this exercise the A-priori algorithm was developed to get the most frequent itemsets (size 2 and 3), and from them extract association rules. This algorithm was implemented using Spark, more specifically the PySpark library with the Dataframe API.

## Imports

PySpark is the only non-standard library required.

In [1]:
import os.path
from pyspark import Broadcast
from pyspark.sql import SparkSession, DataFrame, Column
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, ArrayType
from itertools import combinations, chain
from typing import Iterable, Any, List

## Spark initialization

Spark is initialized, with as many worker threads as logical cores on the machine.
We did not use a fixed value since the machines used for development had a different number of CPU cores.

In [2]:
spark = SparkSession.builder \
    .appName('Apriori') \
    .config('spark.master', 'local[*]') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/18 15:22:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Prepare the data

The dataset is medical data, where each row identifies a patient and their disease tested at a certain time.
In this context, the diseases are the *items* and the patients are the *baskets*.

The data's format is CSV, and is loaded including the header.
The `START`, `STOP` and `ENCOUNTER` columns are removed as they are not useful for this problem.

In [3]:
df = spark.read \
    .option('header', True) \
    .csv('./data/conditions.csv.gz') \
    .drop('START', 'STOP', 'ENCOUNTER')

The dataframe `df` will have three columns: `PATIENT`, `CODE` and `DESCRIPTION`.

From this dataframe, we extract the mappings from `CODE` to its `DESCRIPTION`.
Throughout the algorithm, we will use the `CODE` to identify each disease, and then map it to its `DESCRIPTION` when we output the final results.

We first check how many unique diseases there are, so that we can determine whether we can keep this mapping in memory or not.

In [4]:
# print('Number of distinct CODE-DESCRIPTION pairs:', df.select('CODE', 'DESCRIPTION').distinct().count())
# print('Number of distinct CODEs:', df.select('CODE').distinct().count())

There is discrepancy between the counts due to one `CODE` having 2 different descriptions.
For simplification we simply choose one description over the other.

Since there are only 159 diseases, we can perfectly keep the mapping in memory.
And so, we collect the distinct `CODE`-`DESCRIPTION` pairs into an hash table.

In [5]:
code_description_map = {r.CODE: r.DESCRIPTION
    for r in df \
    .select('CODE', 'DESCRIPTION') \
    .distinct() \
    .collect()
}

                                                                                

For the algorithm, the diseases' `DESCRIPTION`s won't be needed anymore, as we have the mapping.
The distinct `PATIENT`-`CODE` pairs are taken (it doesn't make sense to have duplicate items within a basket).

In [6]:
df = df.drop('DESCRIPTION').distinct()

## A-priori algorithm

We set the support threshold parameter to 1000, as recommended.

In [7]:
support_threshold = 1000

The intermediate results of each pass are saved in disk in Parquet format (Spark's default format).

### First pass

In the first pass, the frequent items are taken.
For that, a "group by" operation is performed, grouping by `CODE` and counting the number of `PATIENTS` that each `CODE` is present in.
Finally, the diseases are filtered according to the `support_threshold`, by comparing with the support stored in `COUNT`.

In [8]:
if not os.path.exists('frequent_diseases_k1'):
    frequent_diseases_k1 = df \
        .groupBy('CODE') \
        .count() \
        .withColumnRenamed('count', 'COUNT') \
        .filter(col('COUNT') >= support_threshold)
    
    frequent_diseases_k1.write.mode('overwrite').parquet(path='frequent_diseases_k1', compression='gzip')

frequent_diseases_k1 = spark.read.parquet('frequent_diseases_k1')

The frequent items table is kept in memory for future passes (in a Python `set`, for quicker membership queries).

In [9]:
frequent_diseases_k1_set = {r.CODE for r in frequent_diseases_k1.select('CODE').collect()}

Since this set of frequent items will be used multiple times by all nodes in the future, then we can broadcast this read-only data beforehand (https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables).

In [10]:
if not isinstance(frequent_diseases_k1_set, Broadcast):
    frequent_diseases_k1_set = spark.sparkContext.broadcast(frequent_diseases_k1_set)

In [11]:
print('Number of frequent items:', len(frequent_diseases_k1_set.value))   # 131

Number of frequent items: 131


### Second pass

The second pass requires generating frequent pairs of items.
For that, an UDF was developed that simply took an array of items and returned the list of item pairs, an operation performed within Python.

In [12]:
@udf(returnType=ArrayType(ArrayType(StringType(), False), False))
def combine_pairs(elems: Iterable[Any]):
    return list(combinations(elems, 2))

First, the `CODE`s are filtered using the `frequent_diseases_k1_set`, so that we only have frequent diseases (*monotonicity of itemsets*: itemsets are only frequent if all their subsets are).
Then, for each `PATIENT` we collect its `CODE`s into an array, and then use that array in the UDF previously defined.

It's important to note that the array of `CODE`s should be sorted beforehand, so that pair comparison can be done properly. Since Spark doesn't have a "set" datatype, the elements should be kept in order so that two pairs (which are arrays) with the same items will be considered equal.
The `combinations` function is guaranteed to keep this order when generating pairs.

The result is a column `CODE_PAIRS`, containing an array of pairs, being each pair an array with two elements.
This column is exploded, producing a row for each pair within the arrays of pairs.

Afterwards, the same grouping procedure in the first pass is performed, grouping by the itemsets and obtaining the number of baskets each itemset belongs to, filtering with the `support_threshold`.

In [13]:
if not os.path.exists('frequent_diseases_k2'):
    frequent_diseases_k2 = df \
        .filter(col('CODE').isin(frequent_diseases_k1_set.value)) \
        .groupBy('PATIENT') \
        .agg(collect_list('CODE')) \
        .withColumn('collect_list(CODE)', array_sort('collect_list(CODE)')) \
        .withColumn('CODE_PAIRS', combine_pairs('collect_list(CODE)')) \
        .select('PATIENT', 'CODE_PAIRS') \
        .withColumn('CODE_PAIR', explode('CODE_PAIRS')) \
        .drop('CODE_PAIRS') \
        .groupBy('CODE_PAIR') \
        .count() \
        .withColumnRenamed('count', 'COUNT') \
        .filter(col('COUNT') >= support_threshold)
    
    frequent_diseases_k2.write.mode('overwrite').parquet(path='frequent_diseases_k2', compression='gzip')

frequent_diseases_k2 = spark.read.parquet('frequent_diseases_k2')

The table of frequent pairs is kept in memory for the third pass.

In [14]:
frequent_diseases_k2_set = {tuple(r.CODE_PAIR) for r in frequent_diseases_k2.select('CODE_PAIR').collect()}

In [15]:
print('Number of frequent pairs:', len(frequent_diseases_k2_set))   # 2940

Number of frequent pairs: 2940


### Third Pass

As was done for the second pass, an UDF was developed that returns an array of triples given an array of items.
This function includes the verification that all $k-1$ immediate subsets of each returned triple are frequent (that is, all pairs within the triple are frequent).

In [16]:
@udf(returnType=ArrayType(ArrayType(StringType(), False), False))
def combine_triples(elems: Iterable[Any]):
    return [
        combination for combination in list(combinations(elems, 3))
        if ((combination[0], combination[1]) in frequent_diseases_k2_set
            and (combination[0], combination[2]) in frequent_diseases_k2_set
            and (combination[1], combination[2]) in frequent_diseases_k2_set)
    ]

The same approach for the second pass was used, merely differing in the UDF used.

In [17]:
if not os.path.exists('frequent_diseases_k3'):
    frequent_diseases_k3 = df \
        .filter(col('CODE').isin(frequent_diseases_k1_set.value)) \
        .groupBy('PATIENT') \
        .agg(collect_list('CODE')) \
        .withColumn('collect_list(CODE)', array_sort('collect_list(CODE)')) \
        .withColumn('CODE_TRIPLES', combine_triples('collect_list(CODE)')) \
        .select('PATIENT', 'CODE_TRIPLES') \
        .withColumn('CODE_TRIPLE', explode('CODE_TRIPLES')) \
        .drop('CODE_TRIPLES') \
        .groupBy('CODE_TRIPLE') \
        .count() \
        .withColumnRenamed('count', 'COUNT') \
        .filter(col('COUNT') >= support_threshold)

    frequent_diseases_k3.write.mode('overwrite').parquet(path='frequent_diseases_k3', compression='gzip')

frequent_diseases_k3 = spark.read.parquet('frequent_diseases_k3')

The table of frequent triples is generated, merely because the same was done for the previous $k$, but it won't be used.

In [18]:
frequent_diseases_k3_set = {tuple(r.CODE_TRIPLE) for r in frequent_diseases_k3.select('CODE_TRIPLE').collect()}

In [19]:
print('Number of frequent triples:', len(frequent_diseases_k3_set))   # 13395

Number of frequent triples: 13395


### Most frequent

For outputting the most frequent itemsets, an UDF was developed merely so the disease `CODE`s are converted to their descriptions.

In [20]:
@udf(returnType=ArrayType(StringType(), False))
def map_codes_to_description(codes: List[str]):
    return [code_description_map[item] for item in codes]

The listing of the 10 frequent pairs/triples is saved in a tab-separated CSV file, which includes the header.
Obtaining the 10 most frequent itemsets involves sorting the respective dataframe in descending order and taking the top 10 results.

In [21]:
with open('most_frequent_k2.csv', 'w') as f:
    print('pair\tcount', file=f)
    print(*(
            f'{r.CODE_PAIR}\t{r.COUNT}' for r in
            frequent_diseases_k2
                .withColumn('CODE_PAIR', map_codes_to_description('CODE_PAIR'))
                .sort('COUNT', ascending=False).take(10)
        ), sep='\n', file=f)

                                                                                

In [22]:
with open('most_frequent_k3.csv', 'w') as f:
    print('triple\tcount', file=f)
    print(*(
            f'{r.CODE_TRIPLE}\t{r.COUNT}' for r in
            frequent_diseases_k3
                .withColumn('CODE_TRIPLE', map_codes_to_description('CODE_TRIPLE'))
                .sort('COUNT', ascending=False).take(10)
        ), sep='\n', file=f)

## Association Rules

Throughout this section, for a rule $A \rightarrow B$ we denote $A$ as the **LHS** (left-hand side) and $B$ as the **RHS** (right-hand side).

In [23]:
standardised_lift_threshold = 0.2


The total number of patients (baskets) is required for calculating the rule metrics.

In [24]:
n_patients = df.select('PATIENT').distinct().count()

                                                                                

Much like in the creation of pairs and triples, an UDF was created so that, taking an itemset as input, produces all association rules with a single element on the RHS.

To do so, a list is generated whose elements are tuples of 2 elements, with the first element being the LHS as an array and the RHS as a single-element array.
A tuple is generated for every element present in an itemset (a pair or a triple).

The LHS has to be sorted so that two LHS arrays with the same elements are considered equal.
The RHS is an array merely for generalization, since the RHS could have multiple elements.

In [25]:
@udf(returnType=ArrayType(ArrayType(ArrayType(StringType(), False), False), False))
def generate_association_rules(itemset: List[str]):
    itemset = set(itemset)
    return [(sorted(itemset - {item}), [item]) for item in itemset]

To generate rules from the frequent pairs, the previous UDF is applied to the dataframe of item pairs (`frequent_diseases_k2`).
The LHS and RHS of the generated subsets is put in two separate columns, `RULE_LHS` and `RULE_RHS` respectively.
The support of pairs is kept in `COUNT_PAIR` to be used when calculating the metrics.

Afterwards, the dataframe is joined with the dataframe containing the frequent items and their support (`frequent_diseases_k1`) on the disease `CODE`s, so that we can obtain the support of the rule's LHS and RHS.
And so, two inner joins are performed, since there it's guaranteed that the rule LHS/RHS is present in the dataframe of frequent items.

In [26]:
rules_k2 = frequent_diseases_k2 \
    .withColumn('RULES', generate_association_rules('CODE_PAIR')) \
    .withColumn('RULES', explode('RULES')) \
    .select(col('RULES')[0].alias('RULE_LHS'), col('RULES')[1].alias('RULE_RHS'), col('COUNT').alias('COUNT_RULE')) \
    .join(frequent_diseases_k1, frequent_diseases_k1['CODE'] == col('RULE_LHS')[0], 'inner') \
    .withColumnRenamed('COUNT', 'COUNT_LHS') \
    .drop('CODE') \
    .join(frequent_diseases_k1, frequent_diseases_k1['CODE'] == col('RULE_RHS')[0], 'inner') \
    .withColumnRenamed('COUNT', 'COUNT_RHS') \
    .drop('CODE')

With the necessary counts on separate columns, the metrics can be easily calculated by referencing the column containing the desired values.
The `CONFIDENCE` column is created first because the result of this calculation is used to calculate the other metrics.
The results are finally filtered according to the `STANDARDISED_LIFT`.

The calculation of metrics is put in a separate function since it will be reused for the rules extracted from triples.

In [27]:
def add_metrics_columns(rule_counts: DataFrame) -> DataFrame:
    return rule_counts \
        .withColumn('CONFIDENCE', col('COUNT_RULE') / col('COUNT_LHS')) \
        .withColumn('INTEREST', col('CONFIDENCE') - col('COUNT_RHS') / n_patients) \
        .withColumn('LIFT', n_patients * col('CONFIDENCE') / col('COUNT_RHS')) \
        .withColumn('STANDARDISED_LIFT', 
                    (col('LIFT') - array_max(array(
                        (col('COUNT_LHS') + col('COUNT_RHS')) / n_patients - 1,
                        lit(1 / n_patients)
                    )) / (col('COUNT_LHS') * col('COUNT_RHS') / (n_patients ** 2)))
                    /
                    ((n_patients / array_max(array(col('COUNT_LHS'), col('COUNT_RHS')))) - array_max(array(
                        (col('COUNT_LHS') + col('COUNT_RHS')) / n_patients - 1,
                        lit(1 / n_patients)
                    )) / (col('COUNT_LHS') * col('COUNT_RHS') / (n_patients ** 2)))
        )

In [28]:
rules_k2_metrics = rules_k2 \
    .transform(add_metrics_columns) \
    .filter(col('STANDARDISED_LIFT') >= standardised_lift_threshold)

For the rules extracted from the frequent triples, a similar approach to the frequent pairs was used, on the dataframe of frequent triples (`frequent_diseases_k3`).

However, the join to obtain the LHS support had to be performed differently.
Since the rules' LHS can be a subset of different sizes (in this case, it can have one or two elements), the joins have to be performed both with the dataframe of frequent items (`frequent_diseases_k1`) and the dataframe of frequent pairs (`frequent_diseases_k2`).

Because of this, the join can't be *inner*, and instead should be a *left join*, so that non-matching `CODE`s aren't removed, but kept with a `null` value on `COUNT`.
To obtain the support of the rules' LHS for instance, we join with `frequent_diseases_k1`, and so the rule LHSs that have one element will have a count while the rule LHSs that have two elements will have a `null` value (`COUNT_LHS`).
Afterwards, joining with `frequent_diseases_k2` provides the counts for rule LHSs that have two elements, while LHSs with one element have a `null` value (`COUNT_LHS_OTHER`).

Both columns `COUNT_LHS` and `COUNT_LHS_OTHER` are "complementary" to each other, that is, for a given row a column has a non-`null` value while the other has `null`.
In order to combine both into a single column `COUNT_LHS`, `coalesce` is used which uses the first non-`null` value of both columns for all rows.

In [29]:
rules_k3 = frequent_diseases_k3 \
    .withColumn('RULES', generate_association_rules('CODE_TRIPLE')) \
    .withColumn('RULES', explode('RULES')) \
    .select(col('RULES')[0].alias('RULE_LHS'), col('RULES')[1].alias('RULE_RHS'), col('COUNT').alias('COUNT_RULE')) \
    \
    .join(frequent_diseases_k1, array(frequent_diseases_k1['CODE']) == col('RULE_LHS'), 'left') \
    .withColumnRenamed('COUNT', 'COUNT_LHS') \
    .drop('CODE') \
    .join(frequent_diseases_k2, frequent_diseases_k2['CODE_PAIR'] == col('RULE_LHS'), 'left') \
    .withColumnRenamed('COUNT', 'COUNT_LHS_OTHER') \
    .drop('CODE_PAIR') \
    .withColumn('COUNT_LHS', coalesce('COUNT_LHS', 'COUNT_LHS_OTHER')) \
    .drop('COUNT_LHS_OTHER') \
    \
    .join(frequent_diseases_k1, array(frequent_diseases_k1['CODE']) == col('RULE_RHS'), 'inner') \
    .withColumnRenamed('COUNT', 'COUNT_RHS') \
    .drop('CODE')

With the counts obtained, calculating the metrics is exactly the same to the rule metrics obtained from the pairs.

In [30]:
rules_k3_metrics = rules_k3 \
    .transform(add_metrics_columns) \
    .filter(col('STANDARDISED_LIFT') >= standardised_lift_threshold)

Finally, both dataframes `rules_k2_metrics` and `rules_k3_metrics` are merged into a dataframe containing all their rows.
After this, we can sort by the `STANDARDISED_LIFT` for printing the results.

In [31]:
assert rules_k2_metrics.columns == rules_k3_metrics.columns, 'The dataframes of rule metrics should have the same columns in the same order!'

rules_metrics = rules_k2_metrics \
    .union(rules_k3_metrics) \
    .sort('STANDARDISED_LIFT', ascending=False)

### Results

To generate the final results, the disease codes are mapped to their respective descriptions and the columns are transformed into a single `String` column.

Then, the dataframe is written to disk, into a partitioned text file.
If the file is to be read, then the following command can be used (if on Linux):

```bash
cat association_rules/part* | less
```

In [32]:
@udf(returnType=StringType())
def format_rule(rule_1: List[str], rule_2: List[str], *values: List[Any]):
    return f'{{{", ".join(rule_1)}}} -> {{{", ".join(rule_2)}}}: {", ".join(map(str, values))}'

In [33]:
rules_metrics \
    .withColumn('RULE_LHS', map_codes_to_description('RULE_LHS')) \
    .withColumn('RULE_RHS', map_codes_to_description('RULE_RHS')) \
    .select(format_rule('RULE_LHS', 'RULE_RHS', 'STANDARDISED_LIFT', 'LIFT', 'CONFIDENCE', 'INTEREST').alias('LINE')) \
    .write.mode('overwrite').text('association_rules')

                                                                                