text.replace("\\", "/")

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
spark

In [81]:
parent_path = 'D:/skola√Ör5/MasterThesis/mimic-iv-1.0/'

In [82]:
codes_icd_path = parent_path + '/hosp/diagnoses_icd.csv'
diagnose_codes_icd = spark.read.load(codes_icd_path, format="csv", sep=",", inferSchema="true", header="true")

In [83]:
diagnoses_codes_path = parent_path + '/hosp/d_icd_diagnoses.csv'
diagnoses_codes = spark.read.load(diagnoses_codes_path,  format="csv", sep=",", inferSchema="true", header="true").dropDuplicates() 

In [84]:
#diagnose_codes_icd.select('subject_id').distinct().count()

In [85]:
patients_path = parent_path + '/core/patients.csv'
patients = spark.read.load(patients_path,  format="csv", sep=",", inferSchema="true", header="true")  

In [86]:
admission_path = parent_path + '/core/admissions.csv'
admissions = spark.read.load(admission_path,  format="csv", sep=",", inferSchema="true", header="true")  

In [87]:
#admissions.select('subject_id').distinct().count()

In [88]:
admissions_ = admissions.withColumnRenamed('subject_id', 'subject_id2')
patients_admissions = patients.join(admissions_, patients.subject_id == admissions_.subject_id2, 'inner').drop('subject_id2')

In [89]:
from pyspark.sql.functions import udf,col

In [90]:
diagnoses_codes_ = diagnoses_codes.withColumnRenamed('icd_code', 'code').withColumnRenamed('icd_version', 'version')

patients_diagnoses = diagnose_codes_icd.join(diagnoses_codes_, (diagnose_codes_icd.icd_code == diagnoses_codes_.code) & (diagnose_codes_icd.icd_version == diagnoses_codes_.version), "inner").drop('code', 'version')

In [91]:
patients_diagnoses = patients_diagnoses.withColumnRenamed('subject_id', 'sub2').withColumnRenamed('hadm_id', 'h_id')

In [92]:
final_data = patients_admissions.join(patients_diagnoses, ((patients_admissions.subject_id == patients_diagnoses.sub2) & (patients_admissions.hadm_id == patients_diagnoses.h_id)) , 'inner').drop('sub2', 'h_id')

In [93]:
final_data.show()

+----------+------+----------+-----------+-----------------+-------------------+--------+-------------------+-------------------+-------------------+-----------------+--------------------+------------------+---------+--------+--------------+---------+-------------------+-------------------+--------------------+-------+--------+-----------+--------------------+
|subject_id|gender|anchor_age|anchor_year|anchor_year_group|                dod| hadm_id|          admittime|          dischtime|          deathtime|   admission_type|  admission_location|discharge_location|insurance|language|marital_status|ethnicity|          edregtime|          edouttime|hospital_expire_flag|seq_num|icd_code|icd_version|          long_title|
+----------+------+----------+-----------+-----------------+-------------------+--------+-------------------+-------------------+-------------------+-----------------+--------------------+------------------+---------+--------+--------------+---------+-------------------+---

In [94]:
needed_columns = set(['subject_id', 'anchor_age', 'hadm_id', 'admittime', 'dischtime', 'icd_code', 'icd_version', 'long_title'])

In [95]:
columns_ = [c for c in final_data.columns if c not in needed_columns]

In [96]:
columns_

['gender',
 'anchor_year',
 'anchor_year_group',
 'dod',
 'deathtime',
 'admission_type',
 'admission_location',
 'discharge_location',
 'insurance',
 'language',
 'marital_status',
 'ethnicity',
 'edregtime',
 'edouttime',
 'hospital_expire_flag',
 'seq_num']

In [97]:
new = final_data.drop(*columns_).na.drop()

In [98]:
new.show()

+----------+----------+--------+-------------------+-------------------+--------+-----------+--------------------+
|subject_id|anchor_age| hadm_id|          admittime|          dischtime|icd_code|icd_version|          long_title|
+----------+----------+--------+-------------------+-------------------+--------+-----------+--------------------+
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|   I2582|         10|Chronic total occ...|
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|    I252|         10|Old myocardial in...|
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|    K611|         10|      Rectal abscess|
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|   I2510|         10|Atherosclerotic h...|
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|    I509|         10|Heart failure, un...|
|  10003637|        57|23487925|2146-01-22 23:08:00|2146-01-26 14:02:00|    Z951

In [100]:
new1 = new.select('subject_id', 'anchor_age','hadm_id' ,'icd_code', 'icd_version').filter('icd_version == 10')

In [101]:
new1.show()

