# ICU STAY TIME PREDICTION

FCUP-2024-CDLE / Daniel Dias, Lucas Santiago, Miguel Lopes

In [1]:
# Imports
from google.cloud import bigquery, storage
import numpy as np
import os
import tensorflow as tf
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
import pyarrow.csv as pv
import pyarrow.parquet as pq
from google.cloud import storage
import dask.dataframe as dd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import gcsfs
import ast
from apache_beam.io import ReadFromText, WriteToText
import tensorflow as tf
import modin.pandas as pd
import numpy as np
import ray
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Masking
import re
import datetime
from tensorflow.keras.optimizers import Adam

# TensorFlow will allocate memory on the GPU as needed
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

## Calculating the optimal number of days for training

In [2]:
# Defining query and aquiring data
client = bigquery.Client()

project_id = "cdla-trabalho"
dataset_id = "CHARTEVENTS"
table_id = "ICUSTAYS"
min_days = [1,2,3,4,5]

# Comparing different days
for day in min_days:
    values_query = f"""
    SELECT t1.LOS
    FROM `{project_id}.{dataset_id}.{table_id}` as t1
    WHERE t1.LOS >= {day}
    """

    # Calculating the median/average LOS
    los = client.query(values_query).result()

    los_values = []
    for row in los:
        los_values.append(row['LOS'])

    median_los = np.median(los_values)
    average_los = np.average(los_values)
    count_stays = len(los_values)

    # Printing the results
    print(f"Day: {day}")
    print(f"Median LOS: {median_los}")
    print(f"Average LOS: {average_los}\n")

Day: 1
Median LOS: 2.79725
Average LOS: 6.00427281464624

Day: 2
Median LOS: 4.2185
Average LOS: 8.392038099217032

Day: 3
Median LOS: 6.0915
Average LOS: 10.92448104900966

Day: 4
Median LOS: 7.9718
Average LOS: 13.26212488279215

Day: 5
Median LOS: 9.8159
Average LOS: 15.430092340425531



We settled with 3 days as the minimum LOS for training our RNN. By doing this, we aim to achieve a dataset that is both sufficiently large and representative, ensuring better model performance. This choice helps in minimizing noise from very short stays while maintaining a substantial number of training samples to avoid overfitting and improve the generalizability of our predictions.

## Preprocessing the Data

In [None]:
'''
# Defining query and aquiring data
client = bigquery.Client()

project_id = "cdla-trabalho"
dataset_id = "CHARTEVENTS"
source_table_id = "CHARTEVENTS"
destination_table_id = "CHARTEVENTS_CLEAN"

# Defining the destination table of the clean data
destination_table_ref = client.dataset(dataset_id).table(destination_table_id)

# SQL querry
sql_query = """
SELECT 
    t1.SUBJECT_ID,
    t1.ICUSTAY_ID,
    t1.ITEMID,
    t1.CHARTTIME,
    t1.VALUE,
    t2.LOS,
FROM
    `cdla-trabalho.CHARTEVENTS.CHARTEVENTS` as t1
JOIN `cdla-trabalho.CHARTEVENTS.ICUSTAYS` as t2
ON t1.ICUSTAY_ID = t2.ICUSTAY_ID
WHERE
    t1.ERROR = 0 and t1.WARNING = 0 and t1.ICUSTAY_ID IS NOT NULL and t2.LOS >=3
ORDER BY t1.ICUSTAY_ID, t1.CHARTTIME
"""

# Complete job and save the new clea data
job_config = bigquery.QueryJobConfig(destination=destination_table_ref)
query_job = client.query(sql_query, job_config=job_config)
query_job.result()
print(f"Table {destination_table_id} created successfully.")
'''

#
# This code works. It's commented just to not let it run accidentally
#

We prepared a "clean" dataset for training our RNN model by selecting key attributes (`SUBJECT_ID`, `ICUSTAY_ID`, `ITEMID`, `CHARTTIME`, `VALUE`) from the `CHARTEVENTS` table and the length of stay (`LOS`) from the `ICUSTAYS` table. We filtered out records with errors and warnings, excluded null `ICUSTAY_ID`s, and focused on ICU stays of at least 3 days. We performed an inner join between the `CHARTEVENTS` and `ICUSTAYS` tables and ordered the results by `ICUSTAY_ID` and `CHARTTIME`. The cleaned data was saved into a new table, `CHARTEVENTS_CLEAN`, ensuring a high-quality dataset for model training.

## Cleaning and saving data into buckets

