In [3]:
#!/usr/bin/env python3
import sys
import os
import numpy as np
from PIL import Image
from scipy.ndimage import label
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import cdist
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

# Make sure to adjust this path so Python can find your eucalc package
sys.path.append('eucalc_directory')
import eucalc as ec

# ─── Settings ─────────────────────────────────────────────────────────────────
datafolder    = "."       # directory containing your .gif files
k             = 360       # number of random directions for ECT
xinterval     = (-1.5, 1.5)
xpoints       = 12000
padding_extra = 10        # extra pixels around max dimension after crop
keywords      = [
    'apple', 'bell', 'bottle', 'car', 'classic',
    'cup', 'device', 'face', 'heart', 'key'
]
n_repeats     = 100       # how many train/test splits to average over
test_frac     = 0.30      # fraction for test set (70/30 split)

# ─── Preprocessing Functions ──────────────────────────────────────────────────
def filter_to_largest_cc(img_array):
    """Return binary mask of the largest connected white component."""
    mask = img_array > 0
    labeled, num = label(mask, structure=np.ones((3,3)))
    if num < 1:
        return np.zeros_like(img_array, dtype=np.uint8)
    sizes = np.bincount(labeled.ravel())
    largest = np.argmax(sizes[1:]) + 1
    return (labeled == largest).astype(np.uint8)


def normalize_cc_size(mask, target_area):
    """Scale mask so its white-pixel area ≈ target_area."""
    current = mask.sum()
    if current in (0, target_area):
        return mask
    scale = np.sqrt(target_area / current)
    h, w = mask.shape
    new_h = max(1, int(round(h * scale)))
    new_w = max(1, int(round(w * scale)))
    pil = Image.fromarray((mask * 255).astype(np.uint8))
    resized = pil.resize((new_w, new_h), resample=Image.BILINEAR)
    return (np.array(resized) > 127).astype(np.uint8)


def crop_to_shape(mask):
    """Crop mask tightly around its white pixels."""
    coords = np.argwhere(mask > 0)
    if coords.size == 0:
        return mask
    r0, c0 = coords.min(axis=0)
    r1, c1 = coords.max(axis=0)
    return mask[r0:r1+1, c0:c1+1]


def pad_image_to_square(img, target_length):
    """Pad image to a centered square of side target_length."""
    h, w = img.shape
    pad_h = (target_length - h) // 2
    pad_w = (target_length - w) // 2
    out = np.zeros((target_length, target_length), dtype=np.uint8)
    out[pad_h:pad_h+h, pad_w:pad_w+w] = img
    return out


# ─── ECT & Distance Functions ────────────────────────────────────────────────
class EctImg:
    def __init__(self, nm, img, k=k, xinterval=xinterval, xpoints=xpoints):
        self.nm = nm
        self.image = self.compute(img, k, xinterval, xpoints)

    def compute(self, img, k, xinterval, xpoints):
        cplx = ec.EmbeddedComplex(img)
        cplx.preproc_ect()
        thetas = np.random.uniform(0, 2 * np.pi, k)
        ect1 = np.empty((k, xpoints), dtype=float)
        T = np.linspace(xinterval[0], xinterval[1], xpoints)
        for i, theta in enumerate(thetas):
            direction = np.array((np.sin(theta), np.cos(theta)))
            ect_dir = cplx.compute_euler_characteristic_transform(direction)
            ect1[i] = [ect_dir.evaluate(t) for t in T]
        return ect1


def wasserstein_distance(emp1, emp2, delta_x=1.0):
    cost_matrix = cdist(emp1, emp2, metric='minkowski', p=1) * delta_x
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    return np.mean(cost_matrix[row_ind, col_ind])


def compute_distance_matrix_wasserstein_parallel(ects, delta_x,
                                                n_jobs=-1, verbose=5):
    def compute_row(i):
        dists = [
            wasserstein_distance(ects[i].image, ects[j].image, delta_x)
            for j in range(len(ects))
        ]
        return i, dists

    results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(compute_row)(i) for i in range(len(ects))
    )
    N = len(ects)
    D = np.zeros((N, N), dtype=float)
    for i, row in results:
        D[i, :] = row
    return D


