# Exercise 1.1

In [1]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, col, count
from pyspark import SparkContext, SparkConf

In [2]:
conf = SparkConf()
sc = SparkContext(conf=conf)

In [3]:
spark = SparkSession \
    .builder \
    .appName("CSV with conditions") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### Creating a DataFrame of the conditions

In [4]:
df = spark.read.csv("dataset/conditions.csv")
df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)

+----------+----------+--------------------+--------------------+---------+--------------------+
|       _c0|       _c1|                 _c2|                 _c3|      _c4|                 _c5|
+----------+----------+--------------------+--------------------+---------+--------------------+
|     START|      STOP|             PATIENT|           ENCOUNTER|     CODE|         DESCRIPTION|
|2017-01-14|2017-03-30|09e4e8cb-29c2-4ef...|88e540ab-a7d7-47d...| 65363002|        Otitis media|
|2012-09-15|2012-09-16|b0a03e8c-8d0f-424...|e89414dc-d0c6-478...|241929008|Acute allergic re...|
|2018-06-17|2018-06-24|09e4e8cb-29c2-4ef...|c14325b0-f7ec-431...|444814009|Viral sinusitis (...|
|2019-04-19|2019-09-26|09e4e8cb-29c2-4ef...|71af18ee-3157-408...| 65363002|        Otitis media|
|2019-04

In [5]:
Data_list = ["START","STOP","PATIENT","ENCOUNTER","CODE","DESCRIPTION"]
 
df = df.toDF(*Data_list)
df=df.filter(df.START !="START")

df_Names=df
df_Names=df_Names.drop(*['START','STOP','PATIENT','ENCOUNTER'])
df_Names=df_Names.dropDuplicates()

df=df.drop(*['START','STOP','ENCOUNTER','DESCRIPTION'])
df.show()

+--------------------+---------+
|             PATIENT|     CODE|
+--------------------+---------+
|09e4e8cb-29c2-4ef...| 65363002|
|b0a03e8c-8d0f-424...|241929008|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 65363002|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 33737001|
|b0a03e8c-8d0f-424...|444814009|
|b0a03e8c-8d0f-424...| 10509002|
|b0a03e8c-8d0f-424...|233678006|
|b0a03e8c-8d0f-424...|195662009|
|b0a03e8c-8d0f-424...|232353008|
|b0a03e8c-8d0f-424...|195662009|
|5420ae87-24c8-4ed...|446096008|
|5420ae87-24c8-4ed...|284551006|
|5420ae87-24c8-4ed...|283371005|
|5420ae87-24c8-4ed...| 72892002|
|5420ae87-24c8-4ed...|444814009|
|5420ae87-24c8-4ed...|195662009|
|bf1f30f2-27de-4b5...|162864005|
|bf1f30f2-27de-4b5...|283385000|
+--------------------+---------+
only showing top 20 rows



In [6]:
df = df.dropna()  #confirmar depois com tudo
df.show()

+--------------------+---------+
|             PATIENT|     CODE|
+--------------------+---------+
|09e4e8cb-29c2-4ef...| 65363002|
|b0a03e8c-8d0f-424...|241929008|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 65363002|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 33737001|
|b0a03e8c-8d0f-424...|444814009|
|b0a03e8c-8d0f-424...| 10509002|
|b0a03e8c-8d0f-424...|233678006|
|b0a03e8c-8d0f-424...|195662009|
|b0a03e8c-8d0f-424...|232353008|
|b0a03e8c-8d0f-424...|195662009|
|5420ae87-24c8-4ed...|446096008|
|5420ae87-24c8-4ed...|284551006|
|5420ae87-24c8-4ed...|283371005|
|5420ae87-24c8-4ed...| 72892002|
|5420ae87-24c8-4ed...|444814009|
|5420ae87-24c8-4ed...|195662009|
|bf1f30f2-27de-4b5...|162864005|
|bf1f30f2-27de-4b5...|283385000|
+--------------------+---------+
only showing top 20 rows



### Creating a DataFrame and RDD of the Baskets

In [7]:
baskets = df.groupBy('PATIENT').agg(collect_set('CODE').alias('items'))
baskets.createOrReplaceTempView('baskets')
baskets=baskets.drop('PATIENT')
baskets.show(20,False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|items                                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[703151001, 70704007, 65363002, 128613002, 192127007, 444814009, 232353008, 195662009]                                                                                  |
|[59621000, 368581000119106, 80394007, 15777000, 55822004, 44054006, 271737000, 26929004, 88805009, 302870006, 237602007, 1551000119108, 444814009, 195662009, 422034002]|
|[271737000, 201834006, 40055000, 10509002, 19169002, 444814009, 15777000, 55822004]                                                             

In [8]:
BasquetRDD = baskets.rdd.flatMap(list) 
BasquetRDD.take(10) 

[['703151001',
  '70704007',
  '65363002',
  '128613002',
  '192127007',
  '444814009',
  '232353008',
  '195662009'],
 ['59621000',
  '368581000119106',
  '80394007',
  '15777000',
  '55822004',
  '44054006',
  '271737000',
  '26929004',
  '88805009',
  '302870006',
  '237602007',
  '1551000119108',
  '444814009',
  '195662009',
  '422034002'],
 ['271737000',
  '201834006',
  '40055000',
  '10509002',
  '19169002',
  '444814009',
  '15777000',
  '55822004'],
 ['72892002', '10509002', '65363002', '444814009', '301011002'],
 ['271737000', '367498001', '10509002', '15777000', '55822004', '239873007'],
 ['271737000', '72892002', '198992004', '43878008', '10509002', '444814009'],
 ['162864005', '271737000', '72892002', '40055000', '19169002'],
 ['162864005',
  '271737000',
  '64859006',
  '62106007',
  '40055000',
  '195662009',
  '15777000',
  '55822004'],
 ['162864005',
  '307731004',
  '254837009',
  '59621000',
  '88805009',
  '10509002',
  '444814009'],
 ['271737000',
  '26929004',
  

### Counting number of ocurrences of diseases in baskets

In [9]:
CodesRDD=baskets.rdd.flatMap(lambda x: sum(x,[]))
CodesRDD.take(20)

['703151001',
 '70704007',
 '65363002',
 '128613002',
 '192127007',
 '444814009',
 '232353008',
 '195662009',
 '59621000',
 '368581000119106',
 '80394007',
 '15777000',
 '55822004',
 '44054006',
 '271737000',
 '26929004',
 '88805009',
 '302870006',
 '237602007',
 '1551000119108']

In [10]:
Unique = CodesRDD.distinct()
Unique.take(10)

['128613002',
 '232353008',
 '195662009',
 '55822004',
 '302870006',
 '237602007',
 '201834006',
 '40055000',
 '19169002',
 '307731004']

In [11]:
CountRDD = CodesRDD.map(lambda item: (item, 1))
CountRDD = CountRDD.reduceByKey(lambda a,b: a+b)
CountRDD.take(20)

[('128613002', 42693),
 ('232353008', 31036),
 ('195662009', 524692),
 ('55822004', 133442),
 ('302870006', 75992),
 ('237602007', 74395),
 ('201834006', 25426),
 ('40055000', 250239),
 ('19169002', 201894),
 ('307731004', 17220),
 ('53741008', 68517),
 ('65966004', 43961),
 ('84757009', 22352),
 ('79586000', 25783),
 ('398254007', 22959),
 ('236077008', 4422),
 ('262574004', 7629),
 ('97331000119101', 4291),
 ('707577004', 177),
 ('425048006', 1)]

In [12]:
# Setting a support threshold of 1000
# Aqui não tá 1000, tá 2, mas na versão com tudo faz-se com 1000
CountFilterRDD=CountRDD.filter(lambda x: x[1] >= 1000) #Aqui 1000
CountFilterRDD.take(10)

[('128613002', 42693),
 ('232353008', 31036),
 ('195662009', 524692),
 ('55822004', 133442),
 ('302870006', 75992),
 ('237602007', 74395),
 ('201834006', 25426),
 ('40055000', 250239),
 ('19169002', 201894),
 ('307731004', 17220)]

In [13]:
AprioriRDD=CountFilterRDD.map(lambda x: ([x[0]], x[1]))
AprioriRDD.take(10)

[(['128613002'], 42693),
 (['232353008'], 31036),
 (['195662009'], 524692),
 (['55822004'], 133442),
 (['302870006'], 75992),
 (['237602007'], 74395),
 (['201834006'], 25426),
 (['40055000'], 250239),
 (['19169002'], 201894),
 (['307731004'], 17220)]

In [14]:
CodeFilterRDD=CountFilterRDD.map(lambda x: x[0])
CodeFilterRDD.take(10)

['128613002',
 '232353008',
 '195662009',
 '55822004',
 '302870006',
 '237602007',
 '201834006',
 '40055000',
 '19169002',
 '307731004']

### Apriori Algorithm

In [15]:
# Remove Replicas 
#Basquets with items that are in a diferent order, but with the same items

def RemoveReplicas(item):

    if(isinstance(item[0], tuple)):
        x1 = item[0]
        x2 = item[1]
    else:
        x1 = [item[0]]
        x2 = item[1]

    if(any(x == x2 for x in x1) == False):
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
        return result 
    else:
        return x1

In [16]:
def SumOp(x,y):
    return x+y

In [17]:
for k in[2,3]:
    print(k)
    print('')
    
    Combine = CodeFilterRDD.cartesian(Unique)
    Combine = Combine.map(lambda x: RemoveReplicas(x))
  
    Combine = Combine.filter(lambda x: len(x) == k)
    Combine = Combine.distinct()
     
    print('Combine')
    print(Combine.take(10))
    print('')
    
    Combine2 = Combine.cartesian(BasquetRDD)
    Combine2 = Combine2.filter(lambda y: all(x in y[1] for x in y[0]))
    
    Combine2 = Combine2.map(lambda x: x[0])
    Combine2 = Combine2.map(lambda x: (x , 1))
    Combine2 = Combine2.reduceByKey(SumOp)
    Combine2 = Combine2.filter(lambda x: x[1] >= 1000) #Aqui 1000 em vez de 2
    
    print('Combine2')
    print(Combine2.take(10))
    print('')

    AprioriRDD = AprioriRDD.union(Combine2)
    
    Combine2 = Combine2.map(lambda x: x[0])
    CodeFilterRDD = Combine2
    
    print('CodeFilterRDD')
    print(CodeFilterRDD.take(10))
    print('')


2

Combine
[('195662009', '398254007'), ('232353008', '236077008'), ('302870006', '398254007'), ('128613002', '230265002'), ('195662009', '36971009'), ('237602007', '65363002'), ('53741008', '65363002'), ('19169002', '38822007'), ('65966004', '713197008'), ('283371005', '398254007')]

Combine2
[(('444814009', '79586000'), 17253), (('19169002', '239873007'), 17667), (('195662009', '6072007'), 2061), (('302870006', '703151001'), 2466), (('22298006', '65966004'), 1132), (('26929004', '88805009'), 4396), (('40055000', '424132000'), 3976), (('239872002', '65966004'), 1089), (('196416002', '64859006'), 3323), (('5602001', '88805009'), 1191)]

CodeFilterRDD
[('444814009', '79586000'), ('19169002', '239873007'), ('195662009', '6072007'), ('302870006', '703151001'), ('22298006', '65966004'), ('26929004', '88805009'), ('40055000', '424132000'), ('239872002', '65966004'), ('196416002', '64859006'), ('5602001', '88805009')]

3

Combine
[('197927001', '26929004', '88805009'), ('124171000119105', '2

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:

AprioriFilter=AprioriRDD.filter(lambda x: len(x[0])!=1)
AprioriFilter2=AprioriFilter.filter(lambda x: len(x[0])==2)
AprioriFilter3=AprioriFilter.filter(lambda x: len(x[0])==3)
print('k=2')
print(AprioriFilter2.collect())
print(' ')
print('k=3')
print(AprioriFilter3.collect())

# Exercise 1.2

### Creating Dictionarys to search the quantaty of deceases quantaty

In [None]:
countbaskets=baskets.count()

AprioriDict1=CountFilterRDD.collectAsMap()
AprioriFilter22=AprioriFilter2.map(lambda item: ((item[0][1],item[0][0]),item[1]))
AprioriFilter22.take(10)

AprioriDict2=AprioriFilter2.collectAsMap()
AprioriDict22=AprioriFilter22.collectAsMap()
AprioriDict2.update(AprioriDict22)

print(AprioriDict1)
print(AprioriDict2)

In [None]:
print(countbaskets)

###  Confidence Interval

In [None]:
# Check Items sets includes at least one comman item 

def CheckItemSets(item_1 , item_2):

    if(len(item_1) > len(item_2)):
        return all(any(k == l for k in item_1 ) for l in item_2)
    else:
        return all(any(k == l for k in item_2 ) for l in item_1)


In [None]:
def FilterForConfidence(item , total):
        
    if(len(item[0][0]) > len(item[1][0])  ):
        if(CheckItemSets(item[0][0] , item[1][0]) == False):
            pass
        else:
            return (item)       
    else:
        pass

In [None]:
def CalculateConfidence(item):

    # Parent item list
    parent = set(item[0][0])
        
    # Child item list
    if(isinstance(item[1][0] , str)):
        child  = set([item[1][0]])
    else:
        child  = set(item[1][0])
    # Parent and Child support values
    parentSupport = item[0][1]
    childSupport = item[1][1]
    # Finds the item set confidence is going to be found

    support = (parentSupport / childSupport)*100

    return list([ list(child) ,  list(parent.difference(child)) , support ])

### Interest

In [None]:

def Interest(item):
    if len(item[1])==1:
        j=AprioriDict1[item[1][0]]
    else:
        j=AprioriDict2[tuple(item[1])]
    interest=item[2]-((j/countbaskets)*100) #Aqui o *100 é para se for em percentagem, não sei se devia ser assim....
    item.append(interest)
    return item

### Lift

In [None]:
def Lift(item):
    if len(item[1])==1:
        j=AprioriDict1[item[1][0]]
    else:
        j=AprioriDict2[tuple(item[1])]
    lift=item[2]/((j/countbaskets)*100) #Aqui a cena do devimal ou percentagem again
    item.append(lift)
    return item

### Standardised Lift

In [None]:
# deve tar mal os valores que dão tão muito estranhos

def  StandardisedLift(item):
    if len(item[1])==1:
        j=AprioriDict1[item[1][0]]
    else:
        j=AprioriDict2[tuple(item[1])]
    #print(j)
        
    if len(item[0])==1:
        i=AprioriDict1[item[0][0]]
    else:
        i=AprioriDict2[tuple(item[0])]
    #print(i)
        
    pi= i/countbaskets
    pj= j/countbaskets
    #print(pi, pj)
    
    l=[pi+pj-1,1/countbaskets]
    #print(l)
    
    term=max(l)/(pi*pj)
    #print(term)
    
    standardisedlift=(item[4]-term)/((1/(pi*pj))-term) #Aqui a cena do decimal ou percentagem again  #nãos sei se é max ou não
    #print(standardisedlift)
    item.append(standardisedlift)
    return item

### Applying Statistics and organizing data

In [None]:
calcuItems = AprioriRDD.cartesian(AprioriRDD)
total = calcuItems.count()

StatisticsRDD = calcuItems.filter(lambda item: FilterForConfidence(item , total))
StatisticsRDD = StatisticsRDD.map(lambda item: CalculateConfidence(item))
StatisticsRDD = StatisticsRDD.map(lambda item: Interest(item))
StatisticsRDD = StatisticsRDD.map(lambda item: Lift(item))
StatisticsRDD = StatisticsRDD.map(lambda item: StandardisedLift(item))
StatisticsRDD.take(10)

In [None]:
isto agora tá a dar conjunto vazio porque o standard lift tá mal
StatisticsRDD=StatisticsRDD.filter(lambda item: item[5] >= 0.2) 
StatisticsRDD.take(10)

In [None]:
df_Names.show()

In [None]:
df_Names = df_Names.toPandas()
  
# Convert the dataframe into 
# dictionary
Dict_Names=dict(df_Names.values)
  
# Print the dictionary
print(Dict_Names)

In [None]:
def CodeToName(item):
    
    if len(item[0])==1:
        item[0][0]=Dict_Names[item[0][0]]
    else:
        item[0][0]=Dict_Names[item[0][0]]
        item[0][1]=Dict_Names[item[0][1]]
        
    if len(item[1])==1:
        item[1][0]=Dict_Names[item[1][0]]
    else:
        item[1][0]=Dict_Names[item[1][0]]
        item[1][1]=Dict_Names[item[1][1]]
        
    return item
    

In [None]:
StatisticsRDD = StatisticsRDD.map(lambda item: CodeToName(item))
StatisticsRDD.take(10)

In [None]:
def Organized(item):
    if len(item[0])==1:
        item[0]=item[0][0]
    else:
        item[0]=tuple(item[0])
        
    if len(item[1])==1:
        item[1]=item[1][0]
    else:
        item[1]=tuple(item[1])
        
    item[2] = round(item[2], 2)
    item[3] = round(item[3], 2)
    item[4] = round(item[4], 2)
    item[5] = round(item[5], 2)
    
    return item

In [None]:
StatisticsRDD = StatisticsRDD.map(lambda item: Organized(item))
StatisticsRDD.take(10)

In [None]:
StatisticsRDD=StatisticsRDD.sortBy(lambda x: x[5])
StatisticsRDD.take(10)

### Saving to a text file

In [None]:
Final_df = pd.DataFrame(StatisticsRDD.collect(), columns = ["I","j","Confidence","Interest","Lift","Standerdized Lift"])
Final_df.head(20)

In [None]:
Final_df.to_csv('Conditions Rules.txt', index=None, sep='\t')

Repositório fixe para o apriori

https://github.com/sergencansiz/apriori-pyspark/blob/master/AprioriPySpark.py
