In [1]:
# PFP model version and calculation mode. Both values are reused below when
# building the PFP estimator and when configuring the active-learning task.
model_version = "v7.0.0"
calc_mode = "crystal_u0_plus_d3"

## 1. Initial structures

In [4]:
import numpy as np
from pathlib import Path
from ase.io import read
from pfcc_extras.structure.ase_rdkit_converter import atoms_to_smiles
from pfcc_extras.liquidgenerator.liquid_generator import LiquidGenerator
from density import estimate_density
from IPython.display import clear_output

def make_polyIL_structure(mol_type, n_mol):
    """Build a random bulk cell of poly(ionic liquid) chains and PF6 anions.

    Args:
        mol_type: One of "monomer", "dimer", "trimer", "polymer" or
            "polymer_x7"; selects ``assets/<mol_type>.xyz`` as the chain.
        n_mol: Number of chains placed in the cell.

    Returns:
        The structure returned by ``LiquidGenerator.run``.

    Raises:
        AssertionError: If ``mol_type`` is not one of the supported names.
    """
    # PF6 anions added per chain of each type (1, 2, 3, 5 and 7 respectively).
    anions_per_chain = {
        "monomer": 1,
        "dimer": 2,
        "trimer": 3,
        "polymer": 5,
        "polymer_x7": 7,
    }
    assert mol_type in tuple(anions_per_chain)

    monomer_unit = read("assets/monomer.xyz")
    chain = read(f"assets/{mol_type}.xyz")
    counter_ion = read("assets/PF6.xyz")
    n_anion = n_mol * anions_per_chain[mol_type]

    # Shift both building blocks so their centre of mass sits at the origin.
    chain.positions -= chain.get_center_of_mass()
    counter_ion.positions -= counter_ion.get_center_of_mass()

    # The target density is always estimated from the monomer's SMILES,
    # whatever chain length is being packed.
    target_density = estimate_density(atoms_to_smiles(monomer_unit))

    # n_mol chains followed by n_anion anions.
    building_blocks = [chain] * n_mol + [counter_ion] * n_anion

    # Pack the building blocks into a cubic cell at the target density.
    generator = LiquidGenerator(
        engine="torch",
        composition=building_blocks,
        density=target_density,
        cubic=True,
    )
    packed = generator.run(epochs=100)
    clear_output()
    return packed


# Create a directory to store the generated initial structures.
structure_dir = Path("structures")
structure_dir.mkdir(parents=True, exist_ok=True)

# Chain type -> numbers of chains to pack. Every (type, count) pair yields one
# structure cached as "structures/<type>_<count>.xyz". The list order fixes
# the order of `initial_structures`.
structure_plan = [
    ("monomer", [6, 12]),
    ("dimer", [3, 6]),
    ("trimer", [2, 3]),
    ("polymer", [2, 3]),
]

initial_structures = []

for mol_type, chain_counts in structure_plan:
    for n_mol in chain_counts:
        filename = f"{mol_type}_{n_mol}.xyz"
        filepath = structure_dir / filename
        if not filepath.is_file():
            # Not cached yet: generate the structure and store it for later runs.
            atoms = make_polyIL_structure(mol_type, n_mol)
            atoms.write(filepath)
        else:
            atoms = read(filepath)
        initial_structures.append(atoms)

## 2. Initial dataset

In [None]:
import numpy as np
from pathlib import Path
from h5py import File
from tqdm.auto import tqdm
from concurrent.futures import as_completed, ThreadPoolExecutor

from pfp_api_client import Estimator, ASECalculator
from light_pfp_data.utils.dataset import H5DatasetWriter
from light_pfp_data.sample.crystal import sample_md, sample_rattle


# Create folder for the initial dataset
init_dataset_dir = Path("init_dataset")
init_dataset_dir.mkdir(parents=True, exist_ok=True)

# Define the initial dataset file
initial_dataset_file = init_dataset_dir / "init.h5"

# Sampling is skipped entirely when the dataset file is already on disk.
dataset_already_exists = initial_dataset_file.exists()
if dataset_already_exists:
    print(f"Dataset file {initial_dataset_file} already exists. Skipping dataset generation.")
    dataset = H5DatasetWriter(File(initial_dataset_file, "r+"))
