In [1]:
import pandas as pd
import numpy as np
import os
import utm
import matplotlib.pyplot as plt
from utils.preprocessing import preprocess
from utils.from_latlon import from_latlon
import math
from sklearn.linear_model import LinearRegression
import tkinter as tk
from tkinter import filedialog
from tkinter.simpledialog import askstring

In [2]:
routine = 'training'
dimensions = ['xOffset', 'yOffset']
sm = 150 # maximum signal difference between min and max, used for calibration

In [3]:
def preprocess_sim_data(sim_data, freq, tower_locs, routine):
    
    sim_dat_filt, predictors = preprocess(sim_data, freq, routine)
     
    # Calculate easting and northing from lat long
    sim_dat_filt['easting'], sim_dat_filt['northing'], sim_dat_filt['zone_num'], sim_dat_filt['zone_letter'] = from_latlon(sim_dat_filt['POINT_Y'].values, sim_dat_filt['POINT_X'].values)

    # Create a dictionary of the coordinates of the towers
    offset_dict = tower_locs.set_index('TowerID').to_dict()
    point_x = offset_dict['POINT_X']
    point_y = offset_dict['POINT_Y']

    # Standardise the coordinates so that the tower location == 0 on both the x and y axes.
    sim_dat_filt['xOffset'] = sim_dat_filt['easting'] - sim_dat_filt['TowerID'].map(point_x).fillna(0)
    sim_dat_filt['yOffset'] = sim_dat_filt['northing'] - sim_dat_filt['TowerID'].map(point_y).fillna(0)
    
    sim_dat_filt['easting_of_tower'] = sim_dat_filt['TowerID'].map(point_x).fillna(0)
    sim_dat_filt['northing_of_tower'] = sim_dat_filt['TowerID'].map(point_y).fillna(0)

    return sim_dat_filt, predictors

In [4]:
# User input of data paths and temporal resolution

# Initialize Tkinter
root = tk.Tk()
root.attributes('-topmost', True)
root.withdraw()

# Ask the user to select the train data file
train_data = filedialog.askopenfilename(
    title="Select training data",
    filetypes=[("Excel files", "*.xlsx")]
)

# Ask the user to select the test data file
test_data = filedialog.askopenfilename(
    title="Select testing data",
    filetypes=[("Excel files", "*.xlsx")]
)

# Ask the user to select the radio tower XY data file
radio_tower_xy_path = filedialog.askopenfilename(
    title="Select radio tower location data",
    filetypes=[("Excel files", "*.xlsx")]
)

# Ask the user to select the model save path
model_save_path = filedialog.askdirectory(
    title="Select model save path"
)

# Function to get minutes from user
def get_minutes():
    while True:
        minutes = askstring("Time (in minutes) to compile location data (t)", "Enter time period (t) in minutes (must be an integer):")
        if minutes and minutes.isdigit():
            return minutes
        messagebox.showerror("Error", "Invalid input. Please enter a number.")

# Prompt the user and get the validated input
minutes = get_minutes()

# Append the input number to 'min'
freq = minutes + 'min'

# Print freq to verify (optional)
print("Frequency:", freq)

Frequency: 3min


In [5]:
# Get training data
train_data = pd.read_excel(train_data)
train_data['DateAndTime'] = pd.to_datetime(train_data['DateAndTime'])

# Get testing data
test_data = pd.read_excel(test_data)
test_data['DateAndTime'] = pd.to_datetime(test_data['DateAndTime'])

# Get tower locations
tower_locs = pd.read_excel(radio_tower_xy_path)

In [6]:
# Create test dataset as per train_model_h2o method
test_data_preproc, predictors_test = preprocess_sim_data(test_data, freq, tower_locs, routine)
train_data_preproc, predictors_train = preprocess_sim_data(train_data, freq, tower_locs, routine)

  .agg(['mean', 'count', np.std])
  .agg(['mean', 'count', np.std])


In [7]:
def calculate_delta_g(s1, s2, sm):
    delta_g = (s1-s2)/sm
    return delta_g

def calculate_offset_angle(delta_g):
    bearing = math.acos(delta_g) * 90/math.pi
    return bearing

