In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from collections import defaultdict
import re
import gc
import time
import json

# Widen pandas' text rendering: show up to 120 columns and wrap output at 120 characters.
pd.set_option('display.max_columns', 120)
pd.set_option('display.width', 120)

In [None]:
# Load the admissions table and make sure both stay boundaries are real datetimes.
csv_path = Path("admissions.csv")

admissions = pd.read_csv(
    csv_path,
    low_memory=False,
    parse_dates=['admittime', 'dischtime', 'deathtime', 'edregtime', 'edouttime'],
)
print("Loaded admissions.csv from disk. Rows:", len(admissions))

# Re-coerce the two boundary columns so anything unparseable becomes NaT.
for boundary_col in ('admittime', 'dischtime'):
    admissions[boundary_col] = pd.to_datetime(admissions[boundary_col], errors='coerce')

# A missing discharge time falls back to the admission time.
admissions['dischtime'] = admissions['dischtime'].fillna(admissions['admittime'])

def expand_admission_row(row):
    """Expand one admission into one record per calendar day of the stay.

    Parameters
    ----------
    row : mapping-like (e.g. a ``pd.Series`` from ``DataFrame.iterrows``)
        Must provide ``subject_id``, ``hadm_id``, ``admittime`` and ``dischtime``.
        All other fields are optional and default to NaN / NaT when absent.

    Returns
    -------
    list[dict]
        One dict per day, ``day_index`` running from 0 (admission date) to the
        discharge date inclusive. At least one record is always returned: a
        discharge before the admission, or a missing admission/discharge
        timestamp, yields a single day-0 record instead of raising.
    """
    admit_ts = row['admittime']
    disch_ts = row['dischtime']

    if pd.isna(admit_ts) or pd.isna(disch_ts):
        # NaT cannot be turned into a calendar date (NaT.date() raises), so the
        # stay length is unknown; keep the admission as a single day.
        n_days = 1
    else:
        adm_date = admit_ts.normalize().date()
        dis_date = disch_ts.normalize().date()
        # Inclusive day count, clamped so inverted timestamps still give one row.
        n_days = max((dis_date - adm_date).days + 1, 1)

    # Fields placed before and after 'day_index'; the split keeps the original
    # key order, which determines the column order of the resulting DataFrame.
    leading = {
        'subject_id': row['subject_id'],
        'hadm_id': row['hadm_id'],
    }
    trailing = {
        'admittime': admit_ts,
        'dischtime': disch_ts,
        'deathtime': row.get('deathtime', pd.NaT),
        'admission_type': row.get('admission_type', np.nan),
        'admit_provider_id': row.get('admit_provider_id', np.nan),
        'admission_location': row.get('admission_location', np.nan),
        'discharge_location': row.get('discharge_location', np.nan),
        'insurance': row.get('insurance', np.nan),
        'language': row.get('language', np.nan),
        'marital_status': row.get('marital_status', np.nan),
        'race': row.get('race', np.nan),
        'edregtime': row.get('edregtime', pd.NaT),
        'edouttime': row.get('edouttime', pd.NaT),
        'hospital_expire_flag': row.get('hospital_expire_flag', np.nan)
    }
    return [{**leading, 'day_index': int(d), **trailing} for d in range(n_days)]

# Explode every admission into its per-day records and assemble one long frame.
expanded = [
    day_record
    for _, admission_row in admissions.iterrows()
    for day_record in expand_admission_row(admission_row)
]

merged_initial = pd.DataFrame(expanded)

# Key columns use the nullable integer dtype so missing ids do not force floats.
id_cols = ['subject_id', 'hadm_id', 'day_index']
merged_initial[id_cols] = merged_initial[id_cols].astype('Int64')

print("Expanded admissions -> rows:", merged_initial.shape[0])


In [None]:
# Checkpoint the admission x day table to disk (row index not written).
merged_initial.to_csv("admissions_expanded.csv", index=False)

In [None]:
# Reload the checkpoint. No parse_dates is passed, so the time columns are not
# parsed into datetimes here.
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# Read the ICU stays table and validate / complete its schema.
icu_path = Path("icustays.csv")
if not icu_path.exists():
    raise FileNotFoundError(f"icustays.csv not found at {icu_path.resolve()}  -- put the file next to admissions.csv")

icustays = pd.read_csv(icu_path, low_memory=False, parse_dates=['intime','outtime'])

# Mandatory columns: report the first one that is absent.
missing_required = [
    name for name in ['subject_id','hadm_id','intime','outtime']
    if name not in icustays.columns
]
if missing_required:
    raise KeyError(f"Expected column '{missing_required[0]}' in icustays.csv but it is missing.")

# Optional columns are created as all-NA so later code can select them unconditionally.
optional_cols = ['stay_id','first_careunit','last_careunit','los']
for absent in (name for name in optional_cols if name not in icustays.columns):
    icustays[absent] = pd.NA

# Work on a copy of the day-level table; the old positional index is kept as
# 'row_id' so ICU matches can be mapped back to the exact day row later.
merged_with_icu = merged_initial.copy().reset_index(drop=False).rename(columns={'index':'row_id'})
# Pre-create the ICU output columns (all NA) if they are not there yet.
for col in ['stay_id_icu','icustay_intime','icustay_outtime','first_careunit_icu','last_careunit_icu','los_icu']:
    if col not in merged_with_icu.columns:
        merged_with_icu[col] = pd.NA

if 'admittime' not in merged_with_icu.columns:
    raise KeyError("merged_initial must contain 'admittime' column")

# Calendar date of each day row = admission date (midnight) + day_index days.
# A missing day_index is treated as day 0.
merged_with_icu['admittime'] = pd.to_datetime(merged_with_icu['admittime'], errors='coerce')
merged_with_icu['day_index_int'] = merged_with_icu['day_index'].fillna(0).astype(int)
merged_with_icu['row_date'] = merged_with_icu['admittime'].dt.normalize() + pd.to_timedelta(merged_with_icu['day_index_int'], unit='D')

# Normalise ICU in/out times to dates; a missing outtime falls back to intime.
icustays['intime'] = pd.to_datetime(icustays['intime'], errors='coerce')
icustays['outtime'] = pd.to_datetime(icustays['outtime'], errors='coerce').fillna(icustays['intime'])
icustays['intime_norm'] = icustays['intime'].dt.normalize()
icustays['outtime_norm'] = icustays['outtime'].dt.normalize()

# Left-join every day row with all ICU stays of the same admission; a day row
# appears once per stay of that admission (or once with NA ICU fields if none).
icu_keep = ['subject_id','hadm_id','stay_id','intime','outtime','intime_norm','outtime_norm','first_careunit','last_careunit','los']
candidate = merged_with_icu.merge(icustays[icu_keep], on=['subject_id','hadm_id'], how='left', suffixes=('','_icu'))

# A day is "in ICU" when its date lies within [intime date, outtime date], inclusive.
# Comparisons against missing dates are treated as not-in-ICU.
mask_in_icu = (candidate['row_date'] >= candidate['intime_norm']) & (candidate['row_date'] <= candidate['outtime_norm'])
candidate['in_icu'] = mask_in_icu.fillna(False)

# Keep only (day row, ICU stay) pairs where the day falls inside the stay.
matched = candidate[candidate['in_icu']].copy()
if not matched.empty:
    # A day can overlap several ICU stays; use the stay with the earliest intime.
    # drop_duplicates keeps that stay's row intact. groupby().first() would
    # instead pick the first non-null value per column and could combine fields
    # from different stays when the earliest stay has a missing value.
    matched = matched.sort_values(by=['row_id','intime'])
    first_matches = matched.drop_duplicates(subset='row_id', keep='first')
    # source column in the ICU table -> destination column on the day-level table
    map_cols = {
        'stay_id':'stay_id_icu',
        'intime':'icustay_intime',
        'outtime':'icustay_outtime',
        'first_careunit':'first_careunit_icu',
        'last_careunit':'last_careunit_icu',
        'los':'los_icu'
    }
    indexed_matches = first_matches.set_index('row_id')
    # Rows of the day-level table that received a match (computed once, not per column).
    has_match = merged_with_icu['row_id'].isin(indexed_matches.index)
    for src, dst in map_cols.items():
        mapping = indexed_matches[src]
        merged_with_icu.loc[has_match, dst] = merged_with_icu.loc[has_match, 'row_id'].map(mapping)
    assigned_count = len(first_matches)
else:
    assigned_count = 0

# Helper columns were only needed for the date comparison.
merged_with_icu = merged_with_icu.drop(columns=['day_index_int','row_date'])

print(f"ICU assignment complete. Rows where ICU info filled: {int(assigned_count)}")
print(merged_with_icu[merged_with_icu['stay_id_icu'].notna()].head(20))


In [None]:
# Final ICU column names, in the order they should appear at the right edge of the table.
icu_cols_ordered = ['stay_id', 'first_careunit', 'last_careunit', 'icustays_intime', 'icustays_outtime', 'los']

# Working name -> final name, listed in the same order as icu_cols_ordered.
rename_map = dict(zip(
    ['stay_id_icu', 'first_careunit_icu', 'last_careunit_icu', 'icustay_intime', 'icustay_outtime', 'los_icu'],
    icu_cols_ordered,
))
merged_with_icu = merged_with_icu.rename(columns=rename_map)

# Everything that is not an ICU column keeps its relative order and goes first.
other_cols = [name for name in merged_with_icu.columns if name not in icu_cols_ordered]
merged_with_icu = merged_with_icu[other_cols + icu_cols_ordered]

print("Renaming & reordering complete.")
print(merged_with_icu[icu_cols_ordered].head(20))


In [None]:
# Display the admission x day table as the cell output.
merged_initial

In [None]:
# Quick look at the first rows after the ICU columns were attached.
merged_with_icu.head(5)

In [None]:
# Persist without the positional index: a later cell reloads this file with
# pd.read_csv, and a written index would come back as a spurious "Unnamed: 0" column.
merged_initial.to_csv('merged_initial.csv', index=False)

In [None]:
# Save the ICU-enriched table. index=False is not passed, so the frame's index
# is written as an extra leading column.
merged_with_icu.to_csv('merged_with_icu.csv')

In [None]:
# Save a 100-row sample of the ICU-enriched table for quick inspection (index included).
merged_with_icu.head(100).to_csv('merged_with_icu_sample.csv')

In [None]:
# Load the vancomycin lab extract; fail early if the file is not where expected.
vanco_path = Path("all_vanco.csv")
if not vanco_path.exists():
    raise FileNotFoundError(f"all_vanco.csv not found at {vanco_path.resolve()}")

all_vanco = pd.read_csv(vanco_path, low_memory=False, parse_dates=['charttime'])

# Join keys become nullable integers; unparseable ids turn into <NA>.
for key_col in ('subject_id', 'hadm_id'):
    all_vanco[key_col] = pd.to_numeric(all_vanco[key_col], errors='coerce').astype('Int64')

def resolve_numeric(row):
    """Return the best numeric reading of a lab row as a float (NaN if none).

    The free-text ``value`` is preferred: it is stripped, thousands separators
    (commas) are removed, and it is parsed as a float. When ``value`` is missing,
    a placeholder ('', '___', 'NaN', 'nan') or not parseable, ``valuenum`` is
    used instead. NaN is returned when neither field yields a number.

    Parameters
    ----------
    row : mapping-like with optional ``value`` and ``valuenum`` entries
        (a ``pd.Series`` row or a plain dict both work).
    """
    def _from_valuenum():
        # Fallback path; only conversion errors are swallowed so unrelated
        # exceptions (e.g. KeyboardInterrupt) are no longer hidden.
        vn = row.get('valuenum')
        try:
            return float(vn) if not pd.isna(vn) else np.nan
        except (TypeError, ValueError):
            return np.nan

    v = row.get('value')
    if pd.isna(v):
        return _from_valuenum()

    text = str(v).strip()
    if text in ('', '___', 'NaN', 'nan'):
        return _from_valuenum()

    try:
        return float(text.replace(',', ''))
    except ValueError:
        # e.g. '>20' or free text: fall back to the pre-parsed numeric column.
        return _from_valuenum()

# One numeric value per lab row (NaN when nothing usable was recorded).
all_vanco['resolved_val'] = all_vanco.apply(resolve_numeric, axis=1)

# Start from the ICU-enriched day table; admittime must be a datetime for the date math below.
merged_with_vanco = merged_with_icu.copy()
if 'admittime' not in merged_with_vanco.columns:
    raise KeyError(f"merged_with_vanco must contain 'admittime' column before merging labs.")
merged_with_vanco['admittime'] = pd.to_datetime(merged_with_vanco['admittime'], errors='coerce')

# One admission date per (subject_id, hadm_id); dropna=False keeps groups whose key is missing.
admit_map = merged_with_vanco.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time']).dt.normalize()

# Attach the admission date to every lab row.
all_vanco = all_vanco.merge(admit_map[['subject_id','hadm_id','admit_date']], on=['subject_id','hadm_id'], how='left')

# Lab rows that belong to no known admission cannot be placed on a day and are dropped.
missing_admit = all_vanco['admit_date'].isna().sum()
if missing_admit:
    print(f"Warning: {missing_admit} all_vanco rows have no matching admission (admit_date missing) and will be skipped.")
all_vanco = all_vanco[all_vanco['admit_date'].notna()].copy()

# Day offset of each lab relative to the admission date.
# NOTE: a missing charttime is mapped to day 0, and labs charted before the
# admission date (negative offset) are clamped to day 0 as well.
all_vanco['chart_date'] = pd.to_datetime(all_vanco['charttime'], errors='coerce').dt.normalize()
all_vanco['day_index_lab'] = (all_vanco['chart_date'] - all_vanco['admit_date']).dt.days.fillna(0).astype(int)
all_vanco.loc[all_vanco['day_index_lab'] < 0, 'day_index_lab'] = 0

# Reduce to one lab row per admission-day: the row holding the highest resolved value.
group_cols = ['subject_id','hadm_id','day_index_lab']
usable = all_vanco[~all_vanco['resolved_val'].isna()].copy()
if usable.empty:
    print("No usable numeric vanco values found to aggregate.")
    # Empty frame with the expected columns so the rename/select below still works.
    daily_vanco = pd.DataFrame(columns=['subject_id','hadm_id','day_index_lab',
                                       'charttime','value','valuenum','valueuom','flag','resolved_val'])
else:
    idx = usable.groupby(group_cols)['resolved_val'].idxmax()
    daily_vanco = usable.loc[idx].copy()

# Prefix the lab columns and align the day key name with the day-level table.
daily_vanco = daily_vanco.rename(columns={
    'charttime':'all_vanco_charttime',
    'value':'all_vanco_value',
    'valuenum':'all_vanco_valuenum',
    'valueuom':'all_vanco_valueuom',
    'flag':'all_vanco_flag',
    'day_index_lab':'day_index'
})

# Only the keys and the prefixed lab columns are merged (resolved_val is not carried over).
merge_cols = ['subject_id','hadm_id','day_index',
              'all_vanco_charttime','all_vanco_value','all_vanco_valuenum','all_vanco_valueuom','all_vanco_flag']
daily_vanco = daily_vanco[merge_cols]

# Cast the join keys to the nullable integer dtype before merging.
daily_vanco['subject_id'] = daily_vanco['subject_id'].astype('Int64')
daily_vanco['hadm_id'] = daily_vanco['hadm_id'].astype('Int64')
daily_vanco['day_index'] = daily_vanco['day_index'].astype('Int64')

# Left merge: every day row is kept; days without a lab get NA in the vanco columns.
merged_with_vanco = merged_with_vanco.merge(daily_vanco, on=['subject_id','hadm_id','day_index'], how='left')

print(f"all_vanco merged -> rows with vanco info: {int(merged_with_vanco['all_vanco_charttime'].notna().sum())}")

# Show a few of the rows that actually received a lab value.
preview = merged_with_vanco[merged_with_vanco['all_vanco_charttime'].notna()].head(20)
print(preview[['subject_id','hadm_id','day_index',
               'all_vanco_charttime','all_vanco_value','all_vanco_valuenum','all_vanco_valueuom','all_vanco_flag']])


In [None]:
# Save a 100-row sample of the vanco-enriched table for quick inspection (index included).
merged_with_vanco.head(100).to_csv('merged_with_vanco_sample.csv')

In [None]:
# Save the full vanco-enriched table. index=False is not passed, so the frame's
# index is written as an extra leading column.
merged_with_vanco.to_csv('merged_with_vanco.csv')

In [None]:
# Load only the first 1,000 rows of chartevents.csv to inspect its structure.
chartevents_path = Path("chartevents.csv")
chartevents = pd.read_csv(chartevents_path, nrows=1000)


In [None]:
# Display the first 100 rows of chartevents.csv (read again from disk, result not stored).
pd.read_csv(chartevents_path,nrows=100)

In [None]:
# Remove pandas' display limits (rows, columns, line width, cell width) for all
# following cells. This overrides the 120-column / 120-character settings made
# at the top of the notebook.
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)
pd.set_option("display.max_colwidth", None)

In [None]:
# Plain-text dump of the first 100 loaded chartevents rows.
print(chartevents.head(100))

In [None]:
# --- Per-itemid missingness scan over chartevents.csv: setup ---
chartevents_path = Path("chartevents.csv")
if not chartevents_path.exists():
    raise FileNotFoundError(f"chartevents.csv not found at {chartevents_path.resolve()}")

# Only these columns are read; the file is streamed in chunks of `chunksize` rows.
usecols = ['subject_id','hadm_id','itemid','charttime','value','valuenum']
chunksize = 200_000

# itemid -> number of rows seen / number of rows that carry a usable value
total_counts = defaultdict(int)
present_counts = defaultdict(int)

reader = pd.read_csv(chartevents_path, usecols=usecols, chunksize=chunksize, low_memory=True)

# Stream the file and tally, per itemid, how many rows exist and how many hold a value.
chunk_i = 0
for chunk_i, chunk in enumerate(reader, start=1):
    chunk['itemid'] = pd.to_numeric(chunk['itemid'], errors='coerce').astype('Int64')
    chunk = chunk[chunk['itemid'].notna()]
    if chunk.empty:
        continue

    # A row counts as "present" when valuenum is set ...
    valuenum_missing = chunk['valuenum'].isna()
    present_mask = ~valuenum_missing

    # ... or, failing that, when the text value is not an empty/placeholder token.
    if valuenum_missing.any():
        text_vals = chunk.loc[valuenum_missing, 'value'].astype(str).str.strip()
        is_real_text = ~text_vals.isin(["", "___", "NaN", "nan", "None", "none"])
        present_mask.loc[valuenum_missing] = is_real_text.values

    for item, cnt in chunk.groupby('itemid').size().items():
        total_counts[int(item)] += int(cnt)
    for item, cnt in present_mask.groupby(chunk['itemid']).sum().items():
        if not pd.isna(item):
            present_counts[int(item)] += int(cnt)

    if chunk_i % 10 == 0:
        print(f"Processed {chunk_i*chunksize:,} rows...")

# Turn the two tallies into one table: total, present, missing and present fraction per itemid.
itemids = sorted(set(total_counts) | set(present_counts))
tallies = [(iid, total_counts.get(iid, 0), present_counts.get(iid, 0)) for iid in itemids]
rows = [
    (iid, tot, pres, tot - pres, pres / tot if tot > 0 else 0.0)
    for iid, tot, pres in tallies
]

missingness_df = (
    pd.DataFrame(rows, columns=['itemid','total_count','present_count','missing_count','present_fraction'])
    .sort_values(by='present_fraction', ascending=False)
    .reset_index(drop=True)
)
missingness_df.to_csv("chartevents_itemid_missingness.csv", index=False)
print("Saved chartevents_itemid_missingness.csv")


In [None]:
# Items whose values are present in fewer than half of their rows are marked for removal.
df = pd.read_csv("chartevents_itemid_missingness.csv")

mostly_missing = df['present_fraction'] < 0.5
drop_ids = df[mostly_missing]['itemid'].tolist()

print(f"تعداد itemid هایی که باید drop بشن: {len(drop_ids)}")
print(drop_ids[:50])

In [None]:
# Stream chartevents.csv and write a copy without the rows of the dropped itemids.
out_path = "chartevents_missing50_dropped.csv"
reader = pd.read_csv("chartevents.csv", chunksize=2_000_000)

total_rows = total_dropped = total_written = 0
first = True  # the first chunk creates the file and writes the header

for i, chunk in enumerate(reader, start=1):
    keep_mask = ~chunk['itemid'].isin(drop_ids)
    filtered = chunk.loc[keep_mask]
    before, after = len(chunk), len(filtered)
    n_dropped = before - after

    write_mode = "w" if first else "a"
    filtered.to_csv(out_path, mode=write_mode, index=False, header=first)
    first = False

    total_rows += before
    total_dropped += n_dropped
    total_written += after
    print(f"Chunk {i}: rows={before:,}, dropped={n_dropped:,}, kept={after:,}")

print("---- DONE ----")
print(f"Total rows processed: {total_rows:,}")
print(f"Total rows dropped:   {total_dropped:,}")
print(f"Total rows written:   {total_written:,}")
print("✅ فایل نهایی ذخیره شد:", out_path)

In [None]:
# Load the item dictionary. This rebinds `df`, which previously held the missingness table.
df = pd.read_csv("d_items.csv")

In [None]:
# head(0) has no rows, so this prints only the column names of d_items.
print(df.head(0))

In [None]:
# Keep only dictionary entries that link to chartevents and survived the missingness filter.
in_path = "d_items.csv"
out_path = "d_items_chartevents_missing50_dropped.csv"

df = pd.read_csv(in_path)

links_to_chartevents = df["linksto"].eq("chartevents")
not_dropped = ~df["itemid"].isin(drop_ids)
filtered = df.loc[links_to_chartevents & not_dropped]

filtered.to_csv(out_path, index=False)

print("✅ d_items filtered and saved:", out_path)
n_before, n_after = len(df), len(filtered)
print("before:", n_before, "after:", n_after, "drop:", n_before - n_after)

In [None]:
# --- Add one (empty) column per retained chartevents itemid: setup ---
ditems_path = Path("d_items_chartevents_missing50_dropped.csv")
merged_initial_file = Path("merged_initial.csv")
out_path = Path("merged_initial_with_items_cols.csv")

# Column names are the itemids rendered as strings; non-numeric ids are discarded.
ditems = pd.read_csv(ditems_path, usecols=['itemid'])
itemids = pd.to_numeric(ditems['itemid'], errors='coerce').dropna().astype(int).unique().tolist()
cols_to_add = [str(i) for i in itemids]
print("Will add columns (count):", len(cols_to_add))

# Reuse merged_initial if it already exists in the kernel; otherwise load it from
# disk. Evaluating the bare name raises NameError when the variable is undefined.
try:
    merged_initial
    print("Using merged_initial from memory (existing DataFrame). rows:", len(merged_initial))
