### Load Dependencies

In [3]:
import requests
import json
from datetime import datetime, timedelta
import os
from unittest.mock import patch
import boto3

### Some constants

In [10]:
# Region name -> (latitude, longitude) of the point queried for that region
locations = {
    "us-west-2": (43.8041334, -120.5542012),
    "us-west-1": (38.8375215, -120.8958242),
}

# First and last day of the evaluation period (both at midnight)
start_date = datetime(year=2023, month=6, day=1)
end_date = datetime(year=2023, month=6, day=30)

### Load Carbon Data

In [16]:
# Function to query the API
def query_api(start, end, lat, lon, token):
    """Fetch past carbon-intensity records for one location and time range.

    Args:
        start: Beginning of the range; a datetime (sent in ISO 8601 form) or a
            pre-formatted string.
        end: End of the range; same accepted types as ``start``.
        lat: Latitude of the queried point.
        lon: Longitude of the queried point.
        token: Value sent in the ``auth-token`` request header.

    Returns:
        The list stored under ``"data"`` in the JSON response on HTTP 200,
        otherwise an empty list (the status code is printed).
    """
    url = "https://api-access.electricitymaps.com/free-tier/carbon-intensity/past-range"

    def _as_query_value(value):
        # datetimes are sent as ISO 8601 ("2023-06-01T00:00:00") rather than via
        # str(), which would put an unencoded space into the query string
        return value.isoformat() if hasattr(value, "isoformat") else value

    # Let requests build and percent-encode the query string
    params = {
        "lat": lat,
        "lon": lon,
        "start": _as_query_value(start),
        "end": _as_query_value(end),
    }
    headers = {"auth-token": token}
    response = requests.get(url, params=params, headers=headers, timeout=10)
    if response.status_code == 200:
        return response.json()["data"]

    print(f"Error querying API: {response.status_code}")
    return []

def run():
    """Download carbon data for every entry in ``locations`` and save it as JSON.

    For each location the range from 7 days before ``start_date`` up to
    ``end_date`` is fetched through ``query_api`` in windows of at most 10 days.
    Consecutive windows are contiguous (the next window starts exactly where
    the previous one ended) so that no day is skipped; an entry whose
    ``"datetime"`` was already collected is dropped, so a boundary record
    returned by two adjacent windows is stored only once.

    The result is written to ``./data/carbon/<location>_carbon_data.json``.
    Relies on the module-level ``locations``, ``start_date`` and ``end_date``.
    """
    # Read the token once instead of on every request
    token = os.environ.get("ELECTRICITY_MAPS_AUTH_TOKEN")

    # open(..., "w") does not create missing parent directories
    os.makedirs("./data/carbon", exist_ok=True)

    for location, (latitude, longitude) in locations.items():
        current_start = start_date - timedelta(days=7)  # Start 7 days before the start date
        combined_data = []
        seen_timestamps = set()

        while current_start < end_date:
            current_end = min(current_start + timedelta(days=10), end_date)
            data = query_api(current_start, current_end, latitude, longitude, token)

            for entry in data:
                timestamp = entry.get("datetime") if isinstance(entry, dict) else None
                if timestamp is not None:
                    if timestamp in seen_timestamps:
                        continue  # boundary record already returned by the previous window
                    seen_timestamps.add(timestamp)
                combined_data.append(entry)

            # Continue exactly where this window ended; adding an extra day here
            # would leave a 24-hour hole between consecutive windows
            current_start = current_end

        # Save the combined data to a file
        output_file = f"./data/carbon/{location}_carbon_data.json"
        with open(output_file, "w") as f:
            json.dump(combined_data, f)

        print(f"Data combined and saved to {output_file}")


if __name__ == "__main__":
    run()

Data combined and saved to ./data/carbon/us-west-2_carbon_data.json
Data combined and saved to ./data/carbon/us-west-1_carbon_data.json


### Create Carbon Collector Logs

In [18]:
import json
import math
import os
from datetime import datetime, timedelta
from collections import defaultdict

