# Benfords Law Analysis
In this notebook analyze the **distribution of first significant digits** (fsd) of different aspects of an image.
These could be for example:
- The raw pixel values
- The discrete cosine transformation (DCT) values

Benfords law is an observation that in many collections of numbers, be they mathematical tables, real-life data, or combinations thereof, the leading significant digits are not uniformly distributed, as might be expected, but are heavily skewed toward the smaller digits. [[1](https://digitalcommons.calpoly.edu/cgi/viewcontent.cgi?article=1074&context=rgp_rsr)]

It is mathematically defined as (simplified) [[2](https://arxiv.org/pdf/1506.03046.pdf)]:

$$bf(d)=\beta log_b(1+\frac{1}{d})$$

with $b$ being base ($10$ for "normal" numbers) and $d$ being the possible digits (for $b=10$: $\{1,…,9\}$). The corresponding plot for $b=10$ does look as follows:

<img src="./benfords_law_ground_truth.png" alt="Benfords Law">

It was shown, that **natural** image data (e.g. produced fotographs) also follows this distribution, but GAN generated images do not. This fact was used successfully by Bonettini and collegues in [[3](https://arxiv.org/pdf/2004.07682.pdf)] to distinguish between real and fake images.

As an example dataset we will use the famous grayscale MNIST dataset, which is included in TensorFlow Keras.

In [None]:
# Import packages and settings
import tensorflow as tf
import numpy as np
import pandas as pd
from tqdm import tqdm
import cv2
import glob
import numpy.typing as npt
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from math import log10, floor

pd.options.plotting.backend = "plotly"

In [None]:
FSD_SLOW = 0
FSD_FAST = 1
BASE_10 = 10
# Taken from here: https://cgjennings.ca/articles/jpeg-compression/
QUANTIZATION_TABLE = [
    [16,  12,  14,  14,  18,  24,  49,  72],
    [11,  12,  13,  17,  22,  35,  64,  92],
    [10,  14,  16,  22,  37,  55,  78,  95],
    [16,  19,  24,  29,  56,  64,  87,  98],
    [24,  26,  40,  51,  68,  81, 103, 112],
    [40,  58,  57,  87, 109, 104, 121, 100],
    [51,  60,  69,  80, 103, 113, 120, 103],
    [61,  55,  56,  62,  77,  92, 101,  99]
]

In [None]:
# Import and prepare data
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
images = np.append(train_images, test_images, axis=0)
images = images.reshape(images.shape[0], 28, 28, 1).astype('float32')

In [None]:
def get_dct_array(image_list: npt.ArrayLike) -> npt.ArrayLike:
    """Calculates the DCT for each element in the list, flattens the result and returns a one-dimensional array.

    Args:
        image_list (npt.ArrayLike): A list of images

    Returns:
        npt.ArrayLike: A one-dimensional array of DCT values
    """
    dcts = np.array([cv2.dct(image) for image in image_list])
    dcts = dcts.flatten()
    return dcts


In [None]:
def to_fsd(values: npt.ArrayLike, mode: int = FSD_FAST) -> npt.ArrayLike:
    """Replaces each value in values with its first significant digit.

    Args:
        values (npt.ArrayLike): An array of float values

    Returns:
        npt.ArrayLike: The first significant digits of values
    """
    values = values[values != 0]
    fsd = []
    if mode == FSD_SLOW:
        for value in tqdm(values):
            num = int(np.floor(abs(value * (10 ** -int(floor(log10(abs(value))))))))
            fsd.append(num)
    elif mode == FSD_FAST:
        n = np.abs(values * np.power(np.full(values.shape, 10.), -np.floor(np.log10(np.abs(values).astype("float64"))))).astype("int")
        fsd.extend(n)
    return np.array(fsd)
        

In [None]:
def count_fsds(fsds: npt.ArrayLike, base: int = BASE_10) -> npt.ArrayLike:
    """Counts up the occurence of each digit, depending on the base (1-9 for base 10).

    Args:
        fsds (npt.ArrayLike): An array of digits

    Returns:
        npt.ArrayLike: An array of the summed digits of length base - 1
    """
    count = []
    for i in range(1,base):
        count.append(np.count_nonzero(fsds == i))
    return np.array(count)

In [None]:
def benfords_law() -> npt.ArrayLike:
    """Create the ground truth distribution according to benfords law for base10 digits.

    Returns:
        npt.ArrayLike: The benfords law distribution for base10 digits
    """
    bf_law = []
    for i in range(1,10):
        bf_law.append(log10(1 + (1 / i)))
    return np.array(bf_law)

In [None]:
def mae_to_benfords_law(fsds: npt.ArrayLike) -> float:
    bf_law = benfords_law()
    err = 0
    for i in range(len(bf_law)):
        err += np.abs(fsds[i] - bf_law[i])
    return err / len(bf_law)

In [None]:
def kullback_leibler_divergence(fsds: npt.ArrayLike) -> float:
    bf_law = benfords_law()
    err = 0
    for i in range(len(bf_law)):
        err += fsds[i] * np.log(fsds[i] / bf_law[i])
    return err# / len(bf_law)

In [None]:
def jensen_shannon_divergence(fsds: npt.ArrayLike) -> float:
    bf_law = benfords_law()
    
    err = 0
    for i in range(len(bf_law)):
        err += fsds[i] * np.log(fsds[i] / bf_law[i])

    for i in range(len(bf_law)):
        err += bf_law[i] * np.log(bf_law[i] / fsds[i])
    return err #/ len(bf_law)

In [None]:
def plot_df_comparison(fsd_count_dist: npt.ArrayLike, base: int = BASE_10, title: str = "Measurements vs. Benfords Law"):
    """Plot Benfords law against measured first significant digits.

    Args:
        fsd_count_dist (npt.ArrayLike): Measured first significant digits
        base (int, optional): Number base. Defaults to BASE_10.
        title (str, optional): Title of the produced plot. Defaults to "Measurements vs. Benfords Law".
    """
    df = pd.DataFrame()
    df["digit"] = [i for i in range(1, base, 1)]
    df["MNIST FSD count"] = fsd_count_dist
    df["Benfords Law (ground truth)"] = benfords_law()

    mae = np.round(mae_to_benfords_law(fsd_count_dist), decimals=6)
    kld = np.round(kullback_leibler_divergence(fsd_count_dist), decimals=6)
    jsd = np.round(jensen_shannon_divergence(fsd_count_dist), decimals=6)
    title = title + "\n - Mean absolute error: " + str(mae) + "\n - Kullback-Leibler: " + str(kld) + "\n - Jensen-Shannon: " + str(jsd)

    fig = go.Figure()
    fig.add_bar(x=df["digit"], y=df["MNIST FSD count"], name="Measurements", hoverinfo="y")
    fig.add_scatter(x=df["digit"], y=df["Benfords Law (ground truth)"], name="Ground Truth", hoverinfo="y")
    fig.update_layout(title=title, xaxis_title="Digits", yaxis_title="Distribution")

    fig.show(renderer="browser")

In [None]:
def img_to_blocks(image: npt.ArrayLike) -> npt.ArrayLike:
    """Divides the image into non-overlapping 8x8 blocks and returns them.

    Args:
        image (npt.ArrayLike): The image, from which the blocks should be extracted

    Returns:
        npt.ArrayLike: 8x8 blocks of the image
    """
    if len(image.shape) > 2:
        image = image.reshape(28,28)
    num_blocks = int(image.shape[0] / 8)
    blocks = []
    for row in range(0, (num_blocks) * 8, 8):
        for col in range(0, (num_blocks) * 8, 8):
            block = image[row:row+8, col:col+8]
            blocks.append(block)
    return np.array(blocks)

In [None]:
def quantize_block(block: npt.ArrayLike, q_table: npt.ArrayLike = QUANTIZATION_TABLE) -> npt.ArrayLike:
    """Takes a DC transformed 8x8 block and performs a quantization.

    Args:
        block (npt.ArrayLike): The 8x8 block (a 2d array)
        q_table (int, optional): The quantization table. Defaults to QUANTIZATION_TABLE.

    Returns:
        npt.ArrayLike: The quantized 8x8 block (a 2d array)
    """
    return np.abs(np.round(block / q_table))

In [None]:
# Run dct on images and gather first significant digits (Slow version - pure python)
dcts = get_dct_array(images)
fsd = to_fsd(dcts, mode=FSD_SLOW)

# Count fsds
fsd_count = count_fsds(fsd, base=BASE_10)

# Calculate distribution of each digit
fsd_count_dist = fsd_count / np.sum(fsd_count)

# Plot distribution against the ground truth benfords law
plot_df_comparison(fsd_count_dist=fsd_count_dist, title="DCT FSDs vs. Benfords Law")

In [None]:
# Run dct on images and gather first significant digits (Fast version - numpy)
dcts = get_dct_array(images)
fsd = to_fsd(dcts, mode=FSD_FAST)

# Count fsds
fsd_count = count_fsds(fsd, base=BASE_10)

# Calculate distribution of each digit
fsd_count_dist = fsd_count / np.sum(fsd_count)

# Plot distribution against the ground truth benfords law
plot_df_comparison(fsd_count_dist, title="DCT FSDs vs. Benfords Law")

In [None]:
# Gather first significant digits on raw images
i = images.flatten()
fsd = to_fsd(i)

# Count fsds
fsd_count = count_fsds(fsd, base=BASE_10)

# Calculate distribution of each digit
fsd_count_dist_raw_mnist = fsd_count / np.sum(fsd_count)

# Plot distribution against the ground truth benfords law
# plot_df_comparison(fsd_count_dist, title="Raw MNIST Images FSDs vs. Benfords Law")

In [None]:
# Gather first significant digits on raw normalized images
i = images.flatten()
# i = np.array([(p - np.min(i)) / (np.max(i) - np.min(i)) for p in images])
i = (i - np.min(i)) / (np.max(i) - np.min(i))

fsd = to_fsd(i)

# Count fsds
fsd_count = count_fsds(fsd, base=BASE_10)

# Calculate distribution of each digit
fsd_count_dist_normalized_mnist = fsd_count / np.sum(fsd_count)

# Plot distribution against the ground truth benfords law
plot_df_comparison(fsd_count_dist, title="Raw Normalized MNIST Images FSDs vs. Benfords Law")

In [None]:
#########################
# DOES NOT WORK FOR NOW #
#########################

# FSDs on quantized DC transformed MNIST
# image_blocks = np.array([img_to_blocks(image) for image in images])
# print(images.shape)
# print(image_blocks.shape) # 7000 images, 9 blocks per image, 8x8 blocks
# image_blocks = image_blocks - 128
# print(f"Image block: \n{image_blocks[0][4]}")

# block_dcts = np.array([[cv2.dct(block) for block in image] for image in image_blocks])
# print(block_dcts.shape)
# print(f"DCT block: \n{block_dcts[0][4]}")

# quantized_blocks = np.array([[quantize_block(block) for block in block_dct] for block_dct in block_dcts])
# print(quantized_blocks.shape)
# print(f"Quantization block: \n{quantized_blocks[0][4]}")

# f_quantized_blocks = quantized_blocks.flatten()
# fsd = to_fsd(f_quantized_blocks)
# fsd_count = count_fsds(fsd, base=BASE_10)
# fsd_count_dist = fsd_count / np.sum(fsd_count)

# plot_df_comparison(fsd_count_dist, title="Quantized DC transformed MNIST vs. Benfords Law")

In [None]:
# FSDs on quantized DC transformed MNIST
fd_list = np.array([0] * 9)
i_s = np.array([image.reshape(28,28) for image in images])
for img in tqdm(i_s):
    image_blocks = img_to_blocks(img)
    # print(image_blocks.shape) # 7000 images, 9 blocks per image, 8x8 blocks
    image_blocks = image_blocks - 128
    # print(image_blocks[4])

    block_dcts = np.array([cv2.dct(block) for block in image_blocks])
    # print(block_dcts.shape)
    # print(f"DCT block: \n{block_dcts[4]}")

    quantized_blocks = np.array([quantize_block(block) for block in block_dcts])
    # print(quantized_blocks.shape)
    # print(f"Quantization block: \n{quantized_blocks[0]}")

    fsds = np.array([to_fsd(q.flatten()) for q in quantized_blocks])
    # print(f"FSD: \n{fsd}")
    # if 2 in fsds[0]:
    #     print("Number included")
    # else:
    #     print("Number not inlcuded")
    # fsd = to_fsd(f_quantized_blocks)
    # fsd_count = count_fsds(fsd[0], base=BASE_10)
    # fsd_count_dist = fsd_count / np.sum(fsd_count)
    for fsd in fsds:
        for i in range(1,10):
            if i in fsd:
                fd_list[i-1] += 1
print(fd_list)
fd_list = np.array(fd_list) / (len(images) * 9)
# fd_list = np.array(fd_list) / np.sum(fd_list)


plot_df_comparison(fd_list, title="Quantized DC transformed MNIST vs. Benfords Law")

In [None]:
horses = np.array([cv2.imread(file, cv2.IMREAD_GRAYSCALE) for file in glob.glob("horses/000000/*.png")]).astype("float32")
print(horses.shape)
print(horses[0].shape)
plt.imshow(horses[0], cmap=plt.cm.gray)
plt.show()

In [None]:
# FSDs on quantized DC transformed MNIST
fd_list = [0] * 9
# i_s = np.array([image.reshape(28,28) for image in images])
horses = np.array([cv2.imread(file, cv2.IMREAD_GRAYSCALE) for file in glob.glob("horses/000000/*.png")]).astype("float32")
for img in tqdm(horses):
    image_blocks = img_to_blocks(img)
    # print(image_blocks.shape) # 7000 images, 9 blocks per image, 8x8 blocks
    image_blocks = image_blocks - 128
    # print(image_blocks[4])

    block_dcts = np.array([cv2.dct(block) for block in image_blocks])
    # print(block_dcts.shape)
    # print(f"DCT block: \n{block_dcts[4]}")

    quantized_blocks = np.array([quantize_block(block) for block in block_dcts])
    # print(quantized_blocks.shape)
    # print(f"Quantization block: \n{quantized_blocks[0]}")

    fsds = np.array([to_fsd(q.flatten()) for q in quantized_blocks])
    # print(f"FSD: \n{fsd}")
    # if 2 in fsds[0]:
    #     print("Number included")
    # else:
    #     print("Number not inlcuded")
    # fsd = to_fsd(f_quantized_blocks)
    # fsd_count = count_fsds(fsd[0], base=BASE_10)
    # fsd_count_dist = fsd_count / np.sum(fsd_count)

    
    # print(fd_list)
    for fsd in fsds:
        for i in range(1,10):
            if i in fsd:
                fd_list[i-1] += 1
print(fd_list)
# fd_list = np.array(fd_list) / (len(images) * 9)
fd_list = np.array(fd_list) / np.sum(fd_list)# (len(horses) * 1024)

plot_df_comparison(fd_list, title="Quantized DC transformed GAN vs. Benfords Law")

In [None]:
dcts = get_dct_array(horses)
fsd = to_fsd(dcts)
fsd_count = count_fsds(fsd)
fsd_count_dist = fsd_count / np.sum(fsd_count)

plot_df_comparison(fsd_count_dist=fsd_count_dist, title="GAN DCT FSDs vs. Benfords Law")

In [None]:
h = horses.flatten()
fsd = to_fsd(h)
fsd_count = count_fsds(fsd)
fsd_count_dist_raw_gan = fsd_count / np.sum(fsd_count)

#plot_df_comparison(fsd_count_dist=fsd_count_dist, title="Raw GAN FSDs vs. Benfords Law")

In [None]:
# Gather first significant digits on raw normalized images
flattened_horses = horses.flatten()
# i = np.array([(p - np.min(i)) / (np.max(i) - np.min(i)) for p in images])
print(np.min(flattened_horses), np.max(flattened_horses), np.mean(flattened_horses), np.median(flattened_horses))
i = (flattened_horses - np.min(flattened_horses)) / (np.max(flattened_horses) - np.min(flattened_horses))
print(np.min(i), np.max(i), np.mean(i), np.median(i))

fsd = to_fsd(i)

# Count fsds
fsd_count = count_fsds(fsd, base=BASE_10)

# Calculate distribution of each digit
fsd_count_dist_normalized_gan = fsd_count / np.sum(fsd_count)

# Plot distribution against the ground truth benfords law
plot_df_comparison(fsd_count_dist_normalized_gan, title="Raw Normalized MNIST Images FSDs vs. Benfords Law")

In [None]:
from plotly.subplots import make_subplots
df = pd.DataFrame()
df["digit"] = [i for i in range(1, 10, 1)]
df["MNIST FSD count"] = fsd_count_dist_normalized_mnist
df["GAN FSD COUNT"] = fsd_count_dist_normalized_gan
df["Benfords Law (ground truth)"] = benfords_law()


fig = make_subplots(rows=2, cols=1)
fig.add_bar(x=df["digit"], y=df["MNIST FSD count"], name="Measurements normalized MNIST", hoverinfo="y", row=1, col=1)
fig.add_scatter(x=df["digit"], y=df["Benfords Law (ground truth)"], name="Ground Truth", hoverinfo="y", row=1, col=1)
fig.add_bar(x=df["digit"], y=df["GAN FSD COUNT"], name="Measurements normalized  GAN", hoverinfo="y", row=2, col=1)
fig.add_scatter(x=df["digit"], y=df["Benfords Law (ground truth)"], name="Ground Truth", hoverinfo="y", row=2, col=1)
fig.update_layout(title="Raw normalized FSDs, MNIST (top) and GAN (bottom)", xaxis_title="Digits", yaxis_title="Distribution")
fig.show(renderer="browser")