In [123]:
import os
import subprocess
import time
from datetime import datetime

import boto3
import pandas as pd
from IPython.display import display, clear_output

# Cost Predictions on AWS

Reference: [boto3 Cost Explorer (`ce`) client documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ce.html)



## Get the Cost Data from AWS and save it as a CSV in S3

In [84]:
def get_cost_data_from_aws(start_date, end_date, client=None):
    """Fetch daily unblended costs, grouped by service, from AWS Cost Explorer.

    Args:
        start_date: Start of the reporting period as a 'YYYY-MM-DD' string.
        end_date: End of the reporting period as a 'YYYY-MM-DD' string.
            Cost Explorer documents the end date as exclusive. When falsy,
            today's date is used instead.
        client: Optional Cost Explorer client to reuse; a new
            ``boto3.client('ce')`` is created when omitted.

    Returns:
        The first ``get_cost_and_usage`` response dict, with its
        ``ResultsByTime`` list extended by the entries of every following
        page and the ``NextPageToken`` key removed.
    """
    if client is None:
        client = boto3.client('ce')

    request = {
        'TimePeriod': {
            'Start': start_date,
            # Honour the caller's end date instead of always using "now".
            'End': end_date or datetime.now().strftime('%Y-%m-%d'),
        },
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost'],
        'GroupBy': [{'Type': 'DIMENSION', 'Key': 'SERVICE'}],
    }

    results = client.get_cost_and_usage(**request)

    # Long periods come back in several pages; follow the token so that no
    # days are silently dropped.
    page = results
    while page.get('NextPageToken'):
        page = client.get_cost_and_usage(NextPageToken=page['NextPageToken'], **request)
        results['ResultsByTime'].extend(page['ResultsByTime'])
    results.pop('NextPageToken', None)

    return results

def get_cost_data_to_pandas(start_date, end_date):
    """Return the Cost Explorer results as a flat DataFrame.

    Calls ``get_cost_data_from_aws`` and emits one row per (time period,
    group) pair found in ``ResultsByTime``.

    Args:
        start_date: Passed through to ``get_cost_data_from_aws``.
        end_date: Passed through to ``get_cost_data_from_aws``.

    Returns:
        DataFrame with columns ``start_date`` (the period's ``Start``),
        ``costs`` (the ``UnblendedCost`` ``Amount`` exactly as returned, with
        no numeric conversion) and ``service`` (the group's first key).
    """
    response = get_cost_data_from_aws(start_date, end_date)

    rows = [
        (
            period['TimePeriod']['Start'],
            group['Metrics']['UnblendedCost']['Amount'],
            group['Keys'][0],
        )
        for period in response['ResultsByTime']
        for group in period['Groups']
    ]

    return pd.DataFrame(rows, columns=['start_date', 'costs', 'service'])

In [85]:
start_date = '2022-01-01'  ## This is the month that class started
# Today's date as YYYY-MM-DD; it is also used further down to name the CSV uploaded to S3.
end_date = datetime.now().strftime('%Y-%m-%d')

# Per-service cost history as a DataFrame with columns start_date, costs, service.
my_cost_data = get_cost_data_to_pandas(start_date, end_date)

In [86]:
def save_to_s3(df, end_date, client=None):
    '''
    Upload the DataFrame to S3 as a CSV.

    The frame is written to a temporary local file named
    ``cost_data_to_{end_date}.csv``, uploaded to the
    ``cost-management-robords`` bucket under ``by_service/`` and the local
    file is then removed, whether or not the upload succeeded.

    Args:
        df: DataFrame to upload; written without its index.
        end_date: String embedded in the CSV file name.
        client: Optional S3 client to reuse; a new ``boto3.client('s3')`` is
            created when omitted.

    Returns:
        Tuple ``(bucket, key, csv)``: the bucket name, the object key and the
        local file name that was used.
    '''
    csv = f'cost_data_to_{end_date}.csv'
    bucket = 'cost-management-robords'
    key = f'by_service/{csv}'

    if client is None:
        client = boto3.client('s3')

    try:
        # save the df as a csv
        df.to_csv(csv, index=False)

        # upload the file
        print(f'getting file from {csv}')
        with open(csv, "rb") as f:
            client.upload_fileobj(f, bucket, key)
    finally:
        # Remove the local copy even when the upload raised, and do it without
        # shelling out to `rm`, which is not available on every platform.
        if os.path.exists(csv):
            os.remove(csv)

    print('File uploaded to S3')

    return bucket, key, csv

