# Exercise 2: Swendsen-Wang algorithm for the 2D Ising model

The goal of this exercise is to implement the Swendsen-Wang algorithm for the 2D Ising
model. Recall that the update of the Swendsen-Wang algorithm works as follows:

- Assign to each bond $b$ a variable $w_b \in \{ 0, 1 \}$, where 1 means “connected” and 0
means “disconnected”. If the spins connected by the bond $b = (n, m)$ between index
$n$ and $m$ are anti-parallel, always choose $w_b = 0$, If the spins are parallel, choose
$w_b = 0$ with probability $e^{−2\Beta J} , where $\Beta = k_bT$. In terms of conditional probabilities:

\begin{align}
p(w_b = 0 | \sigma_n \neq \sigma_m) &= 1 \\
p(w_b = 0 | \sigma_n = \sigma_m) &= e^{−2\Beta J} \\
p(w_b = 1 | \sigma_n \neq \sigma_m) &= 0 \\
p(w_b = 1 | \sigma_n = \sigma_m) &= 1 − e^{−2\Beta J} \\
\end{align}

- Interpreting the connected bonds as edges of a graph (where 0 means no edge) and
the spins as node, find “clusters” of spins, i.e., connected components of the graph.
- Flip each cluster (= connected component) with probability $p = 0.5$. Notice that a
single spin is also a “cluster”.
After the update, do a measurement (if the system is already thermalized) and continue
with the next update

In [None]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
import numba
from numba import prange, float32, int32, int8
from scipy import sparse as sp
plt.rcParams["animation.html"] = "jshtml"

In [None]:
spec = {
    "J": float32,
    "Lx": int32,
    "Ly": int32,
    "bond_indices": int32[:, :],
    "spins": int8[:, :],
    "bonds": int8[:],
}

# Compiling takes a lot of time, so it is not that much of a speedup.
@numba.experimental.jitclass(spec)
class System:

    def __init__(self, J: float, Lx: int, Ly: int, initial_state: np.ndarray = None):
        self.J = J
        self.Lx = Lx
        self.Ly = Ly

        # Initialize bond indices
        self.bond_indices = np.empty((2*self.N, 2), dtype=np.int32)
        for n in range(self.N):
            x, y = self.idx_2_xy(n)
            # left-right
            self.bond_indices[2*n, 0] = n
            self.bond_indices[2*n, 1] = self.xy_2_idx(x, (y + 1) % self.Ly)
            # up-down
            self.bond_indices[2*n + 1, 0] = n
            self.bond_indices[2*n + 1, 1] = self.xy_2_idx((x + 1) % self.Lx, y)

        # Set random initial state if not given.
        # Numba does not support many array operations,
        # so this algorithm is dumbed down a lot.
        self.spins = np.ones(self.shape, dtype=np.int8)
        if initial_state is None:
            for n in prange(self.N):
                if np.random.random() < 0.5:
                    y, x = self.idx_2_xy(n)
                    self.spins[y, x] = -1
        else:
            self.spins = initial_state

        self.bonds = np.zeros(2*self.N, dtype=np.int8)
            
    @property
    def N(self) -> int:
        return self.Lx * self.Ly
    
    @property
    def shape(self) -> tuple[int, int]:
        return self.Ly, self.Lx
    
    def xy_2_idx(self, x: int, y: int) -> int:
        return self.Ly * x + y
    
    def idx_2_xy(self, idx: int) -> tuple[int, int]:
        return divmod(idx, self.Ly)
    
    def get_spin(self, idx: int) -> int:
        return self.spins[self.idx_2_xy(idx)]
    
    def update_bonds(self, kbT: float):
        for i, (spin_1, spin_2) in enumerate(self.bond_indices):
            # equal spins: no bond
            if self.get_spin(spin_1) != self.get_spin(spin_2):
                self.bonds[i] = 0
            # Different spins: 1 with probability e^-beta*J
            else:
                self.bonds[i] = np.random.random() > np.exp(-self.J / kbT)
    
    def flip_clusters(self, n_clusters: int, clusters: np.ndarray):
        # pre-determine which clusters to flip
        clusters_to_flip = np.random.random(n_clusters) < 0.5
        # loop through all the spins
        for idx in prange(self.N):
            # check if it should be flipped
            if clusters_to_flip[clusters[idx]]:
                # flip
                self.spins[self.idx_2_xy(idx)] *= -1
    
    def show_clusters(self, clusters: np.ndarray) -> np.ndarray:
        out = np.empty(self.shape)
        for idx in prange(self.N):
            out[self.idx_2_xy(idx)] = clusters[idx]
        return out
    
    @property
    def E(self) -> float:
        out = 0
        for spin_1, spin_2 in self.bond_indices:
            if self.get_spin(spin_1) != self.get_spin(spin_2):
                out += self.J
            else:
                out -= self.J
        return out
    
    @property
    def M(self) -> float:
        return np.mean(self.spins)
            