In [None]:
'''
# Process elements from the input data.
class PrepareData(beam.DoFn):
    def process(self, element):
        return [(element['ICUSTAY_ID'], (element['ITEMID'], element['VALUE'], element['CHARTTIME'], element['LOS']))]

# Consolidate measures by integer timestamp
class ConsolidateMeasures(beam.DoFn):
    def process(self, element):
        icustay_id, measures = element
        consolidated = {}
        
        # Ensure the first CHARTTIME is a string before converting to datetime
        start_time = measures[0][2]
        if isinstance(start_time, str):
            start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S%z")
        
        for itemid, value, charttime, los in measures:
            # Ensure CHARTTIME is a datetime object before calculating time difference
            if isinstance(charttime, str):
                charttime = datetime.datetime.strptime(charttime, "%Y-%m-%d %H:%M:%S%z")
            time_diff = int((charttime - start_time).total_seconds() / 60)  # Minutes since start_time
            if time_diff not in consolidated:
                consolidated[time_diff] = []
            consolidated[time_diff].append((itemid, value))
        yield (icustay_id, consolidated, los)

# Format the output
class FormatOutput(beam.DoFn):
    def process(self, element):
        icustay_id, consolidated, los = element
        sequences = []
        for time_diff, measures in consolidated.items():
            measures_str = ",".join([f"({itemid},{value})" for itemid, value in measures])
            sequences.append(f"[{time_diff},{measures_str}]")
        padded_sequence = "[" + ",".join(sequences) + "]"
        yield f'{icustay_id},"{padded_sequence}",{los}'

# Padding
class PadSequences(beam.DoFn):
    def process(self, element, max_length=143210):
        icustay_id, padded_sequence, los = element
        if len(padded_sequence) > max_length:
            padded_sequence = padded_sequence[:max_length]
        yield (icustay_id, padded_sequence, los)

# Pipeline options
options = PipelineOptions(
    project='cdla-trabalho',
    runner='DataflowRunner',
    region='us-central1',
    staging_location='gs://events_trabalho_cdle/staging',
    temp_location='gs://events_trabalho_cdle/temp',
    save_main_session=True
)

# Query for training data
training_query = """
SELECT *
FROM `CHARTEVENTS.training_data`
"""

# Query for test data
test_query = """
SELECT *
FROM `CHARTEVENTS.testing_data`
"""

# Function to create pipeline
def run_pipeline(query, output_prefix):
    with beam.Pipeline(options=options) as p:
        header = 'ICUSTAY_ID,Padded_Sequence,LOS'
        
        (p
         | 'Read from BigQuery' >> ReadFromBigQuery(query=query, use_standard_sql=True)
         | 'PrepareData' >> beam.ParDo(PrepareData())
         | 'FormatData' >> beam.Map(lambda element: (element[0], [element[1]]))
         | 'CombinePerKey' >> beam.CombinePerKey(lambda values: sum(values, []))
         | 'SortValues' >> beam.Map(lambda kv: (kv[0], sorted(kv[1], key=lambda x: x[2])))  # Sort values by CHARTTIME
         | 'ConsolidateMeasures' >> beam.ParDo(ConsolidateMeasures())
         | 'PadSequences' >> beam.ParDo(PadSequences())
         | 'FormatOutput' >> beam.ParDo(FormatOutput())
         | 'Write to Text' >> beam.io.WriteToText(f'gs://events_trabalho_cdle/{output_prefix}', file_name_suffix='.csv', header=header, shard_name_template=''))

# Run pipeline for training data
run_pipeline(training_query, 'training')

# Run pipeline for test data
run_pipeline(test_query, 'testing')
'''

#
# Since this Apache Pipeline cannot be ran from the notebook, we created a separate train_test_csv_creation.py that does the same thing.
#

This code retrieves and processes training and testing data from BigQuery. The Beam pipeline starts by reading the data and preparing it by extracting relevant fields. It consolidates measures by calculating time differences from the initial timestamp and organizes the data into a structured format (measures are groouped by "time" so we are not repeating redundant data). Finally, the processed data is formatted into CSV files and written to Google Cloud Storage.

## RNN Model

Using dask and modin.pandas we loaded the csvs from the bucket, analysed and preprocessed them into data compatible with the RNN model. Since we are using a RNN model, we know that all the entries need to have the same size. Let's look into that:

In [3]:
# Constants
BUCKET_NAME = 'events_trabalho_cdle'
TRAINING_FILE = 'training.csv'
TESTING_FILE = 'testing.csv'

# Check if file exists in the bucket
def check_file_exists(bucket_name, file):
    fs = gcsfs.GCSFileSystem()
    file_path = f'gs://{bucket_name}/{file}'
    if fs.exists(file_path):
        print(f"The file {file_path} exists.")
        return True
    else:
        print(f"The file {file_path} does not exist.")
        return False
    
check_file_exists(BUCKET_NAME, TRAINING_FILE)
check_file_exists(BUCKET_NAME, TESTING_FILE)

