In [1]:
from lib import *
# Load the three dataset splits (in train / val / test order) from CSV files in the working directory.
train_set, val_set, test_set = map(pd.read_csv, ('train.csv', 'val.csv', 'test.csv'))

In [2]:
class RectangularGaussianFilterCovModel(CovarianceModel):
    """
    This model estimates var[long] and var[lat] independently with the same technique (we don't yet estimate cov[long, lat],
    so the off-diagonal entries of the returned matrix are always 0):

    Train:
    * Create a space with a dimension for each latent variable (long, lat, intensity), and a dimension for each var[X]
    * For each point in the training set, add a vector to the space with the scaled latent variables and where var[X] is
      estimated by the squared prediction error in X (ie we're assuming a z-score of 1 treating X as a univariate normal
      distribution)

    Evaluate:
    * We are given some latent variables
    * Compute the average of all the var[X]s in the space, weighted by a Gaussian filter / window that is centred on the
      (scaled) latent variables

    Then we return the covariance matrix with the average var[X]s on the diagonal and 0s everywhere else
    """

    def __init__(self, scale=(0.3, 0.5, 5)):
        """
        Args:
            scale: multipliers applied to (long, lat, intensity) before distances are measured, to keep the
                variance of each latent dimension roughly the same. Because the Gaussian window has unit
                width in the scaled space, these also set the effective window width per dimension.
        """
        self.scale = np.asarray(scale, dtype=float)
        # (n_train, 5) array built by train(): scaled long, lat, intensity, then the var[long] / var[lat] samples
        self.space = None

    def train(self, train_set: pd.DataFrame) -> None:
        """Build the lookup space from the `long`, `lat`, `intensity` columns and the residuals of `train_set`."""
        residuals = df_residuals(train_set).T
        self.space = np.column_stack((
            train_set.long.to_numpy(),
            train_set.lat.to_numpy(),
            train_set.intensity.to_numpy(),
            residuals[0]**2, # var[long]
            residuals[1]**2, # var[lat]
        ))
        # scale the latent variables to keep the variance of each dimension roughly the same
        self.space[:,:3] *= self.scale
        print("Finished training")

    def estimate(self, long: float, lat: float, intensity: float) -> np.ndarray:
        """
        Return the 2x2 diagonal covariance matrix [[var_long, 0], [0, var_lat]] for the given latent variables.

        Raises:
            RuntimeError: if train() has not been called, or was called with an empty training set.
        """
        if self.space is None or len(self.space) == 0:
            raise RuntimeError("estimate() requires train() to have been called with a non-empty training set")
        # squared distance (in the scaled latent space) from the query to every training point
        query = np.array([long, lat, intensity]) * self.scale
        sq_dist = np.sum((self.space[:,:3] - query)**2, axis=1)
        # the weight is the Gaussian filter / window centred on the given long/lat/intensity.
        # Subtracting the smallest squared distance leaves the normalised weights unchanged but guarantees the
        # nearest point gets weight exp(0) = 1, so a query far from all training data can no longer underflow
        # every weight to 0 and produce a 0/0 = NaN covariance.
        weights = np.exp(-0.5 * (sq_dist - sq_dist.min()))
        weights /= np.sum(weights)
        # compute the weighted average of the var[X]s
        weighted_var_long = np.sum(weights * self.space[:,3])
        weighted_var_lat = np.sum(weights * self.space[:,4])
        return np.array([[weighted_var_long, 0], [0, weighted_var_lat]])

# Score a fresh RectangularGaussianFilterCovModel via assess_geo_mean_log_likelihood, which is not defined
# on the class above and so comes from the CovarianceModel base (defined outside this notebook).
# Presumably it trains on the first argument and scores on the second — TODO confirm against lib.
# NOTE(review): val_set is loaded in the first cell but never used in the visible cells; confirm whether
# comparing models on test_set (rather than val_set) is intended.
print(f"\nRectangularGaussianFilterCovModel {RectangularGaussianFilterCovModel().assess_geo_mean_log_likelihood(train_set, test_set)}")

Finished training


100%|██████████| 21544/21544 [02:55<00:00, 122.76it/s]


