Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 225 lines (176 sloc) 8.26 KB
#!/usr/bin/env python3
# Convenience wrapper around 'aws ssm start-session'
# can resolve instance id from Name tag, hostname, IP address, etc.
#
# See https://aws.nz/aws-utils/ssm-session for more info.
#
# Author: Michael Ludvig (https://aws.nz)
# The script can list available instances, resolve instance names,
# and host names, etc. In the end it executes 'aws' to actually
# start the session.
import os
import sys
import re
import logging
import argparse
import botocore.exceptions
import botocore.credentials
import botocore.session
import boto3
def configure_logging(level):
streamHandler = logging.StreamHandler()
formatter = logging.Formatter(
"[%(name)s] %(levelname)s: %(message)s"
)
streamHandler.setFormatter(formatter)
logger = logging.getLogger("ssm-session")
logger.setLevel(level)
logger.addHandler(streamHandler)
logger.debug("Logging level set to DEBUG")
return logger
def parse_args(argv):
"""
Parse command line arguments.
"""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
group_general = parser.add_argument_group('General Options')
group_general.add_argument('--profile', '-p', dest='profile', type=str, help='Configuration profile from ~/.aws/{credentials,config}')
group_general.add_argument('--region', '-r', dest='region', type=str, help='Set / override AWS region.')
group_general.add_argument('--verbose', '-v', action='store_const', dest='log_level', const=logging.INFO, default=logging.WARN, help='Increase log_level level')
group_general.add_argument('--debug', '-d', action='store_const', dest='log_level', const=logging.DEBUG, help='Increase log_level level')
group_general.add_argument('--help', '-h', action="help", help='Print this help and exit')
group_instance_wrapper = parser.add_argument_group('Instance Selection')
group_instance = group_instance_wrapper.add_mutually_exclusive_group(required=True)
group_instance.add_argument('INSTANCE', nargs='?', help='Instance ID, Name, Host name or IP address')
group_instance.add_argument('--list', '-l', dest='list', action="store_true", help='List instances available for SSM Session')
parser.description = 'Start SSM Shell Session to a given instance'
parser.epilog = f'''
IMPORTANT: instances must be registered in AWS Systems Manager (SSM)
before you can start a shell session! Instances not registered in SSM
will not be recognised by {parser.prog} nor show up in --list output.
Visit https://aws.nz/aws-utils/ssm-session for more info and usage examples.
Author: Michael Ludvig
'''
# Parse supplied arguments
args = parser.parse_args(argv)
return args
def update_env(env, var_names, var_value):
if var_names and var_value:
for var_name in var_names.split(','):
env[var_name] = var_value
def start_session(instance_id, profile=None, region=None):
extra_args = ""
if profile:
extra_args += f"--profile {profile} "
if region:
extra_args += f"--region {region} "
command = f'aws {extra_args} ssm start-session --target {instance_id}'
logger.info("Running: %s", command)
os.system(command)
class InstanceResolver():
def __init__(self, args):
# aws-cli compatible MFA cache
cli_cache = os.path.join(os.path.expanduser('~'),'.aws/cli/cache')
# Construct boto3 session with MFA cache
session = boto3.session.Session(profile_name=args.profile, region_name=args.region)
session._session.get_component('credential_provider').get_provider('assume-role').cache = botocore.credentials.JSONFileCache(cli_cache)
# Create boto3 clients from session
self.ssm_client = session.client('ssm')
self.ec2_client = session.client('ec2')
def get_list(self):
def _try_append(_list, _dict, _key):
if _key in _dict:
_list.append(_dict[_key])
items = {}
# List instances from SSM
logger.debug("Fetching SSM inventory")
inventory = self.ssm_client.get_inventory()
for entity in inventory["Entities"]:
try:
content = entity['Data']['AWS:InstanceInformation']["Content"][0]
# At the moment we only support EC2 Instances
assert content["ResourceType"] == "EC2Instance"
# Ignore Terminated instances
if content.get("InstanceStatus") == "Terminated":
logger.debug("Ignoring terminated instance: %s", entity)
continue
# Add to the list
instance_id = content['InstanceId']
items[instance_id] = {
"InstanceId": instance_id,
"HostName": content.get("ComputerName"),
}
logger.debug("Added instance: %s: %r", instance_id, items[instance_id])
except (KeyError, ValueError):
logger.debug("SSM inventory entity not recognised: %s", entity)
continue
# Add attributes from EC2
reservations = self.ec2_client.describe_instances(InstanceIds=list(items.keys()))
for reservation in reservations['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
if not instance_id in items:
continue
# Find instance IPs
items[instance_id]['Addresses'] = []
_try_append(items[instance_id]['Addresses'], instance, 'PrivateIpAddress')
_try_append(items[instance_id]['Addresses'], instance, 'PublicIpAddress')
# Find instance name from tag Name
items[instance_id]['InstanceName'] = None
for tag in instance['Tags']:
if tag['Key'] == 'Name':
items[instance_id]['InstanceName'] = tag['Value']
logger.debug("Updated instance: %s: %r", instance_id, items[instance_id])
return items
def print_list(self):
hostname_len = 0
instname_len = 0
items = self.get_list().values()
if not items:
logger.warning("No instances registered in SSM!")
return
items = list(items)
items.sort(key=lambda x: x.get('InstanceName') or x.get('HostName'))
for item in items:
hostname_len = max(hostname_len, len(item['HostName']))
instname_len = max(instname_len, len(item['InstanceName']))
for item in items:
print(f"{item['InstanceId']} {item['HostName']:{hostname_len}} {item['InstanceName']:{instname_len}} {' '.join(item['Addresses'])}")
def resolve_instance(self, instance):
# Is it a valid Instance ID?
if re.match('^i-[a-f0-9]+$', instance):
return instance
# It is not - find it in the list
instances = []
items = self.get_list()
for instance_id in items:
item = items[instance_id]
if instance.lower() in [item['HostName'].lower(), item['InstanceName'].lower()] + item['Addresses']:
instances.append(instance_id)
if not instances:
return None
if len(instances) > 1:
logger.warning("Found %d instances for '%s': %s", len(instances), instance, " ".join(instances))
logger.warning("Use INSTANCE_ID to connect to a specific one")
quit(1)
# Found only one instance - return it
return instances[0]
if __name__ == "__main__":
## Split command line to main args and optional command to run
args = parse_args(sys.argv[1:])
logger = configure_logging(args.log_level)
try:
instance = None
if args.list:
InstanceResolver(args).print_list()
quit(0)
instance = InstanceResolver(args).resolve_instance(args.INSTANCE)
if not instance:
logger.warning("Could not resolve Instance ID for '%s'", args.INSTANCE)
logger.warning("Perhaps the '%s' is not registered in SSM?", args.INSTANCE)
quit(1)
start_session(instance, profile=args.profile, region=args.region)
except (botocore.exceptions.BotoCoreError,
botocore.exceptions.ClientError) as e:
logger.error(e)
quit(1)
You can’t perform that action at this time.