In [8]:
# Make distance values absolute
train_data_preproc['xOffset_abs'] = train_data_preproc['xOffset'].abs()
train_data_preproc['yOffset_abs'] = train_data_preproc['yOffset'].abs()

# Perform linear regression for each antenna
train_data_filtered = train_data_preproc[train_data_preproc['ant1_mean'] != 0]
x = train_data_filtered[['ant1_mean']]
y = train_data_filtered[['yOffset_abs']]
model_ant1 = LinearRegression().fit(x, y)

train_data_filtered = train_data_preproc[train_data_preproc['ant2_mean'] != 0]
x = train_data_filtered[['ant2_mean']]
y = train_data_filtered[['xOffset_abs']]
model_ant2 = LinearRegression().fit(x, y)

train_data_filtered = train_data_preproc[train_data_preproc['ant3_mean'] != 0]
x = train_data_filtered[['ant3_mean']]
y = train_data_filtered[['yOffset_abs']]
model_ant3 = LinearRegression().fit(x, y)

train_data_filtered = train_data_preproc[train_data_preproc['ant4_mean'] != 0]
x = train_data_filtered[['ant4_mean']]
y = train_data_filtered[['xOffset_abs']]
model_ant4 = LinearRegression().fit(x, y)

# Also create an average model
# Extract coefficients from individual models
coefficients_ant1 = np.array([model_ant1.coef_[0][0], model_ant1.intercept_[0]])
coefficients_ant2 = np.array([model_ant2.coef_[0][0], model_ant2.intercept_[0]])
coefficients_ant3 = np.array([model_ant3.coef_[0][0], model_ant3.intercept_[0]])
coefficients_ant4 = np.array([model_ant4.coef_[0][0], model_ant4.intercept_[0]])

# Calculate average coefficients
average_coefficients = np.mean([coefficients_ant1, coefficients_ant2, coefficients_ant3, coefficients_ant4], axis=0)

# Create a new Linear Regression model with average coefficients
model_ant_average = LinearRegression()
model_ant_average.coef_ = np.array([average_coefficients[0]])
model_ant_average.intercept_ = np.array([average_coefficients[1]])

In [9]:
## Calculate total paired antenna values to find maximum sum, only if both values do not equal zero.

# Add 'ant_1_2_sum' column
test_data_preproc['ant_1_2_sum'] = test_data_preproc.apply(lambda row: row['ant1_mean'] + row['ant2_mean'] if row['ant1_mean'] != 0 and row['ant2_mean'] != 0 else 'na', axis=1)

# Add 'ant_2_3_sum' column
test_data_preproc['ant_2_3_sum'] = test_data_preproc.apply(lambda row: row['ant2_mean'] + row['ant3_mean'] if row['ant2_mean'] != 0 and row['ant3_mean'] != 0 else 'na', axis=1)

# Add 'ant_3_4_sum' column
test_data_preproc['ant_3_4_sum'] = test_data_preproc.apply(lambda row: row['ant3_mean'] + row['ant4_mean'] if row['ant3_mean'] != 0 and row['ant4_mean'] != 0 else 'na', axis=1)

# Add 'ant_4_1_sum' column
test_data_preproc['ant_4_1_sum'] = test_data_preproc.apply(lambda row: row['ant4_mean'] + row['ant1_mean'] if row['ant4_mean'] != 0 and row['ant1_mean'] != 0 else 'na', axis=1)



In [10]:
#### Calculate the bearings from the tower

# List of antenna names
antenna_names = ['ant_1_2_sum', 'ant_2_3_sum', 'ant_3_4_sum', 'ant_4_1_sum']

# Create a new column 'bearing' in the DataFrame
test_data_preproc['bearing'] = None

