In [None]:
#@title install libs

!pip install fcapy[all]
!pip install frozendict
!pip install ipynb
!pip install sparselinear
!pip install bitsets
!pip install bitarray
import torch
!pip install torch-scatter -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install torch-cluster -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install git+https://github.com/pyg-team/pytorch_geometric.git

In [74]:
#@title import libs

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, jaccard_score, recall_score, accuracy_score, classification_report

from fcapy.context import FormalContext
from fcapy.lattice import ConceptLattice

from fcapy.visualizer import LineVizNx
import neural_lib as nl
from fcapy.utils.utils import powerset
import matplotlib.pyplot as plt

plt.rcParams['figure.facecolor'] = (1,1,1,1)


from fcapy import LIB_INSTALLED
if LIB_INSTALLED['numpy']:
    import numpy as np

from sparselinear import SparseLinear

In [75]:
# @title neural_lib.py
%%writefile neural_lib.py

from dataclasses import dataclass
from typing import List, Tuple, FrozenSet, Set, Dict
import pandas as pd

from fcapy.lattice import ConceptLattice
from fcapy.lattice.formal_concept import FormalConcept
from fcapy.poset import POSet
from fcapy.visualizer.line_layouts import calc_levels

import torch
from sparselinear import SparseLinear


@dataclass(eq=False)
class DisjunctiveNeuron:
    """Node of the network poset: a set of attribute names placed on a given level.

    Ordering: ``a < b`` holds when ``b.intent`` is a subset of ``a.intent`` and
    ``a`` lies on a strictly greater level than ``b``. Comparisons with objects
    that are not ``DisjunctiveNeuron`` return ``NotImplemented`` so that Python
    falls back to its default handling instead of raising ``AttributeError``.
    """
    intent: FrozenSet[str]  # names carried by the neuron
    level: int  # level (layer index) the neuron is placed on

    def __eq__(self, other: object):
        if not isinstance(other, DisjunctiveNeuron):
            return NotImplemented
        return self.intent == other.intent and self.level == other.level

    def __lt__(self, other: object):
        if not isinstance(other, DisjunctiveNeuron):
            return NotImplemented
        # `other.intent <= self.intent` is the subset test formerly spelled
        # `self.intent & other.intent == other.intent`.
        return other.intent <= self.intent and self.level > other.level

    def __le__(self, other: object):
        if not isinstance(other, DisjunctiveNeuron):
            return NotImplemented
        return self < other or self == other

    def __hash__(self):
        # Kept consistent with __eq__: both use (intent, level).
        return hash((self.intent, self.level))


