# Checking Data Quality and pipeline

## Accounts

In [128]:
import pandas as pd

# Define the schema
dtype_schema = {
    "account_number": "object",
    "amount": "float64",
    "account_name": "object",
    "account_type": "object"
}

# Columns that should be parsed as dates
date_columns = ["reference_date"]

# Custom date parser for the specific date format in the Excel file
date_parser = lambda x: pd.to_datetime(x, format='%m/%d/%Y', errors='coerce')

# Read the Excel file with the specified schema and date format
file_path = "Datasets/Data Engineer Case Study - Data.xlsx"
accounts_df = pd.read_excel(file_path, sheet_name='Accounts', dtype=dtype_schema, parse_dates=date_columns)




In [129]:
from datetime import datetime
accounts_df['ingest_date_time'] = datetime.now()

In [130]:
accounts_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18 entries, 0 to 17
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   account_number    18 non-null     object        
 1   amount            18 non-null     float64       
 2   account_name      18 non-null     object        
 3   account_type      18 non-null     object        
 4   reference_date    18 non-null     datetime64[ns]
 5   ingest_date_time  18 non-null     datetime64[us]
dtypes: datetime64[ns](1), datetime64[us](1), float64(1), object(3)
memory usage: 992.0+ bytes


In [131]:
import os
original_output_path = 'Processed_file/Original_data'
os.makedirs(original_output_path, exist_ok=True)
original_output_file_path = os.path.join(original_output_path, 'original_accounts_with_ingest_date_time.xlsx')
accounts_df.to_excel(original_output_file_path, index=False)

### Data Validation Checks

The following data validation checks have been performed on the dataframe:

1. **Missing Values Check**
   - **Description:** Checks if there are any missing values in the dataframe.
   - **Action:** Collects rows with missing values and records an error comment: "Missing values in the row".

2. **Duplicate Account Numbers on the Same Reference Date Check**
   - **Description:** Checks for duplicate account numbers on the same reference date.
   - **Action:** Collects duplicate rows and records an error comment: "Duplicate account number on the same reference date".

3. **Negative Amount Check**
   - **Description:** Checks if the 'amount' column has any negative values.
   - **Action:** Collects rows with negative amounts and records an error comment: "Negative amount".

4. **Valid Reference Date Check**
   - **Description:** Checks if the 'reference_date' column contains valid dates.
   - **Action:** Collects rows with invalid dates and records an error comment: "Invalid date in 'reference_date'".

5. **Valid Account Number Check**
   - **Description:** Checks if the 'account_number' column contains valid integers.
   - **Action:** Collects rows where 'account_number' contains non-integer values and records an error comment: "'account_number' contains non-integer values".

6. **Valid Amount Check**
   - **Description:** Checks if the 'amount' column contains valid floats.
   - **Action:** Collects rows where 'amount' contains non-float values and records an error comment: "'amount' contains non-float values".

7. **Valid Account Type Check**
   - **Description:** Checks if the 'account_type' column only contains 'Asset' or 'Liability'.
   - **Action:** Collects rows where 'account_type' contains invalid values and records an error comment: "'account_type' contains invalid values".

8. **Valid Account Name Check**
   - **Description:** Checks if the 'account_name' column follows the expected pattern: `^(Unsecured Personal Loan|Credit Card|Corporate Leasing|Fixed|Floating) - (EUR|NOK|SEK)$`.
   - **Action:** Collects rows where 'account_name' contains invalid values and records an error comment: "'account_name' contains invalid values".



In [132]:


def validate_data(df):
    error_list = []

    # check for missing values
    if df.isnull().values.any():
        for index, row in df[df.isnull().any(axis=1)].iterrows():
            error_list.append([index, row.to_dict(), "Missing values in the row"])

    # Check for duplicate account numbers on the same reference date
    duplicate_mask = df.duplicated(subset=['account_number', 'reference_date'], keep=False)
    if duplicate_mask.any():
        for index, row in df[duplicate_mask].iterrows():
            error_list.append([index, row.to_dict(), "Duplicate account number on the same reference date"])

    # Check if 'amount' column has negative values
    if (df['amount'] < 0).any():
        for index, row in df[df['amount'] < 0].iterrows():
            error_list.append([index, row.to_dict(), "Negative amount"])

    # Check if 'reference_date' column has valid dates
    try:
        pd.to_datetime(df['reference_date'], format='%m/%d/%y')
    except ValueError:
        for index, row in df[~pd.to_datetime(df['reference_date'], errors='coerce').notna()].iterrows():
            error_list.append([index, row.to_dict(), "Invalid date in 'reference_date'"])

    # Check if 'account_number' column has valid integers
    if not pd.api.types.is_integer_dtype(df['account_number']):
        for index, row in df[~df['account_number'].apply(lambda x: isinstance(x, int))].iterrows():
            error_list.append([index, row.to_dict(), "'account_number' contains non-integer values"])

    # Check if 'amount' column has valid floats
    if not pd.api.types.is_float_dtype(df['amount']):
        for index, row in df[~df['amount'].apply(lambda x: isinstance(x, float))].iterrows():
            error_list.append([index, row.to_dict(), "'amount' contains non-float values"])

    # Check if 'account_type' only contains 'Asset' or 'Liability'
    if not df['account_type'].isin(['Asset', 'Liability']).all():
        for index, row in df[~df['account_type'].isin(['Asset', 'Liability'])].iterrows():
            error_list.append([index, row.to_dict(), "'account_type' contains invalid values"])

    # Check if 'account_name' follows the expected pattern
    if not df['account_name'].str.contains(r'^(Unsecured Personal Loan|Credit Card|Corporate Leasing|Fixed|Floating) - (EUR|NOK|SEK)$').all():
        for index, row in df[~df['account_name'].str.contains(r'^(Unsecured Personal Loan|Credit Card|Corporate Leasing|Fixed|Floating) - (EUR|NOK|SEK)$')].iterrows():
            error_list.append([index, row.to_dict(), "'account_name' contains invalid values"])

    # Create a dataframe to hold errors
    error_df = pd.DataFrame(error_list, columns=['row_index', 'row_data', 'comment'])

    return error_df

# Validate the dataframe
validation_errors_df = validate_data(accounts_df)





  if not df['account_name'].str.contains(r'^(Unsecured Personal Loan|Credit Card|Corporate Leasing|Fixed|Floating) - (EUR|NOK|SEK)$').all():
  for index, row in df[~df['account_name'].str.contains(r'^(Unsecured Personal Loan|Credit Card|Corporate Leasing|Fixed|Floating) - (EUR|NOK|SEK)$')].iterrows():


In [133]:
import os
output_path = 'Processed_file/Invalid_data'
output_file_name = 'invalid_accounts.xlsx'

# Ensure the directory exists
os.makedirs(output_path, exist_ok=True)

# Save the validation errors dataframe to an Excel file
output_file_path = os.path.join(output_path, output_file_name)
validation_errors_df.to_excel(output_file_path, index=False)

### Cleaned accounts data

In [134]:
cleaned_accounts_df = accounts_df[~accounts_df.index.isin(validation_errors_df['row_index'])]

### Check duplicate rows

In [135]:
duplicate_rows = cleaned_accounts_df[cleaned_accounts_df.duplicated()]


In [136]:
assert len(duplicate_rows) == 0

In [137]:
import os
output_path = 'Processed_file/Cleaned_data'
output_file_name = 'cleaned_accounts.xlsx'

# Ensure the directory exists
os.makedirs(output_path, exist_ok=True)

# Save the validation errors dataframe to an Excel file
output_file_path = os.path.join(output_path, output_file_name)
cleaned_accounts_df.to_excel(output_file_path, index=False)

## Deposits

In [138]:
import pandas as pd

