In [None]:
import numpy as np

def dB_to_lin(pow_dB):
    """Convert power value(s) from decibels to linear scale: 10 ** (dB / 10).

    Accepts a scalar or any array-like; the result follows NumPy's
    array/scalar conventions for the given input.
    """
    exponent = np.asarray(pow_dB) / 10
    return np.power(10, exponent)

def lin_to_dB(pow_lin):
    """Convert linear power value(s) to decibels: 10 * log10(power).

    Accepts a scalar or any array-like. Zero input maps to -inf, as with
    np.log10 itself.
    """
    log_power = np.log10(np.asarray(pow_lin))
    return 10 * log_power

# Duty-cycle values, ten per band. Later cells plot them against the ten
# entries of data_points and label them as percentages.
dc_3470_3510 = np.array([
    45.25, 98.68, 96.1, 37.4, 45.42,
    85.82, 44.84, 74.81, 67.82, 100,
])


dc_3610_3650 = np.array([
    45.25, 92.52, 77.09, 23.34, 45.41,
    55.23, 44.84, 85.13, 38.63, 100,
])

dc_2160_2170 = np.array([
    99.89, 100, 100, 97.74, 45.42,
    100, 100, 100, 100, 100,
])


In [None]:
import os
import scipy.io as sio
import matplotlib.pyplot as plt
# Locate the SLC map .mat file in the working folder and build the height map.
map_folderdir = "./"
directory = os.listdir(map_folderdir)

# Same selection rule as before: when several names contain "SLCmap", the
# last one in listing order is used.
matches = [fname for fname in directory if "SLCmap" in fname]
flag = 1 if matches else 0

if not matches:
    # Fail loudly here. The previous code called warnings.warn without
    # importing `warnings`, and would then have hit an unbound `map_file`.
    errorMessage = 'Error: The file does not exist in the folder:\n ' + map_folderdir
    raise FileNotFoundError(errorMessage)

map_file = os.path.join(map_folderdir, matches[-1])

print('Now reading ' + map_file + "\n")
x = sio.loadmat(map_file)
map_struct = x['SLC']

# Define a new struct named SLC (the single record of the MATLAB struct array)
SLC = map_struct[0][0]
# Field name -> positional index inside the SLC record
column_map = {name: idx for idx, name in enumerate(SLC.dtype.names)}

# Terrain ("dem") plus building heights scaled by 0.3048
# NOTE(review): 0.3048 is the feet-to-metres factor, so "hybrid_bldg" is
# presumably stored in feet -- confirm against the data description.
map_ = SLC[column_map["dem"]] + 0.3048 * SLC[column_map["hybrid_bldg"]]
# Keep every 10th sample along both axes (low-resolution map)
map_ = map_[::10, ::10]

### Implement Pseudo-Inverse

In [None]:
# Data-collection coordinates, divided by 10 and floored to integers; this
# matches the factor-10 downsampling applied to map_.
data_points = np.floor(
    np.array([
        [966, 2992],
        [2569, 3767],
        [2873, 3447],
        [2621, 4286],
        [1312, 3830],
        [3828, 2667],
        [242, 2442],
        [1711, 3145],
        [2852, 1584],
        [1903, 2393],
    ], dtype=float) / 10.0
).astype("int")

In [None]:
def serialize_index(row, col, num_cols=map_.shape[1]):
    """Flatten a (row, col) grid position into one row-major index.

    num_cols defaults to the column count map_ had when this function was
    defined.
    """
    row_offset = row * num_cols
    return row_offset + col

# Each data point is stored as (col, row): element 1 is passed as the row and
# element 0 as the column when flattening.
dp_serial = np.array([serialize_index(pt[1], pt[0]) for pt in data_points])
dp_serial

In [None]:
from tqdm import tqdm
def deserialize_index(serialized_index, num_cols=map_.shape[1]):
    """Split a row-major flat index back into its (row, col) pair.

    num_cols defaults to the column count map_ had when this function was
    defined.
    """
    return divmod(serialized_index, num_cols)

def euclidean_distance(serialized_index1, serialized_index2, num_cols=map_.shape[1]):
    """Scaled Euclidean distance between two flat grid indices.

    Both indices are unpacked to (row, col); the cell-count distance is
    multiplied by 0.5*10 and floored at 1, so the result is never below 1.
    NOTE(review): 0.5*10 presumably means 0.5 units per original pixel times
    the factor-10 downsampling -- confirm.
    """
    r_a, c_a = deserialize_index(serialized_index1, num_cols)
    r_b, c_b = deserialize_index(serialized_index2, num_cols)

    d_row = r_b - r_a
    d_col = c_b - c_a
    scaled = (d_row**2 + d_col**2)**0.5*(0.5*10)
    return 1 if scaled < 1 else scaled

