In [12]:
import os
import typing
from sklearn.gaussian_process.kernels import *
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
import matplotlib.pyplot as plt
from matplotlib import cm
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C, WhiteKernel, Matern
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.cluster import KMeans
from sklearn.utils import check_random_state


In [13]:
# Set `EXTENDED_EVALUATION` to `True` in order to visualize your predictions.
EXTENDED_EVALUATION = False
EVALUATION_GRID_POINTS = 300  # Number of grid points used in extended evaluation

# Cost function constants
COST_W_UNDERPREDICT = 50.0
COST_W_NORMAL = 1.0

In [14]:
class Model(object):
    """
    Model for this task.
    You need to implement the train_model and generate_predictions methods
    without changing their signatures, but are allowed to create additional methods.
    """

    def __init__(self, method="full", undersample_ratio=0.1, n_inducing=200, n_clusters=10):
        """
        Initialize your model here.
        We already provide a random number generator for reproducibility.
        """
        self.rng = np.random.default_rng(seed=0)
        self.method = method
        self.undersample_ratio = undersample_ratio
        self.n_inducing = n_inducing
        self.n_clusters = n_clusters
        self.local_gps = []
        self.gp = None

    def generate_predictions(self, test_coordinates: np.ndarray, test_area_flags: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Predict the pollution concentration for a given set of city_areas.
        :param test_coordinates: city_areas as a 2d NumPy float array of shape (NUM_SAMPLES, 2)
        :param test_area_flags: city_area info for every sample in a form of a bool array (NUM_SAMPLES,)
        :return:
            Tuple of three 1d NumPy float arrays, each of shape (NUM_SAMPLES,),
            containing your predictions, the GP posterior mean, and the GP posterior stddev (in that order)
        """

        if self.method == "nystrom":
            # Generate predictions using Nyström approximation
            Kmn = RBF()(self.inducing_points, test_coordinates)
            gp_mean = np.dot(self.alpha, Kmn)
            gp_std = np.zeros_like(gp_mean)  # For now, no stddev for approximation
            predictions = gp_mean

        elif self.method == "local_gps":
            # Generate predictions using the nearest local GP for each test point
            kmeans = KMeans(n_clusters=self.n_clusters)
            test_labels = kmeans.predict(test_coordinates)

            gp_mean = np.zeros(test_coordinates.shape[0])
            gp_std = np.zeros(test_coordinates.shape[0])

            for i in range(self.n_clusters):
                cluster_indices = np.where(test_labels == i)[0]
                if len(cluster_indices) > 0:
                    gp_mean[cluster_indices], gp_std[cluster_indices] = self.local_gps[i].predict(test_coordinates[cluster_indices], return_std=True)

            predictions = gp_mean

        elif self.gp is not None:
            # Full GP predictions
            gp_mean, gp_std = self.gp.predict(test_coordinates, return_std=True)
            predictions = gp_mean

        else:
            raise RuntimeError("No trained model available for generating predictions.")
        
        # Asymmetric cost adjustment for residential areas
        residential_area_mask = test_area_flags == 1  # Boolean mask for residential areas (area_id == 1)

        # Apply both bias and quantile adjustment in residential areas
        bias_factor = 0.3  # Bias factor for the stddev (can be tuned)
        z_90 = 1.28  # z-score for 90th percentile

        # Adjust predictions in residential areas
        predictions[residential_area_mask] = (
            gp_mean[residential_area_mask] 
            + bias_factor * gp_std[residential_area_mask]  # Add bias factor
            + z_90 * gp_std[residential_area_mask]  # Add quantile adjustment
        )

        return predictions, gp_mean, gp_std
    
    def undersample_data(coordinates: np.ndarray, targets: np.ndarray, area_flags: np.ndarray, ratio: float = 0.1):
        """
        Undersample the dataset by randomly selecting a subset of the data.
        :param coordinates: 2D coordinates of the data points.
        :param targets: PM2.5 target values.
        :param area_flags: Binary flags for residential areas (1) or non-residential areas (0).
        :param ratio: Fraction of data to keep (e.g., 0.1 for 10% of the data).
        :return: Undersampled coordinates, targets, and area flags.
        """
        num_samples = int(len(coordinates) * ratio)
        indices = np.random.choice(len(coordinates), num_samples, replace=False)

        undersampled_coordinates = coordinates[indices]
        undersampled_targets = targets[indices]
        undersampled_area_flags = area_flags[indices]

        return undersampled_coordinates, undersampled_targets, undersampled_area_flags

    def partition_data_with_kmeans(coordinates: np.ndarray, n_clusters: int = 10):
        """
        Partition the data into clusters using KMeans for training multiple local GPs.
        :param coordinates: 2D coordinates of the data points.
        :param n_clusters: Number of clusters to create.
        :return: Cluster labels and the trained k-means model.
        """
        kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(coordinates)
        labels = kmeans.labels_
        
        return labels, kmeans
    
    def nystrom_approximation(coordinates: np.ndarray, targets: np.ndarray, n_inducing: int = 200):
        """
        Perform the Nyström method to approximate the kernel matrix.
        :param coordinates: 2D coordinates of the data points.
        :param targets: PM2.5 target values.
        :param n_inducing: Number of inducing points to use for the approximation.
        :return: Coordinates of inducing points, the approximated kernel matrix, and the targets.
        """
        rng = check_random_state(42)
        inducing_indices = rng.choice(len(coordinates), n_inducing, replace=False)
        inducing_points = coordinates[inducing_indices]

        # Approximate the kernel matrix using the inducing points
        Kmm = RBF()(inducing_points)  # Kernel matrix for inducing points
        Kmn = RBF()(inducing_points, coordinates)  # Cross kernel matrix
        Kmm_inv = np.linalg.inv(Kmm)

        # Predictive mean
        alpha = np.dot(Kmm_inv, np.dot(Kmn, targets))
        
        return inducing_points, alpha

    def train_model(self, train_targets: np.ndarray, train_coordinates: np.ndarray, train_area_flags: np.ndarray):
        """
        Fit your model on the given training data.
        :param train_coordinates: Training features as a 2d NumPy float array of shape (NUM_SAMPLES, 2)
        :param train_targets: Training pollution concentrations as a 1d NumPy float array of shape (NUM_SAMPLES,)
        :param train_area_flags: Binary variable denoting whether the 2D training point is in the residential area (1) or not (0)
        """
        if self.method == "undersample":
            print("Applying undersampling...")
            train_coordinates, train_targets, train_area_flags = self.undersample_data(train_coordinates, train_targets, train_area_flags, ratio=self.undersample_ratio)

        elif self.method == "nystrom":
            print("Applying Nyström kernel approximation...")
            inducing_points, alpha = self.nystrom_approximation(train_coordinates, train_targets, n_inducing=self.n_inducing)
            self.inducing_points = inducing_points  # Store inducing points for later predictions
            self.alpha = alpha

        elif self.method == "local_gps":
            print("Applying multiple local GPs...")
            labels, kmeans = self.partition_data_with_kmeans(train_coordinates, n_clusters=self.n_clusters)

            # Train a separate GP for each cluster
            for i in range(self.n_clusters):
                cluster_indices = np.where(labels == i)[0]
                gp = GaussianProcessRegressor(kernel=RBF(), alpha=0.0, n_restarts_optimizer=5)
                gp.fit(train_coordinates[cluster_indices], train_targets[cluster_indices])
                self.local_gps.append(gp)

        else:
            # Full dataset GP (no scaling techniques)
            kernel = RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e2)) + WhiteKernel(noise_level=1)
            self.gp = GaussianProcessRegressor(kernel=kernel, alpha=0.0, n_restarts_optimizer=5)
            self.gp.fit(train_coordinates, train_targets)
            

In [16]:
# You don't have to change this function
def calculate_cost(ground_truth: np.ndarray, predictions: np.ndarray, area_flags: np.ndarray) -> float:
    """
    Calculates the cost of a set of predictions.

    :param ground_truth: Ground truth pollution levels as a 1d NumPy float array
    :param predictions: Predicted pollution levels as a 1d NumPy float array
    :param area_flags: city_area info for every sample in a form of a bool array (NUM_SAMPLES,)
    :return: Total cost of all predictions as a single float
    """
    assert ground_truth.ndim == 1 and predictions.ndim == 1 and ground_truth.shape == predictions.shape

    # Unweighted cost
    cost = (ground_truth - predictions) ** 2
    weights = np.ones_like(cost) * COST_W_NORMAL

    # Case i): underprediction
    mask = (predictions < ground_truth) & [bool(area_flag) for area_flag in area_flags]
    weights[mask] = COST_W_UNDERPREDICT

    # Weigh the cost and return the average
    return np.mean(cost * weights)


# You don't have to change this function
def check_within_circle(coordinate, circle_parameters):
    """
    Checks if a coordinate is inside a circle.
    :param coordinate: 2D coordinate
    :param circle_parameters: 3D coordinate of the circle center and its radius
    :return: True if the coordinate is inside the circle, False otherwise
    """
    return (coordinate[0] - circle_parameters[0])**2 + (coordinate[1] - circle_parameters[1])**2 < circle_parameters[2]**2

# You don't have to change this function 
def identify_city_area_flags(grid_coordinates):
    """
    Determines the city_area index for each coordinate in the visualization grid.
    :param grid_coordinates: 2D coordinates of the visualization grid
    :return: 1D array of city_area indexes
    """
    # Circles coordinates
    circles = np.array([[0.5488135, 0.71518937, 0.17167342],
                    [0.79915856, 0.46147936, 0.1567626 ],
                    [0.26455561, 0.77423369, 0.10298338],
                    [0.6976312,  0.06022547, 0.04015634],
                    [0.31542835, 0.36371077, 0.17985623],
                    [0.15896958, 0.11037514, 0.07244247],
                    [0.82099323, 0.09710128, 0.08136552],
                    [0.41426299, 0.0641475,  0.04442035],
                    [0.09394051, 0.5759465,  0.08729856],
                    [0.84640867, 0.69947928, 0.04568374],
                    [0.23789282, 0.934214,   0.04039037],
                    [0.82076712, 0.90884372, 0.07434012],
                    [0.09961493, 0.94530153, 0.04755969],
                    [0.88172021, 0.2724369,  0.04483477],
                    [0.9425836,  0.6339977,  0.04979664]])
    
    area_flags = np.zeros((grid_coordinates.shape[0],))

    for i,coordinate in enumerate(grid_coordinates):
        area_flags[i] = any([check_within_circle(coordinate, circ) for circ in circles])

    return area_flags

# You don't have to change this function
def execute_extended_evaluation(model: Model, output_dir: str = '/results'):
    """
    Visualizes the predictions of a fitted model.
    :param model: Fitted model to be visualized
    :param output_dir: Directory in which the visualizations will be stored
    """
    print('Performing extended evaluation')

    # Visualize on a uniform grid over the entire coordinate system
    grid_lat, grid_lon = np.meshgrid(
        np.linspace(0, EVALUATION_GRID_POINTS - 1, num=EVALUATION_GRID_POINTS) / EVALUATION_GRID_POINTS,
        np.linspace(0, EVALUATION_GRID_POINTS - 1, num=EVALUATION_GRID_POINTS) / EVALUATION_GRID_POINTS,
    )
    visualization_grid = np.stack((grid_lon.flatten(), grid_lat.flatten()), axis=1)
    grid_area_flags = identify_city_area_flags(visualization_grid)
    
    # Obtain predictions, means, and stddevs over the entire map
    predictions, gp_mean, gp_stddev = model.generate_predictions(visualization_grid, grid_area_flags)
    predictions = np.reshape(predictions, (EVALUATION_GRID_POINTS, EVALUATION_GRID_POINTS))
    gp_mean = np.reshape(gp_mean, (EVALUATION_GRID_POINTS, EVALUATION_GRID_POINTS))

    vmin, vmax = 0.0, 65.0

    # Plot the actual predictions
    fig, ax = plt.subplots()
    ax.set_title('Extended visualization of task 1')
    im = ax.imshow(predictions, vmin=vmin, vmax=vmax)
    cbar = fig.colorbar(im, ax=ax)

    # Save figure to pdf
    figure_path = os.path.join(output_dir, 'extended_evaluation.pdf')
    fig.savefig(figure_path)
    print(f'Saved extended evaluation to {figure_path}')

    plt.show()


def extract_area_information(train_x: np.ndarray, test_x: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Extracts the city_area information from the training and test features.
    :param train_x: Training features
    :param test_x: Test features
    :return: Tuple of (training features' 2D coordinates, training features' city_area information,
        test features' 2D coordinates, test features' city_area information)
    """
    
    train_coordinates = train_x[:, :2]
    test_coordinates = test_x[:, :2]
    train_area_flags = train_x[:, 2].astype(bool)
    test_area_flags = test_x[:, 2].astype(bool)  


    assert train_coordinates.shape[0] == train_area_flags.shape[0] and test_coordinates.shape[0] == test_area_flags.shape[0]
    assert train_coordinates.shape[1] == 2 and test_coordinates.shape[1] == 2
    assert train_area_flags.ndim == 1 and test_area_flags.ndim == 1

    return train_coordinates, train_area_flags, test_coordinates, test_area_flags

In [17]:
# you don't have to change this function
def main():
    # Load the training dateset and test features
    train_x = np.loadtxt('train_x.csv', delimiter=',', skiprows=1)
    train_y = np.loadtxt('train_y.csv', delimiter=',', skiprows=1)
    test_x = np.loadtxt('test_x.csv', delimiter=',', skiprows=1)

    # Extract the city_area information
    train_coordinates, train_area_flags, test_coordinates, test_area_flags = extract_area_information(train_x, test_x)
    
    # Fit the model
    print('Training model')
    model = Model()
    model.train_model(train_y, train_coordinates, train_area_flags)

    # Predict on the test features
    print('Predicting on test features')
    predictions = model.generate_predictions(test_coordinates, test_area_flags)
    print(predictions)

    if EXTENDED_EVALUATION:
        execute_extended_evaluation(model, output_dir='.')


if __name__ == "__main__":
    main()


Training model


KeyboardInterrupt: 