# BP

## 0. Spark 세션 생성

In [10]:
 from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BP_Cleaning") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 11:57:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. 데이터 로딩

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("BP_Cleaning") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# 1) chartevents.csv.gz 로딩 (CSV 아님! => csv()로)
chartevents_path = "/Users/skku_aws165/Documents/MIMIC/icu/chartevents.csv.gz"

# 필요한 컬럼 위주 스키마 (속도/메모리 절약)
schema = StructType([
    StructField("subject_id", IntegerType(), True),
    StructField("hadm_id", IntegerType(), True),
    StructField("stay_id", IntegerType(), True),
    StructField("charttime", TimestampType(), True),
    StructField("storetime", TimestampType(), True),   # 없어도 됨
    StructField("itemid", IntegerType(), True),
    StructField("value", StringType(), True),          # 텍스트 값
    StructField("valuenum", DoubleType(), True),       # 수치 값
    StructField("valueuom", StringType(), True),       # 단위
    # 그 외 컬럼은 필요하면 추가
])

chartevents_df = spark.read.csv(
    chartevents_path,
    header=True,
    schema=schema,
    mode="DROPMALFORMED"
).select("stay_id","charttime","itemid","valuenum","valueuom")

# 2) cohort 로딩
stay_id_df = spark.read.csv(
    "/Users/skku_aws165/Documents/MIMIC/MIMIC-IV-Project/notebooks/final/new_cohort.csv",
    header=True, inferSchema=True
).select("stay_id").dropDuplicates()


                                                                                

## 2. BP Cleaning – Hard Filtering + Fallback (5분 Flat-line)
**설명:**  
- SBP / DBP / MAP 각각 별도 처리  
- IBP 우선, 이상치 시 NIBP로 대체, 둘 다 없으면 결측  
- 각 컴포넌트별 정제 결과와 요약 통계 출력  


In [21]:
# ============================================================
# BP Cleaning – Hard Filtering + Fallback (5min flat-line)
# - SBP / DBP / MAP 각각 별도 처리
# - IBP 우선, 이상치면 NIBP fallback, 없으면 결측
# - 출력: 각 컴포넌트별 정제 결과 + 요약
# ============================================================

from pyspark.sql import functions as F, Window
from pyspark.sql import DataFrame

# -------------------------
# 0) 설정값 (필요시 수정)
# -------------------------
# MetaVision 기준(자주 쓰는 itemid)
ITEMS = {
    "SBP": {
        "IBP": [220050],      # Arterial BP Systolic
        "NIBP": [220179]      # Non Invasive Systolic BP
    },
    "DBP": {
        "IBP": [220051],      # Arterial BP Diastolic
        "NIBP": [220180]      # Non Invasive Diastolic BP
    },
    "MAP": {
        "IBP": [220052],      # Arterial BP Mean
        "NIBP": [220181]      # Non Invasive Mean BP
    }
}
# CareVue itemid가 필요하면 위 딕셔너리에 추가해서 같이 넣으면 됨.

# Extreme outlier 기준
EXTREME = {
    "SBP": (50, 250),
    "DBP": (30, 150),
    "MAP": (40, 200)  # MAP은 선택적(원하면 꺼도 됨)
}

# Flat-line (시간기반 5분 std<2)
FLAT_STD_THRESH = 2.0
FLAT_WINDOW_SEC = 300  # 5분

# -------------------------
# 1) 유틸 함수
# -------------------------
def _prep_component_frames(chartevents_df: DataFrame,
                           stay_df: DataFrame,
                           itemids_ibp: list,
                           itemids_nibp: list,
                           value_col: str) -> (DataFrame, DataFrame):
    """
    chartevents에서 해당 컴포넌트의 IBP/NIBP만 추출해 표준 컬럼으로 정리.
    반환: (ibp_df, nibp_df) with cols [stay_id, charttime, <value_col>]
    """
    base_cols = ["stay_id", "charttime", "itemid", "valuenum"]
    # cohort join (stay_id 기준)
    df = chartevents_df.select(*base_cols).join(stay_df.select("stay_id").dropDuplicates(), on="stay_id", how="inner")
    df = df.filter(F.col("valuenum").isNotNull())

    ibp = df.filter(F.col("itemid").isin(itemids_ibp)) \
            .select("stay_id", "charttime", F.col("valuenum").alias(value_col)) \
            .withColumn("ts", F.unix_timestamp("charttime"))

    nibp = df.filter(F.col("itemid").isin(itemids_nibp)) \
             .select("stay_id", "charttime", F.col("valuenum").alias(f"nibp_{value_col}"))

    return ibp, nibp


