In [None]:
# import modules
import os
import asyncio
import nest_asyncio
import time

import season1.utility as util
import data_utility
import async_llm

from season1.validater import get_label


from season1.prompt_generator import generate_prompt
from season1.preprocessor import preprocess

In [None]:
# functions
def generate_examples(example_list, used_paramname):
    """Render demonstration maps and their parameter values as prompt text.

    Args:
        example_list: Iterable of dicts; each needs a "map" string and a
            "params" mapping that contains every name in ``used_paramname``.
        used_paramname: Parameter names to list under each example.

    Returns:
        A string that starts with a newline and is followed by one
        "Example N:" section (numbered from 1) per entry. The map text is
        inserted as-is, directly before the "Parameters:" line.
    """
    chunks = ["\n"]
    for number, entry in enumerate(example_list, start=1):
        chunks.append("Example " + str(number) + ":\nMap:\n")
        chunks.append(entry["map"])
        chunks.append("Parameters:\n")
        chunks.extend(
            "- " + name + ": " + str(entry["params"][name]) + "\n"
            for name in used_paramname
        )
        # Blank line separating consecutive examples.
        chunks.append("\n")
    return "".join(chunks)


def generate_filename_from_used_paramname(used_paramname, recursion_count):
    """Build a result file name from the parameter names and a round number.

    Args:
        used_paramname: Parameter names; each is followed by an underscore.
        recursion_count: Round number appended at the end (converted with str).

    Returns:
        E.g. "map_size_room_count_2". No file extension is added.
    """
    prefix = "".join(name + "_" for name in used_paramname)
    return prefix + str(recursion_count)
    
    
def generate_parameters_to_modify(used_paramname, current_params, target_params):
    """Describe current versus target values for the selected parameters.

    Args:
        used_paramname: Parameter names to report, in output order.
        current_params: Mapping from parameter name to its current value.
        target_params: Mapping from parameter name to its desired value.

    Returns:
        A header line followed by a "current <name>: <value>" line and a
        "target <name>: <value>" line for each parameter.
    """
    lines = ["Current and Target Parameter values:\n"]
    for name in used_paramname:
        lines.append("current " + name + ": " + str(current_params[name]) + "\n")
        lines.append("target " + name + ": " + str(target_params[name]) + "\n")
    return "".join(lines)
    

In [None]:
# setting
# Parameter names controlled in this run. The list drives prompt construction,
# example rendering and result file names in the cells below, and each name
# must have a matching "<name>.txt" description file in base_prompt_dir.
# NOTE(review): the spelling "exploraion" below may be the actual key used by
# the dataset and description files — confirm before correcting it.
used_paramname = ["map_size"] # Select one, two, or six from map_size, room_count, enemy_count, treasure_count, exploraion, winding_path
# Number of modification rounds executed after the initial generation.
max_recursion_count = 5

# Directory that receives one result file per round.
result_dir = os.path.join("..", "data", "result") 
# JSON file the demonstration maps are loaded from.
map_data_path = os.path.join("..", "data", "DemoMapDataset.json")
# JSON file holding the target parameter sets (its "param_list" entry is used).
target_param_path = os.path.join("..", "data", "TargetParameterDataset.json")
# Directory containing the prompt templates and per-parameter descriptions.
base_prompt_dir = os.path.join("..", "src", "base_prompt")

In [None]:
# load dataset
# Demonstration maps from which prompt examples are drawn.
map_dataset = util.read_json_file(map_data_path)
# Only the "param_list" entry of the target-parameter file is needed.
param_dataset = util.read_json_file(target_param_path)["param_list"]

In [None]:
# create preparation prompt
# Builds one preparation prompt per parameter in used_paramname.

# Base template with the map description filled in once; {Parameters} and
# {ParameterName} are filled per parameter in the loop below.
preparation_templete = util.read_text_file(os.path.join(base_prompt_dir, "PreparationPhaseTemplete.txt"))

map_description = util.read_text_file(os.path.join(base_prompt_dir, "MapDescription.txt"))
preparation_templete = preparation_templete.replace("{MapDescription}", map_description)

# The parameter template is the same for every parameter, so it is read once
# here rather than once per loop iteration.
parameter_templete = util.read_text_file(os.path.join(base_prompt_dir, "ParameterTemplete.txt"))

preparation_prompts = []
for param in used_paramname:
    param_description = util.read_text_file(os.path.join(base_prompt_dir, param + ".txt"))

    example_list = data_utility.get_demos_from_map_dataset(map_dataset, [param])
    # NOTE(review): demos are requested with [param] only, but rendered with
    # every name in used_paramname — confirm this is intended when more than
    # one parameter is in use.
    examples = generate_examples(example_list, used_paramname)

    parameters = parameter_templete.replace("{ParameterDescription}", param_description)
    parameters = parameters.replace("{Examples}", examples)

    preparation_prompts.append(preparation_templete.replace("{Parameters}", parameters).replace("{ParameterName}", param))

In [None]:
# preparation phase
# Sends every preparation prompt to the LLM helper and keeps the raw outputs.

# The same system prompt is repeated once per preparation prompt so both lists
# passed to the helper have equal length.
system_prompts = ["""
You are an expert in analyzing and modifying ASCII dungeon maps based on given parameters. Your task is to help users understand how various parameters affect the layout of the map and to provide step-by-step instructions on how to modify the map to achieve a target parameter value. Ensure that your explanations are clear and structured, and that you provide logical reasoning for each modification. You will also describe both how to increase and decrease specific parameter values.
"""] * len(preparation_prompts)

# Must run before the first asyncio.run call. Presumably needed because the
# notebook kernel already runs an event loop — confirm against nest_asyncio docs.
nest_asyncio.apply()

# NOTE(review): later cells join these outputs into the modification template;
# assumes the helper returns a list of strings — confirm in async_llm.
preparation_outputs = asyncio.run(async_llm.generate_unstructured_datas(system_prompts, preparation_prompts))

In [None]:
# create modification prompt templete
# Assembles the template shared by every modification prompt. {Examples},
# {Map} and {ParametersToModify} are not replaced in this cell; the recursion
# cell fills them per data block.

# Descriptions of all parameters in use, each terminated by a newline.
description_parts = [
    util.read_text_file(os.path.join(base_prompt_dir, name + ".txt")) + '\n'
    for name in used_paramname
]
param_description = "".join(description_parts)

parameters = util.read_text_file(
    os.path.join(base_prompt_dir, "ParameterTemplete.txt")
).replace("{ParameterDescription}", param_description)

# All preparation-phase outputs, separated by newlines.
preparation_output = "\n".join(preparation_outputs)

# Replacement order matches the placeholders' original fill order:
# map description, parameter section, then preparation output.
modification_template = (
    util.read_text_file(os.path.join(base_prompt_dir, "ModificationPhaseTemplete.txt"))
    .replace("{MapDescription}", map_description)
    .replace("{Parameters}", parameters)
    .replace("{PreparationOutput}", preparation_output)
)

In [None]:
# create initial prompt
# Builds one (system, user) prompt pair per target-parameter entry.
# The four lists below are parallel: index i in each refers to the same entry
# of param_dataset, and later cells rely on that alignment.
system_prompts = []
user_prompts = []
used_examples = []
used_params = []
for params in param_dataset:
    # Demos are fetched anew for every target entry and kept in used_examples
    # so the modification rounds can show the same examples again.
    example_list = data_utility.get_demos_from_map_dataset(map_dataset, used_paramname)
    examples = generate_examples(example_list, used_paramname)
    
    # NOTE(review): "AutoCOT2" presumably selects a prompt variant inside
    # season1.prompt_generator — confirm there.
    system, user = generate_prompt(examples, params, "AutoCOT2")
    system_prompts.append(system)
    user_prompts.append(user)
    used_examples.append(example_list)
    used_params.append(params)

In [None]:
# initial creation phase
# Generates the first map for every target-parameter entry and preprocesses
# the raw LLM outputs. Relies on nest_asyncio.apply() having been called in
# the preparation-phase cell.

# Number of prompts to send; lower it to run on only the first few entries.
size = len(system_prompts)

# Minimum number of seconds this cell occupies before returning.
# NOTE(review): presumably paces requests for an API rate limit — confirm.
min_cycle_seconds = 65

# Use the monotonic clock for the interval: unlike time.time(), it cannot jump
# if the system clock is adjusted while the request is in flight.
start_time = time.monotonic()

initial_outputs = asyncio.run(async_llm.generate_unstructured_datas(system_prompts[:size], user_prompts[:size]))
preprocessed_outputs = list(map(preprocess, initial_outputs))

elapsed_time = time.monotonic() - start_time
if elapsed_time < min_cycle_seconds:
    # Wait out the remainder of the cycle before the next batch can start.
    time.sleep(min_cycle_seconds - elapsed_time)

In [None]:
# make block
# One record per generated map, combining the map with the target parameters
# and the demos used to produce it (matched by position).
data_blocks = [
    {
        "target_params": used_params[index],
        "map": generated_map,
        "examples": used_examples[index],
    }
    for index, generated_map in enumerate(preprocessed_outputs)
]

In [None]:
# save initial outputs
# Round 0 holds the maps from the initial creation phase.
util.create_directory(result_dir)

initial_filename = generate_filename_from_used_paramname(used_paramname, 0)
file_path = os.path.join(result_dir, initial_filename)
util.write_json_file(file_path, data_blocks)

In [None]:
# recursion and save outputs
# Runs max_recursion_count modification rounds. Each round asks the LLM to
# move every map toward its target parameters, overwrites the "map" entry of
# each data block with the result, and writes all blocks to a per-round file.

system_prompts = ["""
You are an expert in analyzing and modifying ASCII dungeon maps based on given parameters. Your task is to modify an ASCII dungeon map to adjust several of its parameters. You will be provided with the current and target values for one or more parameters, along with a original map. Your goal is to transform the map to meet the target values for all provided parameters.
"""] * len(data_blocks)

# Minimum number of seconds each round occupies.
# NOTE(review): presumably paces requests for an API rate limit — confirm.
min_cycle_seconds = 65

for recursion_count in range(1, max_recursion_count + 1):
    modification_prompts = []

    # Use the monotonic clock for the interval: unlike time.time(), it cannot
    # jump if the system clock is adjusted during the round.
    start_time = time.monotonic()

    for block in data_blocks:
        # Placeholders are filled in a fixed order: examples, map, parameters.
        modification_prompt = modification_template.replace("{Examples}", generate_examples(block["examples"], used_paramname))
        modification_prompt = modification_prompt.replace("{Map}", block["map"])

        # Current values are measured from the map produced by the previous round.
        current_params = get_label(block["map"])
        target_params = block["target_params"]

        modification_prompt = modification_prompt.replace("{ParametersToModify}", generate_parameters_to_modify(used_paramname, current_params, target_params))

        modification_prompts.append(modification_prompt)

    recursion_outputs = asyncio.run(async_llm.generate_unstructured_datas(system_prompts, modification_prompts))
    preprocessed_outputs = list(map(preprocess, recursion_outputs))

    # Outputs are matched to blocks by position; the new map becomes the
    # input of the next round.
    for i in range(len(data_blocks)):
        data_blocks[i]["map"] = preprocessed_outputs[i]

    util.create_directory(result_dir)
    file_path = os.path.join(result_dir, generate_filename_from_used_paramname(used_paramname, recursion_count))
    util.write_json_file(file_path, data_blocks)

    print(f"recursion {recursion_count} successfully performed")

    elapsed_time = time.monotonic() - start_time
    if elapsed_time < min_cycle_seconds:
        # Wait out the remainder of the cycle before starting the next round.
        time.sleep(min_cycle_seconds - elapsed_time)