# WP-2 mdltour warning checks

<p style="color:rgb(0,162,219); font-family:Arial; font-size:16px;">Notebook Information </p>

<table style="color:rgb(88,89,91); font-family:Arial; float:left; font-size:13px; text-align:left;">
    <tr>
        <td style="color:rgb(0,90,132);font-size:13px; text-align:left;"><b>Project</b></td>
        <td style="text-align:left;">NorMITs Demand Partner </td>
    </tr>
    <tr>
        <td style="color:rgb(0,90,132);font-size:13px; text-align:left;"><b>Primary Contact Name</b></td>
        <td style="text-align:left;"> </td>
    </tr>
    <tr>
        <td style="color:rgb(0,90,132);font-size:13px; text-align:left;"><b>Primary Contact Email</b></td>
        <td style="text-align:left;"> </td>
    </tr>
    <tr>
        <td style="color:rgb(0,90,132);font-size:13px; text-align:left;"><b>Document Sensitivity</b></td>
        <td style="text-align:left;"> </td>
    </tr>
</table>

In [1]:
import pandas as pd 
import numpy as np
import os
import logging
import sys
import time
import json
from utils import ChecksLogger

In [2]:
# Shared logger used by every check in this notebook: the check functions call
# its info / success / warning / save_logs methods.
# NOTE(review): ChecksLogger comes from the project-local `utils` module;
# presumably log_file_name is the CSV the log entries are saved to - confirm.
logger = ChecksLogger(log_file_name='checks_log.csv')

In [17]:
# Set paths etc.
model_version = 'v5'
# Version of the tour model which we know has the correct schema in its
# output files, used as the reference that new runs are compared to
known_correct_schema_model_version = 'v3'

# NOTE(review): absolute network-drive path; this cell must be edited to run elsewhere
tour_dir = r'I:\NTS\outputs\tour'
reports_dir = os.path.join(tour_dir, 'reports', model_version)
known_correct_schema_dir = os.path.join(tour_dir, 'reports', known_correct_schema_model_version)

# The matrix file is named separately because check TM005 reads it on its own
mat_file = 'matrix_county_output.csv'
report_files = [
    'attr_county_output.csv',
    mat_file,
    'nts_distr_output.csv',
    'nts_mts_output.csv',
    'nts_tour_output.csv',
    'prod_county_output.csv',
]

# Directory the notebook is running from; temporary JSON dumps are written beneath it
script_cwd = os.getcwd()

### TM011 - Check expected output files exist

Check that each specified output file has been produced in the expected location, contains data, and was last edited within a certain time window of the initial tour model script running (which is hopefully logged somewhere?). In other words, check that these are definitely the outputs produced by the latest run. This probably wants to be the first thing we check.


In [6]:
def check_output_files(report_files, reports_dir, time_window):
    """
    Log whether each expected output file exists, holds data and is recent.

    Parameters:
    -----------
    report_files: list of strings
        Names of the expected tour model output files
    reports_dir: str (file path)
        Directory the output files are expected to be found in
    time_window: int or float
        Maximum accepted age of a file, in seconds. It is compared with
        the difference between time.time() and the file's modification time

    Outputs
    -----------
    None, one log entry is written per file and the logs are then saved
    """
    checkstring = "TM011: Check output files exist"
    logger.info(checkstring, "Commencing check TM011:")

    for file in report_files:
        file_path = os.path.join(reports_dir, file)

        # Missing file: nothing further can be checked
        if not os.path.exists(file_path):
            logger.warning(checkstring, f"Warning - {file} does not exist.")
            continue

        # Present but empty
        if os.path.getsize(file_path) <= 0:
            logger.warning(checkstring, f"Warning - {file} exists but does not contain data.")
            continue

        # TODO: Work out a minimum time and implement that.
        # Is there an existing run log this could be obtained from?
        modified_at = os.path.getmtime(file_path)
        file_age = time.time() - modified_at
        if file_age <= time_window:
            logger.success(checkstring, f"Success - {file} exists and contains data from latest run.")
        else:
            logger.warning(checkstring, f"Warning - {file} exists and contains data, but was not produced by latest run.")

    logger.info(checkstring, "Check TM011 complete")
    logger.save_logs()