else:
    print(f"Creating dataset file {initial_dataset_file} and starting sampling.")
    dataset = H5DatasetWriter(initial_dataset_file)

# The HDF5 handle is closed in `finally` so that a failing sampling task does
# not leave the file open in the kernel.
try:
    if not dataset_already_exists:
        # Initialize the PFP estimator and calculator
        estimator = Estimator(model_version=model_version, calc_mode=calc_mode)
        calc = ASECalculator(estimator)

        # List to store our future tasks
        futures = []
        pbar = tqdm(desc="Total progress", total=0, leave=True)

        # Use ThreadPoolExecutor for multithreading sampling tasks
        with ThreadPoolExecutor(max_workers=16) as executor:
            for atoms in initial_structures:
                # High-temperature NVT sampling.
                futures += sample_md(
                    input_structure=atoms,
                    calculator=calc,
                    dataset=dataset,
                    supercell=(1, 1, 1),
                    sampling_temp=[500.0, 1000.0, 1500.0],
                    sampling_steps=[5000, 5000, 5000],
                    sampling_interval=[100, 100, 100],
                    ensemble="nvt",
                    executor=executor,
                    pbar=pbar
                )
                # NPT sampling at the temperatures used later for validation.
                futures += sample_md(
                    input_structure=atoms,
                    calculator=calc,
                    dataset=dataset,
                    supercell=(1, 1, 1),
                    sampling_temp=[400.0, 500.0, 600.0],
                    sampling_pressure=[1.0, 1.0, 1.0],
                    sampling_steps=[5000, 5000, 5000],
                    sampling_interval=[100, 100, 100],
                    ensemble="npt",
                    executor=executor,
                    pbar=pbar
                )
                # Rattled copies at two displacement widths (0.1, then 0.15).
                for rattle_stdev in (0.1, 0.15):
                    futures += sample_rattle(
                        input_structure=atoms,
                        calculator=calc,
                        dataset=dataset,
                        stdev=rattle_stdev,
                        n_sample=10,
                        supercell=(1, 1, 1)
                    )

        # Leaving the `with` block has already waited for every task; calling
        # result() re-raises any exception that occurred in a worker.
        for f in as_completed(futures):
            _ = f.result()
finally:
    dataset.h5.close()

Creating dataset file init_dataset/init.h5 and starting sampling.


Total progress: 0it [00:00, ?it/s]

## 3. Active learning

In [None]:
import logging
from light_pfp_autogen.active_learning import ActiveLearning
from light_pfp_autogen.config import ActiveLearningConfig, TrainConfig, SampleConfig, CommonConfig, MTPConfig

logging.basicConfig(level=logging.INFO)

# Training settings handed to the active-learning task.
al_train_config = TrainConfig(
    common_config=CommonConfig(max_forces=50.0, max_energy=5.0),
    mtp_config=MTPConfig(pretrained_model="ORGANIC_SMALL_NN"),
)

# Sampling thresholds handed to the active-learning task.
# NOTE(review): the meaning and units of each threshold are defined by
# light_pfp_autogen's SampleConfig — confirm against its documentation.
al_sample_config = SampleConfig(
    dE_min_coef=3.0,
    dE_max_coef=20.0,
    dF_min_coef=10.0,
    dF_max=50.0,
    dS_min_coef=3.0,
    dS_max_coef=20.0,
    pfp_fallback_samples=5,
)

# Full task configuration: reuses the PFP model version / calc mode defined at
# the top of the notebook and the initial dataset file written above.
active_learning_config = ActiveLearningConfig(
    task_name="polyIL_diffusion",
    pfp_model_version=model_version,
    pfp_calc_mode=calc_mode,
    init_dataset=["init_dataset/init.h5"],
    work_dir="./autogen_workdir",
    training_time=0.5,
    train_config=al_train_config,
    sample_config=al_sample_config,
)

# Create the task object, then run its initialization step.
active_learning = ActiveLearning(active_learning_config)
active_learning.initialize()

