# Examples

## Conversion ase <> cfg files

In [None]:
from pathlib import Path
from src.utils import atoms2cfg, cfg2atoms # Call the code installed.
# from utils import atoms2cfg, cfg2atoms # development
from ase.io import read

In [None]:
current = Path().cwd()
outcar_path = current.parents[1] / "mlip-2/test/examples/00.convert_vasp_outcar/OUTCAR"
traj = read(outcar_path, index=":")
file = "./example/test.cfg"
for i,atoms in enumerate(traj):
    append = False if i==0 else True
    atoms2cfg(atoms, file=file, append=append)
    

In [None]:
a = list(cfg2atoms(file="./example/test.cfg"))
b = list(cfg2atoms(file="./example/executable.cfg", symbols={1: "Ag"}))

In [None]:
a

In [None]:
b

## Training

In [None]:
from src.training import MtpTraining # Call the code installed.
from pathlib import Path
# from training import MtpTraining # development

In [None]:
current = Path().cwd()
repo_path = current.parents[
    next(
        i
        for i in range(len(current.parents))
        if current.parents[i].name == "mtp4py"
    )
]
mtp_path = repo_path.parent / "mlip-2"

setting_path = mtp_path / "test/examples/01.train/06.mtp"
exe_path = mtp_path / "bin/mlp"
cfg_path = mtp_path / "test/examples/01.train/train.cfg"
dataset_name = cfg_path.name.split(".")[0]
output_path = repo_path / f"src/example/pot_{dataset_name}.almtp"

mtp = MtpTraining(setting_path, exe_path, cfg_path, output_path)
mtp.run()

## Infer

In [None]:
from src.training import MtpTraining # Call the code installed.
# from ase_calculator_mtp import AseCalculatorMtp # development
from ase.optimize import FIRE


In [None]:
calc = AseCalculatorMtp(output_path)
cfgs = list(cfg2atoms(cfg_path))
atoms = cfgs[0].copy()
atoms.calc = calc
with FIRE(atoms, "./example/test_traj.traj") as fire:
    fire.run()


# ase db

In [None]:
from pathlib import Path
from ase.io import read
from ase.build import sort
from ase.calculators.emt import EMT
from ase.optimize.fire import FIRE
import numpy as np
from ase.db import connect
from ase import Atom

if Path('./test_ase_db.db').exists():
    Path('./test_ase_db.db').unlink()
db = connect('test_ase_db.db')

def save_to_db(atoms, opt_id):
    # 構造とエネルギーをデータベースに保存
    fmax = np.max(np.linalg.norm(atoms.get_forces(), axis=1))
    db.write(atoms, data={"opt_id":opt_id})

from ase.calculators.lammpslib import LAMMPSlib
lammps_command = [
    "pair_style meam",
    f"pair_coeff * * ./library.meam Pt Fe ./PtFe.meam Pt Fe",
    f"mass 1 {Atom('Pt').mass}",
    f"mass 2 {Atom('Fe').mass}",
]

calc = LAMMPSlib(lmpcmds=lammps_command, atoms_types={"Fe": 2, "Pt": 1}, keep_alive=True)

def opt(FePt, opt_id):
    # for lc in [1.1, 1, 0.9]:
    for lc in [1]:
        FePt = sort(FePt)
        sc = FePt.get_scaled_positions()
        FePt.cell *= lc
        FePt.set_scaled_positions(sc)
        FePt.positions += np.random.normal(0, 0.4, 3*len(FePt)).reshape(-1, 3)

        FePt.calc = calc
        opt = FIRE(FePt)
        opt.attach(save_to_db, interval=1, atoms=FePt, opt_id=opt_id)
        opt.run(fmax=0.05)

FePt = read("./FePt.cif")
FePt = sort(FePt)


FePt_rpt = FePt.repeat([2,2,2])
#FePt_rpt.symbols[FePt_rpt.symbols=="Fe"] = "Pt"
FePt_rpt = sort(FePt_rpt)
print(FePt_rpt.symbols)
opt(FePt_rpt, 1)

FePt_rpt = FePt.repeat([2,2,2])
ids = np.random.choice(np.where(FePt_rpt.symbols == "Pt")[0], 3)
FePt_rpt.symbols[ids] = "Fe"
opt(FePt_rpt, 3)

FePt_rpt = FePt.repeat([2,2,2])
ids = np.random.choice(np.where(FePt_rpt.symbols == "Fe")[0], 3)
FePt_rpt.symbols[ids] = "Pt"
print(FePt_rpt)
opt(FePt_rpt, 4)


In [None]:
def get_atoms_data():
    for row in db.select("fmax<1"):
        yield row.toatoms()

