In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import heapq
from collections import deque
from scipy.optimize import curve_fit
from numba import njit
from numba.typed import List


In [2]:
def power_law(x, a, tau):
    """Power-law function for fitting."""
    return a * x**(-tau)
    
def initialize_lattice(size, dim):
    spins = -np.ones((size, size, size))
    return spins

def initialize_random_fields(R, size):
    h_rand = np.random.normal(0, R, (size, size, size))
    return h_rand

@njit
def calculate_magnetization(spins):
    return np.sum(spins/(np.size(spins)))
    
# Return local field at position (i, j, k)
@njit
def return_local_field(dim, size, spins, J, h, H, i, j, k):
    neighbours = [[(i+1)%size, j, k], [(i-1)%size, j, k], [i, (j+1)%size, k], [i, (j-1)%size, k], [i, j, (k+1)%size], [i, j, (k-1)%size]]    
    h_eff = 0
    
    for neighbour in neighbours:
        h_eff += J*spins[neighbour[0], neighbour[1], neighbour[2]]

    h_eff += h[i, j, k] + H
    # print(h_eff)
    return h_eff

@njit
def avalanche(dim, size, spins, J, h, H, fwd_flag):
    """Simulate an avalanche and return the magnetization and avalanche size."""
    flipped_spins = []
    queue = List()  # Numba-compatible queue
    pointer = 0     # Pointer to track the front of the queue

    if fwd_flag:
        # Forward process: flip -1 to +1
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    if spins[i, j, k] == -1:
                        h_eff = return_local_field(dim, size, spins, J, h, H, i, j, k)
                        if h_eff > 0:
                            spins[i, j, k] *= -1
                            flipped_spins.append((i, j, k))
                            queue.append((i, j, k))
        
        # Propagate the avalanche
        while pointer < len(queue):
            i, j, k = queue[pointer]
            pointer += 1  # Move the pointer forward

            neighbours = [
                ((i + 1) % size, j, k),
                ((i - 1) % size, j, k),
                (i, (j + 1) % size, k),
                (i, (j - 1) % size, k),
                (i, j, (k + 1) % size),
                (i, j, (k - 1) % size)
            ]
            for neighbour in neighbours:
                ni, nj, nk = neighbour
                if spins[ni, nj, nk] == -1:
                    h_eff = return_local_field(dim, size, spins, J, h, H, ni, nj, nk)
                    if h_eff > 0:
                        spins[ni, nj, nk] *= -1
                        flipped_spins.append((ni, nj, nk))
                        queue.append((ni, nj, nk))
    else:
        # Reverse process: flip +1 to -1
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    if spins[i, j, k] == 1:
                        h_eff = return_local_field(dim, size, spins, J, h, H, i, j, k)
                        if h_eff < 0:
                            spins[i, j, k] *= -1
                            flipped_spins.append((i, j, k))
                            queue.append((i, j, k))
        
        # Propagate the avalanche
        while pointer < len(queue):
            i, j, k = queue[pointer]
            pointer += 1  # Move the pointer forward

            neighbours = [
                ((i + 1) % size, j, k),
                ((i - 1) % size, j, k),
                (i, (j + 1) % size, k),
                (i, (j - 1) % size, k),
                (i, j, (k + 1) % size),
                (i, j, (k - 1) % size)
            ]
            for neighbour in neighbours:
                ni, nj, nk = neighbour
                if spins[ni, nj, nk] == 1:
                    h_eff = return_local_field(dim, size, spins, J, h, H, ni, nj, nk)
                    if h_eff < 0:
                        spins[ni, nj, nk] *= -1
                        flipped_spins.append((ni, nj, nk))
                        queue.append((ni, nj, nk))
    
    avalanche_size = len(flipped_spins)
    magnetization = calculate_magnetization(spins)
    return magnetization, avalanche_size

# @njit
# def avalanche(dim, size, spins, J, h, H, fwd_flag):
#     """Simulate an avalanche and return the magnetization and avalanche size."""
#     flipped_spins = []
#     queue = []

