# Raw to intermediate Node

In [35]:
import pandas as pd
from utils import data_path, list_data_files

# Loading Stage Documentation

### The loading stage focuses on ensuring data is correctly ingested with proper data types and meaningful column names.

**Best Practices Followed:**
1. **Defining Data Types:**
   - A `dtype_dict` is created to explicitly specify data types for each column in the dataset. This improves memory efficiency and ensures correct data interpretation.
   - Examples include treating `gender` as a categorical variable and `bmi` as a float for precise numerical analysis.

2. **Using a Column Lookup Table:**
   - A dictionary (`column_lookup`) is used to rename columns to more descriptive and meaningful names. This makes the dataset easier to understand and work with.

3. **Efficient Loading:**
   - The dataset is loaded using `pd.read_csv` with the `dtype_dict`, minimizing post-load type conversions and errors.

4. **Validation:**
   - The data types and the first few rows are printed to verify successful loading and renaming.



In [36]:
# Prin the list of data files
print(list_data_files())

# Define the path to the raw data file
raw_data_path =  data_path()+ "\\" + 'raw\\sample_set_1 1-1.csv'

['intermediate\\intermediate_data.parquet', 'lookup\\city_lookup.csv', 'raw\\sample_set_1 1-1.csv']


In [37]:
# Define the data types for each column in the dataset
dtype_dict = {
    "MEMBER_CODE": "int64",    # De-identified member ID, stored as float to match dataset format
    "Age": "int64",             # Age of the member
    "GENDER": "category",       # Gender is a categorical variable
    "POLICY_NO": "int64",       # Policy number, stored as integer
    "CMS_Score": "int64",       # Charlson comorbidity index score, stored as integer
    "ICD_CODE": "category",     # ICD-10 codes are categorical
    "ICD_desc": "string",       # ICD-10 description as a string
    "City": "string",           # City as a string, handling missing values separately
    "CLAIM_TYPE": "category",   # Claim type is categorical
    "BMI": "float64"            # BMI as a float
}

# Column renaming lookup table
column_lookup = {
    "MEMBER_CODE": "member_code",
    "Age": "age",
    "GENDER": "gender",
    "POLICY_NO": "policy_number",
    "CMS_Score": "cms_score",
    "ICD_CODE": "icd_code",
    "ICD_desc": "icd_description",
    "City": "city",
    "CLAIM_TYPE": "claim_type",
    "BMI": "bmi"
}


# Load the dataset with specified data types
raw_data = pd.read_csv(raw_data_path, dtype=dtype_dict)

# Rename columns using the lookup table
raw_data.rename(columns=column_lookup, inplace=True)

# Verify the data types after loading
print(raw_data.dtypes)

# Display the first few rows to confirm successful loading
print(raw_data.head())

member_code                 int64
age                         int64
gender                   category
policy_number               int64
cms_score                   int64
icd_code                 category
icd_description    string[python]
city               string[python]
claim_type               category
bmi                       float64
dtype: object
   member_code  age gender  policy_number  cms_score icd_code  \
0   1961848012   78      M       26730932          4      E11   
1   1961848012   78      M       26730932          4      E11   
2   1961848012   78      M       26730932          4   E11.33   
3   1961848012   78      M       26730932          4   E11.33   
4   1961848012   78      M       26730932          4   E11.42   

                                     icd_description    city claim_type    bmi  
0                           Type 2 diabetes mellitus  RIYADH          I  31.25  
1                           Type 2 diabetes mellitus  RIYADH          O  31.25  
2  Type 2 di

# Handling Missing Data and Duplicates Documentation

### The second stage focuses on ensuring data integrity by handling missing values and duplicates.

**Best Practices Followed:**
1. **Identifying Missing Data:**
   - A summary of missing values is generated to identify columns with missing entries.

2. **Handling Missing Data:**
   - Missing values in the `city` column are replaced with "Unknown" as an example strategy.
   - Other strategies can include imputation or dropping rows/columns based on context.

