# Clinical Outcome Prediction from Admission Notes
## Model training

This repository contains source code for the task creation and experiments from our paper [Clinical Outcome Prediction from Admission Notes using Self-Supervised Knowledge Integration](https://arxiv.org/abs/2102.04110), to appear at EACL 2021.

## Install Required Dependencies

In [3]:
!pip install tqdm -q
# !pip install -U sagemaker -q

In [4]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/ec2-user/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

## Prepare data

In [5]:
import boto3
import logging
import io
import os
import tarfile
from pathlib import Path
from tqdm import tqdm
from botocore.client import Config

In [6]:
DATA_DIR = "data"
BUCKET = 'mimic-data-for-demo'
MIMIC = 'mimic-iii-physionet'
MIMIC_DIR = os.path.join(DATA_DIR,MIMIC)
TAR_FILE = 'mimic3minimal.tar.gz'
DIR_TO_SAVE_DATA = 'export'

In [7]:
Path(DATA_DIR).mkdir(parents=True,exist_ok=True)
s3 = boto3.client('s3')

### Download data => *optional*

In [8]:
if not Path(f'{DATA_DIR}/{TAR_FILE}').exists():
    s3.download_file(BUCKET, TAR_FILE, f"{DATA_DIR}/{TAR_FILE}")
else:
    print('file already exists')

file already exists


### Extract tar.gz and sync with s3 => *optional*

In [12]:
tar = tarfile.open(name=f'{DATA_DIR}/{TAR_FILE}')
tar.extractall(f'{DATA_DIR}')
tar.close()

In [28]:
# !aws s3 sync {DATA_DIR}/{MIMIC_DIR}/ s3://{BUCKET}/unzipped/
path = 'data'
print(os.path.join(path, "User/Desktop", "file.txt"))

data/User/Desktop/file.txt


In [29]:
# Create path to task data
os.makedirs(path+'/train', exist_ok=True)

### Create a Task Dictionary

In [9]:
tasks = {
    "mortality-prediction": {
        'path_to_script': 'tasks/mp/mp.py', 
        'task_name': 'MP_IN',
        'config_file': 'configs/example_config_mp.yaml'
    },
    "length-of-stay": {
        'path_to_script': 'tasks/los/los.py', 
        'task_name': 'LOS_WEEKS',
        'config_file': 'configs/example_config_los.yaml'
    },
    "diagnoses-3-digits": {
        'path_to_script': 'tasks/dia/dia.py', 
        'task_name': 'DIA_GROUPS_3_DIGITS',
        'config_file': 'configs/example_config_dia_3.yaml'
    },
    "diagnoses-plus-icd-hierarchy": {
        'path_to_script': 'tasks/dia/dia_plus.py', 
        'task_name': 'DIA_PLUS',
        'config_file': 'configs/example_config_dia_plus.yaml'
    },
    "procedures-3-digits": {
        'path_to_script': 'tasks/pro/pro.py', 
        'task_name': 'PRO_GROUPS_3_DIGITS',
        'config_file': 'configs/example_config_pro_3.yaml'
    },
    "procedures-plus-icd-hierarchy": {
        'path_to_script': 'tasks/pro/pro_plus.py', 
        'task_name': 'PRO_PLUS',
        'config_file': 'configs/example_config_pro_plus.yaml'
    },
}

### Create training and validation datasets using SageMaker Processing

#### Start a new SM session and initiate SKLearnProcessor
While initiating SKlearnProcessor, we will specify the following:
* framework version
* instance type
* instance count

In [15]:
import boto3
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region = boto3.session.Session().region_name

role = get_execution_role()
sklearn_processor = SKLearnProcessor(base_job_name='clinical-outcome-processing',
                                     framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.2xlarge',
                                     instance_count=1)

### Prepare Data for Mortality Prediction

In [36]:
selected_task = "mortality-prediction"
processing_script = tasks[selected_task]['path_to_script']

#### Specify bucket to be used for staging data

In [11]:
bucket = 'mimic-data-for-demo'
prefix = 'unzipped'

The processing job will require the following parameters:
- `ProcessingInput`: A ProcessingInput object specifying the S3 bucket an prefix to source the data from
- `ProcessingOutput`: A ProcessingOutput object specifying the S3 bucket an prefix to source the data from

Arguments:
- `admission_only`: Filter parts of Discharge Summary that are not known at Admission. `True` => Create simulated Admission Notes, `False` => Keep complete Discharge Summaries

In [30]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code=processing_script,
                      inputs=[ProcessingInput(
                        source='s3://'+bucket+'/'+ prefix,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='processed',
                                                source='/opt/ml/processing/output',
                                               destination='s3://'+bucket+'/processed')],
                      arguments=['--admission-only', 'True']
                     )


