In [1]:
import json
from unskript import nbparams
from unskript.secrets import ENV_MODE, ENV_MODE_AWS
from unskript.fwk.workflow import Task, Workflow

env = {"ENV_MODE":"ENV_MODE_AWS","TENANT_ID":"982dba5f-d9df-48ae-a5bf-ec1fc94d4882","ENVIRONMENT_ID":"1499f27c-6406-4fbd-bd1b-c6f92800018f","TENANT_URL":"https://tenant-staging.alpha.unskript.io","AWS_REGION":"us-west-2"}
secret_store_cfg = {"SECRET_STORE_TYPE":"SECRET_STORE_TYPE_AWS","AWS_SECRET_PREFIX":"staging","AWS_REGION":"us-west-2"}
paramDict = {}
paramDict.update(env)
paramDict.update(secret_store_cfg)
paramsJson = json.dumps(paramDict)
nbParamsObj = nbparams.NBParams(paramsJson)

w = Workflow(env, secret_store_cfg, None, global_vars=globals())

<img src="https://unskript.com/assets/favicon.png" alt="unSkript.com" width="100" height="100"/> 
<h1> unSkript Runbooks </h1>
<div class="alert alert-block alert-success">
    <b> This runbook demonstrates AWS Access Key Rotation for a user using unSkript legos.</b>
</div>

<br>

<center><h2>AWS Access Key Rotation</h2></center>

# Steps Overview
    1) Create a new Access Key for the user
    2) Update the status of the old Access Key to "Inactive"
    3) Check if there are 2 Access Keys one with status "Active" and another with "Inactive"
    4) Delete the Inactive Access Key

In this lego we will create a new Access Key for the user as given in the input parameters

In [29]:
##
# Copyright (c) 2021 unSkript, Inc
# All rights reserved.
##
from pydantic import BaseModel, Field
import pandas as pd
from typing import List,Dict
import pprint


from beartype import beartype
@beartype
def aws_create_access_key_printer(output):
    if output is None:
        return
    pprint.pprint(output)


@beartype
def aws_create_access_key(
    handle,
    aws_username: str
) -> Dict:
    """aws_check_ssl_certificate_expiry checks the expiry date of an ACM SSL certificate .

            :type handle: object
            :param handle: Object returned from Task Validate

            :type aws_username: str
            :param aws_username: Username of the IAM user to be looked up

            :rtype: Result Dictionary of result
    """
    iamClient = handle.client('iam')
    result = iamClient.create_access_key(UserName=aws_username)
    return result


task = Task(Workflow())
task.configure(credentialsJson='''{
    "credential_name": "DevRole",
    "credential_type": "CONNECTOR_TYPE_AWS",
    "credential_id": "0b438eba-0627-4f6d-b998-a4c604f20e3c"
}''')
task.configure(inputParamsJson='''{
    "aws_username": "\\"shloka\\""
    }''')
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.execute(aws_create_access_key, lego_printer=aws_create_access_key_printer, hdl=hdl, args=args)

In this lego we will update the status of the old Access Key to "Inactive". This step is required to delete the old access key as one user cannot have 2 Access Keys. Hence, we need the old AWS Access Key ID, status to set and AWS username as inputs.

In [30]:
##
# Copyright (c) 2021 unSkript, Inc
# All rights reserved.
##
from pydantic import BaseModel, Field, SecretStr
import pandas as pd
from typing import List
import pprint


from beartype import beartype
@beartype
def aws_update_access_key_printer(output):
    if output is None:
        return
    pprint.pprint("Access Key status successfully changed")
    pprint.pprint(output)

@beartype
def aws_update_access_key(
    handle,
    aws_username: str,
    aws_access_key_id: str,
    status: str
) -> Dict:
    """aws_check_ssl_certificate_expiry checks the expiry date of an ACM SSL certificate .

                    :type handle: object
                    :param handle: Object returned from Task Validate

                    :type aws_username: str
                    :param aws_username: Username of the IAM user to be looked up

                    :type aws_access_key_id: SecretStr
                    :param aws_access_key_id: Old Access Key ID of the user of which the status needs to be updated

                    :rtype: Result Dictionary of result
    """
    iamClient = handle.client('iam')
    result = iamClient.update_access_key(UserName=aws_username, AccessKeyId=aws_access_key_id, Status=status)
    return result


