In [85]:
import numpy as np
from pydpmd.data import RigidBumpy, load, BaseParticle
from pydpmd.data.bumpy_utils import get_closest_vertex_radius_for_mu_eff
from pydpmd.utils import join_systems, split_systems
from pydpmd.plot import draw_particles_frame
import matplotlib.pyplot as plt
import os

In [2]:
# add sampling domains to the box sampling protocol
# each domain has a particle id attached to it (domain is a convex hull)
# default domain size is the box
# add a launcher that creates a dataset for each system type and records valid positions, angles, domain sizes, and free volume fractions
# resize domain function (scales domain about its center of mass, optionally clipping to box sizes if it is too big)
# if a system has a domain with too high a free volume fraction, it should be expanded, if it is too low, it should be shrunk - then should be re-run
# protocol runs until a desired number of samples are obtained

# make rigid bumpy with core

# get right energy scale

In [75]:
def clamp_domains_to_box(domain_pos, domain_size, box_size, system_id):
    box_repeat = np.concatenate([box_size[sid][None].repeat(domain_size[i], axis=0) for i, sid in enumerate(system_id)])
    return np.clip(domain_pos, np.zeros_like(box_repeat), box_repeat)

def tri_area(a, b, c):
    return 0.5 * ((b[0] - a[0]) * (c[1] - a[1]) - (c[0] - a[0]) * (b[1] - a[1]))

def get_domain_areas(domain_pos, domain_centroid):
    areas = []
    size = domain_pos.shape[0]
    for i in range(0, size):
        areas.append(tri_area(domain_centroid, domain_pos[i], domain_pos[(i + 1) % size]))
    areas = np.array(areas)
    total_area = np.sum(areas)
    return np.cumsum(areas) / total_area, total_area

def build_domains(data: BaseParticle, domain_style: str = "square", pos: np.ndarray = None, domain_particle_id: np.ndarray = None, clamp_to_box: bool = False, domain_kwargs: dict = {}):
    """
    Build polygonal domains, typically centered around particles, typically used for sampling.

    Args:
        data (BaseParticle): The system data.
        domain_style (str): The style of domain to build (square, )
        pos (np.ndarray): The positions of the particles to build domains for.  If not specified, the positions from the data are used.
        domain_particle_id (np.ndarray): The particle ids to build domains for.  If not specified, one is built for each particle.
        clamp_to_box (bool): Whether to clamp the domain to the box.  Set to True for sampling in a walled container.
        domain_kwargs (dict): Additional keyword arguments for the domain.
    """
    if pos is None:
        pos = data.pos
    assert pos.shape == data.pos.shape
    if domain_particle_id is None:  # if not specified, build one for each particle
        domain_particle_id = np.arange(len(pos))

    if domain_style == "square":
        domain_length = domain_kwargs.get("domain_length")
        domain_stencil = np.array([[-1, -1], [1, -1], [1, 1], [-1, 1]]) * domain_length / 2
        domain_pos = np.concatenate([domain_stencil + pos[i] for i in domain_particle_id])
        domain_size = np.ones(domain_particle_id.shape[0]).astype(int) * domain_stencil.shape[0]

    domain_offset = np.concatenate([[0], np.cumsum(domain_size)])
    if clamp_to_box:
        domain_pos = clamp_domains_to_box(domain_pos, domain_size, box_size, system_id)
    domain_centroid = np.array([np.mean(domain_pos[domain_offset[i]:domain_offset[i + 1]], axis=0) for i in range(len(domain_offset) - 1)])

    # divide the domain into triangles, calculate the cumulative fractional area in each, and the total area
    domain_fractional_area, domain_area = zip(*[
        get_domain_areas(
            domain_pos[domain_offset[i]:domain_offset[i + 1]],
            domain_centroid[i]
        ) for i in range(len(domain_offset) - 1)])

    domain_fractional_area = np.concatenate(domain_fractional_area)
    domain_area = np.array(domain_area)

    # add domains to the data
    data.add_array(domain_pos, "domain_pos", ignore_missing_index_space=True)
    data.add_array(domain_centroid, "domain_centroid", ignore_missing_index_space=True)
    data.add_array(domain_offset, "domain_offset", ignore_missing_index_space=True)
    data.add_array(domain_fractional_area, "domain_fractional_area", ignore_missing_index_space=True)
    data.add_array(domain_area, "domain_area", ignore_missing_index_space=True)
    data.add_array(domain_particle_id, "domain_particle_id", ignore_missing_index_space=True)

data_root = f"/home/mmccraw/dev/data/09-09-25/bumpy/0.01/jam"
jam_data = load(data_root, location=["final", "init"])

# build_domains(jam_data, domain_style="square", domain_kwargs={"domain_length": 0.2})
# jam_data.save('test')

In [None]:
min_phi = 0.01
min_phi_offset = 1e-4
n_phi_steps = 10



TypeError: 'NoneType' object is not subscriptable

In [77]:
pe_mask = jam_data.final.pe_total / jam_data.system_size < pe_tol
phi_j = np.max(jam_data.final.packing_fraction[pe_mask])
if phi_j - max_phi_offset < 0.1:
    max_phi_offset = phi_j - 0.1
delta_phi = np.logspace(np.log10(min_phi_offset), np.log10(max_phi_offset), n_phi_steps - 1)
delta_phi = np.concatenate((delta_phi, [phi_j - 0.1]))
phi = phi_j - delta_phi

# Stable catalog of targets and integer TODO indices
delta_phi_all = delta_phi.copy()
todo_idx = np.arange(delta_phi_all.size, dtype=int)

# Output HDF5 path (streaming writes per iteration)
h5_out_path = os.path.join(data_root, "box_sample_result.h5")
if os.path.exists(h5_out_path):
    os.remove(h5_out_path)

# create a block of n_phi_steps systems, each with a different phi value
offset_data = join_systems([rb for _ in range(n_phi_steps)])
offset_data.scale_to_packing_fraction(phi)
offset_data.add_array(delta_phi, 'delta_phi')

# create max_n_dynamics_duplicates / n_phi_steps duplicates of the concatenated dynamics_data
offset_data = join_systems([offset_data for _ in range(max_n_dynamics_duplicates // n_phi_steps)])
offset_data.set_positions(1, rng_seed)  # set random positions
offset_data.save(offset_data_path, locations=["init"])

NameError: name 'rb' is not defined

In [None]:



# create max_n_dynamics_duplicates / n_phi_steps duplicates of the concatenated dynamics_data
offset_data = join_systems([offset_data for _ in range(max_n_dynamics_duplicates // n_phi_steps)])

In [33]:
data = load('test', location=["final", "init"], load_trajectory=True)

Loading trajectory: 100%|██████████| 4/4 [00:00<00:00, 20.27it/s]


In [79]:
data = load('/home/mmccraw/dev/data/09-09-25/box-sample/bumpy/0.01/init', location=['init'], load_trajectory=True)

Loading trajectory: 100%|██████████| 4/4 [00:00<00:00, 12.09it/s]


In [None]:
delta_phi = data.delta_phi.copy()



AttributeError: 'RigidBumpy' object has no attribute 'domain_pos'