In [9]:
import numpy as np
from ase import units
from ase.md.langevin import Langevin
from ase.md.npt import NPT
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from IPython.display import clear_output
from light_pfp_autogen.context import DataCollectionContext


# Define the MD simulation protocol for active learning iterations tailored to polyIL diffusion task.
def active_learning_protocol(size, steps):
    """Run one NVT + NPT sampling trajectory for an active-learning iteration.

    A fresh "polymer_x7" cell with 4 chains is generated on every call and a
    temperature is drawn uniformly from 300-700 K. Both MD stages run inside a
    ``DataCollectionContext``.

    Args:
        size: Not read by the function body; kept so existing callers (which
            pass ``size="small"``) keep working.
        steps: Number of NPT steps. The NPT stage collects at most
            ``steps // 100 // 2`` samples.

    NOTE(review): no calculator is assigned to ``atoms`` here; presumably
    ``DataCollectionContext`` attaches one to the dynamics — confirm.
    """
    sample_interval = 100  # MD steps between collected samples

    temperature = np.random.uniform(300, 700)  # K
    atoms = make_polyIL_structure("polymer_x7", 4)

    print(f"Running MD for polyIL with size={len(atoms)}, temperature={temperature:.1f} K")
    MaxwellBoltzmannDistribution(atoms, temperature_K=temperature)

    # Stage 1: short Langevin (NVT) run, capped at 20 collected samples.
    nvt_dynamics = Langevin(atoms, units.fs, temperature_K=temperature, friction=0.1)
    with DataCollectionContext(md=nvt_dynamics, interval=sample_interval, max_samples=20):
        nvt_dynamics.run(steps=5000)

    # Stage 2: longer NPT run at 1 bar providing most of the collected samples.
    npt_dynamics = NPT(
        atoms,
        units.fs,
        temperature_K=temperature,
        externalstress=1 * units.bar,
        ttime=20.0 * units.fs,
        pfactor=2e6 * units.GPa * (units.fs**2),
    )
    npt_sample_cap = steps // 100 // 2
    with DataCollectionContext(md=npt_dynamics, interval=sample_interval, max_samples=npt_sample_cap):
        npt_dynamics.run(steps=steps)

    clear_output(wait=True)


# Iterate from the task's current counter (`active_learning.iter`) up to
# iteration 10: five sampling runs per iteration, then one update of the task.
last_iteration = 10
runs_per_iteration = 5

for iteration in range(active_learning.iter, last_iteration):
    print(f"Current active learning iteration: {iteration} (small structures)")
    for _run in range(runs_per_iteration):
        active_learning_protocol(size="small", steps=50000)
    active_learning.update()

active_learning.print_md_statistics()

  iter    n_items    n_collected    n_context    n_error    n_max_sample     E_error    F_error      S_error
------  ---------  -------------  -----------  ---------  --------------  ----------  ---------  -----------
     0       1360           1360           10          0              10  0.012811      1.57252  0.00129418
     1       2222            712           10          0               6  0.00291231    1.21913  0.00067769
     2       2628            407           10          0               5  0.00129946    1.00878  0.000612832
     3       2405            705           10          0               5  0.00135669    1.10198  0.000762934
     4       1975           1147           10          0               9  0.0032764     1.40298  0.000603046
     5       1974           1156           10          0               9  0.00416708    1.41927  0.000730803
     6       2473            758           10          0               6  0.00634993    1.18043  0.000807921
     7       2441    

## 4. Post training

In [None]:
from light_pfp_autogen.utils import submit_training_job, check_training_job_status, estimate_epoch

# Epoch count for the final training, estimated from the collected datasets.
epoch = estimate_epoch(active_learning.datasets_list, 2)

# Final-training settings, assembled as plain dictionaries.
common_settings = {"total_epoch": epoch, "max_forces": 50.0}
mtp_settings = {"pretrained_model": "ORGANIC_SMALL_NN"}
train_config_dict = {
    "common_config": common_settings,
    "mtp_config": mtp_settings,
}
training_config = TrainConfig.from_dict(train_config_dict)