#     if fwd_flag:
#         # Forward process: flip -1 to +1
#         for i in range(size):
#             for j in range(size):
#                 for k in range(size):
#                     if spins[i, j, k] == -1:
#                         h_eff = return_local_field(dim, size, spins, J, h, H, i, j, k)
#                         if h_eff > 0:
#                             spins[i, j, k] *= -1
#                             flipped_spins.append((i, j, k))
#                             queue.append((i, j, k))
        
#         # Propagate the avalanche
#         while queue:
#             i, j, k = queue.pop(0)
#             neighbours = [
#                 ((i + 1) % size, j, k),
#                 ((i - 1) % size, j, k),
#                 (i, (j + 1) % size, k),
#                 (i, (j - 1) % size, k),
#                 (i, j, (k + 1) % size),
#                 (i, j, (k - 1) % size)
#             ]
#             for neighbour in neighbours:
#                 ni, nj, nk = neighbour
#                 if spins[ni, nj, nk] == -1:
#                     h_eff = return_local_field(dim, size, spins, J, h, H, ni, nj, nk)
#                     if h_eff > 0:
#                         spins[ni, nj, nk] *= -1
#                         flipped_spins.append((ni, nj, nk))
#                         queue.append((ni, nj, nk))
#     else:
#         # Reverse process: flip +1 to -1
#         for i in range(size):
#             for j in range(size):
#                 for k in range(size):
#                     if spins[i, j, k] == 1:
#                         h_eff = return_local_field(dim, size, spins, J, h, H, i, j, k)
#                         if h_eff < 0:
#                             spins[i, j, k] *= -1
#                             flipped_spins.append((i, j, k))
#                             queue.append((i, j, k))
        
#         # Propagate the avalanche
#         while queue:
#             i, j, k = queue.pop(0)
#             neighbours = [
#                 ((i + 1) % size, j, k),
#                 ((i - 1) % size, j, k),
#                 (i, (j + 1) % size, k),
#                 (i, (j - 1) % size, k),
#                 (i, j, (k + 1) % size),
#                 (i, j, (k - 1) % size)
#             ]
#             for neighbour in neighbours:
#                 ni, nj, nk = neighbour
#                 if spins[ni, nj, nk] == 1:
#                     h_eff = return_local_field(dim, size, spins, J, h, H, ni, nj, nk)
#                     if h_eff < 0:
#                         spins[ni, nj, nk] *= -1
#                         flipped_spins.append((ni, nj, nk))
#                         queue.append((ni, nj, nk))
    
#     avalanche_size = len(flipped_spins)
#     magnetization = calculate_magnetization(spins)
#     return magnetization, avalanche_size

In [3]:
size = 100
dim = 3

J = 1
R = 4

# avalanche
H_start = -5
H_end = 5
H_step = 0.01

In [None]:
Rs = [1.75, 2, 2.16, 2.25, 2.5, 3, 3.25, 3.5, 2.75, 4]  # Just to be sure

avalanche_sizes_fwd_for_Rs = []
avalanche_sizes_rev_for_Rs = []
magnetizations_fwd_for_Rs = []
magnetizations_rev_for_Rs = []

for R in Rs:
    spins = initialize_lattice(size, dim)
    h = initialize_random_fields(R, size)
    magnetizations_fwd = []
    avalanche_sizes_fwd = []
    Hs = np.arange(H_start, H_end, H_step)
    
    for H_curr in tqdm(Hs, desc=f"Forward Process for R = {R}"):
        magnetization, avalanche_size = avalanche(dim, size, spins, J, h, H_curr, fwd_flag=True)
        magnetizations_fwd.append(magnetization)
        avalanche_sizes_fwd.append(avalanche_size)
        
    magnetizations_fwd_for_Rs.append(magnetizations_fwd)
    avalanche_sizes_fwd_for_Rs.append(avalanche_sizes_fwd)  # Fixed variable name
    
    # Reverse process: Decrease H from H_end to H_start
    magnetizations_rev = []
    avalanche_sizes_rev = []
    for H_curr in tqdm(Hs[::-1], desc=f"Reverse Process for R = {R}"):
        magnetization, avalanche_size = avalanche(dim, size, spins, J, h, H_curr, fwd_flag=False)
        magnetizations_rev.append(magnetization)
        avalanche_sizes_rev.append(avalanche_size)
    
    magnetizations_rev_for_Rs.append(magnetizations_rev)
    avalanche_sizes_rev_for_Rs.append(avalanche_sizes_rev)

