In [1]:
import os
import tempfile

import coiled
from archetypal.idfclass import IDF
from archetypal.idfclass.sql import Sql
from pydantic import AnyUrl

from epengine.models.configs import SimulationSpec
from epengine.utils.results import postprocess, serialize_df_dict

AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
AWS_REGION = os.environ["AWS_REGION"]
HATCHET_CLIENT_TOKEN = os.environ["HATCHET_CLIENT_TOKEN"]
AWS_BUCKET = "ml-for-bem"

worker_image_name = "ml-for-bem-coiledworker"
worker_image_name = "hatchet/epengine"

### Directly Matched Function

In [3]:
@coiled.function(
    region="us-east-1",
    vm_type=["t3.medium", "t3.large"],
    container=f"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_REGION}.amazonaws.com/{worker_image_name}:latest",
)
def run_simulation(data: dict[str, str], ix: int):
    """Run a simulation using the provided data and return the results.

    Args:
        data (dict[str, str]): The data to run the simulation with.
        ix (int): The index of the simulation.

    Returns:
        dict[str, pd.DataFrame]: The results of the simulation
    """
    spec = SimulationSpec.model_validate(data)
    with tempfile.TemporaryDirectory() as tmpdir:
        idf = IDF(
            spec.idf_path,  # pyright: ignore [reportArgumentType]
            epw=spec.epw_path,
            output_directory=tmpdir,
        )  # pyright: ignore [reportArgumentType]
        idf.simulate()
        sql = Sql(idf.sql_file)
        index_data = spec.model_dump(mode="json", exclude_none=True)
        index_data["spawn_index"] = ix
        # TODO: pull in spawn index
        dfs = postprocess(
            sql,
            index_data=index_data,
            tabular_lookups=[("AnnualBuildingUtilityPerformanceSummary", "End Uses")],
        )
    dfs = serialize_df_dict(dfs)
    return dfs

In [None]:
spec = SimulationSpec(
    experiment_id="coiled-test",
    idf_uri=AnyUrl(
        f"s3://{AWS_BUCKET}/hatchet/insomnia-test/idf/Office_totalBaseline.idf"
    ),
    epw_uri=AnyUrl(
        f"s3://{AWS_BUCKET}/hatchet/insomnia-test/epw/ARG_Buenos.Aires.875760_IWEC.epw"
    ),
    ddy_uri=None,
)
arg = spec.model_dump(mode="json")
results = run_simulation.map([arg for _ in range(10)], range(10), errors="skip")
results = list(results)

### Client submitted function

In [2]:
import json

import pandas as pd

with open("../../../data/taube-spec-small-20.json") as f:
    data = json.load(f)
    df = pd.DataFrame(data)

df["epw_uri"] = df.epw_path.apply(
    lambda x: f"s3://ml-for-bem/hatchet/taube-archetypal-updated-bsmt/epw/{x}"
)
df["idf_uri"] = df.idf_path.apply(
    lambda x: f"s3://ml-for-bem/hatchet/taube-archetypal-updated-bsmt/idf/{x}"
)
df["ddy_uri"] = df.ddy_path.apply(
    lambda x: f"s3://ml-for-bem/hatchet/taube-archetypal-updated-bsmt/ddy/{x}"
)
df = df.drop(columns=["epw_path", "idf_path", "ddy_path"])
df["sort_ix"] = df.index.copy(deep=True)
sim_specs = [
    SimulationSpec(**x, experiment_id="coiled-test-0")
    for x in df.to_dict(orient="records")
]

In [3]:
from coiled import Cluster

from epengine.worker.main import arun

worker_image_name = "ml-for-bem-coiledworker"
worker_image_name = "hatchet/epengine"

cluster = Cluster(
    n_workers=4,
    name="hatchet-worker-cluster",
    container=f"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_REGION}.amazonaws.com/{worker_image_name}:latest",
    worker_cpu=[8],
    worker_memory=["2 Gib", "16 Gib"],
    environ={"HATCHET_CLIENT_TOKEN": HATCHET_CLIENT_TOKEN},
    spot_policy="spot",
    region=AWS_REGION,
    mount_bucket=f"s3://{AWS_BUCKET}",
)
client = cluster.get_client()


