## HDB's Technical Test for Senior Data Engineer
#### Name: Wong Yoke Yong

Instructions:

1. Click the "Run All" button.

## Import Libraries

In [13]:
# Basic EDA
from typing import List, Any, Tuple
import pandas as pd
import requests
import os

# Download the dataset from the provided URL
import time
import concurrent.futures

## Data Profiling
from ydata_profiling import ProfileReport

## Data Validation
import great_expectations as gx
from great_expectations import ResultFormat
from datetime import datetime

## Hashing
import hashlib

## Logging
from loguru import logger

## Constants

In [None]:
# Root of the data.gov.sg site; API paths and pagination links are appended to it.
BASE_URL = "https://data.gov.sg"
# Number of page URLs handled per batch by the downloader.
BATCH_URL_SIZE = 10
# File names of the three resale-price extracts this notebook works with.
NEEDED_FILES = ['Resale Flat Prices (Based on Registration Date), From Mar 2012 to Dec 2014.csv', 
                'Resale Flat Prices (Based on Registration Date), From Jan 2015 to Dec 2016.csv', 
                'Resale Flat Prices (Based on Approval Date), 2000 - Feb 2012.csv']
# Snapshot of the working directory taken when this cell runs (os.listdir already returns a list).
ALL_FILES = os.listdir()
# The API key is read from the environment so that no secret lives in the notebook.
# Export DATA_GOV_SG_API_KEY before running; when it is unset the header is omitted.
# NOTE(review): confirm the API accepts unauthenticated requests before relying on the fallback.
_API_KEY = os.environ.get("DATA_GOV_SG_API_KEY")
HEADERS = {"x-api-key": _API_KEY} if _API_KEY else {}
# Text files that receive the summaries of passed / failed validation checks.
VALIDATION_RESULTS_SUCCESS = "validation_success_summary.txt"
VALIDATION_RESULTS_FAILED = "validation_failed_summary.txt"

## Common Functions

In [15]:
def write_to_csv(dataset: pd.DataFrame, filename: str) -> None:
    """Save ``dataset`` as a CSV file at ``filename``, leaving out the index column."""
    dataset.to_csv(path_or_buf=filename, index=False)
    confirmation = f"Successfully wrote DataFrame to CSV file: {filename}"
    logger.info(confirmation)

def write_to_text_file_failure_records_and_retrieve_index(validation_results: dict) -> list:
    """Write every failed validation result to ``VALIDATION_RESULTS_FAILED``.

    Args:
        validation_results: Mapping-like validation outcome exposing ``success``
            and ``results``; each entry of ``results`` must offer ``get`` and
            ``to_json_dict``.

    Returns:
        The ``to_json_dict()`` form of every failed result, in the order they
        appear in ``results``.
    """
    failed_results = []
    with open(VALIDATION_RESULTS_FAILED, "w", encoding="utf-8") as f:
        f.write(f"Checkpoint Status: {validation_results.get('success')}\n")
        # A missing ``results`` entry is treated as "nothing failed".
        for result in validation_results.get("results") or []:
            if result.get("success"):
                continue
            serialised = result.to_json_dict()
            f.write(f"FAILED: {result.get('expectation_config').get('type')}\n")
            f.write(f"Observed: {result.get('result').get('observed_value')}\n\n")
            # Blank line after the dump so consecutive failures do not run together.
            f.write(f"{serialised}\n\n")
            failed_results.append(serialised)
    # Logged after the file is closed; previously this line sat behind the
    # ``return`` and was never reached.
    logger.info("Written the failed validation definition to a text file for checking purposes")
    return failed_results

def write_to_text_file_success_records(validation_results: dict) -> None:
    """Record the passed validation checks in ``VALIDATION_RESULTS_SUCCESS``.

    The file starts with the overall status, followed by the expectation type
    and observed value of every result flagged as successful.
    """
    with open(VALIDATION_RESULTS_SUCCESS, "w") as summary_file:
        summary_file.write(f"Checkpoint Status: {validation_results.get('success')}\n")
        passed_entries = (
            entry for entry in validation_results.get("results") if entry.get("success")
        )
        for entry in passed_entries:
            expectation_type = entry.get('expectation_config').get('type')
            summary_file.write(f"SUCCESS: {expectation_type}\n")
            observed = entry.get('result').get('observed_value')
            summary_file.write(f"Observed: {observed}\n\n")
    logger.info("Written the successful validation definition to a text file for checking purposes")