# Iterate through the DataFrame
for index, row in test_data_preproc.iterrows():
    # Extract values from the new columns and handle 'na' values
    values = [float(row['ant_1_2_sum']) if row['ant_1_2_sum'] != 'na' else float('-inf'),
              float(row['ant_2_3_sum']) if row['ant_2_3_sum'] != 'na' else float('-inf'),
              float(row['ant_3_4_sum']) if row['ant_3_4_sum'] != 'na' else float('-inf'),
              float(row['ant_4_1_sum']) if row['ant_4_1_sum'] != 'na' else float('-inf')]
    
    # Check if all values are 'na'; if yes, skip the row
    if all(val == float('-inf') for val in values):
        continue
    
    # Find the column name with the highest value
    max_column = max(zip(values, antenna_names))[1]
        
    if max_column == "ant_1_2_sum":
        if row.ant1_mean >= row.ant2_mean:
            delta_g = calculate_delta_g(row.ant1_mean, row.ant2_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)
            bearing = offset_angle
            
            # print(f'delta_g = {delta_g}')
            # print(f'offset_angle from ant 1 = {offset_angle}')
            # print(f'bearing ant 1 = {bearing}')

        else:
            delta_g = calculate_delta_g(row.ant2_mean, row.ant1_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)          
            bearing = 90 - offset_angle

    elif max_column == "ant_2_3_sum":
        if row.ant2_mean >= row.ant3_mean:
            delta_g = calculate_delta_g(row.ant2_mean, row.ant3_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)
            bearing = 90 + offset_angle

        else:
            delta_g = calculate_delta_g(row.ant3_mean, row.ant2_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)          
            bearing = 180 - offset_angle

    elif max_column == "ant_3_4_sum":
        if row.ant3_mean >= row.ant4_mean:
            delta_g = calculate_delta_g(row.ant3_mean, row.ant4_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)
            bearing = 180 + offset_angle

        else:
            delta_g = calculate_delta_g(row.ant4_mean, row.ant3_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)          
            bearing = 270 - offset_angle

    elif max_column == "ant_4_1_sum":
        if row.ant4_mean >= row.ant1_mean:
            delta_g = calculate_delta_g(row.ant4_mean, row.ant1_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)
            bearing = 270 + offset_angle

        else:
            delta_g = calculate_delta_g(row.ant1_mean, row.ant4_mean, sm)
            offset_angle = calculate_offset_angle(delta_g)          
            bearing = 360 - offset_angle
    
    
    # Correct bearings for magnetic declination and handle those that go over 360 degrees
    bearing = bearing + 7.6
    if bearing >= 360:
        bearing = bearing - 360
    else:
        pass

    # Assign the calculated bearing to the 'bearing' column
    test_data_preproc.at[index, 'bearing'] = bearing

In [11]:
# Convert non-numeric values to NaN
numeric_columns = ['ant_1_2_sum', 'ant_2_3_sum', 'ant_3_4_sum', 'ant_4_1_sum']
test_data_preproc[numeric_columns] = test_data_preproc[numeric_columns].apply(pd.to_numeric, errors='coerce')

# Calculate the maximum value across columns while handling NaN values
test_data_preproc['max_value'] = np.nanmax(test_data_preproc[numeric_columns].values, axis=1) / 2


  test_data_preproc['max_value'] = np.nanmax(test_data_preproc[numeric_columns].values, axis=1) / 2


In [12]:
# Calculate locations using the linear regression along bearings (without biangulation)

# New line to create a complete copy of file for estimates with linear regression method
mech_wLR_estimates = test_data_preproc.copy()

# List of antenna names
antennas = ['ant1_mean', 'ant2_mean', 'ant3_mean', 'ant4_mean']