# ─── Main Pipeline ────────────────────────────────────────────────────────────
def main():
    # 1) Gather GIF files matching keywords
    all_files = [f for f in os.listdir(datafolder) if f.lower().endswith('.gif')]
    files = [f for f in all_files
             if any(kw in f.lower() for kw in keywords)]
    if not files:
        raise RuntimeError("No matching GIFs found in datafolder.")

    # Labels: first keyword found in filename
    labels = [
        next(kw for kw in keywords if kw in f.lower())
        for f in files
    ]

    # 2) CC-filter to masks
    masks = Parallel(n_jobs=-1, verbose=5)(
        delayed(lambda f: filter_to_largest_cc(
            np.array(Image.open(os.path.join(datafolder, f))).mean(axis=2)
            if Image.open(os.path.join(datafolder, f)).mode == 'RGB'
            else np.array(Image.open(os.path.join(datafolder, f)))
        ))(f) for f in files
    )
    areas = [m.sum() for m in masks]
    target_area = max(areas)

    # 3) Normalize & crop
    cropped = Parallel(n_jobs=-1, verbose=5)(
        delayed(lambda i: crop_to_shape(
            normalize_cc_size(masks[i], target_area)
        ))(i) for i in range(len(files))
    )
    cropped_dict = dict(zip(files, cropped))

    # 4) Pad to square
    dims = [max(cropped_dict[f].shape) for f in files]
    target_length = max(dims) + padding_extra
    print(f"Target length for padding: {target_length}")
    padded_dict = {
        f: pad_image_to_square(cropped_dict[f], target_length)
        for f in files
    }

    # 5) Compute ECTs
    ects = [EctImg(f, padded_dict[f]) for f in files]

    # 6) Compute Wasserstein distance matrix
    delta_x = (xinterval[1] - xinterval[0]) / (xpoints - 1)
    D = compute_distance_matrix_wasserstein_parallel(
        ects, delta_x, n_jobs=-1, verbose=5
    )

    # 7) Build RBF‐style kernel from D
    γ = 1.0 / np.median(D[np.triu_indices_from(D, k=1)])
    K = np.exp(-γ * D)

    # 8) SVM classification with precomputed kernel
    idx = np.arange(len(files))
    accuracies = []
    for _ in range(n_repeats):
        train_idx, test_idx, y_train, y_test = train_test_split(
            idx, labels,
            test_size=test_frac,
            stratify=labels
        )

        K_train = K[np.ix_(train_idx, train_idx)]
        K_test  = K[np.ix_(test_idx,  train_idx)]

        clf = SVC(kernel='precomputed')
        clf.fit(K_train, y_train)
        accuracies.append(clf.score(K_test, y_test))

    mean_acc = np.mean(accuracies)
    std_acc  = np.std(accuracies)
    print(f"SVM accuracy over {n_repeats} runs: "
          f"{mean_acc:.4f} ± {std_acc:.4f}")

    return accuracies


if __name__ == '__main__':
    main()


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    1.2s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:    1.5s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  72 tasks      | elapsed:    6.0s
[Parallel(n_jobs=-1)]: Done 162 tasks      | elapsed:   16.7s
[Parallel(n_jobs=-1)]: Done 288 tasks      | elapsed:   33.0s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:   49.1s finished


Target length for padding: 1379


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed: 36.3min
[Parallel(n_jobs=-1)]: Done 138 tasks      | elapsed: 108.8min
[Parallel(n_jobs=-1)]: Done 264 tasks      | elapsed: 229.1min
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed: 381.1min finished


SVM accuracy over 100 runs: 0.9167 ± 0.0144


## based on contour

In [5]:
#!/usr/bin/env python3
import sys
import os
import numpy as np
from PIL import Image
from scipy.ndimage import label
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import cdist
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from skimage import measure

# Make sure to adjust this path so Python can find your eucalc & My_ECT packages
sys.path.append('eucalc_directory')
import eucalc as ec
import My_ECT as detect

from numba.typed import List as NBList
import numba

# ─── Settings ─────────────────────────────────────────────────────────────────
datafolder    = "."       # directory containing your .gif files
k             = 360       # random directions for ECT
xinterval     = (-1.5, 1.5)
xpoints       = 12000
padding_extra = 10
keywords      = ['apple','bell','bottle','car','classic',
                 'cup','device','face','heart','key']
n_repeats     = 100
test_frac     = 0.30

# ─── Preprocessing ────────────────────────────────────────────────────────────
def filter_to_largest_cc(img_array):
    mask = img_array > 0
    labeled, num = label(mask, structure=np.ones((3,3)))
    if num < 1:
        return np.zeros_like(img_array, dtype=np.uint8)
    sizes  = np.bincount(labeled.ravel())
    largest = np.argmax(sizes[1:]) + 1
    return (labeled == largest).astype(np.uint8)

def normalize_cc_size(mask, target_area):
    cur = mask.sum()
    if cur in (0, target_area):
        return mask
    scale = np.sqrt(target_area / cur)
    h,w   = mask.shape
    new_h = max(1, int(round(h * scale)))
    new_w = max(1, int(round(w * scale)))
    pil   = Image.fromarray((mask*255).astype(np.uint8))
    resized = pil.resize((new_w,new_h), resample=Image.BILINEAR)
    return (np.array(resized)>127).astype(np.uint8)

def crop_to_shape(mask):
    coords = np.argwhere(mask>0)
    if coords.size==0:
        return mask
    r0,c0 = coords.min(axis=0)
    r1,c1 = coords.max(axis=0)
    return mask[r0:r1+1, c0:c1+1]

def pad_image_to_square(img, target_length):
    h,w    = img.shape
    pad_h  = (target_length-h)//2
    pad_w  = (target_length-w)//2
    out    = np.zeros((target_length,target_length),dtype=np.uint8)
    out[pad_h:pad_h+h, pad_w:pad_w+w] = img
    return out

# ─── Contour → Simplicial Conversion ─────────────────────────────────────────
def extract_boundary(mask):
    return measure.find_contours(mask.astype(float), level=0.5)