3. **Checking for Duplicates:**
   - Duplicate rows are identified, counted, and removed to ensure data uniqueness and prevent bias.

4. **Validation:**
   - After handling missing data and duplicates, the data is re-checked to confirm integrity.


In [38]:
# Handle missing values
missing_summary = raw_data.isnull().sum()
print("Missing Values Summary:\n", missing_summary)

# Filling missing city values with 'Unknown'
raw_data["city"] = raw_data["city"].fillna("Unknown")

# Check and document duplicates
duplicate_count = raw_data.duplicated().sum()
print(f"Number of duplicate rows: {duplicate_count}")

# Print duplicate rows if they exist
if duplicate_count > 0:
    print("\nDuplicate rows:\n", raw_data[raw_data.duplicated()])

# Remove duplicate rows
raw_data = raw_data.drop_duplicates()

# Verify changes after handling missing values and duplicates
print("\nData after handling missing values and duplicates:\n", raw_data.head())


Missing Values Summary:
 member_code           0
age                   0
gender                0
policy_number         0
cms_score             0
icd_code              0
icd_description       0
city               4700
claim_type            0
bmi                   0
dtype: int64
Number of duplicate rows: 2

Duplicate rows:
          member_code  age gender  policy_number  cms_score icd_code  \
35671  1011590000000   64      M      202320252          3      E11   
35695  1011590000000   64      M      202320252          3      I10   

                        icd_description    city claim_type        bmi  
35671          Type 2 diabetes mellitus  JEDDAH          O  24.691358  
35695  Essential (primary) hypertension  JEDDAH          O  24.691358  

Data after handling missing values and duplicates:
    member_code  age gender  policy_number  cms_score icd_code  \
0   1961848012   78      M       26730932          4      E11   
1   1961848012   78      M       26730932          4      E11  

Cleaning city names documentation

The cleaning city names stage focuses on ensuring consistency and standardization in the dataset by processing city names. This involves handling variations in formatting, capitalization, and spelling. Consistent city names are crucial for accurate analysis, grouping, and reporting.

**Best Practices Followed:**
1. **Normalization Process:**
   - A function `normalize_city_name` is created to handle city name inconsistencies. This function:
     - Strips leading and trailing whitespace.
     - Converts names to title case (e.g., "new york" becomes "New York").
     - Properly formats hyphenated names (e.g., "los-angeles" becomes "Los-Angeles").

2. **Creating a Lookup Table:**
   - After normalizing city names, a lookup table is generated. The structure of the table is as follows:
     - `clean_city`: The standardized city name.
     - `list_of_variants`: A list of raw or uncleaned city names that map to the standardized name.
   - This table ensures traceability and provides a reference for re-mapping in future processes.

3. **Exporting the Lookup Table:**
   - The lookup table is saved as a CSV file (`city_lookup.csv`) for reuse in downstream processes and documentation purposes.


**Benefits of This Approach:**
- Improved consistency in city-related analysis.
- Enhanced traceability with the `list_of_variants` column.
- Reusability of the lookup table across multiple datasets or reports.

---

In [39]:
import re

# Function to normalize city names
def normalize_city_name(name):
    """
    Normalize city names to ensure consistent formatting.

    Parameters:
        name (str): The original city name.

    Returns:
        str: The cleaned and normalized city name.
        - Strips leading and trailing whitespace.
        - Capitalizes the first letter of each word and handles hyphens appropriately.
    """
    name = name.strip()  # Strip leading and trailing whitespace
    # Split by spaces and hyphens, capitalize each part, and rejoin
    parts = re.split(r'(\s+|-)', name)
    name = ''.join(part.capitalize() if part.isalpha() else part for part in parts)
    return name


# Normalize city names in the raw_data DataFrame
raw_data['clean_city'] = raw_data['city'].apply(normalize_city_name)


# Create a lookup table
city_lookup = (
    raw_data.groupby('clean_city')['city']
    .unique()
    .reset_index()
    .rename(columns={'city': 'list_of_variants'})
)

# Save the lookup table to a CSV file
city_lookup.to_csv('..\\data\\lookup\\city_lookup.csv', index=False)

