In [1]:
import numpy as np
import copy
from sklearn.datasets import make_classification, make_moons, make_blobs
from IPython.display import display, Math

In [2]:
from torch.utils.data import DataLoader
import torch
from torch import nn

In [3]:
class TrainData:
    """Minimal map-style dataset pairing each feature row with its label.

    Exposes `__getitem__` and `__len__`, which is the protocol
    `torch.utils.data.DataLoader` needs for index-based sampling.
    """

    def __init__(self, X, y):
        # Keep references to the caller's containers; nothing is copied.
        self.X, self.y = X, y

    def __len__(self):
        """Number of samples, taken from the label container."""
        return len(self.y)

    def __getitem__(self, i):
        """Return the `(features, label)` pair stored at index `i`."""
        features = self.X[i]
        label = self.y[i]
        return features, label

In [4]:
def generate_dataset(moons=False, dots=1000, labeled_frac=0.2, label_seed=None):
    """Generate a 2-D binary dataset together with a positive-unlabeled mask.

    Parameters
    ----------
    moons : bool
        If True, sample from `make_moons`; otherwise sample four Gaussian
        blobs and fold their cluster ids into two classes with `y % 2`.
    dots : int
        Total number of samples to generate.
    labeled_frac : float
        Fraction of *all* samples (not of the positives) that is marked as
        labeled; `int(labeled_frac * dots)` positives get `s = 1`.
    label_seed : int or None
        Seed for choosing which positives are labeled. `None` (default)
        draws from the global NumPy RNG, exactly as before; pass an int to
        make the mask reproducible.

    Returns
    -------
    X : ndarray
        Feature matrix produced by the sklearn generator.
    y : ndarray
        True binary class labels.
    s : ndarray of float, shape (dots,)
        1.0 for labeled positives, 0.0 for everything else.

    Raises
    ------
    ValueError
        If the requested number of labeled samples is negative or exceeds
        the number of positive samples.
    """
    if moons:
        X, y = make_moons(dots, noise=0.075, random_state=420)
    else:
        X, y = make_blobs(dots, 2, centers=[[0, 0], [-4, 2], [3.5, -2.0], [3.5, 3.5]], random_state=420)
        y = y % 2

    positives = np.flatnonzero(y == 1)
    n_labeled = int(labeled_frac * dots)
    if not 0 <= n_labeled <= len(positives):
        raise ValueError(
            f"cannot label {n_labeled} samples: only {len(positives)} positives available"
        )

    # The label draw is kept independent of the data seed (420) so that the
    # choice of labeled positives is not tied to the generator's shuffle.
    rng = np.random if label_seed is None else np.random.RandomState(label_seed)
    s = np.zeros(dots)
    s[rng.choice(positives, n_labeled, replace=False)] = 1
    return X, y, s

In [5]:
def trainValidateSplit(X, s, ratio=0.8):
    """Split `X` and `s` into a leading train part and a trailing validation part.

    The cut index is `round(ratio * len(X))`; no shuffling is done, so the
    inputs are expected to be in random order already.

    Returns `(X_train, s_train, X_val, s_val)`.
    """
    cut = round(ratio * len(X))
    X_train, X_val = X[:cut], X[cut:]
    s_train, s_val = s[:cut], s[cut:]
    return X_train, s_train, X_val, s_val

In [6]:
# Two-moons dataset with 2000 points: features X, true labels y (used only for
# evaluation) and the "labeled" indicator s that the classifier is trained on.
X, y, s = generate_dataset(moons=True, dots=2000)

Classifier предсказывает $p(s=1|x)$.

