## Classification of ECG BigData using PyStark 

In [1]:
#these characters are used as part of the compression algorithm

gsm = list(r'@£$¥èéùìòÇØøÅå_^{}[~]|€ӔӕßÉ!#¤%&()*+,-./:;<?¡§¿äöñüàÀÁÂÃÄÈÊËÌÍÎÏÐÑÒÓÔÕÖÙÚÛÜÝÞþáâãçêëíîïðóôõúûýabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')
gsm.append("\\")
range_list = [0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000]
for i in range_list:
    gsm.append("Num"+str(i))

gsm.sort()

In [2]:
#the encode function compresses an ECG segment(in numerical format) to an alphanumeric value
from queue import Queue
import math
def encode(lines):
    
    q = Queue()
    for j in map(int,lines.split(',')):
        q.put(j)
    
    print(q)
    
    noOfSmpl = 1
    intDiff = []
    endOfEcg = False
    rrOn = True
    rrCount = 0
    ecgSmpl1 = q.get()
    rrComp1 = 0
    encSB = []
    rrSB = []
    gsm = list(
        r'@£$¥èéùìòÇØøÅå_^{}[~]|€ӔӕßÉ!#¤%&()*+,-./:;<?¡§¿äöñüàÀÁÂÃÄÈÊËÌÍÎÏÐÑÒÓÔÕÖÙÚÛÜÝÞþáâãçêëíîïðóôõúûýabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')
    gsm.append("\\")


    intHrv = 0
    while endOfEcg != True and not q.empty():

        intDiff = [0, 0, 0, 0, 0, 0, 0, 0]
        for i in range(0, 8):
            ecgSmpl2 = q.get()
            noOfSmpl = noOfSmpl + 1
            diff = ecgSmpl2 - ecgSmpl1

            try:
                if (math.ceil(diff) - diff) > 0.5:
                    intDiff[i] = int(math.floor(diff))  # there was an (int) here
                else:
                    intDiff[i] = int(math.ceil(diff))
            except:
                print("ERROR", intDiff[i], math.floor(diff))

            ecgSmpl1 = ecgSmpl2

        if q.empty():
            endOfEcg = True

        # intDiff is a list of 8 elements
        signVal = 0

        for i in range(0, 8):
            if intDiff[i] <= -1:
                if i == 0:
                    signVal = signVal + 1
                elif i == 1:
                    signVal = signVal + 2
                elif i == 2:
                    signVal = signVal + 4
                elif i == 3:
                    signVal = signVal + 8
                elif i == 4:
                    signVal = signVal + 16
                elif i == 5:
                    signVal = signVal + 32
                elif i == 6:
                    signVal = signVal + 64
                elif i == 7:
                    encSB.append('')

                intDiff[i] = abs(intDiff[i])

        encSB.append(gsm[signVal])


        # for q = 7
        for i in range(0, 7, 2):
            # compresses two single digits as one
            if (intDiff[i] < 10) and (intDiff[i + 1] < 10):
                join = intDiff[i] * 10 + intDiff[i + 1]
                encSB.append(gsm[join])
            # compresses all values less than 147
            elif (intDiff[i] < 47) and (intDiff[i + 1] < 47):
                encSB.append(gsm[intDiff[i] + 100])
                encSB.append(gsm[intDiff[i + 1] + 100])
            else:
                encSB.append(str(intDiff[i]))
                encSB.append('"')
                encSB.append(str(intDiff[i + 1]))
                encSB.append('"')
                # rr detection proceeding
                if rrOn == True:
                    if (rrComp1 < intDiff[i]) and (intDiff[i] < intDiff[i + 1]):
                        rrComp1 = intDiff[i + 1]
                    else:
                        if rrComp1 < intDiff[i]:
                            rrCount = rrCount - 1
                        rrVal = float(rrCount)
                        rrVal = rrVal / 360
                        rrSB.append(rrVal)
                        rrCount = 0
                        rrOn = False
                        intHrv = intHrv + 1

    str_encSB = ''.join(encSB)

    return str_encSB

In [3]:
#this function derives the features from a compressed ECG segment

def compress(segment):
    i = 0
    range_list = [0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000]
    alpha_count = {}
    while i < len(segment):

        if segment[i].isdigit():

            s = ''
            while (segment[i] != '"'):
                s = s + segment[i]
                i = i + 1

            for g in range(len(range_list)):
                if int(s) <= range_list[g]:
                    t = "Num" + str(g)
                    break

            if t not in alpha_count.keys():
                alpha_count[t] = 1
            else:
                alpha_count[t] = alpha_count[t] + 1


        else:
            s = segment[i]
            if s not in alpha_count.keys():
                alpha_count[s] = 1
            else:
                alpha_count[s] = alpha_count[s] + 1

        i = i + 1


    output = []
    for val in gsm:
        if val in alpha_count.keys():
            output.append(alpha_count[val])
        else:
            output.append(0)
    
    return output