def contours_to_simplicial(contours):
    all_pts, simplices = [], []
    offset = 0
    for loop in contours:
        N = loop.shape[0]
        all_pts.append(loop)
        for j in range(N):
            v1 = offset + j     + 1
            v2 = offset + (j+1)%N + 1
            simplices.append([v1, v2])
        offset += N
    data = np.vstack(all_pts)
    return data, simplices

# ─── Numba‐njitted ECT internals ──────────────────────────────────────────────
@numba.njit()
def euler_critical_values(simp_comp, data, direction):
    n = len(simp_comp)
    filt = np.empty((n,2),dtype=np.float64)
    for i in range(n):
        simplex = simp_comp[i]
        fv = -1e18
        for j in range(len(simplex)):
            v  = simplex[j]
            dv = data[v-1][0]*direction[0] + data[v-1][1]*direction[1]
            if dv>fv: fv = dv
        filt[i,0] = float(len(simplex))
        filt[i,1] = fv
    return filt

@numba.njit()
def euler_curve(simp_comp, data, direction, interval, points):
    filt = euler_critical_values(simp_comp, data, direction)
    idx  = np.argsort(filt[:,1])
    filt = filt[idx]
    step = (interval[1]-interval[0])/(points-1)
    chi  = np.empty(points, dtype=np.float64)
    val  = 0.0
    c    = 0
    for i in range(points):
        x = interval[0] + i*step
        while c<filt.shape[0] and filt[c,1]<=x+step:
            dim = int(filt[c,0])
            val += (-1)**(dim-1)
            c   += 1
        chi[i] = val
    return chi

# ─── Typed‐List Random ECT ───────────────────────────────────────────────────
def RandomEct_2d(simp_comp, data, k=20, interval=(-1.,1.), points=100, factor=3):
    # pack simplices
    numba_simp = NBList()
    for s in simp_comp:
        numba_simp.append(NBList(s))
    # pack data points
    numba_data = NBList()
    for pt in data:
        numba_data.append(NBList((float(pt[0]), float(pt[1]))))

    thetas = 2*np.pi*np.random.rand(k)
    ect = np.empty((k, points), dtype=np.float64)
    for i in numba.prange(k):
        th = thetas[i]
        dir_vec = np.array((np.sin(th), np.cos(th)), dtype=np.float64)
        full = euler_curve(numba_simp, numba_data, dir_vec,
                           interval, points*factor)
        ect[i] = full[::factor]
    return ect

# ─── EctImg wrapper ──────────────────────────────────────────────────────────
class EctImg:
    def __init__(self, nm, simplices, data,
                 k=k, interval=xinterval, points=xpoints):
        self.nm    = nm
        # call our typed‐list ECT
        self.image = RandomEct_2d(simp_comp=simplices,
                                  data=data,
                                  k=k,
                                  interval=interval,
                                  points=points)

# ─── Distance & Kernel & SVM ─────────────────────────────────────────────────
def wasserstein_distance(emp1, emp2, delta_x=1.0):
    C = cdist(emp1, emp2, metric='minkowski', p=1) * delta_x
    r,c = linear_sum_assignment(C)
    return np.mean(C[r,c])

def compute_distance_matrix_wasserstein_parallel(ects, delta_x, n_jobs=-1, verbose=5):
    def rowfun(i):
        return i, [wasserstein_distance(ects[i].image, ects[j].image, delta_x)
                  for j in range(len(ects))]
    results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(rowfun)(i) for i in range(len(ects))
    )
    N = len(ects)
    D = np.zeros((N,N), dtype=float)
    for i,row in results:
        D[i,:] = row
    return D

# ─── Main Pipeline ────────────────────────────────────────────────────────────
def main():
    # 1) gather & filter GIFs
    all_files = [f for f in os.listdir(datafolder) if f.lower().endswith('.gif')]
    files     = [f for f in all_files if any(kw in f.lower() for kw in keywords)]
    if not files:
        raise RuntimeError("No matching GIFs found.")

    labels = [next(kw for kw in keywords if kw in f.lower()) for f in files]

    # 2) CC‐filter
    masks = Parallel(n_jobs=-1,verbose=5)(
      delayed(lambda f: filter_to_largest_cc(
          np.array(Image.open(os.path.join(datafolder,f))).mean(axis=2)
          if Image.open(os.path.join(datafolder,f)).mode=='RGB'
          else np.array(Image.open(os.path.join(datafolder,f)))
      ))(f) for f in files
    )
    target_area = max(m.sum() for m in masks)

    # 3) normalize, crop, pad
    cropped = [crop_to_shape(normalize_cc_size(m, target_area)) for m in masks]
    dims    = [max(c.shape) for c in cropped]
    pad_len = max(dims) + padding_extra
    padded_dict = {
      files[i]: pad_image_to_square(cropped[i], pad_len)
      for i in range(len(files))
    }

    # 4) extract contours → data/simplices
    contour_dict = {}
    for f,mask in padded_dict.items():
        loops = extract_boundary(mask)
        data, simplices = contours_to_simplicial(loops)
        contour_dict[f] = {'data':data, 'simplices':simplices}

    # 5) build EctImg objects
    ects = [
      EctImg(f,
             contour_dict[f]['simplices'],
             contour_dict[f]['data'])
      for f in files
    ]

    # 6) distance matrix
    delta_x = (xinterval[1]-xinterval[0])/(xpoints-1)
    D = compute_distance_matrix_wasserstein_parallel(ects, delta_x,
                                                     n_jobs=-1, verbose=5)

    # 7) kernel + SVM
    gamma =  1.0/np.median(D[np.triu_indices_from(D,k=1)])
    K     = np.exp(-gamma * D)

    idx    = np.arange(len(files))
    accs   = []
    for _ in range(n_repeats):
        ti, te, ytr, yte = train_test_split(
            idx, labels, test_size=test_frac, stratify=labels
        )
        Ktr = K[np.ix_(ti,ti)]
        Kte = K[np.ix_(te,ti)]
        clf = SVC(kernel='precomputed')
        clf.fit(Ktr, ytr)
        accs.append(clf.score(Kte, yte))

    print(f"SVM accuracy: {np.mean(accs):.4f} ± {np.std(accs):.4f}")

