# Building the main dataset

## Running the query to obtain the dataset

Because the final dataset has **more than 400 million rows**, **several columns of different data types** and a size of roughly **20 GB**, it is handled as a distributed **Spark** DataFrame. The source data was previously stored in Azure Data Lake. All queries, and the creation of the data subsets used by the machine learning algorithms, are run with Spark.

## Configuring Spark

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, regexp_extract, count

# Start a Spark session tuned for a machine with 32 GB of RAM.
# spark.driver.memory only takes effect if no Spark JVM is running yet in this notebook.
# In local mode, tasks run inside the driver JVM, so spark.executor.memory has no effect.
spark = SparkSession.builder \
    .appName("Training dataset preparation for the predictive model") \
    .config("spark.master", "local[*]") \
    .config("spark.driver.memory", "24g") \
    .config("spark.executor.memory", "24g") \
    .config("spark.sql.shuffle.partitions", "64") \
    .config("spark.driver.maxResultSize", "8g") \
    .getOrCreate()
```
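With `spark.driver.memory` set to `24g`, not all of the heap is available to Spark for joins and sorts. The sketch below applies Spark's unified memory model, assuming the defaults `spark.memory.fraction = 0.6` and `spark.memory.storageFraction = 0.5` plus the fixed 300 MiB of reserved memory. The function name `unified_memory_mb` is hypothetical, for illustration only.

```python
# Sketch of Spark's unified memory model, assuming default settings:
# spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5.
RESERVED_MB = 300  # fixed reserved system memory inside the JVM heap (MiB)


def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (unified, storage, user) memory in MiB for a given JVM heap size."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction    # shared by execution (joins, sorts) and storage (cache)
    storage = unified * storage_fraction  # storage share immune to eviction by execution
    user = usable - unified               # user data structures and internal metadata
    return unified, storage, user


# "24g" in Spark means 24 GiB = 24 * 1024 MiB
unified, storage, user = unified_memory_mb(24 * 1024)
print(f"unified: {unified:.1f} MiB, storage: {storage:.1f} MiB, user: {user:.1f} MiB")
```

Under these assumptions about 14,566 MiB are shared by execution and storage; when the joins and the sort in the query need more, Spark starts spilling to disk. The remaining ~8 GB of physical RAM outside the 24 GiB heap must also cover the operating system, the Python process and JVM off-heap overhead.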

## Loading the CSV files into Spark DataFrames

```python
# Location of the input files
base_path = "./data"

# Load the CSV files into DataFrames.
# No schema is given and inferSchema is off, so every column is read as a string;
# the numeric comparisons in the query below rely on Spark's implicit type casting.
df_admissions = spark.read.option("header", "true").csv(f"{base_path}/mimic4/admissions.csv")
df_labevents = spark.read.option("header", "true").csv(f"{base_path}/mimic4/labevents.csv")
df_diagnoses_icd = spark.read.option("header", "true").csv(f"{base_path}/mimic4/diagnoses_icd.csv")
df_d_icd_diagnoses = spark.read.option("header", "true").csv(f"{base_path}/mimic4/d_icd_diagnoses.csv")
df_icd_blocks = spark.read.option("header", "true").csv(f"{base_path}/mimic4/icd_blocks.csv")
df_patients = spark.read.option("header", "true").csv(f"{base_path}/mimic4/patients.csv")

# Register the DataFrames as temporary views so Spark SQL can query them by name
df_admissions.createOrReplaceTempView("admissions")
df_labevents.createOrReplaceTempView("labevents")
df_diagnoses_icd.createOrReplaceTempView("diagnoses_icd")
df_d_icd_diagnoses.createOrReplaceTempView("d_icd_diagnoses")
df_icd_blocks.createOrReplaceTempView("icd_blocks")
df_patients.createOrReplaceTempView("patients")
```
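Passing an explicit schema to the reader avoids reading every column as a string. It also avoids the extra pass over the data that `inferSchema` would require. `DataFrameReader.schema()` accepts a DDL-formatted string, and the sketch below builds one for `patients.csv`. The column list and types are assumptions based on the MIMIC-IV `patients` table and must be checked against the real file header: by default (`enforceSchema`), Spark applies the schema to the CSV columns by position and ignores the header names. The helper `to_ddl` is hypothetical.

```python
# Hypothetical column types for patients.csv (MIMIC-IV); verify the order and
# types against the actual file before using them.
PATIENTS_COLUMNS = {
    "subject_id": "INT",
    "gender": "STRING",
    "anchor_age": "INT",
    "anchor_year": "INT",
    "anchor_year_group": "STRING",
    "dod": "STRING",  # date of death kept as text; cast later if needed
}


def to_ddl(columns):
    """Build a DDL schema string such as 'subject_id INT, gender STRING'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns.items())


patients_ddl = to_ddl(PATIENTS_COLUMNS)
print(patients_ddl)

# With the active Spark session, the string can be passed to the reader, e.g.:
# df_patients = spark.read.option("header", "true").schema(patients_ddl) \
#     .csv(f"{base_path}/mimic4/patients.csv")
```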

## Running the SQL query with Spark SQL

Retrieve admission, lab test, diagnosis and patient data. The query keeps only ICD-10 diagnoses, patients aged 18 or older (`anchor_age`) and lab results flagged as abnormal.

```python
query = """
SELECT
  admissions.hadm_id AS id_ingreso,
  labevents.itemid AS id_prueba,
  diagnoses_icd.icd_code AS diagnostico_icd,
  d_icd_diagnoses.long_title AS diagnostico_descripcion,
  icd_blocks.icd_domain_code AS dominio_icd,
  icd_blocks.domain_description AS dominio_descripcion,
  patients.gender AS sexo,
  patients.anchor_age AS edad,
  admissions.marital_status AS estado_civil,
  admissions.insurance AS tipo_seguro,
  admissions.race AS grupo_poblacional,
  admissions.hospital_expire_flag AS muerte_durante_ingreso
FROM admissions
INNER JOIN labevents
  ON admissions.hadm_id = labevents.hadm_id
INNER JOIN diagnoses_icd
  ON admissions.hadm_id = diagnoses_icd.hadm_id
INNER JOIN d_icd_diagnoses
  ON diagnoses_icd.icd_code = d_icd_diagnoses.icd_code
  AND diagnoses_icd.icd_version = d_icd_diagnoses.icd_version
INNER JOIN icd_blocks
  ON icd_blocks.icd_code = diagnoses_icd.icd_code
INNER JOIN patients
  ON patients.subject_id = admissions.subject_id
WHERE diagnoses_icd.icd_version = 10
AND patients.anchor_age >= 18
AND labevents.hadm_id IS NOT NULL
AND labevents.flag = 'abnormal'
ORDER BY id_ingreso, id_prueba
"""

# spark.sql() only builds the plan; show() triggers the joins. The DataFrame is
# not cached, so the write below runs the whole query again, including the
# global sort required by ORDER BY.
df_result = spark.sql(query)
df_result.show(10)
```
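`labevents` and `diagnoses_icd` are each joined to `admissions` only through `hadm_id`. As a result, the query should return one row for every pair (abnormal lab result, ICD-10 diagnosis) within the same admission, assuming `icd_blocks` has at most one row per `icd_code`. This fan-out helps explain why the result reaches hundreds of millions of rows. The sketch below reproduces the effect with Python's built-in `sqlite3` instead of Spark, on simplified tables with hypothetical toy values.

```python
import sqlite3

# Hypothetical toy data: simplified versions of admissions, labevents and
# diagnoses_icd with only the columns the joins and filters need.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE admissions (hadm_id INTEGER, subject_id INTEGER);
CREATE TABLE labevents (hadm_id INTEGER, itemid INTEGER, flag TEXT);
CREATE TABLE diagnoses_icd (hadm_id INTEGER, icd_code TEXT, icd_version INTEGER);

INSERT INTO admissions VALUES (1, 100), (2, 200);
-- Admission 1: three abnormal results and one normal one (flag is NULL)
INSERT INTO labevents VALUES (1, 101, 'abnormal'), (1, 102, 'abnormal'),
                             (1, 103, 'abnormal'), (1, 104, NULL);
-- Admission 2: only a normal result, so the flag filter removes it
INSERT INTO labevents VALUES (2, 101, NULL);
-- Admission 1: two ICD-10 codes and one ICD-9 code; admission 2: one ICD-10 code
INSERT INTO diagnoses_icd VALUES (1, 'I10', 10), (1, 'E119', 10), (1, '4019', 9),
                                 (2, 'J189', 10);
""")

rows = conn.execute("""
SELECT admissions.hadm_id, labevents.itemid, diagnoses_icd.icd_code
FROM admissions
INNER JOIN labevents ON admissions.hadm_id = labevents.hadm_id
INNER JOIN diagnoses_icd ON admissions.hadm_id = diagnoses_icd.hadm_id
WHERE diagnoses_icd.icd_version = 10
  AND labevents.flag = 'abnormal'
ORDER BY admissions.hadm_id, labevents.itemid
""").fetchall()
conn.close()

# 3 abnormal results x 2 ICD-10 codes = 6 rows, all from admission 1
print(len(rows))
```

In the full query each of these rows also carries the patient and admission attributes. The rows per admission therefore grow roughly as (number of abnormal results) × (number of ICD-10 diagnoses).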

## Saving the result in Parquet format

```python
output_path = f"{base_path}/resultados/dataset_final.parquet"
```

```python
# Write the result as Parquet, overwriting any previous output at the same path
df_result.write.mode("overwrite").parquet(output_path)
print(f"Data saved to {output_path}")
```
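Spark writes `output_path` as a directory of `part-*` files. With the default file output committer, it also writes an empty `_SUCCESS` marker when the job finishes. Because the query ends with `ORDER BY`, the number of part files is typically bounded by `spark.sql.shuffle.partitions` (64 here), although adaptive query execution can coalesce partitions. The sketch below checks the output on the local file system using only `pathlib`. The helper name `summarize_parquet_dir` is hypothetical.

```python
from pathlib import Path


def summarize_parquet_dir(path):
    """Return (number of part files, total bytes, whether _SUCCESS exists)
    for a Parquet directory written by Spark on the local file system."""
    directory = Path(path)
    # Spark names data files like part-00000-<uuid>-c000.snappy.parquet
    parts = sorted(directory.glob("part-*.parquet"))
    total_bytes = sum(p.stat().st_size for p in parts)
    success = (directory / "_SUCCESS").exists()
    return len(parts), total_bytes, success


# Usage, assuming the output path used above (only runs if the directory exists)
output_dir = "./data/resultados/dataset_final.parquet"
if Path(output_dir).is_dir():
    n_files, total_bytes, success = summarize_parquet_dir(output_dir)
    print(f"{n_files} part files, {total_bytes / 1e9:.2f} GB, _SUCCESS present: {success}")
```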