HOPFIELD NETWORK: FROM HANDWRITTEN DIGITS TO DAYDREAMING

Goal: demonstration of associative memory and capacity enhancement of daydreaming algorithm

Cell Structure:
1. Functions for loading data, implementing hopfield NN, and implementing daydreaming 
2. Demonstrate HNN pattern memorization on handwritten digits
3. Explore how correlation affects memory capcity 
3. Demonstrate HNN on uncorrelated data Salt & Pepper Patterns - compare reconstruction methods
4. Compare daydreaming algorithm memory capacity with Hebbian learning

In [1]:
###----Functions for implementing hopfield NN and daydreaming algorithm 

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tensorflow.keras.datasets import mnist
from scipy.ndimage import zoom

#create/load in inital conditions
def load_img(path, side):
    """Load image from disk and convert to binary {-1, +1} vector."""
    img = Image.open(path)
    img = img.resize((side, side))
    img = img.convert('1')  # thresholded black/white
    arr = 2 * np.array(img, dtype=np.int8) - 1
    return arr.flatten()

def show_array(img_array, title=None):
    """Display a single pattern (1D vector) as an image."""
    side = int(np.sqrt(img_array.shape[0]))
    arr = img_array.reshape((side, side))
    plt.figure(figsize=(3, 3))
    plt.imshow(arr, cmap='gray', vmin=-1, vmax=1)
    if title:
        plt.title(title)
    plt.axis('off')
    plt.show()

def show_multiple_arrays(img_arrays, titles=None, suptitle=None):
    """Display multiple patterns side by side."""
    k = len(img_arrays)
    fig, axes = plt.subplots(1, k, figsize=(3*k, 3))
    if k == 1:
        axes = [axes]
    if suptitle:
        plt.suptitle(suptitle, fontsize=14, y=1.02)

    for i in range(k):
        side = int(np.sqrt(img_arrays[i].shape[0]))
        axes[i].imshow(img_arrays[i].reshape((side, side)), cmap='gray', vmin=-1, vmax=1)
        if titles and i < len(titles):
            axes[i].set_title(titles[i])
        axes[i].axis('off')
    plt.tight_layout()
    plt.show()

def modify_img(n, img, flip_frac=0.3, rng=None):
    """Add noise by flipping a fraction of pixels."""
    if rng is None:
        rng = np.random.default_rng()
    img_noisy = img.copy()
    m = int(flip_frac * n)
    idx = rng.integers(0, n, size=m, endpoint=False)
    img_noisy[idx] *= -1
    return img_noisy

def pixel_accuracy(reconstructed, target):
    """Calculate percentage of correct pixels."""
    return 100.0 * np.sum(reconstructed == target) / target.size

# Hopfield 
def calculate_w(img):
    """Hebbian outer product for a single pattern."""
    return np.outer(img, img)

def build_weight_matrix(images, scale_by_n=True, dtype=np.float32):
    """
    Build weight matrix using Hebbian learning over a list of patterns.
    images: list of 1D arrays of length n with entries in {-1, +1}.
    """
    n = images[0].size
    w = np.zeros((n, n), dtype=dtype)
    for im in images:
        w += calculate_w(im).astype(dtype)
    if scale_by_n:
        w /= n
    np.fill_diagonal(w, 0.0)  # No self-coupling
    w = 0.5 * (w + w.T)       # Enforce symmetry
    return w

def energy(w, state, b=None):
    """Calculate Hopfield energy E = -1/2 s^T W s - b^T s (if bias present)."""
    e = -0.5 * state @ (w @ state)
    if b is not None:
        if np.ndim(b) == 0:
            e -= b * np.sum(state)
        else:
            e -= b @ state
    return float(e)

def overlap(state, pattern):
    """Calculate overlap (normalized dot product) with a stored pattern."""
    return float((state @ pattern) / state.size)