if __name__ == '__main__':
    main()


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.2s finished
[Parallel(n_jobs=-1)]: Done  76 tasks      | elapsed:    0.5s
[Parallel(n_jobs=-1)]: Done 397 out of 420 | elapsed:    1.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:    1.3s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.


KeyboardInterrupt: 

## ECT metric

In [6]:
#!/usr/bin/env python3
import sys
import os
import numpy as np
from PIL import Image
from scipy.ndimage import label
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import eucalc as ec  # make sure 'eucalc_directory' is on PYTHONPATH

# ─── Settings ─────────────────────────────────────────────────────────────────
datafolder    = "."       # directory containing your .gif files
k             = 360       # number of random directions for ECT
xinterval     = (-1.5, 1.5)
xpoints       = 12000     # sample points along each direction
padding_extra = 10        # extra pixels when padding to square
keywords      = [
    'apple','bell','bottle','car','classic',
    'cup','device','face','heart','key'
]
n_repeats     = 100       # how many train/test splits to average over
test_frac     = 0.30      # fraction for test set (70/30 split)

# ─── Preprocessing Functions ──────────────────────────────────────────────────
def filter_to_largest_cc(img_array):
    """Return binary mask of the largest connected white component."""
    mask = img_array > 0
    labeled, num = label(mask, structure=np.ones((3,3)))
    if num < 1:
        return np.zeros_like(img_array, dtype=np.uint8)
    sizes  = np.bincount(labeled.ravel())
    largest = np.argmax(sizes[1:]) + 1
    return (labeled == largest).astype(np.uint8)

def normalize_cc_size(mask, target_area):
    """Scale mask so its white-pixel area ≈ target_area."""
    current = mask.sum()
    if current in (0, target_area):
        return mask
    scale = np.sqrt(target_area / current)
    h, w   = mask.shape
    new_h = max(1, int(round(h * scale)))
    new_w = max(1, int(round(w * scale)))
    pil = Image.fromarray((mask * 255).astype(np.uint8))
    resized = pil.resize((new_w, new_h), resample=Image.BILINEAR)
    return (np.array(resized) > 127).astype(np.uint8)

def crop_to_shape(mask):
    """Crop mask tightly around its white pixels."""
    coords = np.argwhere(mask > 0)
    if coords.size == 0:
        return mask
    r0, c0 = coords.min(axis=0)
    r1, c1 = coords.max(axis=0)
    return mask[r0:r1+1, c0:c1+1]

def pad_image_to_square(img, target_length):
    """Pad image to a centered square of side target_length."""
    h, w = img.shape
    pad_h = (target_length - h) // 2
    pad_w = (target_length - w) // 2
    out = np.zeros((target_length, target_length), dtype=np.uint8)
    out[pad_h:pad_h+h, pad_w:pad_w+w] = img
    return out

# ─── ECT Extraction Class ─────────────────────────────────────────────────────
class EctImg:
    def __init__(self, nm, img, k=k, xinterval=xinterval, xpoints=xpoints):
        self.nm    = nm
        self.image = self.compute(img, k, xinterval, xpoints)

    def compute(self, img, k, xinterval, xpoints):
        cplx = ec.EmbeddedComplex(img)
        cplx.preproc_ect()
        thetas = np.linspace(0, 2 * np.pi, k, endpoint=False)
        ect1   = np.empty((k, xpoints), dtype=float)
        T      = np.linspace(xinterval[0], xinterval[1], xpoints)
        for i, theta in enumerate(thetas):
            direction = np.array((np.sin(theta), np.cos(theta)))
            ect_dir   = cplx.compute_euler_characteristic_transform(direction)
            ect1[i]   = [ect_dir.evaluate(t) for t in T]
        return ect1

# ─── New Curve‐Based Distance ────────────────────────────────────────────────
def curve_distance(emp1, emp2, delta_x):
    """
    emp1, emp2: (k, xpoints) arrays.
    Compute L1 distance per row (∑|emp1[i] - emp2[i]| * delta_x), then take the maximum.
    """
    row_l1 = np.sum(np.abs(emp1 - emp2), axis=1) * delta_x
    return np.max(row_l1)

