## Quality Control Script for ArcGIS Online Points Layer
by Tara Wu, Spring 2025

<div class="alert alert-block alert-info">

<b>Purpose:</b>  This script performs a series of quality checks for the Legacy Restoration Fund project in New England. As damage points along the Appalachian Trail are being collected, the resulting dataset will be checked for the following: 
<ul>
    <li>nulls in specific fields, invalid dates, orphaned related records</li>
    <li>missing related records </li>
    <li>duplicates due to sync errors</li>
    <li>inputs not in domains, repetitive attributes per user </li>
    <li>offline data has been synced</li>
    <li>proximity to trails/features</li> 
    <li>matching collector and region</li>
</ul>
</div>

In [None]:
# === PARAMETERS ===

# === import modules ===
import datetime
import importlib
import logging
import os
import sys
import traceback

import geopandas as gpd
import numpy as np
import pandas as pd
from arcgis.features import FeatureLayer
from arcgis.gis import GIS
from shapely.geometry import shape
from shapely.ops import unary_union

# check for required packages
# `import importlib` alone does not guarantee that the `importlib.util`
# submodule is loaded, so import it explicitly before calling find_spec().
import importlib.util

# NOTE(review): the third-party imports at the top of the file execute before
# this check, so a missing geopandas/arcgis/shapely raises ImportError there
# first; consider moving this check above those imports.
required_packages = ["pandas", "geopandas", "numpy", "arcgis", "shapely", "xlsxwriter"]

# collect every missing package first so the user sees the full list in one run
missing_packages = [
    pkg for pkg in required_packages if importlib.util.find_spec(pkg) is None
]
for pkg in missing_packages:
    print(f"Missing package '{pkg}'. Install via: pip install {pkg}")
if missing_packages:
    sys.exit(1)


# === output setup ===
# base repo directory: the folder that holds this script. `__file__` does not
# exist when the code runs as a notebook cell, so fall back to the current
# working directory there instead of failing with NameError.
try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    BASE_DIR = os.getcwd()

# output folder and files
OUTPUT_FOLDER = os.path.join(BASE_DIR, "outputs")  # outputs folder inside repo
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# current date for outputs (prefix of every output file name)
CURRENT_DATE = datetime.datetime.today().strftime("%Y-%m-%d")

# output files: plain-text log/summary and the Excel workbook of errors
FILE_PATH = os.path.join(OUTPUT_FOLDER, f"{CURRENT_DATE}_QC_summary.txt")
OUTPUT_XLSX = os.path.join(OUTPUT_FOLDER, f"{CURRENT_DATE}_all_errors.xlsx")


# === connect to AGOL services ===
# choose login method: "pro", "home", or "credentials"
GIS_LOGIN_METHOD = "pro"  # change to "home" if running in AGOL Notebooks

# if using credentials (not recommended for shared scripts), fill in here:
GIS_USERNAME = None
GIS_PASSWORD = None

# The script's main logging configuration happens further down, and until then
# the root logger discards INFO records. Set up a minimal console logger here
# so the login messages below are actually shown; the later configuration
# removes these handlers and installs its own.
if not logging.root.handlers:
    logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)
logging.root.setLevel(logging.INFO)

# connect to AGOL; a failed login is reported but does not stop the script
try:
    if GIS_LOGIN_METHOD == "pro":
        gis = GIS("pro")
    elif GIS_LOGIN_METHOD == "home":
        gis = GIS("home")
    elif GIS_LOGIN_METHOD == "credentials":
        gis = GIS(GIS_USERNAME, GIS_PASSWORD)
    else:
        raise ValueError(
            "Invalid GIS_LOGIN_METHOD. Use 'pro', 'home', or 'credentials'."
        )

except Exception as e:
    logging.info("X   Login error")
    logging.info(f"    {type(e).__name__}: {e}\n")
else:
    # gis.users.me is falsy when the connection is anonymous
    if gis.users.me:
        logging.info(f"    Login successful as {gis.users.me.username}\n")
    else:
        logging.info("X   Not logged in. Please sign in.\n")

# AGOL feature services: the LRF survey service and the ANST facilities service
BASE_URL = (
    "https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/"
    "APPA_LRF_ProjectsV2/FeatureServer"
)
FACILITIES_URL = (
    "https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/"
    "ANST_Facilities/FeatureServer"
)

