### WLLS_I_estimator.ipynb
- Siep Dokter
- Emil Jousimaa
- Oleksandr Sosovskyy
- Mario Gabriele Carofano

> This file contains the implementation of 2 alternative WLLS estimators, named OS-WLLS_I (One Step) and TS-WLLS_I (Two Step), as requested in the 4th task, used for estimate the Target Coordinates starting from the RSS information coming from the anchors.

> In addition, at the end of file, there are also plots showing the actual position of the target and the anchors, and the estimated position of the target obtained from the execution of the 2 estimators.

In [None]:
# IMPORTS
import import_ipynb
import constants
import auxfunc
import sys
import pandas as pd
import numpy as np
import pprint
import math

In [None]:
def calculate_AI_matrix(anchor_coords):
	'''
	Calculates the 'AI' matrix of the system of equations [R2, eq. (9)] which the WLLS_I method solves.
	This is the implementation of [R2, eq. (10)].

	Parameters:
	anchor_coords (list) : A list containing one list for each anchor, e.g. the 2D-coordinates of each anchor.

	Returns:
	Returns a 2D numpy.ndarray which values
	are elements of the 'AI' matrix for the selected scenario.
	'''

	n_anchors = len(anchor_coords)
	A = np.zeros((n_anchors, 3))

	for i in range(n_anchors):
		A[i, 0] = -2 * anchor_coords[i][0]
		A[i, 1] = -2 * anchor_coords[i][1]
		A[i, 2] = 1

	return A

In [None]:
def calculate_bI_vector(anchor_coords, estimated_distances):
	'''
	Calculates the 'bI' vector of the system of equations [R2, eq. (9)] which the WLS method solves.
	This is the implementation of [R2, eq. (10)].

	Parameters:
	anchor_coords (list) : A list containing one list for each anchor, e.g. the 2D-coordinates of each anchor.
	estimated_distances (list):
	A list containing one real number for each anchor,
	e.g. the distance estimation between each anchor and the target.

	Returns:
	Returns a 1D numpy.ndarray which values
	are elements of the 'bII' vector for the selected configuration.
	'''

	n_anchors = len(anchor_coords)
	b = []

	for i in range(n_anchors):
		sqr_distance = math.pow(estimated_distances[i], 2)
		sqr_x = math.pow(anchor_coords[i][0], 2)
		sqr_y = math.pow(anchor_coords[i][1], 2)
		b.append(sqr_distance - sqr_x - sqr_y)
		i = i + 1
	
	return np.array(b)

In [None]:
def calculate_CI_matrix(estimated_distances):
	'''
	Calculates, for the selected configuration, the 'CI' diagonal matrix.
	This is the implementation of [R2, eq. (18)].

	Parameters:
	estimated_distances (list):
	A list containing one real number for each anchor,
	e.g. the distance estimation between each anchor and the target.

	Returns:
    Returns a 2D numpy.ndarray which values are elements
    of the 'CI' covariance diagonal matrix for the selected configuration.
	'''

	covariance = []
	variance = math.pow(constants.STANDARD_DEVIATION, 2)

	for d in estimated_distances:
		sqr_distance = math.pow(d, 2)
		covariance.append(variance * sqr_distance)
	
	return 4 * np.diag(covariance)

In [None]:
def calculate_Lambda_WLLS_matrix(AI, bI, CI):
	'''
	Calculates the Weighted Linear Least Squares (WLLS) position estimate.
	This is the implementation of [R2, eq. (18)].

	Parameters:
	AI (numpy.ndarray): Matrix AI.
	bI (numpy.ndarray): Vector bI.
	CI (numpy.ndarray): Weight matrix CI.

	Returns:
	Returns the OS_WLLS_I position estimate.
	'''

	AI_transpose = np.transpose(AI)
	CI_inverse = np.linalg.inv(CI)
	
	left_term = np.linalg.inv(np.matmul(np.matmul(AI_transpose, CI_inverse), AI))
	right_term = np.matmul(np.matmul(AI_transpose, CI_inverse), bI)

	return np.matmul(left_term, right_term)

In [None]:
def sgn(x):
    if x > 0:
        return 1
    if x < 0:
        return -1
    if x == 0:
        return 0

# https://www.cuemath.com/algebra/signum-function/