def compute_distance_matrix_curve(ects, delta_x, n_jobs=-1, verbose=5):
    def rowfun(i):
        return i, [
            curve_distance(ects[i].image, ects[j].image, delta_x)
            for j in range(len(ects))
        ]
    results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(rowfun)(i) for i in range(len(ects))
    )
    N = len(ects)
    D = np.zeros((N, N), dtype=float)
    for i, row in results:
        D[i, :] = row
    return D

# ─── Main Pipeline ────────────────────────────────────────────────────────────
def main():
    # 1) List & filter GIF files by keyword
    all_gifs = [f for f in os.listdir(datafolder) if f.lower().endswith('.gif')]
    files    = [f for f in all_gifs if any(kw in f.lower() for kw in keywords)]
    if not files:
        raise RuntimeError("No matching GIFs found.")

    # 2) Assign labels by first matching keyword
    labels = [ next(kw for kw in keywords if kw in f.lower()) for f in files ]

    # 3) Load & CC‐filter masks in parallel
    def load_mask(f):
        img = Image.open(os.path.join(datafolder, f)).convert('L')
        return filter_to_largest_cc(np.array(img))
    masks = Parallel(n_jobs=-1, verbose=5)(
        delayed(load_mask)(f) for f in files
    )
    areas = [m.sum() for m in masks]
    target_area = max(areas)

    # 4) Normalize size & crop
    cropped = [
        crop_to_shape(normalize_cc_size(m, target_area))
        for m in masks
    ]

    # 5) Pad to square
    dims         = [max(c.shape) for c in cropped]
    target_length = max(dims) + padding_extra
    padded       = {
        files[i]: pad_image_to_square(cropped[i], target_length)
        for i in range(len(files))
    }

    # 6) Compute ECT signatures
    ects = [EctImg(f, padded[f]) for f in files]

    # 7) Compute delta_x from the sampling grid
    delta_x = (xinterval[1] - xinterval[0]) / (xpoints - 1)

    # 8) Compute distance matrix with the new curve‐based metric
    D = compute_distance_matrix_curve(ects, delta_x, n_jobs=-1, verbose=5)

    # 9) Build RBF‐style kernel and classify with an SVM
    gamma = 1.0 / np.median(D[np.triu_indices_from(D, k=1)])
    K     = np.exp(-gamma * D)

    idx        = np.arange(len(files))
    accuracies = []
    for _ in range(n_repeats):
        ti, te, ytr, yte = train_test_split(
            idx, labels, test_size=test_frac, stratify=labels
        )
        Ktr = K[np.ix_(ti, ti)]
        Kte = K[np.ix_(te, ti)]

        clf = SVC(kernel='precomputed')
        clf.fit(Ktr, ytr)
        accuracies.append(clf.score(Kte, yte))

    print(f"SVM accuracy over {n_repeats} runs: "
          f"{np.mean(accuracies):.4f} ± {np.std(accuracies):.4f}")

if __name__ == '__main__':
    main()


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    1.2s
[Parallel(n_jobs=-1)]: Done 397 out of 420 | elapsed:    1.3s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:    1.3s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:  1.3min
[Parallel(n_jobs=-1)]: Done 138 tasks      | elapsed:  2.6min
[Parallel(n_jobs=-1)]: Done 264 tasks      | elapsed:  4.4min
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:  6.5min finished


SVM accuracy over 100 runs: 0.8623 ± 0.0171


## DETECT

In [7]:
#!/usr/bin/env python3
import sys
import os
import numpy as np
from PIL import Image
from scipy.ndimage import label
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import eucalc as ec  # ensure 'eucalc_directory' is on PYTHONPATH
import numba
from numba.typed import List as NBList
from skimage import measure

# ─── Settings ─────────────────────────────────────────────────────────────────
datafolder    = "."       # where your .gif files live
k             = 360       # number of directions for SECT
xinterval     = (-1.5, 1.5)
xpoints       = 12000     # sample points along each direction
padding_extra = 10        # extra pixels when padding to square
keywords      = [
    'apple','bell','bottle','car','classic',
    'cup','device','face','heart','key'
]
n_repeats     = 100       # how many train/test splits
test_frac     = 0.30      # fraction for test set (70/30 split)

# ─── Preprocessing ────────────────────────────────────────────────────────────
def filter_to_largest_cc(img_array):
    mask = img_array > 0
    labeled, num = label(mask, structure=np.ones((3,3)))
    if num < 1:
        return np.zeros_like(img_array, dtype=np.uint8)
    sizes  = np.bincount(labeled.ravel())
    largest = np.argmax(sizes[1:]) + 1
    return (labeled == largest).astype(np.uint8)

def normalize_cc_size(mask, target_area):
    cur = mask.sum()
    if cur in (0, target_area):
        return mask
    scale = np.sqrt(target_area / cur)
    h,w   = mask.shape
    new_h = max(1, int(round(h*scale)))
    new_w = max(1, int(round(w*scale)))
    pil = Image.fromarray((mask*255).astype(np.uint8))
    resized = pil.resize((new_w,new_h), resample=Image.BILINEAR)
    return (np.array(resized)>127).astype(np.uint8)

