# Environment Setup & Dataset Load

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=fc3fde32b4a98544e3f2dce37292ba5e1448436d44e259c5c53fb72c44d036b6
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

In [None]:
# Spark session used throughout the notebook.
# NOTE(review): spark.executor.instances presumably has no effect when Spark runs
# in local mode (e.g. on Colab) — confirm against the actual deployment.
spark_settings = {
    "spark.default.parallelism": "50",
    "spark.executor.instances": "12",
}
spark_builder = SparkSession.builder.appName("YourApp")
for setting, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting, setting_value)
spark = spark_builder.getOrCreate()

In [None]:
# SQLContext takes a SparkContext as its first argument (and optionally the session);
# passing the SparkSession itself only worked because it happens to expose the same
# private attributes. NOTE: SQLContext is deprecated since Spark 3.0 — spark.sql(...)
# is the modern equivalent of sqlCtx.sql(...).
sqlCtx = SQLContext(spark.sparkContext, spark)



In [None]:
pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [None]:
import opendatasets as od

In [None]:
# Kaggle dataset page. od.download asks interactively for the Kaggle username and
# API key (see the output below) and downloads into ./ischemia-dataset.
KAGGLE_DATASET_URL = 'https://www.kaggle.com/datasets/bjoernjostein/ischemia-dataset'
od.download(KAGGLE_DATASET_URL)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: camillachiruzzi
Your Kaggle Key: ··········
Downloading ischemia-dataset.zip to ./ischemia-dataset


100%|██████████| 174M/174M [00:02<00:00, 83.9MB/s]





# Data manipulation

### Create the hea and mat lists

In [None]:
import os
import scipy.io
import numpy as np
import tqdm

directory = 'ischemia-dataset/ischemia_dataset'
listaHEA = []  # one entry per .hea file: the list of its raw text lines
listaMAT = []  # one entry per .mat variable: [filename, row_0, row_1, ...]

# sorted() makes the load order (and therefore e.g. take(1) below) reproducible;
# os.listdir returns entries in arbitrary order.
for filename in tqdm.tqdm(sorted(os.listdir(directory))):
    f = os.path.join(directory, filename)  # full path of the current entry
    if not os.path.isfile(f):
        continue

    # Route by file extension, not by a substring anywhere in the name.
    if filename.endswith('.hea'):
        with open(f) as fp:
            listaHEA.append(fp.readlines())

    elif filename.endswith('.mat'):
        mat = scipy.io.loadmat(f)
        for key, elementi in mat.items():
            # For MATLAB v5 files loadmat also returns the metadata entries
            # '__header__', '__version__' and '__globals__'; they are not signal
            # data and must not become records.
            if key.startswith('__'):
                continue
            # Iterating a 2-D array yields its rows; in this dataset each .mat holds
            # 12 rows, one per lead (see the matArray.take(1) output below).
            listaMAT.append([filename, *elementi])

100%|██████████| 5118/5118 [00:01<00:00, 4030.98it/s]


In [None]:
# Verify that every patient was loaded into both lists.
# NOTE: listaMAT gets one entry per loaded .mat variable, so equality assumes one
# variable per .mat file (true for the files loaded here: both counts are 2559).
print(len(listaHEA))
print(len(listaMAT))
assert len(listaHEA) == len(listaMAT), "number of .hea and .mat records differ"

2559
2559


### Parallelizing the lists

In [None]:
# Distribute the signal records over 400 partitions; each record holds the
# filename followed by the lead arrays of one .mat file.
matArray = spark.sparkContext.parallelize(listaMAT, numSlices=400)

In [None]:
matArray.count()  # number of records in the RDD; should equal len(listaMAT)

2559

In [None]:
matArray.take(1)  # inspect one record: [filename, lead array 1, ..., lead array 12]

[['HR09050.mat',
  array([  65,   65,   65, ..., -205, -205, -205], dtype=int16),
  array([ -20,  -20,  -20, ..., -205, -205, -205], dtype=int16),
  array([-85, -85, -85, ...,   0,   0,   0], dtype=int16),
  array([-22, -22, -22, ..., 205, 205, 205], dtype=int16),
  array([  75,   75,   75, ..., -102, -102, -102], dtype=int16),
  array([ -52,  -52,  -52, ..., -102, -102, -102], dtype=int16),
  array([ 95,  95,  95, ..., -25, -25, -25], dtype=int16),
  array([-20, -20, -20, ...,  95,  95,  95], dtype=int16),
  array([  30,   30,   30, ..., -155, -155, -155], dtype=int16),
  array([145, 145, 145, ..., -60, -60, -60], dtype=int16),
  array([ 150,  150,  150, ..., -160, -160, -160], dtype=int16),
  array([ 520,  520,  520, ..., -105, -105, -105], dtype=int16)]]

