# Continuous Ingestor Tool - Test Runbook

## Contents
- [Prerequisites](#Prerequisites)
- [Ensure database](#Ensure-database)
- [Basic Testing](#Basic-Testing)
  - [TEST 0: Regression Parameter](#TEST-0:-Regression-Parameter)
  - [TEST 1: Default Parameter](#TEST-1:-Default-Parameter)
  - [TEST 2: Single Threaded Ingestion](#TEST-2:-Single-Threaded-Ingestion)
  - [TEST 3: Records with High Concurrency](#TEST-3:-Records-with-High-Concurrency)
  - [TEST 4: High Concurrency and Scale Factor](#TEST-4:-High-Concurrency-and-Scale-Factor)
- [DryRun and Include Parameter Testing](#DryRun-and-Include-Parameter-Testing)
  - [TEST 5: DryRun](#TEST-5:-DryRun)
  - [TEST 6: Include Only EU Central](#TEST-6:-Include-Only-EU-Central)
  - [TEST 7: Include Only Athena Microservice](#TEST-7:-Include-Only-Athena-Microservice)
- [Missing Values Testing](#Missing-Values-Testing)
  - [TEST 8: Missing Values](#TEST-8:-Missing-Values)
- [Signal Values Testing](#Signal-Values-Testing)
  - [TEST 9: Sinus Signal Values SNR100](#TEST-9:-Sinus-Signal-Values-SNR100)
  - [TEST 10: Sinus Signal Values SNR20](#TEST-10:-Sinus-Signal-Values-SNR20)
  - [TEST 11: Sinus Signal Values SNR1](#TEST-11:-Sinus-Signal-Values-SNR1)
  - [TEST 12: Saw Signal Values SNR100](#TEST-12:-Saw-Signal-Values-SNR100)
  - [TEST 13: Saw Signal Values SNR10](#TEST-13:-Saw-Signal-Values-SNR10)
- [Visualization of Signals](#Visualization-of-Signals)
- [Cleanup: Drop Tables and Database](#Cleanup:-Drop-Tables-and-Database)

## Prerequisites
1. Start a SageMaker Studio or SageMaker Notebook instance as outlined in [Amazon Timestream - Amazon SageMaker](https://docs.aws.amazon.com/timestream/latest/developerguide/Sagemaker.html)

2. Clone the GitHub repository: [Amazon Timestream Tools and Samples](https://github.com/awslabs/amazon-timestream-tools.git)

3. Change to the folder ```./tools/continuous-ingestor/```

(Back to [top](#Contents))

In [None]:
import sys

import matplotlib.pyplot as plt
from IPython import display
import pandas as pd
import boto3
import botocore
from botocore.config import Config

sys.path.insert(0, '../../') # add project base folder to path
import integrations.sagemaker.timestreamquery as timestream

#################################################
##### Timestream Configurations.  ###############
#################################################

REGION = "eu-west-1" # <--- specify the region service endpoint
PROFILE = "default" # <--- specify the AWS credentials profile
DB_NAME = "DemoAndTest01" # <--- specify the database created in Amazon Timestream

session = boto3.Session()
read_client = session.client('timestream-query' , region_name = REGION, config = Config())
write_client = session.client('timestream-write', region_name = REGION, config = Config())

print("Read and Write Clients for Amazon Timestream created.")

## Ensure database
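The following cell creates the database with the previously defined database name if it does not exist yet, and defines the helper `recreateTable`, which each test uses to drop and recreate its own table.
Every table is created with a 24-hour retention period in the memory store and a 5-year retention period in the magnetic store. (Back to [top](#Contents))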

In [None]:
def recreateTable(tableName, databaseName):
    # Drop the table if it exists, so that every test starts from an empty table.
    try:
        write_client.delete_table(DatabaseName=databaseName, TableName=tableName)
        print("Table <{}> in database <{}> deleted.".format(tableName, databaseName))
    except write_client.exceptions.ResourceNotFoundException:
        pass  # table did not exist, nothing to delete
    write_client.create_table(DatabaseName=databaseName, TableName=tableName,
                              RetentionProperties={'MemoryStoreRetentionPeriodInHours': 24,
                                                   'MagneticStoreRetentionPeriodInDays': 5 * 365})
    print("Table <{}> in database <{}> (re)created.".format(tableName, databaseName))

# CREATE Database if not exists.
try:
    write_client.create_database(DatabaseName=DB_NAME)
    print("Database <{}> created.".format(DB_NAME))
except write_client.exceptions.ConflictException:
    # Only an already existing database is treated as success; other errors still surface.
    print("Database <{}> exists already.".format(DB_NAME))
    

## Basic Testing
In this section, we run the standard parametrized ingestions as described in README.md. (Back to [top](#Contents))

### TEST 0: Regression Parameter
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 0: Start continuous ingestor and generate exactly 1 record per time series with legacy endpoint parameter 
###################################################################################################

TABLE_NAME0 = "DevOpsTest00"
recreateTable(TABLE_NAME0, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME0" --endpoint "$REGION" --autostop 1

ENDPOINT_URL='https://ingest-cell1.timestream.' + REGION + '.amazonaws.com'
%run timestream_sample_continuous_data_ingestor_application.py \
  -d "$DB_NAME" -t "$TABLE_NAME0" -e "$REGION" -url "$ENDPOINT_URL" --autostop 1

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME0)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)


### TEST 1: Default Parameter
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 1: Start continuous ingestor and generate exactly 5 records per time series with default parameter 
###################################################################################################

TABLE_NAME1 = "DevOpsTest01"
recreateTable(TABLE_NAME1, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME1" --region "$REGION" --autostop 5

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME1)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)


In [None]:
# 20 server metric values for 1010 different machines/dimension sets   =>  20,200 unique metric time series
#  5 process event values from 1212 different processes/dimension sets =>   6,060 unique event time series

# AUTOSTOP after 5 values per time series => (20,200 + 6,060) * 5 = 131,300 values
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME1)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)
query = """SELECT measure_name, count(1) as cnt,  process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1 
              AND process_name is null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME1)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)
query = """SELECT measure_name, count(1) as cnt, process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1  
              AND process_name is not null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME1)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result3)


if result1['cnt'][0] != 131300:
    print('\x1B[91m' + 'TEST 1-1: FAILED: We expect exactly 131300 total values. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 1-1: SUCCESS: We expect exactly 131300 total values.')


if len(result2['cnt']) != 20200:
    print('\x1B[91m' + 'TEST 1-2: FAILED: We expect 20200 metric time series. Actual {}'.format(len(result2['cnt'])))
else:
    print('\x1b[92m' + 'TEST 1-2: SUCCESS: We expect 20200 metric time series.')
    

if len(result3['cnt']) != 6060:
    print('\x1B[91m' + 'TEST 1-3: FAILED: We expect 6060 event time series. Actual {}'.format(len(result3['cnt'])))
else:
    print('\x1b[92m' + 'TEST 1-3: SUCCESS: We expect 6060 event time series.')

print('\x1b[0m' + 'TEST 1: DONE') 


### TEST 2: Single Threaded Ingestion
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 2: Start continuous ingestor and generate exactly 1 record per time series with single-threaded parametrization
###################################################################################################
TABLE_NAME2 = "DevOpsTest02"
recreateTable(TABLE_NAME2, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  -c 1 -s 1 -d "$DB_NAME" -t "$TABLE_NAME2" -r "$REGION" --autostop 1


In [None]:
# 20 server metric values for 1010 different machines/dimension sets   =>  20,200 unique metric time series
#  5 process event values from 1212 different processes/dimension sets =>   6,060 unique event time series

# AUTOSTOP after 1 value per time series => (20,200 + 6,060) * 1 = 26,260 values
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME2)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)
query = """SELECT measure_name, count(1) as cnt,  process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1 
              AND process_name is null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME2)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)
query = """SELECT measure_name, count(1) as cnt, process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1  
              AND process_name is not null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME2)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result3)

if result1['cnt'][0] != 26260:
    print('\x1B[91m' + 'TEST 2-1: FAILED: We expect exactly 26260 total values. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 2-1: SUCCESS: We expect exactly 26260 total values.')


if len(result2['cnt']) != 20200:
    print('\x1B[91m' + 'TEST 2-2: FAILED: We expect 20200 metric time series. Actual {}'.format(len(result2['cnt'])))
else:
    print('\x1b[92m' + 'TEST 2-2: SUCCESS: We expect 20200 metric time series.')
    

if len(result3['cnt']) != 6060:
    print('\x1B[91m' + 'TEST 2-3: FAILED: We expect 6060 event time series. Actual {}'.format(len(result3['cnt'])))
else:
    print('\x1b[92m' + 'TEST 2-3: SUCCESS: We expect 6060 event time series.')

print('\x1b[0m' + 'TEST 2: DONE') 
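The pass/fail checks in these cells all repeat the same compare-and-print pattern. As a minimal sketch, the pattern could be expressed once in a small helper. The name `check` and its signature are hypothetical and not part of the repository; only the ANSI color codes are taken from the cells in this runbook.

```python
# Hypothetical helper (not part of the repository): one place for the
# colored SUCCESS/FAILED output used throughout this runbook.
RED, GREEN, RESET = '\x1b[91m', '\x1b[92m', '\x1b[0m'

def check(label, expectation, expected, actual):
    """Print a colored result line and return True if actual equals expected."""
    ok = (actual == expected)
    if ok:
        print('{}{}: SUCCESS: {}{}'.format(GREEN, label, expectation, RESET))
    else:
        print('{}{}: FAILED: {} Actual {}{}'.format(RED, label, expectation, actual, RESET))
    return ok

# Literal values stand in for the query results here.
check('TEST 2-1', 'We expect exactly 26260 total values.', 26260, 26260)
check('TEST 2-2', 'We expect 20200 metric time series.', 20200, 20199)
```

Returning a boolean would also allow a notebook to collect all results and summarize them at the end, instead of relying on a reader scanning the colored output.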


### TEST 3: Records with High Concurrency
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 3: Start continuous ingestor and generate exactly 5 records per time series with high concurrency parameter 
###################################################################################################

TABLE_NAME3 = "DevOpsTest03"
recreateTable(TABLE_NAME3, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME3" --region "$REGION" --concurrency 30 --autostop 5

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME3)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)


In [None]:
# 20 server metric values for 1010 different machines/dimension sets   =>  20,200 unique metric time series
#  5 process event values from 1212 different processes/dimension sets =>   6,060 unique event time series

# AUTOSTOP after 5 values per time series => (20,200 + 6,060) * 5 = 131,300 values
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME3)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)
query = """SELECT measure_name, count(1) as cnt,  process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1 
              AND process_name is null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME3)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)
query = """SELECT measure_name, count(1) as cnt, process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1  
              AND process_name is not null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME3)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result3)


if result1['cnt'][0] != 131300:
    print('\x1B[91m' + 'TEST 3-1: FAILED: We expect exactly 131300 total values. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 3-1: SUCCESS: We expect exactly 131300 total values.')


if len(result2['cnt']) != 20200:
    print('\x1B[91m' + 'TEST 3-2: FAILED: We expect 20200 metric time series. Actual {}'.format(len(result2['cnt'])))
else:
    print('\x1b[92m' + 'TEST 3-2: SUCCESS: We expect 20200 metric time series.')
    

if len(result3['cnt']) != 6060:
    print('\x1B[91m' + 'TEST 3-3: FAILED: We expect 6060 event time series. Actual {}'.format(len(result3['cnt'])))
else:
    print('\x1b[92m' + 'TEST 3-3: SUCCESS: We expect 6060 event time series.')

print('\x1b[0m' + 'TEST 3: DONE') 


### TEST 4: High Concurrency and Scale Factor
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 4: Start continuous ingestor and generate exactly 3 records per time series with scale factor and high concurrency parameter 
###################################################################################################

TABLE_NAME4 = "DevOpsTest04"
recreateTable(TABLE_NAME4, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME4" --region "$REGION" --concurrency 30 --host-scale 5 --autostop 3

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME4)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)


In [None]:
# 20 server metric values for 1010 * 5 scale factor different machines/dimension sets   =>  101,000 unique metric time series
#  5 process event values from 1212 * 5 scale factor different processes/dimension sets =>   30,300 unique event time series

# AUTOSTOP after 3 values per time series => (101,000 + 30,300) * 3 = 393,900 values
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME4)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)
query = """SELECT measure_name, count(1) as cnt,  process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1 
              AND process_name is null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME4)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)
query = """SELECT measure_name, count(1) as cnt, process_name, microservice_name, instance_name, silo, region, availability_zone
             FROM {}.{} 
            WHERE 1=1  
              AND process_name is not null
            GROUP BY measure_name, process_name, microservice_name, instance_name, silo, region, availability_zone
            ORDER BY cnt, region, availability_zone""".format(DB_NAME, TABLE_NAME4)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result3)


if result1['cnt'][0] != 393900:
    print('\x1B[91m' + 'TEST 4-1: FAILED: We expect exactly 393900 total values. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 4-1: SUCCESS: We expect exactly 393900 total values.')


if len(result2['cnt']) != 101000:
    print('\x1B[91m' + 'TEST 4-2: FAILED: We expect 101000 metric time series. Actual {}'.format(len(result2['cnt'])))
else:
    print('\x1b[92m' + 'TEST 4-2: SUCCESS: We expect 101000 metric time series.')
    

if len(result3['cnt']) != 30300:
    print('\x1B[91m' + 'TEST 4-3: FAILED: We expect 30300 event time series. Actual {}'.format(len(result3['cnt'])))
else:
    print('\x1b[92m' + 'TEST 4-3: SUCCESS: We expect 30300 event time series.')

print('\x1b[0m' + 'TEST 4: DONE') 
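The expected counts in TEST 1 to TEST 4 all follow from the same arithmetic stated in the cell comments: 20 metrics for each of 1010 hosts, 5 events for each of 1212 processes, multiplied by the host scale and by the autostop value. The sketch below only recomputes these numbers. The function and constant names are illustrative, and the assumption that `--host-scale` multiplies both hosts and processes is taken from the TEST 4 comments.

```python
# Base figures as stated in the comments of the test cells.
METRICS_PER_HOST = 20
EVENTS_PER_PROCESS = 5
HOSTS = 1010
PROCESSES = 1212

def expected_counts(autostop, host_scale=1):
    """Return (metric series, event series, total records) for one ingestor run."""
    metric_series = METRICS_PER_HOST * HOSTS * host_scale
    event_series = EVENTS_PER_PROCESS * PROCESSES * host_scale
    total_records = (metric_series + event_series) * autostop
    return metric_series, event_series, total_records

print(expected_counts(autostop=5))                # TEST 1 and TEST 3
print(expected_counts(autostop=1))                # TEST 2
print(expected_counts(autostop=3, host_scale=5))  # TEST 4
```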


## DryRun and Include Parameter Testing
(Back to [top](#Contents))

### TEST 5: DryRun

In [None]:
###################################################################################################
## TEST 5: Run ingestor in dry-run mode, which skips actually writing records
###################################################################################################

TABLE_NAME5 = "DevOpsTest05"
recreateTable(TABLE_NAME5, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME5" --region "$REGION" --autostop 5 --dry-run

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME5)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)


In [None]:
# AUTOSTOP after 5 iterations, --dry-run is quicker, but no records stored.
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME5)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)

if result1['cnt'][0] != 0:
    print('\x1B[91m' + 'TEST 5-1: FAILED: We expect no records.')
else:
    print('\x1b[92m' + 'TEST 5-1: SUCCESS: We expect no records.')

print('\x1b[0m' + 'TEST 5: DONE') 


### TEST 6: Include Only EU Central
(Back to [top](#Contents))


In [None]:
###################################################################################################
## TEST 6: Run ingestor for data in 'eu-central-1' only (1 cell, 1 silo)
###################################################################################################

TABLE_NAME6 = "DevOpsTest06"
recreateTable(TABLE_NAME6, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 5\
  --database-name "$DB_NAME" --table-name "$TABLE_NAME6" --region "$REGION" --include-region "eu-central-1"

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME6)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)

In [None]:
# 20 server metric values for 10 different machines/dimension sets   =>  200 unique metric time series
#  5 process event values from 12 different processes/dimension sets =>   60 unique event time series

# AUTOSTOP after 5 iterations, 260 * 5 = 1300
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME6)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)

query = """SELECT count(distinct region) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME6)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)

query = """SELECT count(1) as cnt, region
             FROM {}.{}
            GROUP BY region""".format(DB_NAME, TABLE_NAME6)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result3)


if result1['cnt'][0] != 1300:
    print('\x1B[91m' + 'TEST 6-1: FAILED: We expect 1300 records. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 6-1: SUCCESS: We expect 1300 records.')

    
if result2['cnt'][0] != 1:
    print('\x1B[91m' + 'TEST 6-2: FAILED: We expect exactly one region. Actual {}'.format(result2['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 6-2: SUCCESS: We expect exactly one region.')

if result3['region'][0] != 'eu-central-1':
    print('\x1B[91m' + 'TEST 6-3: FAILED: We expect <eu-central-1> as region. Actual {}'.format(result3['region'][0]))
else:
    print('\x1b[92m' + 'TEST 6-3: SUCCESS: We expect <eu-central-1> as region.')

if result3['cnt'][0] != 1300:
    print('\x1B[91m' + 'TEST 6-4: FAILED: We expect 1300 records in <eu-central-1> region. Actual {}'.format(result3['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 6-4: SUCCESS: We expect 1300 records in <eu-central-1> region.')
    
print('\x1b[0m' + 'TEST 6: DONE') 
    

### TEST 7: Include Only Athena Microservice
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 7: Run ingestor for microservice 'athena' only (all regions)
###################################################################################################

TABLE_NAME7 = "DevOpsTest07"
recreateTable(TABLE_NAME7, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 5\
  --database-name "$DB_NAME" --table-name "$TABLE_NAME7" --region "$REGION" --include-ms "athena"

query = """DESCRIBE {}.{}""".format(DB_NAME, TABLE_NAME7)
result = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result)

In [None]:
# 20 server metric values for 101 different machines/dimension sets and 1 service   =>  2020 unique metric time series
#  5 process event values from 202 different processes/dimension sets and 1 service with 2 processes =>   1010 unique event time series

# AUTOSTOP after 5 iterations, 3030 * 5 = 15150
query = """SELECT count(1) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME7)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1)

query = """SELECT count(distinct microservice_name) as cnt
             FROM {}.{}""".format(DB_NAME, TABLE_NAME7)
result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result2)

query = """SELECT count(1) as cnt, microservice_name
             FROM {}.{}
            GROUP BY microservice_name""".format(DB_NAME, TABLE_NAME7)
result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
display.display(result3)


if result1['cnt'][0] != 15150:
    print('\x1B[91m' + 'TEST 7-1: FAILED: We expect 15150 records. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 7-1: SUCCESS: We expect 15150 records.')

    
if result2['cnt'][0] != 1:
    print('\x1B[91m' + 'TEST 7-2: FAILED: We expect exactly one microservice. Actual {}'.format(result2['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 7-2: SUCCESS: We expect exactly one microservice.')

if result3['microservice_name'][0] != 'athena':
    print('\x1B[91m' + 'TEST 7-3: FAILED: We expect <athena> as microservice. Actual {}'.format(result3['microservice_name'][0]))
else:
    print('\x1b[92m' + 'TEST 7-3: SUCCESS: We expect <athena> as microservice.')

if result3['cnt'][0] != 15150:
    print('\x1B[91m' + 'TEST 7-4: FAILED: We expect 15150 records for <athena> microservice. Actual {}'.format(result3['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 7-4: SUCCESS: We expect 15150 records for <athena> microservice.')
    
print('\x1b[0m' + 'TEST 7: DONE') 
    

## Missing Values Testing
### TEST 8: Missing Values
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 8: Run ingestor simulating 'eu-central-1' and 'athena' service with missing values.
###################################################################################################

TABLE_NAME825 = "DevOpsTest0825"
recreateTable(TABLE_NAME825, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 25\
  --database-name "$DB_NAME" --table-name "$TABLE_NAME825" --region "$REGION" --include-region "eu-central-1" --include-ms "athena" --missing-cpu 25 --seed 1234

TABLE_NAME875 = "DevOpsTest0875"
recreateTable(TABLE_NAME875, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 25\
  --database-name "$DB_NAME" --table-name "$TABLE_NAME875" --region "$REGION" --include-region "eu-central-1" --include-ms "athena" --missing-cpu 75 --seed 1234



In [None]:
query = """SELECT count(1), measure_name
             FROM {}.{}
            WHERE 1=1
              AND microservice_name = 'athena'
              AND measure_name IN ('cpu_user', 'cpu_idle', 'network_bytes_in', 'memory_used')
            GROUP BY measure_name""".format(DB_NAME, TABLE_NAME825)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1.head(20))


query = """SELECT count(1) as cnt, measure_name
             FROM {0}.{1}
            WHERE 1=1
              AND microservice_name = 'athena'
              AND measure_name IN ('{2}')
            GROUP BY measure_name"""

result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query.format(DB_NAME, TABLE_NAME825, 'memory_used'), True)
#display.display(result2.head(20))

result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query.format(DB_NAME, TABLE_NAME825, 'cpu_user'), True)
#display.display(result3.head(20))

if result2['cnt'][0] != 25:
    print('\x1B[91m' + 'TEST 8-1: FAILED: We expect 25 memory records. Actual {}'.format(result2['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 8-1: SUCCESS: We expect 25 memory records.')

# we can expect the same number of missing cpu signals due to fixed seed.
# with 25% missing values, about 18 to 20 of the 25 cpu records remain
if result3['cnt'][0] != 19:
    print('\x1B[91m' + 'TEST 8-2: FAILED: We expect 19 cpu records. Actual {}'.format(result3['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 8-2: SUCCESS: We expect 19 cpu records.')

print('\x1b[0m' + 'TEST 8 (25%): DONE') 


query = """SELECT count(1), measure_name
             FROM {}.{}
            WHERE 1=1
              AND microservice_name = 'athena'
              AND measure_name IN ('cpu_user', 'cpu_idle', 'network_bytes_in', 'memory_used')
            GROUP BY measure_name""".format(DB_NAME, TABLE_NAME875)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)
#display.display(result1.head(20))


query = """SELECT count(1) as cnt, measure_name
             FROM {0}.{1}
            WHERE 1=1
              AND microservice_name = 'athena'
              AND measure_name IN ('{2}')
            GROUP BY measure_name"""

result2 = timestream.executeQueryAndReturnAsDataframe(read_client, query.format(DB_NAME, TABLE_NAME875, 'memory_used'), True)
#display.display(result2.head(20))

result3 = timestream.executeQueryAndReturnAsDataframe(read_client, query.format(DB_NAME, TABLE_NAME875, 'cpu_user'), True)
#display.display(result3.head(20))

if result2['cnt'][0] != 25:
    print('\x1B[91m' + 'TEST 8-3: FAILED: We expect 25 memory records. Actual {}'.format(result2['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 8-3: SUCCESS: We expect 25 memory records.')

# we can expect the same number of missing cpu signals due to fixed seed.
# with 75% missing values, about 5 to 7 of the 25 cpu records remain
if result3['cnt'][0] != 7:
    print('\x1B[91m' + 'TEST 8-4: FAILED: We expect 7 cpu records. Actual {}'.format(result3['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 8-4: SUCCESS: We expect 7 cpu records.')

print('\x1b[0m' + 'TEST 8 (75%): DONE') 
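TEST 8 can assert exact record counts only because the ingestor runs with a fixed `--seed`. The following sketch illustrates that principle with Python's `random` module. It is not the ingestor's actual algorithm: the function `simulate_kept` and the per-value drop rule are assumptions made for illustration, so the counts it prints will not necessarily match the 19 and 7 asserted in the cell.

```python
import random

def simulate_kept(total, missing_percent, seed):
    """Count how many of `total` values survive when each value is dropped
    independently with probability missing_percent / 100 (assumed drop rule)."""
    rng = random.Random(seed)  # private generator, so the sequence depends only on the seed
    return sum(1 for _ in range(total) if rng.random() * 100 >= missing_percent)

# Two runs with the same seed always keep the same number of values.
first = simulate_kept(25, 25, seed=1234)
second = simulate_kept(25, 25, seed=1234)
print(first, second, first == second)
```

Without a fixed seed the count would vary from run to run, and the test could only check a range instead of an exact value.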


    

## Signal Values Testing
### TEST 9: Sinus Signal Values SNR100
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 9: Run ingestor simulating 'eu-central-1' and 'athena' service with sinus signal on CPU (no noise)
###################################################################################################

TABLE_NAME9SNR100 = "DevOpsTest09SNR100"
recreateTable(TABLE_NAME9SNR100, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 120  --seed 1234 \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME9SNR100" --region "$REGION" \
  --include-region "eu-central-1" --include-ms "athena" \
  --sin-signal-cpu 100 --sin-frq-cpu m 



In [None]:
query = """WITH offsetted AS (
         SELECT time, 
                measure_value::double value,
                lag(time, 60) OVER (order by time rows 60 preceding) as lagtime,
                lag(measure_value::double, 60) OVER (order by time rows 60 preceding) as lagvalue
           FROM {}.{}
          WHERE 1=1
            AND measure_name = 'cpu_user')
         SELECT count(1) as cnt, round(sum(value), 2) as sum, round(sum(value - lagvalue),2) as dif
           FROM offsetted
          WHERE lagtime is not null""".format(DB_NAME, TABLE_NAME9SNR100)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


if result1['cnt'][0] != 60:
    print('\x1B[91m' + 'TEST 9-1: FAILED: We expect 60 records with value and lagvalue. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 9-1: SUCCESS: We expect 60 records with value and lagvalue.')

if result1['sum'][0] != 3000:
    print('\x1B[91m' + 'TEST 9-2: FAILED: We expect a sum of 3000. Actual {}'.format(result1['sum'][0]))
else:
    print('\x1b[92m' + 'TEST 9-2: SUCCESS: We expect a sum of 3000')

if result1['dif'][0] != 0:
    print('\x1B[91m' + 'TEST 9-3: FAILED: We expect a difference of 0. Actual {}'.format(result1['dif'][0]))
else:
    print('\x1b[92m' + 'TEST 9-3: SUCCESS: We expect a difference of 0')

print('\x1b[0m' + 'TEST 9: DONE') 
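The expectations in TEST 9 (60 rows, a sum of 3000, a difference of 0) follow from the periodicity of a noise-free sine. The sketch below reproduces the arithmetic offline under two assumptions that are inferred rather than confirmed by the ingestor source: the signal spans 0 to 100 (mean 50, amplitude 50), and one period covers 60 samples, matching the `lag(..., 60)` in the query.

```python
import math

PERIOD = 60       # samples per sine period (assumed, matches the lag of 60)
SAMPLES = 120     # same as --autostop 120

def cpu_sine(k):
    """Noise-free sine in the assumed range 0..100 at sample index k."""
    return 50.0 + 50.0 * math.sin(2.0 * math.pi * k / PERIOD)

values = [cpu_sine(k) for k in range(SAMPLES)]

# Mirror the query: only rows with a value 60 samples earlier are counted.
current = values[PERIOD:]
lagged = values[:PERIOD]

cnt = len(current)
total = round(sum(current), 2)
dif = round(sum(c - l for c, l in zip(current, lagged)), 2)
print(cnt, total, abs(dif))
```

Over one full period the sine terms cancel, so the sum reduces to 60 times the mean of 50, and every value equals the value one period earlier up to floating-point rounding.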

### TEST 10: Sinus Signal Values SNR20
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 10 SNR: Run ingestor simulating 'eu-central-1' and 'athena' service with sinus signal on CPU with SNR=20
## Signal 20x stronger than noise => noise = [-2.5 ... 2.5] for signal = [0 ... 100]
###################################################################################################

TABLE_NAME10SNR20 = "DevOpsTest10SNR20"
recreateTable(TABLE_NAME10SNR20, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 120  --seed 1234 \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME10SNR20" --region "$REGION" \
  --include-region "eu-central-1" --include-ms "athena" \
  --sin-signal-cpu 20 --sin-frq-cpu m 


In [None]:
query = """WITH offsetted AS (
         SELECT time, 
                measure_value::double value,
                lag(time, 60) OVER (order by time rows 60 preceding) as lagtime,
                lag(measure_value::double, 60) OVER (order by time rows 60 preceding) as lagvalue
           FROM {}.{}
          WHERE 1=1
            AND measure_name = 'cpu_user')
         SELECT count(1) as cnt, round(sum(value), 2) as sum, round(sum(value - lagvalue),2) as dif
           FROM offsetted
          WHERE lagtime is not null""".format(DB_NAME, TABLE_NAME10SNR20)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


if result1['cnt'][0] != 60:
    print('\x1B[91m' + 'TEST 10-1: FAILED: We expect 60 records with value and lagvalue. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 10-1: SUCCESS: We expect 60 records with value and lagvalue.')

if (result1['sum'][0] <= 2990):
    print('\x1B[91m' + 'TEST 10-2: FAILED: We expect a sum of > 2990 - noise should average out. Actual {}'.format(result1['sum'][0]))
else:
    print('\x1b[92m' + 'TEST 10-2: SUCCESS: We expect a sum of > 2990 - noise should average out.')

if (abs(result1['dif'][0]) >= 10 ):
    print('\x1B[91m' + 'TEST 10-3: FAILED:  We expect a difference of < 10. Actual {}'.format(result1['dif'][0]))
else:
    print('\x1b[92m' + 'TEST 10-3: SUCCESS: We expect a difference of < 10')

print('\x1b[0m' + 'TEST 10: DONE') 

### TEST 11: Sinus Signal Values SNR1
(Back to [top](#Contents))
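
The cell comments for TEST 10 and TEST 11 give a noise range of [-2.5 ... 2.5] for SNR=20 and [-50 ... 50] for SNR=1 on a signal of [0 ... 100]. Both figures are consistent with a noise half-width equal to half the signal range divided by the SNR. The sketch below encodes that relation. The formula is inferred from those two comments only, and the names `noise_half_width` and `worst_case_sum_range` are hypothetical, so treat this as an assumption about the ingestor rather than its documented behavior.

```python
def noise_half_width(snr, signal_min=0.0, signal_max=100.0):
    """Half-width of the noise band, assuming amplitude / SNR (inferred, not documented)."""
    amplitude = (signal_max - signal_min) / 2
    return amplitude / snr

def worst_case_sum_range(snr, samples=60, mean=50.0):
    """Loosest possible bounds on the sum of `samples` noisy values around `mean`."""
    w = noise_half_width(snr)
    return samples * (mean - w), samples * (mean + w)

for snr in (100, 20, 1):
    print(snr, noise_half_width(snr), worst_case_sum_range(snr))
```

These are worst-case bounds in which every sample is pushed in the same direction. The tests use tighter thresholds because the noise is expected to largely cancel out over 60 samples.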

In [None]:
###################################################################################################
## TEST 11 SNR 1: Run ingestor simulating 'eu-central-1' and 'athena' service with sinus signal on CPU with SNR=1
## Signal as strong as noise => noise = [-50 ... 50] for signal = [0 ... 100]
###################################################################################################

TABLE_NAME11SNR1 = "DevOpsTest11SNR1"
recreateTable(TABLE_NAME11SNR1, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 120  --seed 1234 \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME11SNR1" --region "$REGION" \
  --include-region "eu-central-1" --include-ms "athena" \
  --sin-signal-cpu 1 --sin-frq-cpu m 



In [None]:
query = """WITH offsetted AS (
         SELECT time, 
                measure_value::double value,
                lag(time, 60) OVER (order by time rows 60 preceding) as lagtime,
                lag(measure_value::double, 60) OVER (order by time rows 60 preceding) as lagvalue
           FROM {}.{}
          WHERE 1=1
            AND measure_name = 'cpu_user')
         SELECT count(1) as cnt, round(sum(value), 2) as sum, round(sum(value - lagvalue),2) as dif
           FROM offsetted
          WHERE lagtime is not null""".format(DB_NAME, TABLE_NAME11SNR1)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


if result1['cnt'][0] != 60:
    print('\x1B[91m' + 'TEST 11-1: FAILED: We expect 60 records with value and lagvalue. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 11-1: SUCCESS: We expect 60 records with value and lagvalue.')

if result1['sum'][0] < 2800:
    print('\x1B[91m' + 'TEST 11-2: FAILED: We expect a sum of 2800 or more. Actual {}'.format(result1['sum'][0]))
else:
    print('\x1b[92m' + 'TEST 11-2: SUCCESS: We expect a sum of 2800 or more')
 
if abs(result1['dif'][0]) >= 200:
    print('\x1B[91m' + 'TEST 11-3: FAILED: We expect a difference of -200 to 200. Actual {}'.format(result1['dif'][0]))
else:
    print('\x1b[92m' + 'TEST 11-3: SUCCESS: We expect a difference of -200 to 200')

print('\x1b[0m' + 'TEST 11: DONE') 

### TEST 12: Saw Signal Values SNR100
(Back to [top](#Contents))
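
TEST 12-2 accepts a sum of 2950 or 3000. One way to see where 2950 can come from: a sawtooth that ramps from 0 toward 100 in 60 equal steps and then resets averages slightly below 50 over a full period. The sketch assumes one sample per second and this particular ramp shape; neither is taken from the ingestor's source, and `saw` is a hypothetical helper.

```python
def saw(t, period=60, peak=100.0):
    """Sawtooth ramp: 0 at the start of each period, just below `peak` at the end (assumed shape)."""
    return peak * (t % period) / period

values = [saw(t) for t in range(120)]
window = values[60:]  # rows that have a value 60 rows earlier
total = round(sum(window), 2)
# Each value in the window is compared with the value 60 rows before it.
dif = round(sum(v - values[i] for i, v in enumerate(window)), 2)
print(len(window), total, dif)
```

Because the assumed ramp repeats exactly every 60 samples, each value equals its lagged counterpart and the difference is 0, matching the expectation in TEST 12-3.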

In [None]:
###################################################################################################
## TEST 12: Run ingestor simulating 'eu-central-1' and 'athena' service with saw signal on CPU (no noise)
###################################################################################################

TABLE_NAME12SNR100 = "DevOpsTest12SNR100"
recreateTable(TABLE_NAME12SNR100, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 120  --seed 1234 \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME12SNR100" --region "$REGION" \
  --include-region "eu-central-1" --include-ms "athena" \
  --saw-signal-cpu 100 --saw-frq-cpu m 


In [None]:
query = """WITH offsetted AS (
         SELECT time, 
                measure_value::double value,
                lag(time, 60) OVER (order by time rows 60 preceding) as lagtime,
                lag(measure_value::double, 60) OVER (order by time rows 60 preceding) as lagvalue
           FROM {}.{}
          WHERE 1=1
            AND measure_name = 'cpu_user')
         SELECT count(1) as cnt, round(sum(value), 2) as sum, round(sum(value - lagvalue),2) as dif
           FROM offsetted
          WHERE lagtime is not null""".format(DB_NAME, TABLE_NAME12SNR100)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


if result1['cnt'][0] != 60:
    print('\x1B[91m' + 'TEST 12-1: FAILED: We expect 60 records with value and lagvalue. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 12-1: SUCCESS: We expect 60 records with value and lagvalue.')

if result1['sum'][0] < 2950:
    print('\x1B[91m' + 'TEST 12-2: FAILED: We expect a sum of 2950 or 3000. Actual {}'.format(result1['sum'][0]))
else:
    print('\x1b[92m' + 'TEST 12-2: SUCCESS: We expect a sum of 2950 or 3000')

if result1['dif'][0] != 0:
    print('\x1B[91m' + 'TEST 12-3: FAILED: We expect a difference of 0. Actual {}'.format(result1['dif'][0]))
else:
    print('\x1b[92m' + 'TEST 12-3: SUCCESS: We expect a difference of 0')

print('\x1b[0m' + 'TEST 12: DONE') 

### TEST 13: Saw Signal Values SNR10
(Back to [top](#Contents))

In [None]:
###################################################################################################
## TEST 13: Run ingestor simulating 'eu-central-1' and 'athena' service with saw signal on CPU with SNR=10
###################################################################################################

TABLE_NAME13SNR10 = "DevOpsTest13SNR10"
recreateTable(TABLE_NAME13SNR10, DB_NAME)

%run timestream_sample_continuous_data_ingestor_application.py --autostop 120  --seed 1234 \
  --database-name "$DB_NAME" --table-name "$TABLE_NAME13SNR10" --region "$REGION" \
  --include-region "eu-central-1" --include-ms "athena" \
  --saw-signal-cpu 10 --saw-frq-cpu m 


In [None]:
query = """WITH offsetted AS (
         SELECT time, 
                measure_value::double value,
                lag(time, 60) OVER (order by time rows 60 preceding) as lagtime,
                lag(measure_value::double, 60) OVER (order by time rows 60 preceding) as lagvalue
           FROM {}.{}
          WHERE 1=1
            AND measure_name = 'cpu_user')
         SELECT count(1) as cnt, round(sum(value), 2) as sum, round(sum(value - lagvalue),2) as dif
           FROM offsetted
          WHERE lagtime is not null""".format(DB_NAME, TABLE_NAME13SNR10)
result1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


if result1['cnt'][0] != 60:
    print('\x1B[91m' + 'TEST 13-1: FAILED: We expect 60 records with value and lagvalue. Actual {}'.format(result1['cnt'][0]))
else:
    print('\x1b[92m' + 'TEST 13-1: SUCCESS: We expect 60 records with value and lagvalue.')

if result1['sum'][0] < 2900:
    print('\x1B[91m' + 'TEST 13-2: FAILED: We expect a sum of 2950 or 3000 +/- noise. Actual {}'.format(result1['sum'][0]))
else:
    print('\x1b[92m' + 'TEST 13-2: SUCCESS: We expect a sum of 2950 or 3000 +/- noise')

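# Compare the absolute value so that large negative differences also fail (as in TEST 10-3).
if abs(result1['dif'][0]) >= 10:
    print('\x1B[91m' + 'TEST 13-3: FAILED: We expect an absolute difference of less than 10. Actual {}'.format(result1['dif'][0]))
else:
    print('\x1b[92m' + 'TEST 13-3: SUCCESS: We expect an absolute difference of less than 10')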

print('\x1b[0m' + 'TEST 13: DONE') 

## Visualization of Signals
(Back to [top](#Contents))
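
The plotting cell issues the same query once per table. As an optional refactoring, the query text can be built by a small helper. This is a sketch only: `cpu_user_query` is a hypothetical name, and the database and table names in the example call are placeholders.

```python
def cpu_user_query(database, table, limit=250):
    """Build the per-table query used for plotting (same clauses as the cell below)."""
    return """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time
            LIMIT {}""".format(database, table, limit)

# Placeholder names, for illustration only.
print(cpu_user_query("ExampleDb", "DevOpsTest9SNR100"))
```

Each DataFrame could then be loaded with a call such as `timestream.executeQueryAndReturnAsDataframe(read_client, cpu_user_query(DB_NAME, TABLE_NAME9SNR100), True)`.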

In [None]:
query = """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time 
            LIMIT 250""".format(DB_NAME, TABLE_NAME9SNR100)
resultSIN100 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


query = """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time 
            LIMIT 250""".format(DB_NAME, TABLE_NAME10SNR20)
resultSIN20 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


query = """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time 
            LIMIT 250""".format(DB_NAME, TABLE_NAME11SNR1)
resultSIN1 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


query = """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time 
            LIMIT 250""".format(DB_NAME, TABLE_NAME12SNR100)
resultSAW100 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)


query = """SELECT time, measure_value::double as cpu_user
             FROM {}.{}
            WHERE 1=1
              AND measure_name = 'cpu_user'
            ORDER BY time 
            LIMIT 250""".format(DB_NAME, TABLE_NAME13SNR10)
resultSAW10 = timestream.executeQueryAndReturnAsDataframe(read_client, query, True)



plt.rcParams['figure.figsize'] = [20, 15]
fig, ax = plt.subplots(5)

ax[0].title.set_text('CPU User - Sinus SNR 100 - as Recorded')
ax[0].plot(resultSIN100['time'], resultSIN100['cpu_user'], color='darkorange', \
           marker='+', markersize=12, mew=2, linewidth=0.5, alpha=0.5)
ax[0].grid(which ='both', axis ='both', linestyle ='--')

ax[1].title.set_text('CPU User - Sinus SNR 20 - as Recorded')
ax[1].plot(resultSIN20['time'], resultSIN20['cpu_user'], color='darkorange', \
          marker='+', markersize=12, mew=2, linewidth=0.5, alpha=0.5)
ax[1].grid(which ='both', axis ='both', linestyle ='--')

ax[2].title.set_text('CPU User - Sinus SNR 1 - as Recorded')
ax[2].plot(resultSIN1['time'], resultSIN1['cpu_user'], color='darkorange', \
          marker='+', markersize=12, mew=2, linewidth=0.5, alpha=0.5)
ax[2].grid(which ='both', axis ='both', linestyle ='--')


ax[3].title.set_text('CPU User - Saw SNR 100 - as Recorded')
ax[3].plot(resultSAW100['time'], resultSAW100['cpu_user'], color='darkgreen', \
           marker='+', markersize=12, mew=2, linewidth=0.5, alpha=0.5)
ax[3].grid(which ='both', axis ='both', linestyle ='--')

ax[4].title.set_text('CPU User - Saw SNR 10 - as Recorded')
ax[4].plot(resultSAW10['time'], resultSAW10['cpu_user'], color='darkgreen', \
           marker='+', markersize=12, mew=2, linewidth=0.5, alpha=0.5)
ax[4].grid(which ='both', axis ='both', linestyle ='--')
ax[4].set_xticklabels('', visible=False)

fig.autofmt_xdate(rotation=60)
fig.set_figwidth(20)

plt.savefig('sinsawsnr.png')

## Cleanup: Drop Tables and Database
(Back to [top](#Contents))
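
The cleanup cell requests at most 20 tables in a single `list_tables` call. If the database holds more tables than fit in one response, the remaining ones are only reachable through the `NextToken` returned with each page. Below is a minimal sketch of following that token, assuming the response keys used in the cleanup cell (`Tables`, `TableName`) plus an optional `NextToken` key. `list_all_table_names` and `FakeClient` are hypothetical names introduced here; the fake client only stands in for the real write client so the example runs without AWS access.

```python
def list_all_table_names(client, database_name, page_size=20):
    """Collect table names across all pages of list_tables, following NextToken."""
    names, token = [], None
    while True:
        kwargs = {"DatabaseName": database_name, "MaxResults": page_size}
        if token:
            kwargs["NextToken"] = token
        page = client.list_tables(**kwargs)
        names.extend(t["TableName"] for t in page.get("Tables", []))
        token = page.get("NextToken")
        if not token:
            return names

class FakeClient:
    """Stand-in for the Timestream write client, returning two pages (illustration only)."""
    def list_tables(self, DatabaseName, MaxResults, NextToken=None):
        if NextToken is None:
            return {"Tables": [{"TableName": "DevOpsTest0"}], "NextToken": "page2"}
        return {"Tables": [{"TableName": "DevOpsTest1"}]}

print(list_all_table_names(FakeClient(), "ExampleDb"))
```

In the notebook, the returned names could feed the same `DevOpsTest` prefix check before any table is dropped.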

In [None]:
# DROP Database and Tables (Stop if there is a table we don't know.)

CLEANUP = ""
if "DELETE" == CLEANUP:
    try:
        result = write_client.list_tables(DatabaseName=DB_NAME, MaxResults=20)
        to_be_dropped = list()
        for table in result['Tables']:
            tableName = table['TableName']
            if (tableName.startswith("DevOpsTest")):
                print('Going to drop table <' + tableName + '>' )
                to_be_dropped.append(tableName)
            else:
                raise AssertionError("Found table <" + tableName +"> which does not start with <DevOpsTest> - stopping process.")

        for tableToDrop in to_be_dropped:
            write_client.delete_table(DatabaseName=DB_NAME, TableName=tableToDrop)
            print("Table <{}> deleted.".format(tableToDrop))

        write_client.delete_database(DatabaseName=DB_NAME)
        print("Database <{}> deleted.".format(DB_NAME))
    except AssertionError as e:
        raise
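    except Exception as e:
        # Ignore resources that are already gone; re-raise anything else
        # (including exceptions that carry no AWS error response).
        error_code = getattr(e, 'response', {}).get('Error', {}).get('Code')
        if error_code != "ResourceNotFoundException":
            raise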