def crop_to_shape(mask):
    coords = np.argwhere(mask>0)
    if coords.size==0:
        return mask
    r0,c0 = coords.min(axis=0)
    r1,c1 = coords.max(axis=0)
    return mask[r0:r1+1, c0:c1+1]

def pad_image_to_square(img, target_length):
    h,w   = img.shape
    pad_h = (target_length-h)//2
    pad_w = (target_length-w)//2
    out   = np.zeros((target_length,target_length), dtype=np.uint8)
    out[pad_h:pad_h+h, pad_w:pad_w+w] = img
    return out

# ─── Contours → Simplicial ────────────────────────────────────────────────────
def extract_boundary(mask):
    return measure.find_contours(mask.astype(float), level=0.5)

def contours_to_simplicial(contours):
    all_pts, simplices = [], []
    offset = 0
    for loop in contours:
        N = loop.shape[0]
        all_pts.append(loop)
        for j in range(N):
            v1 = offset + j     + 1
            v2 = offset + (j+1)%N + 1
            simplices.append([v1, v2])
        offset += N
    data = np.vstack(all_pts)
    return data, simplices

# ─── Numba‐njitted SECT internals ─────────────────────────────────────────────
@numba.njit()
def euler_critical_values(simp_comp, data, direction):
    n = len(simp_comp)
    filt = np.empty((n,2), dtype=np.float64)
    for i in range(n):
        simplex = simp_comp[i]
        fv = -1e18
        for j in range(len(simplex)):
            v = simplex[j]
            dv = data[v-1][0]*direction[0] + data[v-1][1]*direction[1]
            if dv > fv:
                fv = dv
        filt[i,0] = float(len(simplex))
        filt[i,1] = fv
    return filt

@numba.njit()
def euler_curve(simp_comp, data, direction, interval, points):
    filt = euler_critical_values(simp_comp, data, direction)
    idx  = np.argsort(filt[:,1])
    filt = filt[idx]
    step = (interval[1]-interval[0])/(points-1)
    chi  = np.empty(points, dtype=np.float64)
    val  = 0.0
    c    = 0
    for i in range(points):
        x = interval[0] + i*step
        while c < filt.shape[0] and filt[c,1] <= x + step:
            dim = int(filt[c,0])
            val += (-1)**(dim-1)
            c   += 1
        chi[i] = val
    return chi

@numba.njit()
def cumulative_euler_curve(simp_comp, data, direction,
                            interval, points, factor):
    ec = euler_curve(simp_comp, data, direction,
                     interval, points*factor)
    mean = np.mean(ec)
    step = (interval[1]-interval[0])/(points*factor - 1)
    cec  = np.cumsum(ec)[::factor]*step - \
           np.linspace(0, interval[1]-interval[0], points)*mean
    return cec

@numba.njit(parallel=True)
def _sect_2d(simp_comp, data, k, interval, points, factor):
    sect = np.empty((k, points), dtype=np.float64)
    thetas = np.linspace(0, 2*np.pi, k+1)
    for i in numba.prange(k):
        theta = thetas[i]
        direction = np.array((np.sin(theta), np.cos(theta)),
                             dtype=np.float64)
        sect[i] = cumulative_euler_curve(simp_comp, data,
                                         direction, interval,
                                         points, factor)
    return sect

def sect_2d(simp_comp, data, k=20, interval=(-1.,1.),
            points=100, mode='mean', factor=3):
    # pack simplices
    numba_simp = NBList()
    for s in simp_comp:
        numba_simp.append(NBList(s))
    # pack data
    numba_data = NBList()
    for pt in data:
        numba_data.append(NBList((float(pt[0]), float(pt[1]))))
    # compute full or mean
    full = _sect_2d(numba_simp, numba_data, k, interval, points, factor)
    return full if mode=='full' else full.mean(axis=0)

# ─── Distance on 1D SECT curves ───────────────────────────────────────────────
def sect_distance(c1, c2, delta_x):
    """L1 norm of two 1D curves (sum |c1-c2| * delta_x)."""
    return np.sum(np.abs(c1 - c2)) * delta_x

def compute_distance_matrix_sect(curves, delta_x, n_jobs=-1, verbose=5):
    def rowfun(i):
        return i, [sect_distance(curves[i], curves[j], delta_x)
                   for j in range(len(curves))]
    results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(rowfun)(i) for i in range(len(curves))
    )
    N = len(curves)
    D = np.zeros((N, N), dtype=float)
    for i, row in results:
        D[i, :] = row
    return D

