In [None]:
from typing import List
import sys
import os

sys.path.append(os.path.abspath(".."))
from nmr_spectrum import NMRSpectrum
from spectrum import Spectrum

from matplotlib import pyplot as plt

import numpy as np

components_names = ["Pinene", "Benzyl benzoate"]

protons_list = [16, 12]

filename = "preprocessed_mix.csv"
mix = np.loadtxt(filename, delimiter=",")
# If you are using file exported from Mnova, comment line above and uncomment line below.
# mix = np.loadtxt(filename, delimiter='\t', usecols=[0,1])

how_many_components = len(components_names)
names = ["comp" + str(i) for i in range(how_many_components)]

files_with_components = ["preprocessed_comp0.csv", "preprocessed_comp1.csv"]
spectra = []
for i in range(how_many_components):
    filename = files_with_components[i]
    spectra.append(np.loadtxt(filename, delimiter=","))
    # If you are using file exported from Mnova, comment line above and uncomment line below.
    # spectra.append(np.loadtxt(filename, delimiter='\t', usecols=[0,1]))

spectra2: List[NMRSpectrum] = []
names = []
for i in range(len(spectra)):
    spectra2.append(
        NMRSpectrum(
            confs=list(zip(spectra[i][:, 0], spectra[i][:, 1])), protons=protons_list[i]
        )
    )
    names.append("comp" + str(i))

spectra = spectra2
del spectra2
mix = NMRSpectrum(confs=list(zip(mix[:, 0], mix[:, 1])))
mix.trim_negative_intensities()
mix.normalize()
for sp in spectra:
    sp.trim_negative_intensities()
    sp.normalize()
plt.title("Mixture")
mix.plot(profile=True)

def generate_synthetic_mixture(components, true_p):
    """Generate a synthetic mixture from components based on given proportions."""
    # Create mixture using ScalarProduct
    synthetic_mix = Spectrum.ScalarProduct(components, true_p)
    synthetic_mix.normalize()
    return synthetic_mix

p = 0.4 # 0.01 # 0.3
# p = 0.5
calculated_mix = generate_synthetic_mixture(spectra, [p, 1-p])
calculated_mix.normalize()

plt.title(f"Synthetic Mixture [{p}, {1-p}]")
calculated_mix.plot(profile=True)

for i, sp in enumerate(spectra):
    plt.title("Component " + str(i))
    sp.plot(profile=True)
print(spectra[0].confs[:10])


# Użyć unbalanced optimal transport zamiast wassersteina z total variation karą

# Flat metric

In [None]:
import numpy as np
from tqdm import trange
import math


def filter_significant_features(first, second, n_features):
    """
    Select the n most significant features from both spectra.
    Uses combined intensity to determine importance.
    """
    # Create dictionary with combined intensities
    combined_intensities = {}
    for mz, prob in first.confs:
        combined_intensities[mz] = combined_intensities.get(mz, 0) + prob
    for mz, prob in second.confs:
        combined_intensities[mz] = combined_intensities.get(mz, 0) + prob

    # Select top n features
    selected_features = set(
        sorted(
            combined_intensities.keys(),
            key=lambda mz: combined_intensities[mz],
            reverse=True,
        )[:n_features]
    )

    # Filter spectra
    filtered_first = first.copy()
    filtered_first.confs = [
        (mz, prob) for mz, prob in first.confs if mz in selected_features
    ]

    filtered_second = second.copy()
    filtered_second.confs = [
        (mz, prob) for mz, prob in second.confs if mz in selected_features
    ]

    # Re-normalize
    filtered_first.normalize()
    filtered_second.normalize()

    return filtered_first, filtered_second


def WSDistanceMoves(first, second):
    """Return the optimal transport plan between self and other."""
    try:
        ii = 0
        leftoverprob = second.confs[0][1]
        for mass, prob in first.confs:
            while leftoverprob <= prob:
                yield (second.confs[ii][0], mass, leftoverprob)
                prob -= leftoverprob
                ii += 1
                leftoverprob = second.confs[ii][1]
            yield (second.confs[ii][0], mass, prob)
            leftoverprob -= prob
    except IndexError:
        return