class ConceptNetwork:
    """Sparse neural network whose layers and connections follow a poset of neurons.

    Every poset element is expected to expose ``intent`` and ``level`` (as
    ``DisjunctiveNeuron`` does). Elements sharing a level form one layer, and the
    parent -> child relations of the poset are the only connections of the network.
    """

    def __init__(self, poset: POSet, network=None, attributes: Tuple[str] = None, targets: Tuple[str] = None):
        self._poset = poset
        self._network = network
        self._attributes = attributes
        self._targets = targets

    @property
    def poset(self) -> POSet:
        """Poset of neurons that defines the architecture."""
        return self._poset

    @property
    def network(self) -> torch.nn.Sequential:
        """The torch model, or ``None`` until `fit` is called (unless one was passed in)."""
        return self._network

    @property
    def attributes(self) -> Tuple[str]:
        """Names of the input attributes, in the column order fed to the network."""
        return self._attributes

    @property
    def targets(self):
        """Target values the network was built for."""
        return self._targets

    def trace_description(self, description: FrozenSet[str], include_targets: bool = False) -> Set[int]:
        """Return the indices of the poset nodes activated by ``description``.

        A top node is activated when its whole intent is contained in
        ``description``; all descendants of an activated top are activated too.
        Bottom nodes are removed from the result unless ``include_targets`` is set.
        """
        P = self.poset

        tops_activated = [node for node in P.tops if P[node].intent & description == P[node].intent]
        activated_nodes = set(tops_activated)
        for node in tops_activated:
            activated_nodes |= P.descendants(node)
        if not include_targets:
            activated_nodes -= set(P.bottoms)

        return activated_nodes

    @classmethod
    def from_lattice(
            cls,
            lattice: ConceptLattice, best_concepts_indices: List[int],
            targets: Tuple[str]
    ):
        """Create an (unfitted) network from selected concepts of a monotone lattice.

        The input attributes are taken from the intent of the lattice's bottom concept.

        :param lattice: concept lattice; must report ``is_monotone``
        :param best_concepts_indices: indices of the concepts that become hidden neurons
        :param targets: target values; one output neuron is created per value
        """
        assert lattice.is_monotone, 'The lattice should be monotone'

        targets = tuple(targets)

        attrs_tpl = tuple(lattice[lattice.bottom].intent)
        P = cls._poset_from_best_concepts(lattice[best_concepts_indices], targets, attrs_tpl)
        P = cls._fill_levels(P)
        return cls(P, None, attributes=attrs_tpl, targets=targets)

    def fit(
            self,
            X_df: 'pd.DataFrame[bool]', y: 'pd.Series[bool]',
            loss_fn=torch.nn.CrossEntropyLoss(), nonlinearity=torch.nn.ReLU,
            n_epochs: int = 2000
    ):
        """Build the network from the poset and train it on the whole of ``X_df`` at once.

        Any previously stored network is replaced. Training runs ``n_epochs``
        full-batch steps of Adam with its default settings.

        :param X_df: frame holding one column per name in ``self.attributes``
        :param y: targets; converted to a long tensor and passed to ``loss_fn``
        :param loss_fn: callable ``loss_fn(prediction, target)`` returning a tensor
        :param nonlinearity: module class instantiated between the linear layers
        :param n_epochs: number of optimisation steps
        """
        # NOTE(review): the network built below ends with Softmax, while the default
        # CrossEntropyLoss is documented to expect unnormalised logits. Confirm that
        # normalising twice is intended before changing either side.
        X = torch.tensor(X_df[list(self.attributes)].values).float()
        y = torch.tensor(y.values).long()

        self._network = self._poset_to_network(self.poset, nonlinearity)

        optimizer = torch.optim.Adam(self.network.parameters())

        for t in range(n_epochs):
            optimizer.zero_grad()
            y_pred = self.network(X)
            loss = loss_fn(y_pred, y)
            loss.backward()
            optimizer.step()

    def predict_proba(self, X_df: 'pd.DataFrame[bool]') -> torch.Tensor:
        """Run the network on the attribute columns of ``X_df`` and return its raw output.

        :raises RuntimeError: if no network has been built yet
        """
        network = self._fitted_network()
        X = torch.tensor(X_df[list(self.attributes)].values).float()
        return network(X)

    def predict(self, X_df: 'pd.DataFrame[bool]') -> torch.Tensor:
        """Return, per row, the index of the largest network output."""
        return self.predict_proba(X_df).argmax(1)

    def edge_weights_from_network(self) -> Dict[Tuple[int, int], float]:
        """Map each connection ``(parent_index, child_index)`` of the poset to its weight.

        Linear layers sit at the even positions of the network (a non-linearity
        follows each of them), hence the ``layer_i * 2`` lookup.

        :raises RuntimeError: if no network has been built yet
        """
        network = self._fitted_network()
        nodes_per_levels = self._group_nodes_by_level(self.poset)

        edge_weights = {}
        for layer_i, nodes in enumerate(nodes_per_levels[:-1]):
            next_nodes = nodes_per_levels[layer_i+1]

            nn_layer = network[layer_i*2]
            idxs = nn_layer.weight.indices().numpy().T.tolist()
            vals = nn_layer.weight.values().numpy()

            # Sparse weight indices are (output position, input position) pairs.
            for (child_i, parent_i), v in zip(idxs, vals):
                edge_weights[(nodes[parent_i], next_nodes[child_i])] = v
        return edge_weights

    def _fitted_network(self):
        """Return the network, failing with a clear message when it does not exist yet."""
        network = self.network
        if network is None:
            raise RuntimeError('The network is not built yet: call `fit` first')
        return network

    @staticmethod
    def _group_nodes_by_level(poset: POSet) -> List[List[int]]:
        """Return ``result[lvl]`` = indices of the poset elements placed on level ``lvl``.

        The deepest level is read from the first bottom element of the poset.
        """
        max_level = poset[poset.bottoms[0]].level
        nodes_per_levels = {lvl: [] for lvl in range(max_level + 1)}
        for node_i, node in enumerate(poset):
            nodes_per_levels[node.level].append(node_i)
        return [nodes_per_levels[lvl] for lvl in range(max_level + 1)]

    @staticmethod
    def _poset_from_best_concepts(
            best_concepts: List[FormalConcept], targets: Tuple[str], attrs_tpl: Tuple[str]
    ) -> POSet:
        """Assemble the neuron poset: attribute neurons, concept neurons, target neurons.

        Attribute neurons (one per attribute) sit on level 0, concept neurons on
        their `calc_levels` level shifted by one, and target neurons one level
        below the deepest concept. A target neuron's intent is ``"y=<target>"``
        plus every attribute.
        """
        P_best = POSet(best_concepts)
        lvls = calc_levels(P_best)[0]
        lvls = [lvl + 1 for lvl in lvls]
        target_lvl = max(lvls) + 1

        attrs_set = set(attrs_tpl)

        best_neurons = [DisjunctiveNeuron(frozenset(c.intent), lvl) for c, lvl in zip(P_best, lvls)]
        first_level_neurons = [DisjunctiveNeuron(frozenset({m}), 0) for m in attrs_tpl]
        last_level_neurons = [DisjunctiveNeuron(frozenset({f"y={y}"} | attrs_set), target_lvl) for y in targets]
        return POSet(first_level_neurons + best_neurons + last_level_neurons)

    @staticmethod
    def _fill_levels(poset: POSet) -> POSet:
        """Add pass-through neurons for connections that skip levels.

        For every node, a neuron with the same intent is added on each level
        strictly between the node's level and the deepest level among its
        children. The poset is modified in place and returned.
        """
        # The visiting order is fixed before any neuron is added.
        nodes_i = sorted(range(len(poset)), key=lambda node_i: poset[node_i].level)
        for node_i in nodes_i:
            children_i = poset.children(node_i)
            if len(children_i) == 0:
                continue

            max_children_level = max([poset[child_i].level for child_i in children_i])
            for lvl in range(poset[node_i].level+1, max_children_level):
                poset.add(DisjunctiveNeuron(poset[node_i].intent, lvl))
        return poset

    @staticmethod
    def _poset_to_network(poset: POSet, nonlinearity: type = torch.nn.ReLU) -> 'torch.nn.Sequential':
        """Turn the levelled poset into a ``torch.nn.Sequential`` of sparse layers.

        One ``SparseLinear`` layer is created per pair of consecutive levels,
        connected only along the parent -> child relations of the poset.
        ``nonlinearity()`` follows every layer but the last, which is followed by
        ``Softmax(dim=1)``.

        :raises ValueError: if a node has a parent outside the preceding level
        """
        nodes_per_levels = ConceptNetwork._group_nodes_by_level(poset)

        linear_layers = []
        for prev_layer, layer in zip(nodes_per_levels[:-1], nodes_per_levels[1:]):
            # Position of each node inside the previous layer, for O(1) lookups.
            prev_position = {node_i: pos for pos, node_i in enumerate(prev_layer)}

            layer_con = []
            for position, node in enumerate(layer):
                for parent in poset.parents(node):
                    if parent not in prev_position:
                        raise ValueError(
                            f'Parent {parent} of node {node} is not on the preceding level'
                        )
                    layer_con.append((position, prev_position[parent]))

            # Connectivity rows: (output position, input position).
            con = torch.tensor(layer_con).T
            linear_layers.append(SparseLinear(len(prev_layer), len(layer), connectivity=con))

        layers = [layer for ll in linear_layers for layer in [ll, nonlinearity()]][:-1] + [torch.nn.Softmax(dim=1)]
        model_sparse = torch.nn.Sequential(*layers)
        return model_sparse


