#### Imports

In [1]:
from pyspark import SparkContext
from datetime import datetime

### Spark


Create a new SparkContext

In [2]:
#local spark
sc = SparkContext(appName="Conditions")
#vocareum
#sc = spark.sparkContext

In [3]:
threshold=1000
min_std_lift=0.2

#### Read data

Here we read the test data and map it by splitting by "," and ignoring the header row.

In [4]:
data = sc.textFile("conditions.csv")
data = data.map(lambda line: line.split(",")).filter(lambda v: v[0]!="START")
data.take(2)


[['2017-01-14',
  '2017-03-30',
  '09e4e8cb-29c2-4ef4-86c0-a6ff0ba25d2a',
  '88e540ab-a7d7-47de-93c1-720a06f3d601',
  '65363002',
  'Otitis media'],
 ['2012-09-15',
  '2012-09-16',
  'b0a03e8c-8d0f-4242-9548-40f4d294eba8',
  'e89414dc-d0c6-478f-86c0-d08bac6ad0a2',
  '241929008',
  'Acute allergic reaction']]

### Reorganize data

Next we map the unique conditions by their code and respective description

In [5]:
condition_map = data.map(lambda line: (int(line[4]),line[5])) \
.distinct()

condition_map.take(5)

[(40275004, 'Contact dermatitis'),
 (126906006, 'Neoplasm of prostate'),
 (399211009, 'History of myocardial infarction (situation)'),
 (97331000119101,
  'Macular edema and retinopathy due to type 2 diabetes mellitus (disorder)'),
 (241929008, 'Acute allergic reaction')]

And save the data  to new file

In [6]:
format_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
condition_map.saveAsTextFile("{0}/{1}".format("condition_map", format_time))


Here we get all the unique patient-condition pairs to a map.

We map by patient-(condition) code

In [7]:
#Map by PATIENT-(CONDITION)CODE, 
patient_rdd2 = data.map(lambda line: (line[2],line[4])) \
.groupByKey() \
.map(lambda tp: [int(j) for j in set(tp[1])]) 

patient_rdd2.cache()
n_buckets = patient_rdd2.count()
patient_rdd2.take(2)

[[65966004, 55822004, 162864005, 24079001],
 [446096008,
  44465007,
  284549007,
  72892002,
  198992004,
  53741008,
  444814009,
  43878008]]

### Most frequent

Here we map the most frequent conditions and their counts.

flatMap: We flatmap them by key, and set their value to 1;
reduceByKey: We reduce by key and make all keys unique and their values their respective counts;
sortBy: We sort it by decreasingly by their count;
filter: And finally we apply the threshold, keeping only the ones that have higher counts than it. 

In [8]:
frequent_singles = patient_rdd2 \
.flatMap(lambda l: [(i,1) for i in l]) \
.reduceByKey(lambda v1,v2:v1+v2) \
.sortBy(lambda a:a[1],False) \
.filter(lambda k: k[1]>threshold)

frequent_singles.take(10)

[(444814009, 751940),
 (195662009, 524692),
 (10509002, 461495),
 (162864005, 365567),
 (271737000, 355372),
 (15777000, 354315),
 (59621000, 305134),
 (40055000, 250239),
 (72892002, 205390),
 (19169002, 201894)]

Once we have the most frequent items we create a set with them

In [9]:
frequent_single_rdd = frequent_singles.map(lambda a: a[0])
frequent_single_set = sc.broadcast(set(frequent_single_rdd.collect()))
frequent_singles.saveAsTextFile("{0}/{1}".format("frequent_singles_count", format_time))
frequent_single_set.value