In [87]:
# Upload the cost data and keep the bucket, object key and CSV file name it was stored under.
bucket, key, csv = save_to_s3(my_cost_data, end_date)

getting file from cost_data_to_2022-11-10.csv

File uploaded to S3


# Forecast

In [97]:
class cost_forecast:
    """Drive an Amazon Forecast pipeline for a cost CSV stored in S3.

    Constructing an instance runs the pipeline end to end: it finds or
    creates the dataset, the dataset import job, the dataset group and the
    auto predictor, waiting for the dataset, import job and predictor to
    become ACTIVE along the way, and finally finds or starts the forecast.
    Resources are looked up by name, so an existing resource with the same
    name is reused rather than recreated.

    The constructor does NOT wait for the forecast itself; call
    ``wait_until_active('forecast')`` before querying or exporting it.

    Attributes set by the constructor:
        s3_path, role, role_arn, client, dataset_name, dataset_arn,
        import_job_name, dataset_import_job_arn, dataset_group_name,
        dataset_group_arn, forecast_name, forecast_horizon,
        forecast_frequency, predictor_name, predictor_arn, forecast
        (same value as forecast_name), forecast_arn.
    """

    # A status ending in one of these will never turn ACTIVE by itself.
    _TERMINAL_STATUS_SUFFIXES = ('FAILED', 'STOPPED')

    def __init__(self, s3_path, dataset_name = 'Cost_Dataset',
                import_job_name = "Cost_Prediction_Import_Job", dataset_group_name = "Cost_Forecast",
                forecast_horizon = 30, forecast_frequency = "1D", predictor_name = "Cost_Predictor",
                forecast_name = "Cost_Forecast"):
        """Run the pipeline for the CSV at ``s3_path``.

        Args:
            s3_path: S3 URI of the CSV handed to the dataset import job.
            dataset_name: Name of the dataset to reuse or create.
            import_job_name: Name of the import job to reuse or create.
            dataset_group_name: Name of the dataset group to reuse or create.
            forecast_horizon: ``ForecastHorizon`` passed to the auto predictor.
            forecast_frequency: ``ForecastFrequency`` passed to the auto predictor.
            predictor_name: Name of the predictor to reuse or create.
            forecast_name: Name of the forecast to reuse or create.

        Raises:
            RuntimeError: If a resource that is being waited on reaches a
                failed or stopped status.
        """
        self.s3_path = s3_path
        self.role = boto3.client('iam').get_role(RoleName='ForecastNotebookRole')
        self.role_arn = self.role['Role']['Arn']

        self.client = boto3.client('forecast')

        self.dataset_name = dataset_name
        self.dataset_arn = self.create_dataset()
        self.wait_until_active('dataset')

        self.import_job_name = import_job_name
        self.dataset_import_job_arn = self.import_dataset()

        # The dataset group is created while the import job is still running.
        self.dataset_group_name = dataset_group_name
        self.dataset_group_arn = self.dataset_group()

        self.wait_until_active('import')

        self.forecast_name = forecast_name
        self.forecast_horizon = forecast_horizon
        self.forecast_frequency = forecast_frequency
        self.predictor_name = predictor_name
        self.predictor_arn = self.train_predictor()
        self.wait_until_active('predictor')

        # Kept for backward compatibility; duplicates forecast_name.
        self.forecast = forecast_name
        self.forecast_arn = self.create_forecast()

    def _list_all(self, operation, result_key, **kwargs):
        """Call a paginated ``list_*`` client operation and return every item.

        Follows ``NextToken`` until the service stops returning one, so
        lookups by name are not limited to the first page.
        """
        list_page = getattr(self.client, operation)
        items = []
        while True:
            page = list_page(**kwargs)
            items.extend(page.get(result_key, []))
            token = page.get('NextToken')
            if not token:
                return items
            kwargs['NextToken'] = token

    @staticmethod
    def _find_arn(items, name_key, name, arn_key):
        """Return ``arn_key`` of the first item whose ``name_key`` equals ``name``, else None."""
        for item in items:
            if item[name_key] == name:
                return item[arn_key]
        return None

    def wait_until_active(self, describe_type, poll_seconds=30):
        """Poll ``check_status`` until the resource is ACTIVE.

        Args:
            describe_type: One of 'dataset', 'import', 'predictor', 'forecast'.
            poll_seconds: Pause between two status requests.

        Returns:
            The string 'ACTIVE'.

        Raises:
            RuntimeError: If the status ends in FAILED or STOPPED, because
                waiting any longer would never succeed.
        """
        while True:
            status = self.check_status(describe_type)
            if status == 'ACTIVE':
                return status
            if status.endswith(self._TERMINAL_STATUS_SUFFIXES):
                raise RuntimeError(f'{describe_type} ended with status {status}')
            # Sleep between polls instead of hammering the API, then wipe the
            # previous status lines so the cell shows only the latest state.
            time.sleep(poll_seconds)
            clear_output(wait=True)

    def create_dataset(self):
        """Return the ARN of the dataset named ``self.dataset_name``, creating it if needed.

        A new dataset is a CUSTOM-domain TARGET_TIME_SERIES with daily data
        frequency and the attributes timestamp, target_value and item_id.
        """
        existing_arn = self._find_arn(self._list_all('list_datasets', 'Datasets'),
                                      'DatasetName', self.dataset_name, 'DatasetArn')
        if existing_arn is not None:
            print('Dataset Already Exists')
            return existing_arn

        print('Creating New Dataset')
        schema = {
           "Attributes":[
              {
                 "AttributeName":"timestamp",
                 "AttributeType":"timestamp"
              },
              {
                 "AttributeName":"target_value",
                 "AttributeType":"float"
              },
              {
                 "AttributeName":"item_id",
                 "AttributeType":"string"
              }
           ]
        }

        create_dataset_response = self.client.create_dataset(
            DatasetName=self.dataset_name,
            Domain='CUSTOM',
            DatasetType='TARGET_TIME_SERIES',
            DataFrequency='1D',
            Schema=schema
        )
        print('Dataset Create Initiated')
        return create_dataset_response['DatasetArn']

    def import_dataset(self):
        """Return the ARN of the import job named ``self.import_job_name``, creating it if needed.

        Only import jobs of ``self.dataset_arn`` are considered. An existing
        job with that name is reused as is, so ``self.s3_path`` is only read
        when a new job has to be created.
        """
        TIMESTAMP_FORMAT = "yyyy-MM-dd"
        TIMEZONE = "UTC"

        import_jobs = self._list_all(
            'list_dataset_import_jobs', 'DatasetImportJobs',
            Filters=[
                {
                    'Key': 'DatasetArn',
                    'Value': self.dataset_arn,
                    'Condition': 'IS'
                },
            ]
        )

        existing_arn = self._find_arn(import_jobs, 'DatasetImportJobName',
                                      self.import_job_name, 'DatasetImportJobArn')
        if existing_arn is not None:
            print('Already Exists')
            return existing_arn

        response = self.client.create_dataset_import_job(
            DatasetImportJobName=self.import_job_name,
            DatasetArn=self.dataset_arn,
            DataSource={
                "S3Config": {
                    "Path": self.s3_path,
                    "RoleArn": self.role_arn
                }
            },
            TimestampFormat=TIMESTAMP_FORMAT,
            TimeZone=TIMEZONE
        )

        ts_dataset_import_job_arn = response['DatasetImportJobArn']
        print(f"Waiting for Dataset Import Job with ARN {ts_dataset_import_job_arn} to become ACTIVE. This process could take 5-10 minutes.")
        return ts_dataset_import_job_arn

    def dataset_group(self):
        """Return the ARN of the group named ``self.dataset_group_name``, creating it if needed.

        A new group is a CUSTOM-domain group that contains only ``self.dataset_arn``.
        """
        existing_arn = self._find_arn(self._list_all('list_dataset_groups', 'DatasetGroups'),
                                      'DatasetGroupName', self.dataset_group_name, 'DatasetGroupArn')
        if existing_arn is not None:
            print('Already Exists')
            return existing_arn

        create_dataset_group_response = self.client.create_dataset_group(
            Domain="CUSTOM",
            DatasetGroupName=self.dataset_group_name,
            DatasetArns=[self.dataset_arn]
        )

        dataset_group_arn = create_dataset_group_response['DatasetGroupArn']
        describe_dataset_group_response = self.client.describe_dataset_group(DatasetGroupArn=dataset_group_arn)

        print(f"The DatasetGroup with ARN {dataset_group_arn} is now {describe_dataset_group_response['Status']}.")

        return dataset_group_arn

    def train_predictor(self):
        """Return the ARN of the predictor named ``self.predictor_name``, creating it if needed.

        A new predictor is an auto predictor over ``self.dataset_group_arn``
        using ``self.forecast_horizon`` and ``self.forecast_frequency``.
        """
        existing_arn = self._find_arn(self._list_all('list_predictors', 'Predictors'),
                                      'PredictorName', self.predictor_name, 'PredictorArn')
        if existing_arn is not None:
            print('Already Exists')
            return existing_arn

        create_auto_predictor_response = self.client.create_auto_predictor(
            PredictorName=self.predictor_name,
            ForecastHorizon=self.forecast_horizon,
            ForecastFrequency=self.forecast_frequency,
            DataConfig={
                'DatasetGroupArn': self.dataset_group_arn
            }
        )

        predictor_arn = create_auto_predictor_response['PredictorArn']
        print(f"Waiting for Predictor with ARN {predictor_arn} to become ACTIVE. Depending on data size and predictor setting，it can take several hours to be ACTIVE")
        return predictor_arn

    def create_forecast(self):
        """Return the ARN of the forecast named ``self.forecast_name``, creating it if needed.

        Creation is only started here; this method does not wait for the
        forecast to become ACTIVE.
        """
        existing_arn = self._find_arn(self._list_all('list_forecasts', 'Forecasts'),
                                      'ForecastName', self.forecast_name, 'ForecastArn')
        if existing_arn is not None:
            print('Already Exists')
            return existing_arn

        create_forecast_response = self.client.create_forecast(
            ForecastName=self.forecast_name,
            PredictorArn=self.predictor_arn
        )

        print("Waiting for Forecast to become ACTIVE. Depending on data size and predictor settings，it can take several hours to be ACTIVE.")
        return create_forecast_response['ForecastArn']

    def check_status(self, describe_type):
        """Describe one pipeline resource, print its state and return its status.

        Args:
            describe_type: 'dataset', 'import', 'predictor' or 'forecast'.

        Returns:
            The ``Status`` string from the matching describe call.

        Raises:
            ValueError: If ``describe_type`` is not one of the four known
                values (previously this silently returned None).
        """
        if describe_type == 'dataset':
            describe_dataset_response = self.client.describe_dataset(DatasetArn=self.dataset_arn)
            print(f"The Dataset is now {describe_dataset_response['Status']}.")
            return describe_dataset_response['Status']

        if describe_type == 'import':
            response = self.client.describe_dataset_import_job(DatasetImportJobArn=self.dataset_import_job_arn)
            name_key = 'DatasetImportJobName'
        elif describe_type == 'predictor':
            response = self.client.describe_auto_predictor(PredictorArn=self.predictor_arn)
            name_key = 'PredictorName'
        elif describe_type == 'forecast':
            response = self.client.describe_forecast(ForecastArn=self.forecast_arn)
            name_key = 'ForecastName'
        else:
            raise ValueError(
                f"Unknown describe_type {describe_type!r}; "
                "expected 'dataset', 'import', 'predictor' or 'forecast'"
            )

        print(response[name_key])
        # The estimate is not present in every response, so check for it
        # explicitly rather than hiding all errors behind a bare except.
        if 'EstimatedTimeRemainingInMinutes' in response:
            print(f'Time Remaining in Minutes (estimated): {response["EstimatedTimeRemainingInMinutes"]}')
        print(f'Status: {response["Status"]}')
        return response['Status']



In [98]:
# S3 URI of the uploaded cost CSV; passed to cost_forecast below as the import source.
s3_path = 's3://'+bucket+'/'+key

In [99]:
# Run the whole pipeline under the "v3" resource names. The constructor polls until the
# dataset, the import job and the predictor each report ACTIVE, then starts the forecast.
cost_forecast_output = cost_forecast(s3_path = s3_path, dataset_name = 'Cost_Datasetv3',
                import_job_name = "Cost_Prediction_Import_Jobv3", dataset_group_name = "Cost_Forecastv3",
                forecast_horizon = 30, forecast_frequency = "1D", predictor_name = "Cost_Predictorv3",
                forecast_name = "Cost_Forecastv3")

Dataset Already Exists
The Dataset is now ACTIVE.
Already Exists
Already Exists
Cost_Prediction_Import_Jobv3
Status: ACTIVE
Already Exists
Cost_Predictorv3
Status: ACTIVE
Waiting for Forecast to become ACTIVE. Depending on data size and predictor settings，it can take several hours to be ACTIVE.


## Get Results

In [100]:
# Client for the Forecast query API; used by the query_forecast call later in this notebook.
forecastquery = boto3.client(service_name='forecastquery')

In [110]:
# Look up the IAM role by name and keep its ARN for the export job's S3Config below.
role = boto3.client('iam').get_role(RoleName='ForecastNotebookRole')
role_arn = role['Role']['Arn']