def remove_duplicates_based_on_fields(fields: list, dataset: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Sort the dataset by ``fields`` (descending) and split off duplicate rows.

    Args:
        fields: Column names to sort by; any number of columns is accepted.
        dataset: Frame to de-duplicate. It is not modified.

    Returns:
        ``(unique_records, duplicate_records)``: the first occurrence of every
        row after sorting, and the later repeats.

    NOTE(review): ``duplicated()`` is called without ``subset``, so a row only
    counts as a duplicate when *every* column matches an earlier row. Confirm
    whether the comparison should be restricted to the key column(s).
    """
    logger.info("Remove duplicates based on the fields supplied for sorting first, then to remove duplicates.")
    try:
        # A scalar ``ascending`` applies to every sort key, so the call no
        # longer fails when ``fields`` does not hold exactly two columns.
        ordered = dataset.sort_values(by=fields, ascending=False)
        is_duplicate = ordered.duplicated(keep='first')
        return (ordered[~is_duplicate], ordered[is_duplicate])
    except Exception as e:
        logger.exception(f"Failed to remove duplicates from the dataset. Error - {e}")
        raise

def append_datasets_for_storage(datasets: list) -> pd.DataFrame:
    """Stack the given frames row-wise and return the combined frame.

    Original index labels are kept; any failure is logged and re-raised.
    """
    try:
        stacked = pd.concat(datasets, axis=0)
    except Exception as e:
        logger.exception(f"Issues encountered when appending raw datasets for storage. Error - {e}")
        raise e
    return stacked

def filter_for_failed_validation_records(validation_failure_status: bool, output: list, dataset: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split the dataset into rows that passed and rows that failed validation.

    Args:
        validation_failure_status: ``True`` when at least one check failed; when
            ``False`` the contents of ``output`` are ignored.
        output: Failed validation results, each a mapping whose ``result`` entry
            may carry an ``unexpected_index_list`` of offending index labels.
        dataset: Frame whose index labels are matched against those lists.

    Returns:
        ``(success_records, failed_records)``.
    """
    try:
        failed_index = set()
        if validation_failure_status:
            for output_result in output:
                details = output_result.get("result") or {}
                # Some failed results carry no ``unexpected_index_list`` (or carry
                # None); treat them as flagging no individual rows instead of
                # crashing on ``set(None)``.
                failed_index.update(details.get("unexpected_index_list") or [])
        is_failed = dataset.index.isin(list(failed_index))
        return (dataset[~is_failed], dataset[is_failed])
    except Exception as e:
        logger.exception(f"Filter for failed validation records. Error - {e}")
        raise

## Source (Download)

In [16]:
def selected_dataset() -> List[dict]:
    """Return the datasets to download.

    Each list element is a one-entry dict that maps the dataset's position in
    the list to a ``(resource id, CSV file name)`` tuple.
    """
    catalogue = (
        ("d_43f493c6c50d54243cc1eab0df142d6a", "Resale Flat Prices (Based on Approval Date), 2000 - Feb 2012.csv"),
        ("d_2d5ff9ea31397b66239f245f57751537", "Resale Flat Prices (Based on Registration Date), From Mar 2012 to Dec 2014.csv"),
        ("d_ea9ed51da2787afaf8e51f827c304208", "Resale Flat Prices (Based on Registration Date), From Jan 2015 to Dec 2016.csv"),
    )
    return [{position: entry} for position, entry in enumerate(catalogue)]

# Downloads the dataset from data.gov.sg
def fetch_url(next_url: str) -> pd.DataFrame:
    """Fetch one page of results and return its records as a DataFrame.

    Args:
        next_url: Absolute URL of the page to request.

    Returns:
        A frame built from the payload's ``result.records``, with columns
        ordered as listed in ``result.fields``.

    Raises:
        requests.exceptions.RequestException: On connection problems, timeouts
            or an HTTP error status (logged before being re-raised).
    """
    try:
        logger.info(f"Fetching URL: {next_url}")
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.get(next_url, headers=HEADERS, timeout=30)
        # Surface 4xx/5xx responses here rather than as a KeyError on the payload.
        response.raise_for_status()
        result = response.json()["result"]
        column_names = [field["id"] for field in result["fields"]]
        return pd.DataFrame(result["records"], columns=column_names)
    except requests.exceptions.RequestException as e:
        logger.exception(f"Error fetching URL: {next_url} - {e}")
        raise

# Main function for downloading dataset.
def _fetch_result_page(url: str) -> dict:
    """GET one page and return the ``result`` object of its JSON body."""
    response = requests.get(url, headers=HEADERS, timeout=30)
    response.raise_for_status()
    return response.json()["result"]


def _page_to_frame(result: dict) -> pd.DataFrame:
    """Turn one ``result`` object into a frame with columns in ``fields`` order."""
    column_names = [field["id"] for field in result["fields"]]
    return pd.DataFrame(result["records"], columns=column_names)


def download_datasets(selected_ids: List[dict]) -> None:
    """Download every selected dataset page by page and write each to a CSV file.

    Pagination follows the ``next`` link of each response. Every page is
    requested exactly once: the response that supplies the next link is also
    the one whose records are kept (previously each page was fetched twice).

    Args:
        selected_ids: List in the format produced by ``selected_dataset``; the
            element at position ``i`` maps ``i`` to ``(resource id, file name)``.

    Raises:
        Exception: Any download, parsing or write error is logged and re-raised.
    """
    try:
        for index, entry in enumerate(selected_ids):
            resource_id, filename = entry.get(index)
            starting_url = f"{BASE_URL}/api/action/datastore_search?resource_id={resource_id}"

            logger.info(f"Starting download for dataset: {filename}")

            page = _fetch_result_page(starting_url)
            total_records = page["total"]
            logger.info(f"Total records to download for {resource_id}: {total_records}")

            # Pages are collected and concatenated once at the end, which avoids
            # re-copying the accumulated frame after every batch.
            frames = [_page_to_frame(page)]
            rows_downloaded = len(frames[0])
            pages_since_pause = 0

            while rows_downloaded < total_records:
                next_link = page["_links"].get("next")
                if not next_link:
                    break

                # Pause for the rate limit after every BATCH_URL_SIZE pages.
                if pages_since_pause == BATCH_URL_SIZE:
                    time.sleep(10)
                    pages_since_pause = 0

                next_url = f"{BASE_URL}{next_link}"
                logger.info(f"Fetching URL: {next_url}")
                page = _fetch_result_page(next_url)
                frame = _page_to_frame(page)

                # Guard against looping forever: an empty page means there is
                # nothing left to read even if the reported total was not reached.
                if frame.empty:
                    logger.warning(
                        f"Received an empty page after {rows_downloaded} of {total_records} records for {filename}"
                    )
                    break

                frames.append(frame)
                rows_downloaded += len(frame)
                pages_since_pause += 1

            master_dataset = pd.concat(frames, ignore_index=True)
            logger.info(f"Number of rows and columns for Master Dataset: {master_dataset.shape}")
            if master_dataset.shape[0] == total_records:
                logger.info(f"All records downloaded for {filename}")

            # Use the file name that came with the argument so a caller-supplied
            # selection is written under its own names.
            write_to_csv(master_dataset, filename)

    except Exception as e:
        logger.exception(f"Error downloading datasets: {e}")
        raise

def source_download_main() -> None:
    """Entry point of the download stage: fetch every dataset listed by ``selected_dataset``."""
    logger.info("Starting the dataset download process.")
    download_datasets(selected_ids=selected_dataset())

## Ingestion (Extract)

In [17]:
def read_dataset() -> pd.DataFrame:
    """Load the three downloaded CSV extracts and stack them into one frame.

    The files are read from the current working directory in chronological
    order and concatenated with a fresh 0..n-1 index.
    """
    source_files = (
        "Resale Flat Prices (Based on Approval Date), 2000 - Feb 2012.csv",
        "Resale Flat Prices (Based on Registration Date), From Mar 2012 to Dec 2014.csv",
        "Resale Flat Prices (Based on Registration Date), From Jan 2015 to Dec 2016.csv",
    )
    try:
        logger.info("Read in the three datasets that ranges from January 2000 to December 2016.")
        frames = [pd.read_csv(path) for path in source_files]
        return pd.concat(frames, ignore_index=True)
    except Exception as e:
        logger.exception(f"Error reading datasets: {e}")
        raise e

def dataset_type_enforcement(dataset: pd.DataFrame) -> pd.DataFrame:
    """Cast every column of the master dataset to its intended dtype.

    The frame is converted IN PLACE and the same object is returned, so callers
    that need the untouched input must pass a copy.

    Args:
        dataset: Frame holding all the columns listed below.

    Returns:
        ``dataset`` itself, after conversion.

    Raises:
        Exception: Any conversion error is logged and re-raised; columns that
            were converted before the failure stay converted.
    """
    # Column -> target dtype, applied in this order after the month is parsed.
    target_dtypes = (
        ("town", "category"),
        ("flat_type", "category"),
        ("block", str),
        ("street_name", str),
        ("storey_range", "category"),
        ("floor_area_sqm", float),
        ("flat_model", "category"),
        ("lease_commence_date", int),
        ("remaining_lease", float),
        ("_id", int),
    )
    try:
        logger.info("Conduct type enforcement on the fields.")
        # Bracket assignment always targets the column, whereas attribute
        # assignment can silently create a plain attribute instead.
        dataset["month"] = pd.to_datetime(dataset["month"], format="%Y-%m")
        for column, dtype in target_dtypes:
            dataset[column] = dataset[column].astype(dtype)
        return dataset
    except Exception as e:
        logger.exception(f"Error enforcing data types: {e}")
        raise

def filter_2012_to_2016(dataset: pd.DataFrame) -> pd.DataFrame:
    """Keep only the rows whose ``month`` lies between 2012-01-01 and 2016-12-31 inclusive."""
    try:
        logger.info("Filter the dataset to contain between January 2012 to December 2016.")
        on_or_after_start = dataset.month >= "2012-01-01"
        on_or_before_end = dataset.month <= "2016-12-31"
        inside_window = on_or_after_start & on_or_before_end
        return dataset[inside_window]
    except Exception as e:
        logger.exception(f"Error filtering dataset: {e}")
        raise e

def load_up_dataset() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the downloaded CSV files and prepare the working dataset.

    Returns:
        ``(full_raw_dataset, filtered_dataset)``: the concatenated files exactly
        as read, and the type-enforced subset covering January 2012 to
        December 2016.
    """
    logger.info("Starting the dataset loading process.")
    full_raw_dataset = read_dataset()
    logger.info(f"Number of rows and columns for Full Raw Dataset: {full_raw_dataset.shape}")
    logger.info("Enforcing data types for the master dataset.")
    # Type enforcement converts its argument in place, so it is given a copy;
    # otherwise the frame returned as "raw" would already carry the conversions.
    type_enforced_dataset = dataset_type_enforcement(dataset=full_raw_dataset.copy())
    logger.info("Filtering the master dataset to only include records from January 2012 to December 2016.")
    filtered_dataset = filter_2012_to_2016(dataset=type_enforced_dataset)
    logger.info(f"Number of rows and columns for Filtered Dataset: {filtered_dataset.shape}")
    return (full_raw_dataset, filtered_dataset)

## Data Profiling

In [18]:
def data_profiling(dataset: pd.DataFrame) -> None:
    """Generate an HTML profiling report for the dataset.

    The report is written to the working directory as
    ``data_profiling_report_<YYYYmmdd_HHMMSS>.html``.

    Raises:
        Exception: Any error raised while building or saving the report is
            logged and re-raised.
    """
    logger.info("Starting the data profiling process.")
    try:
        profile = ProfileReport(dataset, title="Data Profiling Report", explorative=True)
        # str(datetime.now()) contains spaces and colons, which are not valid in
        # Windows file names; use a compact, filesystem-safe timestamp instead.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        profile.to_file(f"data_profiling_report_{timestamp}.html")
    except Exception as e:
        logger.exception(f"Error generating data profiling report: {e}")
        raise

## Data Quality Check (Using Great Expectations)

In [19]:
def get_context() -> Any:
    """Return the Great Expectations data context obtained from ``gx.get_context()``."""
    return gx.get_context()

def _observed_frequencies(column: pd.Series, keys: list, total_rows: int) -> list:
    """Return the share of ``total_rows`` taken by each key, in ``keys`` order."""
    # Count once and look every key up, instead of recounting the column per key.
    counts = column.value_counts()
    return [float(counts.get(key, 0) / total_rows) for key in keys]


def _categorical_partition(column: pd.Series, total_rows: int) -> dict:
    """Build a baseline partition object from a categorical column's categories."""
    categories = column.cat.categories
    return {
        "weights": _observed_frequencies(column, categories, total_rows),
        "values": categories.astype(str).tolist(),
    }


def gx_expectations_definitions(dataset: pd.DataFrame) -> list:
    """Define the expectations used to validate the resale dataset.

    Baselines (row/column counts, value ranges and category distributions) are
    derived from ``dataset`` itself, so the checks describe the data as it is
    now and can serve as a reference for later batches.

    Side effect: ``dataset.month`` is converted to ``str`` in place, which is
    the type the month expectations below are written against.

    Args:
        dataset: Frame with the resale columns; ``town``, ``flat_type``,
            ``storey_range`` and ``flat_model`` must be categorical because
            their categories are read through ``.cat``.

    Returns:
        The expectation objects in a fixed order: table-level checks first,
        then one group per column.
    """
    dataset.month = dataset.month.astype(str)

    expect = gx.expectations
    total_rows, total_columns = dataset.shape

    ## Table level: the row and column counts must match the current dataset.
    table_checks = [
        expect.ExpectTableRowCountToEqual(value=total_rows, severity="warning"),
        expect.ExpectTableColumnCountToEqual(value=total_columns, severity="warning"),
    ]

    ## Month: not null, stored as str, and containing a YYYY-MM pattern.
    # The drift baseline uses the normalised frequency (count / total rows) of
    # every distinct month currently present in the dataset.
    month_partition_object = {
        "weights": _observed_frequencies(dataset.month, dataset.month.unique().tolist(), total_rows),
        "values": dataset.month.astype(str).unique().tolist(),
    }
    month_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="month", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="month", type_="str", severity="warning"),
        expect.ExpectColumnValuesToMatchRegex(column="month", regex=r"\d{4}-\d{2}", severity="warning"),
        expect.ExpectColumnKLDivergenceToBeLessThan(
            column="month",
            partition_object=month_partition_object,
            threshold=0.1,  # Sensitivity: lower is stricter
        ),
    ]

    ## Town: not null, categorical, restricted to the known towns, plus drift.
    town_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="town", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="town", type_="CategoricalDtypeType", severity="warning"),
        expect.ExpectColumnValuesToBeInSet(
            column="town",
            value_set=[
                "ANG MO KIO",
                "BEDOK",
                "BISHAN",
                "BUKIT BATOK",
                "BUKIT MERAH",
                "BUKIT TIMAH",
                "BUKIT PANJANG",
                "CENTRAL AREA",
                "CHOA CHU KANG",
                "CLEMENTI",
                "GEYLANG",
                "HOUGANG",
                "JURONG EAST",
                "JURONG WEST",
                "KALLANG/WHAMPOA",
                "MARINE PARADE",
                "PASIR RIS",
                "PUNGGOL",
                "QUEENSTOWN",
                "SEMBAWANG",
                "SENGKANG",
                "SERANGOON",
                "TAMPINES",
                "TOA PAYOH",
                "WOODLANDS",
                "YISHUN",
            ],
            severity="warning",
        ),
        expect.ExpectColumnKLDivergenceToBeLessThan(
            column="town",
            partition_object=_categorical_partition(dataset.town, total_rows),
            threshold=0.1,  # Sensitivity: lower is stricter
        ),
    ]

    ## Flat type: not null, categorical, restricted to the known types, plus drift.
    flat_type_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="flat_type", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="flat_type", type_="CategoricalDtypeType", severity="warning"),
        expect.ExpectColumnValuesToBeInSet(
            column="flat_type",
            value_set=[
                "1 ROOM",
                "2 ROOM",
                "3 ROOM",
                "4 ROOM",
                "5 ROOM",
                "EXECUTIVE",
                "MULTI-GENERATION",
            ],
            severity="warning",
        ),
        expect.ExpectColumnKLDivergenceToBeLessThan(
            column="flat_type",
            partition_object=_categorical_partition(dataset.flat_type, total_rows),
            threshold=0.1,  # Sensitivity: lower is stricter
        ),
    ]

    ## Block: not null, str, one to four alphanumeric characters.
    block_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="block", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="block", type_="str", severity="warning"),
        expect.ExpectColumnValuesToMatchRegex(column="block", regex=r"^[A-Za-z0-9]{1,4}$", severity="warning"),
    ]

    ## Street name: not null, str, and never made up of digits only.
    street_name_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="street_name", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="street_name", type_="str", severity="warning"),
        expect.ExpectColumnValuesToNotMatchRegex(column="street_name", regex=r"^\d+$", severity="warning"),
    ]

    ## Storey range: not null, categorical, shaped like "NN TO NN", plus drift.
    storey_range_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="storey_range", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="storey_range", type_="CategoricalDtypeType", severity="warning"),
        expect.ExpectColumnValuesToMatchRegex(column="storey_range", regex=r"^\d{1,2} TO \d{1,2}$", severity="warning"),
        expect.ExpectColumnKLDivergenceToBeLessThan(
            column="storey_range",
            partition_object=_categorical_partition(dataset.storey_range, total_rows),
            threshold=0.1,  # Sensitivity: lower is stricter
        ),
    ]

    ## Floor area: not null, float, within the range seen in the current data.
    # Series.min()/max() skip missing values, unlike the builtins applied to a
    # raw array, whose result depends on where a NaN happens to sit.
    floor_area_sqm_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="floor_area_sqm", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="floor_area_sqm", type_="float", severity="warning"),
        expect.ExpectColumnValuesToBeBetween(
            column="floor_area_sqm",
            min_value=dataset.floor_area_sqm.min(),
            max_value=dataset.floor_area_sqm.max(),
            severity="warning",
        ),
    ]

    ## Flat model: not null, categorical, restricted to the known models, plus drift.
    flat_model_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="flat_model", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="flat_model", type_="CategoricalDtypeType", severity="warning"),
        expect.ExpectColumnValuesToBeInSet(
            column="flat_model",
            value_set=[
                'Improved', 'New Generation', 'Model A', 'Standard', 'Apartment',
                'Simplified', 'Model A-Maisonette', 'Maisonette',
                'Multi Generation', 'Adjoined flat', 'Premium Apartment',
                'Terrace', 'Improved-Maisonette', 'Premium Maisonette', '2-room',
                'Model A2', 'DBSS', 'Type S1', 'Type S2', 'Premium Apartment Loft',
            ],
            severity="warning",
        ),
        expect.ExpectColumnKLDivergenceToBeLessThan(
            column="flat_model",
            partition_object=_categorical_partition(dataset.flat_model, total_rows),
            threshold=0.1,  # Sensitivity: lower is stricter
        ),
    ]

    ## Lease commence date: not null, int, exactly four digits.
    lease_commence_date_checks = [
        expect.ExpectColumnValuesToNotBeNull(column="lease_commence_date", severity="warning"),
        expect.ExpectColumnValuesToBeOfType(column="lease_commence_date", type_="int", severity="warning"),
        expect.ExpectColumnValuesToMatchRegex(column="lease_commence_date", regex=r"^\d{4}$", severity="warning"),
    ]

    ## Remaining lease: within the range of the non-missing values; nulls are
    ## not checked for this column.
    known_remaining_lease = dataset.remaining_lease.dropna()
    remaining_lease_checks = [
        expect.ExpectColumnValuesToBeBetween(
            column="remaining_lease",
            min_value=known_remaining_lease.min(),
            max_value=known_remaining_lease.max(),
            severity="warning",
        ),
    ]

    profilers = [
        *table_checks,
        *month_checks,
        *town_checks,
        *flat_type_checks,
        *block_checks,
        *street_name_checks,
        *storey_range_checks,
        *floor_area_sqm_checks,
        *flat_model_checks,
        *lease_commence_date_checks,
        *remaining_lease_checks,
    ]

    return profilers

