In [30]:
import os

# JVM system properties passed to Spark through _JAVA_OPTIONS. They are read only
# when the JVM starts (when the first SparkSession/SparkContext is created in this
# kernel). If getOrCreate() returns an already-running session, changing this has
# no effect until the kernel is restarted.
# The user name is overridable via the SPARK_USER_NAME environment variable rather
# than being tied to one developer's account; the default keeps the original value.
SPARK_USER_NAME = os.environ.get("SPARK_USER_NAME", "julianromero")
os.environ['_JAVA_OPTIONS'] = f'-Djava.security.manager=allow -Duser.name={SPARK_USER_NAME}'

from pyspark.sql import SparkSession
import pyspark.pandas as ps
spark = SparkSession.builder.appName("SparkSession").getOrCreate()

# Project helper that loads and runs the .sql files stored under queries/.
from utils.sql_manager import SQLManager
sql_manager = SQLManager(queries_dir='queries')

In [31]:
# Raw input files: hospital-module tables live under data/nw_hosp, ICU-module
# tables under data/nw_icu. Every file has a header row, and Spark infers the
# column types (inferSchema costs an extra pass over each file at load time).
hosp_adm = spark.read.options(header=True, inferSchema=True).csv("data/nw_hosp/admissions.csv")
patients = spark.read.options(header=True, inferSchema=True).csv("data/nw_hosp/patients.csv")
lab_events = spark.read.options(header=True, inferSchema=True).csv("data/nw_hosp/labevents.csv")

icu_stays = spark.read.options(header=True, inferSchema=True).csv("data/nw_icu/icustays.csv")
chart_events = spark.read.options(header=True, inferSchema=True).csv("data/nw_icu/chartevents.csv")
procedure_events = spark.read.options(header=True, inferSchema=True).csv("data/nw_icu/procedureevents.csv")

                                                                                

In [32]:
# Temporary views: expose each raw DataFrame to Spark SQL under a fixed name
# (presumably the names referenced by the .sql files under queries/ — verify there).
for _view_name, _frame in (
    ("admissions", hosp_adm),
    ("icu_stays", icu_stays),
    ("patients", patients),
    ("chart_events", chart_events),
    ("lab_events", lab_events),
    ("procedure_events", procedure_events),
):
    _frame.createOrReplaceTempView(_view_name)
# Drop the loop variables so the notebook namespace is unchanged by this cell.
del _view_name, _frame


In [43]:
# Run each initial_data/*.sql query and register its result as a temp view with
# the same name as the Python variable, so later spark.sql queries can use it by name.
# Keep the execute/register pairs in this exact order: a query here may reference
# a view registered by an earlier statement in this cell (assumption — verify
# against the .sql files), and Spark resolves view names when execute() is called.
# NOTE(review): patients_data and the "patients_data" view are replaced further
# down by the result of initial_data/patients_filtered.sql.
hosp_adm_data = sql_manager.execute(spark, 'initial_data/admissions.sql')
hosp_adm_data.createOrReplaceTempView("hosp_adm_data")

icu_stays_data = sql_manager.execute(spark, 'initial_data/icu_stays.sql')
icu_stays_data.createOrReplaceTempView("icu_stays_data")

chart_events_data = sql_manager.execute(spark, 'initial_data/chart_events.sql')
chart_events_data.createOrReplaceTempView("chart_events_data")

patients_data = sql_manager.execute(spark, 'initial_data/patients.sql')
patients_data.createOrReplaceTempView("patients_data")

lab_events_data = sql_manager.execute(spark, 'initial_data/lab_events.sql')
lab_events_data.createOrReplaceTempView("lab_events_data")

procedure_events_data = sql_manager.execute(spark, 'initial_data/procedure_events.sql')
procedure_events_data.createOrReplaceTempView("procedure_events_data")


In [44]:
# Preview the raw ICU stays, then count ICU stays per patient (most stays first).
icu_stays.show(5)
stays_count = spark.sql(
    "select subject_id, count(*) as num_stays "
    "from icu_stays group by subject_id order by num_stays desc"
)
stays_count.show(5)
stays_count_df = stays_count.toPandas()
# One row per subject_id, so the row count is the number of patients with at least
# one ICU stay. Computed from the data instead of a hardcoded number in a comment,
# which had drifted from the actual result.
print(f"Pacientes en UCI en total: {len(stays_count_df)}")
stays_count_df