def WSDistance(first, second, n_features=None):
    """
    Compute Wasserstein distance using only n most significant features.
    If n_features is None, use all features.
    """
    if n_features is not None and n_features < len(first.confs) + len(second.confs):
        first, second = filter_significant_features(first, second, n_features)

    if not np.isclose(sum(x[1] for x in first.confs), 1.0):
        raise ValueError("Self is not normalized.")
    if not np.isclose(sum(x[1] for x in second.confs), 1.0):
        raise ValueError("Other is not normalized.")

    return math.fsum((x[0] - x[1]) * x[2] for x in WSDistanceMoves(first, second))


def WSGradient(source, target, n_features=None, epsilon=1e-5):
    """
    Compute numerical gradient of WSDistance with respect to mz values of `source`
    using only n most significant features.
    """
    if n_features is not None and n_features < len(source.confs) + len(target.confs):
        source, target = filter_significant_features(source, target, n_features)

    source.normalize()
    target.normalize()

    mzs = np.array([mz for mz, inten in source.confs])
    intensities = np.array([inten for mz, inten in source.confs])
    grad = np.zeros(len(mzs))

    for i in trange(len(mzs), desc="Computing WSDistance gradient"):
        mz_plus = mzs.copy()
        mz_minus = mzs.copy()
        mz_plus[i] += epsilon
        mz_minus[i] -= epsilon

        source_plus = Spectrum(confs=list(zip(mz_plus, intensities)))
        source_minus = Spectrum(confs=list(zip(mz_minus, intensities)))

        source_plus.normalize()
        source_minus.normalize()

        dist_plus = WSDistance(target, source_plus)
        dist_minus = WSDistance(target, source_minus)

        grad[i] = (dist_plus - dist_minus) / (2 * epsilon)

    return grad


# Use only n most significant features
distance = WSDistance(spectra[0], spectra[1], n_features=2000)
difference = WSDistance(spectra[0], spectra[1]) - distance
print(difference)
print(distance)

# Compute gradient using only 20 most significant features
gradient = WSGradient(spectra[0], spectra[1], n_features=1000)
print(gradient)

In [None]:
import matplotlib.pyplot as plt

def calculate_gradient(mix, comp0, comp1, p, epsilon=1e-5):
    delta = epsilon

    p_plus = np.array([p[0] + delta, p[1] - delta])
    p_minus = np.array([p[0] - delta, p[1] + delta])

    p_plus = np.clip(p_plus, 1e-6, 1)
    p_plus /= p_plus.sum()

    p_minus = np.clip(p_minus, 1e-6, 1)
    p_minus /= p_minus.sum()

    sp_plus = Spectrum.ScalarProduct([comp0, comp1], p_plus)
    sp_minus = Spectrum.ScalarProduct([comp0, comp1], p_minus)

    sp_plus.normalize()
    sp_minus.normalize()

    grad = (mix.WSDistance(sp_plus) - mix.WSDistance(sp_minus)) / (2 * epsilon)

    # Gradient of p[1] is -grad because p[1] = 1 - p[0]
    grad_vec = np.array([grad, -grad])

    return grad_vec


def mirror_descent_two_weights(
    mix, comp0, comp1, learning_rate=1.0, T=100, epsilon=1e-5, lmd = 0.99
):
    p = np.array([0.5, 0.5])  # start from uniform mixture
    history = [p.copy()]
    scores = []

    for _ in trange(T, desc="Mirror Descent (2 weights)"):
        # Track score (distance from current estimate to true mixture)
        estimated_mix = Spectrum.ScalarProduct([comp0, comp1], p)
        estimated_mix.normalize()
        ws = mix.WSDistance(estimated_mix)
        scores.append(ws)

        # Compute gradient
        grad_vec = calculate_gradient(mix, comp0, comp1, p, epsilon)

        # Mirror descent update
        w = p * np.exp(-learning_rate * grad_vec)
        p = w / w.sum()
        learning_rate *= lmd
        history.append(p.copy())

    return p, np.array(history), np.array(scores)


# Run mirror descent
mix.normalize()
spectra[0].normalize()
spectra[1].normalize()

final_p, traj, score_history = mirror_descent_two_weights(
    mix, spectra[0], spectra[1], learning_rate=0.0005, T=50
)

# Reconstruct mixture from estimated weights
estimated_mix = Spectrum.ScalarProduct([spectra[0], spectra[1]], final_p)
estimated_mix.normalize()

# Compute Wasserstein distance to the actual mixture
ws_dist = WSDistance(mix, estimated_mix)