for index, row in mech_wLR_estimates.iterrows():
    # First calculate appropriate bearing and infer distance using linear model
    if row['bearing'] != None:
        distance = model_ant_average.predict(np.array([[row['max_value']]]))[0]
        bearing = row['bearing']
        mech_wLR_estimates.at[index, 'Mech_method'] = "Multiple antenna, single tower"

    elif row['bearing'] == None:
        # Extract values from the new columns and handle 'na' values
        values = [float(row['ant1_mean']) if row['ant1_mean'] != 'na' else float('-inf'),
                float(row['ant2_mean']) if row['ant2_mean'] != 'na' else float('-inf'),
                float(row['ant3_mean']) if row['ant3_mean'] != 'na' else float('-inf'),
                float(row['ant4_mean']) if row['ant4_mean'] != 'na' else float('-inf')]

        mech_wLR_estimates.at[index, 'Mech_method'] = "Single antenna, single tower"

        # Find the column name with the highest value
        max_single_ant = max(zip(values, antennas))[1]

        if max_single_ant == "ant1_mean":
            bearing = 0
            distance = model_ant1.predict(np.array([[row['ant1_mean']]]))[0]

        elif max_single_ant == "ant2_mean":
            bearing = 90
            distance = model_ant2.predict(np.array([[row['ant2_mean']]]))[0]

        elif max_single_ant == "ant3_mean":
            bearing = 180
            distance = model_ant3.predict(np.array([[row['ant3_mean']]]))[0]

        elif max_single_ant == "ant4_mean":
            bearing = 270
            distance = model_ant4.predict(np.array([[row['ant4_mean']]]))[0]

    else:
        pass
    
    # Correct bearings for magnetic declination and handle those that go over 360 degrees
    bearing = bearing + 7.6
    if bearing >= 360:
        bearing = bearing - 360
    else:
        pass
    
    # Estimating new location using trigonometry
    bearing_rad = math.radians(bearing)

    # Calculate new coordinates
    new_easting = row['easting_of_tower'] + distance * math.sin(bearing_rad)
    new_northing = row['northing_of_tower'] + distance * math.cos(bearing_rad)

    # Assigning the new coordinates to the 'new_easting' and 'new_northing' columns in mech_wLR_estimates
    mech_wLR_estimates.at[index, 'easting_pred'] = new_easting
    mech_wLR_estimates.at[index, 'northing_pred'] = new_northing





In [13]:
mech_wLR_estimates = mech_wLR_estimates.groupby(['DateTime', 'TagID']).agg({
    'easting': 'first',
    'northing': 'first',
    'easting_pred': 'mean', 
    'northing_pred': 'mean', 
    'Data_type': 'first'
}).reset_index()

mech_wLR_estimates['easting_error'] = mech_wLR_estimates['easting_pred'] - mech_wLR_estimates['easting']
mech_wLR_estimates['northing_error'] = mech_wLR_estimates['northing_pred'] - mech_wLR_estimates['northing']

mech_wLR_estimates['error_m'] = np.sqrt((mech_wLR_estimates['easting_error']) ** 2
                        + (mech_wLR_estimates['northing_error']) ** 2)

In [14]:
# Save to excel
mech_wLR_estimates.to_excel(os.path.join(model_save_path, "error_estimates_mech_w_linear_regression.xlsx"), index=False)

In [15]:
def triang(x1, y1, alpha1, x2, y2, alpha2):
    # For biangulation GK Coordinates are necessary!
    # First calculate tan keeping in mind that 0° in geo-coordinates are 90° in an x-y plane
    ta1 = (alpha1 % 360) / 180 * np.pi
    ta2 = (alpha2 % 360) / 180 * np.pi

    if ((alpha1 - alpha2) % 180 == 0):
        # print("No biangulation possible: all three points are on one line")
        return (np.nan, np.nan)

    # Finding Intersection Using solver
    b = np.array([x2 - x1, y2 - y1])
    a1 = np.array([np.sin(ta1), np.cos(ta1)])
    a2 = np.array([-np.sin(ta2), -np.cos(ta2)])
    a = np.vstack((a1, a2))
    
    try:
        l = np.linalg.solve(a, b)
    except np.linalg.LinAlgError:
        return (np.nan, np.nan)

    px = x1 + l[0] * np.sin(ta1)
    py = y1 + l[0] * np.cos(ta1)

    if l[1] > 0 and l[0] > 0:
        return px, py
    else:
        return np.nan, np.nan

