# Create Demographic Features

#### Configure Spark and import all necessary libraries

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
    }
}

In [2]:
# Install notebook-scoped libraries on the cluster. Versions are pinned to the
# ones this notebook was last run with, so a fresh session resolves the same
# packages instead of whatever happens to be newest on PyPI.
sc.install_pypi_package("pandas==1.2.4")
sc.install_pypi_package("boto3==1.17.53")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1618618538590_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Downloading https://files.pythonhosted.org/packages/51/51/48f3fc47c4e2144da2806dfb6629c4dd1fa3d5a143f9652b141e979a8ca9/pandas-1.2.4-cp37-cp37m-manylinux1_x86_64.whl (9.9MB)
Collecting python-dateutil>=2.7.3 (from pandas)
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.2.4 python-dateutil-2.8.1

Collecting boto3
  Downloading https://files.pythonhosted.org/packages/62/b3/8c889dd3d5ae47a9c4468cc20ef980adc4a16f06f0937ab33f78b58b5eda/boto3-1.17.53-py2.py3-none-any.whl (131kB)
Collecting botocore<1.21.0,>=1.20.53 (from boto3)
  Downloading https://files.pythonhosted.org/packages/92/4e/232e261b739534e216f28d935a06c44840221c3476ebcdb411cd0fc2bf16/botocore-1.20.53-py2.py3-none-any.whl (7.4MB)
Collecting s3transfer<0.4.0,>=0.3.0 (from boto3)
  Downloading https://file

In [5]:
import pandas as pd
import boto3
from pyspark.sql import functions as F
from pyspark.sql.window import Window

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Pull the Data