# Define the schema for deposits_df
dtype_schema = {
    'customer': 'object',
    'customer_type': 'object',
    'deposit_type': 'object',
    'country': 'object',
    'amount': 'float64',
    'currency': 'object',
    'exchange_rate': 'float64',
    'start_date': 'object',  # Initially read as string to handle parsing later
    'maturity_date': 'object',  # Initially read as string to handle parsing later
    'reference_date': 'object'  # Initially read as string to handle parsing later
}

# Columns that should be parsed as dates
date_columns = ['start_date', 'maturity_date', 'reference_date']

# Custom date parser for the specific date format in the Excel file
date_parser = lambda x: pd.to_datetime(x, format='%m/%d/%Y', errors='coerce')

# Read the Excel file with the specified schema and date format
file_path = "Datasets/Data Engineer Case Study - Data.xlsx"
deposits_df = pd.read_excel(file_path, sheet_name='Deposits', dtype=dtype_schema, parse_dates=date_columns)

# Apply custom date parsing for date columns
for date_col in date_columns:
    deposits_df[date_col] = deposits_df[date_col].apply(date_parser)

# Display the schema and a few rows to verify
print(deposits_df.info())
print(deposits_df.head())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 10 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   customer        500 non-null    object        
 1   customer_type   500 non-null    object        
 2   deposit_type    500 non-null    object        
 3   country         500 non-null    object        
 4   amount          500 non-null    float64       
 5   currency        500 non-null    object        
 6   exchange_rate   500 non-null    float64       
 7   start_date      500 non-null    datetime64[ns]
 8   maturity_date   142 non-null    datetime64[ns]
 9   reference_date  500 non-null    datetime64[ns]
dtypes: datetime64[ns](3), float64(2), object(5)
memory usage: 39.2+ KB
None
                                            customer customer_type  \
0  0002767be2fd774f796f9a90dea9be48f6cbb41b6404ef...     Household   
1  0010c866e31af49ac1e5286bc0606e6a34fc68daf68f52...  

In [139]:
# deposits_df

In [140]:
deposits_df['ingest_date_time'] = datetime.now()

In [141]:
original_output_path = 'Processed_file/Original_data'
os.makedirs(original_output_path, exist_ok=True)
original_output_file_path = os.path.join(original_output_path, 'original_deposits_with_ingest_date_time.xlsx')
deposits_df.to_excel(original_output_file_path, index=False)

### Deposits Data Validation

The following data validation checks have been performed on the `deposits_df` dataframe:

1. **Missing Values Check**
   - **Description:** Checks if there are any missing values in the dataframe.
   - **Action:** Collects rows with missing values and records an error comment: "Missing values in the row".

2. **Duplicate Customer Check**
   - **Description:** Checks for duplicate customers based on the 'customer' column.
   - **Action:** Collects duplicate rows and records an error comment: "Duplicate customer".

3. **Negative or Zero Amount Check**
   - **Description:** Checks if the 'amount' column has any negative or zero values.
   - **Action:** Collects rows with negative or zero amounts and records an error comment: "Amount is negative or zero".

4. **Valid Date Check**
   - **Description:** Checks if the 'start_date', 'maturity_date', and 'reference_date' columns contain valid dates.
   - **Action:** Collects rows with invalid dates in any of these columns and records an error comment: "Invalid date in 'column_name'".

5. **Valid Currency Check**
   - **Description:** Checks if the 'currency' column contains valid currency codes (e.g., EUR, USD, GBP, etc.).
   - **Action:** Collects rows with invalid currency codes and records an error comment: "Invalid currency".

6. **Valid Exchange Rate Check**
   - **Description:** Checks if the 'exchange_rate' column contains valid floats and values greater than 0.
   - **Action:** Collects rows with invalid or non-positive exchange rates and records an error comment: "Invalid exchange rate".



In [142]:
deposits_df.head()

