Description:
### **Part 2 of the EM_Digit Processing Project: Automated Validation, Correction, and Data Integrity Checks**  

Part 2 of the **EM_Digit Processing Project** checks the accuracy and consistency of location data through a series of automated validation and correction steps. It targets errors in geographic coordinates, distance calculations, and state assignments, using geodesic computations and simple heuristics to detect inconsistencies. The process begins with **two distance validation checks**: `dist_test1` flags locations whose revised distance from the previous known location exceeds **30 km**, and `dist_test2` flags locations whose recorded coordinates lie more than **30 km from the approximated coordinates**. These tests surface anomalies such as misplaced locations or miscalculated distances.

The **coordinate trend test (`coords_test`)** identifies points where the direction of movement unexpectedly reverses, which checks the logical continuity of the travel path. Two **state validation tests** check state assignments: `state_test` flags locations whose state differs from both the previous and the next location, while `state_test2` flags entries whose state matches no other entry within the same route description.

To automate corrections, the script performs **route boundary adjustments**: missing latitude and longitude values are filled from the nearest valid entry within the same route. If a **bearing (direction of travel)** is missing or out of range, it is recalculated with GeographicLib's `Geodesic` class. The **alternative location matching process** uses geodesic distances to find the nearest entry in the gazetteer dataset (`gaz_df`), providing an alternative geographic reference for locations with incomplete or incorrect data. Boundary corrections are logged in `bounds_check_log.csv` so that modifications remain traceable.


Finally, an **`Automated_Flag`** column marks any row flagged by these validation tests, and the validated dataset is exported as `full_test_df.csv` for further review and analysis. By the end of Part 2 the dataset has been through an automated verification pass that reduces the need for manual checking before the integration, visualization, and mapping work in later stages of the project.
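
The two state checks described above can be sketched with pandas. This is a minimal illustration under stated assumptions, not the project's implementation: the helper name `add_state_tests` and the toy data are hypothetical, and only the column names `route_description` and `state` come from this notebook.

```python
import pandas as pd

def add_state_tests(frame: pd.DataFrame) -> pd.DataFrame:
    """Add state_test and state_test2 flags (hypothetical helper)."""
    out = frame.copy()
    by_route = out.groupby("route_description")["state"]
    prev_state = by_route.shift(1)
    next_state = by_route.shift(-1)

    # state_test: the state differs from both neighbours in the same route.
    # Rows without two neighbours are not flagged.
    out["state_test"] = (
        prev_state.notna()
        & next_state.notna()
        & (out["state"] != prev_state)
        & (out["state"] != next_state)
    )

    # state_test2: the state occurs only once within its route.
    counts = by_route.transform(lambda s: s.map(s.value_counts()))
    out["state_test2"] = counts == 1
    return out

# Toy route: one Veneto row surrounded by Tuscany rows.
toy = pd.DataFrame({
    "route_description": ["A", "A", "A", "A"],
    "state": ["Tuscany", "Tuscany", "Veneto", "Tuscany"],
})
flagged = add_state_tests(toy)
print(flagged)
```

Grouping by route before shifting keeps the first row of one route from being compared with the last row of the previous one.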

Setup and Prelims

Remember to set the itinerary_name to match the itinerary being processed before running the cells below.
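
One way to avoid retyping the folder in every cell is to build the file paths from a single variable. The sketch below assumes that `GM1684` in the paths used in this notebook is the itinerary name and that `497635` is an item identifier; both readings are guesses, and `page_file` and `item_id` are hypothetical names.

```python
from pathlib import PurePosixPath

# Root folder as it appears in the paths used throughout this notebook.
DRIVE_ROOT = PurePosixPath("/content/drive/MyDrive/EmDigitPageFiles")

def page_file(itinerary_name: str, item_id: str, filename: str) -> str:
    """Build <root>/<itinerary>/<item_id>/<itinerary>/page/<filename> (hypothetical helper)."""
    return str(DRIVE_ROOT / itinerary_name / item_id / itinerary_name / "page" / filename)

itinerary_name = "GM1684"  # assumed to be the itinerary name
item_id = "497635"         # assumed to be an item identifier

clean_path = page_file(itinerary_name, item_id, "clean_df.csv")
print(clean_path)
```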

In [1]:
#Packages
import os
import csv
import glob
import re
import string
import math

import numpy as np
import pandas as pd
import requests
#! pip install geographiclib
from geographiclib.geodesic import Geodesic

In [1]:
#File import/export currently written for mounted Drive
from google.colab import drive
drive.mount('/content/drive', force_remount=False)

In [None]:
import pandas as pd

# Corrected directory path
df = pd.read_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/clean_df.csv')

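# Keep missing values as NaN: the pd.isna()/dropna() checks and the coordinate
# arithmetic in later cells rely on it, and filling with the string 'na'
# would defeat them and turn numeric columns into text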

# Rename column if needed
if "region_type" in df.columns:
    df.rename(columns={"region_type": "line_type"}, inplace=True)

# Display the first few rows to verify correctness
df.head()


Establishing route boundaries using bounds_df


In [None]:
import pandas as pd
import numpy as np
from geographiclib.geodesic import Geodesic

def get_route_boundaries(df):
    """Extracts route boundaries (first and last location) and computes bearing for each route."""

    # Filter rows where 'line_type' is 'location' and 'geonameId' is not NaN
    filtered_df = df[df['line_type'] == 'location'].dropna(subset=['geonameId'])

    if filtered_df.empty:
        print("Warning: No valid locations found in the dataset.")
        return pd.DataFrame(columns=['route_description', 'first_lat', 'last_lat', 'first_lng', 'last_lng', 'bearing'])

    # Group by 'route_description' and extract first/last coordinates
    route_boundaries = (
        filtered_df.groupby('route_description')
        .agg(
            first_lat=('Location_Lat', 'first'),
            last_lat=('Location_Lat', 'last'),
            first_lng=('Location_Lng', 'first'),
            last_lng=('Location_Lng', 'last')
        )
        .reset_index()
    )

    # Compute bearing using Geodesic calculations
    geod = Geodesic.WGS84
    def compute_bearing(row):
        if pd.isna(row['first_lat']) or pd.isna(row['first_lng']) or pd.isna(row['last_lat']) or pd.isna(row['last_lng']):
            return np.nan
        return geod.Inverse(row['first_lat'], row['first_lng'], row['last_lat'], row['last_lng'])['azi1']

    # Apply bearing calculation
    route_boundaries['bearing'] = route_boundaries.apply(compute_bearing, axis=1)

    return route_boundaries

# Generate route boundaries
bounds_df = get_route_boundaries(df)

# Export to CSV with correct file path
bounds_df.to_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/bounds_df.csv', index=False)

# Display DataFrame
bounds_df.head()
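
To make the bearing convention concrete, the sketch below computes an initial bearing with the spherical formula from the standard library. It is a simplification: the notebook itself uses GeographicLib's ellipsoidal `Geodesic.WGS84.Inverse`, whose `azi1` is likewise reported in degrees between -180 and 180, with negative values pointing west. The function name `initial_bearing_deg` is hypothetical.

```python
import math

def initial_bearing_deg(lat1, lng1, lat2, lng2):
    """Initial bearing from point 1 to point 2 on a sphere, in degrees within [-180, 180]."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlng = math.radians(lng2 - lng1)
    y = math.sin(dlng) * math.cos(phi2)
    x = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(dlng)
    return math.degrees(math.atan2(y, x))

north = initial_bearing_deg(0.0, 0.0, 1.0, 0.0)   # heading due north
east = initial_bearing_deg(0.0, 0.0, 0.0, 1.0)    # heading due east
west = initial_bearing_deg(0.0, 0.0, 0.0, -1.0)   # heading due west (negative)
print(north, east, west)
```

Because westward bearings are negative under this convention, a validity check on stored bearings should accept the whole range from -180 to 180.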


Automated error checking for bounds (under review; works in most cases)


In [None]:
import pandas as pd
import numpy as np
from geographiclib.geodesic import Geodesic

# Load the existing bounds file
bounds_df = pd.read_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/bounds_df.csv')

# Initialize a log to track corrections
correction_log = []

# Geodesic calculator
geod = Geodesic.WGS84

# Function to recompute bearing
def compute_bearing(row):
    """Compute bearing if coordinates are valid."""
    if pd.isna(row['first_lat']) or pd.isna(row['first_lng']) or pd.isna(row['last_lat']) or pd.isna(row['last_lng']):
        return np.nan
    return geod.Inverse(row['first_lat'], row['first_lng'], row['last_lat'], row['last_lng'])['azi1']

# Iterate over the dataset for corrections
for index, row in bounds_df.iterrows():
    corrected = False

    # If first_lat or first_lng is missing, try using next valid row within the same route
    if pd.isna(row['first_lat']) or pd.isna(row['first_lng']):
        next_valid = bounds_df.loc[(bounds_df['route_description'] == row['route_description']) &
                                   bounds_df['first_lat'].notna()].head(1)
        if not next_valid.empty:
            bounds_df.at[index, 'first_lat'] = next_valid.iloc[0]['first_lat']
            bounds_df.at[index, 'first_lng'] = next_valid.iloc[0]['first_lng']
            correction_log.append([row['route_description'], 'first_lat/lng corrected using next valid row'])
            corrected = True

    # If last_lat or last_lng is missing, try using previous valid row within the same route
    if pd.isna(row['last_lat']) or pd.isna(row['last_lng']):
        prev_valid = bounds_df.loc[(bounds_df['route_description'] == row['route_description']) &
                                   bounds_df['last_lat'].notna()].tail(1)
        if not prev_valid.empty:
            bounds_df.at[index, 'last_lat'] = prev_valid.iloc[0]['last_lat']
            bounds_df.at[index, 'last_lng'] = prev_valid.iloc[0]['last_lng']
            correction_log.append([row['route_description'], 'last_lat/lng corrected using previous valid row'])
            corrected = True

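    # Recompute the bearing if it is missing or outside the [-180, 180] degree
    # range that Geodesic.Inverse returns for 'azi1'. Re-read the row so that
    # any coordinates filled in above are used.
    current = bounds_df.loc[index]
    if pd.isna(current['bearing']) or not -180 <= current['bearing'] <= 180:
        new_bearing = compute_bearing(current)
        if not pd.isna(new_bearing):
            bounds_df.at[index, 'bearing'] = new_bearing
            correction_log.append([row['route_description'], 'Bearing recalculated'])
            corrected = True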

# Save the corrected file only if changes were made
if correction_log:
    bounds_df.to_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/bounds_df.csv', index=False)

    # Save the correction log
    log_df = pd.DataFrame(correction_log, columns=['route_description', 'Correction Applied'])
    log_df.to_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/bounds_check_log.csv', index=False)

    print("✅ Bounds file corrected and saved. Corrections logged in bounds_check_log.csv.")
else:
    print("✅ No corrections needed. The bounds file is clean.")
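
Filling a missing coordinate from a valid entry in the same route can also be expressed at row level with a grouped forward and backward fill. This is a sketch under assumptions: `fill_within_route` is a hypothetical helper, the toy data is invented, and the fill prefers the previous valid row over the next one rather than measuring which is nearer.

```python
import numpy as np
import pandas as pd

def fill_within_route(frame, cols=("Location_Lat", "Location_Lng")):
    """Fill gaps from the previous valid row of the same route, else the next one."""
    out = frame.copy()
    for col in cols:
        grouped = out.groupby("route_description")[col]
        out[col] = grouped.ffill().fillna(grouped.bfill())
    return out

toy = pd.DataFrame({
    "route_description": ["A", "A", "A", "B", "B"],
    "Location_Lat": [np.nan, 45.0, np.nan, np.nan, np.nan],
    "Location_Lng": [np.nan, 9.0, np.nan, np.nan, np.nan],
})
filled = fill_within_route(toy)
print(filled)
```

Route B has no valid entry at all, so its rows stay empty instead of borrowing coordinates from route A.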


Automated Distance and Unit Processing



In [None]:
import pandas as pd
import numpy as np
import re

# ============================== STEP 1: Extract Unit Measurements ==============================

# Dictionary mapping abbreviations to units
unit_mapping = {
    '[Pp]': 'posts', 'Poste': 'posts', '[Mm]': 'miles',
    '[Mm]iglia': 'miles', '[Ll]eghe': 'leagues', '[Ll]': 'leagues'
}

# Function to extract numeric values based on unit abbreviations
def extract_units(text, mapping):
    """Extracts distance values based on unit abbreviations in the content column."""
    result = {'miles': np.nan, 'posts': np.nan, 'leagues': np.nan}
    for abbr, unit in mapping.items():
        match = re.search(rf'{abbr}\.?(\d+)', text)
        if match:
            result[unit] = int(match.group(1))
    return pd.Series(result)

# Apply the function to create new columns
df[['miles', 'posts', 'leagues']] = df['content'].apply(lambda x: extract_units(str(x), unit_mapping))

# ============================== STEP 2: Fill Missing Coordinates Based on "qui" ==============================

def fill_missing_coordinates(df):
    """
    If 'qui' is detected in "prose" content, fill missing Location_Lat and Location_Lng
    using the nearest previous valid coordinates.
    """
    for index, row in df.iterrows():
        if row['line_type'] == 'prose' and re.search(r'qui\b', str(row['content'])):
            prev_index = index - 1
            while prev_index >= 0:
                if pd.notnull(df.at[prev_index, 'Location_Lat']) and pd.notnull(df.at[prev_index, 'Location_Lng']):
                    df.at[index, 'Location_Lat'] = df.at[prev_index, 'Location_Lat']
                    df.at[index, 'Location_Lng'] = df.at[prev_index, 'Location_Lng']
                    break
                prev_index -= 1

# Apply the function
fill_missing_coordinates(df)

# ============================== STEP 3: Interpolate Missing Coordinates ==============================

def interpolate_coordinates(df):
    """
    For missing coordinates in 'prose' rows, interpolate values by averaging
    the nearest valid previous and next locations.
    """
    for index, row in df.iterrows():
        if row['line_type'] == "prose" and pd.isna(row['leagues']) and pd.isna(row['posts']) and pd.isna(row['miles']):
            prev_index, next_index = index - 1, index + 1

            # Find previous valid location
            while prev_index >= 0:
                if df.at[prev_index, 'route_description'] == row['route_description'] and \
                        not (pd.isna(df.at[prev_index, 'leagues']) and pd.isna(df.at[prev_index, 'posts']) and pd.isna(df.at[prev_index, 'miles'])):
                    break
                prev_index -= 1

            # Find next valid location
            while next_index < len(df):
                if df.at[next_index, 'route_description'] == row['route_description'] and \
                        not (pd.isna(df.at[next_index, 'leagues']) and pd.isna(df.at[next_index, 'posts']) and pd.isna(df.at[next_index, 'miles'])):
                    break
                next_index += 1

            # Interpolate coordinates if both previous and next valid locations exist
            if prev_index >= 0 and next_index < len(df):
                df.at[index, 'Location_Lat'] = (df.at[prev_index, 'Location_Lat'] + df.at[next_index, 'Location_Lat']) / 2
                df.at[index, 'Location_Lng'] = (df.at[prev_index, 'Location_Lng'] + df.at[next_index, 'Location_Lng']) / 2

# Apply interpolation
interpolate_coordinates(df)

# ============================== STEP 4: Assign Distance & Unit ==============================

def find_distance_and_unit(row):
    """
    Determines the left-most non-zero value for distance & assigns the corresponding unit.
    """
    for col in ['leagues', 'posts', 'miles']:
        if pd.notna(row[col]) and row[col] > 0:
            return row[col], col
    return np.nan, np.nan

# Apply the function to each row
df[['distance', 'unit']] = df.apply(find_distance_and_unit, axis=1, result_type='expand')

# ============================== STEP 5: Convert Distances to Kilometers ==============================

# Distance conversion dictionary based on contemporary state & country values
dist_conversion = {
    'Carinthia': {'leagues': 7.20, 'miles': 1.58, 'posts': 14.09},
    'Lower Austria': {'leagues': 7.51, 'miles': 1.49, 'posts': 19.37},
    'Salzburg': {'leagues': 8.89, 'miles': 2.50, 'posts': 14.90},
    'Styria': {'leagues': 9.15, 'miles': 1.32, 'posts': 17.91},
    'AT': {'leagues': 6.60, 'miles': 1.49, 'posts': 15.02},  # Austria fallback
    'Berlin': {'leagues': 7.44, 'miles': 1.81, 'posts': 15.90},
    'Bavaria': {'leagues': 7.43, 'miles': 1.81, 'posts': 15.80},
    'DE': {'leagues': 7.44, 'miles': 1.81, 'posts': 15.90},  # Germany fallback
    'England': {'leagues': 5.68, 'miles': 1.65, 'posts': 11.11},
    'GB': {'leagues': 5.68, 'miles': 1.65, 'posts': 11.11},  # UK fallback
}

# Function to convert historical distances to kilometers
def convert_distance_to_km(row):
    """
    Converts historical distance measurements into kilometers using contemporary conversion values.
    """
    if pd.isna(row['distance']) or pd.isna(row['unit']):
        return np.nan

    state = row['state']
    country = row['country_code']

    # Look for state-specific conversion first, else fallback to country
    conversion_factor = dist_conversion.get(state, dist_conversion.get(country, {})).get(row['unit'], np.nan)

    return row['distance'] * conversion_factor if pd.notna(conversion_factor) else np.nan

# Apply conversion
df['distance_km'] = df.apply(convert_distance_to_km, axis=1)

# ============================== STEP 6: Save Final Processed Data ==============================

df.to_csv('/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/processed_distances.csv', index=False)

print("✅ Data processing complete. File saved as 'processed_distances.csv'")


# Example Usage: Convert distance using the dictionary
# (the state and the fallback must exist in the dictionary defined above;
# Lombardy and IT are only added in the full dictionary in the next cell)
state = "Styria"
unit = "miles"
distance_value = 10  # Example distance in miles

# Lookup conversion factor, falling back to the country entry
conversion_factor = dist_conversion.get(state, dist_conversion.get("AT", {})).get(unit, np.nan)

# Convert to kilometers
distance_in_km = distance_value * conversion_factor if pd.notna(conversion_factor) else np.nan

print(f"Converted Distance: {distance_in_km} km")
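
The patterns in Step 1 require the digits to follow the abbreviation directly, and a single-letter pattern such as `[Ll]` can also match inside a longer word. The sketch below is one possible stricter variant, not the project's rule set: it anchors each abbreviation at a word boundary, allows optional whitespace before the number, and ignores case. The names `UNIT_PATTERNS` and `extract_units_strict` are hypothetical.

```python
import re

# One pattern per unit; the long form is tried before the single letter.
UNIT_PATTERNS = {
    "posts": re.compile(r"\b(?:poste|p)\.?\s*(\d+)", re.IGNORECASE),
    "miles": re.compile(r"\b(?:miglia|m)\.?\s*(\d+)", re.IGNORECASE),
    "leagues": re.compile(r"\b(?:leghe|l)\.?\s*(\d+)", re.IGNORECASE),
}

def extract_units_strict(text):
    """Return {unit: value} for every unit abbreviation followed by a number."""
    found = {}
    for unit, pattern in UNIT_PATTERNS.items():
        match = pattern.search(text)
        if match:
            found[unit] = int(match.group(1))
    return found

print(extract_units_strict("Poste 2"))
print(extract_units_strict("M. 10"))
print(extract_units_strict("Milano 5"))
```

A place name such as "Milano" is not read as a mileage here, because the letter must be followed by an optional period, optional whitespace, and then digits.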


In [None]:
# ============================== Distance Conversion Dictionary ==============================
# Uses approximate distance conversion from contemporary measures by state and country_code
# to calculate distance in modern kilometers.
# The state value is used if present; if not, defaults to country_code.

dist_conversion = {
    'Carinthia': {'leagues': 7.20, 'miles': 1.58, 'posts': 14.09},
    'Lower Austria': {'leagues': 7.51, 'miles': 1.49, 'posts': 19.37},
    'Salzburg': {'leagues': 8.89, 'miles': 2.50, 'posts': 14.90},
    'Styria': {'leagues': 9.15, 'miles': 1.32, 'posts': 17.91},
    'Tyrol': {'leagues': 5.03, 'miles': 1.21, 'posts': 13.42},
    'Upper Austria': {'leagues': 8.04, 'miles': 1.49, 'posts': 18.56},
    'Vienna': {'leagues': 6.49, 'miles': 1.49, 'posts': 14.58},
    'Vorarlberg': {'leagues': 6.60, 'miles': 1.49, 'posts': 10.80},
    'AT': {'leagues': 6.60, 'miles': 1.49, 'posts': 15.02},  # Austria fallback

    'Brussels Capital': {'leagues': 5.78, 'miles': 1.25, 'posts': 12.46},
    'Flanders': {'leagues': 5.37, 'miles': 1.25, 'posts': 12.90},
    'Wallonia': {'leagues': 6.21, 'miles': 1.25, 'posts': 13.30},
    'BE': {'leagues': 5.88, 'miles': 1.25, 'posts': 13.14},  # Belgium fallback

    'Baden-Wurttemberg': {'leagues': 8.17, 'miles': 1.81, 'posts': 16.77},
    'Bavaria': {'leagues': 7.43, 'miles': 1.81, 'posts': 15.80},
    'Berlin': {'leagues': 7.44, 'miles': 1.81, 'posts': 15.90},
    'DE': {'leagues': 7.44, 'miles': 1.81, 'posts': 15.90},  # Germany fallback

    'England': {'leagues': 5.68, 'miles': 1.65, 'posts': 11.11},
    'GB': {'leagues': 5.68, 'miles': 1.65, 'posts': 11.11},  # UK fallback

    'Madrid': {'leagues': 5.38, 'miles': 1.65, 'posts': 12.67},
    'Navarre': {'leagues': 5.02, 'miles': 1.65, 'posts': 17.33},
    'Valencia': {'leagues': 6.09, 'miles': 1.65, 'posts': 13.41},
    'ES': {'leagues': 5.70, 'miles': 1.65, 'posts': 12.47},  # Spain fallback

    'Lombardy': {'leagues': 5.17, 'miles': 1.58, 'posts': 13.14},
    'Tuscany': {'leagues': 5.17, 'miles': 1.74, 'posts': 13.60},
    'Veneto': {'leagues': 5.17, 'miles': 1.62, 'posts': 13.51},
    'IT': {'leagues': 5.17, 'miles': 1.66, 'posts': 12.77},  # Italy fallback

    'Warsaw': {'leagues': 6.00, 'miles': 1.50, 'posts': 12.00},
    'PL': {'leagues': 5.68, 'miles': 1.65, 'posts': 10.48},  # Poland fallback

    'Lisbon': {'leagues': 4.66, 'miles': 1.65, 'posts': 13.36},
    'Porto': {'leagues': 5.00, 'miles': 1.60, 'posts': 14.50},
    'PT': {'leagues': 5.72, 'miles': 1.65, 'posts': 14.50},  # Portugal fallback

    'Ljubljana': {'leagues': 5.68, 'miles': 1.65, 'posts': 9.60},
    'Maribor': {'leagues': 5.68, 'miles': 1.65, 'posts': 17.96},
    'SI': {'leagues': 8.82, 'miles': 1.65, 'posts': 17.64},  # Slovenia fallback
}

# ✅ Distance conversion dictionary successfully stored for further calculations.
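
The state-then-country fallback described in the comments above can be written as a small lookup. The sketch uses a two-entry excerpt of the dictionary (values copied from it); `lookup_factor` is a hypothetical helper, and "Sicily" stands in for any state without its own entry.

```python
# Excerpt of dist_conversion, enough to show the fallback.
conversion_excerpt = {
    "Lombardy": {"leagues": 5.17, "miles": 1.58, "posts": 13.14},
    "IT": {"leagues": 5.17, "miles": 1.66, "posts": 12.77},  # Italy fallback
}

def lookup_factor(table, state, country_code, unit):
    """Return the km factor for the state if listed, else for the country, else None."""
    for key in (state, country_code):
        factor = table.get(key, {}).get(unit)
        if factor is not None:
            return factor
    return None

print(lookup_factor(conversion_excerpt, "Lombardy", "IT", "miles"))  # state entry
print(lookup_factor(conversion_excerpt, "Sicily", "IT", "miles"))    # country fallback
print(lookup_factor(conversion_excerpt, "Sicily", "XX", "miles"))    # nothing found
```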


In [None]:
# ============================== Function to Calculate Revised Distance ==============================

def calculate_revised_distance(row):
    """
    Converts historical distance values into modern kilometers using state-based conversion factors.
    If state-level data is unavailable, defaults to country_code.
    """
    state = row.get('state')
    country_code = row.get('country_code')
    distance = row.get('distance')
    unit = row.get('unit')

    # Skip rows with missing values (checked before any string conversion)
    if pd.isna(distance) or pd.isna(unit) or pd.isna(country_code):
        return np.nan

    # Extract first valid numeric distance if multiple values exist
    try:
        distance_values = str(distance).split('|')
        distance_float = float(distance_values[0].strip())
    except (ValueError, IndexError):
        print(f"⚠️ Warning: Invalid distance value detected - {distance}")
        return np.nan

    # Use the state if it has its own entry; otherwise, default to country_code
    conversion_key = state if state in dist_conversion else country_code

    if conversion_key in dist_conversion and unit in dist_conversion[conversion_key]:
        # Get conversion ratio
        ratio = dist_conversion[conversion_key].get(unit)

        # Ensure ratio is a valid number
        try:
            ratio_float = float(ratio)
        except (ValueError, TypeError):
            print(f"⚠️ Warning: Invalid conversion ratio detected for {conversion_key} - {ratio}")
            return np.nan

        # Calculate revised distance
        return round(distance_float * ratio_float, 2)  # Round to 2 decimal places

    return np.nan  # If no conversion found, return NaN

# ============================== Apply Function & Save to CSV ==============================

df['revised_distance'] = df.apply(calculate_revised_distance, axis=1)

# Save the updated dataframe
output_path = "/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/distance_extract.csv"
df.to_csv(output_path, index=False)

print(f"✅ Revised distance calculation complete. File saved at: {output_path}")
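
The function above takes the first entry of a `|`-separated distance value. That parsing step can be isolated as below; `first_distance` is a hypothetical helper that returns `None` instead of NaN so that the result is easy to test.

```python
import math

def first_distance(value):
    """Return the first numeric entry of a '|'-separated value, or None."""
    if value is None:
        return None
    first = str(value).split("|")[0].strip()
    try:
        number = float(first)
    except ValueError:
        return None
    # float('nan') parses successfully, so filter it out explicitly.
    return None if math.isnan(number) else number

print(first_distance("2|3"))
print(first_distance("n/a"))
print(first_distance(float("nan")))
```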


Establishing Bearing


In [None]:

from geographiclib.geodesic import Geodesic

def calculate_bearing(df):
    """
    Computes the bearing (direction) at each row as the azimuth from the
    preceding row's coordinates to the following row's coordinates.
    Shifted columns supply the neighbours; the geodesic call itself runs row by row.
    """

    # Initialize Bearing column with NaN
    df['Bearing'] = np.nan

    # Filter rows where line_type is "location" or "prose"
    valid_rows = df[df['line_type'].isin(['location', 'prose']) & df['Location_Lat'].notna() & df['Location_Lng'].notna()]

    if valid_rows.empty:
        print("⚠️ Warning: No valid locations found for bearing calculation.")
        return df

    # Shift latitude and longitude values to get preceding and following locations
    df['prev_Lat'] = df['Location_Lat'].shift(1)
    df['prev_Lng'] = df['Location_Lng'].shift(1)
    df['next_Lat'] = df['Location_Lat'].shift(-1)
    df['next_Lng'] = df['Location_Lng'].shift(-1)

    # Function to compute bearing using GeographicLib
    def compute_bearing(lat1, lng1, lat2, lng2):
        if pd.isna(lat1) or pd.isna(lng1) or pd.isna(lat2) or pd.isna(lng2):
            return np.nan
        geod = Geodesic.WGS84
        return geod.Inverse(lat1, lng1, lat2, lng2)['azi1']

    # Apply the function to calculate bearing
    df['Bearing'] = df.apply(lambda row: compute_bearing(row['prev_Lat'], row['prev_Lng'], row['next_Lat'], row['next_Lng']), axis=1)

    # Drop helper columns
    df.drop(columns=['prev_Lat', 'prev_Lng', 'next_Lat', 'next_Lng'], inplace=True)

    return df

# Apply the bearing calculation
df = calculate_bearing(df)
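
A plain `shift` takes the neighbouring row even when it belongs to a different route, so the first row of a route is paired with the last row of the previous one. The sketch below contrasts it with a shift grouped by `route_description`. The toy values and the column names `prev_global` and `prev_in_route` are hypothetical.

```python
import pandas as pd

toy = pd.DataFrame({
    "route_description": ["A", "A", "B", "B"],
    "Location_Lat": [45.0, 45.5, 41.9, 42.3],
})

# Plain shift: row 2 (first row of route B) receives route A's last latitude.
toy["prev_global"] = toy["Location_Lat"].shift(1)

# Grouped shift: the first row of each route has no predecessor.
toy["prev_in_route"] = toy.groupby("route_description")["Location_Lat"].shift(1)

print(toy)
```

Whether bearings should cross route boundaries is a project decision; the grouped form is only relevant if they should not.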


Establishing approximate coordinates

Uses bearing and revised_distance to approximate coordinates of a given location

In [None]:

import math
from geographiclib.geodesic import Geodesic

def calculate_approx_coordinates(df):
    """
    Computes approximate coordinates for each row using bearing and distance from the last valid location.
    """

    df_copy = df.copy()
    df_copy.reset_index(drop=True, inplace=True)

    # Initialize Approximate Coordinates column
    if 'approx_coordinates' not in df_copy:
        df_copy['approx_coordinates'] = np.nan

    geod = Geodesic.WGS84  # Geodesic model for accurate distance calculations

    # Filter rows with valid route, bearing, and distance
    valid_rows = df_copy[df_copy[['Bearing', 'revised_distance', 'route_description']].notna().all(axis=1)]

    if valid_rows.empty:
        print("⚠️ Warning: No valid data found for coordinate calculation.")
        return df_copy

    # Shift valid locations for calculations
    df_copy['prev_Lat'] = df_copy['Location_Lat'].shift(1)
    df_copy['prev_Lng'] = df_copy['Location_Lng'].shift(1)

    # Function to compute new coordinates
    def compute_new_coords(lat1, lng1, bearing, distance_km):
        if pd.isna(lat1) or pd.isna(lng1) or pd.isna(bearing) or pd.isna(distance_km):
            return np.nan
        g = geod.Direct(lat1, lng1, bearing, distance_km * 1000)  # Convert km to meters
        return (round(g['lat2'], 6), round(g['lon2'], 6))  # Round to 6 decimal places

    # Apply function to compute new coordinates
    df_copy['approx_coordinates'] = df_copy.apply(
        lambda row: compute_new_coords(row['prev_Lat'], row['prev_Lng'], row['Bearing'], row['revised_distance']), axis=1
    )

    # Convert tuples to strings
    df_copy['approx_coordinates'] = df_copy['approx_coordinates'].astype(str).str.strip('()')

    return df_copy

# Apply coordinate calculation
df = calculate_approx_coordinates(df)

# ============================== Interpolation for Prose Entries ==============================

def interpolate_prose_coordinates(df):
    """
    If prose entries have no distance or unit, their coordinates are set as the midpoint between the nearest valid locations.
    """

    for index, row in df.iterrows():
        if row['line_type'] == "prose" and pd.isna(row['unit']) and pd.isna(row['revised_distance']):
            prev_index, next_index = index - 1, index + 1

            # Find previous valid location
            while prev_index >= 0:
                if df.at[prev_index, 'route_description'] == row['route_description'] and \
                        not (pd.isna(df.at[prev_index, 'unit']) and pd.isna(df.at[prev_index, 'revised_distance'])):
                    break
                prev_index -= 1

            # Find next valid location
            while next_index < len(df):
                if df.at[next_index, 'route_description'] == row['route_description'] and \
                        not (pd.isna(df.at[next_index, 'unit']) and pd.isna(df.at[next_index, 'revised_distance'])):
                    break
                next_index += 1

            # Calculate coordinates if both previous and next valid locations exist
            if prev_index >= 0 and next_index < len(df):
                prev_lat, prev_lng = df.at[prev_index, 'Location_Lat'], df.at[prev_index, 'Location_Lng']
                next_lat, next_lng = df.at[next_index, 'Location_Lat'], df.at[next_index, 'Location_Lng']

                df.at[index, 'Location_Lat'] = (prev_lat + next_lat) / 2
                df.at[index, 'Location_Lng'] = (prev_lng + next_lng) / 2

# Apply interpolation for prose coordinates
interpolate_prose_coordinates(df)

# Save processed data
output_path = "/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/processed_coordinates.csv"
df.to_csv(output_path, index=False)

print(f"✅ Coordinate processing complete. File saved at: {output_path}")
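
The approximation step solves the "direct" problem: given a start point, a bearing, and a distance, find the end point. The sketch below shows the idea with the spherical destination formula from the standard library. It is an approximation for illustration, whereas the notebook uses GeographicLib's ellipsoidal `Geodesic.WGS84.Direct`. `destination_point` and `EARTH_RADIUS_KM` are hypothetical names, and the returned longitude is not normalized to [-180, 180].

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius; spherical approximation

def destination_point(lat, lng, bearing_deg, distance_km):
    """Return (lat, lng) reached from a start point along a bearing on a sphere."""
    phi1 = math.radians(lat)
    lam1 = math.radians(lng)
    theta = math.radians(bearing_deg)
    delta = distance_km / EARTH_RADIUS_KM  # angular distance in radians

    phi2 = math.asin(
        math.sin(phi1) * math.cos(delta)
        + math.cos(phi1) * math.sin(delta) * math.cos(theta)
    )
    lam2 = lam1 + math.atan2(
        math.sin(theta) * math.sin(delta) * math.cos(phi1),
        math.cos(delta) - math.sin(phi1) * math.sin(phi2),
    )
    return math.degrees(phi2), math.degrees(lam2)

# Travelling one degree of arc north and east from the origin.
one_degree_km = EARTH_RADIUS_KM * math.radians(1.0)
north_lat, north_lng = destination_point(0.0, 0.0, 0.0, one_degree_km)
east_lat, east_lng = destination_point(0.0, 0.0, 90.0, one_degree_km)
print(north_lat, north_lng, east_lat, east_lng)
```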


Matching to alternative locations

In [None]:
from geopy.distance import geodesic

# ============================== Function to Compute Geodesic Distance ==============================

def geodesic_distance(lat1, lon1, lat2, lon2):
    """Computes geodesic distance (in kilometers) between two coordinate points."""
    if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
        return np.nan
    return geodesic((lat1, lon1), (lat2, lon2)).kilometers

# ============================== Function to Find Closest Gazetteer Match ==============================

def find_closest_match(approx_df, gaz_df):
    """
    Finds the closest gazetteer match for each row in approx_df based on geodesic distance.
    Returns a DataFrame with alternative matched locations and distances.
    """

    results = []

    # Ensure gazetteer has valid lat/lng data
    gaz_df = gaz_df.dropna(subset=['Location_Lat', 'Location_Lng']).copy()  # copy so distance_km can be added safely

    for index, approx_row in approx_df.iterrows():
        approx_coordinates = approx_row.get("approx_coordinates", None)
        location_lat = approx_row.get("Location_Lat", None)
        location_lon = approx_row.get("Location_Lng", None)

        if pd.notna(approx_coordinates) and ',' in approx_coordinates and pd.isna(location_lat):
            try:
                approx_lat, approx_lon = map(float, approx_coordinates.split(","))
            except ValueError:
                continue  # Skip invalid coordinate parsing

            # Compute distances for all gazetteer entries at once
            gaz_df["distance_km"] = gaz_df.apply(
                lambda row: geodesic_distance(approx_lat, approx_lon, row["Location_Lat"], row["Location_Lng"]), axis=1
            )

            # Find the closest location
            closest_match = gaz_df.loc[gaz_df["distance_km"].idxmin()]

            results.append({
                "approx_coordinates": approx_coordinates,
                "alternative_location": (closest_match["Location_Lat"], closest_match["Location_Lng"]),
                "alt_distance": closest_match["distance_km"],
                "alternative_match": closest_match["id"],
                "alternative_geoname": closest_match["geoname"]
            })

    return pd.DataFrame(results)

# ============================== Import Data & Find Closest Matches ==============================

# Import gazetteer data
gaz_df = pd.read_csv("/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/gazetteer.csv")

# Use df as the approximation dataset
approx_df = df

# Find closest matches from the gazetteer
closest_match_df = find_closest_match(approx_df, gaz_df)

# ============================== Merge Approximate & Gazetteer Data ==============================

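# Merge to include alternative matches from gazetteer.
# Join explicitly on approx_coordinates and keep one match per coordinate string,
# so the left join cannot multiply rows of approx_df.
if closest_match_df.empty:
    merged_df = approx_df.copy()
else:
    closest_match_df = closest_match_df.drop_duplicates(subset='approx_coordinates')
    merged_df = pd.merge(approx_df, closest_match_df, how='left', on='approx_coordinates', suffixes=('_x', '_y'))

# Reset the index so later position-based lookups see a gap-free 0..n-1 range
merged_df = merged_df.drop_duplicates().reset_index(drop=True)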

# Save the final merged dataset
output_path = "/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/merged_df.csv"
merged_df.to_csv(output_path, index=False)

# Update df with merged results
df = merged_df

print(f"✅ Closest match processing complete. File saved at: {output_path}")
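
The nearest-match lookup above can be tried on toy data without the gazetteer file. This is only a sketch, not the notebook's implementation: it swaps geopy's ellipsoidal geodesic for the haversine great-circle formula (a spherical Earth with an assumed mean radius), and `haversine_km`, `nearest_entry`, and the sample entries are hypothetical names and values.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # assumed mean Earth radius (spherical approximation)

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

def nearest_entry(lat, lon, gazetteer):
    """Return (entry, distance_km) for the gazetteer entry closest to (lat, lon)."""
    best = min(gazetteer, key=lambda e: haversine_km(lat, lon, e["lat"], e["lon"]))
    return best, haversine_km(lat, lon, best["lat"], best["lon"])

# Hypothetical gazetteer entries, for illustration only
toy_gazetteer = [
    {"id": "A", "lat": 10.0, "lon": 20.0},
    {"id": "B", "lat": 10.5, "lon": 20.5},
    {"id": "C", "lat": 12.0, "lon": 25.0},
]

match, dist_km = nearest_entry(10.4, 20.4, toy_gazetteer)
print(match["id"], round(dist_km, 1))
```

Haversine distances differ slightly from ellipsoidal ones, so a sketch like this suits quick sanity checks rather than replacing the geodesic calculation near the 30 km threshold.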


Distance tests - combined

In [None]:
import pandas as pd
import numpy as np
from geopy.distance import geodesic

# ============================== Optimized Distance Test ==============================

def calculate_distance_tests(row):
    """
    Computes two distance tests:
    1. dist_test1: True if revised_distance exceeds 30 km, or if the geodesic distance
       between the previous row's coordinates and this row's coordinates exceeds 30 km.
    2. dist_test2: True if actual coordinates are more than 30 km from the approximated coordinates.
    """

    # Get current coordinates
    actual_coords = (row['Location_Lat'], row['Location_Lng'])

    # Test 1: Distance from previous row
    prev_index = row.name - 1
    if prev_index >= 0:
        prev_row = df.iloc[prev_index]
        prev_coords = (prev_row['Location_Lat'], prev_row['Location_Lng'])
        dist_test1 = (
            pd.notna(row['revised_distance']) and row['revised_distance'] > 30
        ) or (
            pd.notna(prev_coords[0]) and pd.notna(prev_coords[1]) and
            pd.notna(actual_coords[0]) and pd.notna(actual_coords[1]) and
            geodesic(prev_coords, actual_coords).kilometers > 30
        )
    else:
        dist_test1 = False

    # Test 2: Distance from approximated coordinates
    approx_coords_str = row['approx_coordinates']
    # Require a string so NaN or numeric cells cannot raise a TypeError on the ',' check
    if isinstance(approx_coords_str, str) and approx_coords_str != 'None' and ',' in approx_coords_str:
        try:
            approx_coords = tuple(map(float, approx_coords_str.strip('()').split(',')))
            dist_test2 = (
                pd.notna(actual_coords[0]) and pd.notna(actual_coords[1]) and
                geodesic(approx_coords, actual_coords).kilometers > 30
            )
        except ValueError:
            dist_test2 = False
    else:
        dist_test2 = False

    return pd.Series([dist_test1, dist_test2])

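# Apply the combined distance tests.
# calculate_distance_tests finds the previous row by position (row.name - 1),
# so the index must be a gap-free 0..n-1 range; drop_duplicates can leave gaps.
df = df.reset_index(drop=True)
df[['dist_test1', 'dist_test2']] = df.apply(calculate_distance_tests, axis=1)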

# ============================== Export Processed Data ==============================

output_path = "/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/full_test_df.csv"
df.to_csv(output_path, index=False)

print(f"✅ Distance tests completed. File saved at: {output_path}")
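
The `approx_coordinates` strings are parsed in more than one place, so a single helper could keep the accepted formats consistent. The sketch below is a hypothetical helper, not part of the notebook: the name `parse_coord_string` and the latitude/longitude range check are assumptions added for illustration.

```python
def parse_coord_string(value):
    """Return (lat, lon) as floats from 'lat,lon' or '(lat, lon)', or None if unusable."""
    if not isinstance(value, str):
        return None  # NaN, None, or a numeric cell
    parts = value.strip().strip("()").split(",")
    if len(parts) != 2:
        return None  # e.g. the literal string 'None'
    try:
        lat, lon = float(parts[0]), float(parts[1])
    except ValueError:
        return None  # non-numeric text
    # Assumed sanity check: reject values outside valid coordinate ranges
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return None
    return lat, lon

print(parse_coord_string("(12.5, -45.25)"))
print(parse_coord_string("None"))
```

Returning `None` instead of raising keeps the calling code to a single `if coords is None` branch per row.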


Coordinate Testing - scrapped but useful, fully optimised - always room for improvement
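
The idea behind the trend test can be seen on a handful of points: a location is flagged when the step into it and the step out of it have opposite signs for both latitude and longitude. The sketch below uses plain Python lists rather than pandas to keep the logic visible; `reversal_flags` and the sample coordinates are hypothetical.

```python
def reversal_flags(lats, lons):
    """Flag point i when both latitude and longitude change direction at i."""
    n = len(lats)
    flags = [False] * n  # the first and last points have no step on one side
    for i in range(1, n - 1):
        lat_in, lat_out = lats[i] - lats[i - 1], lats[i + 1] - lats[i]
        lon_in, lon_out = lons[i] - lons[i - 1], lons[i + 1] - lons[i]
        # A negative product means the two steps point in opposite directions
        flags[i] = (lat_in * lat_out < 0) and (lon_in * lon_out < 0)
    return flags

# Hypothetical path heading north-east, with one point (index 3) set back
lats = [10.0, 10.2, 10.4, 10.1, 10.6]
lons = [20.0, 20.3, 20.6, 20.2, 20.7]
flags = reversal_flags(lats, lons)
print(flags)
```

A single out-of-place point flags both itself and the point before it, because the direction reverses once going in and once coming back out.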



In [None]:
import pandas as pd
import numpy as np

# ============================== Optimized Coordinate Trend Test ==============================

def check_coords_trend(df):
    """
    Flags locations where both latitude and longitude switch from increasing to decreasing (or vice versa).
    """

    df_copy = df.copy()

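    # Filter only relevant rows; copy so the helper columns below are added to an
    # independent frame rather than to a slice of df_copy
    location_df = df_copy[(df_copy['line_type'] == "location") & df_copy['Location_Lat'].notna() & df_copy['Location_Lng'].notna()].copy()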

    # Ensure correct row order
    location_df = location_df.sort_index()

    # Compute latitude and longitude changes
    location_df['lat_diff'] = location_df['Location_Lat'].diff()
    location_df['lng_diff'] = location_df['Location_Lng'].diff()

    # Shift to compare previous and next trends
    location_df['lat_diff_shifted'] = location_df['lat_diff'].shift(-1)
    location_df['lng_diff_shifted'] = location_df['lng_diff'].shift(-1)

    # Identify rows where both latitude and longitude reverse trends
    location_df['coords_test'] = (location_df['lat_diff'] * location_df['lat_diff_shifted'] < 0) & \
                                 (location_df['lng_diff'] * location_df['lng_diff_shifted'] < 0)

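    # Merge back into the original dataframe (dropping any stale column from an earlier run)
    df_copy = df_copy.drop(columns=['coords_test'], errors='ignore')
    df_copy = df_copy.merge(location_df[['coords_test']], how="left", left_index=True, right_index=True)

    # Fill only the new flag column; a blanket fillna(False) would overwrite
    # missing coordinates, states, and other values with False
    df_copy['coords_test'] = df_copy['coords_test'].fillna(False).astype(bool)

    return df_copy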

df = check_coords_trend(df)

# ============================== Optimized State Consistency Tests ==============================

def check_state_test(df):
    """
    Flags locations whose state differs from both the previous and the next location.
    """

    df_copy = df.copy()

    # Ensure 'state_test' column exists
    df_copy['state_test'] = False

    # Compare location rows with each other, so that intervening non-location
    # rows are not mistaken for the previous or next location
    location_rows = list(df_copy[df_copy['line_type'] == 'location'].index)

    for pos, index in enumerate(location_rows):
        state_n = df_copy.at[index, 'state']

        if pd.isna(state_n) or state_n == '':
            continue  # Skip missing values

        prev_state = df_copy.at[location_rows[pos - 1], 'state'] if pos > 0 else None
        next_state = df_copy.at[location_rows[pos + 1], 'state'] if pos + 1 < len(location_rows) else None

        # Flag if the state differs from both neighbouring locations;
        # a missing neighbour (start or end of the data) counts as differing
        if state_n != prev_state and (next_state is None or state_n != next_state):
            df_copy.at[index, 'state_test'] = True

    return df_copy

df = check_state_test(df)

# ============================== Optimized Route-Wide State Check ==============================

def check_state_test2(df):
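    """
    Flags locations whose state matches no other entry within the same route description.
    """

    df_copy = df.copy()

    # Ensure 'state_test2' column exists
    df_copy['state_test2'] = False

    # A row's own state is always in its route's set of states, so set membership
    # alone cannot flag anything; count occurrences per route instead.
    valid = df_copy['state'].notna() & df_copy['route_description'].notna()
    subset = df_copy[valid]
    state_count = subset.groupby(['route_description', 'state'])['state'].transform('size')
    route_count = subset.groupby('route_description')['state'].transform('size')

    # Flag states that occur only once in a route; routes with a single
    # state-bearing row are skipped, since there is nothing to compare against
    df_copy.loc[valid, 'state_test2'] = (state_count == 1) & (route_count > 1)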

    return df_copy

df = check_state_test2(df)

# ============================== Automated Flagging & Export ==============================

# Ensure the test columns exist and fill missing values with False
test_columns = ['dist_test1', 'dist_test2', 'coords_test', 'state_test', 'state_test2']
for col in test_columns:
    if col not in df.columns:
        df[col] = False
    df[col] = df[col].fillna(False)

# Add Automated Flagging
df['Automated_Flag'] = df[test_columns].any(axis=1)

# Save the processed file
output_path = "/content/drive/MyDrive/EmDigitPageFiles/GM1684/497635/GM1684/page/full_test_df.csv"
df.to_csv(output_path, index=False)

print(f"✅ Coordinate and State Tests completed. File saved at: {output_path}")
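
The route-wide state check can be read as "flag a state that appears only once within its route". The sketch below illustrates that reading with plain Python and `collections.Counter`; `singleton_state_flags`, the route labels, and the state codes are hypothetical toy values, and skipping routes with only one state-bearing row is an added assumption.

```python
from collections import Counter

def singleton_state_flags(rows):
    """rows: list of (route, state) pairs, state may be None. Returns one bool per row."""
    with_state = [(route, state) for route, state in rows if state is not None]
    pair_counts = Counter(with_state)                        # occurrences of each state per route
    route_counts = Counter(route for route, _ in with_state)  # state-bearing rows per route
    return [
        state is not None
        and pair_counts[(route, state)] == 1   # no other row in the route shares this state
        and route_counts[route] > 1            # assumption: need at least one row to compare with
        for route, state in rows
    ]

# Toy data: route R1 has one odd state, route R2 has a single known state
rows = [("R1", "NSW"), ("R1", "NSW"), ("R1", "VIC"), ("R2", "QLD"), ("R2", None)]
state_flags = singleton_state_flags(rows)
print(state_flags)
```

A route that legitimately crosses a state border once near its end would also be flagged under this reading, so the result is better treated as a prompt for review than as a correction.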
