# 09. MatterGen 생성 구조 검증

## 목적
MatterGen으로 생성된 구조를 CHGNet으로 검증하고,  
기존 Top 5 산업폐기물 후보와 비교합니다.

---

### 검증 파이프라인

1. **구조 로딩 및 필터링**: 유효한 조성만 선택
2. **CHGNet 에너지 계산**: 생성 구조의 안정성 확인
3. **구조 최적화**: fmax < 0.05 eV/Å까지 이완
4. **Top 5 비교**: 동일 기준으로 점수 산출

## 1. 환경 설정

In [None]:
import sys
from pathlib import Path
import json
import numpy as np
import pandas as pd
from tqdm import tqdm

# 프로젝트 경로
PROJECT_ROOT = Path.cwd().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

MATTERGEN_DIR = PROJECT_ROOT / 'data' / 'mattergen'

print(f"Project Root: {PROJECT_ROOT}")
print(f"MatterGen Output: {MATTERGEN_DIR}")

In [None]:
# GPU 확인
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2. 생성 구조 로딩

In [None]:
from ase.io import read

def load_cif_structures(phase_dir):
    """디렉토리에서 개별 CIF 파일 로딩"""
    structures = []
    
    if not phase_dir.exists():
        return structures
    
    for cif_file in sorted(phase_dir.glob('*.cif')):
        try:
            atoms = read(cif_file)
            formula = atoms.get_chemical_formula(mode='hill')
            symbols = set(atoms.get_chemical_symbols())
            
            structures.append({
                'name': cif_file.stem,
                'atoms': atoms,
                'formula': formula,
                'elements': symbols,
                'n_atoms': len(atoms),
                'source': phase_dir.name,
                'path': str(cif_file)
            })
        except Exception as e:
            print(f"  Skip {cif_file.name}: {e}")
    
    return structures

print("Load function defined")

In [None]:
# 모든 Phase 구조 로딩
phase_dirs = [
    'phase1_Ca_Si_Al_O',
    'phase1_Ca_Si_Al_Fe_O',
    'phase2_Ca_Si_O',
    'phase2_Ca_Si_Mg_O',
    # Phase 3 제외 - 시멘트와 무관한 조성 생성됨
]

all_structures = []

print("Loading MatterGen structures:")
print("=" * 50)

for phase_name in phase_dirs:
    phase_path = MATTERGEN_DIR / phase_name
    structures = load_cif_structures(phase_path)
    all_structures.extend(structures)
    print(f"  {phase_name}: {len(structures)} structures")

print(f"\nTotal loaded: {len(all_structures)} structures")

## 3. 조성 필터링

시멘트 관련 원소만 포함된 구조 선택:
- 허용 원소: Ca, Si, Al, Fe, Mg, O
- 비허용 원소 (La, Lu, Bi, Na 등) 포함 구조 제외

In [None]:
# 허용 원소 정의
ALLOWED_ELEMENTS = {'Ca', 'Si', 'Al', 'Fe', 'Mg', 'O'}

def is_valid_cement_composition(elements):
    """시멘트 관련 조성인지 확인"""
    # 모든 원소가 허용 목록에 있어야 함
    if not elements.issubset(ALLOWED_ELEMENTS):
        return False
    
    # 최소 Ca와 Si 또는 Ca와 O 포함
    if 'Ca' not in elements:
        return False
    
    if 'Si' not in elements and 'O' not in elements:
        return False
    
    return True

# 필터링
filtered_structures = []
rejected = []

for s in all_structures:
    if is_valid_cement_composition(s['elements']):
        filtered_structures.append(s)
    else:
        rejected.append(s)

print(f"Filtered structures: {len(filtered_structures)}/{len(all_structures)}")
print(f"Rejected: {len(rejected)}")

if rejected:
    print("\nRejected structures (invalid elements):")
    for r in rejected:
        invalid = r['elements'] - ALLOWED_ELEMENTS
        print(f"  - {r['name']}: {r['formula']} (contains: {invalid})")