# Submit the final training job on every dataset gathered by active learning.
model_id = submit_training_job(
    training_config,
    active_learning.datasets_list,
    "polyIL_diffusion_final",
)

# NOTE(review): the status is queried once, right after submission. Later
# cells use `model_id`, so presumably the job must have finished before they
# are run — confirm.
status = check_training_job_status(model_id)
print(f"Training job {model_id} status: {status}")

## 5. PFP validation run

In [None]:
import numpy as np
from ase import units
from ase.io import read, Trajectory
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase.md.langevin import Langevin
from ase.md.npt import NPT


def md_protocol(atoms, temperature, steps, traj):
    """Run the validation MD protocol and record it in a trajectory file.

    Velocities are initialised from a Maxwell-Boltzmann distribution, then a
    5000-step Langevin (NVT) stage is followed by an NPT stage at 1 bar. Both
    stages write to the same trajectory file every 100 steps.

    Args:
        atoms: Structure to simulate; a calculator must already be attached.
        temperature: Target temperature in K.
        steps: Number of MD steps in the NPT stage.
        traj: Path of the trajectory file to (over)write.
    """
    print(f"Running MD for polyIL with size={len(atoms)}, temperature={temperature:.1f} K")
    MaxwellBoltzmannDistribution(atoms, temperature_K=temperature)

    traj_writer = Trajectory(traj, "w", atoms=atoms)
    try:
        # First stage: Short NVT MD using Langevin thermostat for equilibration.
        md_nvt = Langevin(atoms, units.fs, temperature_K=temperature, friction=0.1)
        md_nvt.attach(traj_writer.write, interval=100)
        md_nvt.run(steps=5000)  # short equilibration run

        # Second stage: Longer NPT MD; this is the production run analysed below.
        # NOTE(review): mask=np.eye(3) presumably limits cell changes to the
        # diagonal strain components — confirm against ASE's NPT documentation.
        md_npt = NPT(
            atoms,
            units.fs,
            temperature_K=temperature,
            mask=np.eye(3),
            externalstress=1 * units.bar,
            ttime=20.0 * units.fs,
            pfactor=2e6 * units.GPa * (units.fs**2)
        )
        md_npt.attach(traj_writer.write, interval=100)
        md_npt.run(steps=steps)
    finally:
        # Always release the file handle, even if one of the MD stages fails.
        traj_writer.close()

In [None]:
from pathlib import Path

md_dir = Path("pfp_md")
md_dir.mkdir(exist_ok=True)

def md_wrap(t, output_dir=md_dir):
    """Run the PFP validation MD at temperature ``t`` (K).

    Reads ``assets/md_init.xyz``, attaches a new PFP calculator and runs
    ``md_protocol`` for 50000 NPT steps, writing ``md_<t>.traj``.

    Args:
        t: Target temperature in K; also used in the trajectory file name.
        output_dir: Directory for the trajectory file. It is bound when the
            function is defined because the notebook later reassigns the
            module-level ``md_dir`` for the LightPFP run.
    """
    # Import the PFP classes locally under distinct names: a later cell rebinds
    # the module-level `Estimator` / `ASECalculator` to the light_pfp_client
    # classes, which would break this function if it were re-run afterwards.
    from pfp_api_client import ASECalculator as PFPCalculator
    from pfp_api_client import Estimator as PFPEstimator

    calc = PFPCalculator(PFPEstimator(model_version=model_version, calc_mode=calc_mode))
    atoms = read("assets/md_init.xyz")
    atoms.calc = calc
    traj = output_dir / f"md_{t}.traj"
    md_protocol(atoms, t, 50000, traj)

In [None]:
from joblib import Parallel, delayed

# Run the three PFP validation trajectories (400, 500 and 600 K) as joblib jobs.
pfp_md_jobs = [delayed(md_wrap)(t) for t in [400, 500, 600]]
Parallel(n_jobs=3)(pfp_md_jobs)

## 6. LightPFP validation run

In [None]:
import numpy as np
from ase import units
from ase.io import read, Trajectory
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase.md.langevin import Langevin
from ase.md.npt import NPT