In [None]:
def calculate_h_vector(Lambda_WLLS):
	'''
	
	'''

	h = []

	h.append(math.pow(Lambda_WLLS[0], 2))
	h.append(math.pow(Lambda_WLLS[1], 2))
	h.append(Lambda_WLLS[2])

	return np.transpose(np.array(h))

In [None]:
def calculate_K_matrix(Lambda_WLLS):
	'''
	
	'''

	K = []

	K.append(2 * Lambda_WLLS[0])
	K.append(2 * Lambda_WLLS[1])
	K.append(1)

	return np.diag(K)

In [None]:
def calculate_Phi_matrix(K, AI, CI):
	'''
	
	'''

	AI_transpose = np.transpose(AI)
	CI_inverse = np.linalg.inv(CI)
	mid_term = np.linalg.inv(np.matmul(np.matmul(AI_transpose, CI_inverse), AI))

	return np.matmul(np.matmul(K, mid_term), K)

In [None]:
def calculate_G_matrix():
	'''
	
	'''

	G = np.zeros((3, 2))

	G[0, 0] = 1
	G[1, 0] = 0
	G[2, 0] = 1

	G[0, 1] = 0
	G[1, 1] = 1
	G[2, 1] = 1

	return G


In [None]:
def calculate_z_vector(G, Phi, h):
	'''
	
	'''

	G_transpose = np.transpose(G)
	Phi_inverse = np.linalg.inv(Phi)
	left_term = np.linalg.inv(np.matmul(np.matmul(G_transpose, Phi_inverse), G))
	right_term = np.matmul(np.matmul(G_transpose, Phi_inverse), h)

	return np.matmul(left_term, right_term)

In [None]:
def calculate_TS_WLLS_I_output(Lambda_WLLS, z):
    '''
    
    '''

    p = []

    p.append(sgn(Lambda_WLLS[0]) * math.sqrt(z[0]))
    p.append(sgn(Lambda_WLLS[1]) * math.sqrt(z[1]))

    return np.array(p)

In [None]:
def apply_OS_WLLS_I_estimator(scenario_name):
	'''
	Applies the OS_WLLS_I estimator, as shown in [R2] and in [R2, ref. [6]].
	It is used for estimate the target's position from the anchors' position.

	Parameters:
	scenario_name (str): The name of the scenario to be examined.

	Returns:
	data (dict):
	It is a dictionary containing all the salient information retrieved
	from the reading of the dataset (Actual) and from the application of the WLS estimator (Estimated),
	for all types of devices (Arduino, RPI) and for all technology (WiFi, BLT, Hybrid).
	'''
	
	dataset = auxfunc.define_dataset(scenario_name)

	for type in dataset[scenario_name]:

		dict = dataset[scenario_name][type]
		type_path = constants.DATASET_DIRECTORY + scenario_name + "/" + type + "/"

		for tech in dict:
			dataframes = []
			if tech != constants.HYBRID_TECHNOLOGY:
				for a in dict[tech]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + tech + "/" + a
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))
			else:
				# Concatenates the blt and wifi estimations
				for b in dict[constants.BLT_TECHNOLOGY]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + constants.BLT_TECHNOLOGY + "/" + b
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))

				for w in dict[constants.WIFI_TECHNOLOGY]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + constants.WIFI_TECHNOLOGY + "/" + w
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))
			
			dict[tech]["Actual"]["Dataframes"] = dataframes

		for tech in dict:
			dataframes = dict[tech]["Actual"]["Dataframes"]
			length = auxfunc.calculate_smallest_dataset(dataframes)
			burst_quantity = auxfunc.calculate_burst_quantity(length)
			n_anchors = len(dict[tech]["Actual"]["Anchors' Name"])

			# print(type, tech, length, burst_quantity, n_anchors, len(dataframes))

			dict[tech]["Actual"]["Distance Target - Anchor"] = []
			dict[tech]["Actual"]["Anchor Coordinates"] = []
			for a in range(n_anchors):
				dict[tech]["Actual"]["Distance Target - Anchor"].append(dataframes[a]["Distance Target - Anchor [m]"][0])
				dict[tech]["Actual"]["Target Coordinates"] = [eval(i) for i in dataframes[a]["Target Coordinates [m]"][0].split(", ")]
				dict[tech]["Actual"]["Anchor Coordinates"].append([eval(i) for i in dataframes[a]["Relative Coordinates [m]"][0].split(", ")])

			for c in range(burst_quantity):
				dict[tech]["Estimated"][c] = {}
				dict[tech]["Estimated"][c]["Average RSS"] = []
				dict[tech]["Estimated"][c]["Distance Target - Anchor"] = []
				
				for a in range(n_anchors):
					average_RSS = auxfunc.calculate_average_RSS(dataframes[a]["Rx Power [dBm]"].to_list(), c, length)
					estimated_distance = auxfunc.calculate_target_anchor_estimation(average_RSS)
					dict[tech]["Estimated"][c]["Average RSS"].append(average_RSS)
					dict[tech]["Estimated"][c]["Distance Target - Anchor"].append(estimated_distance)

				AI = calculate_AI_matrix(dict[tech]["Actual"]["Anchor Coordinates"])
				bI = calculate_bI_vector(dict[tech]["Actual"]["Anchor Coordinates"], dict[tech]["Estimated"][c]["Distance Target - Anchor"])
				CI = calculate_CI_matrix(dict[tech]["Estimated"][c]["Distance Target - Anchor"])
				
				dict[tech]["Estimated"][c]["AI matrix"] = AI
				dict[tech]["Estimated"][c]["bI vector"] = bI
				dict[tech]["Estimated"][c]["CI matrix"] = CI
				dict[tech]["Estimated"][c]["Target Coordinates"] = calculate_Lambda_WLLS_matrix(AI, bI, CI)[[0,1]]
			
			dataset[scenario_name][type][tech] = dict[tech]

	return dataset