In [None]:
# Display the shape of the downsampled map array.
map_.shape

In [None]:
import pykrige.uk

In [None]:
# Display the first column of data_points (used as the x coordinate in the
# scatter plots and kriging call elsewhere in this notebook).
data_points[:, 0]

In [None]:
# Load the saved signal estimates and give them the same 2-D shape as map_.
ss = np.load("signal_estimates_3470_3510_low_res_cov2.npy").reshape(map_.shape)

In [None]:
# Smallest value in the loaded signal-estimate array.
np.min(ss)

In [None]:
# Grid axes, one node per map_ cell. Built here so that this cell does not
# depend on the kriging cell further down (which also defines gridx/gridy)
# having been executed first.
gridx = np.linspace(0, map_.shape[1], map_.shape[1], endpoint=False)
gridy = np.linspace(0, map_.shape[0], map_.shape[0], endpoint=False)

X, Y = np.meshgrid(gridx, gridy)
plt.figure(figsize=(13, 8))
# ss is converted to dB for display.
plt.contourf(X, Y, lin_to_dB(ss), 100, cmap='viridis')
plt.colorbar(label='dBX')

# Plot the data collection points for reference
# NOTE(review): rssi_3470_3510 is not defined in any visible cell of this
# notebook; it must be created (one value per data point) before this cell
# can run on a fresh kernel.
scatter = plt.scatter(data_points[:, 0], data_points[:, 1], c=rssi_3470_3510, s=50, cmap='plasma', label='Data Collection Points')
cbar = plt.colorbar(scatter, label='Signal Strength (dBX)', location='left')  # Colorbar for signal strengths on the left
plt.legend()

plt.xlabel('UTM_E [m]')
plt.ylabel('UTM_N [m]')
plt.title('2D Predictions of Signal Strength')
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def inverse_distance_weighting(x, y, z, xi, yi, power=2):
    """Interpolate scattered samples with inverse-distance weighting (IDW).

    :param x: x coordinates of the known samples (1-D array-like)
    :param y: y coordinates of the known samples (1-D array-like)
    :param z: values at the known samples (1-D array-like)
    :param xi: x coordinates of the query points (1-D array-like)
    :param yi: y coordinates of the query points (1-D array-like)
    :param power: exponent applied to the distance in the weight 1 / d**power
    :return: float array with the same shape as xi holding the weighted
        average of z at every query point
    """
    # Work in float throughout. The result buffer previously copied the dtype
    # of xi, so integer query coordinates truncated every interpolated value.
    x, y, z, xi, yi = (np.asarray(a, dtype=float) for a in (x, y, z, xi, yi))
    zi = np.zeros(xi.shape, dtype=float)

    for i in range(len(xi)):
        distances = np.sqrt((x - xi[i])**2 + (y - yi[i])**2)
        distances[distances == 0] = 0.0001  # Avoid division by zero
        weights = 1 / distances**power
        zi[i] = np.sum(weights * z) / np.sum(weights)

    return zi

# Requires data_points, dc_2160_2170 and map_ from earlier cells.
x_known, y_known = data_points[:, 0], data_points[:, 1]
z_known = dc_2160_2170

# One grid node per map_ cell: columns along x, rows along y.
n_rows = map_.shape[0]
n_cols = map_.shape[1]
grid_x, grid_y = np.meshgrid(
    np.linspace(0, n_cols, n_cols, endpoint=False),
    np.linspace(0, n_rows, n_rows, endpoint=False),
)

# The IDW routine takes flat coordinate vectors.
x_to_interpolate = grid_x.ravel()
y_to_interpolate = grid_y.ravel()

# Interpolate, then fold the flat result back into the grid layout.
z_interpolated = inverse_distance_weighting(
    x_known, y_known, z_known, x_to_interpolate, y_to_interpolate
).reshape(grid_x.shape)

