### Algorithms for Massive Data: Market-basket Analysis of the MeDAL Dataset
#### The task is to implement a system that finds frequent itemsets (aka market-basket analysis) in the MeDAL dataset described in Project 1. The detector must consider as baskets the strings contained in the text column of the full_data.csv file in the dataset, using words as items.


#### link to the dataset: https://www.kaggle.com/datasets/xhlulu/medal-emnlp

In [1]:
import pandas as pd
import numpy as np
import itertools
from nltk import word_tokenize
from google.colab import drive
import time
from tqdm import tqdm
import matplotlib.pyplot as plt
import json
import zipfile
import seaborn as sns

In [2]:
# install OpenJDK 8 and download and extract Apache Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [3]:
#configuring the Java and Spark installations in your Python environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [4]:
#to locate and initialize Spark within a Python environment
!pip install -q findspark
import findspark
findspark.init()
#import the necessary modules from pyspark.sql to work with Spark
#create a SparkSession object
#obtain the SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
# Insert your own kaggle keys
os.environ['KAGGLE_USERNAME'] = " "
os.environ['KAGGLE_KEY'] = " "
!kaggle datasets download -d xhlulu/medal-emnlp --unzip

Downloading medal-emnlp.zip to /content
100% 6.82G/6.82G [05:16<00:00, 25.4MB/s]
100% 6.82G/6.82G [05:16<00:00, 23.1MB/s]


In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
# Specify the schema
schema = StructType([
    StructField("TEXT", StringType()),
    StructField("LOCATION", StringType()),
    StructField("LABEL", StringType())
])

# Retrieve the data
medal = spark.read.csv("full_data.csv", schema=schema, header=True, sep=",")
medal.head(2)