def md_protocol(atoms, temperature, steps, traj):
    """Run the validation MD protocol and record it in a trajectory file.

    Same protocol as the PFP validation run: Maxwell-Boltzmann velocities, a
    5000-step Langevin (NVT) stage, then an NPT stage at 1 bar. Both stages
    write to the same trajectory file every 100 steps.

    Args:
        atoms: Structure to simulate; a calculator must already be attached.
        temperature: Target temperature in K.
        steps: Number of MD steps in the NPT stage.
        traj: Path of the trajectory file to (over)write.
    """
    print(f"Running MD for polyIL with size={len(atoms)}, temperature={temperature:.1f} K")
    MaxwellBoltzmannDistribution(atoms, temperature_K=temperature)

    traj_writer = Trajectory(traj, "w", atoms=atoms)
    try:
        # First stage: Short NVT MD using Langevin thermostat for equilibration.
        md_nvt = Langevin(atoms, units.fs, temperature_K=temperature, friction=0.1)
        md_nvt.attach(traj_writer.write, interval=100)
        md_nvt.run(steps=5000)  # short equilibration run

        # Second stage: Longer NPT MD; this is the production run analysed below.
        # NOTE(review): mask=np.eye(3) presumably limits cell changes to the
        # diagonal strain components — confirm against ASE's NPT documentation.
        md_npt = NPT(
            atoms,
            units.fs,
            temperature_K=temperature,
            mask=np.eye(3),
            externalstress=1 * units.bar,
            ttime=20.0 * units.fs,
            pfactor=2e6 * units.GPa * (units.fs**2)
        )
        md_npt.attach(traj_writer.write, interval=100)
        md_npt.run(steps=steps)
    finally:
        # Always release the file handle, even if one of the MD stages fails.
        traj_writer.close()

In [None]:
from light_pfp_client import Estimator, ASECalculator

# Calculator backed by the LightPFP model identified by `model_id` (the id
# returned when the final training job was submitted).
light_pfp_estimator = Estimator(model_id=model_id)
calc = ASECalculator(light_pfp_estimator)

In [None]:
from pathlib import Path

# LightPFP trajectories are written to their own directory, using the same
# file names as the PFP run.
md_dir = Path("light_pfp_md")
md_dir.mkdir(exist_ok=True)

validation_temperatures = [400, 500, 600]  # K
for t in validation_temperatures:
    traj = md_dir / f"md_{t}.traj"
    # Every temperature starts from the same initial structure.
    atoms = read("assets/md_init.xyz")
    atoms.calc = calc
    md_protocol(atoms, t, 50000, traj)

### 6.1 Analysis of results
#### A. Density

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from ase import units
from ase.io import Trajectory

def get_density(atoms):
    """Return the mass density of ``atoms``.

    The summed atomic masses are divided by ``units.kg`` and by the cell
    volume, then scaled by 1e27.
    NOTE(review): with ASE's conventions (masses in amu, volume in Å^3) this
    should be g/cm^3, matching the axis label used for the density plot —
    confirm.
    """
    total_mass = atoms.get_masses().sum()
    cell_volume = atoms.get_volume()
    return total_mass / units.kg / cell_volume * 1e27

def get_density_traj(traj, last_n_frames=100):
    """Average ``get_density`` over the last ``last_n_frames`` frames of ``traj``.

    If ``traj`` holds fewer frames than requested, all of them are used.
    """
    recent_frames = traj[-last_n_frames:]
    frame_densities = [get_density(frame) for frame in recent_frames]
    return np.mean(frame_densities)

In [None]:
def _density_series(traj_path):
    """Return the density of every frame stored in the trajectory at ``traj_path``."""
    return [get_density(frame) for frame in Trajectory(traj_path)]


# Per-frame densities of the PFP reference runs ...
pfp_density_400 = _density_series("pfp_md/md_400.traj")
pfp_density_500 = _density_series("pfp_md/md_500.traj")
pfp_density_600 = _density_series("pfp_md/md_600.traj")
# ... and of the LightPFP runs at the same temperatures.
lpfp_density_400 = _density_series("light_pfp_md/md_400.traj")
lpfp_density_500 = _density_series("light_pfp_md/md_500.traj")
lpfp_density_600 = _density_series("light_pfp_md/md_600.traj")