def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometres between two points.

    Both points are given as latitude/longitude in degrees; the distance is
    computed with the haversine formula on a sphere of radius 6371 km.
    """
    earth_radius_km = 6371.0  # Earth radius in kilometers

    half_lat_delta = math.radians(lat2 - lat1) / 2
    half_lon_delta = math.radians(lon2 - lon1) / 2
    phi_1 = math.radians(lat1)
    phi_2 = math.radians(lat2)

    haversine = math.sin(half_lat_delta)**2 + math.cos(phi_1) * math.cos(phi_2) * math.sin(half_lon_delta)**2
    central_angle = 2 * math.atan2(math.sqrt(haversine), math.sqrt(1 - haversine))
    return earth_radius_km * central_angle

def process_and_store_carbon_data_for_regions(input_file, output_folder, regions_info, current_region):
    """Write one per-day carbon summary file for ``current_region``.

    For every day from the module-level ``start_date`` to ``end_date``
    (inclusive) the entries of the 7 preceding calendar days are averaged,
    overall and per hour of the day, and stored together with the distances
    from ``current_region`` to every region in ``regions_info`` at
    ``<output_folder>/<YYYY-MM-DD>/data.json``. Days with no entries in their
    7-day window are skipped.

    Args:
        input_file: JSON file holding a list of entries, each with a
            ``"datetime"`` string (``%Y-%m-%dT%H:%M:%S.%fZ``) and a
            ``"carbonIntensity"`` value.
        output_folder: Directory that receives one sub-folder per day.
        regions_info: Mapping of region name to ``(latitude, longitude)``.
        current_region: Key in ``regions_info`` the distances are measured from.
    """
    os.makedirs(output_folder, exist_ok=True)  # Ensure output directory exists

    with open(input_file, 'r') as file:
        data = json.load(file)

    # Parse every timestamp once up front instead of re-parsing the whole
    # dataset (twice) for each day of the range
    timestamped_entries = [
        (datetime.strptime(entry['datetime'], '%Y-%m-%dT%H:%M:%S.%fZ'), entry)
        for entry in data
    ]

    # The distances between the regions do not depend on the day
    origin_lat, origin_lon = regions_info[current_region][0], regions_info[current_region][1]
    transmission_distances = {
        f"aws:{region_key}": calculate_distance(
            origin_lat, origin_lon, regions_info[region_key][0], regions_info[region_key][1]
        )
        for region_key in regions_info
    }

    # For each day between start_date and end_date
    for day in range((end_date - start_date).days + 1):
        current_date = start_date + timedelta(days=day)
        window_start = (current_date - timedelta(days=7)).date()
        window_end = current_date.date()

        # Get all the data for the previous 7 days (current day excluded)
        previous_7_days_data = [
            (timestamp, entry)
            for timestamp, entry in timestamped_entries
            if window_start <= timestamp.date() < window_end
        ]

        # Calculate the average carbon intensity for the previous 7 days
        if not previous_7_days_data:
            continue
        overall_sum = sum(entry['carbonIntensity'] for _, entry in previous_7_days_data)
        overall_avg = overall_sum / len(previous_7_days_data)

        # Calculate the average carbon intensity for each hour of the day
        hourly_values = defaultdict(list)
        for timestamp, entry in previous_7_days_data:
            hourly_values[timestamp.hour].append(entry['carbonIntensity'])

        hourly_avg = {hour: sum(values) / len(values) for hour, values in hourly_values.items()}

        # Assemble the result dictionary
        result_dict = {
            "averages": {
                "overall": {"carbon_intensity": overall_avg},
                **{str(hour): {"carbon_intensity": avg} for hour, avg in hourly_avg.items()}
            },
            "units": "gCO2eq/kWh",
            "transmission_distances": transmission_distances
        }

        # Store the result
        day_folder = os.path.join(output_folder, current_date.strftime('%Y-%m-%d'))
        os.makedirs(day_folder, exist_ok=True)
        with open(os.path.join(day_folder, 'data.json'), 'w') as outfile:
            json.dump(result_dict, outfile, indent=4)

# Build the per-day summaries for every region: read the downloaded data from
# ./data/carbon and write the results below ./data/collected_carbon/<region>
for region in locations:
    process_and_store_carbon_data_for_regions(f"./data/carbon/{region}_carbon_data.json", f"./data/collected_carbon/{region}", locations, region)

### Get Data from the Home Region

For each benchmark:
1. Deploy the benchmark (optimally without any constraints) to the home region (which should be one of the regions we eventually also want to take into consideration).
2. Run the benchmark for a while using the `invoke_serverless_function_uniform.sh` script, for example:
```bash
poetry run ./invoke_serverless_function_uniform.sh dna_visualization 0.0.1 200 3600
```
3. Use the following short script to sync the logs and store the resulting data locally.

In [1]:
# Workflow identifiers of the benchmarks processed by the cells below
benchmarks = [
    "dna_visualization-0.0.1",
]

#### Log Sync

In [5]:
from multi_x_serverless.syncers.log_syncer import LogSyncWorkflow
from multi_x_serverless.common.models.endpoints import Endpoints
from multi_x_serverless.common.constants import DEPLOYMENT_MANAGER_RESOURCE_TABLE, GLOBAL_TIME_ZONE

# Single sync window covering the last 24 hours up to now.
# This assumes that the run happened in the last 24 hours
time_intervals = [(datetime.now(GLOBAL_TIME_ZONE) - timedelta(days=1), datetime.now(GLOBAL_TIME_ZONE))]

# Starts out empty and is handed to every LogSyncWorkflow created below
region_clients = {}
endpoints = Endpoints()

for workflow_id in benchmarks:
    # Entry stored for this workflow in the deployment manager resource table;
    # passed to LogSyncWorkflow unchanged
    deployment_manager_config_str = endpoints.get_deployment_manager_client().get_value_from_table(
        DEPLOYMENT_MANAGER_RESOURCE_TABLE, workflow_id
    )

    # Local folder that receives the synced data of this workflow
    raw_data_dir = f"./data/home_region_run_data/{workflow_id}"
    os.makedirs(raw_data_dir, exist_ok=True)

    def mock_upload_data(self, data_for_upload: str):
        """Replacement for LogSyncWorkflow._upload_data: write the payload to <raw_data_dir>/data.json."""
        with open(f"{raw_data_dir}/data.json", "w") as f:
            f.write(data_for_upload)

    # While the patch is active the sync result ends up in the local file
    # instead of wherever the real _upload_data sends it
    # NOTE(review): presumably the real method uploads to the remote datastore; confirm
    with patch.object(LogSyncWorkflow, "_upload_data", mock_upload_data):
        log_sync_workflow = LogSyncWorkflow(
            workflow_id,
            region_clients,
            deployment_manager_config_str,
            time_intervals,
            endpoints.get_datastore_client(),
            {}
        )
        log_sync_workflow.sync_workflow()

#### Workflow Collect

In [8]:
from multi_x_serverless.data_collector.components.workflow.workflow_retriever import WorkflowRetriever
from multi_x_serverless.data_collector.components.data_collector import DataCollector

# The retriever is built with None as its only argument; only its
# _transform_workflow_summary method is called below
workflow_retriever = WorkflowRetriever(None)

for workflow_id in benchmarks:

    # Despite its name this is the path of the data.json FILE written by the log sync cell
    raw_data_dir = f"./data/home_region_run_data/{workflow_id}/data.json"

    # The raw content is passed on as a string, not as parsed JSON
    with open(raw_data_dir, "r") as f:
        data = f.read()

    workflow_summary = workflow_retriever._transform_workflow_summary(
        data,
    )

    # Store the summary where the deployment algorithm cell later reads it
    collected_data_dir = f"./data/collected_workflow/{workflow_id}/home_region"
    os.makedirs(collected_data_dir, exist_ok=True)

    with open(f"{collected_data_dir}/data.json", "w") as f:
        json.dump(workflow_summary, f, indent=4)

### Run the Deployment Algorithm

We need to run the deployment algorithm for the days between the start and end date. We will run it every day and store the results locally in files.
Additionally, we need to provide the data collected by a specific workflow as the input to the workflow loader.

**TODO before executing the following cell:** Each of the involved benchmarks needs data from the home region, so please complete the **Get Data from the Home Region** section first.

In [11]:
from multi_x_serverless.common.constants import GLOBAL_SYSTEM_REGION, DEPLOYMENT_OPTIMIZATION_MONITOR_RESOURCE_TABLE
from multi_x_serverless.routing.workflow_config import WorkflowConfig
from multi_x_serverless.routing.deployment_algorithms.stochastic_heuristic_deployment_algorithm import StochasticHeuristicDeploymentAlgorithm
from multi_x_serverless.routing.deployment_input.components.loaders.carbon_loader import CarbonLoader
from multi_x_serverless.routing.deployment_input.components.loaders.region_viability_loader import RegionViabilityLoader
from multi_x_serverless.routing.deployment_algorithms.deployment_algorithm import DeploymentAlgorithm
from multi_x_serverless.routing.deployment_input.components.loaders.workflow_loader import WorkflowLoader

def _build_constraint_configuration(hard_runtime=None):
    """Return a fresh configuration whose only adjustable limit is the hard runtime constraint."""
    return {
        "constraints": {
            "hard_resource_constraints": {"cost": None, "runtime": hard_runtime, "carbon": None},
            "soft_resource_constraints": {"cost": None, "runtime": None, "carbon": None},
            "priority_order": ["carbon", "cost", "runtime"],
        }
    }


# Define the constraint configurations (name -> constraints applied to the workflow config)
constraint_configurations = {
    "no_constraints": _build_constraint_configuration(),
    "five_percent_runtime_constraint": _build_constraint_configuration(
        hard_runtime={"type": "relative", "value": 105}
    ),
}

# Run the deployment algorithm once per benchmark, constraint configuration and day.
# The loaders and the result upload are patched so that all inputs come from,
# and all results go to, the local ./data folder.
dynamodb_client = boto3.client('dynamodb', region_name=GLOBAL_SYSTEM_REGION)
for benchmark in benchmarks:
    response = dynamodb_client.get_item(TableName=DEPLOYMENT_OPTIMIZATION_MONITOR_RESOURCE_TABLE, Key={"key": {"S": benchmark}})
    item = response.get("Item")
    if item is None:
        # get_item returns no "Item" element when the key does not exist; fail
        # with a message naming the benchmark instead of an opaque TypeError
        raise ValueError(
            f"No entry for benchmark '{benchmark}' in table {DEPLOYMENT_OPTIMIZATION_MONITOR_RESOURCE_TABLE}"
        )
    workflow_config_from_table = item["value"]["S"]

    # The stored value is JSON whose "workflow_config" field is itself a JSON string
    workflow_config_dict = json.loads(json.loads(workflow_config_from_table).get("workflow_config"))
    for configuration_name, configuration in constraint_configurations.items():
        workflow_config_dict["constraints"] = configuration["constraints"]
        workflow_config = WorkflowConfig(workflow_config_dict)

        for day in range((end_date - start_date).days + 1):
            current_date = start_date + timedelta(days=day)

            # The mocks below read benchmark, configuration_name and current_date
            # from the enclosing loops; they are only used inside this iteration

            def mock_carbon_loader_setup(self, available_regions: set[str]):
                """Load the locally collected carbon data of current_date for every region in locations."""
                real_available_regions = list(locations.keys())
                loaded_carbon_data = {}
                for region in real_available_regions:
                    carbon_data_path = f'./data/collected_carbon/{region}/{current_date.strftime("%Y-%m-%d")}/data.json'
                    with open(carbon_data_path, 'r') as file:
                        loaded_carbon_data[f'aws:{region}'] = json.load(file)
                self._carbon_data = loaded_carbon_data

            def mock_workflow_loader_setup(self, workflow_id: str):
                """Load the workflow summary collected from the home region run."""
                collected_data_dir = f"./data/collected_workflow/{workflow_id}/home_region/data.json"

                with open(collected_data_dir, "r") as f:
                    self._workflow_data = json.load(f)

            def mock_region_viability_setup(self):
                """Restrict the available regions to the ones listed in locations."""
                self._available_regions = [f'aws:{region}' for region in locations.keys()]

            def mock_upload_result(self, result: dict):
                """Store the result locally, one file per day, grouped by benchmark and configuration."""
                result_path = f'./data/deployment_results/{benchmark}/{configuration_name}/'
                os.makedirs(result_path, exist_ok=True)
                with open(os.path.join(result_path, f'{current_date.strftime("%Y-%m-%d")}.json'), 'w') as file:
                    json.dump(result, file, indent=4)

            with patch.object(CarbonLoader, 'setup', new=mock_carbon_loader_setup), patch.object(RegionViabilityLoader, 'setup', new=mock_region_viability_setup), patch.object(DeploymentAlgorithm, '_upload_result', new=mock_upload_result), patch.object(WorkflowLoader, 'setup', new=mock_workflow_loader_setup):
                algorithm = StochasticHeuristicDeploymentAlgorithm(workflow_config)
                # The argument is the list of strings "0" .. "23"
                # NOTE(review): presumably the hour keys of the carbon data; confirm against the algorithm
                algorithm.run([f"{i}" for i in range(24)])

### Run new experiments for the new deployments

Now that we have the new deployments, we can run the experiments for them. We don't need to run every experiment; we only need to cover every combination of the regions involved. To do so, we iterate over the potential deployments, find the region combinations that have not been collected yet, and run the experiments for them. The following script does this on a per-benchmark basis.

In [23]:

# Please select the benchmark you want to run the experiment for
benchmark = benchmarks[0]

deployment_result_path = f'./data/deployment_results/{benchmark}'

# Each deployment is stored as its JSON string so that it can be put into a set
unique_instance_region_combinations = set()

# Iterate over all results, retrieving the combinations of instances and regions
for directory in os.listdir(deployment_result_path):
    # Only the per-configuration sub-folders hold result files; plain files at
    # this level (such as the output written below) are skipped
    if not os.path.isdir(os.path.join(deployment_result_path, directory)):
        continue
    for file in os.listdir(os.path.join(deployment_result_path, directory)):
        with open(os.path.join(deployment_result_path, directory, file), 'r') as f:
            data = json.load(f)
        for deployment in data["time_keys_to_staging_area_data"].values():
            unique_instance_region_combinations.add(json.dumps(deployment))

# Store the unique combinations. They are written in sorted order because the
# iteration order of a set of strings differs between interpreter runs, which
# would make the file (and the re-deploy order derived from it) non-reproducible
with open(f'./data/deployment_results/{benchmark}/unique_instance_region_combinations.json', 'w') as f:
    json.dump(sorted(unique_instance_region_combinations), f, indent=4)

#### Re-deploy all deployments

Before we can run the experiments, the code needs to be deployed to the right regions.


In [25]:
from multi_x_serverless.deployment.common.deploy.deployer import Deployer, create_default_deployer
from multi_x_serverless.common.constants import (
    DEPLOYMENT_MANAGER_RESOURCE_TABLE
)
from multi_x_serverless.deployment.common.factories.deployer_factory import DeployerFactory

# Re-deploy the selected workflow once for every unique deployment found by the previous cell
endpoints = Endpoints()

workflow_id = benchmarks[0]

# File with the unique deployments (a JSON list of JSON-encoded strings)
deployment_result_path = f'./data/deployment_results/{workflow_id}/unique_instance_region_combinations.json'

workflow_data_raw = endpoints.get_deployment_manager_client().get_value_from_table(
    DEPLOYMENT_MANAGER_RESOURCE_TABLE, workflow_id
)

workflow_data = json.loads(workflow_data_raw)

# The individual fields of the stored entry are JSON-encoded strings themselves
workflow_function_descriptions = json.loads(workflow_data["workflow_function_descriptions"])
deployment_config = json.loads(workflow_data["deployment_config"])
deployed_regions = json.loads(workflow_data["deployed_regions"])

# Build a deployer from the stored deployment config; no project directory is given
deployer_factory = DeployerFactory(project_dir=None)
config = deployer_factory.create_config_obj_from_dict(deployment_config=deployment_config)
deployer = create_default_deployer(config)

with open(deployment_result_path, 'r') as f:
    unique_instance_region_combinations = json.load(f)

for combination in unique_instance_region_combinations:
    # Every list element is a JSON string and has to be decoded first
    deployment = json.loads(combination)

    # NOTE(review): the same deployed_regions object is passed on every iteration;
    # confirm whether re_deploy mutates it
    deployer.re_deploy(
        workflow_function_descriptions,
        deployed_regions,
        deployment
    )

Found credentials in shared credentials file: ~/.aws/credentials


Found credentials in shared credentials file: ~/.aws/credentials
Found credentials in shared credentials file: ~/.aws/credentials
Found credentials in shared credentials file: ~/.aws/credentials