In [None]:
# 유효 구조 요약
print("\nValid structures by phase:")
print("-" * 50)

from collections import Counter
phase_counts = Counter(s['source'] for s in filtered_structures)

for phase, count in sorted(phase_counts.items()):
    print(f"  {phase}: {count}")

print("\nSample formulas:")
for s in filtered_structures[:5]:
    print(f"  - {s['formula']} ({s['source']})")

## 4. CHGNet 에너지 계산

In [None]:
from chgnet.model import CHGNet
from chgnet.model.dynamics import CHGNetCalculator

# CHGNet 로드
print("Loading CHGNet...")
model = CHGNet.load()
calc = CHGNetCalculator(model)
print("CHGNet loaded successfully")

In [None]:
def evaluate_structure(atoms, calc):
    """CHGNet으로 구조 평가"""
    atoms_copy = atoms.copy()
    atoms_copy.calc = calc
    
    try:
        energy = atoms_copy.get_potential_energy()
        forces = atoms_copy.get_forces()
        
        return {
            'energy': energy,
            'energy_per_atom': energy / len(atoms_copy),
            'max_force': np.abs(forces).max(),
            'mean_force': np.abs(forces).mean(),
            'valid': True
        }
    except Exception as e:
        return {
            'error': str(e),
            'valid': False
        }

print("Evaluation function defined")

In [None]:
# 모든 구조 평가
print("Evaluating structures with CHGNet...")
print("=" * 50)

results = []

for s in tqdm(filtered_structures, desc="Evaluating"):
    eval_result = evaluate_structure(s['atoms'], calc)
    
    result = {
        'name': s['name'],
        'formula': s['formula'],
        'source': s['source'],
        'n_atoms': s['n_atoms'],
        'elements': list(s['elements']),
        **eval_result
    }
    results.append(result)

# 유효한 결과만
valid_results = [r for r in results if r['valid']]
print(f"\nValid evaluations: {len(valid_results)}/{len(results)}")

In [None]:
# 결과 정렬 (energy_per_atom 기준 - 낮을수록 안정)
valid_results.sort(key=lambda x: x['energy_per_atom'])

print("\nTop 10 Most Stable Structures:")
print("-" * 80)
print(f"{'Rank':<5} {'Name':<25} {'Formula':<20} {'E/atom (eV)':<12} {'Max Force'}")
print("-" * 80)

for i, r in enumerate(valid_results[:10], 1):
    print(f"{i:<5} {r['name']:<25} {r['formula']:<20} {r['energy_per_atom']:.4f}      {r['max_force']:.4f}")

## 5. 상위 구조 최적화

In [None]:
from ase.optimize import BFGS

def optimize_structure(atoms, calc, fmax=0.05, max_steps=200):
    """구조 최적화"""
    atoms_opt = atoms.copy()
    atoms_opt.calc = calc
    
    initial_energy = atoms_opt.get_potential_energy()
    
    optimizer = BFGS(atoms_opt, logfile=None)
    
    try:
        converged = optimizer.run(fmax=fmax, steps=max_steps)
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }
    
    final_energy = atoms_opt.get_potential_energy()
    final_forces = atoms_opt.get_forces()
    
    return {
        'success': True,
        'atoms': atoms_opt,
        'initial_energy': initial_energy,
        'final_energy': final_energy,
        'energy_change': final_energy - initial_energy,
        'final_max_force': np.abs(final_forces).max(),
        'steps': optimizer.nsteps,
        'converged': converged
    }

print("Optimization function defined")

In [None]:
# 상위 5개 구조 최적화
N_OPTIMIZE = 5
top_candidates = valid_results[:N_OPTIMIZE]

optimized = []

print(f"Optimizing top {N_OPTIMIZE} structures...")
print("=" * 60)