task = Task(Workflow())
task.configure(credentialsJson='''{
    "credential_name": "DevRole",
    "credential_type": "CONNECTOR_TYPE_AWS",
    "credential_id": "0b438eba-0627-4f6d-b998-a4c604f20e3c"
}''')
task.configure(inputParamsJson='''{
    "aws_access_key_id": "\\"AKIAROZRPJOPLDOHCBKW\\"",
    "aws_username": "\\"shloka\\"",
    "status": "\\"Inactive\\""
    }''')
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.execute(aws_update_access_key, lego_printer=aws_update_access_key_printer, hdl=hdl, args=args)

Here we will list all the Access Keys allotted to the given user. The new one should have the status as "Active" and the old one should have the status as "Inactive".

In [31]:
##
# Copyright (c) 2021 unSkript, Inc
# All rights reserved.
##
from pydantic import BaseModel, Field
from typing import Dict
import pprint


from beartype import beartype
@beartype
def aws_list_access_keys(
    handle,
    aws_username: str
) -> List:
    """aws_check_ssl_certificate_expiry checks the expiry date of an ACM SSL certificate .

                :type handle: object
                :param handle: Object returned from Task Validate

                :type aws_username: str
                :param aws_username: Username of the IAM user to be looked up

                :rtype: Result Dictionary of result
        """
    iamClient = handle.client('iam')
    paginator = iamClient.get_paginator('list_access_keys')
    for response in paginator.paginate(UserName= aws_username):
        for key,value in response.items():
            if key == "AccessKeyMetadata":
                return value


def aws_list_access_keys_printer(output):
    if output is None:
        return
    pprint.pprint(output)

task = Task(Workflow())
task.configure(credentialsJson='''{
    "credential_name": "DevRole",
    "credential_type": "CONNECTOR_TYPE_AWS",
    "credential_id": "0b438eba-0627-4f6d-b998-a4c604f20e3c"
}''')
task.configure(inputParamsJson='''{
    "aws_username": "\\"shloka\\""
    }''')
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.execute(aws_list_access_keys, lego_printer=aws_list_access_keys_printer, hdl=hdl, args=args)

In this lego we will delete the the old (Inactive) Access Key.

In [32]:
##
# Copyright (c) 2021 unSkript, Inc
# All rights reserved.
##
from pydantic import BaseModel, Field
import pandas as pd
from typing import List
import pprint


from beartype import beartype
@beartype
def aws_delete_access_key_printer(output):
    if output is None:
        return
    pprint.pprint("Access Key successfully deleted")
    pprint.pprint(output)


@beartype
def aws_delete_access_key(
    handle,
    aws_username: str,
    aws_access_key_id: str,
) -> Dict:
    """aws_check_ssl_certificate_expiry checks the expiry date of an ACM SSL certificate .

                    :type handle: object
                    :param handle: Object returned from Task Validate

                    :type aws_username: str
                    :param aws_username: Username of the IAM user to be looked up

                    :type aws_access_key_id: str
                    :param aws_access_key_id: Old Access Key ID of the user which needs to be deleted

                    :rtype: Result Status Dictionary of result
    """
    iamClient = handle.client('iam')
    result = iamClient.delete_access_key(UserName=aws_username, AccessKeyId=aws_access_key_id)
    return result


task = Task(Workflow())
task.configure(credentialsJson='''{
    "credential_name": "DevRole",
    "credential_type": "CONNECTOR_TYPE_AWS",
    "credential_id": "0b438eba-0627-4f6d-b998-a4c604f20e3c"
}''')
task.configure(inputParamsJson='''{
    "aws_access_key_id": "\\"AKIAROZRPJOPLDOHCBKW\\"",
    "aws_username": "\\"shloka\\""
    }''')
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.execute(aws_delete_access_key, lego_printer=aws_delete_access_key_printer, hdl=hdl, args=args)

We were able to create a new Access Key for a User by using the Create, Update, Delete and List legos.