# Daly Weights, inflow and outflow of ICD-10 - dalys: Panel creation

This script runs a `sql` code to create a panel data, based on the `case_records_current` table, where we have each `ICD-10` member change of status, and the link between the status change with change in daly weights. 
The calculation involves a lot of cleaning,   such as desconsidering impetous changes of status or *cancelled* icds status as the first status input, etc, we leave that for the sql code, and here is just the script to 
run it and export to the redshift.

Steps:

    - run this notebook to upload to Redshift;
    
    - use the new panel to make calculations, new tables, dashes...
***

## 0. Setup

In [3]:
#connection to redshift
from redshift_import_export.interfaces.data_interactor import DataInteractor
di = DataInteractor()
import requests


#standard packages
import warnings
warnings.filterwarnings('ignore')
import os
import collections
import pandas as pd
import numpy as np



#time packages
import datetime as dt

## 1. Import

In [4]:
df = di.redshift.run_sql_query(os.path.abspath(os.path.join(os.getcwd(), os.pardir)) + '/data/queries/daly_lower_bounds_weights_member_cid_panel_table.sql')
df.head(10)

DatabaseError: Execution failed on sql 'WITH 
---get the icd-10 registries at Alice
cdc_df AS (
    SELECT 
        crc.person_internal_code AS member_internal_code,
        crc.case_id,
        crc.sex,
        crc.age,
        crc.disease_code_value,
        crc.status,
        crc.added_at::timestamp as added_at,
        crc.severity
    FROM
        "public"."case_record_current" AS crc
    WHERE
        crc.status <> 'PENDING'
        AND crc.status <> 'CONTEMPLATING'
        AND crc.disease_code_type = 'CID_10'
),
cdc_df_distinct as(
    SELECT DISTINCT
        cdc_df.member_internal_code,
        cdc_df.case_id,
        cdc_df.added_at::timestamp as added_at,
        cdc_df.disease_code_value,
        cdc_df.status,
        cdc_df.severity
    FROM 
        cdc_df
    
    ), 
--FROM cte to cte6, we build a table that, for each case_id, we get
-- the order of updates. That way, we can compare a status change to
-- the previous status change and see if it is an improvement or not.
cte AS(
    SELECT
        DISTINCT cdc_df.member_internal_code,
        cdc_df.case_id,
        cdc_df.added_at :: timestamp AS added_at,
        ROW_NUMBER() OVER (PARTITION BY member_internal_code,case_id ORDER BY added_at) AS sort
    FROM
        cdc_df
),
-- Here we don't want to get weird updates, such as those in between 1 minute.
cte2 AS -- Let's find the status updt of more than 1hour of difference betw
(
    SELECT
        cte.*,
        CASE
            WHEN DATEDIFF('hour', cte_1.added_at, cte.added_at) <= 1 THEN 0
            ELSE 1
        END AS GrpType
    FROM
        cte
    LEFT OUTER JOIN cte AS cte_1 
        ON cte.sort = cte_1.sort + 1
        AND cte.member_internal_code = cte_1.member_internal_code
        AND cte.case_id = cte_1.case_id
),
cte3 AS -- assign a Sequence id for each update within each case_id.
(
    SELECT
        member_internal_code,
        case_id,
        GrpType,
        added_at,
        ROW_NUMBER() OVER(PARTITION BY member_internal_code,case_id ORDER BY added_at) SeqId
    FROM
        cte2
    WHERE
        GrpType = 1
),
cte4 AS -- find the timestamp range per sequence_id. 
(    SELECT
        cte3.*,
        cte_2.added_at AS TS_to
    FROM
        cte3
    LEFT OUTER JOIN cte3 AS cte_2 
        ON cte3.SeqId = cte_2.SeqId -1
        AND cte3.member_internal_code = cte_2.member_internal_code
        AND cte3.case_id = cte_2.case_id
),
cte5 AS(
    -- we get the correct ordering of updates (seq_id), with each
    -- other update within 1 hour is assigned to a seq_id, and a seq_order within that seq_id.
    SELECT
        t.member_internal_code,
        t.case_id,
        t.disease_code_value,
        t.status,
        t.severity,
        t.added_at,
        cte4.SeqId,
        ROW_NUMBER() OVER(PARTITION BY t.member_internal_code, t.case_id, cte4.SeqId ORDER BY t.added_at) AS SeqOrder
    FROM
        cte4
    INNER JOIN cdc_df_distinct AS t ON t.member_internal_code = cte4.member_internal_code
        AND t.case_id = cte4.case_id
        AND t.added_at >= cte4.added_at
        AND (t.added_at < cte4.TS_to OR cte4.TS_to IS NULL)
),
-- cte6-cte7:Here we will only get the seq_order per seq_id. That way, we only get the update
-- that the HP thinks is the most correct
cte6 AS(
    SELECT
        cte5.member_internal_code,
        cte5.case_id,
        cte5.disease_code_value,
        cte5.status,
        cte5.severity,
        cte5.added_at,
        cte5.SeqId AS seq_id,
        cte5.SeqOrder AS seq_order,
        MAX(cte5.SeqOrder) OVER (PARTITION BY member_internal_code, case_id, SeqId) AS max_seq_order
    FROM
        cte5
),