def neuron_label_func(el_i: int, P: POSet, M: set, only_new_attrs: bool = True):
    """Build the text label of poset element ``el_i``.

    * If the intent holds names outside ``M`` (a target node), only those names
      are shown.
    * Otherwise the intent is shown; with ``only_new_attrs`` the names already
      present in any parent's intent are left out.

    Names are sorted so the label is the same on every run (set iteration order
    is not stable between interpreter sessions).

    :param el_i: index of the element in ``P``
    :param P: poset whose elements expose ``intent``
    :param M: set of attribute names
    :param only_new_attrs: hide names inherited from parents
    :return: comma-separated names
    """
    intent = set(P[el_i].intent)

    non_attribute_names = intent - M
    if non_attribute_names:  # target node
        return ','.join(sorted(non_attribute_names))

    if only_new_attrs:
        for parent_i in P.parents(el_i):
            intent -= P[parent_i].intent
    return ','.join(sorted(intent))

Overwriting neural_lib.py


In [76]:
#@title import data

# Base URL without a trailing slash, so that f'{root}/<file>' contains a single '/'
# (the previous value ended in '/', producing '.../fca//<file>').
root = 'https://hse.kamran.uz/osda23/fca'

# Prepared feature tables, one per dataset.
pp = pd.read_csv(f'{root}/prepared_pp.csv')
c  = pd.read_csv(f'{root}/prepared_c.csv')
z  = pd.read_csv(f'{root}/prepared_z.csv')
p  = pd.read_csv(f'{root}/prepared_p.csv')