In [7]:
class Classifier(nn.Module):
    """MLP that models g(x) ~ p(s=1 | x) on positive-unlabeled data.

    After fitting, a scalar `c` is estimated on the held-out validation part
    and used to rescale g(x): `predict` thresholds g(x) / c at 0.5 and `w`
    turns g(x) into a weight for unlabeled samples.
    (Presumably `c` plays the role of the label frequency p(s=1 | y=1) from
    Elkan & Noto-style PU learning -- confirm against the accompanying notes.)

    Attributes set by `fit` / `train(x, s)`:
        x, s         -- float32 training features / labeled indicator
        x_val, s_val -- float32 validation features / labeled indicator
        c            -- 0-dim tensor, current estimate produced by `est_c`
    """

    def __init__(self, 
                 input_dim=2, 
                 output_dim=1, 
                 hd=256):
        super().__init__()
        # Two hidden ReLU layers of width `hd`, sigmoid output in (0, 1).
        self.model = nn.Sequential(nn.Linear(input_dim, hd), 
                                    nn.ReLU(), 
                                    nn.Linear(hd, hd), 
                                    nn.ReLU(),
                                    nn.Linear(hd, output_dim), 
                                    nn.Sigmoid())
        
    def forward(self, x):
        """Return g(x) with shape (len(x), output_dim)."""
        return self.model(x)

    def est_c(self, mode=0):
        """Estimate `c` from the validation split and store it in `self.c`.

        mode 0: mean of g over labeled validation points
        mode 1: sum of g over labeled validation points / sum of g over all
                validation points
        mode 2: max of g over all validation points

        Raises ValueError for any other `mode`.
        """
        if mode not in (0, 1, 2):
            raise ValueError(f"unsupported mode={mode}")
        # Pure evaluation: no autograd graph is needed for the estimate.
        with torch.no_grad():
            g_val = self(self.x_val)
            g_labeled = g_val[self.s_val == 1]
            if mode == 0:
                self.c = g_labeled.mean()
            elif mode == 1:
                self.c = g_labeled.sum() / g_val.sum()
            else:
                self.c = g_val.max()

    def train(self, x=True, s=None, epochs=69, mode=0):
        """Fit the model, or toggle training mode like `nn.Module.train`.

        `train(x, s, epochs, mode)` fits on `(x, s)`; see `fit`.

        `train(flag)` / `train(mode=flag)` with a bool and no `s` delegates to
        `nn.Module.train`. This keeps `eval()` working, because `nn.Module.eval`
        calls `self.train(False)` internally.

        Raises TypeError if `s` is missing and no boolean flag was given.
        """
        if s is None:
            flag = mode if isinstance(mode, bool) else x
            if not isinstance(flag, bool):
                raise TypeError(
                    "train(x, s, ...) requires the labels `s`; "
                    "train(bool) only toggles the training mode"
                )
            return super().train(flag)
        self.fit(x, s, epochs=epochs, mode=mode)

    def fit(self, x, s, epochs=69, mode=0):
        """Train g on `(x, s)` with BCE loss, then estimate `c` via `est_c(mode)`.

        The data is split by `trainValidateSplit`; only the train part is used
        for gradient steps, the validation part is kept for `est_c`.
        """
        X_train, s_train, X_val, s_val = trainValidateSplit(x, s)
        self.x = torch.as_tensor(X_train, dtype=torch.float32)
        self.s = torch.as_tensor(s_train, dtype=torch.float32)
        self.x_val = torch.as_tensor(X_val, dtype=torch.float32)
        self.s_val = torch.as_tensor(s_val, dtype=torch.float32)

        # Feed the float32 tensors (not the raw arrays) to the loader so the
        # BCE target has the same dtype as the prediction.
        train_loader = DataLoader(TrainData(self.x, self.s), batch_size=50, shuffle=True)
        opt = torch.optim.Adam(self.parameters(), lr=1e-4)
        loss_func = nn.BCELoss()

        for _ in range(epochs):
            for Xi, si in train_loader:
                s_pred = self(Xi)
                loss = loss_func(s_pred, si.unsqueeze(1))
                opt.zero_grad()
                loss.backward()
                opt.step()
        # Honour the requested estimator instead of always using mode 0.
        self.est_c(mode)

    def est_h(self, h):
        """Estimate E[h(x, y)] on the training split and return it as a float.

        Labeled points contribute h(x, 1). Each unlabeled point contributes
        w(x) * h(x, 1) + (1 - w(x)) * h(x, 0). The sum is divided by the
        number of training points.
        """
        labeled = self.s == 1
        x_labeled = self.x[labeled]
        x_unlabeled = self.x[~labeled]
        with torch.no_grad():
            w = self.w(x_unlabeled)
            total = h(x_labeled, torch.ones(len(x_labeled))).sum()
            total = total + (w * h(x_unlabeled, torch.ones(len(x_unlabeled)))).sum()
            total = total + ((1 - w) * h(x_unlabeled, torch.zeros(len(x_unlabeled)))).sum()
        return (total / len(self.x)).item()

    def w(self, x):
        """Return the per-sample weight (1 - c) / c * g(x) / (1 - g(x)).

        NOTE(review): the value is not clipped, so it can exceed 1 when
        g(x) > c and is unbounded as g(x) approaches 1.
        """
        # squeeze(-1) drops only the output dimension, so a single-row batch
        # still yields a 1-D result.
        g_x = self(x).squeeze(-1)
        return (1-self.c) * g_x / self.c / (1 - g_x)

    def predict(self, x):
        """Return a float32 NumPy array with 1.0 where g(x) / c > 0.5, else 0.0."""
        x = torch.as_tensor(x, dtype=torch.float32)
        with torch.no_grad():
            probs = self(x).squeeze(-1) / self.c
        return (probs > 0.5).float().numpy()