+----------+----------+--------+--------+-----------+
|subject_id|anchor_age| hadm_id|icd_code|icd_version|
+----------+----------+--------+--------+-----------+
|  10003637|        57|23487925|   I2582|         10|
|  10003637|        57|23487925|    I252|         10|
|  10003637|        57|23487925|    K611|         10|
|  10003637|        57|23487925|   I2510|         10|
|  10003637|        57|23487925|    I509|         10|
|  10003637|        57|23487925|    Z951|         10|
|  10003637|        57|23487925|    Z950|         10|
|  10003637|        57|23487925|   Z8673|         10|
|  10003637|        57|23487925|  F17210|         10|
|  10003637|        57|23487925|    E785|         10|
|  10003637|        57|23487925|    I255|         10|
|  10004720|        61|22081550|    I959|         10|
|  10004720|        61|22081550|    L570|         10|
|  10004720|        61|22081550|   Z8674|         10|
|  10004720|        61|22081550|    Y848|         10|
|  10004720|        61|22081

# BEHRT 

In [102]:
import pyspark.sql.functions as F
from pyspark.sql import Window

In [103]:
new2 = new1.groupby(['subject_id', 'hadm_id']).agg(F.collect_list('icd_code').alias('icd_code'), F.collect_list('anchor_age').alias('age'))

In [104]:
new2.show()