+----------+--------+--------+--------------+-------------+-------------------+-------------------+------------------+
|subject_id| hadm_id| stay_id|first_careunit|last_careunit|             intime|            outtime|               los|
+----------+--------+--------+--------------+-------------+-------------------+-------------------+------------------+
|  30000238|44714428|59801852|         CTICU|        CTICU|2195-10-14 15:12:00|2195-10-16 14:32:00|1.9722222222222223|
|  30000246|40653169|55154064|          CICU|         CICU|2104-05-05 18:23:00|2104-05-07 14:26:00|1.8354166666666667|
|  30000521|46912110|58492193|          SICU|         SICU|2188-04-17 16:46:00|2188-04-18 18:16:00|            1.0625|
|  30000590|44608425|50707570|         NSICU|        NSICU|2177-12-09 12:01:00|2177-12-10 16:43:00|1.1958333333333333|
|  30000828|40615900|57131111|          MICU|          CCU|2124-10-17 23:12:00|2124-10-18 11:57:00|           0.53125|
+----------+--------+--------+--------------+---

Unnamed: 0,subject_id,num_stays
0,39585304,20
1,35136418,14
2,37150297,12
3,35616012,12
4,35704091,11
...,...,...
23199,39745649,1
23200,39829731,1
23201,39911202,1
23202,36309828,1


In [45]:
# Admission time of each hospital admission (subject_id, admission_id, admittime),
# registered as the "admit_date_hospital" view for the queries that follow.
admit_date_hospital = sql_manager.execute(spark, 'dates/admision_time.sql')
admit_date_hospital.show(5)
admit_date_hospital.createOrReplaceTempView("admit_date_hospital")

# Despite the "_ps" suffix this is a plain pandas DataFrame (from toPandas()),
# not a pyspark.pandas one.
admit_time_ps = admit_date_hospital.toPandas()
admit_time_ps

+----------+------------+-------------------+
|subject_id|admission_id|          admittime|
+----------+------------+-------------------+
|  30000238|    44714428|2195-10-14 07:12:00|
|  30000246|    40653169|2104-05-05 16:18:00|
|  30000246|    43974039|2103-07-08 17:12:00|
|  30000246|    45869139|2103-07-11 10:37:00|
|  30000521|    40796332|2187-08-15 11:33:00|
+----------+------------+-------------------+
only showing top 5 rows


Unnamed: 0,subject_id,admission_id,admittime
0,30000238,44714428,2195-10-14 07:12:00
1,30000246,40653169,2104-05-05 16:18:00
2,30000246,43974039,2103-07-08 17:12:00
3,30000246,45869139,2103-07-11 10:37:00
4,30000521,40796332,2187-08-15 11:33:00
...,...,...,...
61838,39998775,48786224,2103-02-10 21:07:00
61839,39998775,49181211,2104-02-12 20:43:00
61840,39998775,49555175,2103-10-14 11:50:00
61841,39999363,44506852,2195-02-22 14:39:00


In [46]:
# Rebuild patients_data from the filtered query. This replaces both the Python
# variable and the "patients_data" temp view previously built from
# initial_data/patients.sql, so queries executed after this cell see this version.
# NOTE(review): re-running the earlier initial_data cell afterwards silently
# restores the unfiltered view — keep the run order linear.
patients_data = sql_manager.execute(spark, 'initial_data/patients_filtered.sql')
patients_data.show(5)
patients_data.createOrReplaceTempView("patients_data")

patients_data_ps = patients_data.toPandas()
patients_data_ps

+----------+------------+-------------------+------+----------+------------------+
|subject_id|admission_id|          admittime|gender|anchor_age|dod_within_30_days|
+----------+------------+-------------------+------+----------+------------------+
|  30000238|    44714428|2195-10-14 07:12:00|     M|        74|              NULL|
|  30000246|    40653169|2104-05-05 16:18:00|     M|        59|              NULL|
|  30000246|    43974039|2103-07-08 17:12:00|     M|        59|              NULL|
|  30000246|    45869139|2103-07-11 10:37:00|     M|        59|              NULL|
|  30000521|    40796332|2187-08-15 11:33:00|     F|        46|              NULL|
+----------+------------+-------------------+------+----------+------------------+
only showing top 5 rows