In [None]:
def apply_TS_WLLS_I_estimator(scenario_name):
	'''
	Applies the TS_WLLS_I estimator, as shown in [R2] and in [R2, ref. [16]].
	It is used for estimate the target's position from the anchors' position.

	Parameters:
	scenario_name (str): The name of the scenario to be examined.

	Returns:
	data (dict):
	It is a dictionary containing all the salient information retrieved
	from the reading of the dataset (Actual) and from the application of the WLS estimator (Estimated),
	for all types of devices (Arduino, RPI) and for all technology (WiFi, BLT, Hybrid).
	'''
	
	dataset = auxfunc.define_dataset(scenario_name)

	for type in dataset[scenario_name]:

		dict = dataset[scenario_name][type]
		type_path = constants.DATASET_DIRECTORY + scenario_name + "/" + type + "/"

		for tech in dict:
			dataframes = []
			if tech != constants.HYBRID_TECHNOLOGY:
				for a in dict[tech]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + tech + "/" + a
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))
			else:
				# Concatenates the blt and wifi estimations
				for b in dict[constants.BLT_TECHNOLOGY]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + constants.BLT_TECHNOLOGY + "/" + b
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))

				for w in dict[constants.WIFI_TECHNOLOGY]["Actual"]["Anchors' Name"]:
					anchor_path = type_path + constants.WIFI_TECHNOLOGY + "/" + w
					dataframes.append(pd.read_csv(
						anchor_path,
						sep=constants.CSV_FIELDS_SEPARATOR
					))
			
			dict[tech]["Actual"]["Dataframes"] = dataframes

		for tech in dict:
			dataframes = dict[tech]["Actual"]["Dataframes"]
			length = auxfunc.calculate_smallest_dataset(dataframes)
			burst_quantity = auxfunc.calculate_burst_quantity(length)
			n_anchors = len(dict[tech]["Actual"]["Anchors' Name"])

			# print(type, tech, length, burst_quantity, n_anchors, len(dataframes))

			dict[tech]["Actual"]["Distance Target - Anchor"] = []
			dict[tech]["Actual"]["Anchor Coordinates"] = []
			for a in range(n_anchors):
				dict[tech]["Actual"]["Distance Target - Anchor"].append(dataframes[a]["Distance Target - Anchor [m]"][0])
				dict[tech]["Actual"]["Target Coordinates"] = [eval(i) for i in dataframes[a]["Target Coordinates [m]"][0].split(", ")]
				dict[tech]["Actual"]["Anchor Coordinates"].append([eval(i) for i in dataframes[a]["Relative Coordinates [m]"][0].split(", ")])

			for c in range(burst_quantity):
				dict[tech]["Estimated"][c] = {}
				dict[tech]["Estimated"][c]["Average RSS"] = []
				dict[tech]["Estimated"][c]["Distance Target - Anchor"] = []
				
				for a in range(n_anchors):
					average_RSS = auxfunc.calculate_average_RSS(dataframes[a]["Rx Power [dBm]"].to_list(), c, length)
					estimated_distance = auxfunc.calculate_target_anchor_estimation(average_RSS)
					dict[tech]["Estimated"][c]["Average RSS"].append(average_RSS)
					dict[tech]["Estimated"][c]["Distance Target - Anchor"].append(estimated_distance)
					# print(scenario_name, type, tech, c, a)
					# print(average_RSS, estimated_distance, dict[tech]["Actual"]["Distance Target - Anchor"][a])

				AI = calculate_AI_matrix(dict[tech]["Actual"]["Anchor Coordinates"])
				bI = calculate_bI_vector(dict[tech]["Actual"]["Anchor Coordinates"], dict[tech]["Estimated"][c]["Distance Target - Anchor"])
				CI = calculate_CI_matrix(dict[tech]["Estimated"][c]["Distance Target - Anchor"])
				Lambda_WLLS = calculate_Lambda_WLLS_matrix(AI, bI, CI)

				# print(Lambda_WLLS[0], Lambda_WLLS[1], Lambda_WLLS[2])

				h = calculate_h_vector(Lambda_WLLS)
				K = calculate_K_matrix(Lambda_WLLS)
				Phi = calculate_Phi_matrix(K, AI, CI)
				G = calculate_G_matrix()
				z = calculate_z_vector(G, Phi, h)
				
				dict[tech]["Estimated"][c]["AI matrix"] = AI
				dict[tech]["Estimated"][c]["bI vector"] = bI
				dict[tech]["Estimated"][c]["CI matrix"] = CI
				dict[tech]["Estimated"][c]["Lambda_WLLS matrix"] = Lambda_WLLS
				dict[tech]["Estimated"][c]["G matrix"] = G
				dict[tech]["Estimated"][c]["K matrix"] = K
				dict[tech]["Estimated"][c]["h vector"] = h
				dict[tech]["Estimated"][c]["Phi matrix"] = Phi

				try:
					dict[tech]["Estimated"][c]["z vector"] = z
					dict[tech]["Estimated"][c]["Target Coordinates"] = calculate_TS_WLLS_I_output(Lambda_WLLS, z)
				except ValueError:
					print(scenario_name, type, tech, c)
					print("Error during the implementation of TS_WLLS_I.")
					print("There are negative values in the 'z' vector.")
					print()
					break
			
			dataset[scenario_name][type][tech] = dict[tech]

	return dataset

