In [323]:
from cassandra.cluster import Cluster
import time, requests, json, math
from datetime import datetime
import hashlib

In [275]:
def get_names(port, exclude):
    try:
        result = requests.get(f'http://130.233.193.117:{port}/api/v1/label/__name__/values')
        container = []

        for metric in result.json()['data']:
            if not exclude(metric):
                container.append(metric)

        return container

    except:
        raise(f'COULD NOT PING INSTANCES AT PORT {port}')
        return False

In [276]:
def create_segments(unix_start, unix_end, batch_size):

    container = []
    time_delta = unix_end - unix_start

    for _ in range(math.floor(time_delta / batch_size)):
        container.append(batch_size)
        time_delta -= batch_size

    if time_delta > 0:
        container.append(time_delta)

    last_start = unix_start
    tuples = []

    for window in container:
        tuples.append((last_start, last_start+window))
        last_start += window

    return tuples

In [333]:
def hash_dict(data_dict):
    json_str = json.dumps(data_dict, sort_keys=True)
    return hashlib.sha256(json_str.encode()).hexdigest()

In [277]:
class cass_instance:
    
    # CONSTRUCTOR
    def __init__(self):
        cluster = Cluster([('localhost', 12001)])
        self.session = cluster.connect()

    # DESTRUCTOR
    def __del__(self):
        self.session.shutdown()

    # PERFORM ONE OF MORE CASSANDRA QUERIES
    def query(self, inputs):
        results = []

        # SINGLE QUERY
        if type(inputs) == str:
            res = self.session.execute(inputs)
            results.append(res)

        # MULTIPLE QUERIES
        elif type(inputs) == list:
            for input in inputs:
                res = self.session.execute(input)
                results.append(res)

        # CATCH BAD INPUT
        else:
            print('ERROR: BAD INPUT PARAM')


    # CREATE AND ENTER NEW KEYSPACE
    def namespace(self, namespace_name):
        self.query("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};" % namespace_name)
        self.session.set_keyspace(namespace_name)

In [340]:
def create_snapshot(start_time, end_time, sampling):

    # GENERATE EXPERIMENT ID
    exp_id = 'experiment_' + str(int(time.time()))
    print(exp_id)

    # CREATE CASSANDRA INSTANCE, THEN CREATE THE NEW NAMESPACE
    cass = cass_instance()
    cass.namespace(exp_id)

    # CONVERT DATES TO VALID UNIX TIMESTAMPS
    date_format = '%Y-%m-%d %H:%M:%S'
    formatted_start = int(datetime.strptime(start_time, date_format).timestamp())
    formatted_end = int(datetime.strptime(end_time, date_format).timestamp())

    # PROMETHEUS ONLY ALLOWS QUERIES WITH LESS THAN 11K ROWS
    # SEGMENT LARGE TIMESTAMPS INTO SMALLER PAIRS TO BYPASS THIS LIMITATION
    segments = create_segments(formatted_start, formatted_end, 1000)

    # PROMETHEUS SERVER 1
    #s1_filter = lambda x: x.startswith('prometheus_') or x.startswith('grafana_') or x.startswith('alertmanager_') or x.startswith('kepler_process') or x.startswith('apiserver_request_duration') or x.startswith('apiserver_request_sli')
    s1_filter = lambda x: not x.startswith('kepler_')
    s1_metrics = get_names(9090, s1_filter)

    # PROMETHEUS SERVER 2
    s2_filter = lambda x: not x.startswith('kafka_')
    s2_metrics = get_names(9091, s2_filter)

    for metric in s1_metrics:
        for nth_segment, (t1, t2) in enumerate(segments):

            # MAKE THE API REQUEST
            URL = f'http://130.233.193.117:9090/api/v1/query_range?query={metric}&start={t1}&end={t2}&step={sampling}s'
            results = requests.get(URL).json()['data']['result']

            # ON THE FIRST SEGMENT, CREATE THE TABLE
            if nth_segment == 0:
        
                # EXTRACT KEYS & STRINGIFY THEM
                keys = list(results[0]['metric'].keys())
                keys = [key.replace('__name__', 'metric_name').replace(" ", "_").replace("-", "_").replace(".", "_").lower() for key in keys]
                stringified_keys = ' text, '.join(keys) + ' text'
        
                # CREATE THE TABLE
                tbl_query = 'CREATE TABLE ' + metric + ' (metric_hash text PRIMARY KEY, ' + stringified_keys + ');'
                cass.query(tbl_query)

            # LOOP IN THE VALUES
            for item in results:
                keys = list(item['metric'].keys())
                keys = [key.replace('__name__', 'metric_name').replace(" ", "_").replace("-", "_").replace(".", "_").lower() for key in keys]
                joined_keys = ', '.join(keys)
                
                values = [hash_dict(dict(item['metric']))] + list(item['metric'].values())
                joined_values = ', '.join(values)
        
                row_query = insert_query = f"INSERT INTO {metric} ({joined_keys}) VALUES ({joined_values});"
                cass.query(row_query)

        print(metric)

            #tbl_query = 'CREATE TABLE ' + metric + ' (metric_name text PRIMARY KEY, another text);'
            #cass.query(tbl_query)
    