# individual layers, addressed by layer index within their service
FC_URL = BASE_URL + "/0"  # LRF Tread Deficiency
RT_URL = BASE_URL + "/1"  # Related Table
TREAD_URL = FACILITIES_URL + "/7"
SIDETRAIL_URL = FACILITIES_URL + "/6"

# facility point layers, in index order 0-5:
# bridges, campsites, parking, privies, shelters, vistas
POINT_FEATURE_URLS = [f"{FACILITIES_URL}/{layer_id}" for layer_id in range(6)]


# === QC parameters===
BUFFER_FT = 100  # proximity tolerance around trails/features, in feet
THRESHOLD = 0.9  # share of one user's entries above which a value counts as repetitive
PROJECT_START_YEAR, PROJECT_START_MONTH, PROJECT_START_DAY = 2025, 5, 5

# fields that must be populated (fc = survey points, rt = related table)
FC_REQUIRED_FIELDS = "GlobalID created_user created_date SHAPE".split()
RT_REQUIRED_FIELDS = "defGlobalID Feature Feature_Action".split()

# field combinations whose duplication suggests a sync error
FC_SYNC_ERROR_FIELDS = "created_date created_user".split()
RT_SYNC_ERROR_FIELDS = "CreationDate Creator".split()

# domain dictionaries: field name -> list of values accepted by the domain check
FC_DOMAIN_DICTIONARY = {
    "State": ["MA", "ME", "CT", "NH", "VT"],
    "Club": ["AMC", "AMC-CT", "AMC-WMA", "DOC", "GMC", "MATC", "RMC"],
    "Evaluation_Code": ["Low", "Moderate", "High", None],
    "OnsiteMaterials": ["Yes", "No", "Maybe", None],
    "ConsiderRelocation": ["Yes", "No", None],
}

RT_DOMAIN_DICTIONARY = {
    # TODO: `[...]` is a placeholder (a list holding only Ellipsis). Until the
    # real list of feature types is filled in, every non-null Feature value
    # will be reported as a domain error.
    "Feature": [...],  # keep as in your script
    # key spelled "Feature_Action" to match RT_REQUIRED_FIELDS and
    # RT_ERROR_ORDER; with the previous "FeatureAction" spelling the field
    # would not be found and its domain check silently skipped
    # NOTE(review): confirm the field name against the related table schema
    "Feature_Action": ["Build Add", "Repair Replace", "Remove", None],
    "Units": ["Each", "LinearFeet", "SquareFeet", None],
    "onsitematerials": ["Yes", "No", "Maybe", None],
}

# fields in which one value dominating a user's entries is treated as suspicious
FC_REP_ERROR_FIELDS = ["Evaluation_Code", "Deficiency_Length"]
RT_REP_ERROR_FIELDS = ["Feature"]

# collector-region dictionary: AGOL username -> club acronyms for that collector
COLLECTOR_DICT = dict(
    twu_ATConservancy=["MATC"],
    userB=["RMC", "AMC", "DOC"],
    userC=["GMC"],
    userD=["AMC-WMA"],
    userE=["AMC-CT"],
)


# === output parameters ===

# True: write the log to FILE_PATH as well as the console; False: console only
WRITE_LOG = True

# column order for the survey-point (fc) error sheet
FC_ERROR_ORDER = [
    # error bookkeeping
    "error_type", "error_desc",
    # record identity and edit tracking
    "OBJECTID", "GlobalID", "created_user", "created_date",
    # survey attributes
    "Deficiency_Length", "Evaluation_Code", "ConsiderRelocation", "Notes",
    "OnsiteMaterials", "RelativeLinearLocation",
    # location
    "SHAPE", "MileMarker", "State", "LandOwner", "OwnershipType", "Club",
]

# column order for the related-table (rt) error sheet
RT_ERROR_ORDER = [
    # error bookkeeping
    "error_type", "error_desc",
    # record identity and edit tracking
    "OBJECTID", "GlobalID", "defGlobalID", "CreationDate", "Creator",
    # prescription attributes
    "Feature", "Feature_Action", "Quantity", "Units", "onsitematerials",
]


# === CONNECT TO RESOURCES, LOAD SURVEY DATA ===