# Compute Wasserstein distance for the given mixture
given_p = np.array([0.393072, 0.606928])
given_mix = Spectrum.ScalarProduct([spectra[0], spectra[1]], given_p)
given_mix.normalize()
ws_dist_given = WSDistance(mix, given_mix)

# Print results
print("\nFinal weights:")
print(f"  comp1 = {final_p[0]:.4f}")
print(f"  comp2 = {final_p[1]:.4f}")
print(f"\nFinal Wasserstein distance to true mixture: {ws_dist:.6f}")
print(
    f"\nWasserstein distance for given mixture (0.393072, 0.606928): {ws_dist_given:.6f}"
)

plt.plot(traj[:, 0], label="Component 0")
plt.plot(traj[:, 1], label="Component 1")
plt.xlabel("Iteration")
plt.ylabel("Weight")
plt.title("Mirror Descent Weight Evolution")
plt.legend()
plt.grid(True)
plt.show()

plt.plot(score_history, marker="o")
plt.xlabel("Iteration")
plt.ylabel("Wasserstein Distance")
plt.title("Distance to True Mixture Over Iterations")
plt.grid(True)
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt


def proportions_test():
    def test_mirror_descent(components, true_p, learning_rate=0.0005, T=100, epsilon=1e-5):
        """Test mirror descent algorithm on a synthetic mixture with known proportions."""
        # Generate synthetic mixture
        synthetic_mix = generate_synthetic_mixture(components, true_p)
        
        # Run mirror descent to recover proportions
        recovered_p, traj, score_history = mirror_descent_two_weights(
            synthetic_mix, components[0], components[1], learning_rate=learning_rate, T=T, epsilon=epsilon
        )
        
        # Calculate error between true and recovered proportions
        error = np.linalg.norm(true_p - recovered_p)
        
        return recovered_p, traj, score_history, error

    # List of proportion pairs to test
    test_proportions = [
        [0.2, 0.8],
        [0.4, 0.6],
        [0.9, 0.1]
    ]

    # Convert to numpy arrays and ensure they sum to 1
    test_proportions = [np.array(p) / sum(p) for p in test_proportions]

    # Run tests (without tqdm to avoid widget issues)
    results = []
    print("Testing proportions:")
    for i, true_p in enumerate(test_proportions):
        print(f"  Testing {i+1}/{len(test_proportions)}: p={true_p}")
        recovered_p, traj, score_history, error = test_mirror_descent(
            spectra, true_p, learning_rate=0.015, T=30
        )
        results.append({
            "true_p": true_p,
            "recovered_p": recovered_p,
            "traj": traj,
            "score_history": score_history,
            "error": error
        })

    # Print results
    print("\nTest Results:")
    print("-" * 60)
    print(f"{'True Proportions':<25} {'Recovered Proportions':<25} {'Error':<10}")
    print("-" * 60)
    for res in results:
        true_str = f"[{res['true_p'][0]:.3f}, {res['true_p'][1]:.3f}]"
        recovered_str = f"[{res['recovered_p'][0]:.3f}, {res['recovered_p'][1]:.3f}]"
        print(f"{true_str:<25} {recovered_str:<25} {res['error']:.6f}")

    # Visualization of proportions
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    true_props = np.array([res['true_p'][0] for res in results])
    recovered_props = np.array([res['recovered_p'][0] for res in results])
    plt.scatter(true_props, recovered_props)
    plt.plot([0, 1], [0, 1], 'r--')
    plt.xlabel('True proportion of component 0')
    plt.ylabel('Recovered proportion of component 0')
    plt.title('True vs Recovered Proportions')
    plt.grid(True)

    # Plot errors
    plt.subplot(1, 2, 2)
    errors = [res['error'] for res in results]
    plt.bar(range(len(errors)), errors)
    plt.xticks(range(len(errors)), [f"[{p[0]:.1f}, {p[1]:.1f}]" for p in test_proportions])
    plt.xlabel('True proportions')
    plt.ylabel('L2 Error')
    plt.title('Proportion Recovery Error')
    plt.tight_layout()
    plt.show()

    # Plot convergence for each test case
    plt.figure(figsize=(15, 10))
    for i, res in enumerate(results):
        plt.subplot(len(results), 2, 2*i+1)
        plt.plot(res['traj'][:, 0], label='Comp 0')
        plt.plot(res['traj'][:, 1], label='Comp 1')
        plt.axhline(res['true_p'][0], color='r', linestyle='--', alpha=0.5, label='True comp 0')
        plt.axhline(res['true_p'][1], color='g', linestyle='--', alpha=0.5, label='True comp 1')
        plt.title(f"Proportion Convergence for p={res['true_p']}")
        plt.legend()
        plt.grid(True)
        
        plt.subplot(len(results), 2, 2*i+2)
        plt.plot(res['score_history'])
        plt.title(f"Wasserstein Distance for p={res['true_p']}")
        plt.xlabel('Iteration')
        plt.ylabel('Distance')
        plt.grid(True)

    plt.tight_layout()
    plt.show()