# Verify the lookup table
print("\nCity Lookup Table:\n", city_lookup)



City Lookup Table:
          clean_city   list_of_variants
0              Abha             [Abha]
1              Afif             [AFIF]
2             Aflaj            [AFLAJ]
3           Al Baha          [AL BAHA]
4         Al Dwadmi        [AL DWADMI]
..              ...                ...
59          Unknown          [Unknown]
60  Wadi Al Dawasir  [WADI AL DAWASIR]
61             Wajh             [WAJH]
62            Yanbu            [YANBU]
63            Zulfi            [ZULFI]

[64 rows x 2 columns]


# Data Quality Checks Documentation

### The data quality checks phase ensures the dataset is accurate, consistent, and logically sound for analysis. 

**Steps to be Implemented:**

1. **Logical Value Checks:**
   - Validate values within logical ranges for key variables:
     - `age`: Ensure all ages are within a plausible range (e.g., 0-120).
     - `bmi`: Confirm all BMI values fall within a reasonable range (e.g., 10-80).
     - `gender`: Ensure only valid values are present (e.g., "M", "F").

2. **Consistency Check for Member Data:**
   - Verify that variables that should not change across rows for a member (e.g., `member_code`, `gender`) are consistent.

3. **ICD Code and Description Lookup:**
   - Create a unique lookup table of `icd_code` and `icd_description` to identify duplicate or erroneous mappings.

4. **Save Intermediate Quality Report:**
   - Document any issues found during checks and save.


In [40]:
# 1 - Logical checks
invalid_age_rows = raw_data[~raw_data['age'].between(0, 120)]
assert invalid_age_rows.empty, f"Age values that are not logical: {invalid_age_rows[['age', 'member_code']].to_dict(orient='records')}"

invalid_bmi_rows = raw_data[~raw_data['bmi'].between(10, 80)]
assert invalid_bmi_rows.empty, f"BMI values that are extreme: {invalid_bmi_rows[['bmi', 'member_code']].to_dict(orient='records')}"



AssertionError: BMI values that are extreme: [{'bmi': 85.61236623, 'member_code': 1000460000000}, {'bmi': 90.81632653, 'member_code': 1009970000000}, {'bmi': 90.81632653, 'member_code': 1009970000000}, {'bmi': 90.81632653, 'member_code': 1009970000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 88.7755102, 'member_code': 1016020000000}, {'bmi': 104.0582726, 'member_code': 1016820000000}, {'bmi': 90.81632653, 'member_code': 1074890000000}, {'bmi': 90.81632653, 'member_code': 1074890000000}, {'bmi': 90.81632653, 'member_code': 1074890000000}, {'bmi': 80.98477486, 'member_code': 1077330000000}, {'bmi': 80.98477486, 'member_code': 1077330000000}, {'bmi': 82.37308529, 'member_code': 1102200000000}, {'bmi': 82.37308529, 'member_code': 1102200000000}, {'bmi': 82.37308529, 'member_code': 1102200000000}, {'bmi': 82.37308529, 'member_code': 1102200000000}]

In [41]:
# 2- Consistency checks for member_code gender
valid_genders = ["M", "F"]
assert raw_data['gender'].isin(valid_genders).all(), "Invalid gender values found."

# Group by member_code and check for consistent gender
member_gender_consistency = raw_data.groupby('member_code')['gender']

# Identify inconsistencies
inconsistent_rows = member_gender_consistency.transform(lambda group: group.nunique() > 1)
inconsistent_gender_members = raw_data[inconsistent_rows][['member_code', 'gender']].drop_duplicates()

print("Inconsistent rows due to gender:")
print(inconsistent_gender_members)



Inconsistent rows due to gender:
            member_code gender
121        720035000000      M
131        720035000000      F
144        760902000000      M
176        760902000000      F
244       1000110000000      F
...                 ...    ...
173196    2502550000000      F
173476    2530880000000      M
173477    2530880000000      F
173577  247070000000000      M
173583  247070000000000      F

