In [None]:
# Parameters

In [None]:
# Build the dataset

from typing import Optional
import pandas as pd
import functools


def add_parent_level(df: pd.DataFrame, name: str) -> None:
    df.columns = pd.MultiIndex.from_tuples([(name, x) for x in df.columns])


def calculate_limit(row: pd.Series, attribute: str) -> Optional[float]:
    row_analysis = local_analysis.get(row.name)
    if row_analysis is None:
        return None
    vm_spec = compute_specs.virtual_machine_by_name(row_analysis.advisor_sku)
    return getattr(vm_spec.capabilities, attribute)


def add_limit(df: pd.DataFrame, name: str) -> None:
    df['new_limit'] = df.apply(functools.partial(calculate_limit, attribute=name), axis=1)

drop_utilization = ['samples', 'percentile_50th', 'percentile_80th']
drop_disk_utilization = ['cached', 'counter_name']

res_data = resources.assign(resource_name=resources.resource_id.str.extract(r'([^/]+)$'))
res_data = res_data.drop(columns=['subscription_id', 'storage_profile'])
res_data = res_data.set_index('resource_id')
res_data_col = res_data.columns.to_list()
res_data_col = res_data_col[1:-1] + res_data_col[-1:] + res_data_col[0:1]
res_data = res_data[res_data_col]
add_parent_level(res_data, 'Resource')

if local_analysis:
    local_data = pd.DataFrame([(k, v.advisor_sku, v.advisor_sku_invalid_reason, v.annual_savings_no_ri) for k,v in local_analysis.items()], columns=['resource_id', 'recommendation', 'invalidation', 'annual_savings']).convert_dtypes()
    local_data = local_data.set_index('resource_id')
    add_parent_level(local_data, 'AzMeta')

if advisor_analysis:
    advisor_data = pd.DataFrame([(k, v.advisor_sku, v.advisor_sku_invalid_reason) for k,v in advisor_analysis.items()], dtype='string', columns=['resource_id', 'recommendation', 'invalidation'])
    advisor_data = advisor_data.set_index('resource_id')
    add_parent_level(advisor_data, 'Advisor')

cpu_data = cpu_utilization.drop(columns=drop_utilization).set_index('resource_id')
add_limit(cpu_data, 'd_total_acus')
add_parent_level(cpu_data, 'CPU Used (ACUs)')

mem_data = mem_utilization.drop(columns=drop_utilization).set_index('resource_id')
mem_data = mem_data / 1024.0
add_limit(mem_data, 'memory_gb')
add_parent_level(mem_data, 'Memory Used (GiB)')

disk_tput_cached = disk_utilization[(disk_utilization.cached == True) & (disk_utilization.counter_name == 'Disk Bytes/sec')]
disk_tput_cached = disk_tput_cached.drop(columns=drop_utilization + drop_disk_utilization).set_index('resource_id')
add_limit(disk_tput_cached, 'combined_temp_disk_and_cached_read_bytes_per_second')
disk_tput_cached = disk_tput_cached / (1024.0 ** 2)
add_parent_level(disk_tput_cached, 'Cached Disk Througput (MiB/sec)')

disk_trans_cached = disk_utilization[(disk_utilization.cached == True) & (disk_utilization.counter_name == 'Disk Transfers/sec')]
disk_trans_cached = disk_trans_cached.drop(columns=drop_utilization + drop_disk_utilization).set_index('resource_id')
add_limit(disk_trans_cached, 'combined_temp_disk_and_cached_iops')
add_parent_level(disk_trans_cached, 'Cached Disk Operations (IOPS)')

disk_tput_uncached = disk_utilization[(disk_utilization.cached == False) & (disk_utilization.counter_name == 'Disk Bytes/sec')]
disk_tput_uncached = disk_tput_uncached.drop(columns=drop_utilization + drop_disk_utilization).set_index('resource_id')
add_limit(disk_tput_uncached, 'uncached_disk_bytes_per_second')
disk_tput_uncached = disk_tput_uncached / (1024.0 ** 2)
add_parent_level(disk_tput_uncached, 'Uncached Disk Througput (MiB/sec)')