+----------+--------+--------------------+--------------------+
|subject_id| hadm_id|            icd_code|                 age|
+----------+--------+--------------------+--------------------+
|  10003637|23487925|[I2582, I252, K61...|[57, 57, 57, 57, ...|
|  10004720|22081550|[I959, L570, Z867...|[61, 61, 61, 61, ...|
|  10006457|23963539|[Z87891, I69351, ...|[57, 57, 57, 57, ...|
|  10022373|22567635|[G893, E039, R197...|[60, 60, 60, 60, ...|
|  10027100|28267047|[B1920, F19129, F...|        [46, 46, 46]|
|  10051043|26948064|[R42, I63542, I63...|[60, 60, 60, 60, ...|
|  10059041|22386234|[Z7902, I10, Z966...|[65, 65, 65, 65, ...|
|  10066102|21912366|[T18128A, K209, N...|[82, 82, 82, 82, ...|
|  10071930|29524322|        [Z23, Z3800]|              [0, 0]|
|  10122428|25017197|[I10, Z7902, M415...|[84, 84, 84, 84, ...|
|  10124189|23921797|[F17290, K8510, I...|[18, 18, 18, 18, 18]|
|  10124544|29966384|[K5790, F0280, G3...|[91, 91, 91, 91, ...|
|  10138124|21798709|[I2782, I201, Z79..

In [105]:
new4 = new2.groupby(['subject_id']).agg(F.collect_list('icd_code').alias('icd_code'), F.collect_list('age').alias('age'))

In [109]:
new4.show()

+----------+--------------------+--------------------+
|subject_id|            icd_code|                 age|
+----------+--------------------+--------------------+
|  10028314|[[Z3800, P2912, Z...|[[0, 0, 0, 0, 0, ...|
|  10052351|   [[R0789, F10129]]|          [[56, 56]]|
|  10092012|[[Z051, Z23, Z3800]]|         [[0, 0, 0]]|
|  10092020|[[Z87891, Z8546, ...|[[69, 69, 69, 69,...|
|  10126895|[[Z30430, O80, Z3...|  [[24, 24, 24, 24]]|
|  10145339|[[Z3800, Z23], [P...|       [[0, 0], [0]]|
|  10150136|[[D72829, R0682, ...|[[61, 61, 61, 61,...|
|  10188507|[[P590], [Z3800, ...| [[0], [0, 0, 0, 0]]|
|  10229726|[[G7000, R7989, M...|[[63, 63, 63, 63,...|
|  10236661|[[K5730, F22, G40...|[[79, 79, 79, 79,...|
|  10246179|[[R45851, F17210,...|[[20, 20, 20, 20,...|
|  10249110|[[I8510, Z8673, E...|[[51, 51, 51, 51,...|
|  10266640|[[Z3800, Z051, Q6...|      [[0, 0, 0, 0]]|
|  10275889|[[Z23, Z3801, P030]]|         [[0, 0, 0]]|
|  10296904|[[Z390], [O701, E...|[[28], [28, 28, 2...|
|  1030163

### Test 

In [None]:
new2 = new1.groupby(['subject_id', 'hadm_id']).agg(F.collect_list('icd_code').alias('icd_code'), F.collect_list('anchor_age').alias('age'))

In [120]:
new3 = new2.withColumn("icd_code",F.concat(F.col('icd_code'), F.array(F.lit('SEP')))).withColumn('age', F.concat(F.col('age'), F.array(F.lit('SEP'))))

In [124]:
new4 = new3.withColumn('icd_code', F.concat_ws(",", F.col('icd_code')))
new4 = new4.withColumn('age', F.concat_ws(",", F.col('age')))

In [133]:
new5 = new4.groupby(['subject_id']).agg(F.collect_list('icd_code').alias('icd_code'), F.collect_list('age').alias('age'))\
.withColumn('icd_code', F.concat_ws(',', F.col('icd_code'))).withColumn('age', F.concat_ws(',', F.col('age')))

In [134]:
new5.show()

+----------+--------------------+--------------------+
|subject_id|            icd_code|                 age|
+----------+--------------------+--------------------+
|  10028314|Z3800,P2912,Z23,Q...|0,0,0,0,0,0,0,0,0...|
|  10052351|    R0789,F10129,SEP|           56,56,SEP|
|  10092012|  Z051,Z23,Z3800,SEP|           0,0,0,SEP|
|  10092020|Z87891,Z8546,Z790...|69,69,69,69,69,69...|
|  10126895|Z30430,O80,Z3A39,...|     24,24,24,24,SEP|
|  10145339|Z3800,Z23,SEP,P55...|       0,0,SEP,0,SEP|
|  10150136|D72829,R0682,K298...|61,61,61,61,61,61...|
|  10188507|P590,SEP,Z3800,P5...|   0,SEP,0,0,0,0,SEP|
|  10229726|G7000,R7989,M2137...|63,63,63,63,63,63...|
|  10236661|K5730,F22,G40209,...|79,79,79,79,79,79...|
|  10246179|R45851,F17210,F41...|20,20,20,20,20,SE...|
|  10249110|I8510,Z8673,E46,F...|51,51,51,51,51,51...|
|  10266640|Z3800,Z051,Q6502,...|         0,0,0,0,SEP|
|  10275889|  Z23,Z3801,P030,SEP|           0,0,0,SEP|
|  10296904|Z390,SEP,O701,E03...|28,SEP,28,28,28,2...|
|  1030163

In [131]:
new4.filter('subject_id == 10367068').show()

+----------+--------+--------------------+---------------+
|subject_id| hadm_id|            icd_code|            age|
+----------+--------+--------------------+---------------+
|  10367068|29462361|B600,I10,D598,R16...|48,48,48,48,SEP|
+----------+--------+--------------------+---------------+



In [132]:
new5.filter('subject_id == 10145339').collect()

[Row(subject_id=10145339, icd_code=['Z3800,Z23,SEP', 'P551,SEP'], age=['0,0,SEP', '0,SEP'])]

In [121]:
new3.show()

+----------+--------+--------------------+--------------------+
|subject_id| hadm_id|            icd_code|                 age|
+----------+--------+--------------------+--------------------+
|  10003637|23487925|[I2582, I252, K61...|[57, 57, 57, 57, ...|
|  10004720|22081550|[I959, L570, Z867...|[61, 61, 61, 61, ...|
|  10006457|23963539|[Z87891, I69351, ...|[57, 57, 57, 57, ...|
|  10022373|22567635|[G893, E039, R197...|[60, 60, 60, 60, ...|
|  10027100|28267047|[B1920, F19129, F...|   [46, 46, 46, SEP]|
|  10051043|26948064|[R42, I63542, I63...|[60, 60, 60, 60, ...|
|  10059041|22386234|[Z7902, I10, Z966...|[65, 65, 65, 65, ...|
|  10066102|21912366|[T18128A, K209, N...|[82, 82, 82, 82, ...|
|  10071930|29524322|   [Z23, Z3800, SEP]|         [0, 0, SEP]|
|  10122428|25017197|[I10, Z7902, M415...|[84, 84, 84, 84, ...|
|  10124189|23921797|[F17290, K8510, I...|[18, 18, 18, 18, ...|
|  10124544|29966384|[K5790, F0280, G3...|[91, 91, 91, 91, ...|
|  10138124|21798709|[I2782, I201, Z79..

In [107]:
new4.select('subject_id').count()

107704

In [136]:
dd = spark.read.load('dataset.csv',  format="csv", sep=",", inferSchema="true", header="true")  

In [137]:
dd.show()

+----------+--------------------+--------------------+
|subject_id|            icd_code|                 age|
+----------+--------------------+--------------------+
|  10002559| 462,78060,34831,SEP|        21,21,21,SEP|
|  10012942|34600,80701,4011,...|45,45,45,45,45,45...|
|  10028314|Z3800,P2912,Z23,Q...|0,0,0,0,0,0,0,0,0...|
|  10052351|    R0789,F10129,SEP|           56,56,SEP|
|  10066704|     30500,78097,SEP|           19,19,SEP|
|  10076765|76528,V3000,77081...|0,0,0,0,0,0,0,0,0...|
|  10085296|      V3000,V053,SEP|             0,0,SEP|
|  10092012|  Z051,Z23,Z3800,SEP|           0,0,0,SEP|
|  10092020|Z87891,Z8546,Z790...|69,69,69,69,69,69...|
|  10116201|2102,V1582,5272,0...|  57,57,57,57,57,SEP|
|  10117508|V103,E8490,V0751,...|30,30,30,30,30,30...|
|  10126895|Z30430,O80,Z3A39,...|     24,24,24,24,SEP|
|  10128450|     78901,78702,SEP|           24,24,SEP|
|  10129784|79099,30520,2713,...|  24,24,24,24,24,SEP|
|  10132888|2760,34982,49121,...|89,89,89,89,89,89...|
|  1013881