# Load CSV files into Dask DataFrames
ddf_train = dd.read_csv(f'gs://{BUCKET_NAME}/{TRAINING_FILE}', storage_options={'anon': True})
ddf_test = dd.read_csv(f'gs://{BUCKET_NAME}/{TESTING_FILE}', storage_options={'anon': True})
print("CSVs loaded!")

# Function to find the sequence lengths
def get_sequence_lengths(df, sequence_column):
    def calculate_length(padded_sequence):
        try:
            # Convert the string representation of the list to an actual list
            sequence_list = ast.literal_eval(padded_sequence)
            # Calculate the length of the sequence, which is the number of sublists
            return len(sequence_list)
        except (SyntaxError, ValueError):
            return 0

    lengths = df[sequence_column].map(calculate_length, meta=('x', 'i8')).compute()
    return lengths

# Get sequence lengths for training and testing data
seq_lengths_train = get_sequence_lengths(ddf_train, 'Padded_Sequence')
seq_lengths_test = get_sequence_lengths(ddf_test, 'Padded_Sequence')

# Calculate statistics
max_seq_length = max(seq_lengths_train.max(), seq_lengths_test.max())
min_seq_length = min(seq_lengths_train.min(), seq_lengths_test.min())
mean_seq_length = (seq_lengths_train.sum() + seq_lengths_test.sum()) / (len(seq_lengths_train) + len(seq_lengths_test))
median_seq_length = (seq_lengths_train.median() + seq_lengths_test.median()) / 2

print(f'Maximum sequence length: {max_seq_length}')
print(f'Minimum sequence length: {min_seq_length}')
print(f'Mean sequence length: {mean_seq_length}')
print(f'Median sequence length: {median_seq_length}')

The file gs://events_trabalho_cdle/training.csv exists.
The file gs://events_trabalho_cdle/testing.csv exists.
CSVs loaded!
Maximum sequence length: 5760
Minimum sequence length: 1
Mean sequence length: 190.9987425697302
Median sequence length: 182.5


By knowing those values, we think that a good padding value would be 200. That way we can mantain most of the important data while having a relative small dataset.

Let's pad and save new datasets to the bucket.

In [None]:
# Function to pad sequences to a fixed length
def pad_sequence(padded_sequence, pad_length=200):
    try:
        # Convert the string representation of the list to an actual list
        sequence_list = ast.literal_eval(padded_sequence)
        # Calculate the padding length
        pad_needed = pad_length - len(sequence_list)
        if pad_needed > 0:
            # Add padding
            sequence_list.extend([[-1]] * pad_needed)
        else:
            # Truncate the sequence if it's longer than pad_length
            sequence_list = sequence_list[:pad_length]
        return str(sequence_list)
    except (SyntaxError, ValueError):
        return str([[-1]] * pad_length)

# Apply padding to the Padded_Sequence column
ddf_train['Padded_Sequence'] = ddf_train['Padded_Sequence'].map(pad_sequence, meta=('Padded_Sequence', 'str'))
ddf_test['Padded_Sequence'] = ddf_test['Padded_Sequence'].map(pad_sequence, meta=('Padded_Sequence', 'str'))

# Compute the results and convert back to Pandas DataFrame
df_train_padded = ddf_train.compute()
df_test_padded = ddf_test.compute()

# Save the padded sequences back to CSV files
df_train_padded.to_csv('gs://events_trabalho_cdle/training_padded.csv', index=False)
df_test_padded.to_csv('gs://events_trabalho_cdle/testing_padded.csv', index=False)

print("Padding applied and CSVs saved!")

Using modin.pandas we loaded the csvs from the bucket and preprocessed them into data compatible with the RNN model by flatenning each sublist of the "padded_sequence". First let's take a look at our data:

In [2]:
# Load data
train_path = 'gs://events_trabalho_cdle/training_padded.csv'
test_path = 'gs://events_trabalho_cdle/testing_padded.csv'

# Read CSV files
train_df = pd.read_csv(train_path)
test_df = pd.read_csv(test_path)

test_df.head()

2024-06-08 22:28:23,452	INFO worker.py:1749 -- Started a local Ray instance.


Unnamed: 0,ICUSTAY_ID,Padded_Sequence,LOS
0,230784,"[[0, (220181, 140.0), (220052, 285.0), (220210...",7.9974
1,228800,"[[0, (220602, 105.0), (220546, 6.3), (225684, ...",13.9835
2,247715,"[[0, (220180, 109.0), (220181, 116.0), (220179...",8.7017
3,265829,"[[0, (226534, 144.0), (227464, 4.8), (226536, ...",4.7195
4,237507,"[[0, (224639, 87.0), (223761, 100.8), (223770,...",3.1416