def create_gx_expectations_suite_and_execute(dataset: pd.DataFrame) -> Tuple[bool, list]:
    """Build the expectation suite, run it against ``dataset`` and summarise the outcome.

    Returns:
        ``(True, failed_results)`` when at least one check failed (the failed
        results are also written to a text file), otherwise ``(False, [])``
        after the successful checks have been written to a text file.

    Raises:
        Exception: Any error during set-up or execution is logged and re-raised.
    """
    try:

        expectation_suite = gx.ExpectationSuite(name="Expectation_Suite_for_Resale_Flat_Prices_Dataset")

        gx_context = get_context()

        # Register the dataframe as a whole-frame batch of a pandas data source.
        pandas_source = gx_context.data_sources.add_pandas(name = "Resale_Flat_Prices_Dataset")
        frame_asset = pandas_source.add_dataframe_asset(name="data_frame_asset")
        whole_frame_batch = frame_asset.add_batch_definition_whole_dataframe("Resale_Master_Dataset_Batch_Definition")
        run_parameters = {"dataframe": dataset}

        # Every expectation is copied into the suite before the suite is stored.
        checks = gx_expectations_definitions(dataset)
        for check in checks:
            expectation_suite.add_expectation(expectation=check.copy())
        gx_context.suites.add(expectation_suite)

        validation_name = "my_validation_definition"
        pending_definition = gx.ValidationDefinition(
            data=whole_frame_batch, suite=expectation_suite, name=validation_name
        )

        # First run of the checks, directly through the validation definition.
        pending_definition.run(batch_parameters=run_parameters)

        # Look the definition up again from the context before wiring the checkpoint.
        stored_definition = gx_context.validation_definitions.get("my_validation_definition")

        # Create the checkpoint, save it to the context and run it.
        checkpoint = gx_context.checkpoints.add(
            gx.Checkpoint(
                name="Data Validation Checks",
                validation_definitions=[stored_definition],
                result_format=ResultFormat.COMPLETE,
            )
        )
        checkpoint_result = checkpoint.run(batch_parameters=run_parameters)

        # Only the first run result is inspected.
        main_identifier = next(iter(checkpoint_result.run_results), None)

        logger.info(f"This is the main identifier {main_identifier}")

        all_results = checkpoint_result.run_results.get(main_identifier)

        if all_results.get("success"):
            logger.info(f"Success in {len(checks)} Data Validation. Writing the results to a file")
            write_to_text_file_success_records(validation_results=all_results)
            return (False, [])

        logger.info("Validation failed!")
        output_list = write_to_text_file_failure_records_and_retrieve_index(validation_results=all_results)
        return (True, output_list)

    except Exception as e:
        logger.exception(f"Data Validation Check Failed. Error - {e}")
        raise e