In [None]:
atoms_list = list(get_atoms_data())
import importlib
import src.lj_optimizer as ljop
importlib.reload(ljop)


init_eps_dict = {"Pt":1, "Fe" : 1}
init_sigma_dict = {"Pt":3, "Fe" : 2.3}
#lj = ljop.LJTrainerLB(init_eps_dict, init_sigma_dict)
lj = ljop.LJTrainer(None, None)
eps, sig, chempot = lj.run_minimize(minimized_by="ef", labelled_list_atoms=atoms_list, maxiter=10, save_to_file="./washoi.yml", e_weight=0.1)
eps, sig, chempot

In [None]:
atoms_list = list(get_atoms_data())
from src.ase_calculator_mtp import LammpsLJBuilder
from plotly import graph_objects as go
x= [atoms.get_potential_energy() for atoms in atoms_list]
init_eps_dict = {"Pt":1.3, "Fe" : 1.8}
init_sigma_dict = {"Pt":2.5, "Fe" : 2.0}
# init_eps_dict=eps
# init_sigma_dict=sig

def get_lj_engs():
    for atoms in atoms_list:
        _atoms = atoms.copy()
        _atoms.calc = LammpsLJBuilder().get_calculator_from_file(path_to_yaml="./washoi.yml")
        eng = _atoms.get_potential_energy()
        yield eng
y= [i for i in get_lj_engs()]

x = np.array(x)
y = np.array(y)
print(len(x))
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=x, y=y)
)


In [None]:
atoms_list = list(get_atoms_data())
from plotly import graph_objects as go
x= [atoms.get_forces() for atoms in atoms_list]
init_eps_dict = {"Pt":2, "Fe" : 1.5}
init_sigma_dict = {"Pt":2.8, "Fe" : 2.1}
# init_eps_dict=eps
# init_sigma_dict=sig

def get_lj_engs():
    for atoms in atoms_list:
        _atoms = atoms.copy()
        _atoms.calc = LammpsLJBuilder().get_calculator_from_file(path_to_yaml="./washoi.yml")
        eng = _atoms.get_forces()
        yield eng
y= [i for i in get_lj_engs()]

x = np.array(x).reshape(1, -1)[0]
y = np.array(y).reshape(1, -1)[0]

fig = go.Figure()
fig.add_trace(
    go.Scatter(x=x[abs(y) < 1], y=y[abs(y) < 1])
)


In [None]:
np.array([atoms.get_forces() for atoms in atoms_list]).reshape(1, -1)[0]

In [None]:
from sqlalchemy import create_engine, Column, Integer, Float, String, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import select

# Declarative Base の作成
Base = declarative_base()

# テーブル定義をクラスを使って行う
class System(Base):
    __tablename__ = 'systems'
    
    id = Column(Integer, primary_key=True)
    calculator = Column(String)
    fmax = Column(Float)
    # 他のカラムも定義できますが、すべてのカラムが含まれる前提で進めます

# データベースエンジンの作成
engine = create_engine('sqlite:///test_ase_db.db')
Session = sessionmaker(bind=engine)
session = Session()

# fmaxの上位2位、下位2位のデータを取得
subquery_asc = session.query(System.id, System.calculator, System.fmax, 
                             func.row_number().over(
                                 partition_by=System.calculator, 
                                 order_by=System.fmax.asc()).label('rank_asc')).subquery()

subquery_desc = session.query(System.id, System.calculator, System.fmax, 
                              func.row_number().over(
                                  partition_by=System.calculator, 
                                  order_by=System.fmax.desc()).label('rank_desc')).subquery()

# 全カラムを取得しつつ、上位2位、下位2位を取得
query = session.query(System.__table__).filter(
    subquery_asc.c.rank_asc <= 2
).union(
    session.query(System.__table__).filter(
        subquery_desc.c.rank_desc <= 2
    )
).order_by(System.calculator, System.fmax)

# クエリの実行
results = query.all()

# 結果の表示
for row in results:
    print(f"ID: {row.id}, Calculator: {row.calculator}, Fmax: {row.fmax}")

In [None]:

cursor.execute("PRAGMA table_info(systems);")
columns = cursor.fetchall()
for col in columns:
    print(col[1])

In [None]:
import struct
from ase import Atoms
# バイナリデータを整数リストに変換
binary_data = b'\x08\x00\x00\x00\x00\x00\x00\x00'
num_values = len(binary_data) // 4  # assuming each value is 4 bytes (e.g., int32)
values = struct.unpack(f'{num_values}I', binary_data)  # 'I' for unsigned int
print(values)

In [None]:
from src import libsetter
libsetter.data