Протестируем получившийся классификатор. Найдем recall, precision, accuracy для всех вариантов приближения $c$, а также оценку $E(h)$, где $h(x, y)=y$ (истинное значение $0.5$).

In [8]:
def h(x, y):
    """Test function h(x, y) = y: ignores the features and returns the label."""
    return y

In [9]:
def test():
    """Train a `Classifier` and report metrics for each estimator of c.

    Relies on the notebook-level globals `X` and `s` for training and on a
    freshly generated 500-point two-moons set for evaluation. For every
    `est_c` mode (0, 1, 2) it displays the estimator's formula and prints
    recall, precision, accuracy against the true labels, plus the
    `est_h(h)` estimate.
    """
    clas = Classifier(hd=228)
    clas.train(X, s)
    X_test, y_test, _ = generate_dataset(True, 500)
    cs = [r'Приближение:\quad c\sim\frac 1n \sum\limits_{x\in P} g(x)', 
          r'Приближение:\quad c\sim \sum\limits_{x\in P} g(x) / \sum\limits_{x\in V} g(x)', 
          r'Приближение:\quad c\sim \max_{x\in V} g(x)']
    
    for i, caption in enumerate(cs):
        clas.est_c(i)
        y_pred = clas.predict(X_test)
        # Confusion-matrix counts: first letter = prediction correct (T) or
        # not (F), second letter = predicted class (P/N).
        TP = y_pred[y_test==1].sum()          # positives predicted positive
        FN = (1-y_pred)[y_test==1].sum()      # positives predicted negative
        FP = y_pred[y_test==0].sum()          # negatives predicted positive
        TN = (1-y_pred)[y_test==0].sum()      # negatives predicted negative
        recall = TP / (TP + FN)
        precision = TP / (TP + FP)
        accuracy = (TP + TN) / len(y_test)
        display(Math(caption))
        print("recall = {}\nprecision = {}\naccuracy = {}\nE(h)={}".format(recall, precision, accuracy, clas.est_h(h)))

In [10]:
test()

<IPython.core.display.Math object>

recall = 0.9919999837875366
precision = 0.9959839582443237
accuracy = 0.994
E(h)=0.5279846787452698


<IPython.core.display.Math object>

recall = 0.9919999837875366
precision = 0.9959839582443237
accuracy = 0.994
E(h)=0.5190211534500122


<IPython.core.display.Math object>

recall = 0.9599999785423279
precision = 1.0
accuracy = 0.98
E(h)=0.4003617465496063