## Data Quality Check (Anomalous Records and Check for Duplicate Records)

In [20]:
def lease_computation(dataset: pd.DataFrame) -> pd.DataFrame:
    """Recompute ``remaining_lease`` as of the current year.

    The remaining lease is ``99 - (current year - lease_commence_date)`` whole
    years, stored as text in the form ``"<years> years 0 months"``. The months
    part is always 0 because the commence date only has year precision.

    The column is overwritten IN PLACE and the same frame is returned.

    Raises:
        Exception: Any computation error (for example a missing commence year)
            is logged and re-raised.
    """
    logger.info("Computing lease for the dataset.")
    try:
        # Read the clock once instead of once per row.
        current_year = datetime.now().year
        commence_year = dataset["lease_commence_date"].astype(int)
        remaining_years = 99 - (current_year - commence_year)
        # The value is already in years. It was previously pushed through
        # ``// 12`` and ``% 12`` as if it were months, which turned e.g.
        # 60 remaining years into "5 years 0 months".
        dataset["remaining_lease"] = [
            f"{years} years 0 months" for years in remaining_years.tolist()
        ]
        return dataset
    except Exception as e:
        logger.exception(f"Lease computation error - {e}")
        raise

def define_composite_key(dataset: pd.DataFrame) -> pd.DataFrame:
    """Add a ``composite_key`` column built from the identifying fields of each row.

    The key joins eleven fields with underscores, after stripping spaces (and,
    for month and flat type, hyphens) from the text fields. The frame is
    modified in place and returned.
    """
    logger.info("Define composite key")

    def _key_for(row) -> str:
        # Field order defines the key layout; do not reorder.
        parts = (
            row.month.replace('-', ''),
            row.town.replace(' ', ''),
            row.flat_type.replace('-', '').replace(' ', ''),
            row.block,
            row.street_name.replace(' ', ''),
            row.storey_range.replace(' ', ''),
            str(int(row.floor_area_sqm)),
            row.flat_model.replace(' ', ''),
            row.lease_commence_date,
            row.remaining_lease.replace(' ', ''),
            row._id,
        )
        return "_".join(str(part) for part in parts)

    try:
        dataset['composite_key'] = dataset.apply(_key_for, axis=1)
        return dataset
    except Exception as e:
        logger.exception(f"Define composite key error - {e}")
        raise e

