In [None]:
# Essential modules/packages that are required
# for the Regression Analysis

import json
import pandas
import requests
import statistics

import matplotlib
import plotly.express as px

import plotly.io as pio
pio.renderers.default='notebook'

In [None]:
# Run a few preflight checks to make sure
# the notebook has all the required information
# to execute the cells without any exceptions.

# SECRETS_FILE is the absolute path to a JSON file that contains
# the keys `opensearch_cluster`, `opensearch_user`,
# `opensearch_user_password` and `ca_cert_file`.
SECRETS_FILE = ''

assert SECRETS_FILE, "variable `SECRETS_FILE` is Null/None. Provide absolute path to your JSON file"

with open(SECRETS_FILE, 'r') as file_data:
    SECRETS = json.load(file_data)

# `.get()` is used so that a key that is missing from the JSON file fails
# with the same descriptive message as an empty value, instead of a bare KeyError.
assert SECRETS.get('opensearch_cluster'), "variable `opensearch_cluster` is Null/None"
assert SECRETS.get('opensearch_user'), "variable `opensearch_user` is Null/None"
assert SECRETS.get('opensearch_user_password'), "variable `opensearch_user_password` is Null/None"
assert SECRETS.get('ca_cert_file'), "variable `ca_cert_file` is Null/None. Provide absolute path to `.pem` file"

In [None]:
# Extract RHOAI version numbers from the Opensearch cluster.
INDEX_FIELD = "metadata.settings.rhoai_version.keyword"   # field the versions are aggregated on
NUMBER_OF_RECORDS = 100                                   # `size` of the terms aggregation

def get_version_numbers_of_rhoai():
    '''
    Returns the list of RHOAI Versions that
    were tested by PSAP team as part of the
    release process

    The versions are the bucket keys of a `terms` aggregation on
    `INDEX_FIELD`, requested from the `psap-rhoai.rhoai-kserve-single`
    index with `NUMBER_OF_RECORDS` as the aggregation size.

    Raises AssertionError when the HTTP status code is not 200.
    '''

    data_aggregation_query = {
        "aggs": {
            "rhoai_versions": {
                "terms": {
                    "field": INDEX_FIELD,
                    "size": NUMBER_OF_RECORDS
                }
            }
        }
    }

    # size=0 / _source=false: only the aggregation is requested, not the documents
    response = requests.get(
        url=SECRETS['opensearch_cluster'] + '/psap-rhoai.rhoai-kserve-single/_search?size=0&_source=false',
        auth=(SECRETS['opensearch_user'], SECRETS['opensearch_user_password']),
        verify=SECRETS['ca_cert_file'],
        json=data_aggregation_query
    )

    # The raw body is reported instead of `response.json()`: an error response
    # is not guaranteed to be JSON, and a decoding error raised while building
    # the message would hide the actual status code.
    assert response.status_code == 200, (
        f"HTTP GET request failed, status code - {response.status_code}, "
        f"error message - {response.text}"
    )

    version_buckets = response.json()['aggregations']['rhoai_versions']['buckets']
    return [ _version['key'] for _version in version_buckets ]

In [None]:
# List the RHOAI versions that are available in Opensearch cluster
def _rhoai_version_sort_key(version):
    '''
    Returns a sort key that orders version strings numerically

    The version is split on `.` and `-`. Numeric parts are compared as
    integers, so that "2.10" ranks above "2.9" (a plain string sort puts
    "2.9" first). Non-numeric parts are compared as strings and rank above
    numeric parts; the leading 0/1 tag keeps integers and strings from
    ever being compared with each other.
    '''

    return [
        (0, int(part), '') if part.isdecimal() else (1, 0, part)
        for part in str(version).replace('-', '.').split('.')
    ]

# Newest version first
RHOAI_VERSIONS = sorted(get_version_numbers_of_rhoai(), key=_rhoai_version_sort_key, reverse=True)
RHOAI_VERSIONS

In [None]:
# Define current, ignored, preferred_released_versions for regression analysis
# An empty string is used as the "not set yet" placeholder so that the cell
# stays valid Python and fails with the assertion message below.
CURRENT_VERSION = ''                            # string
IGNORED_VERSION = []                            # List of strings

