In [1]:
import torch
import ltn
from sklearn.metrics import accuracy_score
import numpy as np
from ltn_imp.parsing import convert_to_ltn

In [2]:
# Seed every RNG this notebook draws from so that "Restart Kernel -> Run All"
# reproduces the same samples, the same weight initialisation and the same
# batch order (the batch iterator below shuffles with numpy's global RNG).
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Toy binary classification task: points drawn uniformly from the unit square,
# labelled positive when their squared distance to (0.5, 0.5) is below 0.09.
nr_samples = 100
dataset = torch.rand((nr_samples, 2))
labels_dataset = torch.sum(torch.square(dataset - torch.tensor([.5, .5])), dim=1) < .09

## Data Preparation

In [3]:
class ModelA(torch.nn.Module):
    """Small fully connected binary classifier.

    Maps 2 input features through two 16-unit hidden layers with ELU
    activations to a single sigmoid output in (0, 1).
    """

    def __init__(self):
        super().__init__()
        # Sub-modules are registered in the same order as before so that the
        # module listing and parameter initialisation order stay unchanged.
        self.sigmoid = torch.nn.Sigmoid()
        self.layer1 = torch.nn.Linear(in_features=2, out_features=16)
        self.layer2 = torch.nn.Linear(in_features=16, out_features=16)
        self.layer3 = torch.nn.Linear(in_features=16, out_features=1)
        self.elu = torch.nn.ELU()

    def forward(self, x):
        """Return the sigmoid-squashed network output for the input `x`."""
        hidden = x
        for hidden_layer in (self.layer1, self.layer2):
            hidden = self.elu(hidden_layer(hidden))
        logits = self.layer3(hidden)
        return self.sigmoid(logits)

In [4]:
class DataLoader(object):
    """Minimal mini-batch iterator over a pair of aligned, indexable containers.

    Args:
        data: samples; must expose ``.shape[0]`` and accept a list of integer
            indices (e.g. a torch tensor or numpy array).
        labels: one label per sample, indexable the same way as ``data``.
        batch_size: maximum number of samples per batch (the last batch may
            be smaller).
        shuffle: when True, the sample order is reshuffled with numpy's global
            RNG every time the loader is iterated.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1 or ``data`` and
            ``labels`` do not contain the same number of items.
    """

    def __init__(self,
                 data,
                 labels,
                 batch_size=1,
                 shuffle=True):
        # Fail at construction time instead of mid-epoch (or, worse, silently
        # pairing samples with only a prefix of the labels).
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))
        if len(labels) != data.shape[0]:
            raise ValueError(
                "data and labels must have the same length, got %d and %d"
                % (data.shape[0], len(labels)))
        self.data = data
        self.labels = labels
        self.batch_size = batch_size
        self.shuffle = shuffle

    def __len__(self):
        """Number of batches per epoch (ceiling of samples / batch_size)."""
        n = self.data.shape[0]
        return (n + self.batch_size - 1) // self.batch_size

    def __iter__(self):
        """Yield ``(data_batch, labels_batch)`` tuples covering every sample once."""
        n = self.data.shape[0]
        idxlist = list(range(n))
        if self.shuffle:
            np.random.shuffle(idxlist)

        for start_idx in range(0, n, self.batch_size):
            # Slicing past the end of the list is safe, so the final (possibly
            # shorter) batch needs no explicit clamping.
            batch_idx = idxlist[start_idx:start_idx + self.batch_size]
            yield self.data[batch_idx], self.labels[batch_idx]

# Samples 0-49 are used for training (reshuffled each epoch) and the rest for
# testing (fixed order). The batch size of 64 exceeds both split sizes, so each
# split is served as a single batch.
train_loader = DataLoader(
    data=dataset[:50],
    labels=labels_dataset[:50],
    batch_size=64,
    shuffle=True,
)
test_loader = DataLoader(
    data=dataset[50:],
    labels=labels_dataset[50:],
    batch_size=64,
    shuffle=False,
)

## My Implementation (`ltn_imp`)

In [5]:
# The network under training, exposed to the rule parser under the predicate
# name "Classifier".
model = ModelA()
predicates = {"Classifier": model}

# Rule applied to the positive examples: the classifier should hold for every x.
expression_1 = "all x. (Classifier(x))"
rule_1 = convert_to_ltn(
    expression_1,
    predicates=predicates,
    functions=None,
    quantifier_impls={"forall": "pmean_error"},
)

# Rule applied to the negative examples: the classifier should not hold for any x.
expression_2 = " all x. not(Classifier(x))"
rule_2 = convert_to_ltn(
    expression_2,
    predicates=predicates,
    functions=None,
    quantifier_impls={"forall": "pmean_error"},
)

In [6]:
from ltn_imp.fuzzy_operators.aggregators import SatAgg