# timestamp used at the end of the script to report the total run time
start_time = datetime.datetime.now()

# configure console and txt file output
# detach whatever handlers are already on the root logger before configuring it
while logging.root.handlers:
    logging.root.removeHandler(logging.root.handlers[0])

# always log to the console; add the text file only when WRITE_LOG is set
handlers = [logging.StreamHandler(sys.stdout)] + (
    [logging.FileHandler(FILE_PATH, mode="w", encoding="utf-8")] if WRITE_LOG else []
)

logging.basicConfig(handlers=handlers, format="%(message)s", level=logging.INFO)


# === CONNECT TO FEATURE SERVICES ===
# build a FeatureLayer handle for each service URL configured above
try:
    fc_layer = FeatureLayer(FC_URL)
    rt_layer = FeatureLayer(RT_URL)
    tread_layer = FeatureLayer(TREAD_URL)
    sidetrail_layer = FeatureLayer(SIDETRAIL_URL)
    feature_layers = list(map(FeatureLayer, POINT_FEATURE_URLS))
except Exception as err:
    # report the failure and carry on; later steps log their own errors
    logging.info("X   Error loading feature service URLs\n")
    logging.info(f"    {type(err).__name__}: {err}\n")
else:
    logging.info(
        "    URLs loaded for survey point data, related table, tread, side trail, and point features\n"
    )


# load survey and line data into sdf (spatially-enabled data frames)
try:
    fc_features = fc_layer.query(where="1=1", out_fields="*", return_geometry=True).sdf
    rt_features = rt_layer.query(where="1=1", out_fields="*").sdf

    tread_features = tread_layer.query(
        where="1=1", out_fields="*", return_geometry=True
    ).sdf
    sidetrail_features = sidetrail_layer.query(
        where="1=1", out_fields="*", return_geometry=True
    ).sdf
except Exception as e:
    # `except Exception` (not a bare `except`) so Ctrl-C / SystemExit still
    # propagate, and the cause is logged like the other steps do
    logging.info("X   Error converting to spatially-enabled data frames\n")
    logging.info(f"    {type(e).__name__}: {e}\n")
else:
    logging.info(
        "    Converted survey point data, related table, tread, and side trail to spatially-enabled data frames\n"
    )


# === QUALITY CHECKS ===

# accumulators filled by the checks below: flagged survey-point rows,
# flagged related-table rows, and the plain-text summary lines
fc_error_rows, rt_error_rows, summary_rows = [], [], []


# === null checks ===
# function to check nulls, collect rows for error report
def check_nulls(df, df_name, fields, error_list):
    """Collect the rows of ``df`` that hold a null in any of ``fields``.

    For each listed field present in ``df``, the rows where it is null are
    copied, tagged with ``error_type``/``error_desc`` columns and appended to
    ``error_list``; a count line is logged and added to the module-level
    ``summary_rows``. Fields absent from ``df`` are only logged. When no
    field produced any null rows, a single "0 null errors" line is recorded.

    Args:
        df: DataFrame to inspect.
        df_name: Short label for ``df`` used in messages (e.g. "fc", "rt").
        fields: Names of the fields that should not be null.
        error_list: List that receives one DataFrame per offending field.
    """
    found_any = False

    for field in fields:
        if field not in df.columns:
            logging.info(f"        {field} not in list of fields for {df_name}")
            continue

        null_rows = df[df[field].isnull()]
        if null_rows.empty:
            continue

        found_any = True
        error_list.append(
            null_rows.assign(
                error_type="nulls",
                error_desc=f"NULL in {df_name} field: {field}",
            )
        )
        message = f"{df_name}.{field}: {len(null_rows)} nulls"
        logging.info("        " + message)
        summary_rows.append(message)

    if found_any:
        return

    message = f"0 null errors found in {df_name}"
    logging.info("        " + message)
    summary_rows.append(message)


# run null checks on the survey points, then on the related table;
# a failure is logged instead of stopping the script
try:
    summary_rows.append("Null errors:")
    check_nulls(
        df=fc_features, df_name="fc", fields=FC_REQUIRED_FIELDS, error_list=fc_error_rows
    )
    check_nulls(
        df=rt_features, df_name="rt", fields=RT_REQUIRED_FIELDS, error_list=rt_error_rows
    )
    summary_rows.append("")  # blank line between error checks