Forward Process for R = 1.75:  51%|███   | 510/1000 [01:53<01:47,  4.54it/s]

In [None]:
fig, ax = plt.subplots(figsize = (12, 6))

for idx, R in enumerate(Rs):
    ax.plot(Hs, magnetizations_fwd_for_Rs[idx], linewidth = 1, label = f"R={R}")
    # ax[0].plot(Hs, magnetizations_rev, linewidth = 1, label = "rev")
    ax.set_xlabel(r"$\mathbf{H}$", fontsize = 16)
    ax.set_ylabel(r"$\mathbf{m}$", fontsize = 16)

plt.legend(loc = "upper right")


In [None]:
plt.figure(figsize=(10, 6))
for idx, R in enumerate(Rs):
    hist, bin_edges = np.histogram(avalanche_sizes_fwd_for_Rs[idx], bins=np.logspace(0, np.log10(max(avalanche_sizes_fwd_for_Rs[idx])), 1000), density=True)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

    # Filter out zero values in the histogram (log(0) is undefined)
    valid_indices = hist > 0
    bin_centers = bin_centers[valid_indices]
    hist = hist[valid_indices]
    
    plt.loglog(bin_centers, hist, 'o', label=f"R={R}")

plt.xlabel('Avalanche Size (S)')
plt.ylabel('Probability Density')
plt.title('Avalanche Size Distribution for Different R Values')
plt.legend()
plt.show()

In [None]:
# hist, bin_edges = np.histogram(avalanche_sizes_fwd, bins=np.logspace(0, np.log10(max(avalanche_sizes_fwd)), 100), density=True)
# bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

# # Filter out zero values in the histogram (log(0) is undefined)
# valid_indices = hist > 0
# bin_centers = bin_centers[valid_indices]
# hist = hist[valid_indices]

# # # Identify the crossover point (e.g., S_c = 10)
# # S_c = 2
# # mask_small = bin_centers < S_c  # Data for S < S_c
# # mask_large = bin_centers >= S_c  # Data for S >= S_c

# # # Fit the first power law (for S < S_c)
# # popt_small, _ = curve_fit(power_law, bin_centers[mask_small], hist[mask_small], p0=[0, 3])

# # # Fit the second power law (for S >= S_c)
# # popt_large, _ = curve_fit(power_law, bin_centers[mask_large], hist[mask_large], p0=[0, 3])

# # # Plot the distribution and the power-law fits
# # plt.figure(figsize=(8, 6))
# plt.loglog(bin_centers, hist, 'bo', label='Avalanche Size Distribution')
# # plt.loglog(bin_centers[mask_small], power_law(bin_centers[mask_small], *popt_small), 'r-', label=f'Power Law Fit (S < {S_c}): $\\tau = {popt_small[1]:.2f}$')
# # plt.loglog(bin_centers[mask_large], power_law(bin_centers[mask_large], *popt_large), 'g-', label=f'Power Law Fit (S >= {S_c}): $\\tau = {popt_large[1]:.2f}$')
# # plt.xlabel('Avalanche Size (S)')
# # plt.ylabel('Probability Density')
# # plt.title('Avalanche Size Distribution with Two Power Law Fits')
# # plt.legend()
# # plt.grid(True, which="both", ls="--")
# # plt.show()