Unnamed: 0,customer,customer_type,deposit_type,country,amount,currency,exchange_rate,start_date,maturity_date,reference_date,ingest_date_time
0,0002767be2fd774f796f9a90dea9be48f6cbb41b6404ef...,Household,Fixed,FI,1000.0,EUR,11.096,2023-10-31,2024-10-31,2023-12-31,2024-05-21 20:02:36.074608
1,0010c866e31af49ac1e5286bc0606e6a34fc68daf68f52...,Household,Fixed,FI,70000.0,EUR,11.096,2023-01-23,2024-01-23,2023-12-31,2024-05-21 20:02:36.074608
2,00221a07b4ab91714997432490aeef35be55c15e47429c...,Household,Fixed,FI,10000.0,EUR,11.096,2023-08-29,2024-08-29,2023-12-31,2024-05-21 20:02:36.074608
3,0029d9489632a584e3e6492e109987159083b2843b391b...,Household,Fixed,FI,10000.0,EUR,11.096,2023-12-19,2024-12-19,2023-12-31,2024-05-21 20:02:36.074608
4,002db3caa61bfeeb6a8323aca83f02175f753bec5c9910...,Household,Fixed,FI,0.0,EUR,11.096,2022-12-07,2023-12-07,2023-12-31,2024-05-21 20:02:36.074608


In [143]:
def validate_data(df):
    error_list = []

    # Check for missing values
    if df.isnull().values.any():
        for index, row in df[df.isnull().any(axis=1)].iterrows():
            error_list.append([index, row.to_dict(), "Missing values in the row"])

    # Check for duplicate customers
    if df.duplicated(subset=['customer']).any():
        for index, row in df[df.duplicated(subset=['customer'], keep=False)].iterrows():
            error_list.append([index, row.to_dict(), "Duplicate customer"])

    # Check if 'amount' column has negative or zero values
    if (df['amount'] <= 0).any():
        for index, row in df[df['amount'] <= 0].iterrows():
            error_list.append([index, row.to_dict(), "Amount is negative or zero"])

    # Check if date columns have valid dates
    for date_column in ['start_date', 'maturity_date', 'reference_date']:
        try:
            pd.to_datetime(df[date_column])
        except ValueError:
            for index, row in df[~pd.to_datetime(df[date_column], errors='coerce').notna()].iterrows():
                error_list.append([index, row.to_dict(), f"Invalid date in '{date_column}'"])

    # Check if 'currency' column has valid currencies (assuming valid currencies are EUR, USD, GBP, etc.)
    valid_currencies = ['EUR', 'USD', 'GBP', 'NOK', 'SEK']
    if not df['currency'].isin(valid_currencies).all():
        for index, row in df[~df['currency'].isin(valid_currencies)].iterrows():
            error_list.append([index, row.to_dict(), "Invalid currency"])

    # Check if 'exchange_rate' column has valid floats and is greater than 0
    if not pd.api.types.is_float_dtype(df['exchange_rate']) or (df['exchange_rate'] <= 0).any():
        for index, row in df[~df['exchange_rate'].apply(lambda x: isinstance(x, float) and x > 0)].iterrows():
            error_list.append([index, row.to_dict(), "Invalid exchange rate"])

    # Create a dataframe to hold errors
    error_df = pd.DataFrame(error_list, columns=['row_index', 'row_data', 'comment'])

    return error_df

validation_errors_df = validate_data(deposits_df)

In [144]:
output_path = 'Processed_file/Invalid_data'
output_file_name = 'invalid_deposits.xlsx'

# Ensure the directory exists
os.makedirs(output_path, exist_ok=True)

# Save the validation errors dataframe to an Excel file
output_file_path = os.path.join(output_path, output_file_name)
validation_errors_df.to_excel(output_file_path, index=False)

### Cleaned_Deposits

In [145]:
cleaned_deposits_df = deposits_df[~deposits_df.index.isin(validation_errors_df['row_index'])]

### Check for Duplicates

In [146]:
assert len(cleaned_deposits_df[cleaned_deposits_df.duplicated()]) == 0