In [111]:
client = boto3.client('forecast')

# Export the forecast as CSV to the forecast_results/ prefix of the cost bucket.
# NOTE(review): the constructor above does not wait for the forecast to become ACTIVE;
# presumably this cell has to be run after that has happened - confirm.
response = client.create_forecast_export_job(
    ForecastExportJobName='CostForecastNovResults',
    ForecastArn=cost_forecast_output.forecast_arn,
    Destination={
        'S3Config': {
            'Path': 's3://cost-management-robords/forecast_results/',
            'RoleArn': role_arn
        }
    },
    Format='CSV'
)

In [113]:
# Display the raw create_forecast_export_job response.
response

{'ForecastExportJobArn': 'arn:aws:forecast:us-east-1:669437599565:forecast-export-job/Cost_Forecastv3/CostForecastNovResults',
 'ResponseMetadata': {'RequestId': '6893f7f9-85e7-48a0-a44b-be7013a976b7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 11 Nov 2022 22:24:13 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '125',
   'connection': 'keep-alive',
   'x-amzn-requestid': '6893f7f9-85e7-48a0-a44b-be7013a976b7'},
  'RetryAttempts': 0}}

In [128]:
def list_files_in_s3(bucket, path, s3_client=None):
    '''
    List the objects under a prefix and pick the most recently modified one.

    The client API (``list_objects_v2``) is used rather than
    ``boto3.resource('s3').Bucket(bucket).objects.all()`` because, per the
    original author's note, listing through the resource API does not let us
    check that the file has content.

    Args:
        bucket: Name of the S3 bucket.
        path: Key prefix to list.
        s3_client: Optional S3 client to reuse; a new ``boto3.client('s3')``
            is created when omitted.

    Returns:
        Tuple ``(response, most_recent_file_key)``. ``response`` is the first
        ``list_objects_v2`` response with ``Contents`` holding the objects of
        all pages; ``most_recent_file_key`` is the key with the greatest
        ``LastModified`` (the first such key on a tie).

    Raises:
        FileNotFoundError: If no object exists under the prefix.
    '''
    s3 = s3_client if s3_client is not None else boto3.client('s3')

    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=path
    )
    # 'Contents' is missing from the response when nothing matches the prefix.
    contents = list(response.get('Contents', []))

    # A single call returns a limited number of keys; keep going while the
    # listing is truncated so the newest object cannot be missed.
    page = response
    while page.get('IsTruncated'):
        page = s3.list_objects_v2(
            Bucket=bucket,
            Prefix=path,
            ContinuationToken=page['NextContinuationToken']
        )
        contents.extend(page.get('Contents', []))

    if not contents:
        raise FileNotFoundError(f'No objects found in s3://{bucket}/{path}')

    # Make the returned response describe the merged listing.
    response['Contents'] = contents
    response['KeyCount'] = len(contents)
    response['IsTruncated'] = False
    response.pop('NextContinuationToken', None)

    most_recent_file = max(contents, key=lambda obj: obj['LastModified'])

    return response, most_recent_file['Key']