[Row(TEXT='alphabisabolol has a primary antipeptic action depending on dosage which is not caused by an alteration of the phvalue the proteolytic activity of pepsin is reduced by percent through addition of bisabolol in the ratio of the antipeptic action of bisabolol only occurs in case of direct contact in case of a previous contact with the ATP the inhibiting effect is lost', LOCATION='56', LABEL='substrate'),
 Row(TEXT='a report is given on the recent discovery of outstanding immunological properties in ba ncyanoethyleneurea having a low molecular mass m experiments in ds CS bearing wistar rats have shown that ba at a dosage of only about percent ld mg kg and negligible lethality percent results in a REC rate of percent without hyperglycemia and in one test of percent with hyperglycemia under otherwise unchanged conditions the REF substance ifosfamide if a further development of cyclophosphamide applied without hyperglycemia in its most efficient dosage of percent ld mg kg brought a

In [7]:
# Keep only the "TEXT" column; its strings are the baskets
medal = medal.select("TEXT")
# Draw a random sample without replacement with fraction 0.0001 and a fixed
# seed; increase the fraction later as an experiment
subset = medal.sample(False, 0.0001, seed=12)

In [8]:
# Convert the dataframe to rdd and take the column "text"
# rdd is a data structure that stores data across multiple machines in a cluster
rdd = subset.rdd.map(lambda x: x[0])
subset.count()

1397

In [9]:
#inspect
subset.take(1)

[Row(TEXT='the presence of a growing tumor can lead to a significant curtailment of a graftversushost reaction as measured by the ability of allogeneic SP cells to induce a host vascular response this interference with the normal pattern of immunological reactions may be a reason for the survival of tumors in an immunologically alien environment')]

In [10]:
import string
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
# Download required nltk data
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

# Define the function for preprocessing text
def preprocess(row):
    """Turn one raw text string into a basket of unique, normalised words.

    The text is lower-cased, stripped of ASCII punctuation, tokenised with
    NLTK, filtered against the English stop-word list and lemmatised with
    WordNet. Duplicates are dropped because a basket only records whether
    an item is present.

    Args:
        row: The raw text of one record, or ``None`` for a null field.

    Returns:
        A list of distinct lemmatised tokens in arbitrary order. The list is
        empty when ``row`` is ``None`` or an empty string.
    """
    # A null field (None) would otherwise raise AttributeError on .lower();
    # an empty string yields an empty basket either way.
    if not row:
        return []

    # Imported inside the function, as in the original cell
    from nltk.corpus import stopwords

    # Convert the text to lowercase
    text = row.lower()

    # Remove punctuation
    text = text.translate(str.maketrans('', '', string.punctuation))

    # Stop words are held in a set for constant-time membership tests
    stop_words = set(stopwords.words('english'))

    # Tokenize and drop the stop words
    word_tokens = word_tokenize(text)
    filtered_words = [word for word in word_tokens if word not in stop_words]

    # Lemmatize the words
    lemmatizer = WordNetLemmatizer()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in filtered_words]

    # Return the preprocessed text as a list of unique words
    return list(set(lemmatized_words))


# Preprocess
rdd = rdd.map(preprocess)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


In [11]:
#create_vocabulary function creates a vocabulary of unique words and their corresponding indices
def create_vocabulary(rdd):
    """Build the lookup tables that translate between words and integer ids.

    Args:
        rdd: An RDD whose elements are iterables of words.

    Returns:
        A tuple ``(integer2word, word2integer)``. The first dict maps each
        id to its word, the second maps each word back to its id.
    """
    # Flatten the baskets, keep each word once and number the survivors
    numbered_words = rdd.flatMap(lambda words: words).distinct().zipWithIndex()

    # zipWithIndex yields (word, id); swap so the id becomes the dict key
    integer2word = numbered_words.map(lambda pair: (pair[1], pair[0])).collectAsMap()

    # Invert the mapping for the encoding direction
    word2integer = {}
    for index, word in integer2word.items():
        word2integer[word] = index

    return integer2word, word2integer

# Create the vocabularies
# these will be used for decoding and encoding itemsets
# encoding is needed to run items in market basket analysis
# decoding is needed to map the results back to words
#itemsets represent items that frequently occur together
# since itemsets are represented as sets of unique identifier
integer2word, word2integer = create_vocabulary(rdd)

In [12]:
# Substitute each word by its corresponding index
# take an RDD and a vocabulary dictionary as inputs
# substitute each word in the RDD with its corresponding index from the vocabulary.
def create_indexing(rdd, vocab):
    """Replace every word of every basket with its id from ``vocab``.

    Args:
        rdd: An RDD whose elements are iterables of words.
        vocab: Mapping from word to integer id. A word missing from it
            raises ``KeyError`` when the RDD is evaluated.

    Returns:
        A new RDD in which each basket is a list of integer ids.
    """
    def encode(sentence):
        encoded = []
        for word in sentence:
            encoded.append(vocab[word])
        return encoded

    return rdd.map(encode)

# the result is a new rdd where each sentence has been transformed into a list of indices
rdd = create_indexing(rdd, word2integer)

In [13]:
# Define the minimum support threshold:
# 5% of the number of baskets in the rdd (the 0.05 factor below)
threshold = rdd.count()*0.05
# NOTE: this bare expression is not the last line of the cell, so the
# notebook does not display it
threshold
# Define a list for the final output
results = []
# Compute frequent singletons (first pass of the Apriori algorithm):
# 1. for each word in each sentence emit the pair (word, 1)
# 2. sum the ones per word to get each word's count
# 3. keep only the words whose count is strictly greater than the threshold
# Each word is counted at most once per sentence when the sentences hold
# unique words, as preprocess returns them.
freq_singletons = rdd.flatMap(lambda sentence: [(word, 1) for word in sentence])\
        .reduceByKey(lambda w1, w2: w1+w2)\
        .filter(lambda t: t[1]>threshold)

In [14]:
# show 5 of the frequent singletons, identified by their index (take returns the first 5, not the 5 most frequent)
# each element represents a word and its count
# all these singletons occur more frequently than the threshold
# this step identifies words that occur frequently enough to be considered for further analysis
freq_singletons.take(5)

[(16498, 87), (4972, 108), (10962, 71), (10398, 98), (5314, 189)]

In [15]:
# retrieves all the elements from the frequent_singletons RDD
# calculates the total number of elements
len(freq_singletons.collect())

180

In [16]:
# Add frequent singletons to result
results += freq_singletons.collect()
results

[(16498, 87),
 (4972, 108),
 (10962, 71),
 (10398, 98),
 (5314, 189),
 (18761, 83),
 (4298, 107),
 (3620, 77),
 (6671, 171),
 (15146, 90),
 (14469, 102),
 (14470, 106),
 (6674, 123),
 (5318, 93),
 (4640, 108),
 (17861, 147),
 (6336, 222),
 (17862, 89),
 (5320, 102),
 (4304, 110),
 (8033, 74),
 (8825, 102),
 (8826, 164),
 (16510, 114),
 (8827, 73),
 (5663, 87),
 (6342, 90),
 (18208, 274),
 (8829, 137),
 (2276, 135),
 (11881, 93),
 (9170, 155),
 (11092, 86),
 (696, 127),
 (6007, 74),
 (13805, 135),
 (11094, 85),
 (9852, 281),
 (18214, 109),
 (3976, 144),
 (18215, 113),
 (12566, 191),
 (9854, 101),
 (18555, 130),
 (16974, 122),
 (18217, 79),
 (1380, 75),
 (7370, 357),
 (12569, 225),
 (9857, 87),
 (9518, 78),
 (9858, 280),
 (7034, 95),
 (18901, 91),
 (7375, 135),
 (8848, 94),
 (7380, 108),
 (13257, 150),
 (10773, 129),
 (15974, 168),
 (16653, 153),
 (17331, 75),
 (15975, 139),
 (13603, 105),
 (13942, 118),
 (15976, 72),
 (10779, 83),
 (14283, 110),
 (15978, 222),
 (17675, 84),
 (17336, 71)

In [17]:
# Discard the counter
#rdd object now contains only the associated word index, without the counts
freq_singletons = freq_singletons.map(lambda t: t[0])

In [18]:
# Build the candidate pairs from the frequent singletons:
# 1. cartesian product of the frequent singletons with themselves
# 2. drop the pairs made of the same element twice
# 3. sort each pair into a tuple so that (a, b) and (b, a) become identical
# 4. remove the duplicates this produces
# 5. collect the candidate pairs into a Python list
candidates = freq_singletons.cartesian(freq_singletons)\
                                .filter(lambda t: len(set(t))==len(t))\
                                .map(lambda t: tuple(sorted(t)))\
                                .distinct()\
                                .collect()

In [19]:
# Compute frequent pairs:
# for every sentence emit (candidate, 1) for each candidate pair fully
# contained in it, sum the ones per candidate, then keep the pairs whose
# count is strictly greater than the threshold.
# NOTE: candidates is a plain Python list captured by the lambda, and
# set(sentence) is rebuilt for every candidate checked.
freq_pairs = rdd.flatMap(lambda sentence: [(tuple(candidate), 1) for candidate in candidates if set(candidate).issubset(set(sentence))])\
                    .reduceByKey(lambda t1,t2: t1+t2)\
                    .filter(lambda t: t[1]>threshold)

#inspect
freq_pairs.take(5)

[((7370, 15131), 102),
 ((1559, 15473), 70),
 ((7370, 13777), 88),
 ((14293, 15473), 73),
 ((9321, 15312), 95)]

In [20]:
len(freq_pairs.collect())

63

In [21]:
results+=freq_pairs.collect()

In [22]:
len(results)

243

In [23]:
# Discard the count in freq_pairs
freq_pairs = freq_pairs.map(lambda t: t[0])

In [24]:
# Compute the candidate triples for the next iteration:
# 1. cartesian product of the frequent pairs with themselves
# 2. merge the two pairs of each combination into one set of items
# 3. keep only the merged sets with exactly 3 distinct items
# 4. sort each set into a tuple and remove the duplicates
# 5. collect the candidate triples into a Python list
candidates = freq_pairs.cartesian(freq_pairs)\
                          .map(lambda tuples: {item for tupla in tuples for item in tupla})\
                          .filter(lambda t: len(t) == 3)\
                          .map(lambda t: tuple(sorted(t)))\
                          .distinct()\
                          .collect()

candidates[1:10]

[(6336, 10578, 15473),
 (15312, 17676, 18019),
 (864, 9852, 15473),
 (6336, 9321, 15473),
 (7370, 10578, 15312),
 (2103, 9321, 15473),
 (9321, 13777, 15312),
 (12569, 14293, 15473),
 (6336, 15131, 15312)]

In [25]:
# Compute frequent triples
freq_triples = rdd.flatMap(lambda sentence: [(tuple(candidate), 1) for candidate in candidates if set(candidate).issubset(set(sentence))])\
                    .reduceByKey(lambda t1,t2: t1+t2)\
                    .filter(lambda t: t[1]>threshold)

In [26]:
freq_triples.take(1)

[]

In [27]:
len(freq_triples.collect())

0

In [28]:
results += freq_triples.collect()

In [29]:
freq_triples = freq_triples.map(lambda t: t[0])

In [30]:
results

[(16498, 87),
 (4972, 108),
 (10962, 71),
 (10398, 98),
 (5314, 189),
 (18761, 83),
 (4298, 107),
 (3620, 77),
 (6671, 171),
 (15146, 90),
 (14469, 102),
 (14470, 106),
 (6674, 123),
 (5318, 93),
 (4640, 108),
 (17861, 147),
 (6336, 222),
 (17862, 89),
 (5320, 102),
 (4304, 110),
 (8033, 74),
 (8825, 102),
 (8826, 164),
 (16510, 114),
 (8827, 73),
 (5663, 87),
 (6342, 90),
 (18208, 274),
 (8829, 137),
 (2276, 135),
 (11881, 93),
 (9170, 155),
 (11092, 86),
 (696, 127),
 (6007, 74),
 (13805, 135),
 (11094, 85),
 (9852, 281),
 (18214, 109),
 (3976, 144),
 (18215, 113),
 (12566, 191),
 (9854, 101),
 (18555, 130),
 (16974, 122),
 (18217, 79),
 (1380, 75),
 (7370, 357),
 (12569, 225),
 (9857, 87),
 (9518, 78),
 (9858, 280),
 (7034, 95),
 (18901, 91),
 (7375, 135),
 (8848, 94),
 (7380, 108),
 (13257, 150),
 (10773, 129),
 (15974, 168),
 (16653, 153),
 (17331, 75),
 (15975, 139),
 (13603, 105),
 (13942, 118),
 (15976, 72),
 (10779, 83),
 (14283, 110),
 (15978, 222),
 (17675, 84),
 (17336, 71)

In [31]:
# Convert the indexes back to words: a result whose key is a single int is a
# singleton and is looked up directly; any other key is treated as an
# itemset and decoded element by element into a tuple of words.
# The count is kept unchanged.
results = [(integer2word[t[0]],t[1]) if type(t[0])==int else (tuple([integer2word[element] for element in t[0]]),t[1]) for t in results]

In [32]:
results

[('structure', 87),
 ('condition', 108),
 ('cancer', 71),
 ('presence', 98),
 ('increased', 189),
 ('approach', 83),
 ('similar', 107),
 ('detected', 77),
 ('present', 171),
 ('specific', 90),
 ('identified', 102),
 ('risk', 106),
 ('t3', 123),
 ('region', 93),
 ('respectively', 108),
 ('use', 147),
 ('one', 222),
 ('treated', 89),
 ('therapy', 102),
 ('day', 110),
 ('whether', 74),
 ('demonstrated', 102),
 ('high', 164),
 ('within', 114),
 ('total', 73),
 ('following', 87),
 ('induced', 90),
 ('may', 274),
 ('could', 137),
 ('model', 135),
 ('h', 93),
 ('human', 155),
 ('interaction', 86),
 ('three', 127),
 ('cause', 74),
 ('expression', 135),
 ('technique', 85),
 ('also', 281),
 ('rat', 109),
 ('type', 144),
 ('several', 113),
 ('t0', 191),
 ('performed', 101),
 ('potential', 130),
 ('number', 122),
 ('animal', 79),
 ('reduction', 75),
 ('patient', 357),
 ('used', 225),
 ('mm', 87),
 ('evaluated', 78),
 ('using', 280),
 ('reported', 95),
 ('measured', 91),
 ('year', 135),
 ('among', 

### Experiment: adjusting the thresholds, since the previous implementation does not produce many medically meaningful results - adjust minimum support, minimum confidence, itemset size

In [33]:
"""
    Generate frequent itemsets using the Apriori algorithm.

    Inputs:
        rdd (RDD): The input RDD of raw text strings to be analyzed (one basket per element).
        perc_threshold (float): Minimum support as a fraction of the number of baskets; itemsets whose count is not above it are pruned.
        max_itemset_size (int): The maximum size of the itemset to generate. If "none" (the default), no limit is imposed.

    Returns:
        list: A list of frequent itemsets.
    """

def _generate_candidates(frequent_itemsets):
    """Build the size k+1 candidates from the frequent size k itemsets.

    Two itemsets that share their first k-1 items are joined into one
    candidate. A candidate is kept only if every one of its size k subsets
    is frequent, because an itemset with an infrequent subset cannot be
    frequent itself.

    Args:
        frequent_itemsets: List of sorted tuples, all of the same length.

    Returns:
        A list of sorted tuples, each one item longer than the inputs.
    """
    from collections import defaultdict
    from itertools import combinations

    frequent = set(frequent_itemsets)

    # Group the last item of every itemset under its shared prefix
    tails_by_prefix = defaultdict(list)
    for itemset in frequent_itemsets:
        tails_by_prefix[itemset[:-1]].append(itemset[-1])

    candidates = []
    for prefix, tails in tails_by_prefix.items():
        for first, second in combinations(sorted(tails), 2):
            candidate = prefix + (first, second)
            # Prune candidates that contain an infrequent subset
            if all(subset in frequent
                   for subset in combinations(candidate, len(candidate) - 1)):
                candidates.append(candidate)
    return candidates


def _count_frequent(baskets, candidates, threshold):
    """Count the candidates over the baskets and return the frequent ones.

    Args:
        baskets: RDD of lists of item ids.
        candidates: List of candidate itemsets (tuples of item ids).
        threshold: An itemset is kept when its count is strictly greater.

    Returns:
        A list of ``(itemset, count)`` tuples.
    """
    def emit(sentence):
        # Build the basket set once per sentence, not once per candidate
        basket = set(sentence)
        return [(candidate, 1) for candidate in candidates
                if basket.issuperset(candidate)]

    return (baskets.flatMap(emit)
                   .reduceByKey(lambda t1, t2: t1 + t2)
                   .filter(lambda t: t[1] > threshold)
                   .collect())


def apriori(rdd, perc_threshold=.01, max_itemset_size = "none"):
    """Find frequent itemsets of words with the Apriori algorithm.

    Args:
        rdd: RDD of raw text strings, one basket per element. It is passed
            through ``preprocess`` here, so it must not be preprocessed or
            indexed beforehand. For a DataFrame, pass for example
            ``df.rdd.map(lambda row: row[0])``.
        perc_threshold: Minimum support as a fraction of the number of
            baskets. An itemset is kept only when its count is strictly
            greater than ``rdd.count() * perc_threshold``.
        max_itemset_size: Largest itemset size to mine. ``None`` or the
            legacy string ``"none"`` means no limit.

    Returns:
        list: ``(itemset, count)`` tuples. A singleton is reported as the
        word itself, a larger itemset as a tuple of words.

    Raises:
        ValueError: If ``max_itemset_size`` is a number smaller than 1.
    """
    # None and "none" both mean unlimited; previously None never matched the
    # stop condition and the loop ran forever.
    no_limit = max_itemset_size is None or max_itemset_size == "none"
    if not no_limit and max_itemset_size < 1:
        raise ValueError("max_itemset_size must be at least 1, None or 'none'")

    # Compute the threshold on the basis of the percentage received as argument
    threshold = rdd.count() * perc_threshold

    # Preprocess the rdd
    rdd = rdd.map(preprocess)

    # Create the vocabularies: word2integer and integer2word
    integer2word, word2integer = create_vocabulary(rdd)

    # Index the rdd; it is scanned once per itemset size, so keep it cached
    baskets = create_indexing(rdd, word2integer).cache()

    try:
        # Compute frequent singletons
        results = (baskets.flatMap(lambda sentence: [(word, 1) for word in sentence])
                          .reduceByKey(lambda w1, w2: w1 + w2)
                          .filter(lambda t: t[1] > threshold)
                          .collect())

        # Singletons become 1-tuples so every level has the same shape
        frequent = [(word,) for word, _ in results]
        itemset_size = 1

        # Stop when nothing is left to extend or the size limit is reached
        while frequent and (no_limit or itemset_size < max_itemset_size):
            candidates = _generate_candidates(frequent)
            if not candidates:
                break

            # Filtering phase: each level is counted and collected only once
            counted = _count_frequent(baskets, candidates, threshold)
            results += counted

            # Remove the count
            frequent = [itemset for itemset, _ in counted]
            itemset_size += 1
    finally:
        baskets.unpersist()

    # Map the indexes back to words; singletons have a plain int as key
    decoded = []
    for itemset, count in results:
        if isinstance(itemset, tuple):
            decoded.append((tuple(integer2word[item] for item in itemset), count))
        else:
            decoded.append((integer2word[itemset], count))
    return decoded

In [34]:
# The 0.03 value sets the minimum support threshold to 3% of the dataset size
#results = apriori(subset, 0.03)
results

[('structure', 87),
 ('condition', 108),
 ('cancer', 71),
 ('presence', 98),
 ('increased', 189),
 ('approach', 83),
 ('similar', 107),
 ('detected', 77),
 ('present', 171),
 ('specific', 90),
 ('identified', 102),
 ('risk', 106),
 ('t3', 123),
 ('region', 93),
 ('respectively', 108),
 ('use', 147),
 ('one', 222),
 ('treated', 89),
 ('therapy', 102),
 ('day', 110),
 ('whether', 74),
 ('demonstrated', 102),
 ('high', 164),
 ('within', 114),
 ('total', 73),
 ('following', 87),
 ('induced', 90),
 ('may', 274),
 ('could', 137),
 ('model', 135),
 ('h', 93),
 ('human', 155),
 ('interaction', 86),
 ('three', 127),
 ('cause', 74),
 ('expression', 135),
 ('technique', 85),
 ('also', 281),
 ('rat', 109),
 ('type', 144),
 ('several', 113),
 ('t0', 191),
 ('performed', 101),
 ('potential', 130),
 ('number', 122),
 ('animal', 79),
 ('reduction', 75),
 ('patient', 357),
 ('used', 225),
 ('mm', 87),
 ('evaluated', 78),
 ('using', 280),
 ('reported', 95),
 ('measured', 91),
 ('year', 135),
 ('among', 