In [1]:
import polars as pl
all_smiles = pl.read_csv('../data/from_host/train.csv')['SMILES'].to_list()
SMILES_INDEX = 1552
all_smiles[SMILES_INDEX]

'*CCCS(*)(=O)=O'

In [12]:
import MDAnalysis as mda

dump_path = f"work_dirs/host_hybrid/{SMILES_INDEX}/eq1_final.dump"
data_path = f"work_dirs/host_hybrid/{SMILES_INDEX}/eq1_final.data"

# coords = mda.coordinates.LAMMPS.DumpReader(dump_path)
# topology = mda.topology.LAMMPSParser.LammpsDumpParser(dump_path).parse()

# print(topology)

u = mda.Universe(
    data_path,
    f"work_dirs/host_hybrid/{SMILES_INDEX}/eq1_short.xtc",
    # format='LAMMPS'
)

In [13]:
import numpy as np
from MDAnalysis.lib import mdamath
import freud

# --- positions & box for freud ---
positions_angstrom = u.atoms.positions.astype(np.float64)
box_matrix_angstrom = mdamath.triclinic_vectors(u.dimensions)  # 3x3
freud_box = freud.Box.from_matrix(box_matrix_angstrom)

# --- mass density (g/cm^3) ---
amu_to_g = 1.66053906660e-24
volume_ang3 = mdamath.box_volume(u.dimensions)          # Å^3
rho_g_cm3 = (u.atoms.masses.sum() * amu_to_g) / (volume_ang3 * 1e-24)

# --- RDF first peak ---
rdf = freud.density.RDF(bins=200, r_max=10.0)
rdf.compute(system=(freud_box, positions_angstrom))
first_peak_idx = int(np.argmax(rdf.rdf))
first_peak_distance_angstrom = float(rdf.bin_centers[first_peak_idx])

# --- Steinhardt Q6 ---
q6 = freud.order.Steinhardt(l=6)
q6.compute((freud_box, positions_angstrom), neighbors={'num_neighbors': 12, 'exclude_ii': True})
average_q6 = float(q6.order)

# --- Voronoi cell volumes (Å^3) ---
voro = freud.locality.Voronoi()
voro.compute(system=(freud_box, positions_angstrom))
voronoi_volumes_ang3 = voro.volumes  # per-atom cell volumes

print(f"rho = {rho_g_cm3:.3f}, Q6 = {average_q6:.4f}, gr_peak = {first_peak_distance_angstrom:.2f} Å")
print(f"Voronoi volumes: mean = {np.mean(voronoi_volumes_ang3):.2f} Å^3")

rho = 0.119, Q6 = 0.0037, gr_peak = 1.08 Å
Voronoi volumes: mean = 122.54 Å^3


In [18]:
bondi_vdw_radius_by_element_angstrom = {
    "H": 1.20, "C": 1.70, "N": 1.55, "O": 1.52, "F": 1.47,
    "P": 1.80, "S": 1.80, "Cl": 1.75, "Br": 1.85, "Si": 2.10
}

def build_per_atom_vdw_radii_angstrom(universe):
    # Try to use elements if available; else names’ leading letters; else default heavy-atom radius.
    try:
        element_symbols = [el if el is not None else "" for el in getattr(universe.atoms, "elements", [None]*len(universe.atoms))]
    except Exception:
        element_symbols = ["" for _ in range(len(universe.atoms))]

    per_atom_radii = []
    for atom, element_symbol in zip(universe.atoms, element_symbols):
        radius = None
        key = element_symbol if element_symbol in bondi_vdw_radius_by_element_angstrom else str(getattr(atom, "name", "")).rstrip("0123456789")
        if key in bondi_vdw_radius_by_element_angstrom:
            radius = bondi_vdw_radius_by_element_angstrom[key]
        elif key and key[0] in bondi_vdw_radius_by_element_angstrom:
            radius = bondi_vdw_radius_by_element_angstrom[key[0]]
        else:
            # Fallback: treat as generic heavy atom
            radius = 1.70
        per_atom_radii.append(radius)
    return np.asarray(per_atom_radii, dtype=float)

