In [1]:
import matplotlib.pyplot as plt
import os
import torch
import torch.nn.functional as torchF
import pandas as pd
import idx2numpy
import numpy as np

In [None]:
#creating 7x12 bitmapped images to train LeNet against
from bitstring import BitArray

def bitstring_to_pbm(bitstring_data, width, height, filename):
    """
    Converts a bitstring to a PBM image file.

    Args:
        bitstring_data: A bitstring object containing the image data.
        width: The width of the image in pixels.
        height: The height of the image in pixels.
        filename: The name of the PBM file to create.
    """
    if len(bitstring_data) != width * height:
        raise ValueError("Bitstring length does not match image dimensions.")

    with open(filename, 'w') as f:
        f.write("P1\n")  # PBM format header (ASCII)
        f.write(f"{width} {height}\n")  # Image dimensions

        # Write pixel data (1 for black, 0 for white)
        for i in range(height):
            for j in range(width):
                f.write(str(int(bitstring_data[i * width + j])) + " ")
            f.write("\n")

In [None]:
#need ascii 32 to 126
filename='ASCII_pbms/ASCII_126.pbm'
data = '''
0000000
0000000
0000000
0000000
0000000
0001001
0110110
0000000
0000000
0000000
0000000
0000000
'''

width = 7
height = 12
image_data = np.array(list(data.replace('\n', ''))).reshape(height, width).astype(np.int8)

In [None]:
# Display the image using imshow
plt.imshow(image_data, cmap='gray_r')
ax = plt.gca()

# Set border properties
for spine in ax.spines.values():
    spine.set_edgecolor('grey')  # Set border color
    spine.set_linewidth(2)      # Set border thickness

# Remove the ticks
ax.set_xticks([])
ax.set_yticks([])

plt.show()

In [None]:
#save image

data = BitArray(bin=data)

#check for file
if filename.split('/')[1] not in os.listdir('ASCII_pbms'):
    print('creating file', filename)
    bitstring_to_pbm(data, width, height, filename)
else:
    print('file already exists')

In [2]:
#print PBM images
def read_pbm_file(filename):
    with open(filename, 'r') as f:
        l = f.readlines()
    bits = []
    for i in range(2, len(l)):
        lstr = l[i].replace('\n', '').replace(' ', '')
        bits.append([int(b) for b in lstr])

    bits = np.array(bits)
    return bits

In [None]:
filename = 'ASCII_pbms/ASCII_54.pbm'
image_data = read_pbm(filename)

# Display the image using imshow
plt.imshow(image_data, cmap='gray_r')
ax = plt.gca()

# Set border properties
for spine in ax.spines.values():
    spine.set_edgecolor('grey')  # Set border color
    spine.set_linewidth(2)      # Set border thickness

# Remove the ticks
ax.set_xticks([])
ax.set_yticks([])

plt.show()

## Training MNIST Against 7x12 Bitmapped ASCII Printable Chars

In the LeNet paper, loss function is a Euclidean RBF comparison of output layer against an "idealized" version of the inputs (7x12 bitmapped ASCII). I couldn't find a dataset for these online, so I recreated them. The section below recreates the single-char classification model from the first half of the LeNet paper

In [None]:
#dataset prep
ascii_map = open('emnist/emnist-byclass-mapping.txt', 'r').readlines()
ascii_map = dict([(int(l.rstrip().split(' ')[0]), int(l.rstrip().split(' ')[1])) for l in ascii_map])

col_names = ['class'] + ['pix'+str(i) for i in range(28*28)]
df_train = pd.read_csv('emnist/emnist-byclass-train.csv', names=col_names)
df_train['ascii#'] = df_train['class'].map(ascii_map)
df_test = pd.read_csv('emnist/emnist-byclass-test.csv', names=col_names)
df_test['ascii#'] = df_test['class'].map(ascii_map)

df_train.shape, df_test.shape

In [None]:
def print_img(df, idx):
    a = df_test.iloc[idx, 1:-1].to_numpy().reshape(28, 28).T
    print(df_test.iloc[idx, -1])
    plt.imshow(a, cmap='gray_r')

print_img(df_train, 46)

In [None]:
#load full printable ascii bitmap set into "parameter" tensor for model eval
bm_l = []
bm_dict = {}
for i, num in enumerate(sorted(pd.concat([df_test['ascii#'], df_train['ascii#']]).unique().tolist())):
    fname = f'ASCII_pbms/ASCII_{num}.pbm'
    arr = read_pbm_file(fname)
    bm_l.append(arr)
    bm_dict[num] = i

bm_param = torch.Tensor(np.array(bm_l)).reshape(62, 84)

In [None]:
#init hyperparams and weights
#hyperparams
F = 5 #size of square conv kernel
S = 1 #stride of conv kernel
B = 64 #batch dimension
SS = 2 #subsampling window size
act_list = {}

