# Large Scale Data Science (CC3047) - 2024/2025

## Machine Learning Pipeline - Is solving this problem an actual challenge?

Work by Alexandre Sousa (2022206427) and Francisco Carqueija (202205113)

---

The project focuses on building a Machine Learning pipeline that aims to perform large-scale data analysis on patient events in the ICU. The main focuses of this work are:

-> Statistical analysis and visualization of data by patient (SUBJECT_ID).

-> Prediction of length of stay in the ICU with machine learning models.

-> Use of appropriate tools to efficiently process large volumes of data (PySpark, MapReduce, BigQuery, etc.).

-> Evaluation and discussion of performance issues (execution time, profiling).

For this project we need to consider our choices in preprocessing and data preparation, how we train and validate our models, and the quality of the analysis, interpretation and discussion of the results.

---

## Table of contents

1. [Libraries Needed](#Libraries-Needed)
2. [Introduction](#Introduction)
    - [ADMISSIONS](#ADMISSIONS)
    - [CHARTEVENTS](#CHARTEVENTS)
    - [D_ITEMS](#D_ITEMS)
    - [DIAGNOSES_ICD](#DIAGNOSES_ICD)
    - [D_ICD_DIAGNOSES](#D_ICD_DIAGNOSES)
    - [ICUSTAYS](#ICUSTAYS)
    - [PATIENTS](#PATIENTS)
    - [PROCEDURES_ICD](#PROCEDURES_ICD)
    - [D_ICD_PROCEDURES](#D_ICD_PROCEDURES)
3. [Data Preparation](#Data-Preparation)
4. [Training and Validation](#Training-and-Validation)
5. [Data Analysis](#Data-Analysis)
6. [Conclusions](#Conclusion)
7. [References](#References)



---

## Libraries Needed 

In [2]:
!python -V

Python 3.11.9


In [3]:
# !pip install --upgrade bigframes
# !pip install dask dask-bigquery google-cloud-bigquery

In [4]:
# !pip install modin
# !pip install google-cloud-bigquery
# !pip install spark-bigquery-connector
# !pip install pyspark

In [5]:
# !pip install --upgrade pandas
# !pip install --upgrade scikit-learn


In [6]:
import os
from google.cloud import bigquery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/frankiko.mc/Desktop/UP/Keys/ml-pipeline-459822-e0958fb6a5e7.json"

client = bigquery.Client()

In [7]:
import bigframes as bf
import bigframes.pandas as bpd

In [8]:
import pandas as pd
import dask.dataframe as dd

In [9]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, count, mean, when, udf, regexp_replace, coalesce, lit
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

In [11]:
import matplotlib.pyplot as plt
import math
from datetime import datetime
import seaborn as sns

---

## Introduction
[[ go back to the top ]](#Table-of-contents)





The dataset we are analysing is CHARTEVENTS.csv.gz from the MIMIC-III clinical database. 
It contains detailed time-stamped measurements and observations recorded in the ICU, including vital signs (e.g., heart rate, blood pressure, temperature), lab test results, and other patient-specific metrics. Each row corresponds to a single charted event and includes key fields such as SUBJECT_ID (patient identifier), HADM_ID (hospital admission ID), ICUSTAY_ID (ICU stay ID), ITEMID (type of measurement), CHARTTIME (timestamp), and VALUE (the observed or recorded value). The dataset is large and granular, enabling detailed temporal analysis of patient states during ICU stays, which is essential for tasks like predicting length of stay, assessing patient deterioration, and analyzing treatment outcomes.

### Why we use bigframes.pandas and not other tools

We use bigframes.pandas because it lets us work with massive BigQuery datasets using familiar pandas-like syntax, without downloading the data. All operations run in BigQuery, which makes the approach scalable and memory-efficient, and therefore well suited to MIMIC’s large medical tables.

##### Why Not Other Tools
- pandas: loads all data into local memory, so it can run out of RAM on tables as large as CHARTEVENTS.
- cuDF: needs a local GPU and is not suited to cloud SQL queries.
- Dask/Polars: good for local or distributed CSVs, but they do not integrate as easily with BigQuery.
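
To make the memory argument concrete, the back-of-envelope estimate below assumes roughly 330 million rows in CHARTEVENTS (the order of magnitude reported in the MIMIC-III documentation, not measured in this notebook) and 8 bytes per value for each of the 15 columns of that table. Both numbers are assumptions. Real pandas usage would likely be higher, because string columns cost more than 8 bytes per value.

```python
# Rough lower bound on the RAM needed to hold CHARTEVENTS in a local DataFrame.
ASSUMED_ROWS = 330_000_000   # assumption: approximate CHARTEVENTS row count
N_COLUMNS = 15               # columns of CHARTEVENTS, excluding the index
BYTES_PER_VALUE = 8          # assumption: one 64-bit value per cell


def estimate_gib(rows: int, columns: int, bytes_per_value: int = BYTES_PER_VALUE) -> float:
    """Return the estimated in-memory size in GiB."""
    return rows * columns * bytes_per_value / 1024**3


chartevents_gib = estimate_gib(ASSUMED_ROWS, N_COLUMNS)
print(f"CHARTEVENTS needs at least ~{chartevents_gib:.1f} GiB of RAM")
```

Even this optimistic figure exceeds the memory of a typical laptop, which is why the computation is pushed to BigQuery.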

---

## ADMISSIONS
[[ go back to the top ]](#Table-of-contents)


In [None]:
df_admissions = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.admissions`", use_cache=False)

In [13]:
df_admissions.head()

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,ADMITTIME,DISCHTIME,DEATHTIME,ADMISSION_TYPE,ADMISSION_LOCATION,DISCHARGE_LOCATION,INSURANCE,LANGUAGE,RELIGION,MARITAL_STATUS,ETHNICITY,EDREGTIME,EDOUTTIME,DIAGNOSIS,HOSPITAL_EXPIRE_FLAG,HAS_CHARTEVENTS_DATA
0,14629,11898,108389,2128-01-13 20:55:00+00:00,2128-01-15 18:20:00+00:00,,EMERGENCY,EMERGENCY ROOM ADMIT,DISCH-TRAN TO PSYCH HOSP,Private,ENGL,NOT SPECIFIED,SINGLE,WHITE,2128-01-13 18:08:00+00:00,2128-01-13 22:36:00+00:00,OVERDOSE,0,1
1,14339,11705,165730,2165-11-18 13:40:00+00:00,2165-11-27 17:30:00+00:00,,NEWBORN,PHYS REFERRAL/NORMAL DELI,HOME HEALTH CARE,Private,,NOT SPECIFIED,,OTHER,,,NEWBORN,0,1
2,57944,96731,198001,2124-09-11 17:37:00+00:00,2124-09-13 14:38:00+00:00,,EMERGENCY,CLINIC REFERRAL/PREMATURE,HOME,Medicaid,ENGL,OTHER,SINGLE,BLACK/AFRICAN AMERICAN,2124-09-11 09:32:00+00:00,2124-09-11 19:17:00+00:00,ASTHMA;CHRONIC OBST PULM DISEASE,0,1
3,55611,89600,108570,2106-03-17 11:44:00+00:00,2106-03-20 12:50:00+00:00,2106-03-20 12:50:00+00:00,EMERGENCY,EMERGENCY ROOM ADMIT,DEAD/EXPIRED,Private,ENGL,NOT SPECIFIED,WIDOWED,WHITE,2106-03-17 10:01:00+00:00,2106-03-17 14:19:00+00:00,ACUTE SUBDURAL HEMATOMA,1,1
4,31247,25553,164133,2146-05-30 20:47:00+00:00,2146-06-22 23:59:00+00:00,,EMERGENCY,EMERGENCY ROOM ADMIT,HOME HEALTH CARE,Medicare,ENGL,CATHOLIC,WIDOWED,BLACK/AFRICAN AMERICAN,2146-05-30 19:02:00+00:00,2146-05-30 21:35:00+00:00,COPD EXACERBATION,0,1


In [25]:
df_admissions.shape

(58976, 19)
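
Since the target of this project is length of stay, here is a minimal sketch of how a stay duration in days can be derived from two timestamps. It uses the `ADMITTIME` and `DISCHTIME` of the first row printed above, so it measures hospital length of stay. Applying the same arithmetic to ICU entry and exit timestamps is an assumption about how the ICU target could be computed, and `stay_in_days` is a hypothetical helper name.

```python
from datetime import datetime


def stay_in_days(start: str, end: str) -> float:
    """Return the elapsed time between two ISO-formatted timestamps, in days."""
    start_dt = datetime.fromisoformat(start)
    end_dt = datetime.fromisoformat(end)
    return (end_dt - start_dt).total_seconds() / 86_400  # seconds per day


# ADMITTIME and DISCHTIME of the first ADMISSIONS row shown above (HADM_ID 108389).
los_days = stay_in_days("2128-01-13 20:55:00+00:00", "2128-01-15 18:20:00+00:00")
print(f"Length of stay: {los_days:.2f} days")
```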

---

## CHARTEVENTS
[[ go back to the top ]](#Table-of-contents)


In [None]:
df_chartevents = bpd.read_gbq("ml-pipeline-459822.data.chartevents", use_cache=False)

In [None]:
df_chartevents.head()

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,ICUSTAY_ID,ITEMID,CHARTTIME,STORETIME,CGID,VALUE,VALUENUM,VALUEUOM,WARNING,ERROR,RESULTSTATUS,STOPPED
0,227480130,1663,162078,295174,8551,2141-04-06 07:50:00,2141-04-06 07:52:00,15004,160,160.0,mmHg,,,,NotStopd
1,35461753,234,134944,233735,776,2106-04-10 12:38:00,2106-04-10 12:42:00,15331,-3,-3.0,,,,Final,NotStopd
2,36434629,423,194013,248770,1484,2169-07-07 02:30:00,2169-07-07 04:25:00,18524,Yes,,,,,,NotStopd
3,213123811,19361,128434,276021,392,2177-02-03 04:00:00,2177-02-03 04:19:00,17704,Serous,,,,,,NotStopd
4,145067178,21838,143088,200378,3446,2136-08-14 08:00:00,2136-08-14 08:12:00,19371,33-36 weeks gest,,,,,,NotStopd


In [27]:
df_chartevents.shape

NameError: name 'df_chartevents' is not defined

Of all the columns in this table, the ones that are not needed and can be dropped are:

- STORETIME -> records when an observation was manually entered or validated, so it is not relevant to the length of a patient's stay.
- CGID -> identifying who validated a given measurement is not relevant.
- ERROR -> measurements flagged as erroneous are wrong, so they cannot be used to predict or correlate with other data, and we do not need this column. All rows with error = 0 are already here.
- RESULTSTATUS, STOPPED -> also not needed, as they are not relevant to the problem at hand.

In [20]:
df_chartevents = df_chartevents.drop(columns=["STORETIME", "CGID"])

NameError: name 'df_chartevents' is not defined
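
The column selection described above can be sketched on a small local frame. This uses plain pandas on invented toy rows rather than the BigQuery table, and it assumes that a missing or zero `ERROR` value marks a valid measurement. `clean_chartevents` is a hypothetical helper, not part of the notebook.

```python
import pandas as pd

UNNEEDED_COLUMNS = ["STORETIME", "CGID", "ERROR", "RESULTSTATUS", "STOPPED"]


def clean_chartevents(df: pd.DataFrame) -> pd.DataFrame:
    """Keep rows not flagged as erroneous, then drop the unneeded columns."""
    # Assumption: ERROR is 1 for a bad measurement and 0 or missing otherwise.
    valid = df[df["ERROR"].fillna(0) == 0]
    # errors="ignore" lets the helper run even if a column was already dropped.
    return valid.drop(columns=UNNEEDED_COLUMNS, errors="ignore")


# Toy rows (invented values) with the same column names as CHARTEVENTS.
toy = pd.DataFrame({
    "SUBJECT_ID": [1, 1, 2],
    "ITEMID": [8551, 776, 8551],
    "VALUENUM": [160.0, -3.0, 999.0],
    "STORETIME": ["2141-04-06 07:52:00"] * 3,
    "CGID": [15004, 15331, 15004],
    "ERROR": [None, 0, 1],
    "RESULTSTATUS": [None, "Final", None],
    "STOPPED": ["NotStopd"] * 3,
})

cleaned = clean_chartevents(toy)
print(cleaned)
```

Filtering before dropping matters here: once `ERROR` is removed, the flagged rows can no longer be told apart from the valid ones.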

---

## D_ITEMS
[[ go back to the top ]](#Table-of-contents)


In [None]:
df_d_items = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.d_items`", use_cache=False)

In [17]:
df_d_items.head()

Unnamed: 0,ROW_ID,ITEMID,LABEL,ABBREVIATION,DBSOURCE,LINKSTO,CATEGORY,UNITNAME,PARAM_TYPE,CONCEPTID
0,3325,1996,HIGH MIN. VOL.,,carevue,chartevents,,,,
1,4423,6898,TRACHEA DSG,,carevue,chartevents,,,,
2,12841,223921,Neuro Symptoms,Neuro Symptoms,metavision,chartevents,Neurological,,Text,
3,1655,4813,nutrition labs,,carevue,chartevents,,,,
4,556,599,RUL Lung Sounds,,carevue,chartevents,,,,


The ABBREVIATION column could be dropped.

For now, however, we do not remove any column from this table, so that everything remains available for the exploratory analysis by patient.

In [None]:
df_d_items.shape
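
One reason to keep `D_ITEMS` available is that it translates the numeric `ITEMID` in CHARTEVENTS into a readable `LABEL`. The sketch below shows that lookup as a left join in plain pandas. The two `D_ITEMS` rows are copied from the output above, while the event rows are invented for illustration.

```python
import pandas as pd

# Two dictionary rows taken from the D_ITEMS head() output above.
d_items = pd.DataFrame({
    "ITEMID": [1996, 599],
    "LABEL": ["HIGH MIN. VOL.", "RUL Lung Sounds"],
})

# Invented events; ITEMID 42 is deliberately absent from the dictionary.
events = pd.DataFrame({
    "SUBJECT_ID": [10, 10, 11],
    "ITEMID": [599, 1996, 42],
    "VALUE": ["Clear", "12", "n/a"],
})

# A left join keeps every event, leaving LABEL missing for unknown items.
labelled = events.merge(d_items, on="ITEMID", how="left")
print(labelled[["SUBJECT_ID", "ITEMID", "LABEL", "VALUE"]])
```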

---

## DIAGNOSES_ICD
[[ go back to the top ]](#Table-of-contents)


In [29]:
df_diagnoses_icd = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.diagnoses_icd`", use_cache=False)

In [None]:
df_diagnoses_icd.head()

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,SEQ_NUM,ICD9_CODE
0,328395,28879,163707,5,4019
1,607827,90609,146682,1,380
2,577044,83314,108439,2,5070
3,564547,80752,180665,21,42789
4,591003,86662,140257,19,2724


In [None]:
df_diagnoses_icd.shape

---

## D_ICD_DIAGNOSES
[[ go back to the top ]](#Table-of-contents)


In [32]:
df_d_icd_diagnoses = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.d_icd_diagnoses`", use_cache=False)

In [35]:
df_d_icd_diagnoses.head()

Unnamed: 0,ROW_ID,ICD9_CODE,SHORT_TITLE,LONG_TITLE
0,14184,E9839,Undet circ-suffocate NOS,Strangulation or suffocation by unspecified me...
1,11678,E9258,Electric current acc NEC,Accident caused by other electric current
2,8999,7810,Abn involun movement NEC,Abnormal involuntary movements
3,7735,6959,Erythematous cond NOS,Unspecified erythematous condition
4,399,0913,Secondary syph skin,Secondary syphilis of skin or mucous membranes


In [None]:
df_d_icd_diagnoses.shape

---

## ICUSTAYS
[[ go back to the top ]](#Table-of-contents)


In [None]:
df_icustays = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.icustays`", use_cache=False)

In [None]:
df_icustays.head()


In [None]:
df_icustays.shape

---

## PATIENTS
[[ go back to the top ]](#Table-of-contents)

In [None]:
df_patients = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.patients`", use_cache=False)

In [None]:
df_patients.head()

In [None]:
df_patients.shape

---

## PROCEDURES_ICD
[[ go back to the top ]](#Table-of-contents)


In [None]:
df_procedures_icd = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.procedures_icd`", use_cache=False)

In [None]:
df_procedures_icd.head()

In [None]:
df_procedures_icd.shape

---

## D_ICD_PROCEDURES
[[ go back to the top ]](#Table-of-contents)

In [None]:
df_d_icd_procedures = bpd.read_gbq("SELECT * FROM `ml-pipeline-459822.data.d_icd_procedures`", use_cache=False)

In [None]:
df_d_icd_procedures.head()

In [None]:
df_d_icd_procedures.shape

---

## Obtain the top 3 most frequent diseases
In this section, we investigate which three diseases occur most frequently in the dataset.

In [37]:
# Step 1: Count each ICD9 code once and keep the 3 most frequent
top_3_counts = df_diagnoses_icd['ICD9_CODE'].value_counts().head(3).to_pandas()

print("Top 3 most common disease codes:")
# Step 2: Loop through the top codes and retrieve their SHORT_TITLE
for code, n_occurrences in top_3_counts.items():
    # Filter for the corresponding row in d_icd_diagnoses
    filtered_row = df_d_icd_diagnoses[df_d_icd_diagnoses['ICD9_CODE'] == code]

    if not filtered_row.empty:
        short_title = filtered_row['SHORT_TITLE'].values[0]
        print(f"{short_title} ({code}): {n_occurrences} occurrences")
    else:
        print(f"ICD9 code {code} not found in the descriptions table.")

Top 3 most common disease codes:
Hypertension NOS (4019): 20703 occurrences
CHF NOS (4280): 13111 occurrences
Atrial fibrillation (42731): 12891 occurrences
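
As an alternative to looking up each code inside a loop, the counts and titles can be combined with one join. This is a sketch in plain pandas on toy data: the codes and titles match the output above, but the frequencies are invented, and `top_n_diagnoses` is a hypothetical helper.

```python
import pandas as pd


def top_n_diagnoses(diagnoses: pd.DataFrame, titles: pd.DataFrame, n: int = 3) -> pd.DataFrame:
    """Return the n most frequent ICD9 codes with their counts and short titles."""
    counts = (
        diagnoses.groupby("ICD9_CODE")
        .size()
        .reset_index(name="OCCURRENCES")
        .sort_values("OCCURRENCES", ascending=False)
        .head(n)
    )
    # Left join so codes missing from the dictionary are kept with a NaN title.
    return counts.merge(titles, on="ICD9_CODE", how="left").reset_index(drop=True)


# Toy data: invented frequencies for four ICD9 codes.
diagnoses = pd.DataFrame(
    {"ICD9_CODE": ["4019"] * 4 + ["4280"] * 3 + ["42731"] * 2 + ["5070"]}
)
titles = pd.DataFrame({
    "ICD9_CODE": ["4019", "4280", "42731"],
    "SHORT_TITLE": ["Hypertension NOS", "CHF NOS", "Atrial fibrillation"],
})

top_3 = top_n_diagnoses(diagnoses, titles)
print(top_3)
```

Doing the lookup as a join keeps the whole computation in a single dataframe expression, which tends to suit engines that push work down to the database.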


---