def apply_biangulation(df):
    result_list = []

    # Group by 'TagID' and 'DateTime'
    grouped = df.groupby(['TagID', 'DateTime'])

    for name, group in grouped:
        # For each unique pair of 'TowerID'
        tower_ids = group['TowerID'].unique()

        # Skip if there's only one tower or missing data
        if len(tower_ids) < 2 or group['bearing'].isnull().any():
            continue

        # Generate all unique pairs of TowerID
        tower_combinations = np.array(np.meshgrid(tower_ids, tower_ids)).T.reshape(-1, 2)

        for towers in tower_combinations:
            # Extract data for each TowerID pair
            data1 = group[group['TowerID'] == towers[0]]
            data2 = group[group['TowerID'] == towers[1]]

            # Extract x, y, and alpha values
            x1, y1, alpha1 = data1['easting_of_tower'].values[0], data1['northing_of_tower'].values[0], data1['bearing'].values[0]
            x2, y2, alpha2 = data2['easting_of_tower'].values[0], data2['northing_of_tower'].values[0], data2['bearing'].values[0]

            # Apply biangulation
            easting_estimated, northing_estimated = triang(x1, y1, alpha1, x2, y2, alpha2)
            
            # Append the result along with TagID, DateTime, and TowerID information
            result_list.append({'TagID': name[0], 'DateTime': name[1], 'TowerID1': towers[0], 'TowerID2': towers[1],
                                'easting_pred': easting_estimated, 'northing_pred': northing_estimated})

    # Create a DataFrame from the results
    result_df = pd.DataFrame(result_list)

    return result_df

In [16]:
# Apply the biangulation method

# New line to create a complete copy of file for estimates with linear regression method
mech_wBi_estimates = test_data_preproc.copy()

# Find where bearings are empty (i.e. if there weren't orthogonal antennas that detected signal) and use the cardinal direction of the strongest antenna
for index, row in mech_wBi_estimates.iterrows():
    # First calculate appropriate bearing and infer distance using linear model
    if row['bearing'] != None:
        pass

    elif row['bearing'] == None:
        # Extract values from the new columns and handle 'na' values
        values = [float(row['ant1_mean']) if row['ant1_mean'] != 'na' else float('-inf'),
                float(row['ant2_mean']) if row['ant2_mean'] != 'na' else float('-inf'),
                float(row['ant3_mean']) if row['ant3_mean'] != 'na' else float('-inf'),
                float(row['ant4_mean']) if row['ant4_mean'] != 'na' else float('-inf')]

        # Find the column name with the highest value
        max_single_ant = max(zip(values, antennas))[1]

        if max_single_ant == "ant1_mean":
            bearing = 0

        elif max_single_ant == "ant2_mean":
            bearing = 90

        elif max_single_ant == "ant3_mean":
            bearing = 180

        elif max_single_ant == "ant4_mean":
            bearing = 270

        # Correct bearings for magnetic declination and handle those that go over 360 degrees
        bearing = bearing + 7.6
        if bearing >= 360:
            bearing = bearing - 360
        else:
            pass

        mech_wBi_estimates.at[index, 'bearing'] = bearing
    else:
        pass

In [17]:

# First filter dataframe to those points suitable for the method
# Replace 'None' and 'na' with NaN in the 'bearing' column
mech_wBi_estimates['bearing'].replace(['None', 'na'], float('nan'), inplace=True)

# Drop rows with any missing values in 'bearing' column
mech_wBi_estimates = mech_wBi_estimates.dropna(subset=['bearing'], how='any')

# Apply biangulation
result_dataframe = apply_biangulation(mech_wBi_estimates)

In [18]:
# Merge the biangulation data back in with the bearing estimates
mech_biangulation_estimates = pd.merge(test_data_preproc, result_dataframe, on=['TagID', 'DateTime'], how='left')


In [19]:
mech_biangulation_estimates = mech_biangulation_estimates.groupby(['DateTime', 'TagID']).agg({
    'easting': 'first',
    'northing': 'first',
    'easting_pred': 'mean', 
    'northing_pred': 'mean', 
    'Data_type': 'first'
}).reset_index()

In [20]:
mech_biangulation_estimates['easting_error'] = mech_biangulation_estimates['easting_pred'] - mech_biangulation_estimates['easting']
mech_biangulation_estimates['northing_error'] = mech_biangulation_estimates['northing_pred'] - mech_biangulation_estimates['northing']

mech_biangulation_estimates['error_m'] = np.sqrt((mech_biangulation_estimates['easting_error']) ** 2
                        + (mech_biangulation_estimates['northing_error']) ** 2)

In [21]:
# Save to excel
mech_biangulation_estimates.to_excel(os.path.join(model_save_path, "error_estimates_mech_w_biangulation.xlsx"), index=False)