In [None]:
data = {}
data["OS_WLLS_I"] = {}
data["OS_WLLS_I"].update(apply_OS_WLLS_I_estimator("Scenario A"))
data["OS_WLLS_I"].update(apply_OS_WLLS_I_estimator("Scenario B"))
data["OS_WLLS_I"].update(apply_OS_WLLS_I_estimator("Scenario C"))

data["TS_WLLS_I"] = {}
data["TS_WLLS_I"].update(apply_TS_WLLS_I_estimator("Scenario A"))
data["TS_WLLS_I"].update(apply_TS_WLLS_I_estimator("Scenario B"))
data["TS_WLLS_I"].update(apply_TS_WLLS_I_estimator("Scenario C"))

# np.set_printoptions(suppress=True)
# pprint.pprint(data["TS_WLLS_I"])
# for configuration in data["Scenario A"]["RPI"]["RSS_BLT_Dataset"]["Estimated"].values():
#     pprint.pprint(configuration["Target Coordinates"])

In [None]:
for estimator in data.keys():
    estimator_dict = data[estimator]
    for scenario in estimator_dict:
        print("RMSE per", scenario, "-", estimator)
        for type in estimator_dict[scenario]:
            for tech in estimator_dict[scenario][type]:
                label = type + "_" + tech + ":"
                try:
                    print(label, auxfunc.calculate_rmse(estimator_dict[scenario][type][tech]), "m")
                except KeyError:
                    print(label, "'Target Coordinates' key not found")
                    break
        print()

In [None]:
for estimator in data.keys():
    auxfunc.plot_data(estimator, data[estimator])