except Exception as err:
    # report what went wrong: exception class first, then its message
    logging.info("X   Error running null check")
    logging.info(f"    Error type: {type(err).__name__}")
    logging.info(f"    Error message: {err}\n")
else:
    logging.info("    Null check completed\n")


# === date check ===
# check for dates in the future or prior to the beginning of the project
try:
    project_start_date = datetime.datetime(
        PROJECT_START_YEAR, PROJECT_START_MONTH, PROJECT_START_DAY
    )
    # ArcGIS Online stores date values in UTC, so "now" must also be taken in
    # UTC; with local time, points created within the last few hours in a
    # timezone behind UTC would be flagged as future-dated. The tzinfo is
    # dropped so the value compares against naive timestamps.
    # NOTE(review): assumes created_date arrives as naive UTC timestamps — confirm
    today = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # unparseable values become NaT, which matches neither comparison below
    fc_features["created_date"] = pd.to_datetime(
        fc_features["created_date"], errors="coerce"
    )
    invalid_dates = fc_features[
        (fc_features["created_date"] > today)
        | (fc_features["created_date"] < project_start_date)
    ].copy()

    if not invalid_dates.empty:
        invalid_dates.loc[:, "error_type"] = "dates"
        invalid_dates.loc[:, "error_desc"] = "Invalid observation date"
        fc_error_rows.append(invalid_dates)
    message = f"{len(invalid_dates)} records with invalid dates"
    logging.info("        " + message)
    summary_rows.append("Date errors:")
    summary_rows.append(message)
    summary_rows.append("")  # blank line between error checks
except Exception as e:
    # Print the type of error and the error message
    logging.info("X   Error running date check")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")
    logging.info(traceback.format_exc())
else:
    logging.info("    Date check completed\n")


# === orphaned related records check ===
# check for records in table that aren't related to a fc point
try:
    valid_ids = set(fc_features["GlobalID"])
    orphaned = rt_features[~rt_features["defGlobalID"].isin(valid_ids)].copy()

    # Only tag and collect when something was found: assigning a scalar with
    # .loc to a zero-row frame makes pandas raise ValueError ("cannot set a
    # frame with no defined index and a scalar"), which would turn the clean
    # "no orphans" case into a reported failure. Same guard as the date check.
    if not orphaned.empty:
        orphaned.loc[:, "error_type"] = "orphaned"
        orphaned.loc[:, "error_desc"] = (
            "Prescription record has no related tread feature"
        )
        rt_error_rows.append(orphaned)

    message = f"{len(orphaned)} orphaned prescription records"
    logging.info("        " + message)
    summary_rows.append("Orphaned record errors:")
    summary_rows.append(message)
    summary_rows.append("")  # blank line between error checks
except Exception as e:
    # Print the type of error and the error message
    logging.info("X   Error running orphaned records check")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")
else:
    logging.info("    Orphaned related records check completed\n")


# === missing related records check ===
# check for fc points that aren't related to any records in the related table
try:
    # number of related-table rows per parent GlobalID; points with no match
    # map to NaN and are counted as 0
    related_counts = rt_features["defGlobalID"].value_counts()
    fc_features["related_count"] = (
        fc_features["GlobalID"].map(related_counts).fillna(0).astype(int)
    )
    missing_related = fc_features[fc_features["related_count"] == 0].copy()

    # Only tag and collect when something was found: assigning a scalar with
    # .loc to a zero-row frame makes pandas raise ValueError ("cannot set a
    # frame with no defined index and a scalar"), which would turn the clean
    # "every point has prescriptions" case into a reported failure.
    if not missing_related.empty:
        missing_related.loc[:, "error_type"] = "missing related"
        missing_related.loc[:, "error_desc"] = (
            "Tread feature has no related prescription records"
        )
        fc_error_rows.append(missing_related)

    message = f"{len(missing_related)} features with no related prescriptions"
    logging.info("        " + message)
    summary_rows.append("Related record errors:")
    summary_rows.append(message)
    summary_rows.append("")  # blank line between error checks
except Exception as e:
    # Print the type of error and the error message
    logging.info("X   Error running related records check")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")