def reconstructed_image_async(n, w, state, ts=50_000, patience=1_000):
    """
    Asynchronous deterministic update (T=0).
    Picks a random neuron, updates it according to sign(h_i).
    Stops when we see 'patience' consecutive no-change steps or after ts steps.
    """
    unchanged = 0
    for _ in range(ts):
        i = np.random.randint(0, n)
        h = np.dot(w[i, :], state)
        s_new = 1 if h > 0 else -1
        if s_new != state[i]:
            state[i] = s_new
            unchanged = 0
        else:
            unchanged += 1
        if unchanged > patience:
            break
    return state

def metropolis_step_fast(w, state, beta):
    """One Metropolis update step at inverse temperature beta."""
    n = state.size
    i = np.random.randint(0, n)
    h_i = np.dot(w[i, :], state)
    dE = 2.0 * state[i] * h_i
    if dE <= 0.0 or np.random.rand() < np.exp(-beta * dE):
        state[i] = -state[i]
        return True
    return False

def reconstructed_image_metropolis(n, w, state, T=0.6, ts=50_000, patience=2_000):
    """
    Asynchronous Metropolis at fixed temperature T.
    Useful for escaping shallow local minima.
    """
    beta = 1.0 / max(1e-12, T)
    unchanged = 0
    for _ in range(ts):
        flipped = metropolis_step_fast(w, state, beta)
        if flipped:
            unchanged = 0
        else:
            unchanged += 1
        if unchanged > patience:
            break
    return state

def relax_zero_temperature(w, state, ts=50_000, patience=1_000, rng=None):
    """Asynchronous T=0 dynamics to fixed point."""
    if rng is None:
        rng = np.random.default_rng()
    n = state.size
    unchanged = 0

    for _ in range(ts):
        i = rng.integers(0, n)
        h = np.dot(w[i, :], state)
        s_new = 1 if h > 0 else -1
        if s_new != state[i]:
            state[i] = s_new
            unchanged = 0
        else:
            unchanged += 1
        if unchanged > patience:
            break
    return state

# Daydreaming
def daydream_step(w, patterns, tau=50.0, relax_steps=50_000,
                  patience=1_000, rng=None):
    """
    One Daydreaming update step, roughly following the idea:
      1. Pick a stored pattern xi.
      2. Start from random state, relax to a fixed point under current W.
      3. Compare outer products xi xi^T and s s^T and update W toward xi and away from s.
    """
    if rng is None:
        rng = np.random.default_rng()

    n = w.shape[0]
    mu = rng.integers(0, len(patterns))  # choose pattern index
    xi = patterns[mu]

    # Start from random state and relax under current dynamics
    s = rng.choice([-1, 1], size=n)
    s = relax_zero_temperature(w, s, ts=relax_steps, patience=patience, rng=rng)

    # Daydream update rule
    delta_w = (np.outer(xi, xi) - np.outer(s, s)) / (tau * n)
    w += delta_w

    # Re-enforce constraints on W
    np.fill_diagonal(w, 0.0)
    w[:] = 0.5 * (w + w.T)

    return w

def normalize_w(w):
    """Normalize weight matrix by RMS to avoid unbounded growth."""
    rms = np.sqrt(np.mean(w**2))
    if rms > 0:
        w /= rms
    return w

def train_daydreaming(patterns, tau=50.0, n_epochs=20, steps_per_epoch=100,
                      relax_steps=50_000, patience=1_000, scale_by_n=True,
                      dtype=np.float32, rng=None, verbose=True):
    """
    Train weight matrix using Daydreaming algorithm starting from Hebbian W.
    """
    if rng is None:
        rng = np.random.default_rng()

    n = patterns[0].size
    w = build_weight_matrix(patterns, scale_by_n=scale_by_n, dtype=dtype)

    for epoch in range(n_epochs):
        for _ in range(steps_per_epoch):
            w = daydream_step(w, patterns, tau=tau, relax_steps=relax_steps,
                              patience=patience, rng=rng)
        w = normalize_w(w)

        if verbose:
            print(f"  Epoch {epoch+1}/{n_epochs} complete")

    return w