#    # LOOP THROUGH EACH SEGMENT, COMBINING THEIR QUERY OUTPUT
#    for nth_segment, (t1, t2) in segments:
#
#        # MAKE GET REQUEST
#        URL = f'http://130.233.193.117:{prometheus_port}/api/v1/query_range?query={query}&start={t1}&end={t2}&step={n_steps}s'
#        results = requests.get(URL).json()['data']['result']

In [341]:
#clear && python3 snapshot.py --start "2024-01-17 23:46:25" --end "2024-01-18 07:46:25" --interval 5

In [342]:
create_snapshot(
    start_time="2024-01-17 23:46:25",
    end_time="2024-01-18 07:46:25",
    sampling=5
)

experiment_1705595728


SyntaxException: <Error from server: code=2000 [Syntax error in CQL query] message="line 1:209 mismatched character '1' expecting '-'">

In [304]:
cass = cass_instance()
cass.session.set_keyspace('experiment_1705593815')

In [308]:
res = cass.session.execute("SELECT * FROM kepler_container_joules_total")

In [185]:
keys = ['container', 'container_id', 'container_name', 'container_namespace', 'endpoint', 'instance', 'job', 'namespace', 'pod', 'pod_name', 'service', 'source']

In [187]:
stringified_keys = ' string, '.join(keys) + ' string'
tbl_query = 'CREATE TABLE IF NOT EXISTS your_table (__name__ string PRIMARY KEY, ' + stringified_keys + ')';

In [188]:
tbl_query

'CREATE TABLE IF NOT EXISTS your_table (__name__ string PRIMARY KEY, container string, container_id string, container_name string, container_namespace string, endpoint string, instance string, job string, namespace string, pod string, pod_name string, service string, source string)'

In [184]:
' string, '.join(keys) + ' string'

'container string, container_id string, container_name string, container_namespace string, endpoint string, instance string, job string, namespace string, pod string, pod_name string, service string, source string'

In [None]:
"""
CREATE TABLE IF NOT EXISTS your_table (
    id UUID PRIMARY KEY,
    json_data map<text, text>
);
"""

In [86]:
cass_instance().init_namespace('foobarz')

In [65]:
def cass_query():
    cluster = Cluster([('localhost', 12001)]).connect()
    session = cluster.connect()

    return session

In [70]:
cluster = Cluster([('localhost', 12001)])
session = cluster.connect()

In [73]:
session.shutdown()

In [74]:
session.execute()

<cassandra.cluster.Session at 0x7fb36831f8b0>

In [72]:
print(dir(session))

['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_graph_paging_available', '_create_response_future', '_default_consistency_level', '_default_serial_consistency_level', '_default_timeout', '_graph_paging_available', '_initial_connect_futures', '_lock', '_maybe_get_execution_profile', '_maybe_set_graph_paging', '_metrics', '_monitor_reporter', '_on_analytics_master_result', '_on_request', '_pools', '_profile_manager', '_protocol_version', '_request_init_callbacks', '_resolve_execution_profile_options', '_row_factory', '_set_keyspace_for_all_pools', '_target_analytics_master', '_transform_params', '_validate_set_legacy_config', 'add_or_renew_pool', 'add_r

In [71]:
session.close()

AttributeError: 'Session' object has no attribute 'close'

In [61]:
keyspace_query = "CREATE KEYSPACE exp_12345 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"
session.execute(keyspace_query)
session.set_keyspace('exp_12345')

In [62]:
session.execute(keyspace_query)

AlreadyExists: Keyspace 'exp_12345' already exists

In [75]:
session.set_keyspace('exp_12345')

NoHostAvailable: ('Unable to complete the operation against any hosts', {<Host: 127.0.0.1:12001 DATACENTER1>: ConnectionException('Pool is shutdown')})