Job Name:  clinical-outcome-processing-2021-05-07-01-23-37-567
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://mimic-data-for-demo/unzipped', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-2-468574038824/clinical-outcome-processing-2021-05-07-01-23-37-567/input/code/mp.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'processed', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://mimic-data-for-demo/processed', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
[34mA value is trying to be set on a copy of a slice from a DataFrame.[0m
[34mTry using .loc[row_indexer,col_indexer

In [34]:
preprocessing_job_description = sklearn_processor.jobs[-1].describe()

output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'processed':
        preprocessed_training_data = output['S3Output']['S3Uri']

### Prepare data for Length of Stay

In [12]:
selected_task = "length-of-stay"
processing_script = tasks[selected_task]['path_to_script']

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code=processing_script,
                      inputs=[ProcessingInput(
                        source='s3://'+bucket+'/'+ prefix,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='processed',
                                                source='/opt/ml/processing/output',
                                               destination='s3://'+bucket+'/processed')],
                      arguments=['--admission-only', 'True']
                     )


Job Name:  clinical-outcome-processing-2021-05-11-17-57-11-573
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://mimic-data-for-demo/unzipped', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-2-468574038824/clinical-outcome-processing-2021-05-11-17-57-11-573/input/code/los.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'processed', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://mimic-data-for-demo/processed', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
................

### Prepare Data for Procedure (3 Digits)

In [20]:
selected_task = "procedures-3-digits"
processing_script = tasks[selected_task]['path_to_script']

In [26]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code=processing_script,
                      inputs=[ProcessingInput(
                        source='s3://'+bucket+'/'+ prefix,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='processed',
                                                source='/opt/ml/processing/output',
                                               destination='s3://'+bucket+'/processed')],
                      arguments=['--admission-only', 'True']
                     )


Job Name:  clinical-outcome-processing-2021-05-17-11-02-18-763
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://mimic-data-for-demo/unzipped', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-2-468574038824/clinical-outcome-processing-2021-05-17-11-02-18-763/input/code/pro.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'processed', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://mimic-data-for-demo/processed', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
[34mA value is trying to be set on a copy of a slice from a DataFrame.[0m
[34mTry using .loc[row_indexer,col_indexe

## Sagemaker Training Job

### Configuration

In [27]:
import sagemaker as sage
from time import gmtime, strftime

sess = sage.Session()

In [28]:
bucket = 'mimic-data-for-demo'
prefix = 'processed'

In [30]:
train_config = sage.inputs.TrainingInput(f's3://{bucket}/{prefix}', content_type='text/csv')
val_config = sage.inputs.TrainingInput(f's3://{bucket}/{prefix}/val/', content_type='text/csv')
test_config = sage.inputs.TrainingInput(f's3://{bucket}/{prefix}/test/', content_type='text/csv')

In [31]:
hyperparameters = {
    'task_config': tasks[selected_task]['config_file'],
    'model_name_or_path': "distilbert-base-uncased",
    'cache_dir': "cache_dir",
    'epochs': 200,
    'batch_size': 5,
}

In [48]:
import os, subprocess

# instance_type = 'local'
instance_type = 'ml.p3.8xlarge'
instance_count = 1

if instance_type is 'local' and subprocess.call('nvidia-smi') == 0:
    ## Set type to GPU if one is present
    instance_type = 'local_gpu'
    
print("Instance type = " + instance_type)

Instance type = ml.p3.8xlarge


In [49]:
account = sess.boto_session.client('sts').get_caller_identity()['Account']
region = sess.boto_session.region_name

from sagemaker.pytorch import PyTorch

estimator = PyTorch(
    entry_point='doc_classification.py',
    source_dir='./experiments',
    role=role,
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name=f'clinical-prediction-{selected_task}',
    framework_version='1.8.0',
    py_version='py36',
    hyperparameters=hyperparameters)

In [50]:
estimator.fit({'train': train_config, 'val': val_config, 'test': test_config}, wait=False)

In [202]:
import sagemaker, uuid

sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

sagemaker role arn: arn:aws:iam::468574038824:role/service-role/AmazonSageMaker-ExecutionRole-20210421T150707
sagemaker bucket: sagemaker-eu-west-2-468574038824
sagemaker session region: eu-west-2


### Checkpointing

In [203]:
bucket = sagemaker_session_bucket
checkpoint_suffix = str(uuid.uuid4())[:8]
checkpoint_s3_path = 's3://{}/checkpoint-{}'.format(bucket, checkpoint_suffix)
print(f'Checkpointing path: {checkpoint_s3_path}')

Checkpointing path: s3://sagemaker-eu-west-2-468574038824/checkpoint-0c70892a


### Upload data to s3
We use the ```sagemaker.Session.upload_data``` function to upload our datasets to an S3 location. The return value inputs identifies the location -- we will use this later when we start the training job.

In [204]:
task_name = tasks[selected_task]['task_name']
prefix = f'sagemaker/clinical-prediction-{selected_task}'

In [205]:
for x in ['train','val','test']:
    path = [x for x in Path(DIR_TO_SAVE_DATA).glob(f'{task_name}*{x}.csv')][-1]
    print(sess.upload_data(str(path), bucket, prefix + f'/{x}'))

s3://sagemaker-eu-west-2-468574038824/sagemaker/clinical-prediction-diagnoses-plus-icd-hierarchy/train/DIA_PLUS_adm_train.csv
s3://sagemaker-eu-west-2-468574038824/sagemaker/clinical-prediction-diagnoses-plus-icd-hierarchy/val/DIA_PLUS_adm_val.csv
s3://sagemaker-eu-west-2-468574038824/sagemaker/clinical-prediction-diagnoses-plus-icd-hierarchy/test/DIA_PLUS_adm_test.csv


In [206]:
for x in Path(DIR_TO_SAVE_DATA).glob(f'*.txt'): 
    sess.upload_data(str(x), bucket, prefix + f'/train')

In [207]:
train_config = sagemaker.inputs.TrainingInput(f's3://{bucket}/{prefix}/train/', content_type='text/csv')
val_config = sagemaker.inputs.TrainingInput(f's3://{bucket}/{prefix}/val/', content_type='text/csv')
test_config = sagemaker.inputs.TrainingInput(f's3://{bucket}/{prefix}/test/', content_type='text/csv')

### Managed Spot Training

In [208]:
use_spot_instances = False
max_run = 86400
max_wait = 3600 if use_spot_instances else 1

In [209]:
import os, subprocess

# instance_type = 'local'
instance_type = 'ml.p3.2xlarge'
instance_count = 1

if instance_type is 'local' and subprocess.call('nvidia-smi') == 0:
    ## Set type to GPU if one is present
    instance_type = 'local_gpu'
    
print("Instance type = " + instance_type)

Instance type = ml.p3.2xlarge


In [210]:
hyperparameters = {
    'task_config': tasks[selected_task]['config_file'],
    'model_name_or_path': "distilbert-base-uncased",
    'cache_dir': "cache_dir",
    'epochs': 200,
    'batch_size': 5,
}

In [211]:
from sagemaker.pytorch import PyTorch

estimator = PyTorch(
    entry_point='doc_classification.py',
    source_dir='./experiments',
    role=role,
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name=f'clinical-prediction-{selected_task}',
#     checkpoint_s3_uri=checkpoint_s3_uri,
#     use_spot_instances=use_spot_instances,
#     max_wait=max_run, # This should be equal to or greater than max_run in seconds'
#     max_run=max_wait, # expected max run in seconds
    framework_version='1.8.0',
    py_version='py36',
    hyperparameters=hyperparameters)

In [212]:
estimator.fit({'train': train_config, 'val': val_config, 'test': test_config}, wait=False)

ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateTrainingJob operation: The account-level service limit 'Number of instances across all processing jobs' is 4 Instances, with current utilization of 4 Instances and a request delta of 1 Instances. Please contact AWS support to request an increase for this limit.