In [None]:
# Column properties
# Dimensions, void fracs, lambda

# Species properties
# Names, D, steric factors, nu, keq's and kads defined between each pair
# reverse keq and kdes calculated

# Define column discretization parameters
# User specified or loaded initial condition
# Pack initial condition as y0 format for solve_ivp



In [13]:
import numpy as np
from scipy.integrate import solve_ivp
from scipy.optimize import root
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import sys
from dataclasses import dataclass

In [None]:
@dataclass
class SystemConfig:
    bed_height: float              # meters
    column_radius: float           # meters
    Lambda: float                  # ionic capacity [mol/L resin]
    epsilon_i: float               # interstitial void fraction
    epsilon_p: float               # pore void fraction
    Nz: int = 100                  # number of axial segments

    @property
    def A(self):
        return np.pi * self.column_radius**2

    @property
    def dz(self):
        return self.bed_height / (self.Nz - 1)

    @property
    def vol_column(self):
        """Total packed bed volume (resin + interstitial)"""
        return self.A * self.bed_height
    
    @property
    def vol_interstitial(self):
        """Fluid-phase volume only (mobile phase in interstitial space)"""
        return self.A * self.bed_height * self.epsilon_i

In [None]:
class Species:
    def __init__(self, name, D, Kd, unit="M"):
        self.name = name
        self.D = D # Effective axial dispersion coefficients (m^2/s)
        self.Kd = Kd # Pore accessibility fraction ()
        self.unit = unit # Only relevant for plotting and feed conditions

class Ion(Species):
    pass

class Protein(Species):
    def __init__(self, name, D, Kd, sigma, nu):
        super().__init__(name, D, Kd)
        self.sigma = sigma # Steric factors for proteins ()
        self.nu = nu # Number of displaced counterions per bound protein ()

class ExchangeSystem:
    def __init__(self, ions, proteins, config: SystemConfig):
        self.ions = ions
        self.proteins = proteins
        self.species = {**ions, **proteins}
        self.config = config
        self.K_eq = {}
        self.k_ads = {}
        self.k_des = {}

    def set_equilibrium(self, a, b, K_eq_val, k_ads_val):
        self.K_eq[(a, b)] = K_eq_val
        self.k_ads[(a, b)] = k_ads_val
        self.k_des[(a, b)] = k_ads_val / K_eq_val
        self.K_eq[(b, a)] = 1.0 / K_eq_val
        self.k_ads[(b, a)] = k_ads_val / K_eq_val
        self.k_des[(b, a)] = k_ads_val

    def check_equilibria(self):
        missing = []

        # Check ion–ion (each unordered pair only once)
        ion_list = list(self.ions.keys())
        for i in range(len(ion_list)):
            for j in range(i + 1, len(ion_list)):
                a, b = ion_list[i], ion_list[j]
                if (a, b) not in self.K_eq:
                    missing.append((a, b))

        # Check protein–ion in the order (protein, ion)
        for p in self.proteins:
            for i in self.ions:
                if (p, i) not in self.K_eq:
                    missing.append((p, i))

        if missing:
            print("Missing equilibrium definitions for:")
            for pair in missing:
                print(f"  {pair}")
        else:
            print("All required equilibria are defined.")

In [None]:
def initialize_profiles(species_list, Nz, initial_conditions):
    C_init = {s: np.zeros(Nz) for s in species_list}
    Q_init = {s: np.zeros(Nz) for s in species_list}

    for s in species_list:
        if s in initial_conditions:
            if "C" in initial_conditions[s]:
                C_init[s][:] = initial_conditions[s]["C"]
            if "Q" in initial_conditions[s]:
                Q_init[s][:] = initial_conditions[s]["Q"]
    return np.concatenate([C_init[s] for s in species_list] +
                          [Q_init[s] for s in species_list])