cte7 AS ( --consider 'double' cids subcategories, cid_code1/cid_code2 as the cid category of the first one.
    SELECT
        cte6.member_internal_code,
        cte6.case_id,
        CASE
            WHEN cte6.disease_code_value LIKE '%/%' THEN SUBSTRING(cte6.disease_code_value, 1, 3)
            ELSE cte6.disease_code_value
        END AS disease_code_value,
        cte6.status,
        cte6.severity,
        cte6.added_at,
        cte6.seq_id
    FROM
        cte6
    WHERE
        seq_order = max_seq_order
),

--from df1 to df7, we clean the table, as well as applying functions to get the change in dalys, from one
-- status change to another.

df1 AS(-- to get the change in dalys, let's get the previous status, cid, etc. but let's not consider
        -- entries that were cancelled right off the bat.
    SELECT
        cte7.member_internal_code,
        cte7.case_id,
        cte7.disease_code_value,
        cte7.status,
        cte7.severity,
        LAG(cte7.disease_code_value) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS disease_code_value_lag1,
        LAG(cte7.status) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS status_lag1,
        LAG(cte7.severity) OVER ( PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS severity_lag1,
        cte7.added_at,
        cte7.seq_id
    FROM
        cte7
    WHERE
        cte7.seq_id > 1
        OR (cte7.seq_id = 1 AND STATUS !='CANCELLED' AND severity != 'INACTIVE')
),

df2 AS(--since we're desconsidering cancellations as the first entry, we need to create a new ordering.
        -- Additionally, we will not consider changes that did update anything, that is, the status and severity
        -- were the same as before. 
    SELECT
        df1.member_internal_code,
        df1.case_id,
        df1.seq_id,
        ROW_NUMBER() OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS seq_id_v2,
        df1.disease_code_value,
        df1.status,
        df1.severity,
        df1.added_at
    FROM
        df1
    WHERE
        (disease_code_value_lag1 IS NULL AND status_lag1 IS NULL AND severity_lag1 IS NULL)
         OR (disease_code_value_lag1 != disease_code_value OR  status!=status_lag1 OR severity != severity_lag1)
),
df3 AS(--renaming the new sequence id (v2) to the original
    SELECT
        df2.member_internal_code,
        df2.case_id,
        df2.seq_id_v2 AS seq_id,
        df2.disease_code_value AS cid,
        df2.status AS cid_status,
        df2.severity AS cid_sev,
        df2.added_at
    FROM
        df2
),
df4 AS( --finally, let's retrieve the DW, disability weights for daly. notice that cancel/inactive = 0.
    SELECT
        df3.member_internal_code,
        df3.case_id,
        df3.cid,
        df3.seq_id,
        df3.cid_status,
        df3.cid_sev,
        df3.added_at,
        CASE 
            WHEN cid_status = 'CANCELLED' OR cid_sev = 'INACTIVE' THEN 0
            ELSE COALESCE(rd.daly_weight_lower, 0) 
        END AS daly_weight_lb
    FROM
        df3
        LEFT JOIN "restricted_datascience"."daly_weights_for_alice" AS rd ON df3.cid = rd.cid
        AND LOWER(df3.cid_sev) = LOWER(rd.severity)
),
df5 AS(-- as in df1, we need to get the previous status, severity, daly...
    SELECT
        df4.member_internal_code,
        df4.case_id,
        df4.cid,
        df4.seq_id,
        df4.cid_status,
        df4.cid_sev,
        df4.daly_weight_lb,
        df4.added_at,
        LAG(df4.cid) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS cid_lag1,
        LAG(df4.cid_status) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS cid_status_lag1,
        LAG(df4.cid_sev) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS cid_sev_lag1,
        LAG(df4.daly_weight_lb) OVER (PARTITION BY member_internal_code,case_id ORDER BY seq_id ASC) AS daly_weight_lag1
    FROM
        df4
),
df6 AS( --create new column, delta_daly, that is just the difference between DW from the current status change to the old one.
    SELECT
        df5.member_internal_code,
        df5.case_id,
        df5.seq_id,
        df5.cid,
        df5.cid_status,
        df5.cid_sev,
        df5.added_at,
        df5.cid_lag1,
        df5.cid_status_lag1,
        df5.cid_sev_lag1,
        df5.daly_weight_lb,
        df5.daly_weight_lag1,
        df5.daly_weight_lb - df5.daly_weight_lag1 AS delta_daly
    FROM
        df5
),
df7 AS(--Let's reorganize the dalys weights to create an inflow/outflow perspective. e.g., if a member 
        -- is entering, then the inflow = DW. if order>1 and it has a delta<0, that means it was 'cured'
        -- and we have and outflow. if delta>0, that means it got worse, and we have an inflow. The
        -- other categories are for cids that we did not input yet, so we consider= zero. Or, if entered
        -- right off the bat as cancel/inactive, we should consider the DW as outflow, although it should
        -- not happen since we desconsidered those at previous steps.
    SELECT
        df6.member_internal_code,
        df6.case_id,
        df6.seq_id,
        df6.cid,
        df6.cid_status,
        df6.cid_sev,
        df6.added_at,
        df6.daly_weight_lb,
        df6.delta_daly,
        CASE
            WHEN seq_id = 1 AND (cid_status != 'CANCELLED' AND cid_sev != 'INACTIVE') THEN daly_weight
            WHEN seq_id > 1 AND delta_daly > 0 THEN delta_daly
            ELSE 0
        END AS daly_inflow,
        CASE 
            WHEN seq_id = 1 AND (cid_status = 'CANCELLED' OR cid_sev = 'INACTIVE') THEN daly_weight
            WHEN seq_id > 1 AND delta_daly < 0 THEN -delta_daly
            WHEN seq_id > 1 AND (cid_status = 'CANCELLED' OR cid_sev = 'INACTIVE') THEN daly_weight
            ELSE 0
        END AS daly_outflow
    FROM
        df6
),

final_panel_df AS(--now, let's bring the cid categories(as the health squad categorize) and chapters (icd-original)
SELECT
    df7.member_internal_code,
    df7.case_id,
    df7.seq_id,
    df7.cid,
    base_chap.capitulo AS cid_cap,
    alice_categories_df.alice_category AS cid_alice_cat,
    df7.cid_status,
    df7.cid_sev,
    df7.added_at,
    df7.daly_weight_lb,
    df7.delta_daly,
    df7.daly_inflow,
    df7.daly_outflow
FROM
    df7
    LEFT JOIN "restricted_health_information"."base_completa_cid" AS base_chap ON df7.cid = base_chap.cid
    LEFT JOIN "restricted_datascience"."alice_cid_categories" AS alice_categories_df ON df7.cid = alice_categories_df.cid
ORDER BY
    member_internal_code,
    case_id,
    seq_id DESC
)

SELECT * FROM final_panel_df': column "daly_weight" does not exist in df6


## 3. Export

In [None]:
di.redshift.insert_table(df, table_name='daly_lower_bounds_weights_member_cid_panel_table', if_exists='replace')