# Aggregator instance; below it is called with the outputs of rule_1 and rule_2
# and its result is treated as the overall satisfaction level.
sat_agg = SatAgg()

def compute_sat_level(loader):
    """Return the aggregated rule satisfaction averaged over the loader's batches.

    Each batch is split by label; rule_1 is evaluated on the positively
    labelled samples and rule_2 on the rest, and the two results are combined
    with sat_agg.
    """
    def batch_sat(samples, targets):
        # Evaluate both rules on one batch and aggregate them.
        positives = samples[torch.nonzero(targets)]
        negatives = samples[torch.nonzero(torch.logical_not(targets))]
        return sat_agg(
            rule_1({"x": positives}),
            rule_2({"x": negatives}),
        )

    total_sat = sum(batch_sat(samples, targets) for samples, targets in loader)
    return total_sat / len(loader)

def compute_accuracy(loader, model):
    """Return the classification accuracy of `model`, averaged over batches.

    Model outputs strictly above 0.5 count as class 1, everything else as
    class 0. Each batch contributes equally to the mean regardless of its size.
    """
    batch_scores = []
    for samples, targets in loader:
        scores = model(samples).detach().numpy()
        hard_predictions = np.where(scores > 0.5, 1., 0.).flatten()
        batch_scores.append(accuracy_score(targets, hard_predictions))

    return sum(batch_scores, 0.0) / len(loader)

In [7]:
# Train `model` by maximising the aggregated satisfaction of the two rules:
# rule_1 on the positively labelled samples, rule_2 on the negatively labelled ones.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(801):
    train_loss = 0.0
    for batch_idx, (data, labels) in enumerate(train_loader):
        optimizer.zero_grad()

        pos = data[torch.nonzero(labels)]
        neg = data[torch.nonzero(torch.logical_not(labels))]

        # Compute satisfaction level
        sat_agg_value = sat_agg(
            rule_1({"x": pos}),
            rule_2({"x": neg})
        )

        # The loss is the amount by which the rules are not yet satisfied.
        loss = 1.0 - sat_agg_value
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss = train_loss / len(train_loader)

    # Report progress every 200 epochs.
    if epoch % 200 == 0:
        print(" epoch %d | loss %.4f | Train Sat %.3f | Test Sat %.3f | Train Acc %.3f | Test Acc %.3f"
              %(epoch, train_loss, compute_sat_level(train_loader), compute_sat_level(test_loader),
                    compute_accuracy(train_loader, model), compute_accuracy(test_loader, model)))

        # Per-rule satisfaction on the last training batch of this epoch.
        # Single quotes are used inside the f-strings because reusing the
        # enclosing double quote is a SyntaxError before Python 3.12.
        print()
        print(f"Positive {rule_1({'x': pos})}")
        print(f"Negative {rule_2({'x': neg})}")
        print()

 epoch 0 | loss 0.4997 | Train Sat 0.500 | Test Sat 0.500 | Train Acc 0.460 | Test Acc 0.420

Positive 0.5091327428817749
Negative 0.49169570207595825

 epoch 200 | loss 0.4544 | Train Sat 0.546 | Test Sat 0.546 | Train Acc 0.760 | Test Acc 0.860

Positive 0.553217887878418
Negative 0.5396345853805542

 epoch 400 | loss 0.1942 | Train Sat 0.807 | Test Sat 0.749 | Train Acc 0.980 | Test Acc 0.960

Positive 0.813523530960083
Negative 0.8002948760986328

 epoch 600 | loss 0.0767 | Train Sat 0.924 | Test Sat 0.795 | Train Acc 1.000 | Test Acc 0.960

Positive 0.9257856607437134
Negative 0.9214745759963989

 epoch 800 | loss 0.0321 | Train Sat 0.968 | Test Sat 0.807 | Train Acc 1.000 | Test Acc 0.960

Positive 0.9684513807296753
Negative 0.967540442943573



## LTN (reference implementation using the `ltn` library)

In [8]:
# Predicate backed by a newly constructed ModelA, i.e. a different network
# instance from `model` trained in the section above.
A = ltn.Predicate(ModelA())
# Negation connective built from ltn's NotStandard operator.
Not = ltn.Connective(ltn.fuzzy_ops.NotStandard())
# Quantifier aggregating with AggregPMeanError (p=2); "f" presumably selects
# universal quantification -- confirm against the ltn documentation.
Forall = ltn.Quantifier(ltn.fuzzy_ops.AggregPMeanError(p=2), quantifier="f")
# NOTE(review): this rebinds the name `SatAgg`, which an earlier cell imported
# from ltn_imp as a class, to an ltn aggregator *instance*.
SatAgg = ltn.fuzzy_ops.SatAgg()

