In [2]:
from database.MongoDBConnector import MongoDBConnector
from database.SQLDBConnector import SQLDBConnector
from utils.data import process_row, bmi_choose_weight_kg

import os
import json
import pandas as pd
from tqdm.auto import tqdm

from concurrent.futures import ProcessPoolExecutor, as_completed

In [3]:
# Database handles shared by every cell below.
# MongoDB connector, opened in its 'remote' mode.
mongo = MongoDBConnector(mode="remote")
# SQL connector, constructed with its default settings.
sql = SQLDBConnector()

In [4]:
# SQL template that fetches patient metadata for a list of mobile numbers.
#
# - `user` (u) is joined to `user_detail` (uu) on u.id = uu.uid.
# - `medical_record` (mm) is LEFT JOINed, restricted to record types
#   1, 2, 4, 5, 8 and 13, so the result has one row per (user, record_type)
#   pair, and users without such records still appear with NULL record columns.
# - `end_born_ts` is converted with FROM_UNIXTIME and exposed as `add`;
#   `expected_born_date` is exposed as `edd`.
#
# The `{mobile_query_str}` placeholder is filled with str.format() and must be a
# comma-separated list of already-quoted SQL literals.
# NOTE(review): the values are interpolated directly into the statement rather
# than bound as parameters — confirm the mobile numbers come from a trusted source.
QUERY = """
SELECT
uu.name,
u.mobile,
uu.age,
uu.height,
uu.old_weight,
uu.expected_born_date AS edd,
FROM_UNIXTIME(uu.end_born_ts) AS `add`,
mm.record_type,
mm.record_answer
FROM extant_future_user.user AS u
JOIN extant_future_user.user_detail AS uu ON u.id = uu.uid
LEFT JOIN extant_future_user.medical_record AS mm ON uu.uid = mm.user_id AND mm.record_type IN (1, 2, 4, 5, 8, 13)
WHERE
u.mobile
IN
({mobile_query_str})
"""

In [5]:
hist_measurements = await mongo.get_all_documents(
    "filt_hist",
    projection={
        "_id"               : 1,
        "mobile"            : 1,
        "uc"                : 1,
        "fhr"               : 1,
        "fmov"              : 1,
        "gest_age"          : 1,
        "measurement_date"  : 1,
        "start_test_ts"     : 1
    }
)

  return Cursor(self, *args, **kwargs)


In [6]:
# Distinct mobile numbers that have at least one historical measurement.
unique_mobiles = {doc['mobile'] for doc in hist_measurements}

# Comma-separated list of single-quoted literals for the SQL `IN (...)` clause.
# Ordering by the string form keeps the generated query text identical between
# runs (plain set iteration order is not stable) and cannot fail on mixed types.
# NOTE(review): values are quoted but not escaped — confirm they can never
# contain a quote character, and that the set is never empty (`IN ()` is not
# valid SQL).
mobile_str = ",".join(f"'{mobile}'" for mobile in sorted(unique_mobiles, key=str))

# Last expression of the cell so the number of distinct mobiles is displayed.
len(unique_mobiles)

In [11]:
# Fetch the patient metadata for every mobile number collected above.
filt_hist_df = sql.query_to_dataframe(
    query=QUERY.format(mobile_query_str=mobile_str)
)

# Every column except the record type/answer pair identifies one patient row.
id_columns = [
    col for col in filt_hist_df.columns
    if col not in ('record_type', 'record_answer')
]

# Go from one row per (patient, record_type) to one row per patient, with one
# column per record_type holding the matching record_answer.
# NOTE(review): DataFrame.pivot raises if a patient has two rows with the same
# record_type — confirm the source table guarantees uniqueness.
filt_hist_pivot = filt_hist_df.pivot(
    index=id_columns,
    columns='record_type',
    values='record_answer',
).reset_index()

SSH tunnel started on port 62427