# Filled contours of the interpolated field with the sample points on top.
plt.figure()
plt.contourf(grid_x, grid_y, z_interpolated, 100, cmap='viridis')
plt.colorbar(label='Percentage')
plt.scatter(
    x_known,
    y_known,
    c=z_known,
    s=50,
    cmap='plasma',
    label='Data Collection Points',
)
plt.xlabel('UTM_E [m]')
plt.ylabel('UTM_N [m]')
plt.title('2D Predictions of Duty Cycle')
plt.legend()
plt.show()

# Persist the interpolated grid for later use.
np.save("duty_cycle_2160_2170_idw2.npy", z_interpolated)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial import KDTree

# Same coordinates as the earlier data_points cell, rebuilt here: divided by
# 10 and floored to integers.
data_points = np.floor(
    np.array([
        [966, 2992],
        [2569, 3767],
        [2873, 3447],
        [2621, 4286],
        [1312, 3830],
        [3828, 2667],
        [242, 2442],
        [1711, 3145],
        [2852, 1584],
        [1903, 2393],
    ], dtype=float) / 10.0
).astype("int")

# NOTE(review): this rebinds dc_3470_3510 with values that differ from the
# array of the same name defined in the first cell (e.g. 99.88 here vs 98.68
# there). Cells executed after this one see these values -- confirm which
# set is the intended one.
dc_3470_3510 = np.array([45.25,
99.88,
99.49,
52.85,
54.27,
97.45,
65.29,
95.48,
76.99,
100
])

def k_nearest_neighbor_interpolation(data_points, values, query_point, k=5):
    """
    Perform k-nearest neighbor interpolation.

    The estimate is the unweighted mean of the values at the k data points
    closest to the query point.

    :param data_points: array-like of data points (x, y), shape (n, 2)
    :param values: array-like of values corresponding to data points
    :param query_point: the point (x, y) for which to interpolate a value, or
        an array of such points with shape (m, 2)
    :param k: number of nearest neighbors to consider; values larger than the
        number of data points are clamped to that number
    :return: interpolated value at query_point (a scalar for a single point,
        an array of length m for a batch of points)
    :raises ValueError: if there are no data points or k is smaller than 1
    """
    data_points = np.asarray(data_points)
    # Fancy indexing below requires an ndarray, not a plain list.
    values = np.asarray(values)

    n_points = len(data_points)
    if n_points == 0:
        raise ValueError("data_points must contain at least one point")
    if k < 1:
        raise ValueError("k must be at least 1")
    # Asking the tree for more neighbours than exist yields the out-of-range
    # index n for the missing ones, which would raise IndexError on lookup.
    k = min(k, n_points)

    tree = KDTree(data_points)
    _, indexes = tree.query(query_point, k=k)
    indexes = np.asarray(indexes)
    if k == 1:
        # query() drops the neighbour axis when k == 1; restore it so that the
        # mean below always reduces over neighbours.
        indexes = indexes[..., np.newaxis]

    # Average of the values of the k-nearest neighbors
    return np.mean(values[indexes], axis=-1)

# Evaluate the 5-nearest-neighbour estimate at every map_ cell; each query
# point is given as (column, row), i.e. (x, y).
est = np.zeros_like(map_)
for j, i in np.ndindex(map_.shape[0], map_.shape[1]):
    est[j, i] = k_nearest_neighbor_interpolation(data_points, dc_3470_3510, [i, j], k=5)

# Plot axes, one node per map_ cell.
grid_x = np.linspace(0, map_.shape[1], map_.shape[1], endpoint=False)
grid_y = np.linspace(0, map_.shape[0], map_.shape[0], endpoint=False)
X, Y = np.meshgrid(grid_x, grid_y)

plt.figure(figsize=(13, 8))
plt.contourf(X, Y, est, 100, cmap='viridis')
plt.colorbar(label='Percentage')

# Overlay the measurement locations, coloured by their duty-cycle values,
# with a second colorbar on the left.
scatter = plt.scatter(
    data_points[:, 0],
    data_points[:, 1],
    c=dc_3470_3510,
    s=50,
    cmap='plasma',
    label='Data Collection Points',
)
cbar = plt.colorbar(scatter, label='Percentage', location='left')
plt.legend()

plt.xlabel('UTM_E [m]')
plt.ylabel('UTM_N [m]')
plt.title('2D Predictions of Duty Cycle')
plt.show()

In [None]:
from scipy.interpolate import griddata