In [None]:
# One RDD element per .hea file: the list of its raw text lines.
sc = spark.sparkContext
fileHea = sc.parallelize(listaHEA)

In [None]:
fileHea.take(1)  # inspect one header: the raw text lines of a .hea file

[['HR01233 12 500 5000 04-Jun-2020 15:11:55\n',
  'HR01233.mat 16+24 200/mV 16 0 60 18620 0 I\n',
  'HR01233.mat 16+24 200/mV 16 0 -45 8660 0 II\n',
  'HR01233.mat 16+24 200/mV 16 0 -105 -9967 0 III\n',
  'HR01233.mat 16+24 200/mV 16 0 -7 -13359 0 aVR\n',
  'HR01233.mat 16+24 200/mV 16 0 83 14576 0 aVL\n',
  'HR01233.mat 16+24 200/mV 16 0 -75 -417 0 aVF\n',
  'HR01233.mat 16+24 200/mV 16 0 -135 3656 0 V1\n',
  'HR01233.mat 16+24 200/mV 16 0 265 -3603 0 V2\n',
  'HR01233.mat 16+24 200/mV 16 0 270 -4015 0 V3\n',
  'HR01233.mat 16+24 200/mV 16 0 -125 1166 0 V4\n',
  'HR01233.mat 16+24 200/mV 16 0 -130 3581 0 V5\n',
  'HR01233.mat 16+24 200/mV 16 0 35 8749 0 V6\n',
  '#Age: 84\n',
  '#Sex: Female\n',
  '#Dx: 164861001,164889003,428750005,429622005\n',
  '#Rx: Unknown\n',
  '#Hx: Unknown\n',
  '#Sx: Unknown\n']]

### Build one dictionary per header file to make DataFrame creation easier


In [None]:
def get_diz(row):
    """Parse the text lines of one .hea header file into a flat dictionary.

    ``row`` is the list of raw lines of a single header (as produced by
    ``readlines()``); it is NOT modified.

    Returned keys, in insertion order (this order becomes the DataFrame column
    order once the dict is turned into a ``Row``):

    * ``NomeFile``, ``NDerivations``, ``SamplingRates``, ``DurationRecordings``:
      the first four whitespace-separated tokens of the first line (extra
      tokens, e.g. date and time, are ignored);
    * ``Mat``: ``NomeFile + '.mat'``, used later as the join key with the
      signal data;
    * for each of the next ``NDerivations`` lines (12 if that token is missing
      or not an integer), numbered k = 1, 2, ...: ``ECGRapprSignal<k>``,
      ``AmplitudeUnit<k>``, ``Registro<k>``, ``OffsetPar<k>``,
      ``StartingValue<k>``, ``SumValues<k>``, taken from the line's tokens
      after the first one (the .mat file reference); trailing tokens are ignored;
    * one entry per remaining ``#Key: value`` line, with key and value stripped
      of surrounding whitespace. Lines without a ``:`` (e.g. blank lines) are
      skipped.

    All values are strings. An empty ``row`` gives an empty dict.
    """
    header_keys = ['NomeFile', 'NDerivations', 'SamplingRates', 'DurationRecordings']
    # NOTE(review): these look like the WFDB signal-specification fields (format,
    # gain, ADC resolution, ADC zero, initial value, checksum) — confirm.
    signal_keys = ['ECGRapprSignal', 'AmplitudeUnit', 'Registro', 'OffsetPar', 'StartingValue', 'SumValues']

    diz = {}
    if not row:
        return diz

    header = row[0].split()
    for key, value in zip(header_keys, header):
        diz[key] = value
    if header:
        diz['Mat'] = header[0] + '.mat'

    # Number of per-lead lines after the first line. It is read from the header
    # instead of assuming 12, so records with fewer leads do not get their '#'
    # lines parsed as signal lines.
    try:
        n_signals = max(int(header[1]), 0)
    except (IndexError, ValueError):
        n_signals = 12

    for lead, line in enumerate(row[1:n_signals + 1], start=1):
        fields = line.split()[1:]  # drop the .mat file reference
        for key, value in zip(signal_keys, fields):
            diz[key + str(lead)] = value  # e.g. ECGRapprSignal1, AmplitudeUnit1, ...

    for line in row[n_signals + 1:]:
        # '#Age: 84\n' -> ('Age', '84'); partition keeps any ':' that is part of the value.
        key, sep, value = line.strip().lstrip('#').partition(':')
        if not sep:
            continue
        diz[key.strip()] = value.strip()

    return diz