def _apply_ibp_validity(ibp_df: DataFrame, comp: str) -> DataFrame:
    """
    IBP extreme + flat-line(5분 std<2) 적용 → is_valid_ibp 플래그 생성
    """
    lo, hi = EXTREME[comp]
    # extreme
    ibp = ibp_df.withColumn(
        "is_valid_ext",
        (F.col(comp.lower()).between(lo, hi))
    )

    # 5분 rolling std (시간기반 윈도우)
    w5 = Window.partitionBy("stay_id").orderBy("ts").rangeBetween(-FLAT_WINDOW_SEC, 0)
    ibp = ibp.withColumn(f"std5_{comp.lower()}", F.stddev(comp.lower()).over(w5))

    ibp = ibp.withColumn(
        "flat_artifact",
        (F.col(f"std5_{comp.lower()}") < FLAT_STD_THRESH)
    )

    return ibp.withColumn(
        "is_valid_ibp",
        F.col("is_valid_ext") & (~F.col("flat_artifact"))
    )


def process_component(chartevents_df: DataFrame,
                      stay_df: DataFrame,
                      comp: str,
                      out_prefix: str = "outputs") -> (DataFrame, DataFrame):
    """
    한 컴포넌트(SBP/DBP/MAP)에 대해:
      - IBP/NIBP 분리
      - IBP extreme + flat-line 적용
      - IBP 유효 시 IBP, 아니면 NIBP fallback
      - 결과 및 요약 저장
    반환: (result_df, summary_df)
    """
    comp = comp.upper()
    assert comp in ["SBP", "DBP", "MAP"]
    value_col = comp.lower()  # "sbp", "dbp", "map"

    ibp_raw, nibp = _prep_component_frames(
        chartevents_df,
        stay_df,
        ITEMS[comp]["IBP"],
        ITEMS[comp]["NIBP"],
        value_col
    )

    ibp = _apply_ibp_validity(ibp_raw, comp)

    # merge & fallback
    merged = ibp.join(nibp, ["stay_id", "charttime"], "outer")

    # 최종값
    merged = merged.withColumn(
        f"final_{value_col}",
        F.when(F.col("is_valid_ibp"), F.col(value_col)).otherwise(F.col(f"nibp_{value_col}"))
    ).withColumn(
        f"{value_col}_source",
        F.when(F.col("is_valid_ibp") & F.col(value_col).isNotNull(), F.lit("IBP"))
         .when(F.col(f"nibp_{value_col}").isNotNull(), F.lit("NIBP"))
         .otherwise(F.lit("NA"))
    )

    # 저장 (컴포넌트별)
    out_path = f"{out_prefix}/bp_{value_col}_clean.parquet"
    merged.write.mode("overwrite").parquet(out_path)

# 요약(채택 비율, 유효/아웃라이어/플랫 카운트 등)
# ---- 기존 코드 (문제 발생)
# summary = merged.select(
#     "stay_id",
#     F.count(F.when(F.col("is_valid_ibp"), True)).alias("cnt_valid_ibp"),
#     F.count(F.when(~F.col("is_valid_ibp"), True)).alias("cnt_invalid_ibp"),
#     F.count(F.when(F.col(f"{value_col}_source") == "IBP", True)).alias("cnt_final_ibp"),
#     F.count(F.when(F.col(f"{value_col}_source") == "NIBP", True)).alias("cnt_final_nibp"),
#     F.count(F.when(F.col(f"{value_col}_source") == "NA", True)).alias("cnt_final_na")
# ).groupBy().sum()

# ---- 수정 코드 (전역 요약)
    summary = merged.agg(
        F.count(F.when(F.col("is_valid_ibp"), True)).alias("cnt_valid_ibp"),
        F.count(F.when(~F.col("is_valid_ibp"), True)).alias("cnt_invalid_ibp"),
        F.count(F.when(F.col(f"{value_col}_source") == "IBP", True)).alias("cnt_final_ibp"),
        F.count(F.when(F.col(f"{value_col}_source") == "NIBP", True)).alias("cnt_final_nibp"),
        F.count(F.when(F.col(f"{value_col}_source") == "NA", True)).alias("cnt_final_na")
    )

    summary_out = f"{out_prefix}/bp_{value_col}_summary.parquet"
    summary.write.mode("overwrite").parquet(summary_out)

    return merged, summary