else:
    logging.info("    Missing related records check completed\n")


# === sync error checks ===
# check for sync errors via duplicated date/time and user fields, collect rows for error report
def check_sync_errors(df, df_name, fields, error_list):
    """Collect rows of ``df`` that share the same values across ``fields``.

    Rows whose combination of ``fields`` occurs more than once are treated as
    potential sync errors: they are tagged with ``error_type``/``error_desc``
    columns and appended to ``error_list`` as one DataFrame. A count line is
    logged and added to the module-level ``summary_rows`` in either case.

    Args:
        df: DataFrame to inspect.
        df_name: Short label for ``df`` used in messages (e.g. "fc", "rt").
        fields: Column names that together identify a suspicious duplicate.
        error_list: List that receives the DataFrame of duplicated rows.
    """
    # how often each combination of the key fields occurs
    combo_sizes = df.groupby(fields).size().reset_index(name="count")
    repeated_keys = combo_sizes.loc[combo_sizes["count"] > 1, fields]

    if repeated_keys.empty:
        message = f"0 potential sync errors found in {df_name}"
        logging.info("        " + message)
        summary_rows.append(message)
        return

    # inner-join back onto the data to recover the full duplicated records
    sync_errors = df.merge(repeated_keys, on=fields, how="inner").assign(
        error_type="sync",
        error_desc=f"Potential sync error in {df_name}",
    )
    error_list.append(sync_errors)

    message = f"{len(sync_errors)} potential sync errors in {df_name}"
    logging.info("        " + message)
    summary_rows.append(message)


# run sync error check on the survey points, then on the related table;
# a failure is logged instead of stopping the script
try:
    summary_rows.append("Potential sync errors:")
    check_sync_errors(
        df=fc_features, df_name="fc", fields=FC_SYNC_ERROR_FIELDS, error_list=fc_error_rows
    )
    check_sync_errors(
        df=rt_features, df_name="rt", fields=RT_SYNC_ERROR_FIELDS, error_list=rt_error_rows
    )
    summary_rows.append("")  # blank line between error checks
except Exception as err:
    # report what went wrong: exception class first, then its message
    logging.info("X   Error running sync errors check")
    logging.info(f"    Error type: {type(err).__name__}")
    logging.info(f"    Error message: {err}\n")
else:
    logging.info("    Sync error checks completed\n")


# === domains check ===
# function to check domains, collect rows for error report
def check_domains(df, df_name, domain_dict, error_list):
    """Collect rows of ``df`` whose values fall outside their allowed domain.

    For each field in ``domain_dict`` that exists in ``df``, rows holding a
    non-null value not listed for that field are copied, tagged with
    ``error_type``/``error_desc`` columns and appended to ``error_list``; the
    distinct invalid values are logged and added to the module-level
    ``summary_rows``. Fields missing from ``df`` are logged (as the null
    check does) rather than skipped silently, so a misspelled dictionary key
    does not hide a whole check. When no field produced invalid rows, a
    single "0 domain errors" line is recorded.

    Args:
        df: DataFrame to inspect.
        df_name: Short label for ``df`` used in messages (e.g. "fc", "rt").
        domain_dict: Mapping of field name to the list of allowed values.
        error_list: List that receives one DataFrame per offending field.
    """
    any_domain_errors = False

    for field, valid_values in domain_dict.items():
        if field not in df.columns:
            logging.info(f"        {field} not in list of fields for {df_name}")
            continue

        # invalid = has a value, and that value is not in the allowed list
        out_of_domain = df[field].notnull() & ~df[field].isin(valid_values)
        invalid_domain = df[out_of_domain].copy()
        if invalid_domain.empty:
            continue

        any_domain_errors = True
        # distinct offending values, reported once each in the summary
        unique_invalids = invalid_domain[field].dropna().unique()
        invalid_domain.loc[:, "error_type"] = "domains"
        invalid_domain.loc[:, "error_desc"] = invalid_domain[field].apply(
            lambda x: f"'{x}' not in domains for {field}"
        )
        error_list.append(invalid_domain)
        message = f"Invalid entries for field '{field}' in {df_name}: {list(unique_invalids)}"
        logging.info("        " + message)
        summary_rows.append(message)

    if not any_domain_errors:
        message = f"0 domain errors found in {df_name}"
        logging.info("        " + message)
        summary_rows.append(message)