In [12]:
# Build one metadata record per patient row of the pivoted SQL result.
hist_metadata = []
for _, row in filt_hist_pivot.iterrows():

    # Medical-record answers, read from the pivoted record_type columns.
    # NOTE(review): the comparisons below assume record_answer is numeric —
    # confirm the column dtype returned by the SQL connector.

    # 0='0 pregnancies', 1='1 pregnancies', 2='2 pregnancies', 3='>2 pregnancies'
    # Count current pregnancy as well so treat 0 and 1 as same
    preg_count  = row[1.0]
    # 0='yes', 1='no', 2='unknown'
    had_misc    = row[2.0]   # read but not part of the record below
    gdm         = row[4.0]
    pih         = row[5.0]
    had_preterm = row[8.0]
    had_surgery = row[13.0]

    bmi = bmi_choose_weight_kg(
        height_cm = row['height'],
        weight_val = row['old_weight']
    )

    record = {
        'mobile'        : row['mobile'],
        'age'           : int(row['age']) if pd.notna(row['age']) else None,
        'bmi'           : bmi if pd.notna(bmi) else None,
        'edd'           : row['edd'].strftime("%Y-%m-%d") if pd.notna(row['edd']) else None,
        'had_pregnancy' : 1 if (preg_count > 1) else 0,
        'had_preterm'   : 1 if had_preterm == 0 else 0,
        'had_surgery'   : 1 if had_surgery == 0 else 0,
        'gdm'           : 1 if gdm == 0 else 0,
        'pih'           : 1 if pih == 0 else 0,
        # Guarded like 'edd': a missing value (NaT) cannot be formatted with
        # strftime, and the later cell treats 'add' as nullable, so store None.
        'add'           : (
            row['add'].to_pydatetime().strftime("%Y-%m-%d %H:%M")
            if pd.notna(row['add']) else None
        ),
        'type'          : 'hist',
    }

    hist_metadata.append(record)

NameError: name 'bmi_choose_weight_kg' is not defined

In [None]:
# Tabular views of the Mongo measurements and the per-patient metadata.
measurements_df = pd.DataFrame(hist_measurements)
patients_df = pd.DataFrame(hist_metadata)

# Inner join on the mobile number: only measurements whose mobile also appears
# in the patient metadata are kept.
merged = measurements_df.merge(patients_df, on="mobile", how="inner")

In [None]:
# Flatten the merged frame into plain dict records, keeping only rows that
# have an 'add' value.
combined_data_add = []

for idx, row in merged.iterrows():

    # start_test_ts > start_ts always
    # start_ts indicates start time of finding FHR
    # start_test_ts indicates when FHR has been found
    # (A filter that skipped measurements taken before the patient's
    # "date_joined" is currently disabled.)

    # 'add' is nullable. pd.isna covers None as well as the NaN/NaT markers
    # pandas may substitute for missing values, which an `is not None` test
    # would let through.
    if pd.isna(row["add"]):
        continue

    record = {
        field: row[field]
        for field in (
            # measurement fields (from filt_hist)
            "_id",
            "mobile",
            "uc",
            "fhr",
            "fmov",
            "gest_age",
            "measurement_date",
            "start_test_ts",
            # patient fields (from the SQL metadata)
            "age",
            "bmi",
            "had_pregnancy",
            "had_preterm",
            "had_surgery",
            "gdm",
            "pih",
        )
    }
    record["add"] = row["add"]
    combined_data_add.append(record)

print(len(combined_data_add), "Measurements")

In [3]:
# Run process_row over every combined record in a pool of worker processes and
# collect the non-None results.
dataset = []

# Leave one core free, but always use at least one worker. os.cpu_count() may
# return None when the count cannot be determined, so fall back to 1 before
# subtracting instead of raising a TypeError.
worker_count = max(1, (os.cpu_count() or 1) - 1)

with ProcessPoolExecutor(max_workers=worker_count) as executor:

    futures = [executor.submit(process_row, row, "add") for row in combined_data_add]

    # as_completed yields futures in completion order, so `dataset` is not
    # guaranteed to follow the order of `combined_data_add`.
    for fut in tqdm(as_completed(futures), total=len(futures)):

        # result() re-raises any exception raised inside the worker.
        rec = fut.result()
        if rec is not None:
            dataset.append(rec)

  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  out[i] = np.nanmedian(w)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
  x[~np.isfinite(x)] = np.nanmedian(x)
  med = np.nanmedian(x)
100%|██████████| 21405/21405 [04:15<00:00, 83.75it/s] 


In [4]:
# Write the processed records to disk as a single JSON document.
with open("../datasets/dataset_hist.json", "w") as out_file:
    json.dump(dataset, fp=out_file)