In [None]:
Hea = fileHea.map(get_diz)  # parse every header (list of lines) into a flat dict

In [None]:
Hea.take(1)  # example: the dictionary produced by get_diz for one header

[{'NomeFile': 'HR01233',
  'NDerivations': '12',
  'SamplingRates': '500',
  'DurationRecordings': '5000',
  'Mat': 'HR01233.mat',
  'ECGRapprSignal1': '16+24',
  'AmplitudeUnit1': '200/mV',
  'Registro1': '16',
  'OffsetPar1': '0',
  'StartingValue1': '60',
  'SumValues1': '18620',
  'ECGRapprSignal2': '16+24',
  'AmplitudeUnit2': '200/mV',
  'Registro2': '16',
  'OffsetPar2': '0',
  'StartingValue2': '-45',
  'SumValues2': '8660',
  'ECGRapprSignal3': '16+24',
  'AmplitudeUnit3': '200/mV',
  'Registro3': '16',
  'OffsetPar3': '0',
  'StartingValue3': '-105',
  'SumValues3': '-9967',
  'ECGRapprSignal4': '16+24',
  'AmplitudeUnit4': '200/mV',
  'Registro4': '16',
  'OffsetPar4': '0',
  'StartingValue4': '-7',
  'SumValues4': '-13359',
  'ECGRapprSignal5': '16+24',
  'AmplitudeUnit5': '200/mV',
  'Registro5': '16',
  'OffsetPar5': '0',
  'StartingValue5': '83',
  'SumValues5': '14576',
  'ECGRapprSignal6': '16+24',
  'AmplitudeUnit6': '200/mV',
  'Registro6': '16',
  'OffsetPar6': '0',

# Dataframe creation

In [None]:
from pyspark.sql import Row

In [None]:
HeaRow = Hea.map(lambda record: Row(**record))  # dict -> Row: each key becomes a named field

In [None]:
# Schema is inferred from the Rows; get_diz only produces string values, so every column is a string.
hea_df = spark.createDataFrame(data=HeaRow)

### Distinct values

In [None]:
from pyspark.sql.functions import countDistinct

MAX_SHOWN_VALUES = 5  # how many example distinct values to print per column

# A single aggregation computes the distinct count of every column at once,
# instead of launching one Spark job per column.
counts_row = hea_df.agg(
    *[countDistinct(c).alias(c) for c in hea_df.columns]
).collect()[0]
distinct_counts = {c: counts_row[c] for c in hea_df.columns}

distinct_values = {}
for col_name in hea_df.columns:
    # limit() avoids collecting every distinct value to the driver only to keep a few.
    sample_rows = hea_df.select(col_name).distinct().limit(MAX_SHOWN_VALUES).collect()
    distinct_values[col_name] = [r[0] for r in sample_rows]

# Print the distinct count and a few example values for each column.
# The loop variable is `col_name`, not `col`, so pyspark.sql.functions.col is not shadowed.
for col_name, n_distinct in distinct_counts.items():
    print(f"Column '{col_name}': {n_distinct} distinct values")
    print(f"Distinct values: {distinct_values[col_name]}")
    print("\n")
    print("\n")

Column 'NomeFile': 2559 distinct values
Distinct values: ['HR01799', 'HR06245', 'HR11508', 'Q2120', 'HR19824']


Column 'NDerivations': 1 distinct values
Distinct values: ['12']


Column 'SamplingRates': 1 distinct values
Distinct values: ['500']


Column 'DurationRecordings': 31 distinct values
Distinct values: ['5000', '6500', '6000', '16000', '9000']


Column 'Mat': 2559 distinct values
Distinct values: ['HR19887.mat', 'HR16867.mat', 'HR01799.mat', 'HR11508.mat', 'HR19480.mat']


Column 'ECGRapprSignal1': 1 distinct values
Distinct values: ['16+24']


Column 'AmplitudeUnit1': 2 distinct values
Distinct values: ['1000/mV', '200/mV']


Column 'Registro1': 1 distinct values
Distinct values: ['16']


Column 'OffsetPar1': 1 distinct values
Distinct values: ['0']


Column 'StartingValue1': 366 distinct values
Distinct values: ['-4', '-30', '-290', '15', '11']


Column 'SumValues1': 2320 distinct values
Distinct values: ['-4', '7', '21364', '-2757', '-10680']


Column 'ECGRapprSignal2': 1 

Looking at the distinct values, we can plan some operations to clean the df, in particular:

- TAKE NomeFile as patient ID
- DROP NDerivations
- DROP SamplingRates
- TAKE TO ANALYZE DurationRecordings
- DROP ECGRapprSignal
- TAKE TO ANALYZE AmplitudeUnit
- DROP Registro
- DROP OffsetPar
- TAKE StartingValue
- TAKE SumValues
- TAKE Age
- TAKE Sex
- TAKE TO ANALYZE Dx
- DROP Rx since it is 'Unknown'
- DROP Hx since it is 'Unknown'
- DROP Sx since it is 'Unknown'

In [None]:
from pyspark.sql.functions import col

field_prefixes = ['OffsetPar', 'Registro', 'AmplitudeUnit', 'ECGRapprSignal']

# For each field family, the distinct combinations of its 12 per-lead columns.
distinct_values_dfs = [
    hea_df.select(*[col(f"{prefix}{lead}") for lead in range(1, 13)]).distinct()
    for prefix in field_prefixes
]

# Place the families side by side. The result has as many rows as the product of
# the families' distinct-combination counts, so a single row means every family
# is constant across all records.
check_df = distinct_values_dfs[0]
for other_df in distinct_values_dfs[1:]:
    check_df = check_df.crossJoin(other_df)

check_df.show(truncate=False)

+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+
|OffsetPar1|OffsetPar2|OffsetPar3|OffsetPar4|OffsetPar5|OffsetPar6|OffsetPar7|OffsetPar8|OffsetPar9|OffsetPar10|OffsetPar11|OffsetPar12|Registro1|Registro2|Registro3|Registro4|Registro5|Registro6|Registro7|Registro8|Registro9|Registro10|Registro11|Registro12|AmplitudeUnit1|AmplitudeUnit2|AmplitudeUnit3|AmplitudeUnit4|AmplitudeUnit5|AmplitudeUnit6|AmplitudeUnit

### Quick data cleaning
The output clearly shows that each group of analyzed columns is constant across all records, so we drop them.
AmplitudeUnit is a different case: it is constant within each record, but it takes 2 distinct values across records.


---


Considering the recording duration and the amplitude unit, we drop the records that do not follow the standard for correct ECG recording and representation. In particular, we keep only the records whose DurationRecordings is 5000 (the number of samples per lead, i.e. 10 s at the 500 Hz sampling rate) and whose AmplitudeUnit is 200/mV.

In [None]:
# Columns kept after the distinct-value analysis above.
lead_numbers = range(1, 13)
kept_columns = (
    ['NomeFile', 'DurationRecordings', 'AmplitudeUnit1']
    + [f'StartingValue{n}' for n in lead_numbers]
    + [f'SumValues{n}' for n in lead_numbers]
    + ['Age', 'Sex', 'Dx', 'Mat']
)

# Keep only records with DurationRecordings == 5000 and a 200/mV amplitude unit,
# then drop the two filter columns, which are constant afterwards.
# NOTE: this rebinds hea_df, so the cell is not idempotent — re-running it fails
# because DurationRecordings/AmplitudeUnit1 have already been dropped.
hea_df = (
    hea_df
    .select(*kept_columns)
    .filter((col('DurationRecordings') == 5000) & (col('AmplitudeUnit1') == '200/mV'))
    .drop('DurationRecordings', 'AmplitudeUnit1')
)

In [None]:
hea_df.show()  # first rows of the cleaned header DataFrame


+--------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+---+-------+--------------------+-----------+
|NomeFile|StartingValue1|StartingValue2|StartingValue3|StartingValue4|StartingValue5|StartingValue6|StartingValue7|StartingValue8|StartingValue9|StartingValue10|StartingValue11|StartingValue12|SumValues1|SumValues2|SumValues3|SumValues4|SumValues5|SumValues6|SumValues7|SumValues8|SumValues9|SumValues10|SumValues11|SumValues12|Age|    Sex|                  Dx|        Mat|
+--------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----

In [None]:
hea_df  # shows the schema: every column is a string

DataFrame[NomeFile: string, StartingValue1: string, StartingValue2: string, StartingValue3: string, StartingValue4: string, StartingValue5: string, StartingValue6: string, StartingValue7: string, StartingValue8: string, StartingValue9: string, StartingValue10: string, StartingValue11: string, StartingValue12: string, SumValues1: string, SumValues2: string, SumValues3: string, SumValues4: string, SumValues5: string, SumValues6: string, SumValues7: string, SumValues8: string, SumValues9: string, SumValues10: string, SumValues11: string, SumValues12: string, Age: string, Sex: string, Dx: string, Mat: string]

In [None]:
hea_df.count()  # records left after the duration / amplitude-unit filter

2175

### Add the .mat files

In [None]:
def create_dict(array):
    """Turn one listaMAT record into a dict suitable for ``Row(**...)``.

    ``array[0]`` (the .mat filename) is stored under ``'Mat'``; every following
    element is a numpy array, converted to a plain Python list and stored under
    ``'Der1'``, ``'Der2'``, ... in the same order.
    """
    record = {"Mat": array[0]}
    for lead, signal in enumerate(array[1:], start=1):
        record[f"Der{lead}"] = signal.tolist()  # numpy -> plain Python ints for Spark
    return record


In [None]:
matArray = matArray.map(create_dict)  # [filename, arrays...] -> {'Mat': ..., 'Der1': [...], ...}

In [None]:
matArray = matArray.map(lambda record: Row(**record))  # dict -> Row: each key becomes a named field

In [None]:
# Schema is inferred from the Rows: 'Mat' is a string and each Der<k> list becomes an array column.
mat_df = spark.createDataFrame(data=matArray)

In [None]:
mat_df.limit(2).show()  # preview two signal records

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Mat|                Der1|                Der2|                Der3|                Der4|                Der5|                Der6|                Der7|                Der8|                Der9|               Der10|               Der11|               Der12|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|HR09050.mat|[65, 65, 65, 65, ...|[-20, -20, -20, -...|[-85, -85, -85, -...|[-22, -22, -22, -...|[75, 75, 75, 75, ...|[-52, -52, -52, -...|[95, 95, 95, 95, ...|[-20, -20, -20, -...|[30, 30, 30, 30, ...|

In [None]:
mat_df.count()  # all .mat records, before joining with the filtered headers

2559

### Join hea and mat dataframes to get the final dataframe

In [None]:
hea_df.createOrReplaceTempView("hea")
mat_df.createOrReplaceTempView("mat")

# Inner join on the .mat file name. Selecting h.* plus the mat columns other than
# the join key keeps a single `Mat` column; `SELECT *` would return two columns
# both named `Mat`, making any later reference to `Mat` ambiguous.
select_list = ", ".join(["h.*"] + [f"m.`{c}`" for c in mat_df.columns if c != "Mat"])
df_merged = sqlCtx.sql(f"""SELECT {select_list} FROM hea h INNER JOIN mat m ON h.Mat = m.Mat""")

In [None]:
df_merged.limit(2).show()  # preview the joined header + signal records

+--------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+---+-------+--------------------+-----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|NomeFile|StartingValue1|StartingValue2|StartingValue3|StartingValue4|StartingValue5|StartingValue6|StartingValue7|StartingValue8|StartingValue9|StartingValue10|StartingValue11|StartingValue12|SumValues1|SumValues2|SumValues3|SumValues4|SumValues5|SumValues6|SumValues7|SumValues8|SumValues9|SumValues10|SumValues11|SumValues12|Age|    Sex|                  Dx| 

In [None]:
df_merged.count()  # inner join: only filtered headers that have a matching .mat record

2175

In [None]:
df_merged = df_merged.drop('Mat')  # the join key is no longer needed after the merge

In [None]:
df_merged.columns  # final schema: kept header fields followed by Der1..Der12

['NomeFile',
 'StartingValue1',
 'StartingValue2',
 'StartingValue3',
 'StartingValue4',
 'StartingValue5',
 'StartingValue6',
 'StartingValue7',
 'StartingValue8',
 'StartingValue9',
 'StartingValue10',
 'StartingValue11',
 'StartingValue12',
 'SumValues1',
 'SumValues2',
 'SumValues3',
 'SumValues4',
 'SumValues5',
 'SumValues6',
 'SumValues7',
 'SumValues8',
 'SumValues9',
 'SumValues10',
 'SumValues11',
 'SumValues12',
 'Age',
 'Sex',
 'Dx',
 'Der1',
 'Der2',
 'Der3',
 'Der4',
 'Der5',
 'Der6',
 'Der7',
 'Der8',
 'Der9',
 'Der10',
 'Der11',
 'Der12']