# run domain check on the survey points, then on the related table;
# a failure is logged instead of stopping the script
try:
    summary_rows.append("Domain errors:")
    check_domains(
        df=fc_features, df_name="fc", domain_dict=FC_DOMAIN_DICTIONARY, error_list=fc_error_rows
    )
    check_domains(
        df=rt_features, df_name="rt", domain_dict=RT_DOMAIN_DICTIONARY, error_list=rt_error_rows
    )
    summary_rows.append("")  # blank line between error checks
except Exception as err:
    # report what went wrong: exception class first, then its message
    logging.info("X   Error running domain check")
    logging.info(f"    Error type: {type(err).__name__}")
    logging.info(f"    Error message: {err}\n")
else:
    logging.info("    Domain checks completed\n")


# === repetitive attribute check ===
# NOTE there is potential for this to flag false positives, e.g. if there just
# happens to be a predominance of one feature type.  This will also
# flag a false positive if a user only logs one point


def check_repetitive_values(df, df_name, field_list, error_list, THRESHOLD):
    """Collect rows where one user entered the same value almost every time.

    Records are grouped by the username column ("created_user" if present,
    otherwise "Creator"). Within each user's records and for each field in
    ``field_list`` present in ``df``, any value whose share of that user's
    non-null entries exceeds ``THRESHOLD`` is considered repetitive; the
    matching rows are tagged with ``error_type``/``error_desc`` columns and
    appended to ``error_list``. The total number of flagged rows is logged
    and added to the module-level ``summary_rows``. If neither username
    column exists, a message is logged and nothing else happens.

    Args:
        df: DataFrame to inspect.
        df_name: Short label for ``df`` used in messages (e.g. "fc", "rt").
        field_list: Field names to examine for repeated input.
        error_list: List that receives one DataFrame per user/field/value hit.
        THRESHOLD: Proportion (0-1) a value's share must exceed to be flagged.
    """
    # the two tables name their username column differently
    user_field = next(
        (name for name in ("created_user", "Creator") if name in df.columns), None
    )
    if user_field is None:
        logging.info("no username field found")
        return

    fields_present = [field for field in field_list if field in df.columns]
    rep_error_count = 0

    for user, user_rows in df.groupby(user_field):
        for field in fields_present:
            # share of this user's entries taken by each distinct value
            shares = user_rows[field].value_counts(normalize=True)
            for val in shares[shares > THRESHOLD].index:
                rep_errors = user_rows[user_rows[field] == val].assign(
                    error_type="repetitive",
                    error_desc=f"{user} repeated '{val}' in {df_name}.{field} over {int(THRESHOLD*100)}% of the time",
                )
                error_list.append(rep_errors)
                rep_error_count += len(rep_errors)

    # the same sentence covers the zero and non-zero cases
    message = f"{rep_error_count} potentially repetitive inputs found in {df_name}"
    logging.info("        " + message)
    summary_rows.append(message)


# run repetitive value check on the survey points, then on the related table;
# a failure is logged instead of stopping the script
try:
    summary_rows.append("Repetitive value errors:")
    check_repetitive_values(
        df=fc_features,
        df_name="fc",
        field_list=FC_REP_ERROR_FIELDS,
        error_list=fc_error_rows,
        THRESHOLD=THRESHOLD,
    )
    check_repetitive_values(
        df=rt_features,
        df_name="rt",
        field_list=RT_REP_ERROR_FIELDS,
        error_list=rt_error_rows,
        THRESHOLD=THRESHOLD,
    )
    summary_rows.append("")  # blank line between error checks
except Exception as err:
    # report what went wrong: exception class first, then its message
    logging.info("X   Error running repetitive values check")
    logging.info(f"    Error type: {type(err).__name__}")
    logging.info(f"    Error message: {err}\n")
else:
    logging.info("    Repetitive values error checks completed\n")


# === offline data sync check ===
# NOTE this functionality is contingent on getting user logs from field staff.
# then compare user log dates to AGOL dates