In [25]:
# Load the ADMISSIONS table from its parquet export on S3.
df_admissions = spark.read.format("parquet").load('s3://mimic-iii-physionet/parquet/ADMISSIONS/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Inspect the column names available on the admissions frame.
df_admissions.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['row_id', 'subject_id', 'hadm_id', 'admittime', 'dischtime', 'deathtime', 'admission_type', 'admission_location', 'discharge_location', 'insurance', 'language', 'religion', 'marital_status', 'ethnicity', 'edregtime', 'edouttime', 'diagnosis', 'hospital_expire_flag', 'has_chartevents_data']

In [27]:
# Row count of the admissions frame as loaded, before any filtering.
df_admissions.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

58976

In [12]:
# Load the PATIENTS table from its parquet export on S3.
df_patients = spark.read.format("parquet").load('s3://mimic-iii-physionet/parquet/PATIENTS/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Inspect the column names available on the patients frame.
df_patients.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['row_id', 'subject_id', 'gender', 'dob', 'dod', 'dod_hosp', 'dod_ssn', 'expire_flag']

#### Filter the Data for Relevant HADM IDs

In [21]:
# Location of the stays file used to decide which admissions to keep.
bucket = 'allstays'  # Or whatever you called your bucket
data_key = 'all_stays.csv'  # Where the file is within your bucket
data_location = f's3://{bucket}/{data_key}'

# The first line of the CSV is treated as the header row.
df = spark.read.csv(data_location, header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Keep only the two admission keys, renamed to lower case so they match the
# column names on the admissions frame.
df_hadm = df.select(
    df["SUBJECT_ID"].alias("subject_id"),
    df["HADM_ID"].alias("hadm_id"),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Inner-join on both keys so that only admissions listed in df_hadm remain.
# Note that this rebinds df_admissions to the filtered frame.
df_admissions = df_admissions.join(df_hadm, on=["hadm_id", "subject_id"], how="inner")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Row count after restricting admissions to the (hadm_id, subject_id) pairs in df_hadm.
df_admissions.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

42276

#### Add Grouped Columns

In [30]:
# Collapse the raw ethnicity values into a handful of buckets. Patterns are
# tried in order, so the first match wins; a value matching none of them
# (nulls included, since LIKE on a null is never true) lands in "Other".
ethnicity = F.col("ethnicity")
is_unknown_ethnicity = (
    ethnicity.like("%UNKNOWN%")
    | ethnicity.like("%UNABLE%")
    | ethnicity.like("%DECLINED%")
)
ethnicity_group = (
    F.when(ethnicity.like("%WHITE%"), "White")
    .when(ethnicity.like("%ASIAN%"), "Asian")
    .when(ethnicity.like("%BLACK%"), "Black")
    .when(ethnicity.like("%HISPANIC%"), "Hispanic")
    .when(is_unknown_ethnicity, "Unknown")
    .otherwise("Other")
)
df_admissions = df_admissions.withColumn("ethnicity_group", ethnicity_group)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Bucket the raw religion values. The two "not recorded" patterns are checked
# first and both map to "Unknown"; the named denominations follow, and any
# value matching none of the patterns (nulls included) becomes "Other".
religion = F.col("religion")
is_unknown_religion = religion.like("%NOT%") | religion.like("%UNOBTAINABLE%")
religion_group = (
    F.when(is_unknown_religion, "Unknown")
    .when(religion.like("%CATHOLIC%"), "Catholic")
    .when(religion.like("%PROTESTANT%"), "Protestant Quaker")
    .when(religion.like("%JEWISH%"), "Jewish")
    .when(religion.like("%EPISCOPALIAN%"), "Episcopalian")
    .otherwise("Other")
)
df_admissions = df_admissions.withColumn("religion_group", religion_group)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# Bucket the raw marital_status values by substring match, first match wins.
# There is no "Unknown" bucket here: every value that matches none of the
# patterns (nulls included) is labelled "Other".
marital_status = F.col("marital_status")
marital_status_group = (
    F.when(marital_status.like("%MARRIED%"), "Married")
    .when(marital_status.like("%SINGLE%"), "Single")
    .when(marital_status.like("%WIDOW%"), "Widowed")
    .when(marital_status.like("%DIVORCE%"), "Divorced")
    .when(marital_status.like("%SEPARATE%"), "Separated")
    .otherwise("Other")
)
df_admissions = df_admissions.withColumn("marital_status_group", marital_status_group)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Join with patients table to get gender.
# Both tables carry their own `row_id`; drop the PATIENTS copy before joining so
# the combined frame does not end up with two columns of the same name (which
# would make any later reference to `row_id` ambiguous).
df_admin_patients = df_admissions.join(df_patients.drop('row_id'), ['subject_id'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Create Final CSV

In [34]:
# Columns exported to the demographics file.
# NOTE(review): religion_group is computed earlier in the notebook but is not
# part of this list — confirm that is intended.
demographic_columns = [
    'subject_id',
    'hadm_id',
    'insurance',
    'ethnicity_group',
    'marital_status_group',
    'gender',
    'admittime',
]

# toPandas() collects the selected columns into a pandas DataFrame.
df_csv = df_admin_patients.select(*demographic_columns).toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Number each subject's admissions in chronological order (1 = earliest admittime).
# method="first" breaks ties by row order so every admission gets a distinct
# whole-number rank. The default method ("average") gives tied admissions the
# same fractional rank (e.g. 1.5), which truncates to a duplicate episode
# number when the filename is built.
df_csv['episode_count'] = df_csv.groupby("subject_id")['admittime'].rank(method="first")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Build "<subject_id>_episode<N>_timeseries.csv" for every row with vectorised
# string concatenation instead of a row-by-row apply. episode_count is cast to
# int first so that a rank such as 1.0 is rendered as "1".
df_csv['filename'] = (
    df_csv['subject_id'].astype(str)
    + "_episode"
    + df_csv['episode_count'].astype(int).astype(str)
    + "_timeseries.csv"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Display the assembled frame; the printed repr is truncated to the first and last rows.
df_csv

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

       subject_id  hadm_id  ... episode_count                       filename
0              22   165315  ...           1.0     22_episode1_timeseries.csv
1              23   152223  ...           1.0     23_episode1_timeseries.csv
2              23   124321  ...           2.0     23_episode2_timeseries.csv
3              24   161859  ...           1.0     24_episode1_timeseries.csv
4              25   129635  ...           1.0     25_episode1_timeseries.csv
...           ...      ...  ...           ...                            ...
42271       98800   191113  ...           1.0  98800_episode1_timeseries.csv
42272       98802   101071  ...           1.0  98802_episode1_timeseries.csv
42273       98805   122631  ...           1.0  98805_episode1_timeseries.csv
42274       98813   170407  ...           1.0  98813_episode1_timeseries.csv
42275       98813   190264  ...           2.0  98813_episode2_timeseries.csv

[42276 rows x 9 columns]

In [38]:
from io import StringIO  # python3; python2: BytesIO

# Serialise the frame to CSV in memory, then upload it as a single S3 object.
bucket = 'allstays'  # already created on S3
object_key = 'demographic_data.csv'

csv_buffer = StringIO()
# Default to_csv options are used, so the DataFrame index is written as the first column.
df_csv.to_csv(csv_buffer)
csv_text = csv_buffer.getvalue()

s3_resource = boto3.resource('s3')
# Left as the final expression so the cell shows the S3 response.
s3_resource.Object(bucket, object_key).put(Body=csv_text)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'TR545EQ3Y4XYD837', 'HostId': 'u6KGk4KN2l+y5UaQ9VUER1i1cb+82di//p2VeUXf8wRgP3sUFw9lvC0sIhsbRqj86jGdl2FrR9k=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'u6KGk4KN2l+y5UaQ9VUER1i1cb+82di//p2VeUXf8wRgP3sUFw9lvC0sIhsbRqj86jGdl2FrR9k=', 'x-amz-request-id': 'TR545EQ3Y4XYD837', 'date': 'Sat, 17 Apr 2021 00:41:33 GMT', 'etag': '"a89d18c7a5e457d0eccc698940683674"', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ETag': '"a89d18c7a5e457d0eccc698940683674"'}