### TM012 - Check output schemas

List the expected column names and schemas in all output files, then check the outputs match these. Might end up wrapping some of the other checks into this depending on things like table shapes and how descriptive Python schemas are. Probably also wants to be quite an early check

In [13]:
def get_columns_datatypes(csv_file):
    """
    Read a CSV file and report the data type pandas infers for each column

    Parameters:
    -----------
    csv_file: str (file path)
        Path to the CSV file to inspect

    Outputs:
    -----------
    columns_datatypes: dict
        Maps each column name to the name of its pandas dtype as a
        string, in the order the columns appear in the file
    """
    inferred_dtypes = pd.read_csv(csv_file).dtypes
    return {name: str(dtype) for name, dtype in inferred_dtypes.items()}

# def process_csv_files(report_files, reports_dir, script_cwd):
def find_correct_schemas(report_files, known_correct_schema_dir, script_cwd, checkstring):
    """
    Record the expected schemas of the tour model output files as JSON

    Parameters:
    -----------
    report_files: list of strings
        List of expected tour model output files.
        Only names ending in '.csv' are processed
    known_correct_schema_dir: str (file path)
        Directory containing tour model output files which are known to
        have the correct schema
    script_cwd: str (path)
        Location the script is currently running.
        The JSON file is written to a 'checks_json_dump_temp' folder here
    checkstring: str
        String identifying the check that has called this function

    Outputs:
    -----------
    json_filename: str (filepath)
        Path of the JSON file holding the expected schemas. A file missing
        from the reference directory is stored as the string
        "File not found" instead of a column/datatype mapping
    """
    schemas_by_file = {}
    for report_name in report_files:
        if not report_name.endswith('.csv'):
            continue

        reference_path = os.path.join(known_correct_schema_dir, report_name)
        if os.path.exists(reference_path):
            schemas_by_file[report_name] = get_columns_datatypes(reference_path)
            continue

        # No reference copy: store a placeholder and warn twice, as two log entries
        schemas_by_file[report_name] = "File not found"
        logger.warning(
            checkstring,
            f"Warning - {report_name} not present in directory that should contain all the expected outputs with the correct schemas."
        )
        logger.warning(
            checkstring,
            f"Are you sure {known_correct_schema_dir} is the correct directory for this?"
        )

    dumps_path = os.path.join(script_cwd, 'checks_json_dump_temp')
    if not os.path.exists(dumps_path):
        os.makedirs(dumps_path)

    json_filename = os.path.join(dumps_path, 'columns_datatypes.json')
    with open(json_filename, 'w') as json_file:
        json.dump(schemas_by_file, json_file, indent=4)

    logger.save_logs()
    return json_filename