In [147]:
cleaned_output_path = 'Processed_file/Cleaned_data'
os.makedirs(cleaned_output_path, exist_ok=True)
cleaned_output_file_path = os.path.join(cleaned_output_path, 'cleaned_deposits.xlsx')
cleaned_deposits_df.to_excel(cleaned_output_file_path, index=False)

# Loans


In [148]:
import pandas as pd

# Define the schema for loans_df
dtype_schema = {
    'customer': 'object',
    'customer_type': 'object',
    'loan_type': 'object',
    'country': 'object',
    'amount': 'float64',
    'currency': 'object',
    'exchange_rate': 'float64',
    'start_date': 'object',  # Initially read as string to handle parsing later
    'maturity_date': 'object',  # Initially read as string to handle parsing later
    'reference_date': 'object'  # Initially read as string to handle parsing later
}

# Columns that should be parsed as dates
date_columns = ['start_date', 'maturity_date', 'reference_date']

# Custom date parser for the specific date format in the Excel file
date_parser = lambda x: pd.to_datetime(x, format='%m/%d/%Y', errors='coerce')

# Read the Excel file with the specified schema and date format
file_path = "Datasets/Data Engineer Case Study - Data.xlsx"
loans_df = pd.read_excel(file_path, sheet_name='Loans', dtype=dtype_schema, parse_dates=date_columns)

# Apply custom date parsing for date columns
for date_col in date_columns:
    loans_df[date_col] = loans_df[date_col].apply(date_parser)

# Display the schema and a few rows to verify
print(loans_df.info())
print(loans_df.head())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 10 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   customer        500 non-null    object        
 1   customer_type   500 non-null    object        
 2   loan_type       500 non-null    object        
 3   country         500 non-null    object        
 4   amount          500 non-null    float64       
 5   currency        500 non-null    object        
 6   exchange_rate   500 non-null    float64       
 7   start_date      499 non-null    datetime64[ns]
 8   maturity_date   500 non-null    datetime64[ns]
 9   reference_date  500 non-null    datetime64[ns]
dtypes: datetime64[ns](3), float64(2), object(5)
memory usage: 39.2+ KB
None
                                            customer customer_type  \
0  01f259b4c6ce07c68d37c859cf6dfe063fb75fedf39464...     Household   
1  01f9cbc3a44cfb87f57a2faee345dd4451075e6de7439e...  

In [149]:
loans_df['ingest_date_time'] = datetime.now()

In [150]:
# loans_df

In [151]:
original_output_path = 'Processed_file/Original_data'
os.makedirs(original_output_path, exist_ok=True)
original_output_file_path = os.path.join(original_output_path, 'original_loans_with_ingest_date_time.xlsx')
loans_df.to_excel(original_output_file_path, index=False)

### Loans Data Validation

The following data validation checks have been performed on the `loans_df` dataframe:

1. **Missing Values Check**
   - **Description:** Checks if there are any missing values in the dataframe.
   - **Action:** Collects rows with missing values and records an error comment: "Missing values in the row".

2. **Duplicate Customer Check**
   - **Description:** Checks for duplicate customers based on the 'customer' column.
   - **Action:** Collects duplicate rows and records an error comment: "Duplicate customer".

3. **Negative or Zero Amount Check**
   - **Description:** Checks if the 'amount' column has any negative or zero values.
   - **Action:** Collects rows with negative or zero amounts and records an error comment: "Amount is negative or zero".

4. **Valid Date Check**
   - **Description:** Checks if the 'start_date', 'maturity_date', and 'reference_date' columns contain valid dates.
   - **Action:** Collects rows with invalid dates in any of these columns and records an error comment: "Invalid date in 'column_name'".

5. **Valid Currency Check**
   - **Description:** Checks if the 'currency' column contains valid currency codes (e.g., EUR, USD, GBP, NOK, SEK).
   - **Action:** Collects rows with invalid currency codes and records an error comment: "Invalid currency".