def unpack_state(state_array, species_list, Nz):
    split = len(species_list) * Nz
    C_arrs = state_array[:split]
    Q_arrs = state_array[split:]
    C_dict = {s: C_arrs[i*Nz:(i+1)*Nz] for i, s in enumerate(species_list)}
    Q_dict = {s: Q_arrs[i*Nz:(i+1)*Nz] for i, s in enumerate(species_list)}
    return C_dict, Q_dict

In [None]:
def convert_buffer_units(buffer, units_dict):
    cp_per_mL_to_M = 1e3 / 6.022e23  # (mol/6.022e23 capsids) * (1000 mL / 1 L)
    converted = {}
    for species, val in buffer.items():
        unit = units_dict.get(species)
        if unit == "M":
            converted[species] = val
        elif unit == "capsid/mL":
            converted[species] = val * cp_per_mL_to_M
        else:
            raise ValueError(f"Unknown or missing unit for species '{species}'")
    return converted

def feed_condition(t, system, buffers, method):
    """
    Determines feed composition and flow rate at time t using method steps.

    Args:
        t (float): time [s]
        system: ExchangeSystem object

    Returns:
        composition (dict of concentrations in M), flow_rate (m^3/s)
    """
    species_list = list(system.species.keys())
    units = {name: s.unit for name, s in system.species.items()}
    vol_interstitial = system.config.vol_interstitial

    # Validate buffer contents
    for name, buffer in buffers.items():
        if set(buffer.keys()) != set(species_list):
            raise ValueError(f"Buffer '{name}' species mismatch. Missing or extra entries.")

    block_start = 0
    for block in method:
        flow_rate_m3_s = block["flow_rate_mL_min"] * 1.667e-8
        duration_s = block["duration_CV"] * interstitialvelocity / flow_rate_m3_s
        block_end = block_start + duration_s

        if block_start <= t < block_end:
            frac = (t - block_start) / duration_s
            percent_B = block["start_B"] + frac * (block["end_B"] - block["start_B"])
            comp_A = convert_buffer_units(buffers[block["buffer_A"]], units)
            comp_B = convert_buffer_units(buffers[block["buffer_B"]], units)

            composition = {
                s: (1 - percent_B) * comp_A[s] + percent_B * comp_B[s]
                for s in species_list
            }
            return composition, flow_rate_m3_s

        block_start = block_end

    # After method ends, return last block conditions
    last = method[-1]
    flow_rate_m3_s = last["flow_rate_mL_min"] * 1.667e-8
    percent_B = last["end_B"]
    comp_A = convert_buffer_units(buffers[last["buffer_A"]], units)
    comp_B = convert_buffer_units(buffers[last["buffer_B"]], units)

    composition = {
        s: (1 - percent_B) * comp_A[s] + percent_B * comp_B[s]
        for s in species_list
    }

    return composition, flow_rate_m3_s

In [None]:
def calc_Qbar_i(Q, system):
    Nz = system.config.Nz
    Lambda = system.config.Lambda
    Qbar = {i: np.zeros(Nz) for i in system.ions}
    for i in system.ions:
        for z_idx in range(Nz):
            sum_Q_other_ions = sum(
            Q[k][z_idx] for k in system.ions if k != i
            )
            blocked = sum(
                (p.nu + p.sigma) * Q[p.name][z_idx] for p in system.proteins.values()
            )
            if Q[i][z_idx] > 1e-12:
                Qbar[i][z_idx] = (Lambda - blocked) / (1 + sum_Q_other_ions / Q[i][z_idx])
            else:
                Qbar[i][z_idx] = 0.0
    return Qbar