for i, candidate in enumerate(top_candidates, 1):
    print(f"\n[{i}/{N_OPTIMIZE}] {candidate['name']} ({candidate['formula']})")
    
    # 원본 atoms 찾기
    atoms = None
    for s in filtered_structures:
        if s['name'] == candidate['name']:
            atoms = s['atoms'].copy()
            break
    
    if atoms is None:
        print("  [SKIP] Atoms not found")
        continue
    
    # 최적화
    opt_result = optimize_structure(atoms, calc)
    
    if opt_result['success']:
        optimized.append({
            'name': candidate['name'],
            'formula': candidate['formula'],
            'source': candidate['source'],
            'atoms': opt_result['atoms'],
            'initial_E_per_atom': candidate['energy_per_atom'],
            'final_E_per_atom': opt_result['final_energy'] / len(opt_result['atoms']),
            'energy_change': opt_result['energy_change'],
            'converged': opt_result['converged'],
            'steps': opt_result['steps']
        })
        
        print(f"  Initial E/atom: {candidate['energy_per_atom']:.4f} eV")
        print(f"  Final E/atom:   {opt_result['final_energy']/len(opt_result['atoms']):.4f} eV")
        print(f"  Steps: {opt_result['steps']} ({'Converged' if opt_result['converged'] else 'Not converged'})")
    else:
        print(f"  [ERROR] {opt_result['error']}")

print(f"\nOptimized: {len(optimized)} structures")

## 6. 최적화 구조 저장

In [None]:
from ase.io import write

# 저장 디렉토리
optimized_dir = PROJECT_ROOT / 'structures' / 'mattergen_optimized'
optimized_dir.mkdir(parents=True, exist_ok=True)

print("Saving optimized structures...")
for s in optimized:
    cif_path = optimized_dir / f"{s['name']}_opt.cif"
    write(cif_path, s['atoms'])
    print(f"  Saved: {cif_path.name}")

print(f"\nAll saved to: {optimized_dir}")

## 7. 결과 요약

In [None]:
# 결과 DataFrame
df_results = pd.DataFrame([
    {
        'Rank': i+1,
        'Name': s['name'],
        'Formula': s['formula'],
        'Source': s['source'],
        'E/atom (eV)': f"{s['final_E_per_atom']:.4f}",
        'Converged': 'Yes' if s['converged'] else 'No'
    }
    for i, s in enumerate(optimized)
])

print("\n" + "=" * 70)
print("MatterGen Validation Results")
print("=" * 70)
print(f"\nTotal generated: {len(all_structures)}")
print(f"Valid composition: {len(filtered_structures)}")
print(f"Optimized: {len(optimized)}")
print("\nTop Optimized Structures:")
print(df_results.to_string(index=False))

In [None]:
# JSON 저장
def convert_numpy(obj):
    if isinstance(obj, (np.floating, np.float64, np.float32)):
        return float(obj)
    if isinstance(obj, (np.integer, np.int64, np.int32)):
        return int(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy(i) for i in obj]
    return obj

validation_summary = {
    'total_generated': len(all_structures),
    'valid_composition': len(filtered_structures),
    'rejected_count': len(rejected),
    'optimized_count': len(optimized),
    'all_results': [
        {k: v for k, v in r.items() if k != 'atoms'}
        for r in valid_results
    ],
    'optimized_structures': [
        {k: v for k, v in s.items() if k != 'atoms'}
        for s in optimized
    ]
}

validation_summary = convert_numpy(validation_summary)

results_path = PROJECT_ROOT / 'data' / 'results' / 'mattergen_validation.json'
results_path.parent.mkdir(exist_ok=True)

with open(results_path, 'w') as f:
    json.dump(validation_summary, f, indent=2)

print(f"Results saved: {results_path}")

---

## 다음 단계

검증된 구조로 다음 분석을 수행할 수 있습니다:

1. **수화 시뮬레이션**: 물 분자 추가 후 MD 실행
2. **C-S-H 형성 분석**: 기존 메트릭 적용
3. **기존 Top 5와 비교**: 동일 조건에서 점수 산출