[5738 rows x 2 columns]


In [42]:
# Create unique lookup table
icd_lookup = raw_data[['icd_code', 'icd_description']].drop_duplicates()

# Check for duplicates or missing values
duplicate_icd = icd_lookup['icd_code'].duplicated().sum()
print(f"Number of duplicate ICD codes: {duplicate_icd}")
missing_icd_desc = icd_lookup['icd_description'].isnull().sum()
print(f"Number of missing ICD descriptions: {missing_icd_desc}")



Number of duplicate ICD codes: 0
Number of missing ICD descriptions: 0


In [44]:
import json
import numpy as np

# Convert numpy.int64 to native Python int for JSON serialization
def convert_to_serializable(obj):
    if isinstance(obj, (np.int64, np.float64)):
        return int(obj)
    raise TypeError(f"Type {type(obj)} not serializable")


# Save intermediate quality report
quality_issues = {
    "Duplicate ICD Codes": int(duplicate_icd),
    "Missing ICD Descriptions": int(missing_icd_desc),
    "Invalid/Extreme Age Rows": invalid_age_rows[['age', 'member_code']].to_dict(orient='records') if not invalid_age_rows.empty else [],
    "Invalid/Extreme BMI Rows": invalid_bmi_rows[['bmi', 'member_code']].to_dict(orient='records') if not invalid_bmi_rows.empty else [],
    "Members code with inconsistent Gender": inconsistent_gender_members.index.tolist() if not inconsistent_gender_members.empty else [],
}

with open('..\\reports\\raw_data_quality_report.json', 'w') as f:
    json.dump(quality_issues, f, indent=4, default=convert_to_serializable)

print("Quality report saved as quality_report.json in the reports directory.")

Quality report saved as quality_report.json in the reports directory.


# Addressing the quality issues of the data

- The `non_unique_member_codes.csv` file documents the identified `member_code` issues.
- To adress this we will create a unique ID by grouping `policy_number`, `member_code`, `gender`, and `age` to differentiate separate individuals.

In [55]:
def assign_unique_id(intermediate_data):
    """
    Assigns a unique ID to individuals by grouping based on policy_number, member_code, gender, and age.

    Parameters:
    intermediate_data (DataFrame): The DataFrame containing the intermediate data.

    Returns:
    DataFrame: The DataFrame with an additional column 'unique_id'.
    """
    intermediate_data['unique_id'] = (
        intermediate_data.groupby(['policy_number', 'member_code', 'gender', 'age'],  observed=True)
        .ngroup()
        .astype(str)  # Convert the unique ID to a string

    )
    return intermediate_data


# Assign a unique ID for individuals by grouping
raw_data = assign_unique_id(raw_data)

In [56]:
raw_data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 173770 entries, 0 to 173771
Data columns (total 12 columns):
 #   Column           Non-Null Count   Dtype   
---  ------           --------------   -----   
 0   member_code      173770 non-null  int64   
 1   age              173770 non-null  int64   
 2   gender           173770 non-null  category
 3   policy_number    173770 non-null  int64   
 4   cms_score        173770 non-null  int64   
 5   icd_code         173770 non-null  category
 6   icd_description  173770 non-null  string  
 7   city             173770 non-null  string  
 8   claim_type       173770 non-null  category
 9   bmi              173770 non-null  float64 
 10  clean_city       173770 non-null  object  
 11  unique_id        173770 non-null  object  
dtypes: category(3), float64(1), int64(4), object(2), string(2)
memory usage: 13.9+ MB


In [46]:
# save the cleaned data to a parquet file
intermediate_data = raw_data.drop(columns=['city'])
intermediate_data.rename(columns={'clean_city': 'city'}, inplace=True)

# specify the path to save the intermediate data
intermediate_data_path = data_path() + '\\intermediate'+'\\intermediate_data.parquet'

# Save the intermediate data to a parquet file
intermediate_data.to_parquet(intermediate_data_path, index=False)

In [1]:
intermediate_data.head()

NameError: name 'intermediate_data' is not defined