def calc_dQ_idt(C, Q, Qbar, system):
    Nz = system.config.Nz
    dQdt = {s: np.zeros(Nz) for s in system.species}
    for i in system.ions:
        for z_idx in range(Nz):
            # Ion–ion exchange
            for k in system.ions:
                if k != i:
                    dQdt[i][z_idx] += system.k_ads[(i, k)] * C[i][z_idx] * Q[k][z_idx]
                    dQdt[i][z_idx] -= system.k_des[(i, k)] * Q[i][z_idx] * C[k][z_idx]

            # Protein–ion exchange
            for j in system.proteins:
                nu_j = system.proteins[j].nu
                dQdt[i][z_idx] += system.k_ads[(i, j)] * (C[i][z_idx] ** nu_j) * Q[j][z_idx]
                dQdt[i][z_idx] -= system.k_des[(i, j)] * Q[i][z_idx] * (C[j][z_idx] ** nu_j)

    for j in system.proteins:
        for z_idx in range(Nz):
        # Protein–ion exchange
            nu_j = system.proteins[j].nu
            for i in system.ions:
                dQdt[i][z_idx] += system.k_ads[(j, i)] * C[j][z_idx] * (Qbar[i][z_idx] ** nu_j)
                dQdt[i][z_idx] -= system.k_des[(j, i)] * Q[j][z_idx] * (C[i][z_idx] ** nu_j)
    return dQdt

def calc_dCdt(C, dQdt, system, feed):
    Nz = system.config.Nz
    dz = system.config.dz
    v_interstitial, C_feed = feed
    epsilon_p = system.config.epsilon_p
    epsilon_i = system.config.epsilon_i
    for s in system.species:
        D = system.species[s].D
        Kd = system.species[s].Kd
        for z_idx in range(Nz):
            if z_idx == 0:
                # INLET (one-sided forward difference)
                dCdz = (C[s][0] - C_feed[s]) / dz
                d2Cdz2 = (C[s][1] - 2 * C[s][0] + C_feed[s]) / dz**2
            elif z_idx == system.config.Nz - 1:
                # OUTLET (backward difference)
                dCdz = (C[s][-1] - C[s][-2]) / dz
                d2Cdz2 = (C[s][-1] - 2 * C[s][-2] + C[s][-3]) / dz**2
            else:
                # INTERIOR (central difference for diffusion, backward for advection)
                dCdz = (C[s][z_idx] - C[s][z_idx - 1]) / dz
                d2Cdz2 = (C[s][z_idx + 1] - 2 * C[s][z_idx] + C[s][z_idx - 1]) / dz**2
            numerator = -v_interstitial * dCdz[s][z_idx] + D * d2Cdz2 - epsilon_p * Kd * dQdt[s][z_idx]
            denominator = epsilon_i + epsilon_p * Kd
    return numerator / denominator

In [None]:
def convert_buffer_units(buffer, units_dict):
    cp_per_mL_to_M = 1e3 / 6.022e23  # (mol/6.022e23 capsids) * (1000 mL / 1 L)
    converted = {}
    for species, val in buffer.items():
        unit = units_dict.get(species)
        if unit == "M":
            converted[species] = val
        elif unit == "capsid/mL":
            converted[species] = val * cp_per_mL_to_M
        else:
            raise ValueError(f"Unknown or missing unit for species '{species}'")
    return converted

def feed_condition(t, system, buffers, method):
    """
    Determines feed composition and flow rate at time t using method steps.

    Args:
        t (float): time [s]
        system: ExchangeSystem object

    Returns:
        composition (dict of concentrations in M), flow_rate (m^3/s)
    """
    species_list = list(system.species.keys())
    units = {name: s.unit for name, s in system.species.items()}
    vol_interstitial = system.config.vol_interstitial

    # Validate buffer contents
    for name, buffer in buffers.items():
        if set(buffer.keys()) != set(species_list):
            raise ValueError(f"Buffer '{name}' species mismatch. Missing or extra entries.")

    block_start = 0
    for block in method:
        flow_rate_m3_s = block["flow_rate_mL_min"] * 1.667e-8
        duration_s = block["duration_CV"] * interstitialvelocity / flow_rate_m3_s
        block_end = block_start + duration_s

        if block_start <= t < block_end:
            frac = (t - block_start) / duration_s
            percent_B = block["start_B"] + frac * (block["end_B"] - block["start_B"])
            comp_A = convert_buffer_units(buffers[block["buffer_A"]], units)
            comp_B = convert_buffer_units(buffers[block["buffer_B"]], units)

            composition = {
                s: (1 - percent_B) * comp_A[s] + percent_B * comp_B[s]
                for s in species_list
            }
            return composition, flow_rate_m3_s

        block_start = block_end

    # After method ends, return last block conditions
    last = method[-1]
    flow_rate_m3_s = last["flow_rate_mL_min"] * 1.667e-8
    percent_B = last["end_B"]
    comp_A = convert_buffer_units(buffers[last["buffer_A"]], units)
    comp_B = convert_buffer_units(buffers[last["buffer_B"]], units)

    composition = {
        s: (1 - percent_B) * comp_A[s] + percent_B * comp_B[s]
        for s in species_list
    }

    return composition, flow_rate_m3_s