Unnamed: 0,subject_id,admission_id,admittime,gender,anchor_age,dod_within_30_days
0,30000238,44714428,2195-10-14 07:12:00,M,74,
1,30000246,40653169,2104-05-05 16:18:00,M,59,
2,30000246,43974039,2103-07-08 17:12:00,M,59,
3,30000246,45869139,2103-07-11 10:37:00,M,59,
4,30000521,40796332,2187-08-15 11:33:00,F,46,
...,...,...,...,...,...,...
61838,39998775,48786224,2103-02-10 21:07:00,M,75,
61839,39998775,49181211,2104-02-12 20:43:00,M,75,
61840,39998775,49555175,2103-10-14 11:50:00,M,75,
61841,39999363,44506852,2195-02-22 14:39:00,F,73,


In [47]:
# Chart events aggregated per ICU stay (count, item ids, values). Note that the
# Spark view is named "chart_events_adm", not after the Python variable.
chart_events_post_admission = sql_manager.execute(spark, 'joins/admission_chartevents.sql')
chart_events_post_admission.show(5)
chart_events_post_admission.createOrReplaceTempView("chart_events_adm")

[Stage 369:>                                                        (0 + 1) / 1]

+----------+------------+--------+-------------------+----------------+--------------------+--------------------+
|subject_id|admission_id| stay_id|          admittime|num_chart_events|       chart_itemids|        chart_values|
+----------+------------+--------+-------------------+----------------+--------------------+--------------------+
|  30000521|    46912110|58492193|2188-04-17 16:46:00|              53|[320045, 320179, ...|[92.0, 160.0, 79....|
|  30000828|    40615900|57131111|2124-10-17 23:12:00|              17|[320045, 320179, ...|[97.0, 131.0, 87....|
|  30003460|    43300755|52287466|2139-04-28 18:30:00|              47|[320045, 320050, ...|[66.0, 126.0, 58....|
|  30004640|    41454271|57661199|2128-06-04 14:14:00|             162|[320045, 320179, ...|[63.0, 127.0, 78....|
|  30006179|    40825743|57485621|2133-05-08 06:47:00|              85|[320045, 320179, ...|[79.0, 145.0, 66....|
+----------+------------+--------+-------------------+----------------+-----------------

                                                                                

In [38]:
# Lab events aggregated per ICU stay (count, item ids, values); the saved preview
# shows stays without lab events as a count of 0 with empty arrays.
lab_events_adm = sql_manager.execute(spark, 'joins/admission_labevents.sql')
lab_events_adm.show(5)
lab_events_adm.createOrReplaceTempView("lab_events_adm")

[Stage 265:>                                                        (0 + 1) / 1]

+----------+------------+--------+-------------------+--------------+--------------------+--------------------+
|subject_id|admission_id| stay_id|          admittime|num_lab_events|         lab_itemids|          lab_values|
+----------+------------+--------+-------------------+--------------+--------------------+--------------------+
|  30000521|    46912110|58492193|2188-04-17 16:46:00|            20|[100040, 100003, ...|[9.0, 8.0, 94.6, ...|
|  30000828|    40615900|57131111|2124-10-17 23:12:00|             0|                  []|                  []|
|  30003460|    43300755|52287466|2139-04-28 18:30:00|            22|[100046, 100003, ...|[30.7, 8.8, 1.6, ...|
|  30004640|    41454271|57661199|2128-06-04 14:14:00|            16|[100001, 100013, ...|[158.0, 223.0, 8....|
|  30006179|    40825743|57485621|2133-05-08 06:47:00|            31|[100006, 100005, ...|[23.3, 35.0, 23.1...|
+----------+------------+--------+-------------------+--------------+--------------------+--------------

                                                                                

In [48]:
# Procedure events aggregated per ICU stay (num_prod_events, item ids, values);
# stays without procedures show a count of 0 with empty arrays in the preview.
procedure_events_adm = sql_manager.execute(spark, 'joins/admission_procedure_events.sql')
procedure_events_adm.show(5)
procedure_events_adm.createOrReplaceTempView("procedure_events_adm")

[Stage 371:>                                                        (0 + 1) / 1]

+----------+------------+--------+-------------------+---------------+--------------------+--------------------+
|subject_id|admission_id| stay_id|          admittime|num_prod_events| prod_events_itemids|         prod_values|
+----------+------------+--------+-------------------+---------------+--------------------+--------------------+
|  30000238|    44714428|59801852|2195-10-14 15:12:00|             64|[772725, 707814, ...|[576.0, 500.0, 17...|
|  30000246|    40653169|55154064|2104-05-05 18:23:00|              0|                  []|                  []|
|  30001076|    41565504|56667136|2104-10-20 22:15:00|              0|                  []|                  []|
|  30001076|    41565504|59550340|2104-10-24 18:09:00|              0|                  []|                  []|
|  30001292|    47098230|57234823|2116-02-03 04:23:00|             33|[766246, 766246, ...|[408.0, 330.0, 16...|
+----------+------------+--------+-------------------+---------------+--------------------+-----

                                                                                

In [50]:
# Assemble the final dataset from the registered views, preview it, then collect
# it to the driver and export it (this export is before deduplication).
dataset = sql_manager.execute(spark, 'final/dataset.sql')
dataset.show(5)

# Array columns end up in the CSV as their Python string representation.
dataset_df = dataset.toPandas()
dataset_df.to_csv(path_or_buf="data/final_dataset.csv", index=False)


                                                                                

+----------+------------+-------------------+--------+-------------------+---------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+------+----------+------------------+
|subject_id|admission_id|          admittime| stay_id|     length_of_stay|num_prod_events| prod_events_itemids|         prod_values|num_chart_events|       chart_itemids|        chart_values|num_lab_events|         lab_itemids|          lab_values|gender|anchor_age|dod_within_30_days|
+----------+------------+-------------------+--------+-------------------+---------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+------+----------+------------------+
|  30005809|    44231909|2143-03-25 13:06:00|52987879| 1.0659722222222223|             38|[763150, 763189, ...|[440.0, 431.0, 19...|          

                                                                                

In [None]:
# Display the pandas copy of the final dataset (before deduplication).
# NOTE(review): dataset_df is rebound by the deduplication cell below, so re-running
# this cell after that one shows the deduplicated frame instead.
dataset_df

                                                                                

82116

In [54]:
from pyspark.sql.functions import col, coalesce, when, count as spark_count, row_number
from pyspark.sql.window import Window
import pandas as pd

# Deduplicate the final dataset to one row per ICU stay (stay_id), keeping the most
# complete row, i.e. the one with the most non-null columns.

# Count the non-null columns of each row. Python's built-in sum() folds the per-column
# 0/1 Column expressions into a single Column expression (0 + Column -> Column).
dataset_with_nulls = dataset.withColumn(
    "non_null_count",
    sum(when(col(c).isNotNull(), 1).otherwise(0) for c in dataset.columns)
)

# One window per stay_id, most complete row first. row_number() picks an arbitrary row
# among ties, and every Spark action recomputes this lineage. Without explicit
# tie-breakers, the counts, the "dataset" view and the exported CSV could each keep
# a different row for the same stay. admission_id/admittime make the choice reproducible.
# NOTE(review): rows with a NULL stay_id all land in one partition and would be
# collapsed into a single row — confirm final/dataset.sql never emits NULL stay_id.
window_spec = Window.partitionBy("stay_id").orderBy(
    col("non_null_count").desc(),
    col("admission_id").asc_nulls_last(),
    col("admittime").asc_nulls_last(),
)

# Rank rows inside each stay_id and keep only the first (the most complete).
dataset_dedup = dataset_with_nulls.withColumn("rn", row_number().over(window_spec))
dataset_dedup_final = dataset_dedup.filter(col("rn") == 1).drop("rn", "non_null_count")

# Each count() launches a full Spark job over the whole lineage, so run each one once.
original_rows = dataset.count()
dedup_rows = dataset_dedup_final.count()
print(f"Dataset original: {original_rows} filas")
print(f"Dataset después de deduplicación: {dedup_rows} filas")
print(f"Filas removidas: {original_rows - dedup_rows}")

# Point the "dataset" temp view at the deduplicated data.
dataset_dedup_final.createOrReplaceTempView("dataset")

dataset_df = dataset_dedup_final.toPandas()
dataset_df.to_csv("data/final_dataset_dedup.csv", index=False)

                                                                                

Dataset original: 30592 filas


                                                                                

Dataset después de deduplicación: 28612 filas


                                                                                



26/01/02 18:47:19 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
                                                                                

Filas removidas: 1980


                                                                                

In [None]:
# Display the deduplicated dataset (one row per stay_id) as a pandas DataFrame.
dataset_df