# for _ in range(100):
#     client.submit(arun)

[2024-09-22 17:38:11,358][INFO    ][coiled] Creating software environment
[2024-09-22 17:38:11,464][INFO    ][coiled] Software environment created
[2024-09-22 17:38:12,134][INFO    ][coiled] Creating Cluster (name: hatchet-worker-cluster, https://cloud.coiled.io/clusters/597308?account=szvsw ). This usually takes 1-2 minutes...

+---------+----------------+-----------------+-----------------+
| Package | Client         | Scheduler       | Workers         |
+---------+----------------+-----------------+-----------------+
| python  | 3.10.9.final.0 | 3.10.12.final.0 | 3.10.12.final.0 |
+---------+----------------+-----------------+-----------------+


In [4]:
import logging
import os
import shutil
from pathlib import Path

from epengine.models.ddy_injector import DDYSizingSpec
from epengine.utils.filesys import fetch_uri

logger = logging.getLogger("SIM-DASK-WORKER")


def simulate(spec: dict):
    # logger.info(f"Running simulation for {spec}")
    # print(f"Running simulation for {spec}")
    sim_spec = SimulationSpec.model_validate(spec)
    # logger.info(str(os.listdir("/mount")))
    # print(str(os.listdir("/mount")))

    mount_idf_path = f"/mount/{str(sim_spec.idf_uri)[5:]}"
    mount_ddy_path = f"/mount/{str(sim_spec.ddy_uri)[5:]}" if sim_spec.ddy_uri else None
    mount_epw_path = f"/mount/{str(sim_spec.epw_uri)[5:]}"
    fetch_uri(sim_spec.idf_uri, Path(mount_idf_path))
    fetch_uri(sim_spec.epw_uri, Path(mount_epw_path))
    if mount_ddy_path:
        fetch_uri(sim_spec.ddy_uri, Path(mount_ddy_path))

    with tempfile.TemporaryDirectory() as tmpdir:
        local_idf_path = Path(tmpdir) / "model.idf"
        local_epw_path = Path(tmpdir) / "model.epw"
        shutil.copyfile(mount_idf_path, local_idf_path)
        shutil.copyfile(mount_epw_path, local_epw_path)
        if mount_ddy_path:
            local_ddy_path = Path(tmpdir) / "model.ddy"
            shutil.copyfile(mount_ddy_path, local_ddy_path)
        else:
            local_ddy_path = None
        idf = IDF(
            local_idf_path.as_posix(),
            epw=local_epw_path.as_posix(),
            output_directory=tmpdir,
            as_version=None,  # pyright: ignore [reportArgumentType]
        )
        if local_ddy_path:
            ddy = IDF(
                local_ddy_path.as_posix(),
                output_directory=tmpdir,
                as_version="9.5.0",
                file_version="9.5.0",
                prep_outputs=False,
            )
            ddy_spec = DDYSizingSpec(
                design_days=["Ann Clg .4% Condns DB=>MWB", "Ann Htg 99.6% Condns DB"],
            )
            ddy_spec.inject_ddy(idf, ddy)
        idf.simulate()
        sql = Sql(idf.sql_file)
        dfs = postprocess(
            sql,
            index_data=sim_spec.model_dump(mode="json", exclude_none=True),
            tabular_lookups=[("AnnualBuildingUtilityPerformanceSummary", "End Uses")],
        )
    dfs = serialize_df_dict(dfs)
    return dfs


sim_args = [spec.model_dump(mode="json") for spec in sim_specs] * 8
res = client.map(
    simulate,
    sim_args,
)
results = client.gather(res, errors="skip")
erred = [
    (spec, r) for spec, r in zip(sim_args, res, strict=True) if r.status != "finished"
]

ValueError: zip() argument 2 is longer than argument 1

In [38]:
client.submit(arun)

In [10]:
cluster.shutdown()

[2024-09-22 17:50:27,612][INFO    ][coiled] Cluster 597308 deleted successfully.


In [5]:
import pandas as pd

pd.concat([
    pd.DataFrame.from_dict(
        result["AnnualBuildingUtilityPerformanceSummary_End_Uses"], orient="tight"
    )
    for result in list(results)
])

TypeError: 'NoneType' object is not subscriptable