# Matching target tables (read_csv returns DataFrames, not Series).
y_pp = pd.read_csv(f'{root}/target_pp.csv')
y_c  = pd.read_csv(f'{root}/target_c.csv')
y_z  = pd.read_csv(f'{root}/target_z.csv')
y_p  = pd.read_csv(f'{root}/target_p.csv')

In [77]:
#@title train

def get_train_test(X, y, test_size=0.1, random_state=42):
    """Split features and targets into stratified, shuffled train and test parts.

    The index of both inputs is converted to strings on copies, so the frames
    passed by the caller are left untouched.

    :param X: feature table
    :param y: targets aligned with ``X``; also used for stratification
    :param test_size: share of the rows put into the test part
    :param random_state: seed of the shuffle
    :return: ``(X_train, X_test, y_train, y_test)``
    """
    X = X.copy()
    y = y.copy()
    X.index = X.index.astype('str')
    y.index = y.index.astype('str')
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=test_size,
                                                        random_state=random_state,
                                                        shuffle=True,
                                                        stratify=y)
    return X_train, X_test, y_train, y_test

X = [pp,c,z,p]
y = [y_pp,y_c,y_z,y_p]
models = []
for X_, y_ in zip(X,y):
    # Split the data to train and test
    X_train, X_test, y_train, y_test = get_train_test(X_, y_)

    # Put binarized data in FormalContext and compute monotone ConceptLattice
    K_train = FormalContext(data = X_train.values, target=y_train.values, attribute_names=X_train.columns)
    L = ConceptLattice.from_context(K_train, algo='Sofia', is_monotone=True)

    # Compute F1 score for each formal concept (assuming that an object is predicted True if it is in the extent of the concept)
    # The loop variable is deliberately not called `c`: that name is the
    # DataFrame listed in `X` above, and rebinding it would break a re-run of this cell.
    for concept in L:
        y_preds = np.zeros(K_train.n_objects)
        y_preds[list(concept.extent_i)] = 1
        concept.measures = dict(f1_score=f1_score(y_train, y_preds))

    # Select indices of the best concepts from the lattice:
    # the shortest prefix of the F1 ranking whose extents cover every training object
    best_concepts = list(L.measures['f1_score'].argsort()[::-1])
    for i in range(len(best_concepts)):
        if len({g_i for concept in L[best_concepts[:i]] for g_i in concept.extent_i})==K_train.n_objects:
            best_concepts = best_concepts[:i]
            break

    # Class labels must come from the target *values*: `y_train` is a DataFrame
    # (loaded with read_csv), and iterating a DataFrame yields its column labels.
    class_labels = sorted(set(y_train.values.ravel().tolist()))

    # Construct neural network based on concept lattice
    cn = nl.ConceptNetwork.from_lattice(L, best_concepts, class_labels)
    # NOTE(review): training is left disabled as in the original cell; `fit`
    # converts `y.values` to a tensor as-is, so a one-column DataFrame target
    # would probably have to be flattened first - confirm before enabling.
    # cn.fit(X_train, y_train)
    models.append([cn, K_train, L, [X_train, X_test, y_train, y_test]])