# scipy is not supported in jit classes
# I can't be bothered to rewrite it now
def find_connected_components(s: System):
    graph = sp.csr_matrix(
        (s.bonds, (s.bond_indices[:, 0], s.bond_indices[:, 1])),
        dtype=np.int8,
        shape=(s.N, s.N)
        )
    graph.eliminate_zeros()
    graph += graph.T
    connections = sp.csgraph.connected_components(graph)
    return connections

def iterate_system(s: System, kbT: float):
    s.update_bonds(kbT)
    s.flip_clusters(*find_connected_components(s))

In [None]:
# Animation to verify
%matplotlib qt5
from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider
animation_system = System(1.0, 100, 100)
animation_T = 1.5

clusters = lambda : animation_system.show_clusters(find_connected_components(animation_system)[1])


fig, (ax2, ax1) = plt.subplots(2,1)
ax1.set_xticks([])
ax1.set_yticks([])
im1 = ax1.matshow(animation_system.spins)
# ax2.spines["top"].set_visible(True)
# ax2.spines["right"].set_visible(True)
T_slider = Slider(ax2, 'Temperature ', valmin=0, valmax=5, 
             valinit=1, valfmt='%.2f K/k_B', facecolor='#cc7000')
# fig.tight_layout()

def animation(_):
    iterate_system(animation_system, animation_T)
    im1.set_data(animation_system.spins)
    return im1,

def change_T(_):
    global animation_T
    animation_T = T_slider.val
T_slider.on_changed(change_T)

ani = FuncAnimation(fig, animation, blit=False, interval=50)
plt.show()

In [None]:
measurement_system = System(1.0, 10, 10)
measurement_T_range = np.arange(0.5, 3.1, 0.1)

measurement_N_sweeps = 100     # Number of steps for the measurements
measurement_N_eq = 100         # Number of equilibration steps before the measurements start
measurement_N_flips = 10        # Number of steps between measurements

measured_E = []
measured_M = []
for T in measurement_T_range:
    # Find equilibrium
    for _ in range(measurement_N_eq):
        iterate_system(measurement_system, T)
    # measure
    _E = []
    _M = []
    for _ in range(measurement_N_sweeps):
        _E.append(measurement_system.E)
        _M.append(measurement_system.M)
        for _ in range(measurement_N_flips):
            iterate_system(measurement_system, T)
    
    measured_E.append(np.mean(_E))
    measured_M.append(np.mean(_M))


In [None]:
plt.figure()
plt.suptitle("Energy and magnetization comparison")
plt.subplot(1, 2, 1)
plt.plot(measurement_T_range, measured_E)
plt.title("E")
plt.xlabel("T")
plt.subplot(1, 2, 2)
plt.plot(measurement_T_range, measured_M)
plt.title("M")
plt.xlabel("T")
plt.ylim((-1.1, 1.1))
plt.tight_layout()
plt.show()

In [None]:
def _measure_autocorrelation(P: np.ndarray, delta: int) -> float:
    """Measure the autocorrelation of a measured P

    :param P: Measured values
    :type P: np.ndarray
    :param delta: Timestep for measurement
    :type delta: int
    :return: Autocorrelation
    :rtype: float
    """
    P_m = np.mean(P)
    P2_m = np.mean(P**2)
    P_delta = P[:-delta] * P[delta:]
    P_delta_m = np.mean(P_delta)
    return (P_delta_m - P_m**2) / (P2_m - P_m**2)
measure_autocorrelation = np.vectorize(_measure_autocorrelation, excluded=["P"], signature="(m),()->()")

In [None]:
# Autocorrelation plot
def autocorrelation_plot(T: float):
    s = System(1.0, 10, 10)

    N_sweeps = 1000     # Number of steps for the measurements
    N_eq = 100          # Number of equilibration steps before the measurements start

    E = []
    M = []
    # Find equilibrium
    for _ in range(N_eq):
        iterate_system(s, T)
    # measure
    for _ in range(N_sweeps):
        E.append(s.E)
        M.append(s.M)
        iterate_system(s, T)
    E = np.array(E)
    M = np.array(M)

    deltas = np.arange(int(N_sweeps**0.5), dtype=np.int32) + 1 # avoid having 0 in the range

    E_corr = measure_autocorrelation(E, deltas)
    M_corr = measure_autocorrelation(M, deltas)

    plt.figure()
    plt.suptitle(f"Autocorrelation, {T = }")
    plt.subplot(1,2,1)
    plt.plot(deltas, E_corr)
    plt.title("E")
    plt.subplot(1,2,2)
    plt.plot(deltas, M_corr)
    plt.title("M")
    plt.tight_layout()
    plt.show()


In [None]:
autocorrelation_plot(1.0)
autocorrelation_plot(2.3)
autocorrelation_plot(3.5)