6. **Valid Exchange Rate Check**
   - **Description:** Checks if the 'exchange_rate' column contains valid floats and values greater than 0.
   - **Action:** Collects rows with invalid or non-positive exchange rates and records an error comment: "Invalid exchange rate".


In [152]:
def validate_data(df):
    error_list = []

    # Check for missing values
    if df.isnull().values.any():
        for index, row in df[df.isnull().any(axis=1)].iterrows():
            error_list.append([index, row.to_dict(), "Missing values in the row"])

    # Check for duplicate customers
    if df.duplicated(subset=['customer']).any():
        for index, row in df[df.duplicated(subset=['customer'], keep=False)].iterrows():
            error_list.append([index, row.to_dict(), "Duplicate customer"])

    # Check if 'amount' column has negative or zero values
    if (df['amount'] <= 0).any():
        for index, row in df[df['amount'] <= 0].iterrows():
            error_list.append([index, row.to_dict(), "Amount is negative or zero"])

    # Check if date columns have valid dates
    for date_column in ['start_date', 'maturity_date', 'reference_date']:
        try:
            pd.to_datetime(df[date_column])
        except ValueError:
            for index, row in df[~pd.to_datetime(df[date_column], errors='coerce').notna()].iterrows():
                error_list.append([index, row.to_dict(), f"Invalid date in '{date_column}'"])

    # Check if 'currency' column has valid currencies (assuming valid currencies are EUR, USD, GBP, etc.)
    valid_currencies = ['EUR', 'USD', 'GBP', 'NOK', 'SEK']
    if not df['currency'].isin(valid_currencies).all():
        for index, row in df[~df['currency'].isin(valid_currencies)].iterrows():
            error_list.append([index, row.to_dict(), "Invalid currency"])

    # Check if 'exchange_rate' column has valid floats and is greater than 0
    if not pd.api.types.is_float_dtype(df['exchange_rate']) or (df['exchange_rate'] <= 0).any():
        for index, row in df[~df['exchange_rate'].apply(lambda x: isinstance(x, float) and x > 0)].iterrows():
            error_list.append([index, row.to_dict(), "Invalid exchange rate"])

    # Create a dataframe to hold errors
    error_df = pd.DataFrame(error_list, columns=['row_index', 'row_data', 'comment'])

    return error_df

# Validate the dataframe
validation_errors_df = validate_data(loans_df)


In [153]:
output_path = 'Processed_file/Invalid_data'
output_file_name = 'invalid_loans.xlsx'

# Ensure the directory exists
os.makedirs(output_path, exist_ok=True)

# Save the validation errors dataframe to an Excel file
output_file_path = os.path.join(output_path, output_file_name)
validation_errors_df.to_excel(output_file_path, index=False)

### cleaned_df

In [154]:
cleaned_loans_df = loans_df[~loans_df.index.isin(validation_errors_df['row_index'])]

### Check for Duplicates

In [155]:
assert (len(cleaned_loans_df[cleaned_loans_df.duplicated()]) == 0)

In [156]:
cleaned_output_path = 'Processed_file/Cleaned_data'
os.makedirs(cleaned_output_path, exist_ok=True)
cleaned_output_file_path = os.path.join(cleaned_output_path, 'cleaned_loans.xlsx')
cleaned_loans_df.to_excel(cleaned_output_file_path, index=False)


In [157]:
cleaned_loans_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 300 entries, 0 to 499
Data columns (total 11 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   customer          300 non-null    object        
 1   customer_type     300 non-null    object        
 2   loan_type         300 non-null    object        
 3   country           300 non-null    object        
 4   amount            300 non-null    float64       
 5   currency          300 non-null    object        
 6   exchange_rate     300 non-null    float64       
 7   start_date        300 non-null    datetime64[ns]
 8   maturity_date     300 non-null    datetime64[ns]
 9   reference_date    300 non-null    datetime64[ns]
 10  ingest_date_time  300 non-null    datetime64[us]
dtypes: datetime64[ns](3), datetime64[us](1), float64(2), object(5)
memory usage: 28.1+ KB