RectangularGaussianFilterCovModel -4.061573553309507





In [3]:
class GaussianFilterCovModel(CovarianceModel):
    """
    This model estimates var[long] and var[lat] and cov[long, lat] independently with the same technique

    Train:
    * Create a space with a dimension for each latent variable (long, lat, intensity), and a dimension for each quantity
      we're estimating
    * For each point in the training set, add a vector to the space with the scaled latent variables and where var[X] is
      estimated by the squared prediction error in X (ie we're assuming a z-score of 1 treating X as a univariate normal
      distribution with 0 mean) and cov[X, Y] is estimated by the product of errors in X and Y
      (because var[X]=E[X^2] and cov[X,Y]=E[XY] with a 0 mean)

    Evaluate:
    * We are given some latent variables
    * Compute the average of all the var[X]s and cov[X, Y]s in the space, weighted by a Gaussian filter / window that is
      centred on the (scaled) latent variables

    Then we return the full symmetric covariance matrix with the average var[X]s on the diagonal and the average
    cov[long, lat] in both off-diagonal positions
    """

    def __init__(self, scale=(0.3, 0.5, 5)):
        """
        Args:
            scale: multipliers applied to (long, lat, intensity) before distances are measured, to keep the
                variance of each latent dimension roughly the same. Because the Gaussian window has unit
                width in the scaled space, these also set the effective window width per dimension.
        """
        self.scale = np.asarray(scale, dtype=float)
        # (n_train, 6) array built by train(): scaled long, lat, intensity, then var[long], var[lat], cov[long, lat] samples
        self.space = None

    def train(self, train_set: pd.DataFrame) -> None:
        """Build the lookup space from the `long`, `lat`, `intensity` columns and the residuals of `train_set`."""
        residuals = df_residuals(train_set).T
        self.space = np.column_stack((
            train_set.long.to_numpy(),
            train_set.lat.to_numpy(),
            train_set.intensity.to_numpy(),
            residuals[0]**2, # var[long]
            residuals[1]**2, # var[lat]
            residuals[0]*residuals[1], # cov[long, lat]
        ))
        # scale the latent variables to keep the variance of each dimension roughly the same
        self.space[:,:3] *= self.scale
        print("Finished training")

    def estimate(self, long: float, lat: float, intensity: float) -> np.ndarray:
        """
        Return the 2x2 covariance matrix [[var_long, cov], [cov, var_lat]] for the given latent variables.

        Raises:
            RuntimeError: if train() has not been called, or was called with an empty training set.
        """
        if self.space is None or len(self.space) == 0:
            raise RuntimeError("estimate() requires train() to have been called with a non-empty training set")
        # squared distance (in the scaled latent space) from the query to every training point
        query = np.array([long, lat, intensity]) * self.scale
        sq_dist = np.sum((self.space[:,:3] - query)**2, axis=1)
        # the weight is the Gaussian filter / window centred on the given long/lat/intensity.
        # Subtracting the smallest squared distance leaves the normalised weights unchanged but guarantees the
        # nearest point gets weight exp(0) = 1, so a query far from all training data can no longer underflow
        # every weight to 0 and produce a 0/0 = NaN covariance.
        weights = np.exp(-0.5 * (sq_dist - sq_dist.min()))
        weights /= np.sum(weights)
        # compute the weighted average of the var[X]s and of cov[long, lat]
        weighted_var_long = np.sum(weights * self.space[:,3])
        weighted_var_lat = np.sum(weights * self.space[:,4])
        weighted_cov_long_lat = np.sum(weights * self.space[:,5])
        return np.array([[weighted_var_long, weighted_cov_long_lat], [weighted_cov_long_lat, weighted_var_lat]])

# Score a fresh GaussianFilterCovModel via assess_geo_mean_log_likelihood, which is not defined on the
# class above and so comes from the CovarianceModel base (defined outside this notebook).
# Presumably it trains on the first argument and scores on the second — TODO confirm against lib.
print(f"\nGaussianFilterCovModel {GaussianFilterCovModel().assess_geo_mean_log_likelihood(train_set, test_set)}")

Finished training


100%|██████████| 21544/21544 [02:20<00:00, 153.30it/s]