In [6]:
# Function to parse sequence string to list of sublists
def parse_sequence(sequence):
    try:
        sequence_list = ast.literal_eval(sequence)
        parsed_sequence = []
        for sublist in sequence_list:
            if sublist == [-1]:
                parsed_sequence.append([-1])
            else:
                parsed_sublist = [sublist[0]] + [item for tup in sublist[1:] for item in tup]
                parsed_sequence.append(parsed_sublist)
        return parsed_sequence
    except (SyntaxError, ValueError):
        print(1)
        return [[-1]]  # Handle parsing errors with a default value

# Apply the parse_sequence function
train_df['Padded_Sequence'] = train_df['Padded_Sequence'].apply(parse_sequence)
test_df['Padded_Sequence'] = test_df['Padded_Sequence'].apply(parse_sequence)

# Ensure all sequences have the same length
max_length = max(train_df['Padded_Sequence'].apply(len).max(), test_df['Padded_Sequence'].apply(len).max())

def pad_sequences(sequences, max_length):
    padded_sequences = []
    for seq in sequences:
        # Pad sequence with [-1] to ensure all sequences have the same length
        padded_seq = seq + [[-1]] * (max_length - len(seq))
        padded_sequences.append(padded_seq)
    return np.array(padded_sequences, dtype=object)

X_train = pad_sequences(train_df['Padded_Sequence'].tolist(), max_length)
y_train = train_df['LOS'].values

X_test = pad_sequences(test_df['Padded_Sequence'].tolist(), max_length)
y_test = test_df['LOS'].values

# Convert sequences to 3D numpy arrays
def convert_to_3d_array(data):
    max_len = max(len(x) for seq in data for x in seq)
    result = np.full((len(data), max_length, max_len), -1.0)  # Initialize with padding value
    for i, seq in enumerate(data):
        for j, sublist in enumerate(seq):
            result[i, j, :len(sublist)] = sublist
    return result

X_train = convert_to_3d_array(X_train)
X_test = convert_to_3d_array(X_test)

# Check for NaNs and Infs
def check_nans_infs(data):
    if np.isnan(data).any() or np.isinf(data).any():
        print("Data contains NaNs or Infs.")
    else:
        print("Data is clean.")

check_nans_infs(X_train)
check_nans_infs(X_test)
check_nans_infs(y_train)
check_nans_infs(y_test)

1
1


[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_single_chain pid=32680)[0m 1
[36m(_remote_exec_s



Data is clean.
Data is clean.
Data is clean.
Data is clean.


In [11]:
# Define the RNN model
model = Sequential([
    Masking(mask_value=-1, input_shape=(max_length, X_train.shape[2])),
    LSTM(128, return_sequences=True),
    Dropout(0.2),
    LSTM(64, return_sequences=False),
    Dropout(0.2),
    Dense(32, activation='relu'),
    Dense(16, activation='relu'),
    Dense(1)
])

model.compile(optimizer=Adam(learning_rate=0.00001), loss='mse', metrics=['mae'])

# Train the model
history = model.fit(X_train, y_train, epochs=100, batch_size=128, validation_data=(X_test, y_test), verbose=2)

model.save("rnn_model.keras")

# Evaluate the model
loss, mae = model.evaluate(X_test, y_test)
print(f'Test Loss: {loss}')
print(f'Test MAE: {mae}')

Epoch 1/100
110/110 - 13s - loss: 115.7081 - mae: 7.9607 - val_loss: 109.6519 - val_mae: 7.8036 - 13s/epoch - 118ms/step
Epoch 2/100
110/110 - 2s - loss: 113.9878 - mae: 7.8521 - val_loss: 107.9675 - val_mae: 7.6950 - 2s/epoch - 15ms/step
Epoch 3/100
110/110 - 1s - loss: 112.3006 - mae: 7.7438 - val_loss: 106.3244 - val_mae: 7.5874 - 1s/epoch - 12ms/step
Epoch 4/100
110/110 - 1s - loss: 110.6478 - mae: 7.6361 - val_loss: 104.7099 - val_mae: 7.4803 - 1s/epoch - 12ms/step
Epoch 5/100
110/110 - 1s - loss: 109.0335 - mae: 7.5293 - val_loss: 103.1330 - val_mae: 7.3741 - 1s/epoch - 13ms/step
Epoch 6/100
110/110 - 1s - loss: 107.4492 - mae: 7.4242 - val_loss: 101.5913 - val_mae: 7.2688 - 1s/epoch - 13ms/step
Epoch 7/100
110/110 - 1s - loss: 105.8895 - mae: 7.3183 - val_loss: 100.0634 - val_mae: 7.1630 - 1s/epoch - 13ms/step
Epoch 8/100
110/110 - 2s - loss: 104.3652 - mae: 7.2132 - val_loss: 98.5796 - val_mae: 7.0586 - 2s/epoch - 15ms/step
Epoch 9/100
110/110 - 2s - loss: 102.8773 - mae: 7.109

KeyboardInterrupt: 