except NameError:
    if not merged_initial_file.exists():
        raise FileNotFoundError(f"merged_initial not in memory and file {merged_initial_file} not found.")
    print("Loading merged_initial from disk:", merged_initial_file)
    merged_initial = pd.read_csv(merged_initial_file, low_memory=False, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'])
    print("Loaded merged_initial rows:", len(merged_initial))

# Append every missing item column in a single concat. Inserting the columns one
# at a time fragments the frame's internal blocks and triggers pandas'
# PerformanceWarning once many columns have been added.
n_rows = len(merged_initial)
new_cols = [c for c in dict.fromkeys(cols_to_add) if c not in merged_initial.columns]
if new_cols:
    # Object dtype filled with pd.NA, matching what the per-column insert produced.
    placeholder_block = pd.DataFrame(pd.NA, index=merged_initial.index, columns=new_cols, dtype="object")
    merged_initial = pd.concat([merged_initial, placeholder_block], axis=1)
added = len(new_cols)
print(f"Added {added} new columns. Total columns now: {len(merged_initial.columns)}")

# Write the widened table to disk in slices of `chunksize` rows.
chunksize = 10000
written = 0
first = True  # the first slice creates the file and writes the header

for lo in range(0, n_rows, chunksize):
    hi = min(lo + chunksize, n_rows)
    part = merged_initial.iloc[lo:hi]
    part.to_csv(out_path, mode="w" if first else "a", index=False, header=first)
    first = False
    written += len(part)
    print(f"Wrote rows {lo:,}..{hi-1:,} -> {len(part):,} rows")
    # Drop the reference to the slice and collect before taking the next one.
    del part
    gc.collect()

print("✅ Done. Output saved to:", out_path)
print("Rows written:", written, "Columns in output:", len(merged_initial.columns))

In [None]:
# Sanity check: first three rows of the table after the item columns were added.
print(merged_initial.head(3))

In [None]:
# --- Fill the per-item columns from the filtered chartevents file: setup ---
chartevents_path = Path("chartevents_missing50_dropped.csv")
ditems_path = Path("d_items_chartevents_missing50_dropped.csv")
merged_initial_file = None   # set to a CSV path to load merged_initial when it is not in memory
chunksize = 500_000
save_after = False           # not read anywhere in this cell

if not chartevents_path.exists():
    raise FileNotFoundError(chartevents_path)
if not ditems_path.exists():
    raise FileNotFoundError(ditems_path)

# Item ids that have a column in the output table.
ditems = pd.read_csv(ditems_path, usecols=['itemid'])
keep_itemids = pd.to_numeric(ditems['itemid'], errors='coerce').dropna().astype(int).unique().tolist()
keep_itemids_set = set(keep_itemids)
print("Keep itemids count:", len(keep_itemids))

# Reuse merged_initial from the kernel, or load it if a path was configured.
try:
    merged_initial
except NameError:
    if merged_initial_file is None:
        raise NameError("merged_initial not in memory. Set merged_initial_file path or load it.")
    print("Loading merged_initial from disk...")
    merged_initial = pd.read_csv(merged_initial_file, low_memory=False, parse_dates=['admittime'])
    print("loaded merged_initial rows:", len(merged_initial))

# Make sure every kept item has an (object, all-NA) column. All missing columns
# are appended in one concat; inserting them one by one fragments the frame.
missing_item_cols = [str(iid) for iid in keep_itemids if str(iid) not in merged_initial.columns]
if missing_item_cols:
    merged_initial = pd.concat(
        [merged_initial,
         pd.DataFrame(pd.NA, index=merged_initial.index, columns=missing_item_cols, dtype="object")],
        axis=1,
    )

# (subject_id, hadm_id) -> admission date at midnight
admit_map = merged_initial.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()
admit_map['key'] = list(zip(admit_map['subject_id'].astype('Int64'), admit_map['hadm_id'].astype('Int64')))
admit_dict = dict(zip(admit_map['key'], admit_map['admit_date']))

# (subject_id, hadm_id, day_index) -> index label of that day row.
# Built from whole columns instead of iterrows(), which creates a Series per row
# and is very slow on a table of this size. As before, a missing key value
# raises, and for duplicate keys the last row wins.
merged_initial_index_map = dict(zip(
    zip(
        merged_initial['subject_id'].astype(int).tolist(),
        merged_initial['hadm_id'].astype(int).tolist(),
        merged_initial['day_index'].astype(int).tolist(),
    ),
    merged_initial.index,
))

print("Admit map keys:", len(admit_dict), "merged rows map size:", len(merged_initial_index_map))

reader = pd.read_csv(chartevents_path, usecols=['subject_id','hadm_id','itemid','charttime','value','valuenum'],
                     parse_dates=['charttime'], chunksize=chunksize, low_memory=True)

total_assigned = 0
chunk_no = 0


def lookup_admit_date(s):
    """Return the admission date for a row-like object with subject_id / hadm_id (NaT if unknown).

    Not called by the loop below, which performs the same lookup on whole columns.
    """
    return admit_dict.get((int(s.subject_id), int(s.hadm_id)), pd.NaT)


def pick_final_val(row):
    """Choose the value stored for one (admission, day, item) group.

    The numeric aggregate wins when there is one; otherwise the latest text
    value is used, with textual null markers mapped to pd.NA.
    """
    if pd.notna(row.get('agg_value_num')):
        return row['agg_value_num']
    else:
        v = row.get('agg_value_text')
        if pd.isna(v) or v in ("nan","None","NoneType","NA","<NA>"):
            return pd.NA
        return v


def _is_number(x):
    """True for a real, non-missing int/float (bools and NaN excluded)."""
    return (
        isinstance(x, (int, float, np.integer, np.floating))
        and not isinstance(x, (bool, np.bool_))
        and not pd.isna(x)
    )


for chunk in reader:
    chunk_no += 1
    print(f"\n--- Processing chunk {chunk_no} (rows: {len(chunk)}) ---")
    chunk['itemid'] = pd.to_numeric(chunk['itemid'], errors='coerce').astype('Int64')
    chunk = chunk[chunk['itemid'].notna()]
    chunk = chunk[chunk['itemid'].isin(keep_itemids)]
    if chunk.empty:
        print("no relevant itemids in this chunk")
        continue

    # Rows without ids cannot be matched to an admission; dropping them also keeps
    # the integer casts below from failing. copy() detaches the filtered slice so
    # the column assignments that follow do not write into a view.
    chunk = chunk.dropna(subset=['subject_id','hadm_id']).copy()
    chunk['subject_id'] = chunk['subject_id'].astype(int)
    chunk['hadm_id'] = chunk['hadm_id'].astype(int)

    keys = list(zip(chunk['subject_id'], chunk['hadm_id']))
    chunk['admit_date'] = [admit_dict.get(k, pd.NaT) for k in keys]

    chunk = chunk[chunk['admit_date'].notna()]
    if chunk.empty:
        print("no rows with admit_date in this chunk")
        continue

    # Day offset from the admission date; a missing charttime or a measurement
    # charted before admission is placed on day 0.
    chunk['chart_date'] = chunk['charttime'].dt.normalize()
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # Numeric reading: valuenum first, otherwise the text value with commas removed.
    chunk['numeric_val'] = pd.to_numeric(chunk['valuenum'], errors='coerce')
    mask_num_missing = chunk['numeric_val'].isna()
    if mask_num_missing.any():
        parsed = pd.to_numeric(chunk.loc[mask_num_missing, 'value'].astype(str).str.replace(',',''), errors='coerce')
        chunk.loc[mask_num_missing, 'numeric_val'] = parsed

    chunk['value_raw'] = chunk['value'].astype(str)

    grp_keys = ['subject_id','hadm_id','day_index','itemid']

    # Per group: maximum numeric value (if any) ...
    numeric_rows = chunk[chunk['numeric_val'].notna()].copy()
    if not numeric_rows.empty:
        grp_num = numeric_rows.groupby(grp_keys, as_index=False)['numeric_val'].max()
        grp_num = grp_num.rename(columns={'numeric_val':'agg_value_num'})
    else:
        grp_num = pd.DataFrame(columns=grp_keys + ['agg_value_num'])

    # ... and the text value with the latest charttime.
    chunk_sorted = chunk.sort_values('charttime')
    grp_last = chunk_sorted.groupby(grp_keys, as_index=False).last()[grp_keys + ['value_raw','charttime']]
    grp_last = grp_last.rename(columns={'value_raw':'agg_value_text', 'charttime':'agg_time_text'})

    merged_grps = pd.merge(grp_last, grp_num, on=grp_keys, how='left')
    merged_grps['final_value'] = merged_grps.apply(pick_final_val, axis=1)

    assigned = 0
    for _, r in merged_grps.iterrows():
        key = (int(r['subject_id']), int(r['hadm_id']), int(r['day_index']))
        row_idx = merged_initial_index_map.get(key)
        if row_idx is None:
            continue
        itemid_col = str(int(r['itemid']))
        val = r['final_value']

        # The same (admission, day, item) group can be split across chunks, so
        # combine with what an earlier chunk already stored instead of overwriting.
        current = merged_initial.at[row_idx, itemid_col]
        if _is_number(val):
            if _is_number(current):
                val = max(current, val)   # keep the maximum over all chunks
        elif _is_number(current):
            continue                      # a number always beats text / NA
        elif pd.isna(val) and not pd.isna(current):
            continue                      # never blank out a stored value
        # Remaining case: text replacing text/NA. The chunk read later wins,
        # i.e. file order decides across chunks.

        merged_initial.at[row_idx, itemid_col] = val
        assigned += 1

    total_assigned += assigned
    print(f"Chunk {chunk_no}: groups aggregated = {len(merged_grps)}, assigned = {assigned}, total_assigned so far = {total_assigned}")

    del chunk, chunk_sorted, numeric_rows, grp_num, grp_last, merged_grps
    gc.collect()

print("\n--- ALL CHUNKS PROCESSED ---")
print("Total assigned cells:", total_assigned)

# Write the filled table to disk in slices of `write_chunk` rows.
out_path = Path("merged_with_chartevents_filled.csv")
write_chunk = 20000
n_rows = len(merged_initial)

first = True  # the first slice creates the file and writes the header
for lo in range(0, n_rows, write_chunk):
    hi = min(lo + write_chunk, n_rows)
    file_mode = 'w' if first else 'a'
    merged_initial.iloc[lo:hi].to_csv(out_path, mode=file_mode, index=False, header=first)
    first = False
    print(f"Saved rows {lo}-{hi-1}")
print("Saved final to:", out_path)

In [None]:
# Read back only the first 500 rows of the filled table for inspection.
merged_with_chartevents_filled_path = Path("merged_with_chartevents_filled.csv")
merged_with_chartevents_filled = pd.read_csv(merged_with_chartevents_filled_path, nrows=500)


In [None]:
# Display the 500-row sample loaded above.
merged_with_chartevents_filled

In [None]:
# Extract all chartevents rows of one admission (hadm_id 23282506) into their own file.
input_path = Path("chartevents_missing50_dropped.csv")
output_path = Path("chartevents_missing50_dropped_filtered_hadm_id_23282506.csv")

chunksize = 2_000_000

total_rows = 0
first = True  # flips after the first non-empty write, which also emits the header

chunk_reader = pd.read_csv(input_path, chunksize=chunksize, low_memory=False)
for i, chunk in enumerate(chunk_reader):
    filtered = chunk[chunk['hadm_id'] == 23282506]
    if filtered.empty:
        continue
    filtered.to_csv(output_path, mode='w' if first else 'a',
                    index=False, header=first)
    first = False
    total_rows += len(filtered)
    print(f"Chunk {i}: wrote {len(filtered)} rows (total so far: {total_rows})")

print("Done! Final rows written:", total_rows)
print("Output file:", output_path)


In [None]:
# --- Rename the itemid columns to readable names: configuration and dictionary load ---
ditems_path = Path("d_items_chartevents_missing50_dropped.csv")
in_path = Path("merged_with_chartevents_filled.csv")
out_path = Path("merged_with_chartevents_filled_renamed.csv")
chunksize = 20_000
max_name_len = 80

for required_file in (ditems_path, in_path):
    if not required_file.exists():
        raise FileNotFoundError(required_file)

# Everything is read as text; missing labels / abbreviations become empty strings.
d = pd.read_csv(ditems_path, usecols=['itemid','label','abbreviation'], dtype=str)
d['itemid'] = d['itemid'].str.strip()
for text_col in ('label', 'abbreviation'):
    d[text_col] = d[text_col].fillna('').astype(str).str.strip()

# Preferred display name: the abbreviation, else the label, else ''.
d['chosen'] = d.apply(lambda rec: rec['abbreviation'] or rec['label'], axis=1)

def sanitize_name(s):
    """Turn free text into a column-safe identifier.

    Whitespace runs become a single underscore, every character that is not a
    word character or a hyphen is removed, repeated underscores are collapsed,
    and the result is cut to ``max_name_len`` characters. Missing input gives ''.
    """
    if s is None or pd.isna(s):
        return ''
    text = str(s).strip()
    # Applied in order: whitespace -> '_', strip illegal characters, collapse '_' runs.
    for pattern, replacement in ((r'\s+', '_'), (r'[^\w\-]', ''), (r'_+', '_')):
        text = re.sub(pattern, replacement, text)
    return text[:max_name_len]

# itemid (as string) -> unique readable column name
name_map = {}
used = set()

for iid, chosen in zip(d['itemid'], d['chosen']):
    # Items without a usable label fall back to "item_<itemid>".
    base = (sanitize_name(chosen) if chosen != '' else '') or f"item_{iid}"

    # Resolve clashes: first try "<base>__<itemid>", then add a running number.
    name = base
    attempt = 0
    while name in used:
        name = f"{base}__{iid}" if attempt == 0 else f"{base}__{iid}_{attempt}"
        attempt += 1

    used.add(name)
    name_map[str(iid)] = name

# Build the renamed header: item columns become "chartevents_<name>", everything
# else keeps its name.
orig_header = pd.read_csv(in_path, nrows=0).columns.tolist()
new_header = []
seen_names = set()   # set lookup instead of scanning the growing header list
conflicts = 0
for col in orig_header:
    col_str = str(col).strip()

    # Find the itemid this column stands for: exact match first, then a numeric
    # reading of the header (e.g. "220045.0").
    item_key = None
    if col_str in name_map:
        item_key = col_str
    else:
        try:
            numeric_key = str(int(float(col_str)))
        except (TypeError, ValueError, OverflowError):
            numeric_key = None   # not a number: an ordinary, non-item column
        if numeric_key in name_map:
            item_key = numeric_key

    # Both match paths get the same prefix (previously only the exact match did).
    new_col = "chartevents_" + name_map[item_key] if item_key is not None else col

    if new_col in seen_names:
        conflicts += 1
        conflict_base = f"{new_col}__orig_{sanitize_name(col_str)}"
        new_col = conflict_base
        k = 1
        # The counter is appended to the fixed base, so suffixes do not pile up
        # as "_1_2_3" on repeated clashes.
        while new_col in seen_names:
            new_col = f"{conflict_base}_{k}"
            k += 1
    seen_names.add(new_col)
    new_header.append(new_col)

print(f"Prepared header mapping. Total cols: {len(orig_header)}, conflicts resolved: {conflicts}")

sample_map = {k: name_map[k] for k in list(name_map)[:10]}
print("sample itemid->name (first 10):", sample_map)

# Stream the filled table through in slices, swapping in the new header on the way out.
rows_written = 0
first = True  # the first slice creates the file and writes the header

renamed_reader = pd.read_csv(in_path, chunksize=chunksize, low_memory=False)
for part_no, part in enumerate(renamed_reader, start=1):
    part.columns = new_header
    part.to_csv(out_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    rows_written += len(part)
    print(f"Chunk {part_no}: wrote {len(part):,} rows (total {rows_written:,})")
    # Drop the reference to the slice and collect before reading the next one.
    del part
    gc.collect()

print("✅ Done. Output saved to:", out_path)
print("Rows written:", rows_written)

In [None]:
# Read back only the first 500 rows of the renamed table for inspection.
merged_with_chartevents_filled_renamed_path = Path("merged_with_chartevents_filled_renamed.csv")
merged_with_chartevents_filled_renamed = pd.read_csv(merged_with_chartevents_filled_renamed_path, nrows=500)


In [None]:
# Display the 500-row sample of the renamed table.
merged_with_chartevents_filled_renamed

In [None]:
# Load only the first 1,000 rows of datetimeevents.csv to inspect its structure.
datetimeevents_path = Path("datetimeevents.csv")
datetimeevents = pd.read_csv(datetimeevents_path, nrows=1000)

In [None]:
# Save a 100-row sample. The target file name has no ".csv" extension.
datetimeevents.head(100).to_csv('test_datetimeevents', index=False)

In [None]:
# Display rows 300-349 (positional slice) of the loaded sample.
datetimeevents[300:350]

In [None]:
# Load only the first 1,000 rows of microbiologyevents.csv to inspect its structure.
microbiologyevents_path = Path("microbiologyevents.csv")
microbiologyevents = pd.read_csv(microbiologyevents_path, nrows=1000)

In [None]:
# Save a 100-row sample. The target file name has no ".csv" extension.
microbiologyevents.head(100).to_csv('test_microbiologyevents', index=False)

In [None]:
# Display the first five rows of the loaded microbiology sample.
microbiologyevents.head(5)

In [None]:
# dup

In [None]:
# --- Config ---
# File locations and tuning knobs for attaching ICD diagnoses to the day-level table.
DIAGNOSIS_CSV = Path("diagnoses_icd.csv")
DICT_CSV = Path("d_icd_diagnoses.csv")
MERGED_INITIAL_CSV = Path("admissions_expanded.csv")   # input (admission x day)
OUT_CSV = Path("merged_with_diagnoses.csv")      # output
CHUNK_DIAG = 200_000        # rows per chunk when reading diagnoses_icd.csv
CHUNK_WRITE = 20_000        # chunk size for writing merged_initial with diagnoses
TOP_K = 5                   # produce diag_1..diag_K columns (set to 0 to disable)

# --- Helpers ---
def normalize_code(code):
    """Canonical form of an ICD code for matching.

    The code is converted to text, stripped, upper-cased and has its dots
    removed. A missing code (None / NaN) yields the empty string.
    """
    return "" if pd.isna(code) else str(code).strip().upper().replace(".", "")

def try_lookup_description(code, version, dict_map):
    """Find the long title for an ICD ``(code, version)`` pair, with fallbacks.

    Parameters
    ----------
    code : str
        Normalized ICD code (as produced by ``normalize_code``).
    version : object
        ICD version as read from the data: an int, a float, a string such as
        ``"9"`` or ``"9.0"``, or a missing value / the literal string ``"nan"``.
    dict_map : dict
        Mapping ``(code, version_string) -> long_title``.

    Returns
    -------
    The matching title, or ``pd.NA`` when none of the strategies finds one.

    Lookup order: the code as given, the code without leading zeros, then (for
    all-digit codes) the code left-padded with zeros to width 3, 4 and 5.
    """
    # Canonicalize the version to an integer string whenever possible.
    # Versions that are not integers ("9.0") are retried through float(), and
    # anything non-numeric (NaN, "nan", "", pd.NA) falls back to its plain
    # string form instead of raising -- such a key simply finds no title.
    try:
        version_key = str(int(version))
    except (TypeError, ValueError):
        try:
            version_key = str(int(float(version)))
        except (TypeError, ValueError, OverflowError):
            version_key = str(version)

    candidates = [code, code.lstrip("0")]
    if code.isdigit():
        # dictionary may store zero-padded codes while the data does not
        candidates.extend(code.zfill(width) for width in (3, 4, 5))

    for candidate in candidates:
        key = (candidate, version_key)
        if key in dict_map:
            return dict_map[key]
    return pd.NA

# --- Step 1: load ICD dictionary into memory (small) ---
if not DICT_CSV.exists():
    raise FileNotFoundError(f"{DICT_CSV} not found. Place d_icd_diagnoses.csv next to this script.")

# dtype=str: every column is read as text, so codes are never parsed as numbers.
dict_df = pd.read_csv(DICT_CSV, dtype=str)  # icd_code, icd_version, long_title
# normalize dict codes:
# NOTE(review): astype(str) runs before normalize_code, so a missing code reaches it
# as the string "nan" and comes out as "NAN" rather than "".
dict_df['icd_code_norm'] = dict_df['icd_code'].astype(str).apply(normalize_code)
dict_df['icd_version_norm'] = dict_df['icd_version'].astype(str).str.strip()
# build mapping (code_norm, version) -> long_title
# (when a key occurs more than once, the last row wins)
dict_map = dict(((row.icd_code_norm, row.icd_version_norm), row.long_title) for row in dict_df.itertuples(index=False))

print(f"Loaded ICD dictionary rows: {len(dict_df)}")

# --- Step 2: read diagnoses_icd in chunks and accumulate per-admission lists ---
if not DIAGNOSIS_CSV.exists():
    raise FileNotFoundError(f"{DIAGNOSIS_CSV} not found. Place diagnoses_icd.csv next to this script.")

acc = defaultdict(list)   # (subject_id, hadm_id) -> list of (seq_num (int), icd_code_norm (str), icd_version_norm (str))
rows_seen = 0
for chunk in pd.read_csv(DIAGNOSIS_CSV, chunksize=CHUNK_DIAG, dtype=str, low_memory=False):
    # ensure required columns exist
    for col in ("subject_id","hadm_id","seq_num","icd_code","icd_version"):
        if col not in chunk.columns:
            raise KeyError(f"Expected column '{col}' in diagnoses_icd.csv but missing.")
    # normalize ids; unparseable ids become <NA> and are skipped below
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    # rows without a usable seq_num sort last
    chunk['seq_num'] = pd.to_numeric(chunk['seq_num'], errors='coerce').fillna(99999).astype(int)
    # Apply normalize_code to the raw values (no astype(str) first): a missing code
    # must reach the helper as NaN so it maps to "" and is dropped downstream,
    # instead of turning into the bogus code "NAN".
    chunk['icd_code_norm'] = chunk['icd_code'].apply(normalize_code)
    chunk['icd_version_norm'] = chunk['icd_version'].astype(str).str.strip()

    for r in chunk.itertuples(index=False):
        # skip if missing hadm or subject
        if pd.isna(r.subject_id) or pd.isna(r.hadm_id):
            continue
        key = (int(r.subject_id), int(r.hadm_id))
        acc[key].append((int(r.seq_num), r.icd_code_norm, r.icd_version_norm))
        rows_seen += 1
    print(f"Processed diagnoses rows so far: {rows_seen}", end='\r')

print(f"\nTotal diagnosis rows processed: {rows_seen}; unique admissions with diagnoses: {len(acc)}")

# --- Step 3: build per-admission aggregate DataFrame ---
# Fixed column layout so diag_df always has the merge keys, even when no
# diagnosis rows were accumulated (an empty frame without columns would make
# the later merge on subject_id/hadm_id fail).
diag_columns = (
    ["subject_id", "hadm_id", "diag_n", "diag_codes", "diag_descs",
     "primary_diag_code", "primary_diag_desc"]
    + [f"diag_{k}_code" for k in range(1, TOP_K + 1)]
    + [f"diag_{k}_desc" for k in range(1, TOP_K + 1)]
)

agg_rows = []
for (subj, hadm), entries in acc.items():
    # sort by seq_num ascending
    entries_sorted = sorted(entries, key=lambda x: (x[0] if x[0] is not None else 99999))
    # Drop blank codes while keeping each code paired with ITS OWN version;
    # filtering only the codes would shift every later code onto the wrong version.
    kept = [(e[1], e[2]) for e in entries_sorted if e[1] != ""]
    codes = [c for c, _ in kept]
    versions = [v for _, v in kept]
    # lookup descriptions (preserve order)
    descs = [try_lookup_description(c, v, dict_map) for c, v in kept]
    n = len(codes)
    primary_code = codes[0] if n >= 1 else pd.NA
    primary_desc = descs[0] if n >= 1 else pd.NA
    # top-K split: diag_k_* is <NA> when the admission has fewer than k codes
    top_codes = {}
    top_descs = {}
    for k in range(1, TOP_K+1):
        if n >= k:
            top_codes[f"diag_{k}_code"] = codes[k-1]
            top_descs[f"diag_{k}_desc"] = descs[k-1]
        else:
            top_codes[f"diag_{k}_code"] = pd.NA
            top_descs[f"diag_{k}_desc"] = pd.NA

    agg_rows.append({
        "subject_id": int(subj),
        "hadm_id": int(hadm),
        "diag_n": int(n),
        "diag_codes": ";".join(codes) if codes else pd.NA,
        # a description that was not found is written as the text "<NA>" so the
        # list stays positionally aligned with diag_codes
        "diag_descs": ";".join([str(d) for d in descs]) if descs else pd.NA,
        "primary_diag_code": primary_code,
        "primary_diag_desc": primary_desc,
        **top_codes,
        **top_descs
    })

diag_df = pd.DataFrame(agg_rows, columns=diag_columns)
# ensure dtypes (also for the empty frame, so merge key dtypes always match)
diag_df['subject_id'] = diag_df['subject_id'].astype('Int64')
diag_df['hadm_id'] = diag_df['hadm_id'].astype('Int64')
diag_df['diag_n'] = diag_df['diag_n'].astype('Int64')

print("Built diag_df with rows:", len(diag_df))

# --- Step 4: merge diag_df into the admission x day table in chunks (so it is never loaded fully) ---
# NOTE(review): the message below mentions merged_initial.csv, but the file checked
# is MERGED_INITIAL_CSV (admissions_expanded.csv).
if not MERGED_INITIAL_CSV.exists():
    raise FileNotFoundError(f"{MERGED_INITIAL_CSV} not found. Place merged_initial.csv next to this script.")

first_write = True   # the first chunk creates the file and writes the header
written = 0
for chunk in pd.read_csv(MERGED_INITIAL_CSV, chunksize=CHUNK_WRITE, parse_dates=['admittime'], low_memory=False):
    # ensure keys have correct dtype (nullable Int64, matching diag_df)
    if 'subject_id' in chunk.columns:
        chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    if 'hadm_id' in chunk.columns:
        chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')

    # Left join: every admission x day row is kept; the per-admission diagnosis
    # columns are repeated on each day row of that admission.
    merged_chunk = chunk.merge(diag_df, on=['subject_id','hadm_id'], how='left')
    # NOTE(review): this requires diag_df to contain subject_id and hadm_id columns;
    # a diag_df without those columns makes the merge raise KeyError.

    merged_chunk.to_csv(OUT_CSV, mode='w' if first_write else 'a', index=False, header=first_write)
    first_write = False
    written += len(merged_chunk)
    print(f"Wrote merged rows: {written}", end='\r')
    del chunk, merged_chunk
    gc.collect()

print(f"\nDone. Output saved to: {OUT_CSV} (rows written: {written})")


In [None]:
# Load the whole file named by MERGED_INITIAL_CSV into memory (no date parsing, no chunking).
merged_initial = pd.read_csv(MERGED_INITIAL_CSV)

In [None]:
# (rows, columns) of the input table.
merged_initial.shape

In [None]:
# Load the whole merged_with_diagnoses.csv into memory.
merged_with_diagnoses = pd.read_csv("merged_with_diagnoses.csv")

In [None]:
# (rows, columns) of the diagnoses merge output.
merged_with_diagnoses.shape

In [None]:
# Display the first 10 rows of the diagnoses merge output.
merged_with_diagnoses.head(10)

In [None]:
# Load all of procedures_icd.csv with default dtype inference.
procedures_icd = pd.read_csv("procedures_icd.csv")

In [None]:
# Display the first 5 rows.
procedures_icd.head(5)

In [None]:
# Load all of d_icd_procedures.csv with default dtype inference.
d_icd_procedures = pd.read_csv("d_icd_procedures.csv")

In [None]:
# Display the first 5 rows.
d_icd_procedures.head(5)

In [None]:
# === CONFIG ===
# Inputs
procedures_path = Path("procedures_icd.csv")
dprocedures_path = Path("d_icd_procedures.csv")
merged_initial_path = Path("admissions_expanded.csv")
# Outputs: per-chunk aggregates, consolidated per-day aggregates, and the final merge
intermediate_chunks_path = Path("procedures_daily_chunks.csv")
final_daily_path = Path("procedures_daily_final.csv")
out_merged_path = Path("merged_with_procedures.csv")

chunksize = 500_000   # rows per chunk when streaming procedures_icd.csv; tune to your environment
write_chunk = 20000   # rows per to_csv call when writing the merged output

# === 0) sanity checks ===
# Fail early, with the absolute path, if any required input is missing.
for p in (procedures_path, dprocedures_path, merged_initial_path):
    if not p.exists():
        raise FileNotFoundError(f"Required file not found: {p.resolve()}")

# === 1) build admit_date lookup from merged_initial ===
print("Loading merged_initial admissions (admit_time -> admit_date map)...")
# Only the key columns and admittime are read, to keep this pass light.
mi_cols = ['subject_id', 'hadm_id', 'admittime']
mi = pd.read_csv(merged_initial_path, usecols=mi_cols, parse_dates=['admittime'], low_memory=False)
mi['subject_id'] = pd.to_numeric(mi['subject_id'], errors='coerce').astype('Int64')
mi['hadm_id'] = pd.to_numeric(mi['hadm_id'], errors='coerce').astype('Int64')

# take first admittime per (subject_id, hadm_id)
# (GroupBy.first returns the first non-null admittime of each group)
admit_map = mi.groupby(['subject_id', 'hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
# normalize() truncates to midnight, so day offsets are whole calendar days
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()

# make a dict: (int(subject_id), int(hadm_id)) -> admit_date (Timestamp) for fast lookup
admit_dict = {}
for r in admit_map.itertuples(index=False):
    try:
        key = (int(r.subject_id), int(r.hadm_id))
    except Exception:
        # ids that cannot be converted to int (e.g. <NA>) are left out of the map
        continue
    admit_dict[key] = r.admit_date
print("Admit map size:", len(admit_dict))

del mi, admit_map; gc.collect()

# === 2) chunked read of procedures_icd -> per-chunk daily aggregates ===
def join_unique_codes(series):
    """Collapse a group's codes into one sorted, ';'-separated string of distinct values.

    Blank entries and the stringified-missing markers "nan"/"None" are ignored;
    returns "" when nothing remains.
    """
    s = set([str(x).strip() for x in series.dropna() if str(x).strip() not in ("", "nan", "None")])
    if not s:
        return ""
    return ";".join(sorted(s))

print("Streaming procedures_icd in chunks and writing per-chunk daily aggregates...")
# Remove output of an earlier run: if this run produces no rows, step 3 must not
# silently consume stale aggregates.
if intermediate_chunks_path.exists():
    intermediate_chunks_path.unlink()
first_out = True
total_skipped_no_admit = 0
total_rows_processed = 0
# icd_code is forced to str: ICD-9 procedure codes are all digits, and letting
# pandas infer a numeric dtype would drop their leading zeros ("0066" -> 66),
# which later breaks the lookup against d_icd_procedures (read as text).
reader = pd.read_csv(procedures_path,
                     usecols=['subject_id','hadm_id','seq_num','chartdate','icd_code','icd_version'],
                     dtype={'icd_code': str},
                     parse_dates=['chartdate'],
                     chunksize=chunksize,
                     low_memory=True)

for chunk_i, chunk in enumerate(reader, start=1):
    total_rows_processed += len(chunk)
    # normalize ids
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')

    # drop rows with missing ids (copy so later column assignments hit a real frame, not a view)
    chunk = chunk[chunk['subject_id'].notna() & chunk['hadm_id'].notna()].copy()
    if chunk.empty:
        print(f"Chunk {chunk_i}: no valid subject/hadm ids, skipping")
        continue

    # icd_code as cleaned string
    chunk['icd_code'] = chunk['icd_code'].astype(str).str.strip()
    # look up each row's admit_date; rows whose admission is unknown get NaT
    keys = list(zip(chunk['subject_id'].astype(int), chunk['hadm_id'].astype(int)))
    chunk['admit_date'] = [admit_dict.get(k, pd.NaT) for k in keys]

    # drop rows without admit_date (no matching admission in merged_initial)
    missing_admit_mask = chunk['admit_date'].isna()
    n_missing = int(missing_admit_mask.sum())
    total_skipped_no_admit += n_missing
    if n_missing:
        # keep memory low by filtering now
        chunk = chunk.loc[~missing_admit_mask].copy()
    if chunk.empty:
        print(f"Chunk {chunk_i}: {n_missing} rows had no admit_date; chunk empty after drop -> continue")
        continue

    # compute day_index (chart_date normalized minus admit_date), clipped to >= 0;
    # rows with an unparseable chartdate fall on day 0
    chunk['chart_date'] = pd.to_datetime(chunk['chartdate'], errors='coerce').dt.normalize()
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # group per day and aggregate:
    # - proc_count: number of procedure rows that day
    # - last_proc_charttime: most recent chartdate (max)
    # - proc_codes: unique semicolon-separated icd_code strings (sorted)
    grp = chunk.groupby(['subject_id','hadm_id','day_index'], dropna=False)
    df_agg = grp.agg(
        proc_count = ('icd_code', 'size'),
        last_proc_charttime = ('chartdate', 'max'),
        proc_codes = ('icd_code', join_unique_codes)
    ).reset_index()

    # write per-chunk aggregates (append); the same (subject, hadm, day) can appear
    # in several chunks and is consolidated in step 3
    df_agg.to_csv(intermediate_chunks_path, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out = False

    print(f"Chunk {chunk_i}: rows_in={len(chunk):,}, groups_out={len(df_agg):,}, skipped_no_admit={n_missing}")
    del chunk, df_agg, grp
    gc.collect()

print("Streaming done. Total rows processed:", total_rows_processed)
print("Total rows skipped because no matching admission:", total_skipped_no_admit)

# === 3) finalize aggregated daily procedures by grouping intermediate file ===
print("Reading intermediate chunks and final-aggregating...")
if not intermediate_chunks_path.exists():
    raise FileNotFoundError(f"Expected intermediate file {intermediate_chunks_path} not found.")

# proc_codes holds ';'-joined code strings. Force text so a file that happens to
# contain only single all-digit codes is not parsed as numbers (losing leading zeros).
daily = pd.read_csv(intermediate_chunks_path, parse_dates=['last_proc_charttime'],
                    dtype={'proc_codes': str}, low_memory=False)

# final aggregation: sum counts, max(last_proc_charttime), union of proc_codes across chunked writes
def union_semicolon_lists(series):
    """Merge several ';'-separated code lists into one sorted, de-duplicated list.

    Missing cells and empty strings are skipped; each remaining cell is split on
    ';', pieces are stripped and blank pieces discarded. Returns the distinct
    pieces joined by ';' in sorted order, or "" when there are none.
    """
    distinct_pieces = {
        piece.strip()
        for cell in series.dropna()
        if not (cell == "")
        for piece in str(cell).split(";")
        if piece.strip() != ""
    }
    return ";".join(sorted(distinct_pieces)) if distinct_pieces else ""

# One row per (subject_id, hadm_id, day_index): counts from the per-chunk rows are
# summed, the latest chart time is kept, and the code lists are unioned.
final = daily.groupby(['subject_id','hadm_id','day_index'], as_index=False).agg(
    proc_count = ('proc_count', 'sum'),
    last_proc_charttime = ('last_proc_charttime', 'max'),
    proc_codes = ('proc_codes', union_semicolon_lists)
)

final.to_csv(final_daily_path, index=False)
print("Final per-day procedures saved to:", final_daily_path)
# `final` stays in memory; only the intermediate frame is released.
del daily; gc.collect()

# === 4) map codes -> titles from d_icd_procedures ===
# (the file is required: its existence was checked in the sanity checks at the top of the cell)
print("Loading d_icd_procedures to map codes -> titles (if available)...")
dproc = pd.read_csv(dprocedures_path, dtype=str, low_memory=False)
dproc['icd_code'] = dproc['icd_code'].astype(str).str.strip()
# Lookup keyed by icd_code only; missing titles become "".
# NOTE(review): icd_version is not part of the key, so if the same code string
# exists under more than one version the later row overwrites the earlier one.
code2title = dict(zip(dproc['icd_code'], dproc['long_title'].fillna("").astype(str)))

def map_codes_to_titles(codes_str):
    """Translate a ';'-separated code list into a ';'-separated title list.

    Looks each non-blank code up in the module-level ``code2title`` mapping.
    Codes without a (non-empty) title are dropped, so the result can be shorter
    than the input. Returns "" for a missing or empty input.
    """
    if pd.isna(codes_str):
        return ""
    if codes_str == "":
        return ""
    found_titles = []
    for raw_code in codes_str.split(";"):
        if raw_code.strip() == "":
            continue
        title = code2title.get(raw_code, "")
        if title != "":
            found_titles.append(title)
    return ";".join(found_titles)

# Add the title list for each day's code list.
final['proc_titles'] = final['proc_codes'].apply(map_codes_to_titles)
# Save updated final (overwrites the file written before titles were added)
final.to_csv(final_daily_path, index=False)
print("Final per-day procedures (with titles) saved to:", final_daily_path)

# === 5) LEFT JOIN final daily procedures into merged_initial and write merged file ===
print("Merging final daily procedure aggregates into merged_initial master table...")
merged = pd.read_csv(merged_initial_path, low_memory=False, parse_dates=['admittime','dischtime','deathtime'])
# ensure types (nullable Int64 on both sides so the join keys compare equal)
merged['subject_id'] = pd.to_numeric(merged['subject_id'], errors='coerce').astype('Int64')
merged['hadm_id'] = pd.to_numeric(merged['hadm_id'], errors='coerce').astype('Int64')
merged['day_index'] = pd.to_numeric(merged['day_index'], errors='coerce').astype('Int64')

# load final daily procedures; code/title lists are forced to text so a column of
# single all-digit codes is not re-parsed as numbers (which drops leading zeros)
proc_daily = pd.read_csv(final_daily_path, parse_dates=['last_proc_charttime'],
                         dtype={'proc_codes': str, 'proc_titles': str}, low_memory=False)
proc_daily['subject_id'] = pd.to_numeric(proc_daily['subject_id'], errors='coerce').astype('Int64')
proc_daily['hadm_id'] = pd.to_numeric(proc_daily['hadm_id'], errors='coerce').astype('Int64')
proc_daily['day_index'] = pd.to_numeric(proc_daily['day_index'], errors='coerce').astype('Int64')

# left join; validate guards against duplicated (subject, hadm, day) rows in
# proc_daily silently multiplying rows of the master table
merged_with_proc = merged.merge(proc_daily, on=['subject_id','hadm_id','day_index'], how='left', validate='m:1')

# days without any procedure: count 0, empty code/title strings
merged_with_proc['proc_count'] = merged_with_proc['proc_count'].fillna(0).astype('Int64')
merged_with_proc['proc_codes'] = merged_with_proc['proc_codes'].fillna("").astype(str)
merged_with_proc['proc_titles'] = merged_with_proc['proc_titles'].fillna("").astype(str)

# write final merged CSV in slices of write_chunk rows
print("Writing final merged file (chunked writes)...")
n_rows = len(merged_with_proc)
first = True   # the first slice creates the file and writes the header
for start in range(0, n_rows, write_chunk):
    end = min(start + write_chunk, n_rows)
    merged_with_proc.iloc[start:end].to_csv(out_merged_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    print(f"Wrote rows {start}..{end-1}")
print("Merged output saved to:", out_merged_path)


In [None]:
# Load all of admissions_expanded.csv into memory (no date parsing).
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the input table.
merged_initial.shape

In [None]:
# Load all of merged_with_procedures.csv into memory.
merged_with_procedures = pd.read_csv("merged_with_procedures.csv")

In [None]:
# (rows, columns) of the procedures merge output.
merged_with_procedures.shape

In [None]:
# Display the first 100 rows of the procedures merge output.
merged_with_procedures.head(100)

In [None]:
# --- Config / paths ---
drg_path = Path("drgcodes.csv")
merged_initial_path = Path("admissions_expanded.csv")   # or use DataFrame merged_initial if already in memory
out_path = Path("merged_with_drg.csv")

# --- Load merged_initial (admission x day rows) ---
# NOTE(review): when a global named merged_initial already exists it is reused as-is
# (whatever an earlier cell left there) and the file is not read; the assignment in
# the first branch just rebinds the same object.
if 'merged_initial' in globals():
    merged_initial = globals()['merged_initial']
else:
    # NOTE(review): the message mentions merged_initial.csv, but the file checked is
    # merged_initial_path (admissions_expanded.csv).
    if not merged_initial_path.exists():
        raise FileNotFoundError(f"{merged_initial_path} not found. Provide merged_initial.csv or have merged_initial DataFrame in memory.")
    merged_initial = pd.read_csv(merged_initial_path, parse_dates=['admittime','dischtime','deathtime'], low_memory=False)
# ensure keys have consistent types (this modifies merged_initial in place)
merged_initial['subject_id'] = pd.to_numeric(merged_initial['subject_id'], errors='coerce').astype('Int64')
merged_initial['hadm_id'] = pd.to_numeric(merged_initial['hadm_id'], errors='coerce').astype('Int64')

# --- Load drgcodes (small) ---
if not drg_path.exists():
    raise FileNotFoundError(f"{drg_path} not found.")
drg = pd.read_csv(drg_path, low_memory=False)

# normalize / coerce types
drg['subject_id'] = pd.to_numeric(drg['subject_id'], errors='coerce').astype('Int64')
drg['hadm_id'] = pd.to_numeric(drg['hadm_id'], errors='coerce').astype('Int64')
# keep textual drg_code as string but strip
# NOTE(review): astype(str) turns missing values into the literal string "nan",
# so these three columns never contain a real null afterwards.
drg['drg_code'] = drg['drg_code'].astype(str).str.strip()
drg['description'] = drg['description'].astype(str).str.strip()
drg['drg_type'] = drg['drg_type'].astype(str).str.strip()

# numeric columns: coerce to numeric (float), keep NaN when missing
drg['drg_severity_num'] = pd.to_numeric(drg['drg_severity'], errors='coerce')
drg['drg_mortality_num'] = pd.to_numeric(drg['drg_mortality'], errors='coerce')

# --- Aggregation strategy ---
# 1) Per admission, take the highest severity and the highest mortality score
#    found on any of its DRG rows (groups with missing ids are kept).
admission_groups = drg.groupby(['subject_id','hadm_id'], dropna=False)
numeric_aggs = admission_groups.agg(
    drg_severity_max=('drg_severity_num', 'max'),
    drg_mortality_max=('drg_mortality_num', 'max'),
).reset_index()

# 2) Choose single representative row per admission for textual fields.
# Build a sort key: (drg_severity_num desc, drg_mortality_num desc, drg_type priority desc)
type_priority = {'APR': 2, 'HCFA': 1}  # APR preferred over HCFA; others map to 0
drg['type_prio'] = drg['drg_type'].map(type_priority).fillna(0).astype(int)

# Replace NaN with very small number so numerics that exist always win
drg['_sev_for_sort'] = drg['drg_severity_num'].fillna(-9999)
drg['_mort_for_sort'] = drg['drg_mortality_num'].fillna(-9999)

drg_sorted = drg.sort_values(
    by=['subject_id','hadm_id','_sev_for_sort','_mort_for_sort','type_prio'],
    ascending=[True, True, False, False, False]
)

# Pick the first ROW per admission (best by our rule).
# drop_duplicates keeps whole rows. GroupBy.first() must not be used here: it
# returns the first non-null value of each column independently, so the
# "chosen" severity/mortality could come from a different DRG row than the
# chosen code/description. Unlike groupby's default, this also keeps rows with
# missing ids, matching numeric_aggs (built with dropna=False).
drg_rep = (
    drg_sorted
    .drop_duplicates(subset=['subject_id','hadm_id'], keep='first')
    [['subject_id','hadm_id','drg_type','drg_code','description','drg_severity','drg_mortality']]
    .rename(columns={
        'drg_type': 'drg_type_chosen',
        'drg_code': 'drg_code_chosen',
        'description': 'drg_description_chosen',
        'drg_severity': 'drg_severity_chosen',
        'drg_mortality': 'drg_mortality_chosen'
    })
    .reset_index(drop=True)
)

# 3) merge numeric_aggs and textual representative row into a single drg_summary
# (one row per admission: the two *_max columns plus the *_chosen columns)
drg_summary = numeric_aggs.merge(drg_rep, on=['subject_id','hadm_id'], how='left')

# Optional: if you want both numeric_max and chosen textual numeric (string),
# ensure columns consistent and cast types
# (drg_severity_max is float; drg_severity_chosen may be string original - try to coerce)
# NOTE(review): only drg_severity_chosen is coerced; drg_mortality_chosen keeps
# whatever dtype read_csv inferred.
drg_summary['drg_severity_chosen'] = pd.to_numeric(drg_summary['drg_severity_chosen'], errors='coerce')

# --- Merge into merged_initial (broadcast to day rows) ---
# Left join keeps every admission x day row; validate='m:1' makes pandas raise if
# drg_summary has more than one row for a (subject_id, hadm_id) key.
merged_final = merged_initial.merge(
    drg_summary,
    on=['subject_id','hadm_id'],
    how='left',
    validate='m:1'  # many merged_initial rows (days) to one drg_summary row
)

# --- Save output ---
merged_final.to_csv(out_path, index=False)
print("Merged drgcodes -> written to:", out_path)
# Counts day rows (not admissions) whose admission matched a DRG summary row.
print("Rows with DRG info:", int(merged_final['drg_code_chosen'].notna().sum()))

# End of script


In [None]:
# Load all of admissions_expanded.csv into memory (no date parsing).
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the input table.
merged_initial.shape

In [None]:
# Load all of merged_with_drg.csv into memory.
merged_with_drg = pd.read_csv("merged_with_drg.csv")

In [None]:
# (rows, columns) of the DRG merge output.
merged_with_drg.shape

In [None]:
# Display the first 5 rows of the DRG merge output.
merged_with_drg.head(5)

In [None]:
# Config - tune these to your environment
MERGED_IN_PATH = Path("admissions_expanded.csv")   # or use Memory DataFrame merged_initial
TRANSFERS_PATH   = Path("transfers.csv")
SERVICES_PATH    = Path("services.csv")
OUT_PATH         = Path("merged_with_transfers_services.csv")

TRANS_CHUNKSIZE = 200_000   # rows per chunk when streaming transfers.csv
SERV_CHUNKSIZE  = 200_000   # rows per chunk when streaming services.csv

# Load or reference merged_initial
# NOTE(review): if a global merged_initial exists, a copy of it is used and the file
# is not read, so the result depends on what an earlier cell left in memory.
if 'merged_initial' in globals():
    merged = merged_initial.copy()
else:
    if not MERGED_IN_PATH.exists():
        raise FileNotFoundError(f"{MERGED_IN_PATH} not found. Provide merged_initial DataFrame or file.")
    merged = pd.read_csv(MERGED_IN_PATH, parse_dates=['admittime'], low_memory=False)
print("Loaded merged rows:", len(merged))

# Prepare new columns (if not present); they start as all-<NA> object columns
new_cols = [
    'transfers_eventtype', 'transfers_careunit', 'transfers_intime', 'transfers_outtime',
    'services_prev_service', 'services_curr_service', 'services_transfertime'
]
for c in new_cols:
    if c not in merged.columns:
        merged[c] = pd.NA

# Build admit_map: (subject_id, hadm_id) -> admit_date (normalized)
admit_map = merged.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
# to_datetime also covers the case where admittime was loaded as text
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()
admit_map['key'] = list(zip(admit_map['subject_id'].astype('Int64'), admit_map['hadm_id'].astype('Int64')))
admit_dict = dict(zip(admit_map['key'], admit_map['admit_date']))
print("Admit map size:", len(admit_dict))

# Map each (subject_id, hadm_id, day_index) key to the index label of its row in `merged`.
# When a key occurs on several rows, the last one wins.
merged_index_map = {}
key_frame = merged[['subject_id','hadm_id','day_index']]
for idx, subj_val, hadm_val, day_val in zip(
    key_frame.index, key_frame['subject_id'], key_frame['hadm_id'], key_frame['day_index']
):
    try:
        k = (int(subj_val), int(hadm_val), int(day_val))
    except Exception:
        # rows whose key parts cannot be converted to int are left out of the map
        continue
    merged_index_map[k] = idx
print("Merged rows map size:", len(merged_index_map))

########################
# Process transfers.csv
########################
if not TRANSFERS_PATH.exists():
    raise FileNotFoundError(f"{TRANSFERS_PATH} not found.")

# Goal: each admission x day row carries the EARLIEST transfer (by intime) that
# covers that day. Rows are only sorted within a chunk, so "first write wins" is
# not enough across chunks; instead the intime already stored on the row is
# compared with the candidate, and an earlier candidate replaces it.
trans_reader = pd.read_csv(TRANSFERS_PATH, parse_dates=['intime','outtime'], chunksize=TRANS_CHUNKSIZE, low_memory=True)

total_assigned_transfers = 0
chunk_no = 0
for chunk in trans_reader:
    chunk_no += 1
    # keep relevant columns
    chunk = chunk[['subject_id','hadm_id','transfer_id','eventtype','careunit','intime','outtime']].copy()
    # coerce ids
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce')
    # normalize times
    chunk['intime'] = pd.to_datetime(chunk['intime'], errors='coerce')
    chunk['outtime'] = pd.to_datetime(chunk['outtime'], errors='coerce')
    chunk = chunk.dropna(subset=['subject_id','hadm_id','intime'])  # need at least intime
    if chunk.empty:
        continue

    # sort by intime ascending so, within the chunk, ties keep the earliest-listed transfer
    chunk = chunk.sort_values('intime')

    # iterate rows (transfers per admission are usually few; OK to loop)
    assigned = 0   # day rows that received their first transfer in this chunk
    for _, tr in chunk.iterrows():
        s = int(tr['subject_id'])
        h = int(tr['hadm_id'])
        admit_date = admit_dict.get((s,h), pd.NaT)
        if pd.isna(admit_date):
            continue
        intime = tr['intime']
        # an open-ended transfer (no outtime) is treated as covering only its intime day
        outtime = tr['outtime'] if pd.notna(tr['outtime']) else tr['intime']
        intime_norm = intime.normalize()
        outtime_norm = pd.to_datetime(outtime).normalize()
        # day offsets relative to admission, clipped so pre-admission times land on day 0
        di_start = int(max(0, (intime_norm - admit_date).days))
        di_end   = int(max(0, (outtime_norm - admit_date).days))
        for di in range(di_start, di_end + 1):
            key = (s, h, di)
            row_idx = merged_index_map.get(key)
            if row_idx is None:
                continue
            # Decide on the stored intime rather than on careunit: careunit can be
            # null for a legitimately assigned transfer, which would otherwise let
            # a later transfer overwrite it.
            current_intime = merged.at[row_idx, 'transfers_intime']
            has_assignment = pd.notna(current_intime)
            if has_assignment and pd.Timestamp(current_intime) <= intime:
                continue
            merged.at[row_idx, 'transfers_eventtype'] = tr.get('eventtype', pd.NA)
            merged.at[row_idx, 'transfers_careunit'] = tr.get('careunit', pd.NA)
            merged.at[row_idx, 'transfers_intime'] = intime
            merged.at[row_idx, 'transfers_outtime'] = outtime if pd.notna(tr.get('outtime')) else pd.NA
            if not has_assignment:
                assigned += 1
    total_assigned_transfers += assigned
    print(f"Transfers chunk {chunk_no}: assigned {assigned} rows (total assigned so far: {total_assigned_transfers})")
    del chunk
    gc.collect()

print("Total transfer assignments:", total_assigned_transfers)

########################
# Process services.csv (last-per-day semantics)
########################
if not SERVICES_PATH.exists():
    raise FileNotFoundError(f"{SERVICES_PATH} not found.")

# We'll keep a dict for best (latest) service per (subject,hadm,day_index)
# It lives across chunks, so the latest entry wins regardless of chunk order.
service_best = {}  # key -> (transfertime, prev_service, curr_service)

serv_reader = pd.read_csv(SERVICES_PATH, parse_dates=['transfertime'], chunksize=SERV_CHUNKSIZE, low_memory=True)
chunk_no = 0
for chunk in serv_reader:
    chunk_no += 1
    # keep only the needed columns (raises KeyError if one is missing)
    chunk = chunk[['subject_id','hadm_id','transfertime','prev_service','curr_service']].copy()
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce')
    chunk['transfertime'] = pd.to_datetime(chunk['transfertime'], errors='coerce')
    # rows without ids or without a parseable transfertime cannot be placed on a day
    chunk = chunk.dropna(subset=['subject_id','hadm_id','transfertime'])
    if chunk.empty:
        continue

    # iterate rows - services per admission relatively small
    for _, srow in chunk.iterrows():
        s = int(srow['subject_id'])
        h = int(srow['hadm_id'])
        key_admit = (s,h)
        admit_date = admit_dict.get(key_admit, pd.NaT)
        if pd.isna(admit_date):
            # admission not present in the master table
            continue
        t = srow['transfertime']
        # day offset from admission, clipped so pre-admission times land on day 0
        day_idx = int(max(0, (t.normalize() - admit_date).days))
        map_key = (s, h, day_idx)
        existing = service_best.get(map_key)
        # pick the later transfertime on that day (on a tie the earlier-seen row is kept)
        if existing is None or (pd.notna(existing[0]) and t > existing[0]):
            service_best[map_key] = (t, srow.get('prev_service', pd.NA), srow.get('curr_service', pd.NA))

    del chunk
    gc.collect()
    print(f"Services chunk {chunk_no} processed; current cached service keys: {len(service_best)}")

# Apply service_best into merged DataFrame
assigned_services = 0
for map_key, (t, prev_s, curr_s) in service_best.items():
    row_idx = merged_index_map.get(map_key)
    if row_idx is None:
        # the service falls on a day that has no row in the master table
        continue
    merged.at[row_idx, 'services_prev_service'] = prev_s if prev_s is not None else pd.NA
    merged.at[row_idx, 'services_curr_service'] = curr_s if curr_s is not None else pd.NA
    merged.at[row_idx, 'services_transfertime'] = t
    assigned_services += 1

print("Total service rows applied:", assigned_services)

# Finalize types (optional): the time columns were filled cell by cell, so convert
# them to proper datetime columns (unset cells become NaT)
for c in ['transfers_intime','transfers_outtime','services_transfertime']:
    merged[c] = pd.to_datetime(merged[c], errors='coerce')

# Write out in slices of write_chunk rows
write_chunk = 20_000
first = True   # the first slice creates the file and writes the header
n_rows = len(merged)
for start in range(0, n_rows, write_chunk):
    end = min(start + write_chunk, n_rows)
    merged.iloc[start:end].to_csv(OUT_PATH, mode='w' if first else 'a', index=False, header=first)
    first = False
    print(f"Saved rows {start}-{end-1}")
print("Saved merged file to:", OUT_PATH)


In [None]:
# Load all of admissions_expanded.csv into memory (no date parsing).
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the input table.
merged_initial.shape

In [None]:
# Load all of merged_with_transfers_services.csv into memory.
merged_with_transfers_services = pd.read_csv("merged_with_transfers_services.csv")

In [None]:
# (rows, columns) of the transfers/services merge output.
merged_with_transfers_services.shape

In [None]:
# Display the first 10 rows of the transfers/services merge output.
merged_with_transfers_services.head(10)

In [None]:
# Load all of prescriptions.csv into memory (no nrows/chunksize limit).
prescriptions = pd.read_csv("prescriptions.csv")

In [None]:
# (rows, columns) of the full prescriptions table.
prescriptions.shape

In [None]:
# Write the first 5 rows to a sample file. index=False is not passed here (unlike the
# test_datetimeevents / test_microbiologyevents dumps), so the index becomes an extra first column.
prescriptions.head(5).to_csv('test_prescriptions')

In [None]:
# Load all of pharmacy.csv into memory (no nrows/chunksize limit).
pharmacy = pd.read_csv("pharmacy.csv")

In [None]:
# (rows, columns) of the full pharmacy table.
pharmacy.shape

In [None]:
# Write the first 5 rows to a sample file; the index is written as an extra first column.
pharmacy.head(5).to_csv('test_pharmacy')

In [None]:
# Load all of emar.csv into memory (no nrows/chunksize limit).
emar = pd.read_csv("emar.csv")

In [None]:
# (rows, columns) of the full emar table.
emar.shape

In [None]:
# Write the first 5 rows to a sample file; the index is written as an extra first column.
emar.head(5).to_csv('test_emar')

In [None]:
# ---------- config ----------
# All paths are resolved relative to BASE_DIR.
BASE_DIR = Path(".")
MERGED_INITIAL_PATH = BASE_DIR / "admissions_expanded.csv"
PRESC_PATH = BASE_DIR / "prescriptions.csv"
PHARM_PATH = BASE_DIR / "pharmacy.csv"
EMAR_PATH = BASE_DIR / "emar.csv"

# Per-source daily aggregate files
AGG_PRESC_PATH = BASE_DIR / "agg_prescriptions_daily.csv"
AGG_PHARM_PATH = BASE_DIR / "agg_pharmacy_daily.csv"
AGG_EMAR_PATH  = BASE_DIR / "agg_emar_daily.csv"

# chunk sizes (tune these)
# NOTE(review): presumably source-read, output-write and matching chunk sizes
# respectively -- confirm against the code that uses them.
SRC_CHUNKSIZE = 200_000
WRITE_CHUNK = 20_000
MATCH_CHUNK = 200_000

# ---------- helpers ----------
def nowstr():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return time.strftime(stamp_format, time.localtime())

def parse_numeric(x):
    """Best-effort conversion of a raw cell value to ``float``.

    Returns ``np.nan`` for missing values, for the placeholder strings
    "", "NaN", "nan", "None", "none" and "___", and for anything that cannot be
    parsed as a number. Thousands separators (",") are removed before parsing.
    Never raises for unparseable input.
    """
    try:
        # pd.isna sits inside the try: for list-like input it returns an array
        # whose truth value raises ValueError, which must also mean "not a number".
        if pd.isna(x):
            return np.nan
        if isinstance(x, (int, float, np.number)):
            return float(x)
        s = str(x).strip().replace(',', '')
        if s in ("", "NaN", "nan", "None", "none", "___"):
            return np.nan
        return float(s)
    except (TypeError, ValueError, OverflowError):
        # only conversion failures are swallowed; KeyboardInterrupt etc. propagate
        return np.nan

def read_header(path: Path):
    """Return the column names of the CSV at ``path``, or ``[]`` if the file does not exist.

    Only the header line is parsed (``nrows=0``); no data rows are loaded.
    """
    if path.exists():
        header_only = pd.read_csv(path, nrows=0)
        return list(header_only.columns)
    return []

def build_admit_map(merged_initial_path):
    """Build a ``(subject_id, hadm_id) -> admit_date`` lookup from the admission x day file.

    Parameters
    ----------
    merged_initial_path : path-like
        CSV containing at least ``subject_id``, ``hadm_id`` and ``admittime``.

    Returns
    -------
    dict
        Keys are ``(int, int)`` tuples; values are the first ``admittime`` of the
        admission truncated to midnight. Rows whose ``subject_id`` or ``hadm_id``
        is missing or non-numeric are ignored.
    """
    print(f"[{nowstr()}] Building admit_map from {merged_initial_path} (light read)...")
    mi = pd.read_csv(merged_initial_path, usecols=['subject_id','hadm_id','admittime'],
                     parse_dates=['admittime'], low_memory=False)
    mi['subject_id'] = pd.to_numeric(mi['subject_id'], errors='coerce').astype('Int64')
    mi['hadm_id'] = pd.to_numeric(mi['hadm_id'], errors='coerce').astype('Int64')
    # Rows without a usable key cannot be looked up later, and a missing id would
    # make the astype(int) conversion below raise -- drop them up front.
    mi = mi.dropna(subset=['subject_id','hadm_id'])
    admit = mi.groupby(['subject_id','hadm_id'])['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
    admit['admit_date'] = pd.to_datetime(admit['admit_time']).dt.normalize()
    admit_dict = dict(zip(zip(admit['subject_id'].astype(int), admit['hadm_id'].astype(int)), admit['admit_date']))
    print(f"[{nowstr()}] Admit map keys: {len(admit_dict)}")
    del mi, admit
    gc.collect()
    return admit_dict

def _coerce_orderable(series):
    """Return ``series`` in a dtype that min/max can order, without mangling numbers.

    Numeric and datetime columns are returned unchanged. Text columns are parsed
    as numbers when every non-null value is numeric, otherwise as timestamps.
    """
    if pd.api.types.is_numeric_dtype(series) or pd.api.types.is_datetime64_any_dtype(series):
        return series
    as_number = pd.to_numeric(series, errors='coerce')
    if as_number.notna().sum() == series.notna().sum():
        return as_number
    return pd.to_datetime(series, errors='coerce')


def consolidate_agg(path: Path, out_path: Path, key_cols=('subject_id','hadm_id','day_index'),
                    sum_cols=None, min_cols=None, max_cols=None, pick_by_max=None, pick_cols=None):
    """
    Read a per-chunk AGG file (which may have duplicates for the same (s,h,d)),
    and consolidate so result has one row per (s,h,d). The result is written to
    ``out_path`` (which may be the same file as ``path``); nothing is returned.

    - key_cols: columns identifying one output row
    - sum_cols: list of columns to sum across duplicates (e.g., counts)
    - min_cols: list of cols to take min (numeric or timestamp)
    - max_cols: list of cols to take max (numeric or timestamp)
    - pick_by_max: a timestamp column name — for pick_cols choose the row with
      max(pick_by_max) and take those values
    - pick_cols: columns copied from the picked row; values still missing after
      the pick are filled with the first non-null value of the group

    Columns not named in any of the lists are dropped from the output, except
    when no sum/min/max column is present (then the first non-null value per
    group is kept) or when neither aggregation nor pick columns are given
    (then duplicates are simply dropped, keeping the first row).
    If ``path`` does not exist the function logs a message and returns.
    """
    key_cols = list(key_cols)
    sum_cols = list(sum_cols or [])
    min_cols = list(min_cols or [])
    max_cols = list(max_cols or [])
    pick_cols = list(pick_cols or [])

    if not path.exists():
        # nothing to do
        print(f"[{nowstr()}] consolidate_agg: {path} not found, skipping.")
        return

    print(f"[{nowstr()}] Consolidating {path} -> {out_path} ...")
    # read whole AGG file (these AGG files are usually much smaller than raw sources)
    df = pd.read_csv(path, low_memory=False)
    # ensure key types
    for c in key_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')

    # The pick column is a timestamp by contract, so it is always parsed as one.
    if pick_by_max and pick_by_max in df.columns:
        df[pick_by_max] = pd.to_datetime(df[pick_by_max], errors='coerce')
    # min/max columns may be timestamps OR plain numbers (doses, quantities).
    # Passing numbers through pd.to_datetime would turn them into epoch
    # timestamps, so each column is coerced according to its own content.
    for col in min_cols + max_cols:
        if col in df.columns:
            df[col] = _coerce_orderable(df[col])

    # Build aggregation dict for groupby (insertion order fixes output column order).
    agg_dict = {}
    for how, cols in (('sum', sum_cols), ('min', min_cols), ('max', max_cols)):
        for c in cols:
            if c in df.columns:
                agg_dict[c] = how

    # if no aggregation columns present (edge), just drop duplicates using first
    if not agg_dict and not pick_cols:
        out = df.drop_duplicates(subset=key_cols).reset_index(drop=True)
        out.to_csv(out_path, index=False)
        print(f"[{nowstr()}] Consolidation done (simple dedupe). rows: {len(out)}")
        return

    # dropna=False keeps groups whose key contains NaN; used for every groupby
    # below so that all intermediate frames describe the same set of groups.
    by_key = df.groupby(key_cols, dropna=False, as_index=False)
    grouped = by_key.agg(agg_dict) if agg_dict else by_key.first()

    # handle pick_by_max for pick_cols
    pick_present = [c for c in pick_cols if c in df.columns]
    if pick_by_max and pick_by_max in df.columns and pick_present:
        take_cols = key_cols + pick_present
        # Only rows with a valid timestamp can win the pick; restricting to them
        # avoids calling idxmax on all-NaT groups.
        dated = df[df[pick_by_max].notna()]
        if dated.empty:
            picked = df.iloc[0:0][take_cols]
        else:
            latest_idx = dated.groupby(key_cols, dropna=False)[pick_by_max].idxmax()
            picked = df.loc[latest_idx.to_numpy(), take_cols]
        # Fallback values: first non-null entry per group and column.
        first_rows = by_key.first()[take_cols]
        # Make sure the merge below cannot produce `_x` / `_y` duplicates.
        grouped = grouped.drop(columns=[c for c in pick_present if c in grouped.columns])
        grouped = grouped.merge(picked, on=key_cols, how='left')
        grouped = grouped.merge(first_rows, on=key_cols, how='left', suffixes=('', '__first'))
        for c in pick_present:
            grouped[c] = grouped[c].combine_first(grouped[c + '__first'])
        grouped = grouped.drop(columns=[c + '__first' for c in pick_present])

    # Ensure types for keys are Int64 where possible
    for c in key_cols:
        if c in grouped.columns:
            grouped[c] = pd.to_numeric(grouped[c], errors='coerce').astype('Int64')

    grouped.to_csv(out_path, index=False)
    print(f"[{nowstr()}] Consolidation done. rows: {len(grouped)} (written to {out_path})")
    del df, grouped
    gc.collect()


def collect_matching_rows(agg_path: Path, keys_set, usecols=None, parse_dates=None, chunksize=MATCH_CHUNK):
    """Return the rows of the AGG CSV whose (subject_id, hadm_id, day_index) is in ``keys_set``.

    The file is streamed ``chunksize`` rows at a time. ``keys_set`` holds
    ``(s, h, d)`` tuples convertible to ``int``. An empty DataFrame (with
    ``usecols`` as columns, if given) is returned when the file is missing,
    ``keys_set`` is empty, or nothing matches.
    """
    fallback_columns = usecols or []
    if not agg_path.exists() or not keys_set:
        return pd.DataFrame(columns=fallback_columns)

    # Keys are compared as "s|h|d" strings so one vectorised isin() suffices.
    wanted = {f"{int(s)}|{int(h)}|{int(d)}" for (s, h, d) in keys_set}
    id_columns = ('subject_id', 'hadm_id', 'day_index')
    hits = []
    reader = pd.read_csv(agg_path, usecols=usecols or None, parse_dates=parse_dates or [],
                         chunksize=chunksize, low_memory=True)
    for part in reader:
        # chunks lacking any key column cannot be matched
        if not set(id_columns).issubset(part.columns):
            continue
        for id_col in id_columns:
            part[id_col] = pd.to_numeric(part[id_col], errors='coerce').astype('Int64')
        row_keys = (part['subject_id'].astype(str) + '|'
                    + part['hadm_id'].astype(str) + '|'
                    + part['day_index'].astype(str))
        selected = part[row_keys.isin(wanted)]
        if not selected.empty:
            hits.append(selected)
        del part, row_keys, selected
        gc.collect()

    if not hits:
        return pd.DataFrame(columns=fallback_columns)
    return pd.concat(hits, ignore_index=True)


# -------------------- run --------------------
import gc
# (subject_id, hadm_id) -> admission date normalized to midnight; used by every phase below.
admit_dict = build_admit_map(MERGED_INITIAL_PATH)

# ---------- Phase 1: aggregate prescriptions ----------
# Streams prescriptions.csv in chunks, reduces each chunk to one row per
# (subject_id, hadm_id, day_index) and appends it to AGG_PRESC_PATH. A key can
# appear in several chunks, so the file is consolidated after the loop.
print(f"[{nowstr()}] Phase 1 — aggregate prescriptions -> {AGG_PRESC_PATH}")
# Remove any previous output so the append-mode writes below start from an empty file.
if AGG_PRESC_PATH.exists(): AGG_PRESC_PATH.unlink()

# Only request the columns that actually exist in the source file.
presc_header = read_header(PRESC_PATH)
presc_usecols_want = ['subject_id','hadm_id','starttime','stoptime','drug','formulary_drug_cd','ndc','prod_strength','dose_val_rx','dose_unit_rx','route','doses_per_24_hrs','drug_type','poe_id','poe_seq','order_provider_id']
presc_usecols = [c for c in presc_usecols_want if c in presc_header]
presc_parse_dates = [c for c in ['starttime','stoptime'] if c in presc_header]

# first_out controls whether the CSV header is written; total_out counts appended rows.
first_out = True
total_out = 0
for i, chunk in enumerate(pd.read_csv(PRESC_PATH, usecols=presc_usecols, parse_dates=presc_parse_dates, chunksize=SRC_CHUNKSIZE, low_memory=True), start=1):
    t0 = time.time()
    print(f"[{nowstr()}] Presc chunk {i} rows={len(chunk):,}")
    # cast ids
    if 'subject_id' in chunk.columns:
        chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
        chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    # Look up the admission date row by row; rows with a missing id or an unknown
    # (subject_id, hadm_id) pair get NaT and are dropped just below.
    keys = list(zip(chunk['subject_id'].astype('Int64').astype(object), chunk['hadm_id'].astype('Int64').astype(object)))
    chunk['admit_date'] = [admit_dict.get((int(s), int(h)), pd.NaT) if not (pd.isna(s) or pd.isna(h)) else pd.NaT for s,h in keys]
    before = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    dropped = before - len(chunk)
    if dropped:
        print(f"  -> dropped {dropped:,} presc rows with no admit mapping")
    if chunk.empty:
        del chunk; gc.collect(); continue
    # Event time is starttime, falling back to stoptime when starttime is missing.
    chunk['event_time'] = pd.to_datetime(chunk['starttime'], errors='coerce').fillna(pd.to_datetime(chunk.get('stoptime', pd.NaT), errors='coerce'))
    chunk['chart_date'] = chunk['event_time'].dt.normalize()
    # day_index = whole days since the admission date. Rows without any event time
    # are assigned day 0, and events dated before admission are clamped to day 0.
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0
    chunk['dose_val_num'] = chunk['dose_val_rx'].apply(parse_numeric) if 'dose_val_rx' in chunk.columns else np.nan
    chunk['prod_strength_num'] = chunk['prod_strength'].apply(parse_numeric) if 'prod_strength' in chunk.columns else np.nan
    chunk['doses_per_24h_num'] = chunk['doses_per_24_hrs'].apply(parse_numeric) if 'doses_per_24_hrs' in chunk.columns else np.nan
    # astype(str) renders missing values as the text 'nan'; map it back to None
    # so that 'count' and dropna() below treat those rows as missing.
    chunk['drug_text'] = chunk['drug'].astype(str).str.strip().replace({'nan':None})
    chunk['route_text'] = chunk['route'].astype(str).str.strip().replace({'nan':None})
    # NOTE(review): the "last" drug/route is the last non-null value in row order
    # within this chunk, not necessarily the latest by time — confirm the source ordering.
    # NOTE(review): 'starttime' is already accessed unconditionally above, so the
    # `else` fallbacks to 'event_time' below look unreachable in practice.
    agg = chunk.groupby(['subject_id','hadm_id','day_index'], as_index=False).agg(
        presc_orders_count = ('drug_text','count'),
        presc_first_start = ('starttime','min') if 'starttime' in chunk.columns else pd.NamedAgg(column='event_time', aggfunc='min'),
        presc_last_stop  = ('stoptime','max') if 'stoptime' in chunk.columns else pd.NamedAgg(column='event_time', aggfunc='max'),
        presc_last_drug  = ('drug_text', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        presc_last_route = ('route_text', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        presc_max_dose   = ('dose_val_num','max'),
        presc_max_strength = ('prod_strength_num','max'),
        presc_doses_per_24h = ('doses_per_24h_num','max')
    )
    agg['subject_id'] = agg['subject_id'].astype('Int64')
    agg['hadm_id'] = agg['hadm_id'].astype('Int64')
    agg['day_index'] = agg['day_index'].astype('Int64')
    # First chunk creates the file with a header; later chunks append rows only.
    agg.to_csv(AGG_PRESC_PATH, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out = False
    total_out += len(agg)
    print(f"[{nowstr()}] Presc chunk {i} -> wrote {len(agg):,} agg rows (total {total_out:,})  took {time.time()-t0:.1f}s")
    del chunk, agg
    gc.collect()

print(f"[{nowstr()}] Prescriptions aggregation done. total agg rows (unconsolidated): {total_out}")

# consolidate the AGG presc into unique keys
# (input and output path are the same file, so it is rewritten in place)
consolidate_agg(AGG_PRESC_PATH, AGG_PRESC_PATH,
                key_cols=('subject_id','hadm_id','day_index'),
                sum_cols=['presc_orders_count'],
                min_cols=['presc_first_start'],
                max_cols=['presc_last_stop','presc_max_dose','presc_max_strength','presc_doses_per_24h'],
                pick_by_max='presc_last_stop',
                pick_cols=['presc_last_drug','presc_last_route'])

# ---------- Phase 2: aggregate pharmacy ----------
# Same flow as the prescriptions phase: stream pharmacy.csv in chunks, reduce
# each chunk to one row per (subject_id, hadm_id, day_index), append to
# AGG_PHARM_PATH, then consolidate duplicate keys after the loop.
print(f"[{nowstr()}] Phase 2 — aggregate pharmacy -> {AGG_PHARM_PATH}")
# Remove any previous output so the append-mode writes below start from an empty file.
if AGG_PHARM_PATH.exists(): AGG_PHARM_PATH.unlink()

# Only request the columns that actually exist in the source file.
pharm_header = read_header(PHARM_PATH)
pharm_usecols_want = ['subject_id','hadm_id','starttime','stoptime','medication','route','frequency','dispensation','fill_quantity','entertime','verifiedtime','expirationdate']
pharm_usecols = [c for c in pharm_usecols_want if c in pharm_header]
pharm_parse_dates = [c for c in ['starttime','stoptime','entertime','verifiedtime','expirationdate'] if c in pharm_header]

# first_out controls whether the CSV header is written; total_out counts appended rows.
first_out = True
total_out = 0
for i, chunk in enumerate(pd.read_csv(PHARM_PATH, usecols=pharm_usecols, parse_dates=pharm_parse_dates, chunksize=SRC_CHUNKSIZE, low_memory=True), start=1):
    t0 = time.time()
    print(f"[{nowstr()}] Pharm chunk {i} rows={len(chunk):,}")
    if 'subject_id' in chunk.columns:
        chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
        chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    # Look up the admission date row by row; rows with a missing id or an unknown
    # (subject_id, hadm_id) pair get NaT and are dropped just below.
    keys = list(zip(chunk['subject_id'].astype('Int64').astype(object), chunk['hadm_id'].astype('Int64').astype(object)))
    chunk['admit_date'] = [admit_dict.get((int(s), int(h)), pd.NaT) if not (pd.isna(s) or pd.isna(h)) else pd.NaT for s,h in keys]
    before = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    dropped = before - len(chunk)
    if dropped:
        print(f"  -> dropped {dropped:,} pharm rows with no admit mapping")
    if chunk.empty:
        del chunk; gc.collect(); continue
    # Event time is starttime, falling back to verifiedtime and then entertime.
    chunk['event_time'] = pd.to_datetime(chunk['starttime'], errors='coerce').fillna(pd.to_datetime(chunk.get('verifiedtime', pd.NaT), errors='coerce')).fillna(pd.to_datetime(chunk.get('entertime', pd.NaT), errors='coerce'))
    chunk['chart_date'] = chunk['event_time'].dt.normalize()
    # day_index = whole days since the admission date. Rows without any event time
    # are assigned day 0, and events dated before admission are clamped to day 0.
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0
    chunk['fill_qty_num'] = chunk['fill_quantity'].apply(parse_numeric) if 'fill_quantity' in chunk.columns else np.nan
    # astype(str) renders missing values as the text 'nan'; map it back to None
    # so that 'count' and dropna() below treat those rows as missing.
    chunk['med_text'] = chunk['medication'].astype(str).str.strip().replace({'nan':None})
    chunk['route_text'] = chunk['route'].astype(str).str.strip().replace({'nan':None})
    # NOTE(review): the "last" medication/route is the last non-null value in row
    # order within this chunk, not necessarily the latest by time — confirm the source ordering.
    agg = chunk.groupby(['subject_id','hadm_id','day_index'], as_index=False).agg(
        pharm_dispense_count = ('med_text','count'),
        pharm_first_start = ('starttime','min') if 'starttime' in chunk.columns else pd.NamedAgg(column='event_time', aggfunc='min'),
        pharm_last_stop  = ('stoptime','max') if 'stoptime' in chunk.columns else pd.NamedAgg(column='event_time', aggfunc='max'),
        pharm_last_med   = ('med_text', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        pharm_last_route = ('route_text', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        pharm_max_fill_qty = ('fill_qty_num','max')
    )
    agg['subject_id'] = agg['subject_id'].astype('Int64')
    agg['hadm_id'] = agg['hadm_id'].astype('Int64')
    agg['day_index'] = agg['day_index'].astype('Int64')
    # First chunk creates the file with a header; later chunks append rows only.
    agg.to_csv(AGG_PHARM_PATH, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out = False
    total_out += len(agg)
    print(f"[{nowstr()}] Pharm chunk {i} -> wrote {len(agg):,} agg rows (total {total_out:,})  took {time.time()-t0:.1f}s")
    del chunk, agg
    gc.collect()

print(f"[{nowstr()}] Pharmacy aggregation done. total agg rows (unconsolidated): {total_out}")

# Collapse keys that were split across chunks (file is rewritten in place).
consolidate_agg(AGG_PHARM_PATH, AGG_PHARM_PATH,
                key_cols=('subject_id','hadm_id','day_index'),
                sum_cols=['pharm_dispense_count'],
                min_cols=['pharm_first_start'],
                max_cols=['pharm_last_stop','pharm_max_fill_qty'],
                pick_by_max='pharm_last_stop',
                pick_cols=['pharm_last_med','pharm_last_route'])


# ---------- Phase 3: aggregate emar ----------
# Stream emar.csv in chunks, reduce each chunk to one row per
# (subject_id, hadm_id, day_index), append to AGG_EMAR_PATH, then consolidate
# keys that were split across chunks.
print(f"[{nowstr()}] Phase 3 — aggregate emar -> {AGG_EMAR_PATH}")
# Remove any previous output so the append-mode writes below start from an empty file.
if AGG_EMAR_PATH.exists(): AGG_EMAR_PATH.unlink()

# Only request the columns that actually exist in the source file.
emar_header = read_header(EMAR_PATH)
emar_usecols_want = ['subject_id','hadm_id','charttime','medication','event_txt','pharmacy_id']
emar_usecols = [c for c in emar_usecols_want if c in emar_header]
emar_parse_dates = [c for c in ['charttime'] if c in emar_header]

# first_out controls whether the CSV header is written; total_out counts appended rows.
first_out = True
total_out = 0
for i, chunk in enumerate(pd.read_csv(EMAR_PATH, usecols=emar_usecols, parse_dates=emar_parse_dates, chunksize=SRC_CHUNKSIZE, low_memory=True), start=1):
    t0 = time.time()
    print(f"[{nowstr()}] Emar chunk {i} rows={len(chunk):,}")
    if 'subject_id' in chunk.columns:
        chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
        chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    # Look up the admission date row by row; rows with a missing id or an unknown
    # (subject_id, hadm_id) pair get NaT and are dropped just below.
    keys = list(zip(chunk['subject_id'].astype('Int64').astype(object), chunk['hadm_id'].astype('Int64').astype(object)))
    chunk['admit_date'] = [admit_dict.get((int(s), int(h)), pd.NaT) if not (pd.isna(s) or pd.isna(h)) else pd.NaT for s,h in keys]
    before = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    dropped = before - len(chunk)
    if dropped:
        print(f"  -> dropped {dropped:,} emar rows with no admit mapping")
    if chunk.empty:
        del chunk; gc.collect(); continue
    chunk['chart_date'] = pd.to_datetime(chunk['charttime'], errors='coerce').dt.normalize()
    # day_index = whole days since the admission date. Rows without a charttime
    # are assigned day 0, and events dated before admission are clamped to day 0.
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0
    # astype(str) renders missing values as the text 'nan'; map it back to None
    # so that 'count' and dropna() below treat those rows as missing.
    chunk['med_text'] = chunk['medication'].astype(str).str.strip().replace({'nan':None})
    chunk['event_txt'] = chunk['event_txt'].astype(str).str.strip().replace({'nan':None})
    # An event counts as an administration when it mentions one of the
    # administration keywords AND is not negated/held. Without the second test,
    # substrings such as "Given", "Flushed" or "Dose" also match events like
    # "Not Given", "Not Flushed" or "Hold Dose" and inflate emar_admin_count.
    mentions_admin = chunk['event_txt'].str.contains('Administered|Given|Given by|Admin|Dose|Flushed', case=False, na=False)
    negated_or_held = chunk['event_txt'].str.contains(r'\b(?:Not|Hold)\b', case=False, na=False)
    chunk['admin_flag'] = (mentions_admin & ~negated_or_held).astype(int)
    # NOTE(review): the "last" event/medication is the last non-null value in row
    # order within this chunk, not necessarily the one at emar_last_charttime —
    # confirm the source ordering.
    agg = chunk.groupby(['subject_id','hadm_id','day_index'], as_index=False).agg(
        emar_events_count = ('event_txt','count'),
        emar_admin_count  = ('admin_flag','sum'),
        emar_last_event   = ('event_txt', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        emar_last_med     = ('med_text', lambda s: s.dropna().iloc[-1] if len(s.dropna())>0 else pd.NA),
        emar_last_charttime = ('charttime','max')
    )
    agg['subject_id'] = agg['subject_id'].astype('Int64')
    agg['hadm_id'] = agg['hadm_id'].astype('Int64')
    agg['day_index'] = agg['day_index'].astype('Int64')
    # First chunk creates the file with a header; later chunks append rows only.
    agg.to_csv(AGG_EMAR_PATH, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out = False
    total_out += len(agg)
    print(f"[{nowstr()}] Emar chunk {i} -> wrote {len(agg):,} agg rows (total {total_out:,})  took {time.time()-t0:.1f}s")
    del chunk, agg
    gc.collect()

print(f"[{nowstr()}] Emar aggregation done. total agg rows (unconsolidated): {total_out}")

# Collapse keys that were split across chunks (file is rewritten in place).
consolidate_agg(AGG_EMAR_PATH, AGG_EMAR_PATH,
                key_cols=('subject_id','hadm_id','day_index'),
                sum_cols=['emar_events_count','emar_admin_count'],
                min_cols=[],
                max_cols=['emar_last_charttime'],
                pick_by_max='emar_last_charttime',
                pick_cols=['emar_last_event','emar_last_med'])


# -------------------- Phase 4: merge aggregated into merged_initial in chunks --------------------
# Streams the admission-day base table in chunks, left-joins the consolidated
# daily aggregates onto each chunk and appends the result to OUT_PATH.
OUT_PATH = BASE_DIR / "merged_with_medications.csv"
if OUT_PATH.exists(): OUT_PATH.unlink()

MERGE_KEYS = ['subject_id','hadm_id','day_index']
# Header of each consolidated AGG file ([] when the file is missing). It is read
# once so that EVERY chunk receives the same columns: the CSV header is written
# only with the first chunk, so a chunk that skipped a merge (because it had no
# matching rows) would otherwise be appended with fewer, misaligned columns.
agg_sources = [(agg_path, read_header(agg_path)) for agg_path in (AGG_PRESC_PATH, AGG_PHARM_PATH, AGG_EMAR_PATH)]

print(f"[{nowstr()}] Phase 4 — merge aggregated files into {OUT_PATH} in chunks (write_chunk={WRITE_CHUNK})")
first_write = True
mchunk_no = 0
for mchunk in pd.read_csv(MERGED_INITIAL_PATH, chunksize=WRITE_CHUNK, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'], low_memory=False):
    mchunk_no += 1
    t0 = time.time()
    print(f"[{nowstr()}] Merged chunk {mchunk_no}: rows={len(mchunk):,}")
    for key_col in MERGE_KEYS:
        mchunk[key_col] = pd.to_numeric(mchunk[key_col], errors='coerce').astype('Int64')
    # build key set (rows with a missing key cannot match anything and are left
    # out of the lookup; they stay in the output with empty aggregate columns)
    keyed = mchunk[MERGE_KEYS].dropna()
    keys = set(zip(keyed['subject_id'].astype(int), keyed['hadm_id'].astype(int), keyed['day_index'].astype(int)))
    # merge (left) - since AGG files were consolidated, no key will duplicate
    for agg_path, agg_header in agg_sources:
        if not set(MERGE_KEYS).issubset(agg_header):
            # file missing or unusable: it contributes no columns to ANY chunk
            continue
        match = collect_matching_rows(agg_path, keys)
        if match.empty:
            # no hits for this chunk: merge an empty frame carrying the file's
            # schema so the aggregate columns are still added (all missing)
            match = pd.DataFrame(columns=agg_header).astype({key_col: 'Int64' for key_col in MERGE_KEYS})
        mchunk = mchunk.merge(match, on=MERGE_KEYS, how='left')
        del match
    # write
    mchunk.to_csv(OUT_PATH, mode='w' if first_write else 'a', index=False, header=first_write)
    first_write = False
    print(f"[{nowstr()}] Written merged chunk {mchunk_no} (took {time.time()-t0:.1f}s).")
    del mchunk, keyed, keys
    gc.collect()

print(f"[{nowstr()}] All done. Output file: {OUT_PATH}")


In [None]:
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
merged_initial.shape

In [None]:
# Display the second-to-last row only (the slice -2:-1 excludes the final row).
merged_initial[-2:-1]

In [None]:
merged_initial.head(5)

In [None]:
merged_with_medications = pd.read_csv("merged_with_medications.csv")

In [None]:
merged_with_medications.head(5)

In [None]:
merged_with_medications.shape

In [None]:
# Display the second-to-last row only (the slice -2:-1 excludes the final row).
merged_with_medications[-2:-1]

In [None]:
# List the available columns. The original cell was an unfinished selection
# (`merged_with_medications[]`), which is a SyntaxError and stops "Run All".
merged_with_medications.columns.tolist()

In [None]:
ingredientevents = pd.read_csv("ingredientevents.csv")

In [None]:
# Write the first 5 rows of `ingredientevents` as CSV text to a file named
# 'test_ingredientevents' (no extension; the index is written as the first column).
ingredientevents.head(5).to_csv('test_ingredientevents')

In [None]:
inputevents = pd.read_csv("inputevents.csv")

In [None]:
# Write the first 5 rows of `inputevents` as CSV text to a file named
# 'test_inputevents' (no extension; the index is written as the first column).
inputevents.head(5).to_csv('test_inputevents')

In [None]:
procedureevents = pd.read_csv("procedureevents.csv")

In [None]:
# Write the first 5 rows of `procedureevents` as CSV text to a file named
# 'test_procedureevents' (no extension; the index is written as the first column).
procedureevents.head(5).to_csv('test_procedureevents')

In [None]:
# corrected_merge_inputs_procs.py
import time
import gc
from pathlib import Path
import pandas as pd
import numpy as np

# --------- CONFIG ----------
# NOTE: MERGED_INITIAL_PATH, SRC_CHUNKSIZE, WRITE_CHUNK and MATCH_CHUNK rebind
# names already set by the medication cell above, with the same values.
BASE = Path(".")
MERGED_INITIAL_PATH = BASE / "admissions_expanded.csv"   # existing admission-day base
# Raw event sources for this cell.
ING_PATH  = BASE / "ingredientevents.csv"
INP_PATH  = BASE / "inputevents.csv"
PROC_PATH = BASE / "procedureevents.csv"

# Daily aggregate files, one per source above (presumably keyed by
# subject_id / hadm_id / day_index like the medication aggregates — TODO confirm).
AGG_ING_PATH  = BASE / "agg_ingredient_daily.csv"
AGG_INP_PATH  = BASE / "agg_input_daily.csv"
AGG_PROC_PATH = BASE / "agg_procedure_daily.csv"

SRC_CHUNKSIZE = 200_000   # reduce if memory pressure
WRITE_CHUNK    = 20_000   # rows of merged_initial to process per write
MATCH_CHUNK    = 200_000   # chunk size when scanning agg csvs for matches
# ---------------------------

# ---------- helpers ----------
def nowstr():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS' (used as a log prefix)."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return time.strftime(stamp_format)
def parse_numeric(x):
    """Best-effort conversion of a single cell value to ``float``.

    Parameters
    ----------
    x : any scalar
        A number, a numeric string (thousands separators ``,`` are removed),
        or a missing-value marker.

    Returns
    -------
    float
        The parsed value, or ``np.nan`` when ``x`` is missing, is one of the
        placeholder strings (``""``, ``"NaN"``, ``"nan"``, ``"None"``,
        ``"none"``, ``"___"``), is not a scalar, or cannot be parsed.
    """
    # pd.isna() returns an array for list-likes, whose truth value is ambiguous;
    # treat any non-scalar as "not a number" instead of raising.
    if not pd.api.types.is_scalar(x):
        return np.nan
    if pd.isna(x):
        return np.nan
    try:
        if isinstance(x, (int, float, np.number)):
            return float(x)
        s = str(x).strip().replace(',', '')
        if s in ("", "NaN", "nan", "None", "none", "___"):
            return np.nan
        return float(s)
    except (ValueError, TypeError, OverflowError):
        # Only conversion failures are swallowed; KeyboardInterrupt and other
        # unrelated errors propagate (the previous bare `except:` hid them).
        return np.nan

def read_header(path: Path):
    """Return the column names of the CSV at ``path``, or ``[]`` if the file does not exist."""
    if path.exists():
        # nrows=0 parses only the header line, so no data rows are loaded.
        header_only = pd.read_csv(path, nrows=0)
        return list(header_only.columns)
    return []

def build_admit_map(merged_initial_path):
    """Build a lookup from ``(subject_id, hadm_id)`` to the admission date.

    Parameters
    ----------
    merged_initial_path : path-like
        CSV containing at least ``subject_id``, ``hadm_id`` and ``admittime``.

    Returns
    -------
    dict
        ``{(subject_id, hadm_id): Timestamp}`` where the timestamp is the first
        non-null ``admittime`` of that pair, normalized to midnight. Rows whose
        ``subject_id`` or ``hadm_id`` is missing/non-numeric are skipped.
    """
    print(f"[{nowstr()}] Build admit_map (light read)...")
    mi = pd.read_csv(merged_initial_path, usecols=['subject_id','hadm_id','admittime'], parse_dates=['admittime'], low_memory=False)
    for id_col in ('subject_id', 'hadm_id'):
        mi[id_col] = pd.to_numeric(mi[id_col], errors='coerce').astype('Int64')
    # Rows without a usable key can never be looked up, and keeping them would
    # make the int conversion below raise on <NA>.
    mi = mi.dropna(subset=['subject_id', 'hadm_id'])
    adm = mi.groupby(['subject_id','hadm_id'])['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
    adm['admit_date'] = pd.to_datetime(adm['admit_time']).dt.normalize()
    d = dict(zip(zip(adm['subject_id'].astype(int), adm['hadm_id'].astype(int)), adm['admit_date']))
    print(f"[{nowstr()}] admit_map keys: {len(d)}")
    del mi, adm; gc.collect()
    return d

def key_str(s, h, d):
    """Return the lookup key 'subject|hadm|day' with each part truncated to an int."""
    return "|".join(str(int(part)) for part in (s, h, d))

def key_series_from_df(df):
    """Return a Series of 'subject|hadm|day' string keys, one per row of ``df``.

    Each id column is coerced to numeric; values that are missing or not
    numeric become -1 before the integer/string conversion.
    """
    def _as_id_text(column):
        numeric = pd.to_numeric(df[column], errors='coerce')
        return numeric.fillna(-1).astype(int).astype(str)

    subject_part = _as_id_text('subject_id')
    hadm_part = _as_id_text('hadm_id')
    day_part = _as_id_text('day_index')
    return subject_part + '|' + hadm_part + '|' + day_part

def _choose_agg_for_col(col_name):
    """
    Heuristic for collapsing duplicates in agg files.
    - counts/totals -> sum
    - '*_max' or 'max' in name -> max
    - numeric-like names containing 'amount' or 'rate' -> sum (for amounts) or max (for rate) depending on keywords
    - otherwise -> take last non-null
    """
    name = col_name.lower()
    if name in ('subject_id','hadm_id','day_index'):
        return 'first'
    if 'count' in name or 'total' in name or name.endswith('_sum'):
        return 'sum'
    if 'max' in name or name.endswith('_max'):
        return 'max'
    # amounts -> sum
    if 'amount' in name or 'total' in name:
        return 'sum'
    # rate or val or _num -> prefer max (for numeric measures we often want max)
    if 'rate' in name or name.endswith('_num') or 'val' in name or 'value' in name:
        return 'max'
    # fallback: last non-null string
    return lambda s: s.dropna().iloc[-1] if s.dropna().shape[0] > 0 else pd.NA

def collect_matching_rows(agg_path, keys_set, usecols=None, parse_dates=None, chunksize=MATCH_CHUNK):
    """
    Read ``agg_path`` in chunks, keep the rows whose (subject_id, hadm_id,
    day_index) is in ``keys_set``, then collapse duplicate keys.

    Parameters
    ----------
    agg_path : path-like object with ``.exists()``
        CSV file to scan.
    keys_set : iterable of (subject_id, hadm_id, day_index) tuples
        Keys to keep; every element must be int-castable.
    usecols, parse_dates, chunksize
        Forwarded to ``pd.read_csv``.

    Returns
    -------
    DataFrame with one row per matching key. Key columns are ``Int64``; every
    other column is aggregated with the function chosen by
    ``_choose_agg_for_col``. An empty frame (columns = ``usecols`` or none) is
    returned when the file is missing, ``keys_set`` is empty, or nothing matches.
    """
    key_cols = ['subject_id', 'hadm_id', 'day_index']

    if not agg_path.exists() or not keys_set:
        return pd.DataFrame(columns=(usecols or []))

    # string keys make the per-chunk membership test a single vectorised isin()
    keys_s = set(key_str(s, h, d) for (s, h, d) in keys_set)

    matches = []
    reader = pd.read_csv(agg_path, usecols=usecols, parse_dates=parse_dates,
                         chunksize=chunksize, low_memory=True)
    for chunk in reader:
        # chunks without the key columns cannot be matched
        if not set(key_cols).issubset(chunk.columns):
            continue
        for col in key_cols:
            chunk[col] = pd.to_numeric(chunk[col], errors='coerce').astype('Int64')

        sel = chunk.loc[key_series_from_df(chunk).isin(keys_s)]
        if not sel.empty:
            matches.append(sel)
        del chunk; gc.collect()

    if not matches:
        return pd.DataFrame(columns=(usecols or []))

    df = pd.concat(matches, ignore_index=True)

    # Aggregate only the non-key columns. The keys come back from the groupby
    # itself; listing them in the agg dict as well makes pandas try to insert
    # an already-present column on versions without the duplicate-name guard.
    value_cols = [c for c in df.columns if c not in key_cols]
    if value_cols:
        agg_dict = {col: _choose_agg_for_col(col) for col in value_cols}
        grouped = df.groupby(key_cols, as_index=False).agg(agg_dict)
    else:
        # nothing to aggregate: just the distinct keys, sorted like a groupby would
        grouped = (df[key_cols].dropna().drop_duplicates()
                   .sort_values(key_cols).reset_index(drop=True))

    # aggregation can change dtypes; make sure keys are nullable ints again
    for col in key_cols:
        grouped[col] = pd.to_numeric(grouped[col], errors='coerce').astype('Int64')

    return grouped

# ---------- prep ----------
admit_dict = build_admit_map(MERGED_INITIAL_PATH)

# ---------- shared machinery for Phases A-C ----------
_DAY_KEYS = ['subject_id', 'hadm_id', 'day_index']
# event time = first non-null of these columns, in this order
_EVENT_TIME_PRIORITY = ('starttime', 'storetime', 'endtime')


def _last_valid(values):
    """Return the last non-null entry of ``values``, or ``pd.NA`` if all are null."""
    present = values.dropna()
    return present.iloc[-1] if len(present) else pd.NA


def _text_keep_missing(series):
    """Cast ``series`` to str but leave missing entries missing.

    A plain ``astype(str)`` turns NaN into the string 'nan', which defeats the
    ``dropna()`` in ``_last_valid``.
    """
    return series.astype(str).where(series.notna())


def _prepare_event_chunk(chunk, admit_lookup, drop_label):
    """Normalise ids, attach ``admit_date``, drop unmatched rows, add ``day_index``.

    ``admit_lookup`` maps (subject_id, hadm_id) int tuples to the admission
    date. Rows without a match are dropped (and reported using ``drop_label``).
    ``day_index`` is the whole-day offset of the event from the admission date;
    events with no usable time, or dated before admission, get day 0.
    Returns the prepared chunk, which may be empty.
    """
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    chunk['admit_date'] = [
        admit_lookup.get((int(s), int(h)), pd.NaT) if not (pd.isna(s) or pd.isna(h)) else pd.NaT
        for s, h in zip(chunk['subject_id'], chunk['hadm_id'])
    ]
    before = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    dropped = before - len(chunk)
    if dropped:
        print(f"  -> dropped {dropped:,} {drop_label} rows w/o admit")
    if chunk.empty:
        return chunk

    # Only use the time columns that were actually read; a missing column must
    # not break the fallback chain.
    event_time = None
    for col in _EVENT_TIME_PRIORITY:
        if col in chunk.columns:
            parsed = pd.to_datetime(chunk[col], errors='coerce')
            event_time = parsed if event_time is None else event_time.fillna(parsed)
    if event_time is None:
        event_time = pd.Series(pd.NaT, index=chunk.index, dtype='datetime64[ns]')

    chunk['event_time'] = event_time
    chunk['chart_date'] = chunk['event_time'].dt.normalize()
    day_offset = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk['day_index'] = day_offset.clip(lower=0)
    return chunk


def _aggregate_events_by_day(src_path, out_path, tag, drop_label, done_label,
                             use_want, numeric_cols, text_cols, agg_spec):
    """Stream ``src_path`` in chunks and append per-(subject, hadm, day) aggregates to ``out_path``.

    Parameters
    ----------
    src_path, out_path : Path
        Source event CSV and aggregated output CSV (removed first if present).
    tag, drop_label, done_label : str
        Short labels used in the progress messages.
    use_want : list of str
        Columns to read; those absent from the file header are skipped.
    numeric_cols : dict
        ``{derived_column: source_column}`` parsed with ``parse_numeric``
        (NaN when the source column is absent).
    text_cols : dict
        ``{derived_column: source_column}`` cast to text with missing values kept.
    agg_spec : dict
        ``{output_column: (source_column, aggfunc)}`` named-aggregation spec.
        Source columns absent from the chunk are created as all-missing so the
        output schema is the same for every chunk.

    Aggregation is per chunk, so a key that spans two chunks appears twice in
    ``out_path``. Returns the total number of aggregate rows written.
    """
    if out_path.exists():
        out_path.unlink()
    hdr = read_header(src_path)
    usecols = [c for c in use_want if c in hdr]
    parse_dates = [c for c in ['starttime', 'endtime', 'storetime'] if c in hdr]
    first_out = True
    total = 0
    reader = pd.read_csv(src_path, usecols=usecols, parse_dates=parse_dates,
                         chunksize=SRC_CHUNKSIZE, low_memory=True)
    for i, chunk in enumerate(reader, start=1):
        t0 = time.time()
        print(f"[{nowstr()}] {tag} chunk {i} rows={len(chunk):,}")
        chunk = _prepare_event_chunk(chunk, admit_dict, drop_label)
        if chunk.empty:
            del chunk; gc.collect(); continue

        # numeric parsing
        for derived, source in numeric_cols.items():
            chunk[derived] = chunk[source].apply(parse_numeric) if source in chunk.columns else np.nan
        # text
        chunk['item_text'] = chunk['itemid'].astype(str).str.strip()
        for derived, source in text_cols.items():
            chunk[derived] = _text_keep_missing(chunk[source]) if source in chunk.columns else pd.NA
        # optional descriptor columns may be absent from the source file
        for source, _ in agg_spec.values():
            if source not in chunk.columns:
                chunk[source] = pd.NA

        agg = chunk.groupby(_DAY_KEYS, as_index=False).agg(**agg_spec)
        for key_col in _DAY_KEYS:
            agg[key_col] = agg[key_col].astype('Int64')
        agg.to_csv(out_path, mode='w' if first_out else 'a', index=False, header=first_out)
        first_out = False
        total += len(agg)
        print(f"[{nowstr()}] {tag} chunk {i} -> wrote {len(agg):,} agg rows (total {total:,}) took {time.time()-t0:.1f}s")
        del chunk, agg; gc.collect()
    print(f"[{nowstr()}] {done_label} aggregation done. total agg rows (raw appended): {total}")
    return total


# ---------- Phase A: agg ingredientevents ----------
# sum amounts (ingredients sum often makes sense), max rate, last status/text, count
print(f"[{nowstr()}] Phase A: aggregate ingredientevents -> {AGG_ING_PATH}")
_aggregate_events_by_day(
    ING_PATH, AGG_ING_PATH, tag='ing', drop_label='ing', done_label='ingredient',
    use_want=['subject_id','hadm_id','starttime','endtime','storetime','itemid','amount','amountuom','rate','rateuom','orderid','statusdescription','originalamount','originalrate'],
    numeric_cols={'amount_num': 'amount', 'rate_num': 'rate'},
    text_cols={},
    agg_spec={
        'ing_events_count': ('item_text', 'count'),
        'ing_total_amount': ('amount_num', 'sum'),
        'ing_max_rate': ('rate_num', 'max'),
        'ing_last_itemid': ('item_text', _last_valid),
        'ing_last_status': ('statusdescription', _last_valid),
    },
)

# ---------- Phase B: agg inputevents ----------
# sum amounts (for fluid totals), max rate, counts, last textual descriptors
print(f"[{nowstr()}] Phase B: aggregate inputevents -> {AGG_INP_PATH}")
_aggregate_events_by_day(
    INP_PATH, AGG_INP_PATH, tag='inp', drop_label='input', done_label='inputevents',
    use_want=['subject_id','hadm_id','starttime','endtime','storetime','itemid','amount','amountuom','rate','rateuom','orderid','ordercategoryname','ordercomponenttypedescription','totalamount','totalamountuom','isopenbag','statusdescription'],
    numeric_cols={'amount_num': 'amount', 'rate_num': 'rate', 'totalamount_num': 'totalamount'},
    text_cols={'ordercat_text': 'ordercategoryname'},
    agg_spec={
        'input_events_count': ('item_text', 'count'),
        'input_total_amount': ('amount_num', 'sum'),
        'input_totalamount_field': ('totalamount_num', 'sum'),
        'input_max_rate': ('rate_num', 'max'),
        'input_last_ordercat': ('ordercat_text', _last_valid),
        'input_last_status': ('statusdescription', _last_valid),
    },
)

# ---------- Phase C: agg procedureevents ----------
print(f"[{nowstr()}] Phase C: aggregate procedureevents -> {AGG_PROC_PATH}")
_aggregate_events_by_day(
    PROC_PATH, AGG_PROC_PATH, tag='proc', drop_label='proc', done_label='procedure',
    use_want=['subject_id','hadm_id','starttime','endtime','storetime','itemid','value','valueuom','location','locationcategory','ordercategoryname','statusdescription'],
    numeric_cols={'val_num': 'value'},
    text_cols={'loc_text': 'location'},
    agg_spec={
        'proc_events_count': ('item_text', 'count'),
        'proc_max_value': ('val_num', 'max'),
        'proc_last_valueuom': ('valueuom', _last_valid),
        'proc_last_location': ('loc_text', _last_valid),
        'proc_last_category': ('ordercategoryname', _last_valid),
    },
)

# ---------- Phase D: merge aggregated files into merged_initial in chunks ----------
OUT = BASE / "merged_with_inputs_procs.csv"
if OUT.exists(): OUT.unlink()
print(f"[{nowstr()}] Phase D: merge aggregated files into {OUT} in chunks (write_chunk={WRITE_CHUNK})")

_MERGE_KEYS = ['subject_id', 'hadm_id', 'day_index']


def _agg_feature_columns(path):
    """Return the non-key column names of an aggregated CSV ([] if missing or empty)."""
    if not path.exists():
        return []
    try:
        header = pd.read_csv(path, nrows=0).columns.tolist()
    except pd.errors.EmptyDataError:
        return []
    return [c for c in header if c not in _MERGE_KEYS]


# The output is appended chunk by chunk under a single header, so every chunk
# must carry the same columns in the same order - including chunks for which an
# aggregated file has no matching rows.
agg_sources = [(p, _agg_feature_columns(p)) for p in (AGG_ING_PATH, AGG_INP_PATH, AGG_PROC_PATH)]

first_write = True; mchunk_no = 0
for mchunk in pd.read_csv(MERGED_INITIAL_PATH, chunksize=WRITE_CHUNK, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'], low_memory=False):
    mchunk_no += 1; t0 = time.time()
    print(f"[{nowstr()}] merged chunk {mchunk_no} rows={len(mchunk):,}")
    for key_col in _MERGE_KEYS:
        mchunk[key_col] = pd.to_numeric(mchunk[key_col], errors='coerce').astype('Int64')
    base_cols = mchunk.columns.tolist()

    # build keys set for this chunk; rows with a missing key cannot match anything
    valid_keys = mchunk[_MERGE_KEYS].dropna()
    keys = set(zip(valid_keys['subject_id'].astype('int64').tolist(),
                   valid_keys['hadm_id'].astype('int64').tolist(),
                   valid_keys['day_index'].astype('int64').tolist()))

    # collect_matching_rows returns one row per key, so the left merges below
    # cannot multiply rows
    for agg_path, feature_cols in agg_sources:
        match = collect_matching_rows(agg_path, keys, usecols=None) if agg_path.exists() else pd.DataFrame()
        if not match.empty:
            mchunk = mchunk.merge(match, on=_MERGE_KEYS, how='left')
        # pad features this chunk did not receive so the schema stays fixed
        for col in feature_cols:
            if col not in mchunk.columns:
                mchunk[col] = pd.NA
        del match

    # fixed column order: base columns, then ing / inp / proc features;
    # anything unexpected (e.g. suffixed name clashes) is kept at the end
    expected = list(dict.fromkeys(base_cols + [c for _, cols in agg_sources for c in cols]))
    ordered = [c for c in expected if c in mchunk.columns]
    ordered += [c for c in mchunk.columns if c not in expected]
    mchunk = mchunk[ordered]

    mchunk.to_csv(OUT, mode='w' if first_write else 'a', index=False, header=first_write)
    first_write = False
    print(f"[{nowstr()}] Written merged chunk {mchunk_no} (took {time.time()-t0:.1f}s)")
    del mchunk; gc.collect()

print(f"[{nowstr()}] All done. Output: {OUT}")


In [None]:
# Read admissions_expanded.csv from the working directory and bind it to merged_initial.
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the frame loaded from admissions_expanded.csv.
merged_initial.shape

In [None]:
# Read merged_with_inputs_procs.csv from the working directory.
# NOTE(review): Phase D writes to BASE / "merged_with_inputs_procs.csv"; this
# relative path finds that file only if BASE is the working directory - confirm.
merged_with_inputs_procs = pd.read_csv("merged_with_inputs_procs.csv")

In [None]:
# (rows, columns) of the merged frame, for comparison with merged_initial.shape.
merged_with_inputs_procs.shape

In [None]:
# Display the first 5 rows of the merged frame.
merged_with_inputs_procs.head(5)

In [None]:
# Read the whole microbiologyevents.csv into memory.
# NOTE(review): the next cell only uses the first 50 rows; if nothing later
# needs the full frame, passing nrows= would avoid loading the entire file - confirm.
microbiologyevents = pd.read_csv("microbiologyevents.csv")

In [None]:
# Dump a 50-row sample for manual inspection. index=False keeps the pandas row
# index out of the file, matching the other sample dumps in this notebook.
microbiologyevents.head(50).to_csv('test_microbiologyevents', index=False)

In [None]:
pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 160)

# Paths - adjust if needed
merged_initial_path = Path("admissions_expanded.csv")   # created earlier by your notebook
micro_path = Path("microbiologyevents.csv")
out_merged_path = Path("merged_with_microbiologyevents.csv")

if not merged_initial_path.exists():
    raise FileNotFoundError(f"{merged_initial_path} not found. Run admissions expansion first.")

if not micro_path.exists():
    raise FileNotFoundError(f"{micro_path} not found. Put microbiologyevents.csv next to admissions.csv")

# 1) Load merged_initial (admission-day base)
merged_initial = pd.read_csv(merged_initial_path, low_memory=False,
                             parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'])
# ensure day_index is integer
merged_initial['day_index'] = merged_initial['day_index'].astype('Int64')

# 2) Build admit_map (admit_date per (subject_id,hadm_id)) and index map
admit_map = merged_initial.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()
admit_map['key'] = list(zip(admit_map['subject_id'].astype('Int64'), admit_map['hadm_id'].astype('Int64')))
admit_dict = dict(zip(admit_map['key'], admit_map['admit_date']))

# (subject_id, hadm_id, day_index) -> row label of merged_initial, built without
# a Python-level row loop. Rows whose key is missing or non-numeric are left
# out; for duplicate keys the last row wins.
key_frame = merged_initial[['subject_id','hadm_id','day_index']].apply(pd.to_numeric, errors='coerce').dropna()
merged_initial_index_map = dict(zip(
    zip(key_frame['subject_id'].astype('int64').tolist(),
        key_frame['hadm_id'].astype('int64').tolist(),
        key_frame['day_index'].astype('int64').tolist()),
    key_frame.index,
))
del key_frame

print("Loaded merged_initial rows:", len(merged_initial))
print("Admit map keys:", len(admit_dict), "merged rows map size:", len(merged_initial_index_map))

# 3) Read microbiologyevents header to get columns
micro_head = pd.read_csv(micro_path, nrows=0)
micro_cols = micro_head.columns.tolist()
print("microbiologyevents columns:", micro_cols)

# We'll create merged_initial columns for each original micro column (prefixed with micro_)
prefix = "micro_"
new_cols = []
for c in micro_cols:
    if c in ('subject_id','hadm_id'):
        continue
    new_c = prefix + c
    new_cols.append(new_c)
    if new_c not in merged_initial.columns:
        merged_initial[new_c] = pd.Series([pd.NA] * len(merged_initial), dtype="object")

print("Added new micro columns to merged_initial (if missing). Count:", len(new_cols))

# source columns that get copied onto the base table (everything except the ids)
value_cols = [c for c in micro_cols if c not in ('subject_id','hadm_id')]

# 4) Decide which columns to treat as numeric (attempt a conservative list; we'll coerce in-chunk)
# Common numeric-like fields in microbiologyevents: isolate_num, quantity, dilution_value, ab_itemid, test_seq, microevent_id, spec_itemid
candidate_numeric = ['isolate_num','quantity','dilution_value','ab_itemid','test_seq','microevent_id','spec_itemid']

# 5) Chunked processing
chunksize = 200_000   # adjust if you want larger/smaller chunks
reader = pd.read_csv(micro_path, parse_dates=['charttime','chartdate','storedate'], chunksize=chunksize, low_memory=True)

total_assigned = 0
chunk_no = 0

# Group keys: day-level per admission
grp_keys = ['subject_id','hadm_id','day_index']

for chunk in reader:
    chunk_no += 1
    print(f"\n--- Processing chunk {chunk_no} (rows: {len(chunk):,}) ---")
    # Ensure subject_id/hadm_id are numeric ints
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce')
    # .copy() so the assignments below act on an independent frame, not a slice
    chunk = chunk.dropna(subset=['subject_id','hadm_id']).copy()
    chunk['subject_id'] = chunk['subject_id'].astype(int)
    chunk['hadm_id'] = chunk['hadm_id'].astype(int)

    # Resolve the reference time row by row: charttime when the row has one,
    # otherwise that row's chartdate. (Both columns exist: read_csv was asked
    # to parse them.) Choosing one column for the whole chunk would leave rows
    # without a charttime undated, and they would all fall onto day 0.
    chunk['chartref'] = pd.to_datetime(chunk['charttime'], errors='coerce').fillna(
        pd.to_datetime(chunk['chartdate'], errors='coerce'))

    chunk['chart_date'] = chunk['chartref'].dt.normalize()

    # Attach admit_date using admit_dict
    keys = list(zip(chunk['subject_id'].astype(int), chunk['hadm_id'].astype(int)))
    chunk['admit_date'] = [admit_dict.get(k, pd.NaT) for k in keys]

    # Drop rows with no matching admission
    before_drop = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    after_drop = len(chunk)
    if after_drop == 0:
        print("no matching admissions in this chunk -> skipping")
        continue
    if after_drop < before_drop:
        print(f"Dropped {before_drop-after_drop} rows with no admit_date")

    # Compute day_index relative to admit_date (normalized); rows that are
    # still undated, or dated before admission, go to day 0
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # Coerce numeric-like candidate columns to numeric where possible
    numeric_cols_present = [c for c in candidate_numeric if c in chunk.columns]
    coerced_numeric = []
    for nc in numeric_cols_present:
        coerced = pd.to_numeric(chunk[nc], errors='coerce')
        # treat as numeric if a non-trivial fraction parse as numeric (more than 1%)
        frac_num = coerced.notna().sum() / max(1, len(chunk))
        if frac_num > 0.01:
            chunk[nc + "_num"] = coerced
            coerced_numeric.append(nc)
        # else leave as text (we'll treat as text)
    num_cols = [nc + "_num" for nc in coerced_numeric]

    # For textual fields (and all fields as fallback) take the last non-null
    # value per column, in chartref order
    chunk_sorted = chunk.sort_values('chartref')
    grp_last = chunk_sorted.groupby(grp_keys, as_index=False).last()

    # For the "<col>_num" columns take the max per day. grp_last also carries
    # "<col>_num" (as *last* values); drop those before merging so the name
    # "<col>_num" refers to the max and is not pushed onto a suffixed column.
    if num_cols:
        grp_num = chunk.groupby(grp_keys, as_index=False).agg({nc: "max" for nc in num_cols})
        merged_grps = grp_last.drop(columns=num_cols).merge(grp_num, on=grp_keys, how='left')
        del grp_num
    else:
        merged_grps = grp_last

    # Final value per original column:
    # - column in coerced_numeric -> the daily max from "<col>_num" when it is
    #   not null, otherwise the last observed raw value
    # - any other column -> the last observed value
    assigned = 0
    for _, r in merged_grps.iterrows():
        key = (int(r['subject_id']), int(r['hadm_id']), int(r['day_index']))
        row_idx = merged_initial_index_map.get(key)
        if row_idx is None:
            continue
        for col in value_cols:
            val = r.get(col, pd.NA)
            if col in coerced_numeric:
                num_val = r.get(col + "_num", pd.NA)
                if not pd.isna(num_val):
                    val = num_val
            # assign as-is (preserve strings/dates)
            merged_initial.at[row_idx, prefix + col] = val
        assigned += 1

    total_assigned += assigned
    print(f"Chunk {chunk_no}: groups aggregated = {len(merged_grps)}, assigned rows = {assigned}, total_assigned so far = {total_assigned}")

    # cleanup
    del chunk, chunk_sorted, grp_last, merged_grps
    gc.collect()

print("\n--- ALL CHUNKS PROCESSED ---")
print("Total assigned day-rows updated:", total_assigned)

# 6) Save out (chunked write to avoid memory surge)
n_rows = len(merged_initial)
write_chunk = 20000
first_out = True
for start in range(0, n_rows, write_chunk):
    end = min(start + write_chunk, n_rows)
    merged_initial.iloc[start:end].to_csv(out_merged_path, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out = False
    print(f"Saved rows {start}-{end-1}")
print("Saved final merged file with microbiologyevents to:", out_merged_path)


In [None]:
# Re-read admissions_expanded.csv; this rebinds merged_initial, replacing the
# frame the previous cell extended with micro_* columns.
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the base table as stored on disk.
merged_initial.shape

In [None]:
# Read back the file written by the microbiology merge cell.
merged_with_microbiologyevents = pd.read_csv("merged_with_microbiologyevents.csv")

In [None]:
# (rows, columns) of the microbiology-merged table, for comparison with merged_initial.shape.
merged_with_microbiologyevents.shape

In [None]:
# Display the first 5 rows of the microbiology-merged table.
merged_with_microbiologyevents.head(5)

In [None]:
# Peek at datetimeevents.csv: only the first 1000 rows are read.
datetimeevents_path = Path("datetimeevents.csv")
datetimeevents = pd.read_csv(datetimeevents_path, nrows=1000)

In [None]:
# Write the first 100 loaded rows to 'test_datetimeevents' (no index column).
datetimeevents.head(100).to_csv('test_datetimeevents', index=False)

In [None]:
# Display the first 5 rows of the datetimeevents sample.
datetimeevents.head(5)

In [None]:
# Peek at d_items.csv: only the first 1000 rows are read.
d_items_path = Path("d_items.csv")
d_items = pd.read_csv(d_items_path, nrows=1000)

In [None]:
# Write the first 10 loaded rows to 'test_d_items' (no index column).
d_items.head(10).to_csv('test_d_items', index=False)

In [None]:
pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 140)

# Files
merged_initial_file = Path("admissions_expanded.csv")   # admissions_expanded.csv, as in the earlier flow
ditems_file = Path("d_items.csv")
datetimeevents_file = Path("datetimeevents.csv")

# Parameters
chunksize = 500_000   # same as the earlier code; adjust as needed

# Check that the input files exist
if not merged_initial_file.exists():
    raise FileNotFoundError(f"{merged_initial_file} not found. load or generate merged_initial first.")
if not ditems_file.exists():
    raise FileNotFoundError(f"{ditems_file} not found.")
if not datetimeevents_file.exists():
    raise FileNotFoundError(f"{datetimeevents_file} not found.")

# 1) Load merged_initial (may be large; it is held fully in memory so columns
#    can be added to it, as elsewhere in this notebook)
print("Loading merged_initial (this may use substantial memory)...")
merged_initial = pd.read_csv(merged_initial_file, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'], low_memory=False)
n_rows = len(merged_initial)
print("merged_initial rows:", n_rows)

# 2) Get the itemids that d_items links to datetimeevents
print("Loading d_items and selecting itemids where linksto == 'datetimeevents' ...")
d = pd.read_csv(ditems_file, dtype=str, usecols=['itemid','linksto','label','abbreviation'])
d['linksto'] = d['linksto'].fillna('').astype(str)
dt_items = d.loc[d['linksto'].str.lower() == 'datetimeevents', 'itemid'].dropna().unique().tolist()
# convert to ints (where possible)
dt_itemids = []
for iid in dt_items:
    try:
        dt_itemids.append(int(iid))
    except (TypeError, ValueError):
        # itemids that are not integer strings are skipped
        pass
dt_itemids = sorted(set(dt_itemids))
print(f"Found {len(dt_itemids)} datetimeevents itemids (sample):", dt_itemids[:20])

# 3) Add one column per itemid to merged_initial (column name = the id as a string)
added = 0
for iid in dt_itemids:
    col = str(iid)
    if col not in merged_initial.columns:
        merged_initial[col] = pd.Series([pd.NA] * n_rows, dtype="object")
        added += 1
print(f"Added {added} new columns to merged_initial for datetimeitems. Total columns now: {len(merged_initial.columns)}")

# 4) Build admit_map: (subject_id,hadm_id) -> admit_date (normalized)
print("Building admit_date map from merged_initial ...")
admit_map = merged_initial.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()
admit_map['key'] = list(zip(admit_map['subject_id'].astype('Int64'), admit_map['hadm_id'].astype('Int64')))
admit_dict = dict(zip(admit_map['key'], admit_map['admit_date']))
print("Admit map entries:", len(admit_dict))

# 5) Build the index map for merged_initial: (subject_id,hadm_id,day_index) -> row label.
#    Built without a Python-level row loop; rows whose key is missing or
#    non-numeric are left out, and for duplicate keys the last row wins.
print("Building merged_initial index map (for direct assignment) ...")
key_frame = merged_initial[['subject_id','hadm_id','day_index']].apply(pd.to_numeric, errors='coerce').dropna()
merged_initial_index_map = dict(zip(
    zip(key_frame['subject_id'].astype('int64').tolist(),
        key_frame['hadm_id'].astype('int64').tolist(),
        key_frame['day_index'].astype('int64').tolist()),
    key_frame.index,
))
del key_frame
print("Merged rows map size:", len(merged_initial_index_map))


def make_final_val(r):
    """Pick the value stored for one aggregated (admission, day, itemid) row.

    Returns the ISO-8601 string of ``value_dt`` when the value parsed as a
    datetime; otherwise the raw text, with null-like strings mapped to ``pd.NA``.
    """
    if pd.notna(r.get('value_dt')):
        # convert to ISO format
        return pd.to_datetime(r['value_dt']).isoformat()
    else:
        v = r.get('value_raw')
        if pd.isna(v) or v in ("nan","None","NoneType","NA","<NA>"):
            return pd.NA
        return v


# 6) Read datetimeevents chunk by chunk, map rows to day_index and aggregate
usecols = ['subject_id','hadm_id','stay_id','caregiver_id','charttime','storetime','itemid','value','valueuom','warning']
reader = pd.read_csv(datetimeevents_file, usecols=usecols, parse_dates=['charttime','storetime'], chunksize=chunksize, low_memory=True)

total_assigned = 0
chunk_no = 0

# one output cell per (admission, day, itemid)
grp_keys = ['subject_id','hadm_id','day_index','itemid']

for chunk in reader:
    chunk_no += 1
    print(f"\n--- Processing datetimeevents chunk {chunk_no} (rows: {len(chunk)}) ---")
    # itemid numeric
    chunk['itemid'] = pd.to_numeric(chunk['itemid'], errors='coerce').astype('Int64')
    chunk = chunk[chunk['itemid'].notna()]
    if chunk.empty:
        print("no itemids in this chunk")
        continue

    # keep only the itemids that d_items links to datetimeevents;
    # .copy() so the assignments below act on an independent frame, not a slice
    chunk = chunk[chunk['itemid'].isin(dt_itemids)].copy()
    if chunk.empty:
        print("no relevant datetime itemids in this chunk")
        continue

    # normalise the ids for the lookup
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')
    # lookup admit_date
    keys = list(zip(chunk['subject_id'].astype('Int64'), chunk['hadm_id'].astype('Int64')))
    chunk['admit_date'] = [admit_dict.get(k, pd.NaT) for k in keys]

    # drop rows that have no matching admission
    chunk = chunk[chunk['admit_date'].notna()].copy()
    if chunk.empty:
        print("no rows with admit_date in this chunk")
        continue

    # compute day_index from the normalized charttime; rows without a
    # charttime, or dated before admission, go to day 0
    chunk['chart_date'] = pd.to_datetime(chunk['charttime'], errors='coerce').dt.normalize()
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # try to parse value as a datetime (where possible)
    chunk['value_dt'] = pd.to_datetime(chunk['value'], errors='coerce')

    # value_dt is preferred when it parsed; otherwise the textual value is kept
    # (charttime is not used as a fallback here)
    chunk['value_raw'] = chunk['value'].astype(str)

    # for every (subject_id,hadm_id,day_index,itemid) take the last value in charttime order
    chunk_sorted = chunk.sort_values('charttime')
    grp_last = chunk_sorted.groupby(grp_keys, as_index=False).last()[grp_keys + ['value_dt','value_raw','charttime']]
    # final format: ISO string when value_dt is present, otherwise value_raw
    grp_last['final_value'] = grp_last.apply(make_final_val, axis=1)

    # assign the values into merged_initial (look up row_idx, then .at assignment)
    assigned = 0
    for _, r in grp_last.iterrows():
        try:
            key = (int(r['subject_id']), int(r['hadm_id']), int(r['day_index']))
        except Exception:
            continue
        row_idx = merged_initial_index_map.get(key)
        if row_idx is None:
            continue
        itemid_col = str(int(r['itemid']))
        val = r['final_value']
        merged_initial.at[row_idx, itemid_col] = val
        assigned += 1

    total_assigned += assigned
    print(f"Chunk {chunk_no}: groups aggregated = {len(grp_last)}, assigned = {assigned}, total_assigned so far = {total_assigned}")

    # cleanup
    del chunk, chunk_sorted, grp_last
    gc.collect()

print("\n--- ALL datetimeevents CHUNKS PROCESSED ---")
print("Total assigned datetime cells:", total_assigned)

# 7) Save the final output (chunked write to keep memory use down)
out_path = Path("merged_with_datetimeevents_filled.csv")
write_chunk = 20000
first = True
n_rows = len(merged_initial)
for start in range(0, n_rows, write_chunk):
    end = min(start + write_chunk, n_rows)
    merged_initial.iloc[start:end].to_csv(out_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    print(f"Saved rows {start}-{end-1}")
print("Saved final to:", out_path)


In [None]:
# Re-read admissions_expanded.csv; this rebinds merged_initial, replacing the
# frame the previous cell extended with per-itemid columns.
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
# (rows, columns) of the base table as stored on disk.
merged_initial.shape

In [None]:
# Read back the file written by the datetimeevents merge cell.
merged_with_datetimeevents_filled = pd.read_csv("merged_with_datetimeevents_filled.csv")

In [None]:
# (rows, columns) of the datetimeevents-merged table, for comparison with merged_initial.shape.
merged_with_datetimeevents_filled.shape

In [None]:
# Display the first 2 rows of the datetimeevents-merged table.
merged_with_datetimeevents_filled.head(2)

In [None]:
# rename_datetimeevents_columns.py
# Setup for renaming the numeric itemid columns of the datetimeevents-merged
# file: paths, parameters, and the d_items rows linked to datetimeevents.
pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 140)

# Files
ditems_path = Path("d_items.csv")
in_path = Path("merged_with_datetimeevents_filled.csv")
out_path = Path("merged_with_datetimeevents_filled_renamed.csv")

# Parameters
chunksize = 20000
max_name_len = 80   # sanitize_name truncates names to this many characters

# Check that the input files exist
if not ditems_path.exists():
    raise FileNotFoundError(f"{ditems_path} not found.")
if not in_path.exists():
    raise FileNotFoundError(f"{in_path} not found.")

# Load the item dictionary (datetimeevents rows only); every column is read as
# str, and label/abbreviation are normalised to stripped, non-null strings
d = pd.read_csv(ditems_path, usecols=['itemid','label','abbreviation','linksto'], dtype=str)
d['linksto'] = d['linksto'].fillna('').astype(str)
d_dt = d.loc[d['linksto'].str.lower() == 'datetimeevents'].copy()
d_dt['itemid'] = d_dt['itemid'].astype(str).str.strip()
d_dt['label'] = d_dt['label'].fillna('').astype(str).str.strip()
d_dt['abbreviation'] = d_dt['abbreviation'].fillna('').astype(str).str.strip()

# Pick the display name: abbreviation if present, otherwise label, otherwise ''
# (an empty result is replaced by an "item_<id>" fallback when the map is built)
d_dt['chosen'] = d_dt.apply(lambda r: r['abbreviation'] if r['abbreviation']!='' else (r['label'] if r['label']!='' else ''), axis=1)

def sanitize_name(s):
    """Turn an arbitrary label into a compact column-name token.

    Missing input (None / NaN) yields ''. Otherwise the text is stripped,
    whitespace runs become '_', every character that is not a word character
    or '-' is removed, repeated underscores are collapsed, and the result is
    cut to ``max_name_len`` characters (module-level setting).
    """
    if s is None or pd.isna(s):
        return ''
    token = str(s).strip()
    # Order matters: whitespace -> '_', then drop illegal characters, then collapse '_' runs.
    for pattern, replacement in ((r'\s+', '_'), (r'[^\w\-]', ''), (r'_+', '_')):
        token = re.sub(pattern, replacement, token)
    return token[:max_name_len]

# Build the map itemid -> chosen name (names are guaranteed unique)
name_map = {}
used = set()
for _, row in d_dt.iterrows():
    iid = row['itemid']
    chosen = row['chosen']
    if chosen == '':
        base = f"item_{iid}"
    else:
        base = sanitize_name(chosen)
        if base == '':
            base = f"item_{iid}"
    name = base
    if name in used:
        # First fallback: disambiguate with the itemid itself
        name = f"{base}__{iid}"
    counter = 1
    while name in used:
        name = f"{base}__{iid}_{counter}"
        counter += 1
    used.add(name)
    name_map[iid] = name

# Prepare the new header
orig_header = pd.read_csv(in_path, nrows=0).columns.tolist()
new_header = []
seen_cols = set()   # mirrors new_header for O(1) duplicate checks
conflicts = 0

for col in orig_header:
    new_col = col
    col_str = str(col).strip()
    # A column whose name is exactly a datetimeevents itemid gets the readable name
    if col_str in name_map:
        new_col = "datetimeevents_" + name_map[col_str]
    else:
        # Headers may have been stored as numbers/floats (e.g. "224288.0"); retry as an integer string
        try:
            icol = str(int(float(col_str)))
        except (TypeError, ValueError, OverflowError):
            icol = None
        if icol in name_map:
            new_col = "datetimeevents_" + name_map[icol]
    # Avoid duplicate names in the new header
    if new_col in seen_cols:
        conflicts += 1
        conflict_base = f"{new_col}__orig_{sanitize_name(col_str)}"
        new_col = conflict_base
        k = 1
        # Always suffix the same base so names become ..._1, ..._2 instead of ..._1_2
        while new_col in seen_cols:
            new_col = f"{conflict_base}_{k}"
            k += 1
    new_header.append(new_col)
    seen_cols.add(new_col)

print(f"Prepared header mapping. Total cols: {len(orig_header)}, conflicts resolved: {conflicts}")
sample_map = {k: name_map[k] for k in list(name_map)[:10]}
print("sample itemid->name (first 10):", sample_map)

# Rewrite the file chunk by chunk with the new header
first = True
rows_written = 0
for i, chunk in enumerate(pd.read_csv(in_path, chunksize=chunksize, low_memory=False)):
    chunk.columns = new_header
    chunk.to_csv(out_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    rows_written += len(chunk)
    print(f"Chunk {i+1}: wrote {len(chunk):,} rows (total {rows_written:,})")
    del chunk
    gc.collect()

print("✅ Done. Output saved to:", out_path)
print("Rows written:", rows_written)


In [None]:
merged_with_datetimeevents_filled_renamed = pd.read_csv("merged_with_datetimeevents_filled_renamed.csv")

In [None]:
merged_with_datetimeevents_filled_renamed.head(5)

In [None]:
outputevents_path = Path("outputevents.csv")
outputevents = pd.read_csv(outputevents_path, nrows=1000)

In [None]:
outputevents.head(10).to_csv('test_outputevents', index=False)

In [None]:
outputevents.head(5)

In [None]:
# merge_outputevents_to_merged_initial.py
# Setup for folding outputevents into the per-day admissions frame: load inputs, add one
# (empty) column per outputevents itemid and build the lookup dictionaries used by the chunk loop.

pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 140)

# Files
merged_initial_file = Path("admissions_expanded.csv")   # or wherever merged_initial was saved
ditems_file = Path("d_items.csv")
outputevents_file = Path("outputevents.csv")

# Parameters
chunksize = 500_000   # rows of outputevents.csv read per chunk; tune to available memory

# Fail fast when an input file is missing
if not merged_initial_file.exists():
    raise FileNotFoundError(f"{merged_initial_file} not found. Load or generate merged_initial first.")
if not ditems_file.exists():
    raise FileNotFoundError(f"{ditems_file} not found.")
if not outputevents_file.exists():
    raise FileNotFoundError(f"{outputevents_file} not found.")

# 1) Load merged_initial (kept fully in memory)
print("Loading merged_initial (may use substantial memory)...")
merged_initial = pd.read_csv(merged_initial_file, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'], low_memory=False)
n_rows = len(merged_initial)
print("merged_initial rows:", n_rows)

# 2) Collect the itemids that d_items links to outputevents
print("Loading d_items and selecting itemids where linksto == 'outputevents' ...")
d = pd.read_csv(ditems_file, dtype=str, usecols=['itemid','linksto','label','abbreviation'])
d['linksto'] = d['linksto'].fillna('').astype(str)
out_items = d.loc[d['linksto'].str.lower() == 'outputevents', 'itemid'].dropna().unique().tolist()
out_itemids = []
for iid in out_items:
    try:
        out_itemids.append(int(iid))
    except (TypeError, ValueError):
        # itemid that is not an integer string: skip it (best effort, as before)
        pass
out_itemids = sorted(set(out_itemids))
print(f"Found {len(out_itemids)} outputevents itemids (sample):", out_itemids[:20])

# 3) Add one object column per itemid to merged_initial when it is not there yet
added = 0
for iid in out_itemids:
    col = str(iid)
    if col not in merged_initial.columns:
        merged_initial[col] = pd.Series([pd.NA] * n_rows, dtype="object")
        added += 1
print(f"Added {added} new columns to merged_initial for outputitems. Total columns now: {len(merged_initial.columns)}")

# 4) Build admit_map: (subject_id, hadm_id) -> admission date (normalized to midnight)
print("Building admit_date map from merged_initial ...")
admit_map = merged_initial.groupby(['subject_id','hadm_id'], dropna=False)['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
admit_map['admit_date'] = pd.to_datetime(admit_map['admit_time'], errors='coerce').dt.normalize()
admit_map['key'] = list(zip(admit_map['subject_id'].astype('Int64'), admit_map['hadm_id'].astype('Int64')))
admit_dict = dict(zip(admit_map['key'], admit_map['admit_date']))
print("Admit map entries:", len(admit_dict))

# 5) Build the index map for merged_initial: (subject_id, hadm_id, day_index) -> row index
print("Building merged_initial index map (for direct assignment) ...")
merged_initial_index_map = {}
# zip over the columns instead of iterrows(): same keys, without building a Series per row
for idx, sid, hid, day in zip(merged_initial.index,
                              merged_initial['subject_id'],
                              merged_initial['hadm_id'],
                              merged_initial['day_index']):
    try:
        key = (int(sid), int(hid), int(day))
    except (TypeError, ValueError):
        # missing / non-numeric identifier: this row cannot be addressed by key
        continue
    merged_initial_index_map[key] = idx
print("Merged rows map size:", len(merged_initial_index_map))

# 6) Read outputevents chunk by chunk, map every event to a day_index and aggregate per day
usecols = ['subject_id','hadm_id','stay_id','caregiver_id','charttime','storetime','itemid','value','valueuom']
reader = pd.read_csv(outputevents_file, usecols=usecols, parse_dates=['charttime','storetime'], chunksize=chunksize, low_memory=True)

total_assigned = 0
chunk_no = 0


def pick_final_val(row):
    """Per-group value: the numeric daily sum when there is one, otherwise the last text value (or NA)."""
    if pd.notna(row.get('agg_value_num_sum')):
        return row['agg_value_num_sum']
    else:
        v = row.get('agg_value_text')
        if pd.isna(v) or str(v).strip() in ("nan","None","NoneType","NA","<NA>",""):
            return pd.NA
        return v


def _is_stored_number(v):
    """True when a cell already holds a real (non-missing, non-bool) number."""
    return (isinstance(v, (int, float, np.integer, np.floating))
            and not isinstance(v, bool)
            and pd.notna(v))


for chunk in reader:
    chunk_no += 1
    print(f"\n--- Processing outputevents chunk {chunk_no} (rows: {len(chunk)}) ---")
    # itemid as nullable integer
    chunk['itemid'] = pd.to_numeric(chunk['itemid'], errors='coerce').astype('Int64')
    chunk = chunk[chunk['itemid'].notna()]
    if chunk.empty:
        print("no itemids in this chunk")
        continue

    # keep only the itemids that d_items links to outputevents
    chunk = chunk[chunk['itemid'].isin(out_itemids)]
    if chunk.empty:
        print("no relevant output itemids in this chunk")
        continue

    # identifiers as nullable integers for the lookups
    chunk['subject_id'] = pd.to_numeric(chunk['subject_id'], errors='coerce').astype('Int64')
    chunk['hadm_id'] = pd.to_numeric(chunk['hadm_id'], errors='coerce').astype('Int64')

    # look up the admission date
    keys = list(zip(chunk['subject_id'].astype('Int64'), chunk['hadm_id'].astype('Int64')))
    chunk['admit_date'] = [admit_dict.get(k, pd.NaT) for k in keys]

    # drop rows without a matching admission
    chunk = chunk[chunk['admit_date'].notna()].copy()
    if chunk.empty:
        print("no rows with admit_date in this chunk")
        continue

    # day_index = whole days between the (normalized) charttime and the admission date
    chunk['chart_date'] = pd.to_datetime(chunk['charttime'], errors='coerce').dt.normalize()
    # NOTE(review): rows whose charttime cannot be parsed get day_index 0 via fillna(0) - confirm intended
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # numeric parse of the free-form value column
    chunk['numeric_val'] = pd.to_numeric(chunk['value'], errors='coerce')

    # one group per (subject_id, hadm_id, day_index, itemid)
    grp_keys = ['subject_id','hadm_id','day_index','itemid']

    # 6a) numeric values: daily sum
    numeric_rows = chunk[chunk['numeric_val'].notna()].copy()
    if not numeric_rows.empty:
        grp_num = numeric_rows.groupby(grp_keys, as_index=False)['numeric_val'].sum().rename(columns={'numeric_val':'agg_value_num_sum'})
    else:
        grp_num = pd.DataFrame(columns=grp_keys + ['agg_value_num_sum'])

    # 6b) text fallback: last value / time per group in charttime order
    chunk_sorted = chunk.sort_values('charttime')
    grp_last = chunk_sorted.groupby(grp_keys, as_index=False).last()[grp_keys + ['value','charttime']]
    grp_last = grp_last.rename(columns={'value':'agg_value_text','charttime':'agg_time_text'})

    # 6c) combine numeric and text aggregates
    merged_grps = pd.merge(grp_last, grp_num, on=grp_keys, how='left')

    # 6d) final per-chunk value: numeric sum when available, otherwise the last text
    merged_grps['final_value'] = merged_grps.apply(pick_final_val, axis=1)

    # 6e) write into merged_initial through merged_initial_index_map and .at
    assigned = 0
    for _, r in merged_grps.iterrows():
        try:
            key = (int(r['subject_id']), int(r['hadm_id']), int(r['day_index']))
        except (TypeError, ValueError):
            continue
        row_idx = merged_initial_index_map.get(key)
        if row_idx is None:
            continue
        itemid_col = str(int(r['itemid']))
        num_sum = r['agg_value_num_sum']
        prev = merged_initial.at[row_idx, itemid_col]
        if pd.notna(num_sum):
            # The same (admission, day, item) group can be split across chunks; add this chunk's
            # partial sum to what is already stored instead of overwriting it.
            # Assumes numeric content in these columns was written by this loop (merged_initial is
            # reloaded from disk at the top of the cell) - TODO confirm if columns can pre-exist.
            val = prev + num_sum if _is_stored_number(prev) else num_sum
        elif _is_stored_number(prev):
            # Numeric totals win over text across chunks, exactly as they do within a chunk.
            continue
        else:
            val = r['final_value']
        merged_initial.at[row_idx, itemid_col] = val
        assigned += 1

    total_assigned += assigned
    print(f"Chunk {chunk_no}: groups aggregated = {len(merged_grps)}, assigned = {assigned}, total_assigned so far = {total_assigned}")

    # free per-chunk intermediates
    del chunk, chunk_sorted, grp_last, grp_num, merged_grps, numeric_rows
    gc.collect()

# Summary printed once every outputevents chunk has been processed.
print("\n--- ALL outputevents CHUNKS PROCESSED ---")
print("Total assigned outputevents cells:", total_assigned)

# 7) Save the final output (chunked write)
# merged_initial is written in slices of write_chunk rows: the first slice creates the
# file with a header, later slices are appended without one.
out_path = Path("merged_with_outputevents_filled.csv")
write_chunk = 20000
first = True
n_rows = len(merged_initial)
for start in range(0, n_rows, write_chunk):
    end = min(start + write_chunk, n_rows)
    merged_initial.iloc[start:end].to_csv(out_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    print(f"Saved rows {start}-{end-1}")
print("Saved final to:", out_path)


In [None]:
# rename_outputevents_columns.py
# Configuration and d_items lookup used to rename the itemid-named columns of
# merged_with_outputevents_filled.csv.

pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 140)

# Files
ditems_path = Path("d_items.csv")
in_path = Path("merged_with_outputevents_filled.csv")
out_path = Path("merged_with_outputevents_filled_renamed.csv")

# Parameters
chunksize = 20000      # rows per chunk when the CSV is rewritten
max_name_len = 80      # maximum length of a sanitized column name

# Fail fast when an input file is missing
if not ditems_path.exists():
    raise FileNotFoundError(f"{ditems_path} not found.")
if not in_path.exists():
    raise FileNotFoundError(f"{in_path} not found.")

# Load the item dictionary (rows whose linksto is outputevents only)
d = pd.read_csv(ditems_path, usecols=['itemid','label','abbreviation','linksto'], dtype=str)
d['linksto'] = d['linksto'].fillna('').astype(str)
d_out = d.loc[d['linksto'].str.lower() == 'outputevents'].copy()
d_out['itemid'] = d_out['itemid'].astype(str).str.strip()
d_out['label'] = d_out['label'].fillna('').astype(str).str.strip()
d_out['abbreviation'] = d_out['abbreviation'].fillna('').astype(str).str.strip()

# Pick the display name: abbreviation when present, otherwise label ('' when both are empty).
# Done with a vectorised `where` instead of DataFrame.apply(axis=1): on a frame with no rows
# apply() returns a DataFrame rather than a Series, which breaks the single-column assignment.
d_out['chosen'] = d_out['abbreviation'].where(d_out['abbreviation'] != '', d_out['label'])

def sanitize_name(s):
    """Reduce a label to a short identifier-like token.

    None / NaN give ''. For anything else: strip, replace whitespace runs with
    '_', delete characters other than word characters and '-', collapse
    underscore runs, and truncate to the module-level ``max_name_len``.
    """
    if s is None or pd.isna(s):
        return ''
    text = str(s).strip()
    underscored = re.sub(r'\s+', '_', text)
    filtered = re.sub(r'[^\w\-]', '', underscored)
    collapsed = re.sub(r'_+', '_', filtered)
    return collapsed[:max_name_len]

# Build the map itemid -> chosen name (names are guaranteed unique)
name_map = {}
used = set()
for _, row in d_out.iterrows():
    iid = row['itemid']
    chosen = row['chosen']
    if chosen == '':
        base = f"item_{iid}"
    else:
        base = sanitize_name(chosen)
        if base == '':
            base = f"item_{iid}"
    name = base
    if name in used:
        # First fallback: disambiguate with the itemid itself
        name = f"{base}__{iid}"
    counter = 1
    while name in used:
        name = f"{base}__{iid}_{counter}"
        counter += 1
    used.add(name)
    name_map[iid] = name

# Prepare the new header
orig_header = pd.read_csv(in_path, nrows=0).columns.tolist()
new_header = []
seen_cols = set()   # mirrors new_header for O(1) duplicate checks
conflicts = 0

for col in orig_header:
    new_col = col
    col_str = str(col).strip()
    # A column whose name is exactly an outputevents itemid gets the readable name
    if col_str in name_map:
        new_col = "outputevents_" + name_map[col_str]
    else:
        # Headers may have been stored as numbers/floats (e.g. "226559.0"); retry as an integer string
        try:
            icol = str(int(float(col_str)))
        except (TypeError, ValueError, OverflowError):
            icol = None
        if icol in name_map:
            new_col = "outputevents_" + name_map[icol]
    # Avoid duplicate names in the new header
    if new_col in seen_cols:
        conflicts += 1
        conflict_base = f"{new_col}__orig_{sanitize_name(col_str)}"
        new_col = conflict_base
        k = 1
        # Always suffix the same base so names become ..._1, ..._2 instead of ..._1_2
        while new_col in seen_cols:
            new_col = f"{conflict_base}_{k}"
            k += 1
    new_header.append(new_col)
    seen_cols.add(new_col)

print(f"Prepared header mapping. Total cols: {len(orig_header)}, conflicts resolved: {conflicts}")
sample_map = {k: name_map[k] for k in list(name_map)[:10]}
print("sample itemid->name (first 10):", sample_map)

# Rewrite the file chunk by chunk with the new header
first = True
rows_written = 0
for i, chunk in enumerate(pd.read_csv(in_path, chunksize=chunksize, low_memory=False)):
    chunk.columns = new_header
    chunk.to_csv(out_path, mode='w' if first else 'a', index=False, header=first)
    first = False
    rows_written += len(chunk)
    print(f"Chunk {i+1}: wrote {len(chunk):,} rows (total {rows_written:,})")
    del chunk
    gc.collect()

print("✅ Done. Output saved to:", out_path)
print("Rows written:", rows_written)


In [None]:
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
merged_initial.shape

In [None]:
merged_with_outputevents_filled_renamed = pd.read_csv("merged_with_outputevents_filled_renamed.csv")

In [None]:
merged_with_outputevents_filled_renamed.shape

In [None]:
merged_with_outputevents_filled_renamed.head(50)

In [None]:
poe = pd.read_csv("poe.csv")

In [None]:
poe.head(10).to_csv('test_poe', index=False)

In [None]:
def safe_name(s):
    """Build a short filename-like token from ``s``.

    Every run of characters outside [0-9A-Za-z] becomes a single '_', leading
    and trailing underscores are removed, and the result is limited to 40
    characters. An empty result is replaced by 'file'.
    """
    token = re.sub(r'[^0-9A-Za-z]+', '_', s).strip('_')
    token = token[:40]
    if not token:
        return 'file'
    return token

def side_by_side_merge(base_path, other_paths, out_path, chunksize=5000,
                       base_cols_keep=None, prefix_conflicts=True, verbose=True):
    """Concatenate row-aligned CSV files column-wise, streaming them in chunks.

    The base file supplies the canonical columns. Every other file is read in
    lock-step with it; columns it shares with ``base_cols_keep`` are dropped,
    remaining name clashes are renamed, and the rest is appended to the right.
    Rows are matched purely by position, so all files must have the same row
    count and order.

    Parameters
    ----------
    base_path : str or Path
        CSV whose columns come first in the output.
    other_paths : iterable of str or Path
        CSVs appended left to right in this order.
    out_path : str or Path
        Destination CSV (overwritten).
    chunksize : int
        Rows read from every file per step.
    base_cols_keep : list, optional
        Base columns to keep; defaults to the full base header.
    prefix_conflicts : bool
        True renames a clashing column to ``<col>__from_<file>``, False to
        ``<col>__<file>``; ``_<k>`` is appended if that name is taken as well.
    verbose : bool
        Print progress messages.

    Raises
    ------
    RuntimeError
        If any file has a different number of rows than the base file.
    """
    base_path = Path(base_path)
    other_paths = [Path(p) for p in other_paths]
    out_path = Path(out_path)

    # read base header
    base_header = pd.read_csv(base_path, nrows=0).columns.tolist()
    if base_cols_keep is None:
        base_cols_keep = base_header  # keep them as canonical
    base_cols_keep = list(base_cols_keep)

    def _is_index_artifact(col):
        # typical leftovers of a saved index: 'Unnamed: 0' or a blank header
        label = str(col)
        return label.startswith('Unnamed:') or label.strip() == ''

    base_reader = None
    other_readers = {}
    first_out = True
    chunk_i = 0

    try:
        # Open one chunk reader per file; all of them are closed in the finally block.
        base_reader = pd.read_csv(base_path, chunksize=chunksize, dtype=str, low_memory=True)
        for p in other_paths:
            other_readers[p] = pd.read_csv(p, chunksize=chunksize, dtype=str, low_memory=True)

        while True:
            chunk_i += 1
            try:
                base_chunk = next(base_reader)
            except StopIteration:
                break  # base file finished normally

            # keep only canonical base columns (in case file has extra), minus index artifacts
            out_chunk = base_chunk.loc[:, [c for c in base_cols_keep if c in base_chunk.columns]].copy()
            out_chunk = out_chunk.loc[:, [c for c in out_chunk.columns if not _is_index_artifact(c)]]

            for p, reader in other_readers.items():
                try:
                    other_chunk = next(reader)
                except StopIteration:
                    raise RuntimeError(f"File {p} ended earlier than base file at chunk {chunk_i}. Row counts differ!") from None

                # drop unnamed index columns
                other_chunk = other_chunk.loc[:, [c for c in other_chunk.columns if not _is_index_artifact(c)]]

                # remove base columns duplicated in this other file
                overlap = [c for c in base_cols_keep if c in other_chunk.columns]
                if overlap:
                    other_chunk = other_chunk.drop(columns=overlap, errors='ignore')

                # rename any remaining columns that collide with what is already in out_chunk
                collisions = [c for c in other_chunk.columns if c in out_chunk.columns]
                if collisions:
                    safe = safe_name(p.name)
                    # names that are already taken on either side; renamed columns must avoid them
                    reserved = set(out_chunk.columns) | set(other_chunk.columns)
                    new_names = {}
                    for c in collisions:
                        stem = f"{c}__from_{safe}" if prefix_conflicts else f"{c}__{safe}"
                        candidate = stem
                        k = 1
                        while candidate in reserved:
                            candidate = f"{stem}_{k}"
                            k += 1
                        reserved.add(candidate)
                        new_names[c] = candidate
                    other_chunk = other_chunk.rename(columns=new_names)

                # a length mismatch inside a chunk is fatal for positional alignment
                if len(other_chunk) != len(out_chunk):
                    raise RuntimeError(
                        f"Row-count mismatch in chunk {chunk_i} for file {p.name}: "
                        f"base_chunk rows={len(out_chunk)}, other_chunk rows={len(other_chunk)}. "
                        "Ensure files are aligned and have same row order/count."
                    )

                # concatenate horizontally (preserve column order: base then others incrementally)
                out_chunk = pd.concat([out_chunk.reset_index(drop=True), other_chunk.reset_index(drop=True)], axis=1)

            # write out_chunk to CSV (append after first)
            if first_out:
                out_chunk.to_csv(out_path, index=False, mode='w', header=True)
                first_out = False
            else:
                out_chunk.to_csv(out_path, index=False, mode='a', header=False)

            if verbose:
                print(f"Chunk {chunk_i}: wrote {len(out_chunk)} rows, total cols now: {len(out_chunk.columns)}")

        # verify other readers exhausted
        for p, reader in other_readers.items():
            try:
                next(reader)
            except StopIteration:
                # good, finished
                if verbose:
                    print(f"Verified {p.name} finished (same length as base).")
            else:
                raise RuntimeError(f"File {p} has MORE rows than base file — row counts differ.")
    finally:
        # release the file handles even when the merge aborts with an error
        for open_reader in [base_reader, *other_readers.values()]:
            if open_reader is not None:
                open_reader.close()

    if verbose:
        print("Merge complete. Output saved to:", out_path)


# Driver: appends the columns of every per-table merged CSV to admissions_expanded.csv and
# writes the result to final_merged_admission_day.csv.
if __name__ == "__main__":
    # ----- CONFIGURE THESE -----
    base = "admissions_expanded.csv"   # canonical base
    others = [
        "merged_with_icu.csv",
        "merged_with_vanco.csv",
        "merged_with_chartevents_filled_renamed.csv", # checked up to here
        "merged_with_diagnoses.csv",
        "merged_with_procedures.csv",
        "merged_with_drg.csv",
        "merged_with_transfers_services.csv",
        "merged_with_medications.csv",
        "merged_with_inputs_procs.csv",
        "merged_with_microbiologyevents.csv",
        "merged_with_datetimeevents_filled_renamed.csv",
        "merged_with_outputevents_filled_renamed.csv",
        
        # add other merged_*.csv files here in the order you want columns appended
    ]
    out = "final_merged_admission_day.csv"
    CHUNK = 5000   # tune down/up depending on memory
    PREF_CONFLICT = True   # passed as prefix_conflicts: clashing columns become <col>__from_<file>
    # --------------------------

    # you can also call side_by_side_merge() from another script and pass different args
    side_by_side_merge(base, others, out, chunksize=CHUNK, prefix_conflicts=PREF_CONFLICT, verbose=True)


In [None]:
final_merged_admission_day = pd.read_csv("final_merged_admission_day.csv")

In [None]:
final_merged_admission_day.shape

# end

In [None]:
poe = pd.read_csv("poe.csv")

In [None]:
poe.head(10).to_csv('test_poe', index=False)

In [None]:
# ---------------- CONFIG ----------------
# Paths and chunk sizes for the POE (provider order entry) merge below.
BASE = Path(".")
MERGED_INITIAL_PATH = BASE / "admissions_expanded.csv"   # per-day admissions frame (input)
POE_PATH = BASE / "poe.csv"                              # raw POE events (input)

AGG_POE_PATH = BASE / "agg_poe_daily.csv"                # intermediate per-day aggregates (Phase A output)
OUT_MERGED_POE = BASE / "merged_with_poe.csv"            # final merged file (Phase B output)

SRC_CHUNKSIZE = 200_000   # read size for poe.csv
WRITE_CHUNK = 20_000      # rows of merged_initial processed per write
MATCH_CHUNK = 200_000     # chunk size when scanning agg csvs for matches
# ----------------------------------------

# ---------- helpers ----------
def nowstr():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS' (used to prefix log lines)."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def parse_numeric(x):
    """Best-effort conversion of a scalar to float.

    Missing values, the placeholder strings '', 'NaN', 'nan', 'None', 'none',
    '___' and anything that cannot be parsed give ``np.nan``. Thousands
    separators (',') are removed before parsing.

    Only conversion errors are swallowed; KeyboardInterrupt / SystemExit
    propagate instead of being turned into NaN.
    """
    if pd.isna(x): return np.nan
    try:
        if isinstance(x,(int,float,np.number)): return float(x)
        s = str(x).strip().replace(',','')
        if s in ("","NaN","nan","None","none","___"): return np.nan
        return float(s)
    except (TypeError, ValueError, OverflowError):
        # unparseable text, unsupported type, or an int too large for float
        return np.nan

def read_header(path: Path):
    """Return the column names of the CSV at ``path``, or [] when the file does not exist."""
    if path.exists():
        # nrows=0 parses only the header line
        return pd.read_csv(path, nrows=0).columns.tolist()
    return []

def build_admit_map(merged_initial_path):
    """Build the lookup (subject_id, hadm_id) -> admission date.

    Reads only subject_id, hadm_id and admittime from ``merged_initial_path``,
    takes the first admittime of every (subject_id, hadm_id) pair and
    normalizes it to midnight.

    Rows whose subject_id or hadm_id is missing or non-numeric are skipped:
    they cannot form an integer key, and the integer cast used to raise on them.

    Returns
    -------
    dict
        Keys are ``(int, int)`` tuples, values are normalized timestamps.
    """
    print(f"[{nowstr()}] Build admit_map (light read)...")
    mi = pd.read_csv(merged_initial_path, usecols=['subject_id','hadm_id','admittime'], parse_dates=['admittime'], low_memory=False)
    mi['subject_id'] = pd.to_numeric(mi['subject_id'], errors='coerce').astype('Int64')
    mi['hadm_id'] = pd.to_numeric(mi['hadm_id'], errors='coerce').astype('Int64')
    # drop rows without a usable key before grouping so the int cast below cannot hit NA
    mi = mi.dropna(subset=['subject_id','hadm_id'])
    adm = mi.groupby(['subject_id','hadm_id'])['admittime'].first().reset_index().rename(columns={'admittime':'admit_time'})
    adm['admit_date'] = pd.to_datetime(adm['admit_time']).dt.normalize()
    d = dict(zip(zip(adm['subject_id'].astype(int), adm['hadm_id'].astype(int)), adm['admit_date']))
    print(f"[{nowstr()}] admit_map keys: {len(d)}")
    del mi, adm; gc.collect()
    return d

def key_str(s,h,d):
    """Format a (subject_id, hadm_id, day_index) triple as the string key 'S|H|D' (each part cast to int)."""
    return "|".join(str(int(part)) for part in (s, h, d))
def key_series_from_df(df):
    """Return a Series of 'subject_id|hadm_id|day_index' string keys, one per row of ``df``.

    Each component is coerced to a number; missing or non-numeric entries
    become -1 before the integer cast.
    """
    subject_part, hadm_part, day_part = (
        pd.to_numeric(df[col], errors='coerce').fillna(-1).astype(int).astype(str)
        for col in ('subject_id', 'hadm_id', 'day_index')
    )
    return subject_part + '|' + hadm_part + '|' + day_part

def _choose_agg_for_col(col_name):
    name = col_name.lower()
    if name in ('subject_id','hadm_id','day_index'):
        return 'first'
    if 'count' in name or 'total' in name or name.endswith('_sum'):
        return 'sum'
    if 'max' in name or name.endswith('_max'):
        return 'max'
    if 'amount' in name:
        return 'sum'
    if 'rate' in name or name.endswith('_num') or 'val' in name or 'value' in name or 'ordertime' in name:
        # for ordertime we want min/max, but groupby agg requires consistent functions:
        # ordetime we'll handle separately; here default to 'max' for numeric-like
        return 'max'
    return lambda s: s.dropna().iloc[-1] if s.dropna().shape[0] > 0 else pd.NA

def collect_matching_rows(agg_path, keys_set, usecols=None, parse_dates=None, chunksize=MATCH_CHUNK):
    """Collect the rows of an aggregate CSV that belong to the given keys, one row per key.

    ``agg_path`` is scanned in chunks; rows whose (subject_id, hadm_id,
    day_index) is in ``keys_set`` are kept. Rows sharing a key are then
    collapsed with a per-column aggregation chosen by ``_choose_agg_for_col``.

    Returns a DataFrame with unique keys (key columns as nullable Int64), or
    an empty DataFrame with ``usecols`` as columns when the file is missing,
    ``keys_set`` is empty, or nothing matches.
    """
    key_cols = ['subject_id', 'hadm_id', 'day_index']

    def _as_nullable_int(frame):
        # coerce the three key columns to nullable integers, in place
        for col in key_cols:
            frame[col] = pd.to_numeric(frame[col], errors='coerce').astype('Int64')

    if not (agg_path.exists() and keys_set):
        return pd.DataFrame(columns=(usecols or []))

    wanted = {key_str(s, h, d) for (s, h, d) in keys_set}
    hits = []
    reader = pd.read_csv(agg_path, usecols=usecols, parse_dates=parse_dates, chunksize=chunksize, low_memory=True)
    for part in reader:
        # chunks without the full key cannot be matched
        if set(key_cols) <= set(part.columns):
            _as_nullable_int(part)
            part_keys = key_series_from_df(part)
            keep = part_keys.isin(wanted)
            picked = part.loc[keep]
            if not picked.empty:
                hits.append(picked)
            del part, part_keys, keep
            gc.collect()

    if not hits:
        return pd.DataFrame(columns=(usecols or []))

    df = pd.concat(hits, ignore_index=True)
    _as_nullable_int(df)

    # one aggregation per column, then collapse duplicate keys
    agg_dict = {col: _choose_agg_for_col(col) for col in df.columns}
    grouped = df.groupby(key_cols, as_index=False).agg(agg_dict)

    # the aggregation may change key dtypes; restore nullable integers
    _as_nullable_int(grouped)
    return grouped

# ---------- prepare admit_map ----------
admit_dict = build_admit_map(MERGED_INITIAL_PATH)

# ---------- Phase A: aggregate poe.csv -> AGG_POE_PATH ----------
# Every chunk of poe.csv is reduced to one row per (subject_id, hadm_id, day_index) and appended
# to AGG_POE_PATH. A key can therefore appear once per chunk; duplicates are collapsed in Phase B.
print(f"[{nowstr()}] Phase A: aggregate poe -> {AGG_POE_PATH}")
if AGG_POE_PATH.exists(): AGG_POE_PATH.unlink()
hdr = read_header(POE_PATH)
# choose columns we expect; safe to include extras if present
use_want = ['poe_id','poe_seq','subject_id','hadm_id','ordertime','order_type','order_subtype','transaction_type','discontinue_of_poe_id','discontinued_by_poe_id','order_provider_id','order_status']
usecols = [c for c in use_want if c in hdr]
parse_dates = ['ordertime'] if 'ordertime' in usecols else []

# text columns summarised per day, and the string forms that mean "missing"
POE_TEXT_COLS = ['order_provider_id', 'order_status', 'order_type', 'order_subtype', 'transaction_type']
POE_MISSING_TOKENS = ['', 'nan', 'None', 'NoneType', '<NA>']

first_out=True; total=0
for i, chunk in enumerate(pd.read_csv(POE_PATH, usecols=usecols, parse_dates=parse_dates, chunksize=SRC_CHUNKSIZE, low_memory=True), start=1):
    t0=time.time(); print(f"[{nowstr()}] poe chunk {i} rows={len(chunk):,}")
    # normalize ids
    if 'subject_id' not in chunk.columns or 'hadm_id' not in chunk.columns:
        print("ERROR: poe missing subject_id or hadm_id columns. Aborting.")
        raise SystemExit(1)
    chunk['subject_id']=pd.to_numeric(chunk['subject_id'],errors='coerce').astype('Int64')
    chunk['hadm_id']=pd.to_numeric(chunk['hadm_id'],errors='coerce').astype('Int64')

    # map admit_date
    keys = list(zip(chunk['subject_id'].astype('Int64').astype(object), chunk['hadm_id'].astype('Int64').astype(object)))
    chunk['admit_date'] = [admit_dict.get((int(s), int(h)), pd.NaT) if not (pd.isna(s) or pd.isna(h)) else pd.NaT for s,h in keys]

    before = len(chunk)
    chunk = chunk[chunk['admit_date'].notna()].copy()
    dropped = before - len(chunk)
    if dropped: print(f"  -> dropped {dropped:,} poe rows w/o admit")

    if chunk.empty:
        del chunk; gc.collect(); continue

    # event_time and day_index: use ordertime
    chunk['event_time'] = pd.to_datetime(chunk.get('ordertime', pd.NaT), errors='coerce')
    chunk['chart_date'] = chunk['event_time'].dt.normalize()
    # NOTE(review): rows without a parseable ordertime get day_index 0 via fillna(0) - confirm intended
    chunk['day_index'] = (chunk['chart_date'] - chunk['admit_date']).dt.days.fillna(0).astype(int)
    chunk.loc[chunk['day_index'] < 0, 'day_index'] = 0

    # string/text normalization
    # Missing entries must stay missing (pd.NA) rather than become '' / 'nan' / '<NA>' strings,
    # otherwise the dropna() calls in the aggregations below cannot skip them.
    for text_col in POE_TEXT_COLS:
        if text_col not in chunk.columns:
            chunk[text_col] = pd.NA
        as_text = chunk[text_col].astype('string')
        chunk[text_col] = as_text.mask(as_text.isin(POE_MISSING_TOKENS)).astype(object)

    # Aggregations per chunk (per day)
    agg = chunk.groupby(['subject_id','hadm_id','day_index'], as_index=False).agg(
        poe_events_count = ('poe_id','count'),
        poe_first_ordertime = ('event_time','min'),
        poe_last_ordertime = ('event_time','max'),
        poe_first_order_provider = ('order_provider_id', lambda s: s.dropna().iloc[0] if s.dropna().shape[0]>0 else pd.NA),
        poe_last_order_provider = ('order_provider_id', lambda s: s.dropna().iloc[-1] if s.dropna().shape[0]>0 else pd.NA),
        poe_last_order_type = ('order_type', lambda s: s.dropna().astype(str).iloc[-1] if s.dropna().shape[0]>0 else pd.NA),
        poe_last_order_subtype = ('order_subtype', lambda s: s.dropna().astype(str).iloc[-1] if s.dropna().shape[0]>0 else pd.NA),
        poe_last_transaction_type = ('transaction_type', lambda s: s.dropna().astype(str).iloc[-1] if s.dropna().shape[0]>0 else pd.NA),
        poe_last_order_status = ('order_status', lambda s: s.dropna().astype(str).iloc[-1] if s.dropna().shape[0]>0 else pd.NA)
    )

    # ensure key dtypes
    agg['subject_id']=agg['subject_id'].astype('Int64')
    agg['hadm_id']=agg['hadm_id'].astype('Int64')
    agg['day_index']=agg['day_index'].astype('Int64')

    # write chunked aggregated rows (may produce multiple rows per key across chunks; will collapse on merge)
    agg.to_csv(AGG_POE_PATH, mode='w' if first_out else 'a', index=False, header=first_out)
    first_out=False
    total += len(agg)
    print(f"[{nowstr()}] poe chunk {i} -> wrote {len(agg):,} agg rows (total appended {total:,}) took {time.time()-t0:.1f}s")
    del chunk, agg; gc.collect()

print(f"[{nowstr()}] POE aggregation done. raw agg rows appended: {total}")

# ---------- Phase B: merge aggregated poe into merged_initial in chunks ----------
# merged_initial is streamed in WRITE_CHUNK-row pieces; for each piece the matching aggregate rows
# are looked up in AGG_POE_PATH, left-joined on the key and appended to OUT_MERGED_POE.
print(f"[{nowstr()}] Phase B: merge aggregated poe into {OUT_MERGED_POE} in chunks (write_chunk={WRITE_CHUNK})")
if OUT_MERGED_POE.exists(): OUT_MERGED_POE.unlink()
first_write = True
mchunk_no = 0

poe_key_cols = ['subject_id','hadm_id','day_index']
# POE columns every output chunk must carry, whether or not the chunk has any match
poe_value_cols = [c for c in read_header(AGG_POE_PATH) if c not in poe_key_cols]

for mchunk in pd.read_csv(MERGED_INITIAL_PATH, chunksize=WRITE_CHUNK, parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'], low_memory=False):
    mchunk_no += 1
    t0 = time.time()
    print(f"[{nowstr()}] merged chunk {mchunk_no} rows={len(mchunk):,}")
    mchunk['subject_id'] = pd.to_numeric(mchunk['subject_id'], errors='coerce').astype('Int64')
    mchunk['hadm_id'] = pd.to_numeric(mchunk['hadm_id'], errors='coerce').astype('Int64')
    mchunk['day_index'] = pd.to_numeric(mchunk['day_index'], errors='coerce').astype('Int64')

    # keys of this chunk as plain-int tuples; rows with a missing key part cannot match and are skipped
    key_frame = mchunk[poe_key_cols].dropna()
    keys = set(zip(*(key_frame[c].astype(int).tolist() for c in poe_key_cols)))

    # collect matching rows from AGG_POE_PATH and collapse duplicates per key
    poe_match = collect_matching_rows(AGG_POE_PATH, keys, usecols=None, parse_dates=['poe_first_ordertime','poe_last_ordertime']) if AGG_POE_PATH.exists() else pd.DataFrame()

    if not poe_match.empty:
        # note: parse_dates argument above may or may not have effect since agg csv stores ISO timestamps;
        # ensure datetime dtype for ordertime cols
        if 'poe_first_ordertime' in poe_match.columns:
            poe_match['poe_first_ordertime'] = pd.to_datetime(poe_match['poe_first_ordertime'], errors='coerce')
        if 'poe_last_ordertime' in poe_match.columns:
            poe_match['poe_last_ordertime'] = pd.to_datetime(poe_match['poe_last_ordertime'], errors='coerce')

        mchunk = mchunk.merge(poe_match, on=poe_key_cols, how='left')

    # A chunk without any match skipped the merge above; add the POE columns as empty so every
    # appended chunk has the same columns as the header written with the first chunk.
    for poe_col in poe_value_cols:
        if poe_col not in mchunk.columns:
            mchunk[poe_col] = pd.NA

    # write chunk
    mchunk.to_csv(OUT_MERGED_POE, mode='w' if first_write else 'a', index=False, header=first_write)
    first_write = False
    print(f"[{nowstr()}] Written merged chunk {mchunk_no} (took {time.time()-t0:.1f}s)")
    del mchunk, poe_match; gc.collect()

print(f"[{nowstr()}] Done. Output: {OUT_MERGED_POE}")


In [None]:
merged_initial = pd.read_csv("admissions_expanded.csv")

In [None]:
merged_initial.shape

In [None]:
merged_with_poe = pd.read_csv("merged_with_poe.csv")

In [None]:
merged_with_poe.shape

In [None]:
merged_with_poe.head(5)

In [None]:
merged_with_poe.head(5).to_csv('merged_with_poe_test', index=False)