def compare_with_json(json_filename, checkstring, folder_path, expected_files):
    """
    Compare JSON output with new CSV files to find and report any issues with schemas

    Parameters:
    -----------
    json_filename: str (filepath)
        Path to find the json file which contains the expected schema information.
        If it does not exist, an attempt is made to create it with
        find_correct_schemas, using the notebook-level variables
        known_correct_schema_dir and script_cwd
    checkstring: str
        String identifying the check that has called this function
    folder_path: str (filepath)
        Directory containing the tour model outputs
    expected_files: list of strings
        Files that should exist in the tour model outputs.
        Used to rebuild the expected schemas when the json file is missing

    Outputs:
    -----------
    column_errors: list
        Names of files whose set of columns differs from the expected set
    datatype_errors: list
        (file name, column) tuples where a column's datatype is not as expected
    matching_files: list
        List of all files that match the expected schemas
    """
    column_errors = []
    datatype_errors = []
    matching_files = []

    if not os.path.isfile(json_filename):
        # If json containing expected schemas does not exist, try to create it
        logger.info(
            checkstring,
            f"Info - Specified location for json expected schema file {json_filename} did not exist, now trying to create it..."
        )
        json_filename = find_correct_schemas(expected_files, known_correct_schema_dir, script_cwd, checkstring)

    # Only give up if the file is still missing. Previously a successfully
    # re-created file was also treated as missing, so the check never ran.
    if not os.path.isfile(json_filename):
        logger.warning(
            checkstring,
            f"Warning - Even after trying to create {json_filename} it still does not exist!"
        )
        logger.warning(
            checkstring,
            f"Warning - {checkstring} cannot be completed!"
        )
        logger.save_logs()
        return column_errors, datatype_errors, matching_files

    with open(json_filename, 'r') as f:
        json_data = json.load(f)

    for file_name, expected_columns in json_data.items():
        file_path = os.path.join(folder_path, file_name)
        if not os.path.exists(file_path):
            logger.warning(
                checkstring,
                f"Warning - {file_name}, which is expected to exist, is not found in the specified folder."
            )
            continue

        # find_correct_schemas stores the string "File not found" when the
        # reference file was missing; there is nothing to compare against.
        if not isinstance(expected_columns, dict):
            logger.warning(
                checkstring,
                f"Warning - No expected schema is available for {file_name}, so its schema cannot be checked."
            )
            continue

        actual_columns = get_columns_datatypes(file_path)

        # Key views compare as sets, so column order is deliberately ignored
        if expected_columns.keys() != actual_columns.keys():
            column_errors.append(file_name)
            continue

        mismatched_columns = [
            column
            for column, datatype in expected_columns.items()
            if actual_columns[column] != datatype
        ]
        if mismatched_columns:
            datatype_errors.extend((file_name, column) for column in mismatched_columns)
        else:
            matching_files.append(file_name)

    logger.save_logs()
    return column_errors, datatype_errors, matching_files

### TM005 - Intrasector trips should be highest

The maximum value for any given row/column in the output matrix is expected to be the intrasector cell. Allow some leeway here, though - the exception is probably air travel as a mode, and rail in some instances. Calculate the % average and flag it if it is not in line.

In [105]:
def check_intras(reports_dir, mat_file, threshold):
    """
    Check if the intra-county cell is the highest for each
    mode, purpose, direction, period combination

    Parameters:
    -----------
    reports_dir: str (path)
        Directory containing the tour model output matrix file
    mat_file: str (filename)
        Name of the tour model output matrix file. It is read with
        pandas and must contain the columns mode, purpose, direction,
        period, tmz_o, tmz_d and trips
    threshold: float
        Proportion of the intra-county value above the
        intra-county value that the script will allow as a
        tolerance before flagging a warning

    Outputs:
    ----------
    None, writes warnings to log files and prints a progress counter
    """
    # Must be assigned before the first logger call: it was previously
    # assigned further down, so the function failed with UnboundLocalError.
    checkstring = 'TM005: Intrasector trips should be highest'
    logger.info(checkstring, "Commencing check TM005:")

    tour_mat = pd.read_csv(os.path.join(reports_dir, mat_file))
    md = tour_mat['mode'].unique()
    pr = tour_mat['purpose'].unique()
    di = tour_mat['direction'].unique()
    tp = tour_mat['period'].unique()
    og = tour_mat['tmz_o'].unique()

    n = 0

    for m in md:
        for p in pr:
            for dr in di:
                for t in tp:
                    tour_mat_filtered = tour_mat[
                        (tour_mat['mode'] == m) &
                        (tour_mat['purpose'] == p) &
                        (tour_mat['direction'] == dr) &
                        (tour_mat['period'] == t)
                    ]
                    tour_mat_filtered = tour_mat_filtered.drop(
                        columns=[
                            'mode',
                            'purpose',
                            'direction',
                            'period'
                        ])
                    for o in og:
                        # Only working 1 dimensionally to avoid duplicate warning in some cases
                        # I.e. not repeating this in replacing all o's with d's and vice versa
                        # Already generates many warnings!
                        tour_mat_o = tour_mat_filtered[
                            (tour_mat_filtered['tmz_o'] == o)
                        ]
                        tour_mat_intra = tour_mat_o[
                            (tour_mat_o['tmz_d'] == o)
                        ].trips.sum()

                        # Warn about missing intras only for modes other than Air and Rail,
                        # and only where the origin's total is above 100 trips.
                        if m not in ('Air', 'Rail') and tour_mat_o.trips.sum() > 100 and tour_mat_intra <= 0:
                            logger.warning(
                                checkstring,
                                f'Warning - No {p} {m} {dr} intras in county: {o} in time period {t}'
                            )
                        elif tour_mat_intra > 0:
                            tour_mat_max = tour_mat_o.trips.max()
                            # Only look at individual destinations when some cell beats the intra total
                            if tour_mat_intra < tour_mat_max:
                                for d in og:
                                    d_trips = tour_mat_o[(tour_mat_o['tmz_d'] == d)].trips.sum()
                                    prop = d_trips / tour_mat_intra
                                    if prop > (1 + threshold):
                                        logger.warning(
                                            checkstring,
                                            f"Warning - for {p} {m} {dr} trips in time period {t}, {o} {d} trips are {prop*100:.2f}% of the intra-county trips in {o}"
                                        )

                        if n % 10000 == 0:
                            print(n) # Show progress is occurring whilst running
                        n = n + 1
    logger.info(checkstring, "Check TM005 complete")
    logger.save_logs()