# ─── Main Pipeline ────────────────────────────────────────────────────────────
def main():
    # 1) gather & filter GIFs
    all_gifs = [f for f in os.listdir(datafolder) if f.lower().endswith('.gif')]
    files    = [f for f in all_gifs if any(kw in f.lower() for kw in keywords)]
    if not files:
        raise RuntimeError("No matching GIFs found.")

    # 2) labels by keyword
    labels = [next(kw for kw in keywords if kw in f.lower()) for f in files]

    # 3) load & CC-filter
    def load_mask(f):
        img = Image.open(os.path.join(datafolder,f)).convert('L')
        return filter_to_largest_cc(np.array(img))
    masks = Parallel(n_jobs=-1, verbose=5)(
        delayed(load_mask)(f) for f in files
    )
    areas = [m.sum() for m in masks]
    target_area = max(areas)

    # 4) normalize & crop
    cropped = [crop_to_shape(normalize_cc_size(m, target_area)) for m in masks]

    # 5) pad to square
    dims          = [max(c.shape) for c in cropped]
    target_length = max(dims) + padding_extra
    padded        = {
        files[i]: pad_image_to_square(cropped[i], target_length)
        for i in range(len(files))
    }

    # 6) extract contours & build 1D SECT (mean mode)
    curves = []
    for f in files:
        loops = extract_boundary(padded[f])
        data, simplices = contours_to_simplicial(loops)
        curve = sect_2d(simplices, data,
                        k=k,
                        interval=xinterval,
                        points=xpoints,
                        mode='mean',
                        factor=3)
        curves.append(curve)

    # 7) compute delta_x
    delta_x = (xinterval[1] - xinterval[0]) / (xpoints - 1)

    # 8) distance matrix
    D = compute_distance_matrix_sect(curves, delta_x, n_jobs=-1, verbose=5)

    # 9) kernel + SVM
    gamma = 1.0 / np.median(D[np.triu_indices_from(D, k=1)])
    K     = np.exp(-gamma * D)

    idx, accs = np.arange(len(files)), []
    for _ in range(n_repeats):
        ti, te, ytr, yte = train_test_split(
            idx, labels, test_size=test_frac, stratify=labels
        )
        Ktr = K[np.ix_(ti, ti)]
        Kte = K[np.ix_(te, ti)]
        clf = SVC(kernel='precomputed')
        clf.fit(Ktr, ytr)
        accs.append(clf.score(Kte, yte))

    print(f"SVM accuracy over {n_repeats} runs: "
          f"{np.mean(accs):.4f} ± {np.std(accs):.4f}")

if __name__ == '__main__':
    main()


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 397 out of 420 | elapsed:    1.4s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:    1.4s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    3.1s
[Parallel(n_jobs=-1)]: Done 138 tasks      | elapsed:    7.8s
[Parallel(n_jobs=-1)]: Done 264 tasks      | elapsed:   14.7s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:   23.3s finished


SVM accuracy over 100 runs: 0.5102 ± 0.0188


## EulerImage

In [None]:
#!/usr/bin/env python3
import sys
import os
import numpy as np
from PIL import Image
from scipy.ndimage import label
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import eucalc as ec  # make sure 'eucalc_directory' is on PYTHONPATH

# ─── Settings ─────────────────────────────────────────────────────────────────
datafolder    = "."       # directory containing your .gif files
k             = 360         # number of directions for ECT image density
xinterval     = (-1.5, 1.5)
xpoints       = 12000        # sample points along each direction
yinterval     = (0., 251)
ypoints       = 251        # resolution of the density image
padding_extra = 10         # extra pixels when padding to square
keywords      = [
    'apple','bell','bottle','car','classic',
    'cup','device','face','heart','key'
]
n_repeats     = 100        # how many train/test splits to average over
test_frac     = 0.30       # fraction for test set (70/30 split)

# ─── Preprocessing Functions ──────────────────────────────────────────────────
def filter_to_largest_cc(img_array):
    """Return binary mask of the largest connected white component."""
    mask = img_array > 0
    labeled, num = label(mask, structure=np.ones((3,3)))
    if num < 1:
        return np.zeros_like(img_array, dtype=np.uint8)
    sizes  = np.bincount(labeled.ravel())
    largest = np.argmax(sizes[1:]) + 1
    return (labeled == largest).astype(np.uint8)

def normalize_cc_size(mask, target_area):
    """Scale mask so its white-pixel area ≈ target_area."""
    current = mask.sum()
    if current in (0, target_area):
        return mask
    scale = np.sqrt(target_area / current)
    h, w   = mask.shape
    new_h = max(1, int(round(h * scale)))
    new_w = max(1, int(round(w * scale)))
    pil = Image.fromarray((mask * 255).astype(np.uint8))
    resized = pil.resize((new_w, new_h), resample=Image.BILINEAR)
    return (np.array(resized) > 127).astype(np.uint8)

def crop_to_shape(mask):
    """Crop mask tightly around its white pixels."""
    coords = np.argwhere(mask > 0)
    if coords.size == 0:
        return mask
    r0, c0 = coords.min(axis=0)
    r1, c1 = coords.max(axis=0)
    return mask[r0:r1+1, c0:c1+1]

def pad_image_to_square(img, target_length):
    """Pad image to a centered square of side target_length."""
    h, w = img.shape
    pad_h = (target_length - h) // 2
    pad_w = (target_length - w) // 2
    out = np.zeros((target_length, target_length), dtype=np.uint8)
    out[pad_h:pad_h+h, pad_w:pad_w+w] = img
    return out