#weights
W1 = torch.randn((6, 1, 1, F, F)) / (F*F)
b1 = torch.randn((6, 1, 1))
w2 = torch.randn((6, 1, 1))
b2 = torch.randn((6, 1, 1))
W3_sparse_conv_idx = sum([[(i+x) % 6 for x in range(3)] for i in range(6)], [])
W3_sparse_conv_idx += sum([[(i+x) % 6 for x in range(4)] for i in range(6)], [])
W3_sparse_conv_idx += [0, 1, 3, 4, 1, 2, 4, 5, 0, 2, 3, 5] + list(range(6))
W3 = torch.randn((60, 1, 1, F, F)) / (F*F*60)
b3 = torch.randn((16, 1, 1))
w4 = torch.randn((16, 1, 1))
b4 = torch.randn((16, 1, 1))
W5 = torch.randn((120, 16, 5, 5)) / (F*F*16)
b5 = torch.randn(120)
W6 = torch.randn((120, 84)) / 120**0.5
b6 = torch.randn(84)

params = [W1, b1, w2, b2, W3, b3, w4, b4, W5, b5, W6, b6]
for p in params:
    p.requires_grad = True

In [None]:
#training loop
iters = 10000
losses = []
B = 32

for i in range(iters):
    
    #feedforward
    #retrieve sample batch, convert to numpy
    x_batch = df_train.sample(B).iloc[:, 1:].to_numpy()
    x, y = x_batch[:, :-1], x_batch[:, -1]
    
    #reshape to square matrix and zero pad 2-pixels on each side: (28x28) --> (32x32)
    x = x.reshape(B, 28, 28)
    x = torch.transpose(torch.Tensor(x), 1, 2)
    x = torchF.pad(x, pad=(2, 2, 2, 2), value=0.0)
    
    #normalize
    x = x / 255.0
    act_list['xin'] = x
    
    #convolution1
    x_conv1 = x.unfold(1, F, S).unfold(2, F, S).unsqueeze(1)
    x = (x_conv1 * W1).sum(dim=(-2, -1)) + b1
    #insert normalization here
    act_list['conv1'] = x
    x = x.sigmoid()
    
    #subsampling2
    x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
    x /= float(SS*SS)
    x = x * w2 + b2
    act_list['ss2'] = x
    x = x.sigmoid()
    
    #conv3, uses special sparse connections
    #define sparse convolutional connections
    
    #applying sparse conv index, unfold into convs
    x_conv3 = x[:, W3_sparse_conv_idx, :, :].unfold(2, F, S).unfold(3, F, S)
    x = (x_conv3 * W3).sum(dim=(-2, -1))
    #sum together 60 convs into 16 feature maps
    W3_sparse_summed = [x[:, 3*i:3*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(6)] 
    W3_sparse_summed += [x[:, 18+4*i:18+4*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(9)]
    W3_sparse_summed += [x[:, 54:60, :, :].sum(dim=1, keepdim=True)]
    
    x = torch.cat(W3_sparse_summed, dim=1)
    x += b3
    act_list['conv3'] = x
    x = x.sigmoid()
    
    #subsamp4
    x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
    x /= float(SS*SS)
    x = x * w4 + b4
    act_list['ss4'] = x
    
    #conv5
    x = (x.unsqueeze(1) * W5).sum(dim=(-3, -2, -1)) + b5
    act_list['conv5'] = x
    x = x.sigmoid()
    
    #F6, fully connected
    x = x @ W6 + b6
    act_list['f6'] = x
    x = torchF.tanh(x) #tanh in paper instead of sigmoid like other layers
    
    #O7, output layer
    x = (x.unsqueeze(-1) - bm_param.T).square().sum(dim=1)
    x /= 84.0
    act_list['out7'] = x.reshape(-1)
    
    #backpropagation
    
    #loss function
    #get index of y_pred value
    y_idx = np.vectorize(lambda x: bm_dict[x])(y)
    loss = (x[np.arange(x.shape[0]), y_idx] + (x*-1).exp().sum(axis=1).log()).mean()

    assert 1 == 2
    
    #backprop/gradient update
    loss.backward()
    if i % 500 == 0:
        losses.append(loss.item())
        print('loss:', loss.item())
    lr = 0.1
    for p in params:
        p.data += -lr * p.grad
        p.grad = None

In [None]:
#feedforward
#retrieve sample batch, convert to numpy
x_batch = df_train.sample(B).iloc[:, 1:].to_numpy()
x, y = x_batch[:, :-1], x_batch[:, -1]

#reshape to square matrix and zero pad 2-pixels on each side: (28x28) --> (32x32)
x = x.reshape(B, 28, 28)
x = torch.transpose(torch.Tensor(x), 1, 2)
x = torchF.pad(x, pad=(2, 2, 2, 2), value=0.0)

#normalize
x = x / 255.0
act_list['xin'] = x

#convolution1
x_conv1 = x.unfold(1, F, S).unfold(2, F, S).unsqueeze(1)
x = (x_conv1 * W1).sum(dim=(-2, -1)) + b1
#insert normalization here
act_list['conv1'] = x
x = x.sigmoid()

#subsampling2
x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
x /= float(SS*SS)
x = x * w2 + b2
act_list['ss2'] = x

#conv3, uses special sparse connections
#define sparse convolutional connections

#applying sparse conv index, unfold into convs
x_conv3 = x[:, W3_sparse_conv_idx, :, :].unfold(2, F, S).unfold(3, F, S)
x = (x_conv3 * W3).sum(dim=(-2, -1))
#sum together 60 convs into 16 feature maps
W3_sparse_summed = [x[:, 3*i:3*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(6)] 
W3_sparse_summed += [x[:, 18+4*i:18+4*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(9)]
W3_sparse_summed += [x[:, 54:60, :, :].sum(dim=1, keepdim=True)]

x = torch.cat(W3_sparse_summed, dim=1)
x += b3
act_list['conv3'] = x
x = x.sigmoid()

#subsamp4
x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
x /= float(SS*SS)
x = x * w4 + b4
act_list['ss4'] = x

#conv5
x = (x.unsqueeze(1) * W5).sum(dim=(-3, -2, -1)) + b5
act_list['conv5'] = x
x = x.sigmoid()

#F6, fully connected
x = x @ W6 + b6
act_list['f6'] = x
x = torchF.tanh(x) #tanh in paper instead of sigmoid like other layers

#O7, output layer
x = (x.unsqueeze(-1) - bm_param.T).square().sum(dim=1)
x /= 84.0
act_list['out7'] = x

#backpropagation

#loss function
#get index of y_pred value
y_idx = np.vectorize(lambda x: bm_dict[x])(y)
#loss = (x[np.arange(x.shape[0]), y_idx] + (x*-1).exp().sum(axis=1).log()).mean()
loss = x[:, y_idx].mean()
loss.backward()

lr = 0.1
for p in params:
    p.data += -lr * p.grad
    p.grad = None

In [12]:
# Feedforward with fixes, helped by claude
def forward(x_batch):
    
    # Retrieve sample batch, convert to numpy
    x, y = x_batch[:, :-1], x_batch[:, -1]
    
    # Reshape to square matrix and zero pad 2-pixels on each side: (28x28) --> (32x32)
    x = x.reshape(B, 28, 28)
    x = torch.transpose(torch.Tensor(x), 1, 2)
    x = torchF.pad(x, pad=(2, 2, 2, 2), value=0.0)
    
    # Normalize to [-0.1, 1.1] range as per paper
    x = (x / 255.0) * 1.2 - 0.1
    
    # Convolution1
    x_conv1 = x.unfold(1, F, S).unfold(2, F, S).unsqueeze(1)
    x = (x_conv1 * W1).sum(dim=(-2, -1)) + b1
    x = torch.sigmoid(x)  # Using torch.sigmoid for better gradient flow
    
    # Subsampling2
    x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
    x /= float(SS*SS)
    x = x * w2 + b2
    x = torch.sigmoid(x)  # Add activation after subsampling
    
    # Conv3, uses special sparse connections
    x_conv3 = x[:, W3_sparse_conv_idx, :, :].unfold(2, F, S).unfold(3, F, S)
    x = (x_conv3 * W3).sum(dim=(-2, -1))
    
    # Sum together 60 convs into 16 feature maps
    W3_sparse_summed = [x[:, 3*i:3*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(6)] 
    W3_sparse_summed += [x[:, 18+4*i:18+4*(i+1), :, :].sum(dim=1, keepdim=True) for i in range(9)]
    W3_sparse_summed += [x[:, 54:60, :, :].sum(dim=1, keepdim=True)]
    x = torch.cat(W3_sparse_summed, dim=1)
    x += b3
    x = torch.sigmoid(x)
    
    # Subsamp4
    x = x.unfold(2, SS, SS).unfold(3, SS, SS).sum(dim=(-2, -1))
    x /= float(SS*SS)
    x = x * w4 + b4
    x = torch.sigmoid(x)  # Add activation after subsampling
    
    # Conv5
    x = (x.unsqueeze(1) * W5).sum(dim=(-3, -2, -1)) + b5
    x = torch.sigmoid(x)
    
    # F6, fully connected
    x = x @ W6 + b6
    x = torch.tanh(x)  # tanh in paper instead of sigmoid like other layers
    
    # O7, output layer - proper RBF implementation as in the paper
    # Calculate Euclidean distance to each prototype
    distances = (x.unsqueeze(-1) - bm_param.T).square().sum(dim=1)
    # Apply Gaussian kernel (RBF)
    outputs = torch.exp(-0.5 * distances / 84.0)  # The scaling factor is from the paper
    
    return outputs, y

# Loss function for RBF network
def compute_loss(outputs, y, bm_dict):
    # Convert labels to indices
    y_idx = np.vectorize(lambda x: bm_dict[x])(y)
    
    # Cross-entropy-like loss for RBF network
    # We want to maximize the output for the correct class
    correct_class_outputs = outputs[torch.arange(outputs.shape[0]), y_idx]
    loss = -torch.log(correct_class_outputs + 1e-10).mean()  # Add small epsilon for numerical stability
    
    return loss

# Training loop
def train_step(x_batch, params, lr=0.01):
    outputs, y = forward(x_batch)
    loss = compute_loss(outputs, y, bm_dict)
    loss.backward()
    
    # Correct gradient update direction
    for p in params:
        p.data -= lr * p.grad  # Subtract, not add!
        p.grad = None
    
    return loss.item()

def save_model(params):
    for i, p in enumerate(params):
        torch.save(p, f'model_artifacts/model_{i}')

def restore_model(params):
    for i in range(len(params)):
        params[i] = torch.load(f'model_artifacts/model_{i}')


losses = []
B = x_batch.shape[0]
F = 5  # Filter size
S = 1  # Stride
SS = 2  # Subsampling size
for i in range (10000):
    x_batch = df_train.sample(B).iloc[:, 1:].to_numpy()
    l = train_step(x_batch, params)
    if i % 500 == 0:
        print(f'iter {i}:', l)
    losses.append(l)

iter 0: 0.34159621596336365
iter 500: 0.17374785244464874
iter 1000: 0.10848686099052429
iter 1500: 0.09682769328355789
iter 2000: 0.0939687192440033
iter 2500: 0.07197171449661255
iter 3000: 0.07175041735172272
iter 3500: 0.07716022431850433
iter 4000: 0.0811852365732193
iter 4500: 0.07756172120571136
iter 5000: 0.08283617347478867
iter 5500: 0.07364185154438019
iter 6000: 0.06847753375768661
iter 6500: 0.06473362445831299
iter 7000: 0.06389860063791275
iter 7500: 0.06067684292793274
iter 8000: 0.061996202915906906
iter 8500: 0.06137581914663315
iter 9000: 0.06210257112979889
iter 9500: 0.05484551563858986


In [13]:
save_model(params)

In [None]:
x_train = df_train.iloc[:, 1:].to_numpy()
x_test = df_test.iloc[:, 1:].to_numpy()
outputs, y = forward(x_train)


def compute_accuracy(outputs, y):
    pass

In [None]:
#visualize activations and gradients
# Plotting multiple histograms
for k in act_list.keys():
    #plt.hist(act_list[k].reshape(-1), bins=30, alpha=0.7, label=f'act_list_{k}', histtype='step')
    print(f'{k}: {act_list[k].mean()}, {act_list[k].var()}')
    
#plt.title('Activation distributions')
#plt.legend(loc='upper right')
#plt.show()

In [None]:
#variance in each sample in batch across layers
for k in act_list.keys():
    print(f'{k}: {act_list[k].var(axis=0).mean()}', act_list[k].var())

In [None]:
for p in params:
    plt.hist(p.grad.reshape(-1), bins=30, alpha=0.7, label=f'act_list_{k}', histtype='step')
    print(p.grad.mean(), p.grad.var())

plt.title('gradient dists')
plt.legend(loc='upper right')
plt.show()

## Classification of entire sections of Handwriting

after creating the simpler single-char predictor, LeNet specifies a graph structure for OCR on full sections of handwritten text. The creation of dataset and recreation of that model is below

In [None]:

#creation of handwritten text dataset
splits = {'train': 'data/train.parquet', 'validation': 'data/validation.parquet', 'test': 'data/test.parquet'}
df_train = pd.read_parquet("hf://datasets/Teklia/IAM-line/" + splits["train"])
df_val = pd.read_parquet("hf://datasets/Teklia/IAM-line/" + splits["validation"])
df_test = pd.read_parquet("hf://datasets/Teklia/IAM-line/" + splits["test"])

#define types
df_train['type'] = 'train'
df_val['type'] = 'validation'
df_test['type'] = 'test'


#concat, isolate image bytestrings, save
df = pd.concat([df_train, df_val, df_test])
df['bytes'] = df.image.apply(lambda x: x['bytes'])
df[['text', 'bytes', 'type']].to_parquet('df.parquet.gzip', compression='gzip') 

In [None]:
df