In [104]:
# CELL TO CALL ALL OF THE ABOVE CHECKS

# Check TM011 - Check expected output files exist
# NOTE(review): time.time() is the number of seconds since the epoch, so as a
# window it is very large and the "produced by latest run" test in
# check_output_files will pass for essentially any file - confirm time frame.
time_window = time.time() # Confirm time frame
check_output_files(report_files, reports_dir, time_window)
print('Check TM011 complete and logged') # For info when running

# Check TM012 - Check output schemas
tm012 = 'TM012: Check output schemas'
logger.info(tm012, "Commencing check TM012:")
json_file = find_correct_schemas(report_files, known_correct_schema_dir, script_cwd, tm012)
column_errors, datatype_errors, matching_files = compare_with_json(json_file, tm012, reports_dir, report_files)

# Each loop is simply skipped when its list is empty
for bad_file in column_errors:
    logger.warning(tm012, f"Warning - Column errors found in {bad_file}")

for file_name, column in datatype_errors:
    logger.warning(tm012, f"Warning - Data type errors found in {file_name}: {column}")

for good_file in matching_files:
    logger.success(tm012, f"Sucsess - {good_file} matches the expected columns and data types")

logger.info(tm012, "Check TM012 complete")
logger.save_logs()
print('Check TM012 complete and logged') # For info when running

# Check TM005 - Intrasector trips should be highest
print('Check TM005 commencing. This can take a while to run') # For info when running
check_intras(reports_dir, mat_file, 0.1)
print('Check TM005 complete and logged') # For info when running

Check TM011 complete and logged
Check TM012 complete and logged
Check TM005 commencing. This can take a while to run
0
10000
20000
30000
40000
50000
Check TM005 complete and logged


### TM001 - Check tour model outputs non-zero

Check that the total in the matrix output file is > 0
Also check each row and column in the output matrix sums to a > 0 value (will cover some of the checks to ensure everything is there too - rest of this check covered by TM010)

In [11]:
# NOTE(review): pivot_table is not created in any cell shown in this notebook,
# so this cell relies on state left in the kernel - confirm where it is built.
# One total per column (axis=0) and one total per row (axis=1).
dest_col_sums = pivot_table.sum(axis=0)
origin_col_sums = pivot_table.sum(axis=1)

try:
    # Columns are checked first; if any fail, the rows are not reported
    empty_dest_mask = dest_col_sums <= 0
    if empty_dest_mask.any():
        probl_col = pivot_table.columns[empty_dest_mask]
        raise ValueError(f'No value in destination column: {probl_col}')

    empty_origin_mask = origin_col_sums <= 0
    if empty_origin_mask.any():
        probl_row = pivot_table.index[empty_origin_mask]
        raise ValueError(f'No value in origin row: {probl_row}')
except ValueError as e:
    print(f'Error: {e}, Column sum: {dest_col_sums}, Row sum: {origin_col_sums}')
else:
    print('All columns and rows sum to greater than 0')

All columns and rows sum to greater than 0