{1734006,
 5602001,
 6072007,
 7200002,
 10509002,
 15724005,
 15777000,
 16114001,
 19169002,
 22298006,
 24079001,
 26929004,
 30832001,
 33737001,
 35999006,
 36971009,
 38822007,
 39848009,
 40055000,
 40275004,
 43878008,
 44054006,
 44465007,
 47505003,
 47693006,
 49436004,
 53741008,
 55680006,
 55822004,
 58150001,
 59621000,
 62106007,
 62564004,
 64859006,
 65275009,
 65363002,
 65966004,
 68496003,
 69896004,
 70704007,
 72892002,
 74400008,
 75498004,
 79586000,
 80394007,
 82423001,
 83664006,
 84757009,
 87433001,
 88805009,
 90560007,
 92691004,
 93761005,
 94260004,
 95417003,
 109838007,
 110030002,
 126906006,
 127013003,
 128613002,
 156073000,
 162573006,
 162864005,
 185086009,
 192127007,
 195662009,
 195967001,
 196416002,
 197927001,
 198992004,
 201834006,
 230265002,
 230690007,
 232353008,
 233604007,
 233678006,
 235919008,
 236077008,
 237602007,
 239720000,
 239872002,
 239873007,
 241929008,
 254632001,
 254637007,
 254837009,
 262574004,
 263102004,
 27

### Build Pairs

In [10]:
def pair_forming(l):
    pairs=[]
    l.sort()
    for i in range(0,len(l)):
        if l[i] in frequent_single_set.value:
            for j in range(i+1,len(l)):
                if  l[i]!=l[j] and l[j] in frequent_single_set.value:
                    pairs.append((l[i],l[j]))
    return pairs

### Build Triples

In [11]:
def triple_forming(l):
    triples=[]
    pairs = pair_forming(l)
    for i,j in pairs:
        for k in l:
            if (i,j) in frequent_pair_set.value and (i,k) in frequent_pair_set.value and (j,k) in frequent_pair_set.value and k!=i and k!=j:#or (j,i) in frequent_pairs#prob unecessary
                  triples.append(tuple(sorted([i,j,k])))
    return triples

In [12]:
conds = sc.broadcast(condition_map.collectAsMap())

### Count the pairs

10 Most frequent itemsets of size 2

In [13]:
pair_count = patient_rdd2.flatMap(pair_forming).map(lambda p:(p,1)).reduceByKey(lambda v1,v2: v1+v2).filter(lambda tp:tp[1]>=threshold)\
.sortBy(lambda a: a[1],False)
frequent_pair_set = sc.broadcast(set(pair_count.map(lambda a: a[0]).collect()))
pair_count.saveAsTextFile("{0}/{1}".format("frequent_pairs_count", format_time))

ten_most_frequent_pairs = pair_count.take(10)
for pair in ten_most_frequent_pairs:
    print([conds.value[i] for i in list(pair[0])]," with count ", pair[1])

['Acute viral pharyngitis (disorder)', 'Viral sinusitis (disorder)']  with count  343651
['Acute bronchitis (disorder)', 'Viral sinusitis (disorder)']  with count  302516
['Prediabetes', 'Anemia (disorder)']  with count  289176
['Body mass index 30+ - obesity (finding)', 'Viral sinusitis (disorder)']  with count  243812
['Anemia (disorder)', 'Viral sinusitis (disorder)']  with count  236847
['Prediabetes', 'Viral sinusitis (disorder)']  with count  236320
['Acute bronchitis (disorder)', 'Acute viral pharyngitis (disorder)']  with count  211065
['Hypertension', 'Viral sinusitis (disorder)']  with count  203450
['Body mass index 30+ - obesity (finding)', 'Acute viral pharyngitis (disorder)']  with count  167438
['Chronic sinusitis (disorder)', 'Viral sinusitis (disorder)']  with count  165530


### Count the triples

10 Most frequent itemsets of size 3

In [14]:
triple_count = patient_rdd2.flatMap(triple_forming).map(lambda p:(p,1)).reduceByKey(lambda v1,v2: v1+v2).filter(lambda tp:tp[1]>=threshold)\
.sortBy(lambda a: a[1], False)
ten_most_frequent_triples = triple_count.take(10)
for triple in ten_most_frequent_triples:
    print([conds.value[i] for i in list(triple[0])]," with count ", triple[1])

['Prediabetes', 'Anemia (disorder)', 'Viral sinusitis (disorder)']  with count  192819
['Acute bronchitis (disorder)', 'Acute viral pharyngitis (disorder)', 'Viral sinusitis (disorder)']  with count  139174
['Prediabetes', 'Acute viral pharyngitis (disorder)', 'Anemia (disorder)']  with count  132583
['Acute bronchitis (disorder)', 'Prediabetes', 'Anemia (disorder)']  with count  115510
['Body mass index 30+ - obesity (finding)', 'Acute viral pharyngitis (disorder)', 'Viral sinusitis (disorder)']  with count  111860
['Acute viral pharyngitis (disorder)', 'Anemia (disorder)', 'Viral sinusitis (disorder)']  with count  108560
['Prediabetes', 'Acute viral pharyngitis (disorder)', 'Viral sinusitis (disorder)']  with count  108083
['Prediabetes', 'Hypertension', 'Anemia (disorder)']  with count  99818
['Acute bronchitis (disorder)', 'Body mass index 30+ - obesity (finding)', 'Viral sinusitis (disorder)']  with count  97384
['Acute bronchitis (disorder)', 'Anemia (disorder)', 'Viral sinusiti

## Association Rules: Confidence


In [15]:
def confidence_pairs(single_1, single_2,count):
    den = single_counts_dict.value[single_1] #had  0 in case miss
    if den==0:
        return 0
    else:
        return count/den

In [16]:
def confidence_triples(pair,single, count):
    den = pair_counts_dict.value.get(pair,0)
    if den==0:
        return 0
    else:
        return count/den

## Association Rules: Interest

    interestI -> j = p(j|I) − p(j)
    interestI -> j = confidence(I -> j)−baskets containing j / baskets

high positive interest: presence of I indicates the presence of j.

high negative interest: presence of I discourages the presence of j

In [17]:
def interest_pairs(single_1, single_2,count):
    return confidence_pairs(single_1, single_2,count) - (single_counts_dict.value[single_2]/n_buckets)

In [18]:
def interest_triples(pair,single,count):
    return confidence_triples(pair,single,count) - (single_counts_dict.value[single]/n_buckets)

## Association rules: Lift

    Lift(I) -> j = confidence(I -> j)/P(j) =P(I, j)/P(I)*P(j)

Lift (also known as the observed/expected ratio) is a measure of the degree of dependence between I and j. 

A lift of 1 indicates that I and j are independent

In [19]:
def lift_pairs(single_1, single_2,count):
    return confidence_pairs(single_1, single_2,count)/(single_counts_dict.value[single_2]/n_buckets)

In [20]:
def lift_triples(pair,single,count):
    return confidence_triples(pair,single,count)/(single_counts_dict.value[single]/n_buckets)

## Association rules: Standardised lift

Standardised lift ranges from 0 to 1.

This facilitates setting a fixed threshold for selecting the rules

In [21]:
def std_lift_pairs(single_1, single_2,count):
    p_single_1 = single_counts_dict.value[single_1]/n_buckets
    p_single_2 = single_counts_dict.value[single_2]/n_buckets
    
    if p_single_1!=0 and p_single_2!=0:
        num = lift_pairs(single_1,single_2, count) - (max(p_single_1+p_single_2-1,1/n_buckets)/(p_single_1*p_single_2))
        den = (1/(p_single_1*p_single_2)) - (max(p_single_1+p_single_2-1,1/n_buckets))/(p_single_1*p_single_2)
        if den!=0:
            return num/den
        else:
            return 0
    else:
        return 0

In [22]:
def std_lift_triples(pair,single,count):
    p_pair = pair_counts_dict.value.get(pair,0)/n_buckets
    p_single = single_counts_dict.value[single]/n_buckets
    if p_pair!=0 and p_single!=0:
        num = lift_triples(pair,single, count) - (max(p_pair+p_single-1,1/n_buckets))/(p_pair*p_single)
        den = (1/(p_pair*p_single)) - (max(p_pair+p_single-1,1/n_buckets))/(p_pair*p_single)
        if den!=0:
            return num/den
        else:
            return 0
    else:
        return 0

Breaking frequent itemsets into rules with respective counts

In [23]:
def break_doubles(pair):
    single_1 = pair[0][0]
    single_2 = pair[0][1]
    count = pair[1]
    return [((single_1,single_2),count),((single_2,single_1),count)]

In [24]:
def break_triples(triple_count):
    
    triple=triple_count[0]
    count = triple_count[1]
    first_pair = (triple[0],triple[1])
    second_pair = (triple[0],triple[2])
    third_pair = (triple[1],triple[2])
    pairs = [first_pair, second_pair, third_pair]
    singles = [2,1,0]
    candidate_rules=[]
    for i in range(3):
        candidate_rules.append(((pairs[i],triple[singles[i]],count)))
    
    return candidate_rules

Getting metrics for the rules in the format:
left side of the rule,right side,standard lift,lift,confidence,interest

In [25]:
single_counts_dict = sc.broadcast(frequent_singles.collectAsMap())
pair_counts_dict = sc.broadcast(pair_count.collectAsMap())


In [26]:
def get_metrics_double(pair):
    single_1 = pair[0][0]
    single_2 = pair[0][1]
    count = pair[1]
    
    confidence = confidence_pairs(single_1, single_2,count)
    interest = interest_pairs(single_1, single_2,count)
    lift = lift_pairs(single_1, single_2,count)
    std_lift = std_lift_pairs(single_1, single_2,count)
    return ([single_1],single_2,std_lift,lift,confidence,interest)

In [27]:
def get_metrics_triple(triple):
    pair = triple[0]
    single = triple[1]
    count = triple[2]
    confidence = confidence_triples(pair, single,count)
    interest = interest_triples(pair, single,count)
    lift = lift_triples(pair, single,count)
    std_lift = std_lift_triples(pair, single,count)
    return (list(pair),single,std_lift,lift,confidence,interest)

Generating candidate 3 element rules and respective counts (occurrence of all elements)

In [28]:
possible_triple_rules = triple_count.flatMap(break_triples)
possible_triple_rules.take(10)

[((15777000, 271737000), 444814009, 192819),
 ((15777000, 444814009), 271737000, 192819),
 ((271737000, 444814009), 15777000, 192819),
 ((10509002, 195662009), 444814009, 139174),
 ((10509002, 444814009), 195662009, 139174),
 ((195662009, 444814009), 10509002, 139174),
 ((15777000, 195662009), 271737000, 132583),
 ((15777000, 271737000), 195662009, 132583),
 ((195662009, 271737000), 15777000, 132583),
 ((10509002, 15777000), 271737000, 115510)]

Triples that passed the minimum standard lift condition

In [29]:
resulting_triples = possible_triple_rules.map(get_metrics_triple).filter(lambda a: a[2]>=min_std_lift).sortBy(lambda a: a[2], False)
resulting_triples.take(10)

[]

Generating candidate 2 element rules and respective counts (occurrence of all elements)

In [30]:
candidate_double_rules = pair_count.flatMap(break_doubles)
candidate_double_rules.take(10)

[((195662009, 444814009), 343651),
 ((444814009, 195662009), 343651),
 ((10509002, 444814009), 302516),
 ((444814009, 10509002), 302516),
 ((15777000, 271737000), 289176),
 ((271737000, 15777000), 289176),
 ((162864005, 444814009), 243812),
 ((444814009, 162864005), 243812),
 ((271737000, 444814009), 236847),
 ((444814009, 271737000), 236847)]

Pairs that passed the minimum standard lift condition

In [31]:
resulting_pairs = candidate_double_rules.map(get_metrics_double).filter(lambda a: a[2] >=min_std_lift).sortBy(lambda a: a[2], False)
resulting_pairs.take(10)

[([15777000],
  271737000,
  0.24981059575302555,
  2.6585189804270324,
  0.816155116210152,
  0.5091589569880521),
 ([271737000],
  15777000,
  0.24981059575302555,
  2.6585189804270324,
  0.81372758686672,
  0.5076445410590077),
 ([10509002],
  444814009,
  0.22388517601098637,
  1.0091330397505829,
  0.6555130608132266,
  0.0059326437700554235),
 ([444814009],
  10509002,
  0.22388517601098631,
  1.0091330397505827,
  0.4023140144160438,
  0.0036410956148917317),
 ([195662009],
  444814009,
  0.21626558461816975,
  1.0082778943553083,
  0.6549575751107316,
  0.00537715806756045),
 ([444814009],
  195662009,
  0.21626558461816975,
  1.0082778943553083,
  0.4570191770619996,
  0.003752097003463628),
 ([162864005],
  444814009,
  0.21062184200273504,
  1.0267274389531114,
  0.6669420379848291,
  0.01736162094165794),
 ([444814009],
  162864005,
  0.21062184200273498,
  1.0267274389531111,
  0.3242439556347581,
  0.008440614520811573),
 ([271737000],
  444814009,
  0.20460496364388717,


Uniting both types of rules X -> Y and X,Y -> Z to a single rdd

In [32]:
final_rules = resulting_triples.union(resulting_pairs).sortBy(lambda a: a[2])
final_rules.take(10)

[([15777000],
  444814009,
  0.20414970235241373,
  1.0267814536509203,
  0.6669771248747584,
  0.0173967078315872),
 ([444814009],
  15777000,
  0.20414970235241378,
  1.0267814536509206,
  0.3142803947123441,
  0.008197348904631807),
 ([271737000],
  444814009,
  0.20460496364388717,
  1.0260103882146394,
  0.6664762558670914,
  0.016895838823920206),
 ([444814009],
  271737000,
  0.20460496364388717,
  1.0260103882146394,
  0.31498124850387,
  0.007985089281770053),
 ([444814009],
  162864005,
  0.21062184200273498,
  1.0267274389531111,
  0.3242439556347581,
  0.008440614520811573),
 ([162864005],
  444814009,
  0.21062184200273504,
  1.0267274389531114,
  0.6669420379848291,
  0.01736162094165794),
 ([195662009],
  444814009,
  0.21626558461816975,
  1.0082778943553083,
  0.6549575751107316,
  0.00537715806756045),
 ([444814009],
  195662009,
  0.21626558461816975,
  1.0082778943553083,
  0.4570191770619996,
  0.003752097003463628),
 ([444814009],
  10509002,
  0.22388517601098631

Function to translate condition ids to names

In [33]:
def translate_conditions(line):
    temp_list = list(line)
    left_side = line[0]
    right_side = line[1]
    translated_left = [conds.value[i] for i in left_side]
    translated_right = conds.value[right_side]
    temp_list[0] = translated_left
    temp_list[1] = translated_right
    return tuple(temp_list)
    

Translating condition ids to their name in the final rules rdd

In [34]:
final_rules = final_rules.map(translate_conditions).sortBy(lambda a: a[2],False)
final_rules.take(10)

[(['Prediabetes'],
  'Anemia (disorder)',
  0.24981059575302555,
  2.6585189804270324,
  0.816155116210152,
  0.5091589569880521),
 (['Anemia (disorder)'],
  'Prediabetes',
  0.24981059575302555,
  2.6585189804270324,
  0.81372758686672,
  0.5076445410590077),
 (['Acute bronchitis (disorder)'],
  'Viral sinusitis (disorder)',
  0.22388517601098637,
  1.0091330397505829,
  0.6555130608132266,
  0.0059326437700554235),
 (['Viral sinusitis (disorder)'],
  'Acute bronchitis (disorder)',
  0.22388517601098631,
  1.0091330397505827,
  0.4023140144160438,
  0.0036410956148917317),
 (['Acute viral pharyngitis (disorder)'],
  'Viral sinusitis (disorder)',
  0.21626558461816975,
  1.0082778943553083,
  0.6549575751107316,
  0.00537715806756045),
 (['Viral sinusitis (disorder)'],
  'Acute viral pharyngitis (disorder)',
  0.21626558461816975,
  1.0082778943553083,
  0.4570191770619996,
  0.003752097003463628),
 (['Body mass index 30+ - obesity (finding)'],
  'Viral sinusitis (disorder)',
  0.21062

Saving results to a folder and stopping spark context

In [35]:
#FILES ARE STRUCTURED AS left_rule_side,right_rule_side,std_lift,lift,confidence,interest
final_rules.saveAsTextFile("{0}/{1}".format("extracted_rules", format_time))

Printing sorted rules with minimum std lift of 0.2 to single file as requested

In [36]:
final_rules.coalesce(1).saveAsTextFile("{0}/{1}".format("results", format_time))

In [37]:
sc.stop()