# Illustrative sample arrays kept from the original cell; they are NOT used
# by the interpolation below, which works on data_points / dc_3470_3510.
points = np.array([[0, 0], [1, 0], [0, 1], [1, 1], [0.5, 0.5], 
                   [0.25, 0.75], [0.75, 0.25], [0.75, 0.75], [0.25, 0.25], [0.5, 0.1]])
values = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])

# Grid axes, one node per map_ cell.
grid_x = np.linspace(0, map_.shape[1], map_.shape[1], endpoint=False)
grid_y = np.linspace(0, map_.shape[0], map_.shape[0], endpoint=False)
# griddata needs query coordinates of matching shape, so evaluate on the full
# 2-D mesh; the two 1-D axes have different lengths.
X, Y = np.meshgrid(grid_x, grid_y)

# scipy's griddata offers only 'nearest', 'linear' and 'cubic'; there is no
# 'natural' (natural-neighbour) method, so piecewise-linear interpolation is
# used. Nodes outside the convex hull of the data points come back as NaN.
grid_z = griddata(data_points, dc_3470_3510, (X, Y), method='linear')

plt.figure(figsize=(13, 8))
# Mask the NaN nodes so only the interpolated region is drawn. Plot grid_z,
# the result of this cell, not `z`, which belongs to the kriging cell.
plt.contourf(X, Y, np.ma.masked_invalid(grid_z), 100, cmap='viridis')
plt.colorbar(label='Percentage')

# Plot the data collection points for reference
scatter = plt.scatter(data_points[:, 0], data_points[:, 1], c=dc_3470_3510, s=50, cmap='plasma', label='Data Collection Points')
cbar = plt.colorbar(scatter, label='Percentage', location='left')  # Colorbar for the data points on the left
plt.legend()

plt.xlabel('UTM_E [m]')
plt.ylabel('UTM_N [m]')
plt.title('2D Predictions of Duty Cycle')
plt.show()

In [None]:
# Scratch check: log10 of zero evaluates to -inf in NumPy (with a
# RuntimeWarning), which is what lin_to_dB produces for zero-valued input.
np.log10(0)

In [None]:
# Smallest value in ss.
np.min(ss)

In [None]:
# Does converting ss to dB yield any infinite entries?
True in np.isinf(lin_to_dB(ss))

In [None]:
from pykrige.uk import UniversalKriging
from sklearn.preprocessing import MinMaxScaler
# Grid axes, one node per map_ cell (x along columns, y along rows).
gridx = np.linspace(0, map_.shape[1], map_.shape[1], endpoint=False)
gridy = np.linspace(0, map_.shape[0], map_.shape[0], endpoint=False)
scaler = MinMaxScaler(feature_range=(0, 1))

# Normalise the signal-estimate raster to [0, 1] with ONE global min/max.
# MinMaxScaler scales every column of a 2-D input independently, which would
# rescale each raster column on its own and distort the spatial field, so the
# raster is flattened to a single feature column first and reshaped back.
normalized_data = scaler.fit_transform(ss.reshape(-1, 1)).reshape(ss.shape)

# Universal kriging of the duty cycle (as a fraction: percent / 100) at the
# data points, with the normalised signal raster as external drift.
UK = UniversalKriging(
    data_points[:, 0],
    data_points[:, 1],
    dc_3470_3510/100,
    drift_terms=["external_Z"],
    external_drift = normalized_data,
    external_drift_x = gridx,
    external_drift_y = gridy
)


# Evaluate on the full grid; the two returned arrays are kept as z and s_.
z, s_ = UK.execute("grid", gridx, gridy)


In [None]:
X, Y = np.meshgrid(gridx, gridy)
plt.figure(figsize=(13, 8))
# The kriging model was fitted on dc_3470_3510/100, so z is a fraction.
# Scale it back to percent so the field matches its 'Percentage' colorbar
# label and the percent-valued data points drawn on top.
plt.contourf(X, Y, 100 * z, 100, cmap='viridis')
plt.colorbar(label='Percentage')

# Plot the data collection points for reference
scatter = plt.scatter(data_points[:, 0], data_points[:, 1], c=dc_3470_3510, s=50, cmap='plasma', label='Data Collection Points')
cbar = plt.colorbar(scatter, label='Percentage', location='left')  # Colorbar for the data points on the left
plt.legend()

plt.xlabel('UTM_E [m]')
plt.ylabel('UTM_N [m]')
plt.title('2D Predictions of Duty Cycle')
plt.show()

In [None]:
# Distinct values present in the kriging output z.
np.unique(z)