In [1]:
import itertools
import gc # for removing rdds from memory
import pandas as pd
import numpy as np
from pyspark import SparkContext

In [2]:
sc = SparkContext(master='local', appName="Assignment1_E1")

22/04/24 18:37:33 WARN Utils: Your hostname, Luiss-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.126 instead (on interface en0)
22/04/24 18:37:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/24 18:37:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#hdfs dfs -mkdir -p data
#hdfs dfs -put data/small_conditions.csv data/

In [40]:
SUPPORT_THRESHOLD = 1000
data = sc.textFile("data/conditions.csv.gz")
header = data.first() #extract header

                                                                                

In [41]:
# START,STOP,PATIENT,ENCOUNTER,CODE,DESCRIPTION
# PATIENT is the patient identifier
# CODE is a condition identifier 
# DESCRIPTION is the name of the condition

In [42]:
# Reorganizing the data to a list of baskets (lists)
# and removing the header row
lines = data.filter(lambda row: row != header) \
                .map(lambda line: tuple(line.split(",")))

lines.take(3)

                                                                                

[('2017-01-14',
  '2017-03-30',
  '09e4e8cb-29c2-4ef4-86c0-a6ff0ba25d2a',
  '88e540ab-a7d7-47de-93c1-720a06f3d601',
  '65363002',
  'Otitis media'),
 ('2012-09-15',
  '2012-09-16',
  'b0a03e8c-8d0f-4242-9548-40f4d294eba8',
  'e89414dc-d0c6-478f-86c0-d08bac6ad0a2',
  '241929008',
  'Acute allergic reaction'),
 ('2018-06-17',
  '2018-06-24',
  '09e4e8cb-29c2-4ef4-86c0-a6ff0ba25d2a',
  'c14325b0-f7ec-4314-bba8-dddc37f0067d',
  '444814009',
  'Viral sinusitis (disorder)')]

In [35]:
# Freeing memory
del data
gc.collect()

115

In [36]:
conditions = lines.map(lambda x: (int(x[4]), x[5])) \
                .distinct() \
                .collectAsMap()

#conditions

                                                                                

In [37]:
conditions

