# Import

In [1]:
import pandas as pd
import polars as pl
import multiprocessing as mp
!pip install pyreadstat
import pyreadstat
import time




[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: C:\Users\lam yukyin\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


# Function

In [2]:
def phy_clean(df):
    ADL_IADL = [col for col in df.columns if 'PFQ061' in col]
    target_columns = ['SEQN'] + ADL_IADL
    df = df.select(target_columns)
    replace_word = {5: "does not do this activity", 7: "refused", 9: "don't know"}
    #--------------------------------------------------------------------------------
    df = df.with_columns(pl.col(ADL_IADL).map_dict(replace_word, default=pl.first()))
    #--------------------------------------------------------------------------------
    df = df.with_columns(pl.col(ADL_IADL).cast(pl.Float64, strict=False))
    df = df.drop_nulls()
    func_score_arr = df.select(pl.all().exclude("SEQN")).sum(axis=1)
    df = df.with_columns(pl.Series(name="func_score", values=func_score_arr)) 

    imp_count = (df[ADL_IADL] == 4).sum(axis=1)
    df = df.with_columns(pl.Series(name="imp_cnt", values=imp_count)) 

    for i in range(1, 21):
        col = f'impaired_{i}'
        df = df.with_columns(pl.when(pl.col("imp_cnt")==i).then(1).otherwise(0).alias(col))

    df = df.drop(ADL_IADL, "imp_cnt")
    #--------------------------------------------------------------------------------
    # prefix = "PFQ"
    # replacement_prefix = "impaired_"

    # column_mapping = {
    #     col: replacement_prefix + str(i)
    #     for i, col in enumerate(df.columns)
    #     if col.startswith(prefix)
    # }

    # df = df.with_columns(
    # pl.when(pl.col(ADL_IADL) == None)
    #     .then(1)
    #     .otherwise(0)
    #     .keep_name()
    # )
    # #--------------------------------------------------------------------------------
    # df = df.rename(column_mapping)
    return df

def demo_clean(df):
    df = df.select(['SEQN', 'RIAGENDR', 'RIDAGEYR'])
    return df

def merge(df1, df2):
    df1 = df1.join(df2, on="SEQN")
    front = ['SEQN', 'RIDAGEYR', 'RIAGENDR', 'func_score']
    column_order =  front + [col for col in df1.columns if col not in front]
    df1 = df1.select(column_order)
    df1 = df1.with_columns(df1[front].cast(pl.Int64))
    
    return df1

In [3]:
def read_sas(fpath, row, max_processors):
    df,_ = pyreadstat.read_file_multiprocessing(pyreadstat.read_xport, fpath, num_processes=max_processors, num_rows = row)
    pldf = pl.from_pandas(df)
    return pldf

# Main

In [4]:
data_dir = r"A:\fyp\data\\"

In [5]:
dmeo1112 = pl.from_pandas(pd.read_sas(data_dir + "DEMO_G.XPT"))
dmeo1314 = pl.from_pandas(pd.read_sas(data_dir +"DEMO_H.XPT"))
phy1112 = pl.from_pandas(pd.read_sas(data_dir +"PFQ_G.XPT"))
phy1314 = pl.from_pandas(pd.read_sas(data_dir +"PFQ_H.XPT"))

In [6]:
dmeo1112 = demo_clean(dmeo1112)
dmeo1314 = demo_clean(dmeo1314)
phy1112 = phy_clean(phy1112)
phy1314 = phy_clean(phy1314)

df1112 = merge(phy1112, dmeo1112)
df1314 = merge(phy1314, dmeo1314)

In [7]:
df = df1112.vstack(df1314)

In [8]:
if __name__ == "__main__":
    ids_nonacc = df['SEQN'].unique()
    
    start = time.time()
    acc1112 = read_sas(data_dir +"PAXMIN_G.XPT", 78126856, mp.cpu_count())
    print(f"Process Time: {time.time() - start} sec")
    acc1112_header = pl.from_pandas(pd.read_sas(data_dir +"PAXHD_G.XPT", encoding='latin1'))

    start = time.time()
    acc1314 = read_sas(data_dir +"PAXMIN_H.XPT", 88223479, mp.cpu_count())
    print(f"Process Time: {time.time() - start} sec")
    acc1314_header = pl.from_pandas(pd.read_sas(data_dir +"PAXHD_H.XPT", encoding='latin1'))

Process Time: 385.5133948326111 sec
Process Time: 456.07867765426636 sec


In [19]:
def check(acc, header, ids_nonacc, path):

    acc = acc.filter(pl.col("SEQN").is_in(ids_nonacc)) \
        .select(pl.all().exclude(["PAXFLGSM", "PAXQFM"])) \
        .with_columns(
            pl.when(pl.col("PAXMTSM") == -0.01). \
            then(0). \
            otherwise(pl.col("PAXMTSM")).alias("PAXMTSM")
        )
    
    header = header.filter(
        pl.col("SEQN").is_in(ids_nonacc)) \
        .select(pl.all().exclude(["PAXSENID", "PAXSTS"])
    )

    acc_ids = acc['SEQN'].unique()
    n_iter = 1
    days_3 = 1440 * 3

    for ids11_acc_i in acc_ids.to_list():
        # Join accelerometry with header df
        acc_temp = acc.filter(pl.col("SEQN") == ids11_acc_i) \
            .join(header.filter(pl.col("SEQN") == ids11_acc_i), on="SEQN")

        # Check exclusion criteria
        acc_temp = acc_temp.with_columns(pl.col("PAXPREDM").cast(pl.Int64))
        if  len(acc_temp.filter((pl.col("PAXPREDM") == 1) | (pl.col("PAXPREDM") == 2)))< days_3:
            print(f"Invalid data for ID {ids11_acc_i}")
        else:
            # Save the individual file
            df_path = path+ f"{ids11_acc_i}-accelerometry.csv"
            acc_temp.write_csv(df_path)

        print(f"{n_iter} of {len(acc)} files processed")
        n_iter += 1
    

In [20]:
path = f"A:\\fyp\\output\\"
check(acc1112, acc1112_header, ids_nonacc, path)
check(acc1314, acc1314_header, ids_nonacc, path)

1 of 15754721 files processed
2 of 15754721 files processed
3 of 15754721 files processed
4 of 15754721 files processed
5 of 15754721 files processed
Invalid data for ID 62221.0
6 of 15754721 files processed
7 of 15754721 files processed
8 of 15754721 files processed
9 of 15754721 files processed
10 of 15754721 files processed
11 of 15754721 files processed
12 of 15754721 files processed
13 of 15754721 files processed
14 of 15754721 files processed
15 of 15754721 files processed
16 of 15754721 files processed
17 of 15754721 files processed
18 of 15754721 files processed
19 of 15754721 files processed
20 of 15754721 files processed
21 of 15754721 files processed
22 of 15754721 files processed
23 of 15754721 files processed
24 of 15754721 files processed
Invalid data for ID 62324.0
25 of 15754721 files processed
26 of 15754721 files processed
27 of 15754721 files processed
28 of 15754721 files processed
29 of 15754721 files processed
30 of 15754721 files processed
31 of 15754721 files pr