In [4]:
#The main program starts from here

from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext


In [5]:
#defining a spark session
spark = SparkSession(sparkContext=sc)

In [6]:
#all the data is loaded in an RDD at once
rdd = sc.textFile(r'C:\Users\Shweta\PycharmProjects\untitled\new_output.csv')

In [8]:
#this is what one line of the data looks like
rdd.take(1)

['3,4,5,9,8,8,7,3,3,2,3,2,2,1,2,1,0,1,1,2,2,1,2,2,2,1,2,1,1,3,3,0,6,13,37,55,77,98,110,112,103,85,56,18,15,45,93,111,116,114,102,85,65,36,21,12,8,7,6,5,5,3,4,3,3,1,1,2,5,5,5,5,6,7,7,8,9,8,8,8,7,6,6,5,4,5,7,6,8,7,8,10,12,15,13,12,13']

In [10]:
#now the encode function is mapped to each line of the RDD
data1 = rdd.map(encode)
data1.take(5)             #we have shown 5 lines of compressed ECG signal

['kø;£:îøØøø{ØØø@ÕøØ],@nEyCBs¤^yJSj30"48"ylHisxAJvaðøØ|Ø£]å@£{ØøØ£þØøÅÅdø€(ø',
 'Ð@££Ø{@Ø]|Ò£@£]¥|Ø|$@ìyMsp|_}PKkTNpH*,rQIi^;|Å£,Ø]£ØÅ£ø@@ò£øø@øÅ$ø£',
 'c&ÉÒ<E$<|%,@øøÃÍüÅ|ӕ@lFuyCGulì24"53"Mn49"54"JjbzV\\Hänq¥§¥ø@a||Ø]$øØ@£òØ£øӔÀ*|ÃØ',
 'A$£€ø@£&]@âøÅ@]òØÅ@€@mrrxCECu_gu46"48"3"57"76"33"ÏhH48"69"KxüDÅum€ÊáÉøØÅßå£|Åìø%$|ù£]Å]',
 'c@ø£|{(@|Ø§Øøøå¿*|Å#@yswxvwtn^lC61"29"70"70"SnH28"51"60"38"íØ-ØØ£@þÅøÅø¡ê;$£;Ø£øø@ØØ]*']

In [11]:
# now features are derived from each line of the RDD
data2 = data1.map(compress)
temp = data2.take(5)           #we have shown features derived from 5 ECG segments

for i in temp:
    print(i)