GaussianFilterCovModel -4.125538328293637





In [4]:
from sklearn.neighbors import KNeighborsRegressor

class NearestNeighborsCovModel(CovarianceModel):
    """
    This model estimates var[long] and var[lat] and cov[long, lat] with a k-nearest-neighbours average

    Train:
    * Create a space with a dimension for each latent variable (long, lat, intensity), and a dimension for each quantity
      we're estimating
    * For each point in the training set, add a vector to the space with the scaled latent variables and where var[X] is
      estimated by the squared prediction error in X (ie we're assuming a z-score of 1 treating X as a univariate normal
      distribution with 0 mean) and cov[X, Y] is estimated by the product of errors in X and Y
      (because var[X]=E[X^2] and cov[X,Y]=E[XY] with a 0 mean)
    * Fit a KNeighborsRegressor that maps the scaled latent variables to (var[long], var[lat], cov[long, lat])

    Evaluate:
    * We are given some latent variables
    * Ask the regressor for its prediction at the scaled latent variables, ie the average of the var[X]s and cov[X, Y]s
      over the nearest training points (KNeighborsRegressor is used with its default weighting)

    Then we return the full symmetric covariance matrix with the average var[X]s on the diagonal and the average
    cov[long, lat] in both off-diagonal positions
    """

    def __init__(self, n_neighbors: int = 2000, scale=(0.3, 0.5, 5)):
        """
        Args:
            n_neighbors: how many nearest training points to average over. Clamped to the size of the
                training set in train(), so small training sets don't make predict() fail.
            scale: multipliers applied to (long, lat, intensity) before distances are measured, to keep the
                variance of each latent dimension roughly the same.
        """
        self.n_neighbors = n_neighbors
        self.scale = np.asarray(scale, dtype=float)
        # (n_train, 6) array built by train(): scaled long, lat, intensity, then var[long], var[lat], cov[long, lat] samples
        self.space = None
        # fitted KNeighborsRegressor, created by train()
        self.neigh = None

    def train(self, train_set: pd.DataFrame) -> None:
        """Build the lookup space from `train_set` and fit the nearest-neighbours regressor on it."""
        residuals = df_residuals(train_set).T
        self.space = np.column_stack((
            train_set.long.to_numpy(),
            train_set.lat.to_numpy(),
            train_set.intensity.to_numpy(),
            residuals[0]**2, # var[long]
            residuals[1]**2, # var[lat]
            residuals[0]*residuals[1], # cov[long, lat]
        ))
        # scale the latent variables to keep the variance of each dimension roughly the same
        self.space[:,:3] *= self.scale
        # asking for more neighbours than there are training points makes predict() raise, so clamp
        n_neighbors = min(self.n_neighbors, len(self.space))
        self.neigh = KNeighborsRegressor(n_neighbors=n_neighbors)
        self.neigh.fit(self.space[:,:3], self.space[:,3:])
        print("Finished training")

    def estimate(self, long: float, lat: float, intensity: float) -> np.ndarray:
        """
        Return the 2x2 covariance matrix [[var_long, cov], [cov, var_lat]] for the given latent variables.

        Raises:
            RuntimeError: if train() has not been called.
        """
        if self.neigh is None:
            raise RuntimeError("estimate() requires train() to have been called first")
        x = np.array([long, lat, intensity]) * self.scale
        # predict() expects a 2D (n_queries, n_features) array; we pass one query and take its single row of
        # (var[long], var[lat], cov[long, lat])
        params = self.neigh.predict(x.reshape(1, -1))[0]
        return np.array([[params[0], params[2]], [params[2], params[1]]])

# Score a fresh NearestNeighborsCovModel via assess_geo_mean_log_likelihood, which is not defined on the
# class above and so comes from the CovarianceModel base (defined outside this notebook).
# Presumably it trains on the first argument and scores on the second — TODO confirm against lib.
print(f"\nNearestNeighborsCovModel {NearestNeighborsCovModel().assess_geo_mean_log_likelihood(train_set, test_set)}")



Finished training


100%|██████████| 21544/21544 [00:17<00:00, 1224.61it/s]


NearestNeighborsCovModel -3.9152996999375564