# data analysis functions
def generate_random_patterns(n, P, rng=None):
    """Generate P random binary patterns for capacity experiments."""
    if rng is None:
        rng = np.random.default_rng()
    patterns = rng.choice([-1, 1], size=(P, n))
    return [patterns[i] for i in range(P)]

def retrieve_success_rate(w, patterns, flip_frac=0.3, n_trials_per_pattern=5,
                          overlap_threshold=0.9, ts=50_000, patience=1_000, rng=None):
    """
    Measure retrieval success rate for a given weight matrix and a set of patterns.
    """
    if rng is None:
        rng = np.random.default_rng()

    n = patterns[0].size
    successes = 0
    total = 0

    for p in patterns:
        for _ in range(n_trials_per_pattern):
            s0 = modify_img(n, p, flip_frac=flip_frac, rng=rng)
            s_final = relax_zero_temperature(w, s0.copy(), ts=ts,
                                             patience=patience, rng=rng)
            m = overlap(s_final, p)
            if m >= overlap_threshold:
                successes += 1
            total += 1

    return successes / total if total > 0 else 0.0

#image processing
def upscale_digit(img_28x28, target_size):
    """Upscale 28x28 digit to larger size using nearest-neighbor."""
    scale = target_size / img_28x28.shape[0]
    upscaled = zoom(img_28x28, scale, order=0)  # order=0 = nearest neighbor
    return upscaled

#generate salt and pepper
def generate_salt_pepper_pattern(side=50, seed=None):
    """Generate random binary pattern {-1, +1} on a square grid."""
    rng = np.random.default_rng(seed)
    pattern = rng.integers(0, 2, size=(side, side), dtype=np.int8)
    pattern = 2 * pattern - 1
    return pattern