# ─── ECT Image‐Based Feature Extraction ───────────────────────────────────────
class EctImg:
    def __init__(self, nm, img,
                 k=k,
                 xinterval=xinterval, xpoints=xpoints,
                 yinterval=yinterval, ypoints=ypoints):
        self.nm        = nm
        self.xinterval = xinterval
        self.yinterval = yinterval
        self.xpoints   = xpoints
        self.ypoints   = ypoints
        self.image     = self.compute(img, k,
                                      xinterval, xpoints,
                                      yinterval, ypoints)

    def compute(self, img, k,
                xinterval, xpoints,
                yinterval, ypoints):
        cplx   = ec.EmbeddedComplex(img)
        cplx.preproc_ect()
        thetas = np.linspace(0, 2*np.pi, k+1)
        ect1   = np.empty((k, xpoints), dtype=float)
        for i, theta in enumerate(thetas[:-1]):
            direction   = np.array((np.sin(theta), np.cos(theta)))
            ect_dir     = cplx.compute_euler_characteristic_transform(direction)
            T           = np.linspace(xinterval[0], xinterval[1], xpoints)
            ect1[i, :]  = [ect_dir.evaluate(t) for t in T]

        image = np.zeros((ypoints, xpoints), dtype=float)
        yvals  = np.linspace(yinterval[0], yinterval[1], ypoints+1)
        for col in range(xpoints):
            column = ect1[:, col]
            for row in range(ypoints):
                if row < ypoints-1:
                    mask = (yvals[row] <= column) & (column < yvals[row+1])
                else:
                    mask = (yvals[row] <= column) & (column <= yvals[row+1])
                image[row, col] = mask.sum() / k
        return image

# ─── Entry‐Wise L1 Distance ───────────────────────────────────────────────────
def compute_distance_matrix_l1(ects, n_jobs=-1, verbose=5):
    """Compute entry-wise L1 distance between ECT images."""
    def rowfun(i):
        return i, [
            np.sum(np.abs(ects[i].image - ects[j].image))
            for j in range(len(ects))
        ]
    results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(rowfun)(i) for i in range(len(ects))
    )
    N = len(ects)
    D = np.zeros((N, N), dtype=float)
    for i, row in results:
        D[i, :] = row
    return D

# ─── Main Pipeline ────────────────────────────────────────────────────────────
def main():
    # 1) List & filter GIF files by keyword
    all_gifs = [f for f in os.listdir(datafolder) if f.lower().endswith('.gif')]
    files    = [f for f in all_gifs if any(kw in f.lower() for kw in keywords)]
    if not files:
        raise RuntimeError("No matching GIFs found.")

    # 2) Assign labels by first matching keyword
    labels = [ next(kw for kw in keywords if kw in f.lower()) for f in files ]

    # 3) Load & CC‐filter masks in parallel
    def load_mask(f):
        img = Image.open(os.path.join(datafolder, f)).convert('L')
        return filter_to_largest_cc(np.array(img))
    masks = Parallel(n_jobs=-1, verbose=5)(
        delayed(load_mask)(f) for f in files
    )
    areas = [m.sum() for m in masks]
    target_area = max(areas)

    # 4) Normalize size & crop
    cropped = [
        crop_to_shape(normalize_cc_size(m, target_area))
        for m in masks
    ]

    # 5) Pad to square
    dims         = [max(c.shape) for c in cropped]
    target_length = max(dims) + padding_extra
    padded       = {
        files[i]: pad_image_to_square(cropped[i], target_length)
        for i in range(len(files))
    }

    # 6) Compute ECT image signatures
    ects = [EctImg(f, padded[f]) for f in files]

    # 7) Compute distance matrix using entry-wise L1
    D = compute_distance_matrix_l1(ects, n_jobs=-1, verbose=5)

    # 8) Build RBF‐style kernel and classify with an SVM
    gamma = 1.0 / np.median(D[np.triu_indices_from(D, k=1)])
    K     = np.exp(-gamma * D)

    idx        = np.arange(len(files))
    accuracies = []
    for _ in range(n_repeats):
        ti, te, ytr, yte = train_test_split(
            idx, labels, test_size=test_frac, stratify=labels
        )
        Ktr = K[np.ix_(ti, ti)]
        Kte = K[np.ix_(te, ti)]

        clf = SVC(kernel='precomputed')
        clf.fit(Ktr, ytr)
        accuracies.append(clf.score(Kte, yte))

    print(f"SVM accuracy over {n_repeats} runs: "
          f"{np.mean(accuracies):.4f} ± {np.std(accuracies):.4f}")

if __name__ == '__main__':
    main()


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 397 out of 420 | elapsed:    1.5s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:    1.5s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:   56.3s
[Parallel(n_jobs=-1)]: Done 138 tasks      | elapsed:  1.9min
[Parallel(n_jobs=-1)]: Done 264 tasks      | elapsed:  3.2min
[Parallel(n_jobs=-1)]: Done 420 out of 420 | elapsed:  4.7min finished


SVM accuracy over 100 runs: 0.9439 ± 0.0201