def filter_for_anomalous_resale_prices(dataset: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split the dataset into normal and anomalously priced records.

    A record is anomalous when its ``resale_price`` exceeds the mean plus three
    standard deviations of its own (month, town) group. Grouping by town assumes
    that each town has its own price level, and grouping by month assumes that
    price trends differ between months. The statistics are only meaningful once
    the whole dataset has been compiled.

    Groups with a single record have no standard deviation and are therefore
    never flagged. Records whose month or town is missing are kept among the
    normal records.

    Args:
        dataset: Frame with ``month``, ``town`` and ``resale_price`` columns.

    Returns:
        ``(normal_records, anomalous_records)``, both in the original row order.
    """
    # ``transform`` returns one value per input row, aligned with the dataset.
    # Unlike ``groupby.apply`` followed by ``reset_index(name=...)``, it also
    # works when the data holds a single group and does not rely on pandas
    # naming the unnamed index level ``level_2``.
    grouped_price = dataset.groupby(['month', 'town'], observed=True)['resale_price']
    upper_limit = grouped_price.transform('mean') + 3 * grouped_price.transform('std')
    is_anomalous = dataset['resale_price'] > upper_limit
    return (dataset[~is_anomalous], dataset[is_anomalous])

def manual_data_quality_checks_and_fixes(dataset: pd.DataFrame) -> Tuple[Any, Any, Any]:
    """Run the manual data quality steps: lease fix, key definition, de-duplication, anomaly split.

    Args:
        dataset: Records that passed validation. The lease and composite-key
            steps modify this frame in place.

    Returns:
        ``(normal_records, anomalous_records, duplicate_records)`` as DataFrames,
        or ``(None, None, None)`` when ``dataset`` has no rows.
    """
    logger.info("Manual data quality checks.")
    if dataset.shape[0] == 0:
        return (None, None, None)
    # First step: recompute the remaining lease.
    dataset = lease_computation(dataset=dataset)
    # Second step: define the composite key that acts as the primary key.
    dataset = define_composite_key(dataset=dataset)
    # Third step: separate duplicates, then split the unique records by price anomaly.
    subset_unique_records, subset_duplicate_records = remove_duplicates_based_on_fields(
        fields=['composite_key', 'resale_price'], dataset=dataset
    )
    normal_records, anomalous_records = filter_for_anomalous_resale_prices(dataset=subset_unique_records)
    return (normal_records, anomalous_records, subset_duplicate_records)

## Data Manipulation (Transformation)

In [None]:
def creating_resale_identifier(dataset: pd.DataFrame) -> pd.DataFrame:
    """Add the ``resale_identifier`` column and return the same frame.

    The identifier concatenates, in this order:
      1. the literal ``"S"``;
      2. the first three characters of ``block``, left-padded with zeros;
      3. the first two characters of the average ``resale_price`` of the row's
         (month, town, flat_type) group;
      4. the two-digit calendar month of ``month``;
      5. the first character of ``town``.

    Args:
        dataset: Frame with ``block``, ``month``, ``town``, ``flat_type`` and
            ``resale_price`` columns; ``block`` and ``town`` must hold strings.
            The frame is modified in place.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        group_keys = ['month', 'town', 'flat_type']

        block_part = [block[:3].zfill(3) for block in dataset["block"]]

        # Average price per group, reduced to its first two characters and
        # merged back so that every row carries its group's value.
        average_price = dataset.groupby(group_keys, observed=True).agg({'resale_price': 'mean'}).reset_index()
        average_price["resale_price"] = average_price["resale_price"].map(lambda price: str(price)[0:2])
        price_part = dataset.merge(
            average_price, on=group_keys, how='left', suffixes=('', '_avg')
        )["resale_price_avg"].tolist()

        # Parse the month rather than slicing the last two characters of its text:
        # slicing yields the day ("01") when the value looks like "2012-03-01".
        month_part = pd.to_datetime(dataset["month"]).dt.strftime("%m").tolist()

        town_part = [town[0] for town in dataset["town"]]

        dataset["resale_identifier"] = [
            f"S{block}{price}{month}{town}"
            for block, price, month, town in zip(block_part, price_part, month_part, town_part)
        ]
        return dataset
    except Exception as e:
        logger.exception(f"Error in creating resale identifier. Error - {e}")
        raise

def irreversible_hash_sha255(resale_identifier: str) -> str:
    """Return the SHA-256 hex digest of a resale identifier.

    The identifier is UTF-8 encoded before hashing. Any failure (for
    example a non-string argument without ``encode``) is logged and
    re-raised to the caller.
    """
    try:
        # hashlib works on bytes, so encode first and hash in one pass.
        return hashlib.sha256(resale_identifier.encode('utf-8')).hexdigest()
    except Exception as e:
        logger.exception(f"Error in creating hash for resale identifier. Error - {e}")
        raise e

def data_transformation_operations(dataset: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Run the transformation stage: build identifiers, de-duplicate, hash.

    Args:
        dataset: Records to transform; handed to
            ``creating_resale_identifier``.

    Returns:
        A 3-tuple ``(resale_identifier_dataset, subset_duplicate_records,
        hashed_data)``:

        * the frame returned by ``creating_resale_identifier``;
        * the second value returned by ``remove_duplicates_based_on_fields``
          for the fields ``resale_identifier`` and ``resale_price``
          (named as the duplicate records);
        * a copy of the first value returned by that call (named as the
          unique records) with an added ``resale_identifier_hashed`` column
          holding the SHA-256 hex digest of ``resale_identifier``.
    """
    resale_identifier_dataset = creating_resale_identifier(dataset=dataset)
    subset_unique_records, subset_duplicate_records = remove_duplicates_based_on_fields(
        fields=['resale_identifier', 'resale_price'],
        dataset=resale_identifier_dataset,
    )
    # Copy before adding the hash column so the de-duplicated frame returned
    # by the helper is not modified.
    hashed_data = subset_unique_records.copy()
    hashed_data['resale_identifier_hashed'] = hashed_data['resale_identifier'].apply(irreversible_hash_sha255)
    return (resale_identifier_dataset, subset_duplicate_records, hashed_data)

## Storage (Load)

In [22]:
def datasets_on_local_storage(raw: pd.DataFrame, clean: pd.DataFrame, transformed: pd.DataFrame, failed: pd.DataFrame, hashed: pd.DataFrame) -> None:
    """Persist the five pipeline artifacts as CSV files under ``output_files``.

    The ``output_files`` folder is created in the current working directory
    when the directory listing does not already contain it. The frames are
    then written in the order raw, clean, transformed, failed, hashed.
    """
    folder_is_missing = "output_files" not in os.listdir()
    if folder_is_missing:
        logger.info("Creating the folder 'output_files' for the first time.")
        os.mkdir("output_files")

    logger.info("Proceeds to write out the five requested data artifacts: Raw, Cleaned, Transformed, Failed and Hashed.")

    # Each artifact paired with its destination, in write order.
    artifacts = (
        (raw, "./output_files/raw.csv"),
        (clean, "./output_files/clean.csv"),
        (transformed, "./output_files/transformed.csv"),
        (failed, "./output_files/failed.csv"),
        (hashed, "./output_files/hashed.csv"),
    )
    for frame, destination in artifacts:
        write_to_csv(dataset=frame, filename=destination)

## Automated ETL Pipeline

In [23]:
def automated_etl_flow() -> None:
    """Run the whole ETL pipeline from download to local storage.

    Steps, in order:

    1. Call ``source_download_main`` when any file in ``NEEDED_FILES`` is
       missing from the current working directory.
    2. Load the datasets with ``load_up_dataset``.
    3. Profile the combined dataset with ``data_profiling``.
    4. Run the Great Expectations suite and split off the records that
       failed validation.
    5. Run the manual data quality checks on the validated records.
    6. When those checks return records, transform them and write out the
       raw, clean, transformed, failed and hashed artifacts. Otherwise
       write the raw data and the validation failures, with empty frames
       for the remaining artifacts.
    """
    logger.info("Starting the ETL flow. First step: downloading datasets if the datasets do not already exist.")
    # Look at the filesystem at call time instead of the ALL_FILES snapshot
    # taken when the constants cell ran, so a re-run in the same session sees
    # files that appeared after that cell was executed.
    if not all(os.path.exists(file) for file in NEEDED_FILES):
        logger.info("Not all datasets found in the current directory. Initiating download process.")
        source_download_main()
    logger.info("All datasets are present. Download step complete.")
    logger.info("Next steps would be to profile the data. Second Step would be to read in the datasets and create a profile report for each dataset.")
    raw_dataset, complete_dataset = load_up_dataset()
    logger.info("Data loading complete. Third step is data profiling and validation.")
    data_profiling(complete_dataset)
    logger.info("Data profiling complete. Fourth step is to create Great Expectations suite and execute it against the dataset.")
    validation_failure_status, output = create_gx_expectations_suite_and_execute(dataset=complete_dataset)
    validated_records, failed_validation_records = filter_for_failed_validation_records(validation_failure_status=validation_failure_status, output=output, dataset=complete_dataset)
    normal_records, failed_records, interim_duplicated_records = manual_data_quality_checks_and_fixes(dataset=validated_records)
    if normal_records is not None:
        logger.info("Data quality checks completed. Working on data transformation.")
        processed_dataset, final_duplicated_records, hashed_data = data_transformation_operations(dataset=normal_records)
        # Everything rejected along the way is gathered into one "failed" artifact.
        failed_records = append_datasets_for_storage(datasets=[interim_duplicated_records, final_duplicated_records, failed_records, failed_validation_records])
        logger.info("Writes out the output files into the folder 'output_files'")
        datasets_on_local_storage(raw=raw_dataset, clean=normal_records, transformed=processed_dataset, failed=failed_records, hashed=hashed_data)
    else:
        # No records came back from the quality checks: still persist the raw
        # data and the validation failures, with empty placeholders elsewhere.
        datasets_on_local_storage(raw=raw_dataset, clean=pd.DataFrame(), transformed=pd.DataFrame(), failed=failed_validation_records, hashed=pd.DataFrame())
    logger.info("ETL Process has come to an end.")

In [24]:
# Run the full ETL pipeline defined above when this cell executes.
automated_etl_flow()

[32m2026-02-22 11:02:38.053[0m | [1mINFO    [0m | [36m__main__[0m:[36mautomated_etl_flow[0m:[36m2[0m - [1mStarting the ETL flow. First step: downloading datasets if the datasets do not already exist.[0m
[32m2026-02-22 11:02:38.053[0m | [1mINFO    [0m | [36m__main__[0m:[36mautomated_etl_flow[0m:[36m8[0m - [1mAll datasets are present. ETL flow complete.[0m
[32m2026-02-22 11:02:38.054[0m | [1mINFO    [0m | [36m__main__[0m:[36mautomated_etl_flow[0m:[36m9[0m - [1mNext steps would be to profile the data. Second Step would be to read in the datasets and create a profile report for each dataset.[0m
[32m2026-02-22 11:02:38.054[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_up_dataset[0m:[36m48[0m - [1mStarting the dataset loading process.[0m
[32m2026-02-22 11:02:38.054[0m | [1mINFO    [0m | [36m__main__[0m:[36mread_dataset[0m:[36m4[0m - [1mRead in the three datasets that ranges from January 2000 to December 2016.[0m
[32m2026-02-22 11: