In [None]:
import numpy as np
import os
import subprocess
import time
import re
import random

from llmize import OPRO, HLMEA, HLMSA
import llmize

from dotenv import load_dotenv
load_dotenv()

In [2]:
KEYS = ["FUE1_enr", "FUE2_enr", "FUE3_enr", "FUE4_enr", "FUE5_enr", "FUE6_enr", "FUE7_enr",
                    "FUE8_enr", "FUE8_gads", "FUE9_enr", "FUE9_gads", "FUE10_enr", "FUE10_gads", "FUE11_enr", "FUE11_gads"]
TEMPLATE_FILE = "GE14.DOM.BOC.inp"
FOLDER = "cas5_runs"

In [3]:
# check if folder exists
if os.path.exists(FOLDER):
    # remove all files in folder
    for file in os.listdir(FOLDER):
        os.remove(os.path.join(FOLDER, file))
else:
    os.makedirs(FOLDER)


In [4]:
with open(TEMPLATE_FILE, 'r') as file:
    template_content = file.readlines()

In [5]:
def read_cas5_out(c5_out_filepath):
    output_dict = {}
    
    with open(c5_out_filepath, 'r') as file:
        lines = file.readlines()
    
    # Locate the summary section
    summary_index = None
    for i, line in enumerate(lines):
        if "** C A S M O 5  Summary **" in line:
            summary_index = i
            break
    
    if summary_index is None:
        print("Warning: CASMO5 summary section not found in file.")
        return None
    
    # Find the column names starting with "No"
    column_names = []
    data_start_index = None
    for i in range(summary_index, len(lines)):
        if lines[i].strip().startswith("No"):
            column_names = re.split(r'\s+', lines[i].strip())
            column_names = [col for col in column_names if "Pu" not in col]  # Remove columns containing "Pu"
            data_start_index = i + 1
            break
    
    i = 0
    for col in column_names:
        if col == 'Pin':
            column_names[i] = 'Pin Peak'
        if col == 'Fiss' or col == 'Tot':
            column_names[i] = col + ' Pu'
        i = i + 1
    
    if not column_names or data_start_index is None:
        print("Warning: Column header line not found after CASMO5 summary section.")
        return None
    
    # Process the data
    last_values = {}
    first_entry_found = False
    index = 0
    for i in range(data_start_index, len(lines)):
        line = lines[i].strip()
        if not line:
            continue
        
        parts = re.split(r'\s+', line)
        
        # First entry should be an integer (record index number)
        prev_index = index
        try:
            index = int(parts[0])
            if index<prev_index:
                break

        except ValueError:
            continue
        
        full_data = {}
        
        # First occurrence of No=1 contains all values
        if index == 1 and not first_entry_found:
            for j, col in enumerate(column_names):
                full_data[col] = parts[j] if j < len(parts) else ''
                last_values[col] = full_data[col]  # Store as last valid values
            first_entry_found = True
        else:
            # Fill missing values with previous ones
            for j, col in enumerate(column_names):
                if j < len(parts) and parts[j]:
                    full_data[col] = parts[j]
                    last_values[col] = parts[j]  # Store last valid value
                else:
                    full_data[col] = last_values.get(col, '')  # Use last recorded value if missing
        
        output_dict[index] = full_data
    
    return output_dict

In [6]:
def parse_lattice_data(solution_list):
    lattice_data = {}
    for i, data in enumerate(solution_list):
        key = KEYS[i]
        lattice_data[key] = float(data)
    return lattice_data


def gen_lattice_inp(template_content, folder_name, lattice_name, lattice_data):
    """
    Function to generate a perturbed lattice input file based on pre-generated lattice data.

    Parameters:
        template_content (list): List of strings representing template file lines.
        folder_name (str): Directory where the new lattice file will be saved.
        lattice_name (str): Name of the lattice.
        lattice_data (dict): Dictionary containing pre-generated perturbation data.

    Returns:
        str: Path to the generated lattice input file.
    """
    # Create the output file path
    output_file = os.path.join(folder_name, f"{lattice_name}.BOC.inp")

    # Apply perturbations based on the provided lattice data
    perturbed_content = []
    for line in template_content:
        if line.startswith("FUE"):
            parts = line.split()
            fue_num = int(parts[1])

            # Apply enrichment perturbation if available
            if f"FUE{fue_num}_enr" in lattice_data:
                parts[4] = f"{lattice_data[f'FUE{fue_num}_enr']:.1f}"

            # Apply gads perturbation if available
            if f"FUE{fue_num}_gads" in lattice_data:
                parts[-1] = f"64016={lattice_data[f'FUE{fue_num}_gads']:.1f}"
            
            line = " ".join(parts) + "\n"

        elif line.startswith("IDE"):
            line = f"IDE='{lattice_name}'\n"

        elif line.startswith("SIM"):
            line = f"SIM '{lattice_name}'\n"

        perturbed_content.append(line)
        
    # Write the perturbed content to the new file
    with open(output_file, 'w') as file:
        file.writelines(perturbed_content)
    #print(f"Written perturbed input file: {output_file}")

    return output_file
    

def eval_cas5_boc(file_path):

    folder_name, file_name = os.path.split(file_path)
    log_file = file_name[:-3] + 'log'

    if os.path.exists(folder_name) and os.path.exists(os.path.join(folder_name, file_name)):
        #print(f"Running simulation for {file_path} in background...")
        #print("current directory", os.getcwd())
        os.chdir(folder_name)

        with open(log_file, "w") as log:
            process = subprocess.Popen(["cas5", file_name], stdin=subprocess.PIPE, stdout=log, stderr=log)
            process.stdin.write(b"y\n")
            process.stdin.close()

        time.sleep(5)
        # Wait until no files starting with "tmp" exist
        while any(f.startswith('tmp.'+file_name[:-3]) for f in os.listdir('.')):
            #print(f"Waiting for temporary files to be cleared for {file_path}...")
            time.sleep(1)

        os.chdir("..")
    else:
        print(f"Directory or file {os.path.join(folder_name, file_name)} does not exist.")
    
    outfile_name = file_name[:-3] + 'out'
    c5_out_filepath = os.path.join(folder_name, outfile_name)
    output_dict = read_cas5_out(c5_out_filepath)

    kinf = float(output_dict[1]['K-inf'])
    ppf = float(output_dict[1]['Pin Peak'])

    return kinf, ppf

def calc_score(kinf, ppf, target_kinf=1.05, target_ppf=1.33, kinf_tol=0.001, ppf_tol=0.00):

    kinf_penalty = 0
    ppf_penalty = 0
    if kinf < (target_kinf - kinf_tol):
        kinf_penalty = 2000*(target_kinf-kinf)
    if kinf > (target_kinf + kinf_tol):
        kinf_penalty = 2000*(kinf-target_kinf)
    if ppf > (target_ppf + ppf_tol):
        ppf_penalty = 1000*(ppf-target_ppf)

    score = 100-kinf_penalty-ppf_penalty
    score = round(score, 2)
    return score


def objective_function(solution_list, template_content):
    lattice_data = parse_lattice_data(solution_list)
    lat_index = random.randint(0, 1000)
    file_path = gen_lattice_inp(template_content, FOLDER, lattice_name=f"GE14_{lat_index:03d}", lattice_data=lattice_data)
    kinf, ppf = eval_cas5_boc(file_path)
    score = calc_score(kinf, ppf)

    return score

In [7]:
# Generate initial solution by random perturbation of this:
# ["FUE1_enr", "FUE2_enr", "FUE3_enr", "FUE4_enr", "FUE5_enr", "FUE6_enr", "FUE7_enr",
# "FUE8_enr", "FUE8_gads", "FUE9_enr", "FUE9_gads", "FUE10_enr", "FUE10_gads", "FUE11_enr", "FUE11_gads"]

# enrichments are between 1 to 5% with 0.1% step
# gadolinia has maximum 10% with increment of 1% step

# set seed for reproducibility
np.random.seed(42)

def generate_initial_solutions(n_solutions=10):
    solutions = []
    for _ in range(n_solutions):
        # Generate random enrichments between 1-5% with 0.1% steps
        enrichments = np.round(np.random.choice(np.arange(3.6, 5.1, 0.1), size=11), decimals=1)
        enrichments[0] = np.round(np.random.choice(np.arange(1.5, 2.1, 0.1), size=1), decimals=1)
        enrichments[1] = np.round(np.random.choice(np.arange(2.0, 3.1, 0.1), size=1), decimals=1)
        enrichments[2] = np.round(np.random.choice(np.arange(2.0, 3.1, 0.1), size=1), decimals=1)

        # Generate random gadolinia content between 0-10% with 1% steps
        gads = np.round(np.random.choice(np.arange(4.0, 10.0, 1.0), size=4), decimals=1)
        # Combine into solution array
        solution = []
        for i in range(7):  # FUE1-7 enrichments only
            solution.append(enrichments[i])
        
        # Add FUE8-11 enrichments and gadolinia
        solution.extend([enrichments[7], gads[0]])  # FUE8
        solution.extend([enrichments[8], gads[1]])  # FUE9  
        solution.extend([enrichments[9], gads[2]])  # FUE10
        solution.extend([enrichments[10], gads[3]]) # FUE11
        
        solutions.append(solution)
        
    return solutions

batch_size = 16

solutions = generate_initial_solutions(n_solutions=batch_size)
for solution in solutions:
    print(solution)


  enrichments[0] = np.round(np.random.choice(np.arange(1.5, 2.1, 0.1), size=1), decimals=1)
  enrichments[1] = np.round(np.random.choice(np.arange(2.0, 3.1, 0.1), size=1), decimals=1)
  enrichments[2] = np.round(np.random.choice(np.arange(2.0, 3.1, 0.1), size=1), decimals=1)


[np.float64(2.1), np.float64(3.0), np.float64(3.0), np.float64(5.0), np.float64(4.6), np.float64(4.3), np.float64(4.8), np.float64(4.0), np.float64(8.0), np.float64(4.2), np.float64(7.0), np.float64(4.5), np.float64(6.0), np.float64(3.8), np.float64(9.0)][0m
[0m[np.float64(1.6), np.float64(2.5), np.float64(2.8), np.float64(4.7), np.float64(4.9), np.float64(4.1), np.float64(3.7), np.float64(4.7), np.float64(4.0), np.float64(4.0), np.float64(6.0), np.float64(3.6), np.float64(6.0), np.float64(4.7), np.float64(5.0)][0m
[0m[np.float64(1.5), np.float64(2.2), np.float64(2.4), np.float64(4.9), np.float64(4.9), np.float64(5.0), np.float64(4.9), np.float64(3.8), np.float64(6.0), np.float64(4.7), np.float64(8.0), np.float64(4.2), np.float64(4.0), np.float64(3.9), np.float64(5.0)][0m
[0m[np.float64(1.8), np.float64(2.6), np.float64(2.7), np.float64(4.9), np.float64(3.7), np.float64(4.5), np.float64(4.4), np.float64(4.5), np.float64(6.0), np.float64(4.0), np.float64(9.0), np.float64(3.7), np.

[0m

In [8]:
import multiprocessing

def evaluate_solution(solution):
    score = objective_function(solution_list=solution, template_content=template_content)
    return score

with multiprocessing.Pool() as pool:
    scores = pool.map(evaluate_solution, solutions)

print(scores)

[-148.66, 13.88, -198.88, -186.42, -96.7, -83.06, 17.04, -254.0, -209.04, 18.52, -59.32, -54.5, -300.94, -274.02, -158.34, -177.9][0m
[0m

In [None]:
with open("bwr_ge14.txt", "r") as f:
    problem_text = f.read()

def obj_func(x):
    return objective_function(x, template_content)


# Initialize the OPRO optimizer
opro = OPRO(problem_text=problem_text, obj_func=obj_func,
            llm_model="gemini-2.5-flash-lite", api_key=os.getenv("GEMINI_API_KEY"))

prompt = opro.get_sample_prompt(init_samples=solutions, init_scores=scores, optimization_type="maximize")
response = opro.get_sample_response(prompt)

llmize.utils.pretty_print(prompt=prompt, response=response)

[0mPrompt:[0m
[0mYou are an optimization agent and an expert in nuclear reactor design. Your task is to generate a 10×10 GE-14 fuel lattice design that satisfies the
following conditions:

- Fuel Enrichment (FUE#_enr): Maximum 5.0% with increments of 0.1%.
- Gadolinia Content (FUE#_gads): Maximum 10.0% with increments of 1.0%.

Lattice Configuration:
Here is the half-lattice map (symmetric arrangement assumed):
1
2  7
3  8  5
7  4  9  6
4 10  5 11  5
4  5 11  0  0  5
7  5  6  0  0  5 10
7  5  5  5  5  5  5  5
3  6 10  5  5  5  5  5 10
2  7  6  6  6  6  6  6  4  7

Objective:
Your goal is to generate new solutions that achieve:
- Lattice criticality: k_inf = 1.05
- Pin Peaking Factor (PPF): <1.30
- Higher score than any of the given solutions.

The objective function (score) is determined based on these parameters, but you are not to guess k_inf, PPF, or score—they will be calculated
externally using Casmo-5 simulations and a processing code.
Maximum score is 100.0, which is achieved

In [None]:
# Initialize the HLMEA optimizer
hlmea  = HLMEA(problem_text=problem_text, obj_func=obj_func,
            llm_model="gemini-2.5-flash-lite", api_key=os.getenv("GEMINI_API_KEY"))

prompt = hlmea.get_sample_prompt(init_samples=solutions, init_scores=scores, optimization_type="maximize")
response = hlmea.get_sample_response(prompt)

llmize.utils.pretty_print(prompt=prompt, response=response)

[0mPrompt:[0m
[0m
You are a hyper-heuristic LLM-driven evolutionary algorithm that generates diverse and optimized solutions for a given problem.
You can adaptively set hyperparameters to enhance solution quality and exploration.

## Problem Description
You need to optimize a given problem by evolving a set of candidate solutions. The objective is to iteratively improve solutions using evolutionary
strategies, ensuring diversity and avoiding premature convergence.

You are an optimization agent and an expert in nuclear reactor design. Your task is to generate a 10×10 GE-14 fuel lattice design that satisfies the
following conditions:

- Fuel Enrichment (FUE#_enr): Maximum 5.0% with increments of 0.1%.
- Gadolinia Content (FUE#_gads): Maximum 10.0% with increments of 1.0%.

Lattice Configuration:
Here is the half-lattice map (symmetric arrangement assumed):
1
2  7
3  8  5
7  4  9  6
4 10  5 11  5
4  5 11  0  0  5
7  5  6  0  0  5 10
7  5  5  5  5  5  5  5
3  6 10  5  5  5  5  5 10
2  

In [12]:
from llmize.callbacks import EarlyStopping, AdaptTempOnPlateau, OptimalScoreStopping

# Define the early stopping callback
earlystop_callback = EarlyStopping(monitor='best_score', min_delta=0.1, patience=50, verbose=1)

# Define the optimal score stopping callback
optimal_score_callback = OptimalScoreStopping(optimal_score=100.0, tolerance=0.1)

# Define the temperature adaptation callback
adapt_temp_callback = AdaptTempOnPlateau(monitor='best_score', init_temperature=1.0, min_delta=0.1, 
                                         patience=20, factor=1.1, max_temperature=1.9, verbose=1)

callbacks = [earlystop_callback, optimal_score_callback, adapt_temp_callback]

In [13]:
results = opro.maximize(init_samples=solutions, init_scores=scores, num_steps=250, batch_size=batch_size, callbacks=callbacks, parallel_n_jobs=-1)


[37mRunning OPRO optimization with 250 steps and batch size 16...[0m
[0m[37mStep 0 - Best Initial Score: 18.520, Average Initial Score: -134.521[0m
[0m[37mStep 1 - Current Best Score: 18.520, Average Batch Score: -134.491 - Best Batch Score: -17.400[0m
[0m[37mNo improvement in best_score. Patience count: 1/50[0m
[0m[37mStep 2 - Current Best Score: 51.340, Average Batch Score: -84.591 - Best Batch Score: 51.340[0m
[0m[37mStep 3 - Current Best Score: 87.000, Average Batch Score: 50.095 - Best Batch Score: 87.000[0m
[0m[37mStep 4 - Current Best Score: 87.000, Average Batch Score: 32.941 - Best Batch Score: 79.900[0m
[0m[37mNo improvement in best_score. Patience count: 1/50[0m
[0m[37mStep 5 - Current Best Score: 87.000, Average Batch Score: 11.484 - Best Batch Score: 50.000[0m
[0m[37mNo improvement in best_score. Patience count: 2/50[0m
[0m[37mStep 6 - Current Best Score: 92.020, Average Batch Score: 52.955 - Best Batch Score: 92.020[0m
[0m[37mStep 7 - Cur