# -------------------------
# 2) 실행부  (경로만 네 환경에 맞게 수정)
# -------------------------
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType

# 1) chartevents.csv.gz 로딩 (CSV! => csv() 사용)
chartevents_path = "/Users/skku_aws165/Documents/MIMIC/icu/chartevents.csv.gz"

schema = StructType([
    StructField("subject_id", IntegerType(), True),
    StructField("hadm_id", IntegerType(), True),
    StructField("stay_id", IntegerType(), True),
    StructField("charttime", TimestampType(), True),
    StructField("storetime", TimestampType(), True),
    StructField("itemid", IntegerType(), True),
    StructField("value", StringType(), True),
    StructField("valuenum", DoubleType(), True),
    StructField("valueuom", StringType(), True),
])

chartevents_df = spark.read.csv(
    chartevents_path,
    header=True,
    schema=schema,
    mode="DROPMALFORMED"
).select("stay_id","charttime","itemid","valuenum","valueuom")

# 2) cohort 로딩
stay_id_df = spark.read.csv(
    "/Users/skku_aws165/Documents/MIMIC/MIMIC-IV-Project/notebooks/final/new_cohort.csv",
    header=True, inferSchema=True
).select("stay_id").dropDuplicates()

# 3) 실행
sbp_res, sbp_sum = process_component(chartevents_df, stay_id_df, "SBP", out_prefix="outputs")
dbp_res, dbp_sum = process_component(chartevents_df, stay_id_df, "DBP", out_prefix="outputs")
map_res, map_sum = process_component(chartevents_df, stay_id_df, "MAP", out_prefix="outputs")

# 4) 최종 병합
final_bp = sbp_res.select("stay_id","charttime",
                          F.col("final_sbp"), F.col("sbp_source"),
                          F.col("sbp").alias("ibp_sbp"),
                          F.col("nibp_sbp")) \
    .join(
        dbp_res.select("stay_id","charttime",
                       F.col("final_dbp"), F.col("dbp_source"),
                       F.col("dbp").alias("ibp_dbp"),
                       F.col("nibp_dbp")),
        ["stay_id","charttime"], "outer"
    ).join(
        map_res.select("stay_id","charttime",
                       F.col("final_map"), F.col("map_source"),
                       F.col("map").alias("ibp_map"),
                       F.col("nibp_map")),
        ["stay_id","charttime"], "outer"
    )

# (참고) MAP 최종값이 없으면 NIBP에서 유도하는 로직 추가하고 싶다면:
final_bp = final_bp.withColumn(
    "final_map",
    F.when(F.col("final_map").isNotNull(), F.col("final_map"))
     .otherwise(
        F.when(F.col("nibp_map").isNotNull(), F.col("nibp_map"))
         .otherwise((F.col("final_dbp")*2 + F.col("final_sbp"))/3.0)
     )
)

final_bp.write.mode("overwrite").parquet("outputs/bp_final_merged.parquet")


chartevents_df.write.mode("overwrite").parquet("outputs/chartevents_cached.parquet")
# 다음 실행부터는
# chartevents_df = spark.read.parquet("outputs/chartevents_cached.parquet")


25/08/11 12:42:56 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stay_id, caregiver_id, storetime, value
 Schema: stay_id, charttime, itemid, valuenum
Expected: charttime but found: caregiver_id
CSV file: file:///Users/skku_aws165/Documents/MIMIC/icu/chartevents.csv.gz
25/08/11 12:42:56 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stay_id, caregiver_id, storetime, value
 Schema: stay_id, charttime, itemid, valuenum
Expected: charttime but found: caregiver_id
CSV file: file:///Users/skku_aws165/Documents/MIMIC/icu/chartevents.csv.gz
25/08/11 13:03:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stay_id, caregiver_id, storetime, value
 Schema: stay_id, charttime, itemid, valuenum
Expected: charttime but found: caregiver_id
CSV file: file:///Users/skku_aws165/Documents/MIMIC/icu/chartevents.csv.gz
25/08/11 13:03:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stay_id, care