[0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 3, 1, 1, 1, 0, 1, 0, 0, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 2, 0, 0, 1, 0, 1, 3, 0, 2, 1, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 11, 0, 0, 0, 0, 0, 1, 0, 0, 1]
[0, 0, 2, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 1, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 3, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 2, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 1, 5, 1, 0, 0, 9, 0, 1, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0

In [12]:
#these are the indivisual features we shall be looking for
keys = gsm
print(keys)

['!', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '?', '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'Num0', 'Num100', 'Num1000', 'Num150', 'Num200', 'Num250', 'Num300', 'Num350', 'Num400', 'Num450', 'Num50', 'Num500', 'Num550', 'Num600', 'Num650', 'Num700', 'Num750', 'Num800', 'Num850', 'Num900', 'Num950', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', '\\', ']', '^', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '|', '}', '~', '¡', '£', '¤', '¥', '§', '¿', 'À', 'Á', 'Â', 'Ã', 'Ä', 'Å', 'Ç', 'È', 'É', 'Ê', 'Ë', 'Ì', 'Í', 'Î', 'Ï', 'Ð', 'Ñ', 'Ò', 'Ó', 'Ô', 'Õ', 'Ö', 'Ø', 'Ù', 'Ú', 'Û', 'Ü', 'Ý', 'Þ', 'ß', 'à', 'á', 'â', 'ã', 'ä', 'å', 'ç', 'è', 'é', 'ê', 'ë', 'ì', 'í', 'î', 'ï', 'ð', 'ñ', 'ò', 'ó', 'ô', 'õ', 'ö', 'ø', 'ù', 'ú', 'û', 'ü', 'ý', 'þ', 'Ӕ', 'ӕ', '€']


In [13]:
# now we create a DataFrame from our RDD
a = sqlContext.createDataFrame(data2, ['x'+str(i) for i in range(0,len(keys))])

In [14]:
#this is the schema of the DataFrame
#a.schema

In [15]:
#We take the class of the input features in a list
fz = open(r'C:\Users\Shweta\PycharmProjects\untitled\new_output_class.txt', 'r')
cls = fz.read()
fz.close()


In [16]:
# c = 0
# for i in cls.split(','):
#     if i != '':
#         c += 1
# print(c)

In [17]:
#Now we create another DataFrame consisting of only class column
b = sqlContext.createDataFrame([(int(i),) for i in cls.split(',') if i != ''], ['class'])

In [18]:
#we consider the schema of the class column
b.schema

StructType(List(StructField(class,LongType,true)))

In [19]:
#now we join the two dataframes(for features and class) togeather in the final dataFrame
from pyspark.sql.functions import monotonically_increasing_id
a = a.withColumn("row_idx", monotonically_increasing_id())
b = b.withColumn("row_idx", monotonically_increasing_id())
final_df = b.join(a, b.row_idx == a.row_idx).drop("row_idx")

In [20]:
#this is what the final dataFrame looks like
final_df.show(2)

+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|class| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|x10|x11|x12|x13|x14|x15|x16|x17|x18|x19|x20|x21|x22|x23|x24|x25|x26|x27|x28|x29|x30|x31|x32|x33|x34|x35|x36|x37|x38|x39|x40|x41|x42|x43|x44|x45|x46|x47|x48|x49|x50|x51|x52|x53|x54|x55|x56|x57|x58|x59|x60|x

In [21]:
# from pyspark.ml.feature import StringIndexer
# # build indexer
# string_indexer = StringIndexer(inputCol='class', outputCol='in_class')

# # learn the model
# string_indexer_model = string_indexer.fit(final_df)

# # transform the data
# df_new = string_indexer_model.transform(final_df)

# # resulting df
# df_new.show(3)

In [22]:
#We consider the columns of the final DataFrame
#final_df.columns

In [23]:
#this part of the code runs the Machine Learning Algorithm

from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.clustering import GaussianMixture

#assemble all the festures in an assembler
Cols= ['x'+ str(c) for c in range(0, 167)]
Assembler = VectorAssembler ( inputCols = Cols, outputCol='features')

#apply feature selection using Chisquared feature selection
selector = ChiSqSelector(numTopFeatures=13, featuresCol='features',outputCol='selectedFeatures', labelCol='class')

gmm = GaussianMixture(featuresCol='selectedFeatures', predictionCol = 'prediction', k= 4, probabilityCol='probability', maxIter= 100, seed=1) 
pipeline = Pipeline(stages=[Assembler, selector,gmm])

(train, test) = final_df.randomSplit([0.7, 0.3])




In [24]:
# model = pipeline.fit(train)
# result = model.transform(final_df).select('features', 'selectedFeatures', 'probability', 'prediction', 'class')
# result.show(3200)
# d = result.select('prediction', 'class').toPandas()
# d.loc[d['class'] == 1, 'prediction']

In [33]:
#the test data is fit into the model
model123 = pipeline.fit(test)

In [34]:
#we select just a couple of features from the entire dataset
result123 = model123.transform(final_df).select('features', 'selectedFeatures', 'probability','prediction', 'class')

In [35]:
#in here we see how features contains 168 features out of which only 13 features are selected
result123.show(5)

+--------------------+--------------------+--------------------+----------+-----+
|            features|    selectedFeatures|         probability|prediction|class|
+--------------------+--------------------+--------------------+----------+-----+
|(167,[0,2,14,15,1...|(13,[0,1,5,10,11,...|[3.72449968684894...|         1|    1|
|(167,[2,3,4,6,10,...|(13,[0,4,6,10,11,...|[6.96743265451260...|         3|    1|
|(167,[1,2,5,6,13,...|(13,[0,2,7,9,10,1...|[1.54453813142484...|         1|    1|
|(167,[0,2,3,7,17,...|(13,[0,1,3,7,8,10...|[0.99892350420316...|         0|    3|
|(167,[2,7,8,14,17...|(13,[0,1,5,7,10,1...|[1.56155953605912...|         1|    3|
+--------------------+--------------------+--------------------+----------+-----+
only showing top 5 rows



In [36]:
d123 = result123.select('prediction', 'class').toPandas()

In [37]:
d123[0:5]

Unnamed: 0,prediction,class
0,1,1
1,3,1
2,1,1
3,0,3
4,1,3


In [38]:
#build a class matrix to check the accuracy
pospos = 0
negneg = 0
posneg = 0
negpos = 0
for iterator in range(len(d123)):
    if d123['prediction'][iterator] != d123['class'][iterator]:
        if d123['class'][iterator] == 0:
            pospos = pospos + 1
        if d123['class'][iterator] == 1:
            negneg = negneg + 1
    else:
        if d123['class'][iterator] == 0:
            posneg = posneg + 1
        if d123['class'][iterator] == 1:
            negpos = negpos + 1

print("\n\n\n\n\n")
print("Finished making predictions for test data.\n")
print("x-axis: predicted class")
print("y-axis: actual class")
print("\n")
print("\tcls pos\tneg")
print("\tpos", pospos,posneg)
print("\tneg", negpos,negneg)

accuracy = (pospos+negneg)/(pospos+negneg+posneg+negpos)*100
print("\nAccuracy of the model : ", accuracy,"%")








Finished making predictions for test data.

x-axis: predicted class
y-axis: actual class


	cls pos	neg
	pos 4478 746
	neg 44090 75925

Accuracy of the model :  64.19965026868627 %
