In [1]:
from fairtorch import ConstraintLoss, DemographicParityLoss, EqualiedOddsLoss

import random
import numpy as np 
import os

  
import torch
from torch import nn
from torch.nn import functional as F


from icecream import ic
import math

In [2]:
def seed_everything(seed):
    random.seed(seed)
    os.environ["PYTHONSEED"]=str(seed)
    np.random.seed(seed)

seed_everything(42)

In [3]:
n_samples = 512
n_feature = 4 

def generate_data(n_samples=n_samples, n_feature=n_feature):
    
    y = np.random.randint(0,2, size=n_samples)
    loc0 = np.random.uniform(-1,1,n_feature)
    loc1 = np.random.uniform(-1,1,n_feature)
    
    X = np.zeros((n_samples, n_feature))
    for i,u in enumerate(y):
        if y[i]==0:
            X[i] = np.random.normal(loc=loc0,scale=1.0, size=n_feature)
        else:
            X[i] = np.random.normal(loc=loc1, scale=1.0, size=n_feature)
    
    sensitive_feature = (X[:,0]>X[:,0].mean()).astype(int)
 
    return X, y, sensitive_feature

In [4]:
X,y,sensitive = generate_data(n_samples, n_feature)

In [5]:
dp_criterion = DemographicParityLoss(sensitive_classes=[0,1],alpha=1,p_norm=2)

  return torch._C._cuda_getDeviceCount() > 0


In [6]:
dp_criterion.sensitive_classes

[0, 1]

In [7]:
dp_criterion.dim_condition

3

In [8]:
dp_criterion.M

tensor([[ 1.,  0., -1.],
        [-1.,  0.,  1.],
        [ 1.,  0., -1.],
        [-1.,  0.,  1.]])

In [9]:
M = np.zeros((4,3))
for i in range(4):
    j = i%2
    if j==0:
        M[i,j] = 1.0
        M[i,-1] = -1.0
    else:
        M[i,j-1] = -1.0
        M[i,-1] = 1.0
    ic(M)

ic| M: array([[ 1.,  0., -1.],
              [ 0.,  0.,  0.],
              [ 0.,  0.,  0.],
              [ 0.,  0.,  0.]])
ic| M: array([[ 1.,  0., -1.],
              [-1.,  0.,  1.],
              [ 0.,  0.,  0.],
              [ 0.,  0.,  0.]])
ic| M: array([[ 1.,  0., -1.],
              [-1.,  0.,  1.],
              [ 1.,  0., -1.],
              [ 0.,  0.,  0.]])
ic| M: array([[ 1.,  0., -1.],
              [-1.,  0.,  1.],
              [ 1.,  0., -1.],
              [-1.,  0.,  1.]])


In [10]:
expected_values_list = []
for v in [0,1]:
    idx_true = sensitive==v
    expected_values_list.append(y[idx_true].mean())
expected_values_list.append(y.mean())
ic(expected_values_list)

ic| expected_values_list: [0.6563706563706564, 0.35968379446640314, 0.509765625]


[0.6563706563706564, 0.35968379446640314, 0.509765625]

Mu_f
[s=0のみの平均出力， s＝1のみの平均出力，　全ての平均出力]

In [11]:
n_samples = 512
n_feature = 4

def genelate_data(n_samples = n_samples, n_feature=n_feature):

    y = np.random.randint(0, 2, size=n_samples)
    loc0 = np.random.uniform(-1, 1, n_feature)
    loc1 = np.random.uniform(-1, 1, n_feature)

    X = np.zeros((n_samples, n_feature))
    for i, u in enumerate(y):
        if y[i] ==0:
            X[i] = np.random.normal(loc = loc0, scale=1.0, size=n_feature)  
        else:
            X[i] = np.random.normal(loc = loc1, scale=1.0, size=n_feature)  

    sensi_feat = (X[:, 0] > X[:, 0].mean()).astype(int)
    X[:, 0] = sensi_feat.astype(np.float32)
    X = torch.from_numpy(X).float()
    y = torch.from_numpy(y).float()
    sensi_feat = torch.from_numpy(sensi_feat)
    return X, y, sensi_feat

X_, y_, sensitive_ = genelate_data(n_samples, n_feature)

In [12]:
ic(X_.shape, y_.shape, sensitive_.shape)
ic(X.shape, y.shape, sensitive.shape)

ic| X_.shape: torch.Size([512, 4])
    y_.shape: torch.Size([512])
    sensitive_.shape: torch.Size([512])
ic| X.shape: (512, 4), y.shape: (512,), sensitive.shape: (512,)


((512, 4), (512,), (512,))

In [13]:
dp_criterion.mu_f(X_,y_,sensitive_)

tensor([0.4355, 0.5038, 0.4707])

In [14]:
def mu_f(X, out, sensitive, y=None):
    expected_values_list = []
    for v in [0,1]:
        idx_true = sensitive == v  # torch.bool
        expected_values_list.append(out[idx_true].mean())
    expected_values_list.append(out.mean())
    return torch.stack(expected_values_list)
    
mu_f(X_,y_,sensitive_)

tensor([0.4355, 0.5038, 0.4707])

torch.stackすると
[tensor(1), tensor(2), tensor(3)] -> tensor([1,2,3])になった

In [25]:
for i, v in enumerate(dp_criterion.parameters()):
    print(i,v)

In [34]:
dp_criterion(X_,y_,sensitive_)

tensor(0.0001)

gap_constraint = ReLU(M:[4,3], mu:[3,] - c: [3,])
               = ReLU([4,])