### TM002 - Check tour model outputs not negative

Check that no cell in the matrix output is < 0

In [12]:
# Function to check for negative values and raise an error with details
def check_negative_values(pivot_table):
    """
    Raise an error listing every cell of the matrix that is below zero

    Parameters:
    -----------
    pivot_table: pandas DataFrame
        Matrix to check. Each index label and each column label is
        unpacked as an (id, name) pair when a negative cell is reported

    Outputs:
    -----------
    None, raises ValueError with one line per negative cell (rows first,
    then columns within each row) if any are found
    """
    negative_cells = [
        (row_label, col_label)
        for row_label in pivot_table.index
        for col_label in pivot_table.columns
        if pivot_table.at[row_label, col_label] < 0
    ]

    errors = []
    for (origin_id, origin_name), (destination_id, destination_name) in negative_cells:
        errors.append(f"Negative value found at Origin: {origin_name} (ID: {origin_id}), Destination: {destination_name} (ID: {destination_id})")

    if len(errors) > 0:
        raise ValueError("Errors found:\n" + "\n".join(errors))

# Run the negative value check and print either the problems or a confirmation
try:
    check_negative_values(pivot_table)
except ValueError as e:
    print(e)
else:
    print('No negative value in model output')


No negative value in model output


### TM003 - Check for NULLs/NaNs in tour model outputs

Check for NULLs/NaNs in tour model outputs

In [13]:
def check_null_values(pivot_table):
    """
    Raise an error listing every cell of the matrix that is null

    Parameters:
    -----------
    pivot_table: pandas DataFrame
        Matrix to check. Each index label and each column label is
        unpacked as an (id, name) pair when a null cell is reported

    Outputs:
    -----------
    None, raises ValueError with one line per null cell (rows first,
    then columns within each row) if any are found
    """
    null_cells = [
        (row_label, col_label)
        for row_label in pivot_table.index
        for col_label in pivot_table.columns
        if pd.isnull(pivot_table.at[row_label, col_label])
    ]

    errors = []
    for (origin_id, origin_name), (destination_id, destination_name) in null_cells:
        errors.append(f"Null value found at Origin: {origin_name} (ID: {origin_id}), Destination: {destination_name} (ID: {destination_id})")

    if len(errors) > 0:
        raise ValueError("Errors found:\n" + "\n".join(errors))

# Run the null value check and print either the problems or a confirmation
try:
    check_null_values(pivot_table)
except ValueError as e:
    print(e)
else:
    print('No null value in model output')

No null value in model output


In [14]:
def check_null_values(pivot_table):
    """
    Raise an error listing every cell of the matrix that is NaN

    NOTE(review): this re-uses the name check_null_values, so once this cell
    has run it replaces the earlier definition of the same name. pandas
    documents isnull as an alias of isna, so the two versions appear to
    differ only in the wording of the message - confirm whether both are needed.

    Parameters:
    -----------
    pivot_table: pandas DataFrame
        Matrix to check. Each index label and each column label is
        unpacked as an (id, name) pair when a NaN cell is reported

    Outputs:
    -----------
    None, raises ValueError with one line per NaN cell (rows first,
    then columns within each row) if any are found
    """
    nan_cells = [
        (row_label, col_label)
        for row_label in pivot_table.index
        for col_label in pivot_table.columns
        if pd.isna(pivot_table.at[row_label, col_label])
    ]

    errors = []
    for (origin_id, origin_name), (destination_id, destination_name) in nan_cells:
        errors.append(f"NaN value found at Origin: {origin_name} (ID: {origin_id}), Destination: {destination_name} (ID: {destination_id})")

    if len(errors) > 0:
        raise ValueError("Errors found:\n" + "\n".join(errors))

# Run the NaN value check and print either the problems or a confirmation
try:
    check_null_values(pivot_table)
except ValueError as e:
    print(e)
else:
    print('No NaN value in model output')

No NaN value in model output


### TM010 - Check zones/sectors in output

Check list of zones/sectors in output against a list of zones we expect to see - will account for any zones/sectors that have been dropped entirely rather than just 0-ed (as tested for by TM001)