# Validate before the value is used to derive PREFERRED_RELEASED_VERSIONS
assert CURRENT_VERSION, 'Variable `CURRENT_VERSION` is Null/None.'

# Every known version except the current one and the explicitly ignored ones
PREFERRED_RELEASED_VERSIONS = [
    _version for _version in RHOAI_VERSIONS
    if _version != CURRENT_VERSION and _version not in IGNORED_VERSION
]

In [None]:
# Get the numbers specific to each KPI

def get_llm_load_test_kpi_results(kpi_field):
    '''
    Returns the results/observations of a specific KPI

    kpi_field: name of the index field the statistics are computed on
               (the `field` entries of `KPIS` are passed by the callers
               in this notebook).

    The returned value is the decoded JSON response of a nested
    aggregation: virtual users -> model name -> RHOAI version, with an
    `extended_stats` aggregation of `kpi_field` at the innermost level.

    Raises AssertionError when the HTTP status code is not 200.
    '''

    data_aggregation_query = {
        "aggs": {
            "users": {
                "terms": {
                    "field": "metadata.settings.virtual_users",
                    "size": 100
                },
                "aggs": {
                    "models": {
                        "terms": {
                            "field": "metadata.settings.model_name.keyword",
                            "size": 100
                        },
                        "aggs": {
                            "rhoai": {
                                "terms": {
                                    "field": "metadata.settings.rhoai_version.keyword",
                                    "size": 100
                                },
                                "aggs": {
                                    "stats": {
                                        "extended_stats": {
                                            "field": kpi_field
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    # size=0 / _source=false: only the aggregations are requested, not the documents
    response = requests.get(
        url=SECRETS['opensearch_cluster'] + '/psap-rhoai.rhoai-kserve-single/_search?size=0&_source=false',
        auth=(SECRETS['opensearch_user'], SECRETS['opensearch_user_password']),
        verify=SECRETS['ca_cert_file'],
        json=data_aggregation_query
    )

    # The raw body is reported instead of `response.json()`: an error response
    # is not guaranteed to be JSON, and a decoding error raised while building
    # the message would hide the actual status code.
    assert response.status_code == 200, (
        f"HTTP GET request failed, status code - {response.status_code}, "
        f"error message - {response.text}"
    )

    return response.json()

In [None]:
# Select only the results of the current version

def get_current_version_results(response):
    '''
    Picks the average KPI value of CURRENT_VERSION out of the
    aggregated response.

    The result maps each virtual-users bucket key to a dictionary of
    {model name: average}. Models that have no bucket for
    CURRENT_VERSION are left out of the inner dictionary.
    '''

    results_by_users = {}

    for users_bucket in response['aggregations']['users']['buckets']:
        averages_by_model = {}
        results_by_users[users_bucket['key']] = averages_by_model

        for model_bucket in users_bucket['models']['buckets']:
            model_name = model_bucket['key']

            # first version bucket that matches the current version, if any
            current_bucket = next(
                (
                    version_bucket
                    for version_bucket in model_bucket['rhoai']['buckets']
                    if version_bucket['key'] == CURRENT_VERSION
                ),
                None
            )

            if current_bucket is not None:
                averages_by_model[model_name] = current_bucket['stats']['avg']

    return results_by_users

In [None]:
# Select only the results of PREFERRED_RELEASED_VERSIONS.

def get_preferred_released_version_results(response):
    '''
    Collects the average KPI values of the versions listed in
    PREFERRED_RELEASED_VERSIONS.

    The result maps each virtual-users bucket key to a dictionary of
    {model name: [average, ...]}, one list entry per matching version
    bucket, in the order the buckets appear in the response. A model
    without any matching version gets an empty list.
    '''

    results_by_users = {}

    for users_bucket in response['aggregations']['users']['buckets']:
        averages_by_model = {}
        results_by_users[users_bucket['key']] = averages_by_model

        for model_bucket in users_bucket['models']['buckets']:
            model_name = model_bucket['key']
            averages_by_model[model_name] = [
                version_bucket['stats']['avg']
                for version_bucket in model_bucket['rhoai']['buckets']
                if version_bucket['key'] in PREFERRED_RELEASED_VERSIONS
            ]

    return results_by_users

In [None]:
# Parse the data for the `Line Charts`

def get_data_for_line_charts(response):
    '''
    Reshapes the aggregated response for the Line Charts.

    The result is nested as
    {virtual users: {model name: {RHOAI version: average}}}
    and only contains the versions that are either CURRENT_VERSION
    or part of PREFERRED_RELEASED_VERSIONS.
    '''

    def is_charted(version_number):
        # a version is plotted when it is the current one or a preferred release
        return version_number == CURRENT_VERSION or version_number in PREFERRED_RELEASED_VERSIONS

    charts_by_users = {}

    for users_bucket in response['aggregations']['users']['buckets']:
        charts_by_model = {}
        charts_by_users[users_bucket['key']] = charts_by_model

        for model_bucket in users_bucket['models']['buckets']:
            model_name = model_bucket['key']
            charts_by_model[model_name] = {
                version_bucket['key']: version_bucket['stats']['avg']
                for version_bucket in model_bucket['rhoai']['buckets']
                if is_charted(version_bucket['key'])
            }

    return charts_by_users

In [None]:
# Calculate sample/population mean

def get_measure_of_center(data):
    '''
    Computes the arithmetic mean of `data`; the mean is the
    center from which the deviation limits are measured.
    '''

    center = statistics.mean(data)
    return center

In [None]:
# Calculate the sample standard deviation

def get_measure_of_distribution(data):
    '''
    Computes the sample standard deviation of `data`
    (`statistics.stdev`); it sets the width of the deviation limits.
    '''

    spread = statistics.stdev(data)
    return spread

In [None]:
def get_percentage_change(mean, previous_data):
    '''
    Expresses `mean` as a percentage change relative to the
    mean of `previous_data` (positive when `mean` is larger).
    '''

    baseline = get_measure_of_center(previous_data)
    ratio_in_percent = (mean / baseline) * 100
    return ratio_in_percent - 100

In [None]:
def get_delta(mean, previous_data):
    '''
    Subtracts the mean of `previous_data` from `mean`
    (current mean minus previous mean).
    '''

    baseline = get_measure_of_center(previous_data)
    difference = mean - baseline
    return difference

In [None]:
def get_std_dev_measurements(deviation, mean, previous_data):
    '''
    Evaluates one standard deviation band around the mean of
    `previous_data`.

    For `deviation == 1` the band is the closed interval
    [avg - 1*stddev, avg + 1*stddev]. For any other value the band is
    made of two closed intervals, one on each side of the average,
    between (deviation - 1) and `deviation` standard deviations.

    Returns a tuple of
      - the percentage of `previous_data` items that fall in the band, and
      - whether `mean` falls in the band.
    '''

    center = get_measure_of_center(previous_data)
    spread = get_measure_of_distribution(previous_data)

    # outer edges of the band, `deviation` standard deviations from the center
    outer_low = center - (spread * deviation)
    outer_high = center + (spread * deviation)

    if deviation == 1:
        hits = sum(1 for item in previous_data if outer_low <= item <= outer_high)
        bound_verify = bool(outer_low <= mean <= outer_high)
    else:
        # inner edges of the band, one standard deviation closer to the center
        inner_low = center - (spread * (deviation - 1))
        inner_high = center + (spread * (deviation - 1))

        # The two sides are counted separately and then added, so an item that
        # satisfies both conditions contributes twice.
        hits_below = sum(1 for item in previous_data if outer_low <= item <= inner_low)
        hits_above = sum(1 for item in previous_data if inner_high <= item <= outer_high)
        hits = hits_above + hits_below

        bound_verify = bool(
            (outer_low <= mean <= inner_low) or (inner_high <= mean <= outer_high)
        )

    return ((hits / len(previous_data)) * 100), bound_verify

In [None]:
def get_data_frame_values(current_data, old_data):
    '''
    Returns data compatible with Pandas DataFrame

    current_data: {virtual users: {model name: average of the current version}}
    old_data:     {virtual users: {model name: [average of a previous version, ...]}}

    The result is a dictionary of column name -> list of values, with one
    entry per (virtual users, model name) pair of `current_data`.

    A pair is skipped (with a warning) when it cannot be analysed:
      - the current average is None, or
      - fewer than two non-None previous averages exist. The standard
        deviation computed with `statistics.stdev` needs at least two data
        points, so such a pair (e.g. a model that was only tested with the
        current version) would otherwise abort the whole table.
    '''
    import warnings

    # Schema of the dataFrame; the order defines the column order of the table
    columns = (
        'user_concurrency',
        'model_name',
        'present_ver_mean',
        'previous_ver_mean',
        'std_dev',
        'std_dev_1',
        'std_dev_1_bound',
        'std_dev_2',
        'std_dev_2_bound',
        'std_dev_3',
        'std_dev_3_bound',
        'change',
        'delta',
    )
    regression_table = {column: list() for column in columns}

    for user_concurrency, kpi_measurements in current_data.items():
        for model_name, mean_value in kpi_measurements.items():
            # None averages carry no measurement and would break the arithmetic
            baseline = [
                value
                for value in old_data.get(user_concurrency, {}).get(model_name, [])
                if value is not None
            ]

            if mean_value is None or len(baseline) < 2:
                warnings.warn(
                    f"Skipping model `{model_name}` at {user_concurrency} virtual users: "
                    f"a current value and at least two previous values are required "
                    f"(current value - {mean_value}, previous values - {len(baseline)})",
                    stacklevel=2
                )
                continue

            dist_1, bound_1 = get_std_dev_measurements(1, mean_value, baseline)
            dist_2, bound_2 = get_std_dev_measurements(2, mean_value, baseline)
            dist_3, bound_3 = get_std_dev_measurements(3, mean_value, baseline)

            row = {
                'user_concurrency': user_concurrency,
                'model_name': model_name,
                'present_ver_mean': mean_value,
                'previous_ver_mean': get_measure_of_center(baseline),
                'std_dev': get_measure_of_distribution(baseline),
                'std_dev_1': dist_1,
                'std_dev_1_bound': bound_1,
                'std_dev_2': dist_2,
                'std_dev_2_bound': bound_2,
                'std_dev_3': dist_3,
                'std_dev_3_bound': bound_3,
                'change': get_percentage_change(mean_value, baseline),
                'delta': get_delta(mean_value, baseline),
            }
            for column in columns:
                regression_table[column].append(row[column])

    return regression_table

In [None]:
def get_color_property(cellData):
    '''
    Builds the CSS for one Table row.

    `cellData` is the row; its `std_dev_1_bound`, `std_dev_2_bound`
    and `std_dev_3_bound` values decide the styling. When at least one
    of them is truthy every cell gets `color: black`, otherwise every
    cell gets `background-color: pink`. One CSS entry is returned per
    item of `cellData.to_dict()`.
    '''
    within_any_band = (
        cellData.std_dev_1_bound
        or cellData.std_dev_2_bound
        or cellData.std_dev_3_bound
    )

    css_rule = 'color: black' if within_any_band else 'background-color: pink'
    return [css_rule] * len(cellData.to_dict())

In [None]:
# Declare the KPIs (Key Performance Indicators).
# Each entry holds the index field of the KPI (`field`),
# a human readable description (`help`) and the unit
# that is shown in the chart titles and captions (`unit`).
# The fields are the keys that are created by the llm-load-test.

KPIS = dict(
    throughput=dict(
        field="kpis.kserve_llm_load_test_throughput.value",
        help="Model throughput",
        unit="(tokens/s)",
    ),
    itl=dict(
        field="kpis.kserve_llm_load_test_itl.value",
        help="All values of inter token latency",
        unit="(ms)",
    ),
    ttft=dict(
        field="kpis.kserve_llm_load_test_ttft.value",
        help="All values of time to first token",
        unit="[ms]",
    ),
    tpot=dict(
        field="kpis.kserve_llm_load_test_tpot.value",
        help="All values of time per output token",
        unit="[ms/token]",
    ),
)

In [None]:
# Declare the CSS properties that style the Table.
# STYLE_TABLE is handed to `set_table_styles` by the display cells:
# one entry per CSS selector with its (property, value) pairs.
STYLE_TABLE = [
    # highlight the row under the mouse pointer
    dict(
        selector='tr:hover',
        props=[('background-color', 'yellow')],
    ),
    # header cells
    dict(
        selector='th',
        props=[
            ('background-color', 'grey'),
            ('border', '1px solid black'),
            ('border-collapse', 'collapse'),
        ],
    ),
    # data cells
    dict(
        selector='td',
        props=[
            ('border', '1px solid black'),
            ('border-collapse', 'collapse'),
        ],
    ),
    # table caption
    dict(
        selector='caption',
        props=[
            ('font-size', '1.875em'),
            ('text-align', 'center'),
        ],
    ),
]

# STYLE_FORMAT is handed to `format` by the display cells:
# column name -> format string (one decimal place, `%` suffix for percentages).
STYLE_FORMAT = dict(
    present_ver_mean="{:.1f}",
    previous_ver_mean="{:.1f}",
    std_dev="{:.1f}",
    std_dev_1="{:.1f} %",
    std_dev_2="{:.1f} %",
    std_dev_3="{:.1f} %",
    change="{:.1f} %",
    delta="{:.1f}",
)

In [None]:
# Entry point used by the display cells: it chains the query and
# the parsing functions and returns the data for the Table and the Line Charts.

def get_kpi_data_frame_values(kpi_field):
    '''
    Queries the KPI `kpi_field` and prepares the Table data.

    Returns a tuple of
      - the aggregated response of `get_llm_load_test_kpi_results`
        (the input of `get_data_for_line_charts`), and
      - the column dictionary built by `get_data_frame_values` from the
        current version results and the preferred released version results.
    '''
    raw_response = get_llm_load_test_kpi_results(kpi_field)

    table_values = get_data_frame_values(
        get_current_version_results(raw_response),
        get_preferred_released_version_results(raw_response)
    )

    return raw_response, table_values

In [None]:
# Throughput
aggregated_results, kpi_data_frame = get_kpi_data_frame_values(KPIS['throughput']['field'])

# A summary of test results that is compared with other versions and displayed in a Line Chart
VIRTUAL_USERS_CONCURRENCY = 32
line_chart_data = get_data_for_line_charts(aggregated_results)
line_chart_df = pandas.DataFrame.from_dict(line_chart_data[VIRTUAL_USERS_CONCURRENCY])
fig = px.line(
    line_chart_df,
    markers=True,
    title=(
        KPIS['throughput']['help']
        + f" at {VIRTUAL_USERS_CONCURRENCY} concurrency - "
        + KPIS['throughput']['unit']
    ),
    labels={
        "index": "Version(s) of RHOAI",
        "value": f"{KPIS['throughput']['unit']}"
    }
)
fig.show()


# Summary of Regression Analysis displayed in Table Chart
dataFrame = pandas.DataFrame.from_dict(data=kpi_data_frame)
table_style = dataFrame.style
# `Styler.hide_index()` was deprecated in pandas 1.4 and removed in pandas 2.0;
# `Styler.hide(axis="index")` is its replacement. Fall back to the old method
# only when the installed pandas does not provide `hide` yet.
table_style = table_style.hide(axis="index") if hasattr(table_style, "hide") else table_style.hide_index()
(
    table_style
    .apply(get_color_property, axis=1)
    .set_table_styles(table_styles=STYLE_TABLE)
    .format(STYLE_FORMAT)
    .set_caption(f"{KPIS['throughput']['help'] + ' ' + KPIS['throughput']['unit']}")
)

In [None]:
# Inter Token Latency (ITL)
aggregated_results, kpi_data_frame = get_kpi_data_frame_values(KPIS['itl']['field'])

# A summary of test results that is compared with other versions and displayed in a Line Chart
VIRTUAL_USERS_CONCURRENCY = 32
line_chart_data = get_data_for_line_charts(aggregated_results)
line_chart_df = pandas.DataFrame.from_dict(line_chart_data[VIRTUAL_USERS_CONCURRENCY])
fig = px.line(
    line_chart_df,
    markers=True,
    title=(
        KPIS['itl']['help']
        + f" at {VIRTUAL_USERS_CONCURRENCY} concurrency - "
        + KPIS['itl']['unit']
    ),
    labels={
        "index": "Version(s) of RHOAI",
        "value": f"{KPIS['itl']['unit']}"
    }
)
fig.show()

# Summary of Regression Analysis displayed in Table Chart
dataFrame = pandas.DataFrame.from_dict(data=kpi_data_frame)
table_style = dataFrame.style
# `Styler.hide_index()` was deprecated in pandas 1.4 and removed in pandas 2.0;
# `Styler.hide(axis="index")` is its replacement. Fall back to the old method
# only when the installed pandas does not provide `hide` yet.
table_style = table_style.hide(axis="index") if hasattr(table_style, "hide") else table_style.hide_index()
(
    table_style
    .apply(get_color_property, axis=1)
    .set_table_styles(table_styles=STYLE_TABLE)
    .format(STYLE_FORMAT)
    .set_caption(f"{KPIS['itl']['help'] + ' ' + KPIS['itl']['unit']}")
)

In [None]:
# Time to First Token (TTFT)
aggregated_results, kpi_data_frame = get_kpi_data_frame_values(KPIS['ttft']['field'])

# A summary of test results that is compared with other versions and displayed in a Line Chart
VIRTUAL_USERS_CONCURRENCY = 32
line_chart_data = get_data_for_line_charts(aggregated_results)
line_chart_df = pandas.DataFrame.from_dict(line_chart_data[VIRTUAL_USERS_CONCURRENCY])
fig = px.line(
    line_chart_df,
    markers=True,
    title=(
        KPIS['ttft']['help']
        + f" at {VIRTUAL_USERS_CONCURRENCY} concurrency - "
        + KPIS['ttft']['unit']
    ),
    labels={
        "index": "Version(s) of RHOAI",
        "value": f"{KPIS['ttft']['unit']}"
    }
)
fig.show()

# Summary of Regression Analysis displayed in Table Chart
dataFrame = pandas.DataFrame.from_dict(data=kpi_data_frame)
table_style = dataFrame.style
# `Styler.hide_index()` was deprecated in pandas 1.4 and removed in pandas 2.0;
# `Styler.hide(axis="index")` is its replacement. Fall back to the old method
# only when the installed pandas does not provide `hide` yet.
table_style = table_style.hide(axis="index") if hasattr(table_style, "hide") else table_style.hide_index()
(
    table_style
    .apply(get_color_property, axis=1)
    .set_table_styles(table_styles=STYLE_TABLE)
    .format(STYLE_FORMAT)
    .set_caption(f"{KPIS['ttft']['help'] + ' ' + KPIS['ttft']['unit']}")
)

In [None]:
# Time per Output Token (TPOT)
aggregated_results, kpi_data_frame = get_kpi_data_frame_values(KPIS['tpot']['field'])

# A summary of test results that is compared with other versions and displayed in a Line Chart
VIRTUAL_USERS_CONCURRENCY = 32
line_chart_data = get_data_for_line_charts(aggregated_results)
line_chart_df = pandas.DataFrame.from_dict(line_chart_data[VIRTUAL_USERS_CONCURRENCY])
fig = px.line(
    line_chart_df,
    markers=True,
    title=(
        KPIS['tpot']['help']
        + f" at {VIRTUAL_USERS_CONCURRENCY} concurrency - "
        + KPIS['tpot']['unit']
    ),
    labels={
        "index": "Version(s) of RHOAI",
        "value": f"{KPIS['tpot']['unit']}"
    }
)
fig.show()

# Summary of Regression Analysis displayed in Table Chart
dataFrame = pandas.DataFrame.from_dict(data=kpi_data_frame)
table_style = dataFrame.style
# `Styler.hide_index()` was deprecated in pandas 1.4 and removed in pandas 2.0;
# `Styler.hide(axis="index")` is its replacement. Fall back to the old method
# only when the installed pandas does not provide `hide` yet.
table_style = table_style.hide(axis="index") if hasattr(table_style, "hide") else table_style.hide_index()
(
    table_style
    .apply(get_color_property, axis=1)
    .set_table_styles(table_styles=STYLE_TABLE)
    .format(STYLE_FORMAT)
    .set_caption(f"{KPIS['tpot']['help'] + ' ' + KPIS['tpot']['unit']}")
)