In [133]:
def get_file_from_s3(bucket, path):
    """Load the most recently modified CSV under an S3 prefix into a DataFrame.

    The key is chosen by ``list_files_in_s3`` and printed before the object
    is downloaded.

    Args:
        bucket: Name of the S3 bucket.
        path: Key prefix to search.

    Returns:
        DataFrame parsed by ``pd.read_csv`` from the object's body.
    """
    latest_key = list_files_in_s3(bucket, path)[1]
    print(latest_key)

    s3_object = boto3.client('s3').get_object(Bucket=bucket, Key=latest_key)
    return pd.read_csv(s3_object.get('Body'))

In [144]:
# Note: this rebinds `bucket`, which an earlier cell set from save_to_s3's return value.
bucket = 'cost-management-robords'
path = 'by_service'
# Distinct service names found in the newest cost CSV under by_service/.
services = get_file_from_s3(bucket, path)['service']
services = services.unique()
service_list = list(services)
# One '/costs/<service>' string per service; the names are inserted unchanged.
route_list = [f'/costs/{i}' for i in service_list]

pd.DataFrame(list(zip(service_list, route_list)), columns=['Services','Routes'])

by_service/cost_data_to_2022-11-10.csv


Unnamed: 0,Services,Routes
0,AWS CloudTrail,/costs/AWS CloudTrail
1,AWS Glue,/costs/AWS Glue
2,Amazon Elastic File System,/costs/Amazon Elastic File System
3,Amazon GuardDuty,/costs/Amazon GuardDuty
4,Amazon Simple Storage Service,/costs/Amazon Simple Storage Service
5,AmazonCloudWatch,/costs/AmazonCloudWatch
6,AWS Key Management Service,/costs/AWS Key Management Service
7,Amazon Glacier,/costs/Amazon Glacier
8,Amazon Simple Notification Service,/costs/Amazon Simple Notification Service
9,Amazon Simple Queue Service,/costs/Amazon Simple Queue Service


