# Пример реализации ассоциативной памяти.

Архитектура модели:
1. Шардированный линейный классификатор.
2. Хэширование для быстрого поиска шарда.

Шардирование нужно для увеличения емкости памяти модели. Тренировочная выборка разбивается на N частей и на каждой части обучается отдельный экземпляр линейного классификатора. При достаточном количестве шардов можно получить сходимость на 100% (на тренировочной выборке). В данном примере для шардирования используется LSH. Такой подход имеет следующие плюсы:
* устойчивость к шумам на тренировочной выборке
* быстрое вычисление шарда

In [1]:
import numpy as np

from sklearn.datasets import fetch_openml
mnist = fetch_openml('mnist_784', parser='auto')

from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(mnist.data, mnist.target, test_size=10000, random_state=42, train_size=60000)
x_train = x_train.values.astype(int)
y_train = y_train.values.astype(int)
x_test = x_test.values.astype(int)
y_test = y_test.values.astype(int)

In [2]:
input_count = 28*28
bits = 8
shard_count = 50

kernels = np.random.choice([-1, 1], size=(bits, input_count), p=[1./2, 1./2])

bits_train = np.heaviside(np.einsum('ik,jk->ij', x_train, kernels), 0).astype(int)
bits_test = np.heaviside(np.einsum('ik,jk->ij', x_test, kernels), 0).astype(int)

shards_train = np.packbits(bits_train) % shard_count
shards_test = np.packbits(bits_test) % shard_count

unique, counts = np.unique(shards_train, return_counts=True)
print(unique, counts)

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49] [ 331  241 1979  645 2296 1052 1179  735  503  752  286  320 2222 3214
 1188 1350  331  438  158   78 2408 1009  911  464  962  577  387  232
 3379 3052 1059 1201  615  781  326  395 1840 1103  632  431 1111  609
  767  537 2569 1333 4200 3232 2681 1899]


In [3]:
input_count = 28*28
output_count = 10

from numba import njit, prange

@njit(fastmath=True)
def fit(shard, S, T):
    for epoch in range(10000):
        total_misses = 0
        for i in range(len(shards_train)):
            if shards_train[i] == shard:
                # diff classifier
                X, y = x_train[i], y_train[i]
                M = S / T
                p = ((M - X)**2).sum(axis=1).argmin()
                if p != y:
                    S[y] += X
                    T[y] += 1
                    S[p] = (S[p] - X / 2).clip(0)
                    total_misses += 1
        if total_misses == 0:
                break

S = np.zeros((shard_count, output_count, input_count))
T = np.ones((shard_count, output_count, 1))

@njit(fastmath=True, parallel=True)
def fit_all(S, T):
    for shard in prange(shard_count):
        fit(shard, S[shard], T[shard])

fit_all(S, T)

In [4]:
M = S / T

total_test = 0
for i in range(len(x_test)):
    X, y = x_test[i], y_test[i]
    if y == ((M[shards_test[i]] - X)**2).sum(axis=1).argmin():
        total_test += 1

total_train = 0
for i in range(len(x_train)):
    X, y = x_train[i], y_train[i]
    if y == ((M[shards_train[i]] - X)**2).sum(axis=1).argmin():
        total_train += 1

print("accuracy train: %f; accuracy test: %f" % (total_train/len(x_train), total_test/len(x_test)))

accuracy train: 1.000000; accuracy test: 0.900300


In [5]:
noise = np.random.choice([-1, 1], size=x_train.shape, p=[1./2, 1./2])
x_train += noise
noise = np.random.choice([-1, 1], size=x_test.shape, p=[1./2, 1./2])
x_test += noise

total_test = 0
for i in range(len(x_test)):
    X, y = x_test[i], y_test[i]
    if y == ((M[shards_test[i]] - X)**2).sum(axis=1).argmin():
        total_test += 1

total_train = 0
for i in range(len(x_train)):
    X, y = x_train[i], y_train[i]
    if y == ((M[shards_train[i]] - X)**2).sum(axis=1).argmin():
        total_train += 1

print("accuracy train: %f; accuracy test: %f" % (total_train/len(x_train), total_test/len(x_test)))

accuracy train: 0.999800; accuracy test: 0.900100