In [78]:
# Example: the training FormalContext of the first dataset
# (each `models` entry is [network, context, lattice, data splits]).

models[0][1]

FormalContext (307 objects, 19 attributes, 1995 connections)
   |sex|island_Biscoe|island_Dream|island_Torgersen|year_2007|...|flipper_length_mm_(192.0, 209.333]|flipper_length_mm_(209.333, 231.0]|body_mass_g_(2699.999, 3700.0]|body_mass_g_(3700.0, 4550.0]|body_mass_g_(4550.0, 6300.0]|
0  |  X|             |           X|                |         |...|                                 X|                                  |                             X|                            |                            |
1  |   |             |           X|                |         |...|                                 X|                                  |                             X|                            |                            |
2  |   |             |           X|                |        X|...|                                 X|                                  |                             X|                            |                            |
3  |  X|             |           X|

In [67]:
#@title plot

import networkx as nx

def plot_cn(cn):
    """Draw the network of ``cn`` with edges coloured and labelled by their weights.

    The figure is saved to ``fitted_network.png`` and shown. ``cn`` must already
    hold a network, because the edge weights are read from it.

    :param cn: a ``ConceptNetwork``
    """
    attributes = set(cn.attributes)

    def label_func(el_i, P):
        # Trailing newlines are part of the label text passed to the visualizer.
        return nl.neuron_label_func(el_i, P, attributes, only_new_attrs=True)+'\n\n'

    fig, ax = plt.subplots(figsize=(15,5))
    vis = LineVizNx(node_label_font_size=14, node_label_func=label_func)
    vis.init_mover_per_poset(cn.poset)
    edge_weights = cn.edge_weights_from_network()

    # Built once so that edge colours and edge labels refer to the same graph.
    graph = cn.poset.to_networkx()

    vis.draw_poset(
        cn.poset, ax=ax,
        flg_node_indices=False,
        node_label_func=label_func,
        edge_color=[edge_weights[edge] for edge in graph.edges],
        edge_cmap=plt.cm.RdBu,
    )
    nx.draw_networkx_edge_labels(
        graph, vis.mover.pos,
        {k: f"{v:.1f}" for k,v in edge_weights.items()},
        label_pos=0.7, ax=ax,
    )

    ax.set_title('Neural network with fitted edge weights', size=24, x=0.05, loc='left')
    fig.tight_layout()
    fig.savefig('fitted_network.png')
    plt.show()