def estimate_ffv_monte_carlo(
    freud_box,
    atom_positions_angstrom: np.ndarray,
    per_atom_vdw_radii_angstrom: np.ndarray,
    probe_radius_angstrom: float = 0.0,
    grid_points_per_axis: int = 48,
    rng_seed: int = 0
) -> float:
    """
    Returns an estimate of the probe-accessible FFV using uniform grid sampling
    with PBC. Increase grid_points_per_axis for accuracy (cost ~ N^3).
    """
    rng = np.random.default_rng(rng_seed)

    # --- build Cartesian grid in fractional space, then map to box ---
    fractional_lin = (np.arange(grid_points_per_axis, dtype=float) + 0.5) / grid_points_per_axis
    fx, fy, fz = np.meshgrid(fractional_lin, fractional_lin, fractional_lin, indexing="ij")
    sample_fracs = np.column_stack([fx.ravel(), fy.ravel(), fz.ravel()])  # (M,3)

    box_matrix_angstrom = freud_box.to_matrix().astype(float)  # 3x3
    sample_points_angstrom = sample_fracs @ box_matrix_angstrom.T         # (M,3)

    # --- neighbor query with a single conservative cutoff (max radius) ---
    max_effective_radius = float(np.max(per_atom_vdw_radii_angstrom) + probe_radius_angstrom)
    neighbor_query = freud.locality.AABBQuery(freud_box, atom_positions_angstrom.astype(float))
    neighbor_list = neighbor_query.query(sample_points_angstrom, {"r_max": max_effective_radius}).toNeighborList()

    # --- mark any sample point that has at least one neighbor within that conservative cutoff as "potentially occupied" ---
    # This is conservative; to be stricter, filter by per-atom specific radius below (optional refinement).
    occupied_flags = np.zeros(sample_points_angstrom.shape[0], dtype=bool)
    occupied_flags[np.asarray(neighbor_list.query_point_indices, dtype=np.int64)] = True

    # Optional refinement: shrink back by checking per-atom specific radii
    # (kept lightweight; comment out if the neighbor list exposes distances differently in your freud)
    try:
        # If your freud exposes neighbor vectors/distances, validate per-pair
        neighbor_vectors = np.asarray(neighbor_list.separations, dtype=float)  # shape (K,3) in some freud builds
        neighbor_distances = np.linalg.norm(neighbor_vectors, axis=1)
        neighbor_atom_indices = np.asarray(neighbor_list.point_indices, dtype=np.int64)
        neighbor_query_indices = np.asarray(neighbor_list.query_point_indices, dtype=np.int64)
        effective_radii_per_pair = per_atom_vdw_radii_angstrom[neighbor_atom_indices] + probe_radius_angstrom
        within_true_radius = neighbor_distances <= effective_radii_per_pair + 1e-9

        # Reset & re-mark with the refined criterion
        occupied_flags[:] = False
        occupied_flags[neighbor_query_indices[within_true_radius]] = True
    except Exception:
        # If separations/distances are not exposed in your freud version,
        # we keep the conservative occupied_flags computed above.
        pass

    ffv_estimate = float((~occupied_flags).mean())
    return ffv_estimate

per_atom_vdw_radii_angstrom = build_per_atom_vdw_radii_angstrom(u)

# Example call (uses same radii array built above):
ffv_accessible = estimate_ffv_monte_carlo(
    freud_box=freud_box,
    atom_positions_angstrom=positions_angstrom,
    per_atom_vdw_radii_angstrom=per_atom_vdw_radii_angstrom,
    probe_radius_angstrom=1.20,   # e.g., helium-like probe; set 0.0 for geometric void
    # probe_radius_angstrom=0,
    grid_points_per_axis=48,      # ↑ to 64–96 for tighter estimates
    rng_seed=0
)

print('FFV (monte-carlo):', ffv_accessible)

FFV (monte-carlo): 0.8224464699074074


In [22]:
for ts in u.trajectory:
    # ffv_accessible = estimate_ffv_monte_carlo(
    #     freud_box=freud_box,
    #     atom_positions_angstrom=positions_angstrom,
    #     per_atom_vdw_radii_angstrom=per_atom_vdw_radii_angstrom,
    #     probe_radius_angstrom=1.20,   # e.g., helium-like probe; set 0.0 for geometric void
    #     # probe_radius_angstrom=0,
    #     grid_points_per_axis=48,      # ↑ to 64–96 for tighter estimates
    #     rng_seed=0
    # )

    # print('FFV (monte-carlo):', ffv_accessible)

FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-carlo): 0.8224464699074074
FFV (monte-c

KeyboardInterrupt: 

In [24]:
from typing import Dict, Tuple, Optional
import math
import numpy as np
import MDAnalysis as mda
from MDAnalysis.exceptions import NoDataError

# Minimal Bondi vdW radii (Å). Add to taste if your system contains more elements.
BONDI_RADII_ANGSTROM: Dict[str, float] = {
    "H": 1.20, "He": 1.40,
    "C": 1.70, "N": 1.55, "O": 1.52, "F": 1.47,
    "P": 1.80, "S": 1.80, "Cl": 1.75,
    "Br": 1.85, "I": 1.98,
    "Si": 2.10,
    # Alkali/alkaline-earth (rare in organic polymers, included for completeness):
    "Na": 2.27, "K": 2.75, "Ca": 2.31,
}

APPROX_ELEMENT_MASSES = {
    "H": 1.008, "C": 12.011, "N": 14.007, "O": 15.999, "F": 18.998,
    "Na": 22.990, "Mg": 24.305, "Al": 26.982, "Si": 28.085, "P": 30.974,
    "S": 32.06, "Cl": 35.45, "K": 39.098, "Ca": 40.078, "Br": 79.904, "I": 126.904,
}

def safe_get(atom, attr: str):
    try:
        return getattr(atom, attr)
    except (AttributeError, NoDataError):
        return None

def infer_element_symbol_for_atom(atom) -> str:
    # 1) element
    el = safe_get(atom, "element")
    if el:
        return el.strip().capitalize()

    # 2) name
    nm = safe_get(atom, "name")
    if isinstance(nm, str) and nm:
        letters = "".join(c for c in nm if c.isalpha())
        if letters:
            return (letters[:2].capitalize()
                    if len(letters) > 1 and letters[1].islower()
                    else letters[:1].upper())

    # 3) type (LAMMPS often has numeric or short strings)
    tp = safe_get(atom, "type")
    if tp:
        if isinstance(tp, str):
            letters = "".join(c for c in tp if c.isalpha())
            if letters:
                return (letters[:2].capitalize()
                        if len(letters) > 1 and letters[1].islower()
                        else letters[:1].upper())
        # numeric types → just fall through to default after mass check

    # 4) mass-based nearest match (crude but better than nothing)
    mass = safe_get(atom, "mass")
    if mass:
        closest_el = min(APPROX_ELEMENT_MASSES, key=lambda k: abs(APPROX_ELEMENT_MASSES[k] - mass))
        return closest_el

    # 5) default
    return "C"

def compute_fractional_free_volume(
    universe: mda.Universe,
    selection: str = "all",
    grid_spacing_angstrom: float = 0.5,
    radii_angstrom_by_element: Optional[Dict[str, float]] = None,
    probe_radius_angstrom: Optional[float] = None,
    use_periodic_boundary_conditions: bool = True,
    return_occupancy_grid: bool = False,
) -> Tuple[dict, Optional[np.ndarray]]:
    """
    Estimate FFV for a selected set of atoms by voxelizing the periodic box and marking
    voxels whose centers lie within a vdW sphere of any atom (optionally inflated by a probe).

    Parameters
    ----------
    universe : mda.Universe
        Universe with box dimensions in Å. Assumes orthorhombic (alpha=beta=gamma≈90°).
    selection : str
        Atom selection (e.g., "all" or "not name H*").
    grid_spacing_angstrom : float
        Edge length of cubic voxels in Å. Smaller -> more accurate, slower. 0.5 Å is a good start.
    radii_angstrom_by_element : dict
        Mapping element symbol -> vdW radius in Å. Defaults to minimal Bondi set above.
    probe_radius_angstrom : float | None
        If provided, each atom’s radius is inflated by this amount before painting occupancy.
        Set to 0.0 for strict vdW volume, or e.g. 1.2 Å for helium-probe–like accessible volume.
    use_periodic_boundary_conditions : bool
        If True, spheres wrap across boundaries. If False, atoms near edges will be truncated.
    return_occupancy_grid : bool
        If True, also return a boolean 3D array of occupied voxels (z, y, x).

    Returns
    -------
    results : dict
        {
          "ffv": float,
          "occupied_volume_A3": float,
          "box_volume_A3": float,
          "voxel_count_total": int,
          "voxel_count_occupied": int,
          "grid_shape_zyx": (nz, ny, nx),
          "grid_spacing_A": float,
          "selection": str,
          "notes": str,
        }
    occupancy_grid : np.ndarray[bool] | None
        Boolean 3D mask of occupied voxels (z, y, x) if return_occupancy_grid is True, else None.
    """
    radii_lookup = radii_angstrom_by_element or BONDI_RADII_ANGSTROM

    # Grab a single frame’s dimensions (Å) and validate orthorhombic box.
    lx, ly, lz, alpha, beta, gamma = universe.trajectory.ts._unitcell  # MDAnalysis stores [lx, ly, lz, alpha, beta, gamma]
    def _is_close(a: float, b: float, tol: float = 1e-2) -> bool:
        return abs(a - b) < tol
    if not (_is_close(alpha, 90.0) and _is_close(beta, 90.0) and _is_close(gamma, 90.0)):
        raise ValueError("This FFV helper currently assumes an orthorhombic box (alpha=beta=gamma≈90°).")

    total_box_volume_A3 = float(lx * ly * lz)

    # Build grid (voxel centers). We ensure at least 1 voxel per axis.
    nx = max(1, int(math.floor(lx / grid_spacing_angstrom)))
    ny = max(1, int(math.floor(ly / grid_spacing_angstrom)))
    nz = max(1, int(math.floor(lz / grid_spacing_angstrom)))
    actual_dx = lx / nx
    actual_dy = ly / ny
    actual_dz = lz / nz

    # Occupancy grid, indexed as (z, y, x)
    occupancy_grid = np.zeros((nz, ny, nx), dtype=bool)

    selected_atoms = universe.select_atoms(selection)
    if selected_atoms.n_atoms == 0:
        raise ValueError(f"No atoms found for selection='{selection}'.")

    # Precompute voxel center coordinates along each axis (Å).
    # Centers are offset by half a voxel from box edges: [dx/2, 3dx/2, ...]
    x_centers = (np.arange(nx, dtype=float) + 0.5) * actual_dx
    y_centers = (np.arange(ny, dtype=float) + 0.5) * actual_dy
    z_centers = (np.arange(nz, dtype=float) + 0.5) * actual_dz

    # Helper to convert a coordinate (Å) to voxel index with optional periodic wrapping.
    def coord_to_index(coord: float, axis_length: float, ndiv: int) -> int:
        raw = int(math.floor(coord / (axis_length / ndiv)))
        if use_periodic_boundary_conditions:
            return raw % ndiv
        return min(max(raw, 0), ndiv - 1)

    # For faster distance checks, we will “paint” per atom by bounding-boxing voxels that could be inside the sphere,
    # then refine with a center-to-atom distance test.
    positions_A = selected_atoms.positions.copy()  # shape (N, 3) in Å
    # Wrap into the primary cell so indices work cleanly.
    if use_periodic_boundary_conditions:
        positions_A[:, 0] = np.mod(positions_A[:, 0], lx)
        positions_A[:, 1] = np.mod(positions_A[:, 1], ly)
        positions_A[:, 2] = np.mod(positions_A[:, 2], lz)

    # Acquire per-atom radii. Try atom.element when present; fall back to simple guesses from name/type.
    atom_radii = np.empty(selected_atoms.n_atoms, dtype=float)
    for i, atom in enumerate(selected_atoms.atoms):
        element_symbol = infer_element_symbol_for_atom(atom)

        # Normalize halogens like "CL" → "Cl"
        if len(element_symbol) >= 2:
            element_symbol = element_symbol[0].upper() + element_symbol[1].lower()
        else:
            element_symbol = element_symbol.upper()

        vdW_radius = radii_lookup.get(element_symbol, radii_lookup.get("C", 1.70))
        if probe_radius_angstrom is not None:
            vdW_radius += float(probe_radius_angstrom)
        atom_radii[i] = vdW_radius

    # Precompute squared radii for distance tests.
    atom_radii_sq = atom_radii ** 2

    # Paint each atom’s sphere onto the voxel grid
    for atom_index in range(selected_atoms.n_atoms):
        ax, ay, az = positions_A[atom_index]
        r = atom_radii[atom_index]

        # Determine voxel index bounds along each axis (inclusive) potentially covered by the sphere.
        min_x = ax - r
        max_x = ax + r
        min_y = ay - r
        max_y = ay + r
        min_z = az - r
        max_z = az + r

        ix_min = int(math.floor(min_x / actual_dx))
        ix_max = int(math.floor(max_x / actual_dx))
        iy_min = int(math.floor(min_y / actual_dy))
        iy_max = int(math.floor(max_y / actual_dy))
        iz_min = int(math.floor(min_z / actual_dz))
        iz_max = int(math.floor(max_z / actual_dz))

        # Build index ranges with optional periodic wrap
        def wrapped_range(i_min: int, i_max: int, ndiv: int) -> np.ndarray:
            if not use_periodic_boundary_conditions:
                i0 = max(i_min, 0)
                i1 = min(i_max, ndiv - 1)
                if i1 < i0:
                    return np.empty(0, dtype=int)
                return np.arange(i0, i1 + 1, dtype=int)
            # wrap: e.g., range(-2, 1) in ndiv=10 -> [8, 9, 0, 1]
            full = np.arange(i_min, i_max + 1, dtype=int)
            return np.mod(full, ndiv)

        x_indices = wrapped_range(ix_min, ix_max, nx)
        y_indices = wrapped_range(iy_min, iy_max, ny)
        z_indices = wrapped_range(iz_min, iz_max, nz)

        # Squared distance threshold
        r2 = atom_radii_sq[atom_index]

        # Distance test on voxel centers within the bounding box
        for iz in z_indices:
            zc = z_centers[iz]
            dz = zc - az
            if use_periodic_boundary_conditions:
                # Minimum-image along each axis (orthorhombic)
                if dz > 0.5 * lz:
                    dz -= lz
                elif dz < -0.5 * lz:
                    dz += lz
            dz2 = dz * dz
            # Quick prune if dz already exceeds r
            if dz2 > r2:
                continue

            for iy in y_indices:
                yc = y_centers[iy]
                dy = yc - ay
                if use_periodic_boundary_conditions:
                    if dy > 0.5 * ly:
                        dy -= ly
                    elif dy < -0.5 * ly:
                        dy += ly
                dy2 = dy * dy
                if dy2 + dz2 > r2:
                    continue

                # Compute remaining allowed dx span (sphere cross-section)
                remaining_r2 = r2 - (dy2 + dz2)

                # Instead of testing all x individually, we can still loop—vectorizing here would add complexity.
                for ix in x_indices:
                    xc = x_centers[ix]
                    dx = xc - ax
                    if use_periodic_boundary_conditions:
                        if dx > 0.5 * lx:
                            dx -= lx
                        elif dx < -0.5 * lx:
                            dx += lx
                    if dx * dx <= remaining_r2:
                        occupancy_grid[iz, iy, ix] = True

    voxel_volume_A3 = float(actual_dx * actual_dy * actual_dz)
    voxel_count_total = int(nx * ny * nz)
    voxel_count_occupied = int(np.count_nonzero(occupancy_grid))
    occupied_volume_A3 = float(voxel_count_occupied) * voxel_volume_A3

    ffv = max(0.0, min(1.0, 1.0 - (occupied_volume_A3 / total_box_volume_A3)))

    results = {
        "ffv": ffv,
        "occupied_volume_A3": occupied_volume_A3,
        "box_volume_A3": total_box_volume_A3,
        "voxel_count_total": voxel_count_total,
        "voxel_count_occupied": voxel_count_occupied,
        "grid_shape_zyx": (nz, ny, nx),
        "grid_spacing_A": float(grid_spacing_angstrom),
        "selection": selection,
        "notes": (
            "Grid-based FFV using vdW spheres (Bondi radii) with optional probe inflation. "
            "Assumes orthorhombic periodic box. Reduce grid_spacing_A for higher accuracy."
        ),
    }

    return (results, occupancy_grid if return_occupancy_grid else None)

for ts in u.trajectory:
    print(compute_fractional_free_volume(u))

({'ffv': 0.9431179755314414, 'occupied_volume_A3': 41541.57883703709, 'box_volume_A3': 730311.1875, 'voxel_count_total': 5832000, 'voxel_count_occupied': 331736, 'grid_shape_zyx': (180, 180, 180), 'grid_spacing_A': 0.5, 'selection': 'all', 'notes': 'Grid-based FFV using vdW spheres (Bondi radii) with optional probe inflation. Assumes orthorhombic periodic box. Reduce grid_spacing_A for higher accuracy.'}, None)
({'ffv': 0.942629376801057, 'occupied_volume_A3': 41466.91991329193, 'box_volume_A3': 722790.125, 'voxel_count_total': 5735339, 'voxel_count_occupied': 329040, 'grid_shape_zyx': (179, 179, 179), 'grid_spacing_A': 0.5, 'selection': 'all', 'notes': 'Grid-based FFV using vdW spheres (Bondi radii) with optional probe inflation. Assumes orthorhombic periodic box. Reduce grid_spacing_A for higher accuracy.'}, None)
({'ffv': 0.9420078996328816, 'occupied_volume_A3': 41493.061476677656, 'box_volume_A3': 715495.0625, 'voxel_count_total': 5639752, 'voxel_count_occupied': 327061, 'grid_sha

KeyboardInterrupt: 

In [25]:
last_frame = u.trajectory[-1]
print(compute_fractional_free_volume(u))

({'ffv': 0.5439856093358968, 'occupied_volume_A3': 41348.10377937555, 'box_volume_A3': 90672.8046875, 'voxel_count_total': 704969, 'voxel_count_occupied': 321476, 'grid_shape_zyx': (89, 89, 89), 'grid_spacing_A': 0.5, 'selection': 'all', 'notes': 'Grid-based FFV using vdW spheres (Bondi radii) with optional probe inflation. Assumes orthorhombic periodic box. Reduce grid_spacing_A for higher accuracy.'}, None)


In [16]:
rg_values_per_chain = []
for fragment in u.atoms.fragments:
    heavy_mask = (fragment.masses > 1.2)
    # heavy_mask = (fragment.masses > 0)
    if np.count_nonzero(heavy_mask) < 10:
        continue
    chain_rg = float(fragment[heavy_mask].radius_of_gyration(wrap=True))
    rg_values_per_chain.append(chain_rg)

if not rg_values_per_chain:
    raise RuntimeError("No chain had enough heavy atoms for an Rg estimate.")

print(rg_values_per_chain)
rg_values_per_chain = np.array(rg_values_per_chain, dtype=float)
print(
    f"Rg per chain (Å): median={np.median(rg_values_per_chain):.2f}, "
    f"mean={np.mean(rg_values_per_chain):.2f}, "
    f"std={np.std(rg_values_per_chain):.2f}, "
    f"p10={np.percentile(rg_values_per_chain,10):.2f}, "
    f"p90={np.percentile(rg_values_per_chain,90):.2f}, "
    f"n_chains={len(rg_values_per_chain)}"
)

[33.21665311175854, 51.200502580374916, 41.58762363832898, 35.55163650125462, 36.074106343576936, 50.269641746366325, 44.526610147710066, 44.591231889205204, 44.8574987157207, 35.09390001374153]
Rg per chain (Å): median=43.06, mean=41.70, std=6.12, p10=34.91, p90=50.36, n_chains=10


In [17]:
import numpy as np
from collections import deque
import MDAnalysis as mda
from MDAnalysis.transformations import unwrap, wrap, center_in_box
from MDAnalysis.analysis import msd, polymer

# ------------------------------------------------------------
# 0) Transformations: make molecules whole & keep them in the box
# ------------------------------------------------------------
u.trajectory.add_transformations(
    unwrap(u.atoms),
    wrap(u.atoms, compound='fragments'),
    center_in_box(u.atoms, wrap=True),
)
# Activate transforms by stepping the trajectory
_ = u.trajectory[0]
_ = u.trajectory[-1]

# ------------------------------------------------------------
# 1) Diffusivity via Einstein MSD (3D): D = slope / 6
#    Pass time_per_frame_ps if the time axis isn't present in the file.
# ------------------------------------------------------------
def compute_diffusivity(
    universe: mda.Universe,
    selection: str = "all",
    fit_points: int = 20,
    time_per_frame_ps: float | None = None
) -> tuple[float, float]:
    msd_result = msd.EinsteinMSD(universe, select=selection).run()
    msd_values_A2 = np.asarray(msd_result.results.timeseries, dtype=float)
    n_frames = len(msd_values_A2)
    if n_frames < 2:
        raise RuntimeError("Not enough frames to fit MSD slope (need >= 2 frames).")

    time_values_ps = np.asarray(msd_result.times, dtype=float)
    if time_values_ps.size != n_frames or not np.all(np.isfinite(time_values_ps)):
        dt_ps = time_per_frame_ps if time_per_frame_ps is not None else getattr(universe.trajectory, "dt", None)
        if dt_ps is None:
            raise RuntimeError("No time axis; provide time_per_frame_ps (e.g., 0.2 for 2 fs step dumped every 100).")
        time_values_ps = np.arange(n_frames, dtype=float) * float(dt_ps)

    k = max(2, min(int(fit_points), n_frames))
    slope_A2_per_ps = float(np.polyfit(time_values_ps[:k], msd_values_A2[:k], 1)[0])
    diffusivity_A2_per_ps = slope_A2_per_ps / 6.0
    diffusivity_cm2_per_s = diffusivity_A2_per_ps * 1e-4
    return diffusivity_A2_per_ps, diffusivity_cm2_per_s

# ------------------------------------------------------------
# 2) Mass-weighted shape metrics (global and per-chain), PBC-aware
#    Asphericity    b = λ1 - 0.5*(λ2 + λ3)           (Å²)
#    Acylindricity  c = λ2 - λ3                      (Å²)
#    κ² (anisotropy)   = 1 - 3*(Σ λiλj)/(Σ λi)²      (dimensionless)
#    We build the mass-weighted gyration tensor manually for compatibility.
# ------------------------------------------------------------
def mass_weighted_gyration_eigenvalues(atomgroup: mda.core.groups.AtomGroup) -> np.ndarray:
    """
    Return eigenvalues (λ1 >= λ2 >= λ3) of the mass-weighted gyration tensor (Å²).
    Compatible with MDAnalysis versions lacking AtomGroup.gyration_tensor().
    """
    positions_angstrom = atomgroup.positions.astype(float)            # shape (N,3)
    masses_amu = atomgroup.masses.astype(float)                       # shape (N,)
    if positions_angstrom.size == 0:
        raise ValueError("AtomGroup is empty.")
    if np.all(masses_amu == 0):
        raise ValueError("Masses are all zero; attach masses before computing shape metrics.")

    # Mass-weighted center-of-mass in PBC (uses masses internally)
    center_of_mass_angstrom = atomgroup.center_of_mass(wrap=True).astype(float)
    centered_positions = positions_angstrom - center_of_mass_angstrom

    # Mass-weighted gyration tensor G = (1/M) Σ m_i r_i r_i^T
    total_mass = float(np.sum(masses_amu))
    weighted_positions = centered_positions * masses_amu[:, None]
    gyration_tensor_A2 = (weighted_positions.T @ centered_positions) / total_mass  # (3,3)

    # Eigenvalues sorted descending
    eigenvalues = np.linalg.eigvalsh(gyration_tensor_A2)
    return eigenvalues[::-1].astype(float)

def shape_metrics_from_eigs(eigvals_desc: np.ndarray) -> dict:
    lam1, lam2, lam3 = map(float, eigvals_desc)
    trace = lam1 + lam2 + lam3
    asphericity_A2 = lam1 - 0.5 * (lam2 + lam3)
    acylindricity_A2 = lam2 - lam3
    kappa2 = 1.0 - 3.0 * ((lam1*lam2 + lam2*lam3 + lam3*lam1) / (trace*trace + 1e-30))
    return {
        "asphericity_A2": asphericity_A2,
        "acylindricity_A2": acylindricity_A2,
        "kappa2": kappa2,
        "trace_A2": trace,
        "lambda1_A2": lam1,
        "lambda2_A2": lam2,
        "lambda3_A2": lam3,
    }

def compute_shape_metrics(universe: mda.Universe) -> tuple[dict, dict]:
    # Global (all atoms, last frame)
    eigvals_global = mass_weighted_gyration_eigenvalues(universe.atoms)
    global_metrics = shape_metrics_from_eigs(eigvals_global)

    # Per-chain over fragments (filter tiny ones)
    per_chain = []
    for fragment in universe.atoms.fragments:
        if fragment.n_atoms < 12:
            continue
        eigvals = mass_weighted_gyration_eigenvalues(fragment)
        per_chain.append(shape_metrics_from_eigs(eigvals))

    summary = {}
    if per_chain:
        keys = per_chain[0].keys()
        for key in keys:
            values = np.array([d[key] for d in per_chain], dtype=float)
            summary[key] = {
                "median": float(np.median(values)),
                "mean": float(np.mean(values)),
                "std": float(np.std(values)),
                "p10": float(np.percentile(values, 10)),
                "p90": float(np.percentile(values, 90)),
                "n_chains": int(len(values)),
            }
    return global_metrics, summary

# ------------------------------------------------------------
# 3) Persistence length from heavy-atom backbone paths (uses bonds)
#    Same graph-based backbone finder you used earlier.
# ------------------------------------------------------------
def _build_adjacency_from_bonds(n_atoms: int, bonds_0based: np.ndarray) -> list[list[int]]:
    adjacency = [[] for _ in range(n_atoms)]
    for ai, aj in bonds_0based:
        adjacency[ai].append(aj)
        adjacency[aj].append(ai)
    return adjacency

def _bfs_path(adjacency: list[list[int]], start: int, goal: int) -> list[int]:
    queue = deque([start]); parent = {start: None}
    while queue:
        v = queue.popleft()
        if v == goal:
            break
        for w in adjacency[v]:
            if w not in parent:
                parent[w] = v
                queue.append(w)
    if goal not in parent:
        return []
    path = []
    cur = goal
    while cur is not None:
        path.append(cur); cur = parent[cur]
    return path[::-1]

def _double_bfs_longest_path(adjacency: list[list[int]], nodes: list[int]) -> list[int]:
    node_set = set(nodes)
    def farthest(x: int) -> tuple[int, dict[int, int | None]]:
        q = deque([x]); parent = {x: None}; last = x
        while q:
            v = q.popleft(); last = v
            for w in adjacency[v]:
                if w not in parent and w in node_set:
                    parent[w] = v; q.append(w)
        return last, parent
    a = nodes[0]
    a, _ = farthest(a)
    b, parent = farthest(a)
    path = []
    cur = b
    while cur is not None:
        path.append(cur); cur = parent[cur]
    path.reverse()
    return [v for v in path if v in node_set]

def build_backbone_paths_heavy_atoms(universe: mda.Universe, min_atoms_per_chain: int = 12) -> list[mda.core.groups.AtomGroup]:
    assert getattr(universe, "bonds", None) is not None and len(universe.bonds) > 0, "Universe has no bonds; attach them first."
    heavy_mask = (universe.atoms.masses > 1.2)
    heavy_indices = set(np.nonzero(heavy_mask)[0].tolist())
    adjacency = _build_adjacency_from_bonds(len(universe.atoms), universe.bonds.to_indices())

    # connected components within heavy subgraph
    unvisited = set(heavy_indices)
    chains = []
    while unvisited:
        seed = next(iter(unvisited))
        component = []
        q = deque([seed]); unvisited.remove(seed)
        while q:
            v = q.popleft(); component.append(v)
            for w in adjacency[v]:
                if w in unvisited:
                    unvisited.remove(w); q.append(w)

        # endpoints in heavy subgraph
        degree = {v: sum((nbr in component) for nbr in adjacency[v]) for v in component}
        endpoints = [v for v in component if degree[v] == 1]
        if len(endpoints) >= 2:
            best = []
            for i in range(len(endpoints)):
                for j in range(i + 1, len(endpoints)):
                    path = _bfs_path(adjacency, endpoints[i], endpoints[j])
                    path = [p for p in path if p in set(component)]
                    if len(path) > len(best):
                        best = path
            path_indices = best
        else:
            path_indices = _double_bfs_longest_path(adjacency, component)

        if len(path_indices) >= min_atoms_per_chain:
            chains.append(universe.atoms[path_indices])
    return chains

def compute_persistence_length_stats(universe: mda.Universe, backbone_chains: list[mda.core.groups.AtomGroup]) -> dict:
    per_chain_lp = []
    for chain_ag in backbone_chains:
        try:
            pl_calc = polymer.PersistenceLength([chain_ag]).run()
            lp_value = float(np.nanmean(pl_calc.results.lp))
            if np.isfinite(lp_value):
                per_chain_lp.append(lp_value)
        except Exception:
            continue
    if not per_chain_lp:
        raise RuntimeError("Persistence length failed for all chains.")
    arr = np.array(per_chain_lp, dtype=float)
    return {
        "median": float(np.median(arr)),
        "mean": float(np.mean(arr)),
        "std": float(np.std(arr)),
        "p10": float(np.percentile(arr, 10)),
        "p90": float(np.percentile(arr, 90)),
        "n_chains": int(len(arr)),
    }


# D_A2ps, D_cm2s = compute_diffusivity(u, selection="all", fit_points=20, time_per_frame_ps=0.2)
backbone_chains = build_backbone_paths_heavy_atoms(u, min_atoms_per_chain=12)
pl_stats = compute_persistence_length_stats(u, backbone_chains)
print(f"Persistence length (Å): median={pl_stats['median']:.2f}, mean={pl_stats['mean']:.2f}, n={pl_stats['n_chains']}")

global_shape, per_chain_shape = compute_shape_metrics(u)
print("Global shape:", global_shape)
if per_chain_shape:
    print("Asphericity per-chain (Å²) median:", per_chain_shape["asphericity_A2"]["median"])

Exception ignored in: <function ReaderBase.__del__ at 0x7f7dbbb70ae0>
Traceback (most recent call last):
  File "/home/jday/anaconda3/envs/torch2.7/lib/python3.12/site-packages/MDAnalysis/coordinates/base.py", line 1531, in __del__
    self.close()
  File "/home/jday/anaconda3/envs/torch2.7/lib/python3.12/site-packages/MDAnalysis/coordinates/DCD.py", line 181, in close
    self._file.close()
    ^^^^^^^^^^
AttributeError: 'DCDReader' object has no attribute '_file'


KeyboardInterrupt: 