In [146]:
# Load and display the newest file under forecast_results/, the prefix the export job writes to.
bucket = 'cost-management-robords'
path = 'forecast_results'
get_file_from_s3(bucket, path)

forecast_results/CostForecastNovResults_2022-11-11T22-38-05Z_part0.csv


Unnamed: 0,item_id,date,p10,p50,p90
0,amazon simple notification service,2022-11-10T00:00:00Z,0.0,0.0,0.0
1,amazon simple notification service,2022-11-11T00:00:00Z,0.0,0.0,0.0
2,amazon simple notification service,2022-11-12T00:00:00Z,0.0,0.0,0.0
3,amazon simple notification service,2022-11-13T00:00:00Z,0.0,0.0,0.0
4,amazon simple notification service,2022-11-14T00:00:00Z,0.0,0.0,0.0
...,...,...,...,...,...
175,amazon simple queue service,2022-12-05T00:00:00Z,0.0,0.0,0.0
176,amazon simple queue service,2022-12-06T00:00:00Z,0.0,0.0,0.0
177,amazon simple queue service,2022-12-07T00:00:00Z,0.0,0.0,0.0
178,amazon simple queue service,2022-12-08T00:00:00Z,0.0,0.0,0.0


In [103]:
# Query the forecast for a single item_id and display the raw response.
forecastquery.query_forecast(
    ForecastArn=cost_forecast_output.forecast_arn,
    Filters={"item_id": 'Amazon Simple Storage Service'}
)

{'Forecast': {'Predictions': {'p10': [{'Timestamp': '2022-11-10T00:00:00',
     'Value': 0.008939661126829215},
    {'Timestamp': '2022-11-11T00:00:00', 'Value': 0.009371761048774931},
    {'Timestamp': '2022-11-12T00:00:00', 'Value': 0.011102962392173807},
    {'Timestamp': '2022-11-13T00:00:00', 'Value': 0.008949113410965173},
    {'Timestamp': '2022-11-14T00:00:00', 'Value': 0.007664827647221533},
    {'Timestamp': '2022-11-15T00:00:00', 'Value': 0.007734930256903579},
    {'Timestamp': '2022-11-16T00:00:00', 'Value': 0.007325832305530203},
    {'Timestamp': '2022-11-17T00:00:00', 'Value': 0.007044587888077485},
    {'Timestamp': '2022-11-18T00:00:00', 'Value': 0.0074160634941466096},
    {'Timestamp': '2022-11-19T00:00:00', 'Value': 0.008810191058688056},
    {'Timestamp': '2022-11-20T00:00:00', 'Value': 0.007112682785308439},
    {'Timestamp': '2022-11-21T00:00:00', 'Value': 0.006096280855539102},
    {'Timestamp': '2022-11-22T00:00:00', 'Value': 0.006151641801450916},
    {'Times