In [None]:

import logging
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

kappa_mixture = 0.25
kappa_components = 0.22


def transport_cost_conf(conf, target):
    source_mz, source_intensity = conf
    remaining = source_intensity
    cost = 0.0
    target_confs = [(mz, inten) for mz, inten in target.confs]

    lo, hi = 0, len(target_confs)
    while lo < hi:
        mid = (lo + hi) // 2
        if target_confs[mid][0] < source_mz:
            lo = mid + 1
        else:
            hi = mid
    idx = lo
    left = idx - 1
    right = idx

    while remaining > 0 and (left >= 0 or right < len(target_confs)):
        left_dist = (
            abs(source_mz - target_confs[left][0]) if left >= 0 else float("inf")
        )
        right_dist = (
            abs(source_mz - target_confs[right][0])
            if right < len(target_confs)
            else float("inf")
        )
        if left_dist <= right_dist:
            assigned = min(remaining, target_confs[left][1])
            cost += left_dist * assigned
            remaining -= assigned
            new_val = target_confs[left][1] - assigned
            target_confs[left] = (target_confs[left][0], new_val)
            if new_val <= 1e-12:
                left -= 1
        else:
            assigned = min(remaining, target_confs[right][1])
            cost += right_dist * assigned
            remaining -= assigned
            new_val = target_confs[right][1] - assigned
            target_confs[right] = (target_confs[right][0], new_val)
            if new_val <= 1e-12:
                right += 1
    return cost


def calculate_zeta_and_omega(
    mix: NMRSpectrum,
    components: List[NMRSpectrum],
    kappa_mixture=kappa_mixture,
    kappa_components=kappa_components,
):
    logger.info("Starting calculation of zeta and omega.")
    uniform_weights = np.ones(len(components)) / len(components)
    combined_components = Spectrum.ScalarProduct(components, uniform_weights)
    combined_components.normalize()

    final_zeta = []
    final_omega = []

    for mz, intensity in tqdm(mix.confs, desc="Processing mix confs"):
        if intensity < 1e-5:
            continue
        ws_distance = transport_cost_conf((mz, intensity), combined_components)
        normalized_distance = ws_distance / intensity if intensity > 0 else 0
      
        if normalized_distance > 1e-3:
            weight = normalized_distance - kappa_mixture
            final_zeta.append((mz, intensity * weight))

    for mz, intensity in tqdm(combined_components.confs, desc="Processing combined components confs"):
        ws_distance = transport_cost_conf((mz, intensity), mix)
        normalized_distance = ws_distance / intensity if intensity > 0 else 0
       
        if normalized_distance > 1e-3:
            weight = normalized_distance - kappa_components
            final_omega.append((mz, intensity * weight))

    final_zeta_spectrum = (
        Spectrum(confs=final_zeta) if final_zeta else Spectrum(confs=[])
    )
    final_omega_spectrum = (
        Spectrum(confs=final_omega) if final_omega else Spectrum(confs=[])
    )

    if final_zeta:
        final_zeta_spectrum.normalize()
    if final_omega:
        final_omega_spectrum.normalize()

    logger.info("Zeta and omega calculation completed.")
    return final_zeta_spectrum, final_omega_spectrum


logger.info("Starting zeta and omega computation for mix and spectra.")
zeta, omega = calculate_zeta_and_omega(
    mix,
    spectra,
    kappa_mixture=kappa_mixture,
    kappa_components=kappa_components,
)
logger.info("Zeta and omega computation finished.")


In [None]:
zeta.plot(profile=False)
omega.plot(profile=False)