In [None]:
# Density versus time: solid lines are PFP, dashed lines are LightPFP, one
# colour per temperature. Frames were stored every 100 MD steps of 1 fs, so
# consecutive frames are 0.1 ps apart.
density_curves = [
    ("400K", "r", pfp_density_400, lpfp_density_400),
    ("500K", "b", pfp_density_500, lpfp_density_500),
    ("600K", "g", pfp_density_600, lpfp_density_600),
]
for temp_label, color, pfp_series, lpfp_series in density_curves:
    plt.plot(np.arange(len(pfp_series))*0.1, pfp_series, label=f"PFP {temp_label}", c=color)
    plt.plot(np.arange(len(lpfp_series))*0.1, lpfp_series, label=f"LightPFP {temp_label}", c=color, linestyle="--")
plt.xlabel("time (ps)")  # the closing parenthesis was missing from this label
plt.ylabel("density (g/cm^3)")
plt.legend()
plt.savefig("density.png")

#### B. RDF

In [None]:
from light_pfp_evaluate.md import plot_rdf
from ase.io import Trajectory

# One RDF comparison figure per temperature, built from the last 100 stored
# frames of the PFP and LightPFP trajectories.
# NOTE(review): the argument order (labels, PFP frames, LightPFP frames,
# output file) is taken from this call site — confirm against plot_rdf's docs.
for t in [400, 500, 600]:
    pfp_frames = Trajectory(f"pfp_md/md_{t}.traj")[-100:]
    lpfp_frames = Trajectory(f"light_pfp_md/md_{t}.traj")[-100:]
    plot_rdf([t], [pfp_frames], [lpfp_frames], f"rdf_{t}.png")

#### C. MSD

In [None]:
import numpy as np

def get_msd(traj):
    """Mean squared displacement of the phosphorus atoms as a function of lag.

    Only atoms with atomic number 15 (P) are tracked; they are selected once,
    from the first frame. For every lag the squared displacement is averaged
    over all time origins and all selected atoms.

    Args:
        traj: Indexable sequence of frames providing ``get_atomic_numbers()``
            and ``get_positions()``.

    Returns:
        A list with ``len(traj) - 1`` entries; entry ``j`` is the MSD at a lag
        of ``j + 1`` frames (there is no zero-lag entry), in squared position
        units.
    """
    numbers = traj[0].get_atomic_numbers()
    is_phosphorus = numbers == 15
    # Shape (n_frames, n_selected, 3). Indexing the position array directly
    # avoids building a new Atoms object for every frame.
    pos = np.array([atoms.get_positions()[is_phosphorus] for atoms in traj])
    n_frames = len(pos)
    msd = [
        np.mean(np.sum((pos[lag:] - pos[:-lag]) ** 2, axis=2))
        for lag in range(1, n_frames)
    ]
    return msd


In [None]:
def _msd_from_file(traj_path, first_frame=200):
    """Return ``get_msd`` of the trajectory at ``traj_path``, skipping the frames before ``first_frame``."""
    return get_msd(Trajectory(traj_path)[first_frame:])


# MSD curves of the PFP reference runs ...
pfp_msd_400 = _msd_from_file("pfp_md/md_400.traj")
pfp_msd_500 = _msd_from_file("pfp_md/md_500.traj")
pfp_msd_600 = _msd_from_file("pfp_md/md_600.traj")
# ... and of the LightPFP runs at the same temperatures.
lpfp_msd_400 = _msd_from_file("light_pfp_md/md_400.traj")
lpfp_msd_500 = _msd_from_file("light_pfp_md/md_500.traj")
lpfp_msd_600 = _msd_from_file("light_pfp_md/md_600.traj")

In [None]:
def _lag_times_ps(msd):
    """Lag times in ps for an MSD list whose first entry is a one-frame lag (0.1 ps per frame)."""
    return (np.arange(len(msd)) + 1) * 0.1