# === proximity checks ===
# Flag survey points that lie farther than BUFFER_FT from every trail line
# (treadway and side trails) and from every facility point feature.
try:
    # there seems to be an entry in fc_features with null SHAPE geometry.
    # not sure how that happened, but this omits fc entries with null geometry
    # (the null check reports such rows separately)
    fc_features = fc_features[fc_features["SHAPE"].notnull()].copy()

    # convert SHAPE column to shapely geometry
    fc_features["geometry"] = fc_features["SHAPE"].apply(shape)
    tread_features["geometry"] = tread_features["SHAPE"].apply(shape)
    sidetrail_features["geometry"] = sidetrail_features["SHAPE"].apply(shape)

    # convert points, AT treadway, and side trails to GeoDataFrames
    # NOTE(review): the CRS codes are hard-coded (points 3857, facilities
    # 4269) — confirm they match the spatial references of the services
    fc_gdf = gpd.GeoDataFrame(fc_features, geometry="geometry", crs=3857)
    tread_gdf = gpd.GeoDataFrame(tread_features, geometry="geometry", crs=4269)
    sidetrail_gdf = gpd.GeoDataFrame(sidetrail_features, geometry="geometry", crs=4269)

    # reproject line features to match point CRS
    tread_projected = tread_gdf.to_crs(epsg=3857)
    sidetrail_projected = sidetrail_gdf.to_crs(epsg=3857)

    # query each facility point layer and reproject it to the point CRS
    pointfeature_gdfs = []
    for fl in feature_layers:  # layer handles created when connecting to the services
        sdf = fl.query(where="1=1", out_fields="*", return_geometry=True).sdf
        sdf["geometry"] = sdf["SHAPE"].apply(shape)
        gdf = gpd.GeoDataFrame(sdf, geometry="geometry", crs=4269).to_crs(epsg=3857)
        pointfeature_gdfs.append(gdf)

    # stack the line layers (tread and side trails) into one GeoDataFrame
    all_lines_gdf = pd.concat([tread_projected, sidetrail_projected], ignore_index=True)

    # stack the facility point layers into one GeoDataFrame
    all_points_gdf = pd.concat(pointfeature_gdfs, ignore_index=True)

    # change buffer distance to m to match CRS
    # NOTE(review): EPSG:3857 (Web Mercator) stretches distances away from the
    # equator, so a buffer of N metres in this CRS covers less than N metres on
    # the ground at New England latitudes — consider a local projected CRS if
    # the tolerance needs to be exact
    buffer_dist = BUFFER_FT * 0.3048

    # create buffer from all features (line and point)
    trail_buffer = all_lines_gdf.geometry.buffer(
        buffer_dist
    )  # THIS LINE TAKES A FEW MINUTES WHEN RUN INDEPENDENTLY
    point_buffer = all_points_gdf.geometry.buffer(buffer_dist)

    # combine buffers (unary union on each, and then union between both)
    combined_buffer = trail_buffer.unary_union.union(point_buffer.unary_union)

    # identify points outside the buffer
    outside_points = fc_gdf[~fc_gdf.geometry.within(combined_buffer)].copy()

    # log errors; rows are tagged and collected only when some were found
    summary_rows.append("Proximity errors:")
    if not outside_points.empty:
        outside_points.loc[:, "error_type"] = "proximity"
        outside_points.loc[:, "error_desc"] = "Point located beyond buffer zone"
        fc_error_rows.append(outside_points)

    # the same sentence covers the zero and non-zero cases
    message = f"{len(outside_points)} points found beyond buffer zone"
    logging.info("        " + message)
    summary_rows.append(message)
    summary_rows.append("")  # blank line between error checks

except Exception as e:
    # Print the type of error and the error message
    logging.info("X   Error finding survey points beyond buffer zone")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")
else:
    logging.info("    Proximity checks completed\n")

