In [3]:
import torch
from statsmodels.stats.weightstats import DescrStatsW
import numpy as np
import tqdm
import copy

from opt_targeted_transfers.dataset_utils import standardize, Dataset


In [21]:
def get_quantile_regressor(dataset, tolerance, low_dim=False, n_epochs=300):
    """
    Get a quantile regressor for a given dataset.

    :param dataset: The dataset used for training the regressor.
    :type dataset: Dataset
    :param tolerance: The tolerance for the poverty rate
    :type tolerance: float
    :param n_epochs: The number of epochs for training the regressor. Defaults to 300.
    :type n_epochs: int
    :return: The quantile regressor.
    :rtype: Callable[[np.ndarray], np.ndarray]
    """
    X = dataset.X
    y = dataset.y
    r = dataset.r

    X, X_mean, X_std = standardize(X)
    y, y_mean, y_std = standardize(y)

    np.random.seed(123456)
    torch.manual_seed(123456)

    if X.shape[1] == 0:
        wq = DescrStatsW(data=y, weights=r)
        final_q_hat = wq.quantile(tolerance).item()
    else:
        d = X.shape[1]

        if low_dim:
            q_hat = torch.nn.Sequential(torch.nn.Linear(d, 1))
        else:
            q_hat = torch.nn.Sequential(
                torch.nn.Linear(d, 64), torch.nn.ReLU(), torch.nn.Linear(64, 1)
            )

        def quantile_loss(q_hat, idx):
            sub_n = len(idx)
            y_pred = q_hat(torch.Tensor(X[idx, :])).squeeze()
            return tolerance * torch.nn.functional.relu(
                torch.Tensor(y[idx]) - y_pred
            ) + (1 - tolerance) * torch.nn.functional.relu(
                y_pred - torch.Tensor(y[idx])
            )

        optimizer = torch.optim.Adam(q_hat.parameters(), lr=1e-2)
        train_prop = 0.7
        idx_train_set, idx_val_set = list(range(int(train_prop * len(X)))), list(
            range(int(train_prop * len(X)), len(X))
        )

        batch_size = int(len(idx_train_set) / 3)
        print("Fitting quantile regression...")
        pbar = tqdm.tqdm(list(range(n_epochs)))
        val_losses = []
        models = []

        for epoch in pbar:
            if epoch % 25 == 0:
                val_loss = torch.sum(
                    quantile_loss(q_hat, idx_val_set) * torch.Tensor(r[idx_val_set])
                )
                val_losses.append(val_loss.detach().item())
                models.append(copy.deepcopy(q_hat))

            idx = np.random.choice(idx_train_set, size=batch_size)
            optimizer.zero_grad()
            loss = torch.sum(quantile_loss(q_hat, idx) * torch.Tensor(r[idx]))
            loss.backward()
            optimizer.step()

            pbar.set_postfix({"loss": loss.item()})
        best_model_idx = np.argmin(val_losses)
        final_q_hat = models[best_model_idx]

    def quantile_regressor(X_test):
        if X_test.shape[1] == 0:
            quantile = (final_q_hat * y_std + y_mean) * np.ones((X_test.shape[0], 1))
        else:
            X_test = (X_test - X_mean) / X_std
            quantile = (
                (
                    final_q_hat(torch.Tensor(X_test)).reshape(X_test.shape[0], 1)
                    * y_std
                    + y_mean
                )
                .detach()
                .numpy()
            )
        return np.maximum(quantile, 0)

    return quantile_regressor

In [35]:
dataset_one_covariate = Dataset(
    X = np.array([[0], [0], [100], [100]]),
    y = np.array([0, 1, 100, 101])
)
dataset_no_covariates = Dataset(
    X = np.array([[], [], [], []]),
    y = np.array([0, 1, 10, 11])
)

In [36]:
regressor = get_quantile_regressor(
    dataset_one_covariate,
    tolerance=0.25,
    n_epochs=1000
)

Fitting quantile regression...


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:01<00:00, 690.33it/s, loss=0]


In [38]:
regressor(np.array([[0], [100]]))

array([[32.970062],
       [42.3787  ]], dtype=float32)