def save_salt_pepper_pattern(pattern, filename):
    """Save pattern as image file (0/255 grayscale)."""
    img_array = ((pattern.astype(np.int32) + 1) // 2 * 255).astype(np.uint8)
    img = Image.fromarray(img_array, mode='L')
    img.save(filename)


ModuleNotFoundError: No module named 'tensorflow'

In [None]:
###----Test normal HNN on handwritten images from Mniest

# Load MNIST dataset (28x28 grayscale digit images)
(x_train, y_train), _ = mnist.load_data()

original_size = 28  # MNIST digits are 28x28

# ADJUSTABLE: desired digit size for the Hopfield network
digit_size = 100  # can also try 50, 80, 128, etc.

# Select one example of each of a few digits (e.g., 0,1,2,3,4)
digits_to_store = [0, 1, 2, 4, 8]
digit_patterns = []
stored_labels = []

print(f"Upscaling MNIST digits from {original_size}×{original_size} to {digit_size}×{digit_size}")
print("Storing these handwritten digits:")

fig, axes = plt.subplots(1, len(digits_to_store), figsize=(3*len(digits_to_store), 3))

for idx, d in enumerate(digits_to_store):
    # Pick the first training example with label d
    img_28 = x_train[y_train == d][0]     # shape (28, 28)

    # Upscale to desired resolution
    img = upscale_digit(img_28, digit_size)  # shape (digit_size, digit_size)

    # Binarize: threshold at mean
    binary = np.where(img > img.mean(), 1, -1).flatten()

    digit_patterns.append(binary)
    stored_labels.append(d)

    axes[idx].imshow(img, cmap='gray')
    axes[idx].set_title(f'Digit: {d}')
    axes[idx].axis('off')

plt.suptitle(f"Stored Patterns (MNIST digits {digit_size}×{digit_size})", fontsize=14)
plt.tight_layout()
plt.show()

# Build Hopfield network for these patterns
print(f"\nBuilding Hopfield network with {len(digit_patterns)} stored patterns...")
n_digits = digit_size * digit_size
print(f"  Each pattern is {digit_size}×{digit_size} = {n_digits} pixels")
print("  This implies a weight matrix of size "
      f"{n_digits}×{n_digits} (~{n_digits*n_digits*4/1e6:.1f} MB in float32)\n")

w_digits = build_weight_matrix(digit_patterns, scale_by_n=True, dtype=np.float32)
print(f"Network size: {n_digits} neurons")
print(f"Weight matrix shape: {w_digits.shape}\n")

# Test retrieval with noisy input on one of the stored digits
test_digit_idx = 2  # index in digits_to_store list (here digit "2")
test_pattern = digit_patterns[test_digit_idx]
label = stored_labels[test_digit_idx]
noise_level = 0.30

print(f"Testing retrieval on digit '{label}' with {noise_level*100:.0f}% noise added...\n")
noisy_test = modify_img(n_digits, test_pattern, flip_frac=noise_level)

# Reconstruct using asynchronous update
reconstructed = reconstructed_image_async(
    n_digits, w_digits, noisy_test.copy(),
    ts=20_000, patience=1_000
)

# Visualize original, noisy, and reconstructed patterns
accuracy = pixel_accuracy(reconstructed, test_pattern)

fig, axes = plt.subplots(1, 3, figsize=(12, 4))

axes[0].imshow(test_pattern.reshape(digit_size, digit_size), cmap='gray', vmin=-1, vmax=1)
axes[0].set_title(f'Original\nDigit: {label}')
axes[0].axis('off')

axes[1].imshow(noisy_test.reshape(digit_size, digit_size), cmap='gray', vmin=-1, vmax=1)
axes[1].set_title(f'Noisy Input\n({noise_level*100:.0f}% corrupted)')
axes[1].axis('off')

axes[2].imshow(reconstructed.reshape(digit_size, digit_size), cmap='gray', vmin=-1, vmax=1)
axes[2].set_title(f'Reconstructed\n({accuracy:.1f}% accurate)')
axes[2].axis('off')

plt.suptitle("Hopfield Network: High-Resolution Pattern Retrieval (MNIST 100×100)", fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

print(f"  Accuracy: {accuracy:.2f}%")
print(f"  Energy (initial noisy state): {energy(w_digits, noisy_test):.4f}")
print(f"  Energy (final state):         {energy(w_digits, reconstructed):.4f}")
print(f"  Overlap with original:        {overlap(reconstructed, test_pattern):.4f}")


In [None]:
###----Determine Correlation between patterns
## How does correlation affect memory?

# Calculate correlation between stored digits
print("Analyzing pattern correlations:")
correlations = []
for i in range(len(digit_patterns)):
    for j in range(i+1, len(digit_patterns)):
        corr = overlap(digit_patterns[i], digit_patterns[j])
        correlations.append(abs(corr))
        print(f"  |Overlap(Pattern {i}, Pattern {j})| = {abs(corr):.3f}")

avg_corr = np.mean(correlations)
print(f"\nAverage correlation: {avg_corr:.3f}")
print(f"High correlation → Limited capacity!\n")

# Test with increasing number of digits
print("Testing capacity with increasing number of stored digits:")
print("-" * 60)
print(f"{'# Patterns':<12} {'Success Rate':<15} {'Note'}")
print("-" * 60)

for n_store in [3, 5, 7, 10, 15]:
    if n_store > len(digits.target):
        break

    # Select n_store digits
    test_patterns = []
    for i in range(n_store):
        img = digits.images[i * 10]  # Take every 10th to get variety
        binary = np.where(img > img.mean(), 1, -1).flatten()
        test_patterns.append(binary)

    # Build network and test
    w_test = build_weight_matrix(test_patterns, scale_by_n=True)
    success_rate = retrieve_success_rate(w_test, test_patterns, flip_frac=0.25,
                                         n_trials_per_pattern=3, overlap_threshold=0.9)

    note = "Good" if success_rate > 0.8 else "Degraded" if success_rate > 0.5 else "Failed"
    print(f"{n_store:<12} {success_rate:<15.2%} {note}")

print("-" * 60)




We see that high correlation between the images reduces the memory capcity, so we want to consider lower correlation images. 

In [None]:
###---Use uncorrelated patterns (salt and pepper noise) to try to improve capacity


# Generate salt & pepper patterns
print("Generating random salt & pepper patterns...")
side = 50
pattern_seeds = [42, 123, 568, 34]
sp_patterns = [generate_salt_pepper_pattern(side, seed=s) for s in pattern_seeds]

# Save patterns
for i, pattern in enumerate(sp_patterns, 1):
    save_salt_pepper_pattern(pattern, f'p{i}.jpg')
print(f"✓ Generated {len(sp_patterns)} patterns ({side}×{side} pixels)\n")

# Display stored patterns
print("Stored patterns:")
show_multiple_arrays([p.flatten() for p in sp_patterns],
                    titles=[f'Pattern {i+1}' for i in range(len(sp_patterns))],
                    suptitle="Salt & Pepper Patterns (Uncorrelated)")

# Build Hopfield network
n_sp = side * side
imgs_sp = [p.flatten() for p in sp_patterns]
w_sp = build_weight_matrix(imgs_sp, scale_by_n=True, dtype=np.float32)

print(f"\nNetwork configuration:")
print(f"  Size: {n_sp} neurons ({side}×{side})")
print(f"  Stored patterns: {len(imgs_sp)}")
print(f"  Weight matrix: {w_sp.shape}\n")

# Test reconstruction with different methods
target_idx = 2
target = imgs_sp[target_idx]
flip_frac = 0.40
fixed_T = 0.6

print(f"Testing reconstruction methods:")
print(f"  Target: Pattern {target_idx+1}")
print(f"  Noise level: {flip_frac*100:.0f}% pixels flipped\n")

# Create noisy version
state0 = modify_img(n_sp, target, flip_frac=flip_frac)
init_acc = pixel_accuracy(state0, target)

print("-" * 70)
print(f"{'Method':<30} {'Accuracy':<12} {'Energy':<12} {'Overlap'}")
print("-" * 70)
print(f"{'Initial (noisy)':<30} {init_acc:>6.2f}%     {energy(w_sp, state0):>10.4f}  {overlap(state0, target):>7.4f}")

# Method 1: Asynchronous (T=0)
s_async = reconstructed_image_async(n_sp, w_sp, state0.copy(), ts=50_000, patience=2_000)
async_acc = pixel_accuracy(s_async, target)
print(f"{'Asynchronous (T=0)':<30} {async_acc:>6.2f}%     {energy(w_sp, s_async):>10.4f}  {overlap(s_async, target):>7.4f}")

# Method 2: Metropolis
s_met = reconstructed_image_metropolis(n_sp, w_sp, state0.copy(), T=fixed_T,
                                       ts=80_000, patience=3_000)
met_acc = pixel_accuracy(s_met, target)
print(f"{'Metropolis (T='+str(fixed_T)+')':<30} {met_acc:>6.2f}%     {energy(w_sp, s_met):>10.4f}  {overlap(s_met, target):>7.4f}")
print("-" * 70)

# Visualize comparison
fig, axes = plt.subplots(1, 4, figsize=(16, 4))

axes[0].imshow(target.reshape(side, side), cmap='gray', vmin=-1, vmax=1)
axes[0].set_title(f'Original Target')
axes[0].axis('off')

axes[1].imshow(state0.reshape(side, side), cmap='gray', vmin=-1, vmax=1)
axes[1].set_title(f'Noisy Input\n{init_acc:.1f}% accurate')
axes[1].axis('off')

axes[2].imshow(s_async.reshape(side, side), cmap='gray', vmin=-1, vmax=1)
axes[2].set_title(f'Asynchronous\n{async_acc:.1f}% accurate')
axes[2].axis('off')

axes[3].imshow(s_met.reshape(side, side), cmap='gray', vmin=-1, vmax=1)
axes[3].set_title(f'Metropolis (T={fixed_T})\n{met_acc:.1f}% accurate')
axes[3].axis('off')

plt.suptitle("Reconstruction Method Comparison", fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

best_method = "Asynchronous" if async_acc > met_acc else "Metropolis"
print(f"Best method: {best_method}")

In [None]:
###---Compare memory capacity for daydreaming with normal HNN

# Configuration
side_cap = 20
n_cap = side_cap * side_cap
alpha_list = [0.02, 0.05, 0.08, 0.11, 0.14, 0.17, 0.20]

print("Memory capacity experiment:")
print(f"  Network size: {n_cap} neurons ({side_cap}×{side_cap})")
print(f"  Noise level: 30% pixels flipped")
print(f"  Success threshold: 90% overlap\n")

print("Running capacity comparison (this may take a few minutes)...")
print("-" * 70)
print(f"{'Load α':<10} {'# Patterns':<12} {'Hebbian':<15} {'Daydreaming':<15}")
print("-" * 70)

rng = np.random.default_rng(123)
hebb_results = []
dd_results = []

for alpha in alpha_list:
    P = max(1, int(round(alpha * n_cap)))
    patterns = generate_random_patterns(n_cap, P, rng=rng)

    # Hebbian
    w_hebb = build_weight_matrix(patterns, scale_by_n=True, dtype=np.float32)
    hebb_sr = retrieve_success_rate(w_hebb, patterns, flip_frac=0.3,
                                   n_trials_per_pattern=5, overlap_threshold=0.9,
                                   ts=20_000, patience=1_000, rng=rng)
    hebb_results.append(hebb_sr)

    # Daydreaming
    w_dd = train_daydreaming(patterns, tau=50.0, n_epochs=10, steps_per_epoch=50,
                            relax_steps=5_000, patience=500, scale_by_n=True,
                            dtype=np.float32, verbose=False, rng=rng)
    dd_sr = retrieve_success_rate(w_dd, patterns, flip_frac=0.3,
                                 n_trials_per_pattern=5, overlap_threshold=0.9,
                                 ts=20_000, patience=1_000, rng=rng)
    dd_results.append(dd_sr)

    print(f"{alpha:<10.3f} {P:<12} {hebb_sr:<15.2%} {dd_sr:<15.2%}")

print("-" * 70)

# Plot results
plt.figure(figsize=(12, 6))
plt.plot(alpha_list, hebb_results, 'o-', linewidth=2, markersize=8,
         label='Hebbian Learning', color='#2E86AB')
plt.plot(alpha_list, dd_results, 's-', linewidth=2, markersize=8,
         label='Daydreaming Algorithm', color='#A23B72')
plt.axhline(y=0.9, color='gray', linestyle='--', alpha=0.5, label='90% Success Threshold')
plt.xlabel('Load α = P/N (Patterns per Neuron)', fontsize=12)
plt.ylabel('Retrieval Success Rate', fontsize=12)
plt.title('Memory Capacity: Hebbian vs Daydreaming', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.ylim(-0.05, 1.05)
plt.tight_layout()
plt.show()

# Analysis
print("\n" + "="*80)
print("ANALYSIS & CONCLUSIONS")
print("="*80)

# Find capacity thresholds
hebb_capacity = next((alpha for alpha, sr in zip(alpha_list, hebb_results) if sr < 0.9), alpha_list[-1])
dd_capacity = next((alpha for alpha, sr in zip(alpha_list, dd_results) if sr < 0.9), alpha_list[-1])

print(f"\nCapacity comparison:")
print(f"  Hebbian learning capacity: α ≈ {hebb_capacity:.3f}")
print(f"  Daydreaming capacity: α ≈ {dd_capacity:.3f}")
if dd_capacity > hebb_capacity:
    improvement = ((dd_capacity - hebb_capacity) / hebb_capacity) * 100
    print(f"  → Daydreaming improves capacity by ~{improvement:.0f}%")
