Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 54 additions & 30 deletions cloudendure/cloudendure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import datetime
import json
import os
from typing import Dict, List
from typing import Any, Dict, List

import boto3
import fire
Expand Down Expand Up @@ -116,14 +116,14 @@ def check(
print("Failed to fetch the project!")
raise CloudEndureHTTPException("Failed to fetch the CloudEndure Project!")

machine_status = 0
machine_status: int = 0
machines_response = self.api.api_call(f"projects/{project_id}/machines")

for _machine in _machines:
machine_exist = False
for machine in json.loads(machines_response.text).get("items", []):
source_props = machine.get("sourceProperties", {})
ref_name = source_props.get("name") or source_props.get(
source_props: Dict[str, Any] = machine.get("sourceProperties", {})
ref_name: str = source_props.get("name") or source_props.get(
"machineCloudId", "NONE"
)
if _machine == source_props.get(
Expand Down Expand Up @@ -306,7 +306,9 @@ def launch(self, project_name="", launch_type="test", dry_run=False):
f"Machine: ({source_props['name']}) - Not a machine we want to launch..."
)

def status(self, project_name="", launch_type="test", dry_run=False):
def status(
self, project_id: str = "", launch_type: str = "test", dry_run: bool = False
) -> bool:
"""Get the status of machines in the current wave."""
if not project_name:
project_name = self.project_name
Expand All @@ -324,10 +326,10 @@ def status(self, project_name="", launch_type="test", dry_run=False):
machine_status = 0
machines_response = self.api.api_call(f"projects/{project_id}/machines")
for _machine in _machines:
machine_exist = False
machine_exist: bool = False
for machine in json.loads(machines_response.text).get("items", []):
source_props = machine.get("sourceProperties", {})
ref_name = source_props.get("name") or source_props.get(
source_props: Dict[str, Any] = machine.get("sourceProperties", {})
ref_name: str = source_props.get("name") or source_props.get(
"machineCloudId", "NONE"
)
if ref_name == source_props.get("name", "NONE"):
Expand Down Expand Up @@ -407,7 +409,9 @@ def status(self, project_name="", launch_type="test", dry_run=False):
print("ERROR: some machines in the targeted pool are not ready")
return False

def execute(self, project_name="", launch_type="test", dry_run=False):
def execute(
self, project_name: str = "", launch_type: str = "test", dry_run: bool = False
) -> bool:
"""Start the migration project my checking and launching the migration wave."""
if not project_name:
project_name = self.project_name
Expand Down Expand Up @@ -444,7 +448,7 @@ def execute(self, project_name="", launch_type="test", dry_run=False):
ref_name = source_props.get("name") or source_props.get(
"machineCloudId", "NONE"
)
_machine_id = source_props.get("id", "")
_machine_id: str = source_props.get("id", "")
print(f"Machine name: {ref_name}, Machine ID: {_machine_id}")
machinelist[machine["id"]] = ref_name

Expand All @@ -460,7 +464,12 @@ def execute(self, project_name="", launch_type="test", dry_run=False):
print(str(e))
return False

def share_image(self, image_id, dest_accounts=None, image_name="CloudEndureImage"):
def share_image(
self,
image_id: str,
dest_accounts: List[Any] = None,
image_name: str = "CloudEndureImage",
) -> bool:
"""Share the generated AMIs to the provided destination accounts."""
print("Loading EC2 client for region: ", AWS_REGION)
_ec2_client = boto3.client("ec2", AWS_REGION)
Expand All @@ -472,33 +481,44 @@ def share_image(self, image_id, dest_accounts=None, image_name="CloudEndureImage
dest_accounts = DESTINATION_ACCOUNTS

for account in dest_accounts:
# Share the image with the destination account
image.modify_attribute(
ImageId=image.id,
Attribute="launchPermission",
OperationType="add",
LaunchPermission={"Add": [{"UserId": account}]},
)
try:
# Share the image with the destination account
image.modify_attribute(
ImageId=image.id,
Attribute="launchPermission",
OperationType="add",
LaunchPermission={"Add": [{"UserId": account}]},
)
except Exception as e:
print(e)
return False

# We have to now share the snapshots associated with the AMI so it can be copied
devices = image.block_device_mappings
for device in devices:
if "Ebs" in device:
snapshot_id = device["Ebs"]["SnapshotId"]
snapshot = _ec2_client.Snapshot(snapshot_id)
snapshot.modify_attribute(
Attribute="createVolumePermission",
CreateVolumePermission={"Add": [{"UserId": account}]},
OperationType="add",
)
try:
snapshot.modify_attribute(
Attribute="createVolumePermission",
CreateVolumePermission={"Add": [{"UserId": account}]},
OperationType="add",
)
except Exception as e:
print(e)
return False
return True

def create_ami(self, instance_ids=None, project_name=""):
def create_ami(
self, instance_ids: List[str] = None, project_name: str = ""
) -> bool:
"""Create an AMI from the specified instance."""
if not project_name:
project_name = self.project_name
project_id = self.project_id
project_name: str = self.project_name
project_id: str = self.project_id
else:
project_id = self.get_project_id(project_name=project_name)
project_id: str = self.get_project_id(project_name=project_name)

if not project_id:
return False
Expand All @@ -508,7 +528,9 @@ def create_ami(self, instance_ids=None, project_name=""):
_ec2_client = boto3.client("ec2", AWS_REGION)

# Create an AMI from the migrated instance
image_creation_time = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
image_creation_time: str = datetime.datetime.utcnow().strftime(
"%Y%m%d%H%M%S"
)
instances = _ec2_client.describe_instances(
Filters=[
{"Name": "tag:MigrationWave", "Values": ["0"]},
Expand All @@ -517,14 +539,16 @@ def create_ami(self, instance_ids=None, project_name=""):
)
for reservation in instances.get("Reservations", []):
for instance in reservation.get("Instances", []):
instance_id = instance.get("InstanceId", "")
instance_id: str = instance.get("InstanceId", "")
ec2_image = _ec2_client.create_image(
InstanceId=instance_id,
Name=f"{image_creation_time}",
Description=f"{project_name} - {project_id} - {image_creation_time}",
NoReboot=True,
)
_filters = [{"Name": "resource-id", "Values": [instance_id]}]
_filters: List[Any] = [
{"Name": "resource-id", "Values": [instance_id]}
]

# Tag the newly created AMI by getting the tags of the migrated instance to copy to the AMI.
ec2_tags = _ec2_client.describe_tags(Filters=_filters)
Expand Down
66 changes: 46 additions & 20 deletions cloudendure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import logging
import os
from pathlib import Path
from typing import Any, Dict

import yaml

logger = logging.getLogger(__name__)
LOG_LEVEL = os.environ.get("CLOUDENDURE_LOG_LEVEL", "INFO")
LOG_LEVEL: str = os.environ.get("CLOUDENDURE_LOG_LEVEL", "INFO")
logger.setLevel(getattr(logging, LOG_LEVEL))


Expand All @@ -17,7 +18,9 @@ class CloudEndureConfig:
def __init__(self, *args, **kwargs):
"""Initialize the Environment."""
logger.info("Initializing the CloudEndure Configuration")
_config_path = os.environ.get("CLOUDENDURE_CONFIG_PATH", "~/.cloudendure.yaml")
_config_path: str = os.environ.get(
"CLOUDENDURE_CONFIG_PATH", "~/.cloudendure.yaml"
)
if _config_path.startswith("~"):
self.config_path = os.path.expanduser(_config_path)

Expand All @@ -42,23 +45,23 @@ def __init__(self, *args, **kwargs):
)
self.update_config()

def __str__(self):
def __str__(self) -> str:
"""Define the string representation of the CloudEndure API object."""
return "<CloudEndureAPI>"

def read_yaml_config(self):
def read_yaml_config(self) -> Dict[str, Any]:
"""Read the CloudEndure YAML configuration file."""
logger.info("Loading the CloudEndure YAML configuration file")
with open(self.config_path, "r") as yaml_stream:
try:
config = yaml.safe_load(yaml_stream)
config: Dict[str, Any] = yaml.safe_load(yaml_stream)
except yaml.YAMLError as e:
logger.error("YAMLError during read_yaml_config: %s", str(e))
config = {}
# print(e)
return config

def write_yaml_config(self, config):
def write_yaml_config(self, config: Dict[str, Any]) -> bool:
"""Write to the CloudEndure YAML configuration file."""
logger.info("Writing to the CloudEndure YAML configuration file")
with open(self.config_path, "w") as yaml_file:
Expand All @@ -73,36 +76,59 @@ def write_yaml_config(self, config):
)
return False

def update_yaml_config(self, kwargs):
def update_yaml_config(self, kwargs: Dict[str, Any]) -> bool:
"""Update the YAML configuration file."""
logger.info("Writing updated configuration file")
_config = self.read_yaml_config()
_config: Dict[str, Any] = self.read_yaml_config()
_config.update(kwargs)
self.write_yaml_config(_config)
self.update_config()

def get_env_vars(self, prefix="cloudendure"):
try:
self.write_yaml_config(_config)
self.update_config()
except Exception as e:
logger.error(e)
return False
return True

def get_env_vars(self, prefix: str = "cloudendure") -> Dict[str, any]:
"""Get all environment variables starting with CLOUDENDURE_."""
prefix = prefix.strip("_")
prefix: str = prefix.strip("_")
logger.info("Loading all environment variables starting with (%s)", prefix)
env_vars = {
env_vars: Dict[str, Any] = {
x[0].lower().lstrip(prefix.lower()).strip("_"): x[1]
for x in os.environ.items()
if x[0].lower().startswith(prefix.lower())
}
return env_vars

def update_config(self):
self.yaml_config_contents = self.read_yaml_config()
"""Update the configuration."""
self.yaml_config_contents: Dict[str, Any] = self.read_yaml_config()
self.env_config = self.get_env_vars()
self.active_config = {**self.yaml_config_contents, **self.env_config}

def update_token(self, token):
self.update_yaml_config({"token": token})
def update_token(self, token: str) -> bool:
"""Update the CloudEndure token.

Returns:
bool: Whether or not the operation was successful.

"""
try:
self.update_yaml_config({"token": token})
except Exception as e:
logger.error(e)
return False
return True

def get_var(self, var: str) -> str:
"""Get the specified environment or config variable.

Returns:
str: The variable to be used for the provided configuration env var.

def get_var(self, var):
"""Get the specified environment or config variable."""
"""
logger.info("Looking up variable: (%s)", var)
env_var = os.environ.get(var.upper(), "")
env_var: str = os.environ.get(var.upper(), "")

if env_var:
logger.info("Found Environment Variable - (%s): (%s)", var, env_var)
Expand Down
6 changes: 3 additions & 3 deletions cloudendure/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ def __init__(self):
self._valid_properties: Dict[str, Any] = {}

@classmethod
def _is_builtin(cls, obj):
def _is_builtin(self, obj) -> bool:
"""Define the built-in property."""
return isinstance(obj, (int, float, str, list, dict, bool))

def as_dict(self):
def as_dict(self) -> Dict[str, Any]:
"""Return a dict representation of the model."""
result: Dict[Any, Any] = {}
for key in self._valid_properties:
Expand Down Expand Up @@ -49,7 +49,7 @@ def as_dict(self):
return result

@classmethod
def parse(cls, json: Dict[str, Any]):
def parse(self, json: Dict[str, Any]):
"""Parse a JSON object into a model instance."""
raise NotImplementedError

Expand Down
27 changes: 17 additions & 10 deletions lambda/copy_ami.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os

import boto3
from typing import Dict

logger = logging.getLogger(__name__)

Expand All @@ -24,7 +25,7 @@
SESSION_NAME = os.environ.get("CLOUDENDURE_SESSION_NAME", "CloudEndureMigration")


def assume_role(sts_role_arn="", session_name=SESSION_NAME):
def assume_role(sts_role_arn: str = "", session_name: str = SESSION_NAME):
sts = boto3.client("sts")

try:
Expand All @@ -47,7 +48,7 @@ def assume_role(sts_role_arn="", session_name=SESSION_NAME):
return credentials


def get_ec2(credentials, region=""):
def get_ec2(credentials: Dict[str, str], region: str = ""):
ec2 = None

# Copy image to failover regions
Expand All @@ -65,20 +66,24 @@ def get_ec2(credentials, region=""):
return ec2


def share_image(image_name="CloudEndureImage"):
def share_image(image_name: str = "CloudEndureImage") -> bool:
src_credentials = assume_role(SRC_ROLE_ARN)
src_ec2 = get_ec2(src_credentials, region=SRC_REGION)

# Access the image that needs to be copied
image = src_ec2.Image(IMAGE_ID)

# Share the image with the destination account
image.modify_attribute(
ImageId=image.id,
Attribute="launchPermission",
OperationType="add",
LaunchPermission={"Add": [{"UserId": DEST_ACCOUNT_ID}]},
)
try:
# Share the image with the destination account
image.modify_attribute(
ImageId=image.id,
Attribute="launchPermission",
OperationType="add",
LaunchPermission={"Add": [{"UserId": DEST_ACCOUNT_ID}]},
)
except Exception as e:
logger.error(e)
return False

# We have to now share the snapshots associated with the AMI so it can be copied
devices = image.block_device_mappings
Expand All @@ -104,3 +109,5 @@ def share_image(image_name="CloudEndureImage"):
)
except Exception as e:
logger.error(e)
return False
return True
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def run(self):
vers: str = about["__version__"]
os.system(f"git tag v{vers}")
os.system("git push --tags")

sys.exit()


Expand Down