disk_trans_uncached = disk_utilization[(disk_utilization.cached == False) & (disk_utilization.counter_name == 'Disk Transfers/sec')]
disk_trans_uncached = disk_trans_uncached.drop(columns=drop_utilization + drop_disk_utilization).set_index('resource_id')
add_limit(disk_trans_uncached, 'uncached_disk_iops')
add_parent_level(disk_trans_uncached, 'Uncached Disk Operations (IOPS)')

all_joins = [cpu_data, mem_data, disk_tput_cached, disk_trans_cached, disk_tput_uncached, disk_trans_uncached]
if local_analysis:
    all_joins.insert(0, local_data)
if advisor_analysis:
    all_joins.append(advisor_data)
full_data = res_data.join(all_joins)
full_data.sort_index(inplace=True)
full_data.to_excel('final_out_test.xlsx')

# AzMeta Resize Recommendations

In [None]:
import datetime

print("Report Date:", datetime.datetime.now().isoformat())
print("Total Annual Savings:", "${:,.2f}".format(local_data[('AzMeta', 'annual_savings')].sum()), "(Non-RI Pricing, SQL and Windows AHUB Licensing)")

In [None]:
# Present the dataset
import matplotlib as plt
import itertools
from matplotlib import colors


def background_limit_coloring(row):
    cmap="coolwarm"
    text_color_threshold=0.408
    limit_index = (row.index.get_level_values(0)[0], 'new_limit')
    smin = 0
    smax = row[limit_index] 
    if pd.isna(smax):
        return [''] * len(row)
    
    rng = smax - smin
    norm = colors.Normalize(smin, smax)
    rgbas = plt.cm.get_cmap(cmap)(norm(row.to_numpy(dtype=float)))

    def relative_luminance(rgba):
        r, g, b = (
            x / 12.92 if x <= 0.03928 else ((x + 0.055) / 1.055 ** 2.4)
            for x in rgba[:3]
        )
        return 0.2126 * r + 0.7152 * g + 0.0722 * b

    def css(rgba):
        dark = relative_luminance(rgba) < text_color_threshold
        text_color = "#f1f1f1" if dark else "#000000"
        return f"background-color: {colors.rgb2hex(rgba)};color: {text_color};"
    
    return [css(rgba) for rgba in rgbas[0:-1]] + ['']


def build_header_style(col_groups):
    start = 0
    styles = []
    palette = ['#f6f6f6', '#eae9e9', '#d4d7dd', '#f6f6f6', '#eae9e9', '#d4d7dd', '#f6f6f6', '#eae9e9', '#d4d7dd']
    for i,group in enumerate(itertools.groupby(col_groups, lambda c:c[0])):
        styles.append({'selector': f'.col_heading.level0.col{start}', 'props': [('background-color', palette[i])]})
        group_len = len(tuple(group[1]))
        for j in range(group_len):
            styles.append({'selector': f'.col_heading.level1.col{start + j}', 'props': [('background-color', palette[i])]})
        start += group_len
    return styles


data_group_names = [x for x in full_data.columns.get_level_values(0).unique() if x not in ('Resource', 'AzMeta', 'Advisor')]
num_mask = [x[0] in data_group_names for x in full_data.columns.to_flat_index()]
styler = full_data.style.hide_index() \
    .set_properties(**{'font-weight': 'bold'}, subset=[('Resource', 'resource_name')]) \
    .format('{:.1f}', subset=num_mask, na_rep='N/A') \
    .format('${:.2f}', subset=[('AzMeta', 'annual_savings')], na_rep='N/A') \
    .set_table_styles(build_header_style(full_data.columns))
for data_group in data_group_names:
    mask = [x == data_group for x in full_data.columns.get_level_values(0)]
    styler = styler.apply(background_limit_coloring, axis=1, subset=mask)
styler