"""

# === collector-region check ===
# function to compare attributes
def match_club(row):
    allowed_clubs = COLLECTOR_DICT.get(row['created_user'], [])
    return row['Acronym'] in allowed_clubs

# run collector-region check
try:
    # Nearest spatial join: attach nearest Acronym from tread_projected to each point
    fc_with_club = gpd.sjoin_nearest(
        fc_gdf,
        tread_projected[['Acronym', 'geometry']],
        how='left',
        distance_col='distance_to_trail'
    )

    club_mismatch = fc_with_club[~fc_with_club.apply(match_club, axis=1)].copy()

    # log errors
    summary_rows.append("Collector-region errors:")

    if not club_mismatch.empty:
        club_mismatch["error_type"] = "collector-region"
        club_mismatch["error_desc"] = "Collector not assigned to this trail region"
        fc_error_rows.append(club_mismatch)
        message = f"{len(club_mismatch)} features with mismatched collector-region assignment"
        logging.info("        " + message)
        summary_rows.append(message)
        summary_rows.append("")     # blank line between error checks
    else:
        message = f"0 features with mismatched collector-region assignment"
        logging.info("        " + message)
        summary_rows.append(message)
        summary_rows.append("")     # blank line between error checks
except Exception as e:
    # Print the type of error and the error message
    logging.info("X   Error finding collector-region mismatches")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")
else:
    logging.info("    Collector-region check completed\n")

"""


# === FINAL EXPORTS ===
# === create master XLSX of all issues ===
def build_error_sheet(error_rows, column_order, id_columns=("GlobalID", "defGlobalID")):
    """Combine collected error frames into one DataFrame ready for export.

    Args:
        error_rows: Non-empty list of DataFrames of flagged records.
        column_order: Column names, in the order they should appear.
        id_columns: Columns whose values are upper-cased when present.

    Returns:
        A DataFrame with exactly the columns in ``column_order``. Columns not
        present in the data are kept as empty columns (and logged) instead of
        raising KeyError, so one missing field cannot abort the whole export.
    """
    combined = pd.concat(error_rows, ignore_index=True)

    absent = [col for col in column_order if col not in combined.columns]
    if absent:
        logging.info(f"    Columns not found in error records (left blank): {absent}")

    sheet = combined.reindex(columns=column_order)
    for col in id_columns:
        if col in sheet.columns:
            ids = sheet[col]
            # upper-case real IDs but keep nulls as nulls, so a missing ID is
            # exported as a blank cell rather than the text "NONE"/"NAN"
            sheet[col] = ids.where(ids.isna(), ids.astype(str).str.upper())
    return sheet


try:
    source = ["summary", "fc", "rt"]
    error_list = [summary_rows, fc_error_rows, rt_error_rows]
    error_list_order = [None, FC_ERROR_ORDER, RT_ERROR_ORDER]

    # GlobalID and defGlobalID
    cols_to_cap = ["GlobalID", "defGlobalID"]

    # workbook path configured in the output setup section
    output_path = OUTPUT_XLSX

    with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:

        # one sheet per source; sources with nothing collected are skipped
        for source_name, error_rows, order in zip(source, error_list, error_list_order):
            if not error_rows:
                logging.info(f"    No errors found in {source_name}!\n")
                continue

            if source_name == "summary":
                # summary_rows is a list of text lines -> single-column sheet
                all_errors = pd.DataFrame(error_rows, columns=["Summary"])
            else:
                all_errors = build_error_sheet(error_rows, order, cols_to_cap)

            all_errors.to_excel(writer, sheet_name=source_name, index=False)
            if source_name == "summary":
                logging.info(
                    f"    Exported error summary to sheet '{source_name}' in {output_path}\n"
                )
            else:
                logging.info(
                    f"    Exported {len(all_errors)} errors to sheet '{source_name}' in {output_path}\n"
                )
except Exception as e:
    logging.info("X   Error creating XLSX of errors\n")
    logging.info(f"    Error type: {type(e).__name__}")
    logging.info(f"    Error message: {e}\n")


# confirmation + log export message
if not WRITE_LOG:
    logging.info("All checks completed. (No log file written, console only)")
else:
    logging.info(f"Exported output summary to {FILE_PATH}")
    logging.info("All checks completed. Console outputs also saved to log file.")

# calculate elapsed time since start_time was recorded
end_time = datetime.datetime.now()
elapsed_time_sec = (end_time - start_time).total_seconds()

# break the run time into whole hours, minutes and seconds for display
hours, minutes, seconds = (
    int(part)
    for part in (
        elapsed_time_sec // 3600,
        (elapsed_time_sec % 3600) // 60,
        elapsed_time_sec % 60,
    )
)

logging.info(f"Elapsed Time: {hours:02d}:{minutes:02d}:{seconds:02d}")