# MSD versus lag time: solid lines are PFP, dashed lines are LightPFP, one
# colour per temperature. get_msd has no zero-lag entry, so the first point
# belongs at 0.1 ps rather than at 0.
msd_curves = [
    ("400K", "r", pfp_msd_400, lpfp_msd_400),
    ("500K", "b", pfp_msd_500, lpfp_msd_500),
    ("600K", "g", pfp_msd_600, lpfp_msd_600),
]
for temp_label, color, pfp_series, lpfp_series in msd_curves:
    plt.plot(_lag_times_ps(pfp_series), pfp_series, label=f"PFP {temp_label}", c=color)
    plt.plot(_lag_times_ps(lpfp_series), lpfp_series, label=f"LightPFP {temp_label}", c=color, linestyle="--")
plt.xlabel("time (ps)")
plt.ylabel("msd")
plt.legend()
plt.savefig("msd.png")

#### D. Diffusion activation energy

In [None]:
def get_diffusion_coef(msd, time_interval):
    """Estimate a diffusion coefficient from the slope of an MSD curve.

    A straight line is fitted to ``msd`` against ``index * time_interval`` and
    the slope is divided by 6, then scaled by 1e-5. A constant shift of the
    time origin does not affect the fitted slope.

    NOTE(review): assuming ``msd`` is in Å^2 and ``time_interval`` in fs, the
    1e-5 factor converts Å^2/fs to m^2/s. That agrees with the m²/s label on
    the Arrhenius plot, not with the "cm^2/s" stated previously — confirm.
    """
    lag_times = np.arange(len(msd)) * time_interval
    slope, _intercept = np.polyfit(lag_times, msd, 1)
    return slope / 6 * 1e-5

In [None]:
# Time between consecutive MSD entries passed to the fit.
# NOTE(review): presumably in fs (frames stored every 100 steps of 1 fs) — confirm.
frame_spacing = 100

pfp_d_400 = get_diffusion_coef(pfp_msd_400, frame_spacing)
pfp_d_500 = get_diffusion_coef(pfp_msd_500, frame_spacing)
pfp_d_600 = get_diffusion_coef(pfp_msd_600, frame_spacing)
lpfp_d_400 = get_diffusion_coef(lpfp_msd_400, frame_spacing)
lpfp_d_500 = get_diffusion_coef(lpfp_msd_500, frame_spacing)
lpfp_d_600 = get_diffusion_coef(lpfp_msd_600, frame_spacing)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import linregress

# Arrhenius analysis: ln(D) = ln(D0) - Ea / (R T), so the activation energy is
# minus the slope of ln(D) against 1/T, multiplied by R.
R = 8.314  # gas constant, J/(mol K)
temperatures = np.array([400, 500, 600])
diff_coeffs_pfp = np.array([pfp_d_400, pfp_d_500, pfp_d_600])
diff_coeffs_lpfp = np.array([lpfp_d_400, lpfp_d_500, lpfp_d_600])
inv_temp = 1 / temperatures
ln_diff_pfp = np.log(diff_coeffs_pfp)
ln_diff_lpfp = np.log(diff_coeffs_lpfp)

# Dividing by 1000 converts J/mol to kJ/mol.
activation_energy_pfp = -linregress(inv_temp, ln_diff_pfp).slope * R / 1000
activation_energy_lpfp = -linregress(inv_temp, ln_diff_lpfp).slope * R / 1000
plt.plot(inv_temp, ln_diff_pfp, marker="o", label=f'PFP {activation_energy_pfp:4.2f}kJ/mol')
plt.plot(inv_temp, ln_diff_lpfp, marker="o", label=f'LightPFP {activation_energy_lpfp:4.2f}kJ/mol')

# Annotate plot
plt.xlabel('1/T (K⁻¹)')
plt.ylabel('ln(D) (ln(m²/s))')
plt.title('Arrhenius Plot')
plt.legend()
plt.grid(True)

plt.tight_layout()
# `eval_dir` was never defined in this notebook (NameError on a fresh kernel);
# save next to the other figures (density.png, msd.png) instead.
plt.savefig("arrhenius.png")