{65363002: 'Otitis media',
 241929008: 'Acute allergic reaction',
 444814009: 'Viral sinusitis (disorder)',
 33737001: 'Fracture of rib',
 10509002: 'Acute bronchitis (disorder)',
 233678006: 'Childhood asthma',
 195662009: 'Acute viral pharyngitis (disorder)',
 232353008: 'Perennial allergic rhinitis with seasonal variation',
 446096008: 'Perennial allergic rhinitis',
 284551006: 'Laceration of foot',
 283371005: 'Laceration of forearm',
 72892002: 'Normal pregnancy',
 162864005: 'Body mass index 30+ - obesity (finding)',
 283385000: 'Laceration of thigh',
 239873007: 'Osteoarthritis of knee',
 19169002: 'Miscarriage in first trimester',
 156073000: 'Fetus with unknown complication',
 410429000: 'Cardiac Arrest',
 429007001: 'History of cardiac arrest (situation)',
 55822004: 'Hyperlipidemia',
 307731004: 'Injury of tendon of the rotator cuff of shoulder',
 68496003: 'Polyp of colon',
 44465007: 'Sprain of ankle',
 713197008: 'Recurrent rectal polyp',
 36971009: 'Sinusitis (disorder)'

In [43]:
item_baskets = lines.map(lambda x: (x[2], {int(x[4])})) \
                    .reduceByKey(lambda a, b: a | b) \
                    .map(lambda x: x[1])
                    

item_baskets.take(3)

                                                                                

[{47693006, 74400008, 162864005, 428251008, 444814009},
 {15777000, 53741008, 195662009, 239872002, 271737000, 403190006, 444814009},
 {65363002, 195662009}]

In [44]:
total_baskets = item_baskets.count()

                                                                                

1157578

In [12]:
# Freeing memory
del lines
gc.collect()

299

## Apriori Phase 1

In [45]:
# Flat listing all the baskets 
freqItemCounts = item_baskets.flatMap(lambda x: x) \
                    .map(lambda item: (item, 1)) \
                    .reduceByKey(lambda a, b: a + b) \
                    .filter(lambda item: item[1] >= SUPPORT_THRESHOLD) \
                    .sortBy(lambda x: x[1], ascending=False)
                    

# Mapping -> create pairs (item, 1)
#itemPairs = items.map(lambda item: (item, 1))

# Reducing
#itemCounts = itemPairs.reduceByKey(lambda a, b: a + b)

# Keeping only the ones above the support threshold
#freqItemCounts = itemCounts.filter(lambda item: item[1] >= SUPPORT_THRESHOLD)


# Taking the 10 most frequent itemsets for k = 1
#freqItemCounts.takeOrdered(10, key=lambda x: -x[1])

## Intermediate step

In [46]:
# Creating the frequent items table
freq_item_count = freqItemCounts.collect()

freqItemTable = freqItemCounts.map(lambda x: x[0]).collect()

#freqItemTable.take(10)
freq_item_count[0:10]

                                                                                

[(444814009, 751940),
 (195662009, 524692),
 (10509002, 461495),
 (162864005, 365567),
 (271737000, 355372),
 (15777000, 354315),
 (59621000, 305134),
 (40055000, 250239),
 (72892002, 205390),
 (19169002, 201894)]

In [47]:
# In order for a pair to be frequent both its items have to be frequent. 
# As such, we can remove the unfrequent items from the baskets.

# Remove the unfrequent items from the baskets
item_baskets = item_baskets.filter(lambda basket: {item for item in basket if item in freqItemTable}) 

#item_baskets.take(3)

## Phase 2, k = 2

In [48]:
# Generate all the possible pairs / triples / etc from the
# combinations of the items of each basket. 

def freq_n_uple(basket, k):
    candidate_n_uple = itertools.combinations(basket, k)
    for n_uple in candidate_n_uple:
        yield(n_uple, 1)

In [49]:
# counting pairs of frequent items
pairs = item_baskets.flatMap(lambda x: freq_n_uple(x, 2)) \
                    .reduceByKey(lambda v1, v2: v1 + v2) \
                    .filter(lambda x: x[1] >= SUPPORT_THRESHOLD) \
                    .sortBy(lambda x: x[1], ascending=False)
                    
pairs.take(10)

                                                                                

[((15777000, 271737000), 278832),
 ((10509002, 444814009), 250309),
 ((15777000, 444814009), 230525),
 ((271737000, 444814009), 226523),
 ((162864005, 444814009), 209660),
 ((59621000, 444814009), 200430),
 ((444814009, 195662009), 178208),
 ((195662009, 444814009), 165443),
 ((15777000, 10509002), 135606),
 ((162864005, 10509002), 129516)]

In [50]:
frequent_pairs_count = pairs.collect()

In [51]:
# Only worth checking for triples in items that are in frequent pairs.
# So we create a table similar to the frequent items table in order to remove
# unfrequent items from the baskets.
freq_pair_table = pairs.flatMap(lambda x: x[0]) \
                        .distinct() \
                        .collect()

In [52]:
# Removing unfrequent items from the baskets and droping baskets 
# with fewer than 3 items because we need ate least 3 items to make a triple.

item_baskets = item_baskets.filter(lambda basket: {item for item in basket if item in freq_pair_table}) \
                            .filter(lambda x: len(x) > 2 )
                            

#item_baskets.take(3)

## Phase 2, k = 3

In [53]:
triples = item_baskets.flatMap(lambda x: freq_n_uple(x, 3)) \
                    .reduceByKey(lambda v1, v2: v1 + v2) \
                    .filter(lambda x: x[1] >= SUPPORT_THRESHOLD) \
                    .sortBy(lambda x: x[1], ascending=False)

triples.take(10)

                                                                                

[((15777000, 271737000, 444814009), 175553),
 ((162864005, 10509002, 444814009), 88613),
 ((15777000, 10509002, 444814009), 88520),
 ((15777000, 59621000, 271737000), 84223),
 ((15777000, 271737000, 195662009), 81251),
 ((59621000, 271737000, 444814009), 80834),
 ((15777000, 271737000, 40055000), 77874),
 ((162864005, 15777000, 271737000), 73514),
 ((59621000, 10509002, 444814009), 72447),
 ((162864005, 59621000, 444814009), 68954)]

In [54]:
frequent_triples = triples.map(lambda x: x[0]).collect()
#frequent_triples

                                                                                

## Mining Association Rules

In [55]:
# Helper function to convert the codes to the conditions names
def code_to_text(codes):
    if isinstance(codes, tuple):
        return conditions[codes[0]], conditions[codes[1]]
    else:
        return conditions[codes]

In [56]:
# rules in the form {X,Y} -> {Z} plus the support of {X,Y,Z}
pair_rules = triples.flatMap(lambda triple: [(pair, tuple(set(triple[0]) - set(pair))[0], int(triple[1])) for pair in itertools.combinations(triple[0], 2)]).collect()

# rules in the form {X} -> {Y} and {Y} -> {X} plus the support of {X,Y}
single_rules = [[(pair[0][0], pair[0][1], int(pair[1])), (pair[0][1], pair[0][0], int(pair[1]))]
                for pair in frequent_pairs_count]
single_rules = list(itertools.chain.from_iterable(single_rules))

# Auxiliar lists in order to create the dataframes
all_rules = single_rules + pair_rules

all_supports = freq_item_count + frequent_pairs_count

                                                                                

In [57]:
# Auxiliar dataframe to easily map i and j to their respective supports
supports = pd.DataFrame(data=all_supports, columns=['i','support'])
#supports

In [58]:
# Dataframe used to calculate the metrics for the rules
rules = pd.DataFrame(data=all_rules, columns=['i', 'j', 'support_i_j'])
#rules

In [59]:
rules = pd.merge(rules, supports, on=['i'], how='inner')
rules.rename(columns={'support': 'support_i'}, inplace=True)
supports.rename(columns={'i': 'j'}, inplace=True)

rules = pd.merge(rules, supports, on=['j'], how='inner')
rules.rename(columns={'support': 'support_j'}, inplace=True)
rules.sort_values(by=['support_i_j'], inplace=True, ascending=False)
#rules

In [60]:
rules['i'] = rules['i'].apply(lambda x: code_to_text(x))
rules['j'] = rules['j'].apply(lambda x: code_to_text(x))

In [61]:
# Auxiliar columns containing the probabilities of i and j
rules['prob_i'] = rules.support_i / total_baskets
rules['prob_j'] = rules.support_j / total_baskets
# Confidence
rules['confidence'] = rules.support_i_j / rules.support_i
# Interest
rules['interest'] = rules.confidence - rules.prob_j
# Lift
rules['lift'] = rules.confidence / rules.prob_j
# Auxiliar calculation to simplify the std_lift expression
rules['aux_calc'] = np.maximum(rules.prob_i + rules.prob_j - 1, 1 / total_baskets) / (rules.prob_i * rules.prob_j)
# Standard Lift
rules['std_lift'] = (rules.lift - rules.aux_calc) / ((1 / np.maximum(rules.prob_i, rules.prob_j)) - rules.aux_calc)

#rules

In [62]:
rules.drop(columns=['support_i', 'support_j', 'support_i_j', 'prob_i', 'prob_j', 'aux_calc'], inplace=True)
rules.sort_values(by=['std_lift'], ascending=False, inplace=True)
rules.reset_index(inplace=True, drop=True)
rules = rules.loc[rules.std_lift > 0.2]
rules


Unnamed: 0,i,j,confidence,interest,lift,std_lift
0,Hypertriglyceridemia (disorder),Nonproliferative diabetic retinopathy due to t...,0.154029,0.143918,15.232893,1.000000
1,Diabetes,Diabetic retinopathy associated with type II d...,0.264611,0.246939,14.973974,1.000000
2,Hypertriglyceridemia (disorder),Proliferative diabetic retinopathy due to type...,0.039938,0.037317,15.232893,1.000000
3,Diabetes,Nonproliferative diabetic retinopathy due to t...,0.151411,0.141300,14.973974,1.000000
4,(Diabetic retinopathy associated with type II ...,Nonproliferative diabetic retinopathy due to t...,0.572204,0.562092,56.588678,1.000000
...,...,...,...,...,...,...
23699,Seasonal allergic rhinitis,Childhood asthma,0.200157,0.177908,8.996198,0.200105
23700,Childhood asthma,Seasonal allergic rhinitis,0.118967,0.105743,8.996198,0.200105
23701,"(Hypertriglyceridemia (disorder), Metabolic sy...",Miscarriage in first trimester,0.200103,0.025692,1.147309,0.200088
23702,"(Epilepsy, Acute viral pharyngitis (disorder))",Chronic sinusitis (disorder),0.200222,-0.015953,0.926205,0.200074


In [63]:
rules[rules['i']== 'Prediabetes']

Unnamed: 0,i,j,confidence,interest,lift,std_lift
2532,Prediabetes,Hyperglycemia (disorder),0.081222,0.050791,2.669058,0.816948
2805,Prediabetes,Anemia (disorder),0.786961,0.479965,2.563422,0.786960
3659,Prediabetes,Proliferative diabetic retinopathy due to type...,0.006082,0.003460,2.319793,0.709954
3905,Prediabetes,Nonproliferative diabetic retinopathy due to t...,0.022790,0.012679,2.253885,0.689850
4554,Prediabetes,Neuropathy due to type 2 diabetes mellitus (di...,0.050091,0.026957,2.165289,0.662746
...,...,...,...,...,...,...
21651,Prediabetes,Antepartum eclampsia,0.015040,-0.004602,0.765692,0.234332
22696,Prediabetes,Recurrent urinary tract infection,0.006172,-0.002571,0.705970,0.216008
22733,Prediabetes,Bullet wound,0.004637,-0.001953,0.703608,0.215260
23169,Prediabetes,Chronic pain,0.032341,-0.015149,0.681005,0.208430