In [None]:
ions = {
    "Cl": Ion("Cl", D=1e-10, Kd=1.0),
    "Ac": Ion("Ac", D=1e-10, Kd=1.0)
}

proteins = {
    "em": Protein("em", D=5e-12, Kd=1.0, sigma=100, nu=5),
    "fu": Protein("fu", D=5e-12, Kd=1.0, sigma=100, nu=5)
}

config = SystemConfig(
    bed_height=0.2,
    column_radius=0.004,
    Lambda=0.5,
    epsilon_i=0.5,
    epsilon_p=0.5,
    Nz=100
)

system = ExchangeSystem(ions, proteins, config)
species_list = list(system.species.keys())
system.set_equilibrium("Cl", "Ac", K_eq_val=2.0, k_ads_val=1.0)
system.set_equilibrium("em", "Cl", K_eq_val=5e3, k_ads_val=1e4)
system.set_equilibrium("em", "Ac", K_eq_val=5e-2, k_ads_val=1e3)
system.set_equilibrium("fu", "Cl", K_eq_val=1e4, k_ads_val=1e4)
system.set_equilibrium("fu", "Ac", K_eq_val=1e-1, k_ads_val=1e3)
system.check_equilibria()

All required equilibria are defined.


In [None]:
initial_conditions = {
    "Cl": {"C": 0.04, "Q": system.config.Lambda},
}

y0 = initialize_profiles(species_list, system.config.Nz, initial_conditions);

In [None]:
buffers = {
    "Load": {"Cl": 0.04, "Ac": 0.0, "em": 0, "fu": 5e16},
    "A": {"Cl": 0.04, "Ac": 0.0, "em": 0.0, "fu": 0.0},
    "B": {"Cl": 0.04, "Ac": 0.3, "em": 0.0, "fu": 0.0},
    "Spike": {"Cl": 1.0, "Ac": 0.0, "em": 0.0, "fu": 0.0},
}

# Maybe check buffers or something

method = [
    {"buffer_A": "A", "buffer_B": "B", "start_B": 0.0, "end_B": 0.0, "duration_CV": 1, "flow_rate_mL_min": 2.7},
    {"buffer_A": "Spike", "buffer_B": "B", "start_B": 0.0, "end_B": 0.0, "duration_CV": 0.02, "flow_rate_mL_min": 2.7},
    {"buffer_A": "A", "buffer_B": "B", "start_B": 0.0, "end_B": 0.0, "duration_CV": 2, "flow_rate_mL_min": 2.7},
]   

In [None]:
def chromatography_odes(t, y, system, feed_condition):
    global call_count
    call_count += 1
    if call_count % 10 == 0:
        print(f"t = {t:.2f}, ODE Call = {call_count}")
    Nz = system.config.Nz
    dz = system.config.dz
    species_list = list(system.species.keys())
    
    # Unpack
    split = len(species_list) * Nz
    C_arrs = y[:split]
    Q_arrs = y[split:]
    C_dict = {s: C_arrs[i*Nz:(i+1)*Nz] for i, s in enumerate(species_list)}
    Q_dict = {s: Q_arrs[i*Nz:(i+1)*Nz] for i, s in enumerate(species_list)}
    dCdt = {species: np.zeros_like(arr) for species, arr in C_dict.items()}
    dQdt = {species: np.zeros_like(arr) for species, arr in Q_dict.items()}