In [9]:
def compute_sat_level(loader):
    """Return the aggregated satisfaction of both LTN formulas, averaged over batches.

    For each batch the positively labelled samples ground the variable x_A and
    the remaining samples ground x_not_A; Forall(A) and Forall(Not(A)) are then
    combined with SatAgg.
    """
    total_sat = 0
    for batch, targets in loader:
        positive_idx = torch.nonzero(targets)
        negative_idx = torch.nonzero(torch.logical_not(targets))

        x_A = ltn.Variable("x_A", batch[positive_idx])  # positive examples
        x_not_A = ltn.Variable("x_not_A", batch[negative_idx])  # negative examples

        batch_sat = SatAgg(
            Forall(x_A, A(x_A)),
            Forall(x_not_A, Not(A(x_not_A))),
        )
        total_sat = total_sat + batch_sat

    return total_sat / len(loader)

def compute_accuracy(loader, model=None):
    """Return the classification accuracy averaged over the loader's batches.

    Args:
        loader: iterable of ``(data, labels)`` batches that also supports ``len``.
        model: optional network to evaluate. Defaults to the network wrapped by
            the predicate ``A``. The parameter exists because this definition
            replaces an earlier ``compute_accuracy(loader, model)`` of the same
            name; accepting the second argument keeps two-argument calls
            working instead of raising TypeError when older cells are re-run.

    Returns:
        Mean of the per-batch accuracies; outputs strictly above 0.5 count as
        class 1, everything else as class 0.
    """
    if model is None:
        model = A.model

    mean_accuracy = 0.0
    for data, labels in loader:
        predictions = model(data).detach().numpy()
        predictions = np.where(predictions > 0.5, 1., 0.).flatten()
        mean_accuracy += accuracy_score(labels, predictions)

    return mean_accuracy / len(loader)

In [10]:
# Train the network wrapped by predicate A by maximising the aggregated
# satisfaction of Forall(A) on positives and Forall(Not(A)) on negatives.
optimizer = torch.optim.Adam(A.parameters(), lr=0.001)


for epoch in range(801):
    train_loss = 0.0
    for batch_idx, (data, labels) in enumerate(train_loader):
        optimizer.zero_grad()

        x_A = ltn.Variable("x_A", data[torch.nonzero(labels)]) # positive examples
        x_not_A = ltn.Variable("x_not_A", data[torch.nonzero(torch.logical_not(labels))]) # negative examples

        # Stored as `sat_agg_value` rather than `sat_agg`: the latter is the
        # aggregator instance created in the ltn_imp section, and overwriting
        # it with a per-batch result breaks that section when it is re-run.
        sat_agg_value = SatAgg(
            Forall(x_A, A(x_A)),
            Forall(x_not_A, Not(A(x_not_A)))
        )

        # The loss is the amount by which the formulas are not yet satisfied.
        loss = 1. - sat_agg_value
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss = train_loss / len(train_loader)

    # Report progress every 200 epochs.
    if epoch % 200 == 0:
        print(" epoch %d | loss %.4f | Train Sat %.3f | Test Sat %.3f | Train Acc %.3f | Test Acc %.3f"
        %(epoch, train_loss, compute_sat_level(train_loader), compute_sat_level(test_loader),
            compute_accuracy(train_loader), compute_accuracy(test_loader)))

        # Per-formula satisfaction on the last training batch of this epoch.
        print()
        print(f"Positive { Forall(x_A, A(x_A)) }")
        print(f"Negative { Forall(x_not_A, Not(A(x_not_A)))}")
        print()

 epoch 0 | loss 0.5008 | Train Sat 0.499 | Test Sat 0.501 | Train Acc 0.540 | Test Acc 0.480

Positive LTNObject(value=tensor(0.5114, grad_fn=<RsubBackward1>), free_vars=[])
Negative LTNObject(value=tensor(0.4876, grad_fn=<RsubBackward1>), free_vars=[])

 epoch 200 | loss 0.4105 | Train Sat 0.591 | Test Sat 0.561 | Train Acc 0.780 | Test Acc 0.660

Positive LTNObject(value=tensor(0.6090, grad_fn=<RsubBackward1>), free_vars=[])
Negative LTNObject(value=tensor(0.5735, grad_fn=<RsubBackward1>), free_vars=[])

 epoch 400 | loss 0.1941 | Train Sat 0.807 | Test Sat 0.690 | Train Acc 0.980 | Test Acc 0.900

Positive LTNObject(value=tensor(0.8179, grad_fn=<RsubBackward1>), free_vars=[])
Negative LTNObject(value=tensor(0.7966, grad_fn=<RsubBackward1>), free_vars=[])

 epoch 600 | loss 0.0746 | Train Sat 0.926 | Test Sat 0.765 | Train Acc 1.000 | Test Acc 0.940

Positive LTNObject(value=tensor(0.9287, grad_fn=<RsubBackward1>), free_vars=[])
Negative LTNObject(value=tensor(0.9231, grad_fn=<RsubBa