In [None]:
import clickhouse_connect
import pandas as pd
import requests
import yaml
import os


# Credentials are read from the environment so no secrets are stored in the notebook.
VANTAGE_TOKEN = os.getenv("VANTAGE_TOKEN")
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
# Backward-compatible alias for the previously misspelled constant name.
CLIKHOUSE_HOST = CLICKHOUSE_HOST

# Fail fast: without these, every Vantage request would send "Bearer None" and the
# ClickHouse client would not be pointed at the intended server.
_missing_env_vars = [
    name
    for name, value in (("VANTAGE_TOKEN", VANTAGE_TOKEN), ("CLICKHOUSE_HOST", CLICKHOUSE_HOST))
    if not value
]
if _missing_env_vars:
    raise RuntimeError(f"Missing required environment variables: {', '.join(_missing_env_vars)}")

VANTAGE_BASE_URL = "https://api.vantage.sh/v2"

VANTAGE_HEADERS = {
    "accept": "application/json",
    "authorization": f"Bearer {VANTAGE_TOKEN}"
}

client = clickhouse_connect.get_client(
    host=CLICKHOUSE_HOST,
    user=CLICKHOUSE_USER,
    password=CLICKHOUSE_PASSWORD,
    secure=True
)


In [None]:
# Unique runner labels seen on jobs completed in the last RUNNER_LOOKBACK_DAYS days
# (previously a hard-coded date that silently went stale).
RUNNER_LOOKBACK_DAYS = 7
runner_types_since = (
    pd.Timestamp.today().normalize() - pd.Timedelta(days=RUNNER_LOOKBACK_DAYS)
).strftime('%Y-%m-%d')

query = f"select distinct(labels) from workflow_job where completed_at > DATE('{runner_types_since}')"

runner_types = client.query(query).result_set

# The query selects a single column, so each row holds one `labels` list;
# flatten every label from every row into one set of unique runner labels.
runner_types_set = {
    label
    for row in runner_types
    for labels in row
    for label in labels
}


In [None]:
# Build the runner-label -> runner-config map from the autoscaler configs.
# Entries from pytorch/pytorch's lf-scale-config.yml override same-named entries
# from test-infra's scale-config.yml (dict.update order).
SCALE_CONFIG_URL = 'https://raw.githubusercontent.com/pytorch/test-infra/main/.github/scale-config.yml'
LF_SCALE_CONFIG_URL = 'https://raw.githubusercontent.com/pytorch/pytorch/main/.github/lf-scale-config.yml'


def _fetch_yaml(url, timeout=30):
    """Download `url` and parse it as YAML.

    Raises requests.HTTPError on a non-2xx response instead of trying to parse an
    error page as YAML; the timeout keeps the cell from hanging indefinitely.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return yaml.safe_load(response.text)


scale_config = _fetch_yaml(SCALE_CONFIG_URL)
lf_scale_config = _fetch_yaml(LF_SCALE_CONFIG_URL)

# Copy first so merging the LF entries does not mutate scale_config in place.
runner_map = dict(scale_config['runner_types'])
runner_map.update(lf_scale_config['runner_types'])

In [None]:
# Memoizes instance type -> Vantage product id (None when no product matched),
# so each instance type is looked up at most once per kernel session.
vantage_product_id_cache = {}
def get_vantage_product_id(instance_type, timeout=30):
    """Return the Vantage product id for an AWS EC2 instance type.

    Args:
        instance_type: EC2 instance type name (as found in runner_map's
            'instance_type'). None/NaN short-circuits to None.
        timeout: seconds to wait for the Vantage API.

    Returns:
        The product id from Vantage, or None when the response has no
        'products' key or no product has exactly this name. "Not found" is
        cached; a malformed/error response is not, so it is retried next time.
    """
    if pd.isna(instance_type):
        return None

    if instance_type in vantage_product_id_cache:
        return vantage_product_id_cache[instance_type]

    # Let requests URL-encode the query string rather than interpolating it.
    response = requests.get(
        f"{VANTAGE_BASE_URL}/products",
        params={"provider_id": "aws", "service_id": "aws-ec2", "name": instance_type},
        headers=VANTAGE_HEADERS,
        timeout=timeout,
    )
    json_response = response.json()
    if 'products' not in json_response:
        print(f"Error - couldn't find products in json: {json_response}")
        return None

    # The response may list several products; only an exact name match counts.
    product_id = next(
        (product['id'] for product in json_response['products'] if product['name'] == instance_type),
        None,
    )
    if product_id is None:
        print(f"Error - couldn't find product with name {instance_type}")
    vantage_product_id_cache[instance_type] = product_id
    return product_id

In [None]:
# Memoizes "<date>-<price id>" -> on-demand price; keying by date means a
# long-lived kernel fetches prices again the next day.
price_cache = {}
def get_vantage_price_ondemand(vantage_id, region='us-east-1', platform='linux', timeout=30):
    """Return the on-demand price for a Vantage product in `region` on `platform`.

    Args:
        vantage_id: Vantage product id (from get_vantage_product_id). None/NaN
            short-circuits to None.
        region: AWS region, e.g. 'us-east-1'.
        platform: OS platform component of the price id, e.g. 'linux'.
        timeout: seconds to wait for the Vantage API.

    Returns:
        The 'amount' of the matching price entry (the cost cells treat it as a
        per-hour price — TODO confirm the unit), or None when the response has
        no 'prices' key or no entry matches.
    """
    if pd.isna(vantage_id):
        return None
    today = pd.Timestamp.today().date()
    # The price id this code looks for: "<product id>-<region_with_underscores>-on_demand-<platform>".
    price_key = f"{vantage_id}-{region.replace('-','_')}-on_demand-{platform}"
    cache_key = f"{today}-{price_key}"
    if cache_key in price_cache:
        return price_cache[cache_key]

    response = requests.get(
        f"{VANTAGE_BASE_URL}/products/{vantage_id}/prices",
        headers=VANTAGE_HEADERS,
        timeout=timeout,
    )
    json_response = response.json()
    if 'prices' not in json_response:
        print(f"Error - couldn't find prices in json: {json_response}")
        return None
    for price in json_response['prices']:
        if price['id'] == price_key:
            price_cache[cache_key] = price['amount']
            return price['amount']
    print(f"Error - couldn't find price with unit hour in json: {json_response}")
    return None

In [None]:
# One row per runner label with its instance type, OS and on-demand price.
# 'self-hosted' is a generic label (it is stripped from job labels below), so it
# is excluded here rather than being reported as an unpriced runner type.
# Sorting makes the row order deterministic (set iteration order is not).
runner_types_df = pd.DataFrame(
    sorted(label for label in runner_types_set if label != 'self-hosted'),
    columns=['runner_type'],
)
# Unknown labels (or entries missing a key) yield None instead of raising KeyError.
runner_configs = runner_types_df['runner_type'].map(lambda label: runner_map.get(label) or {})
runner_types_df['instance_type'] = runner_configs.map(lambda cfg: cfg.get('instance_type'))
runner_types_df['os'] = runner_configs.map(lambda cfg: cfg.get('os'))
runner_types_df['vantage_product_id'] = runner_types_df['instance_type'].map(get_vantage_product_id)
runner_types_df['vantage_price'] = runner_types_df['vantage_product_id'].map(get_vantage_price_ondemand)

In [None]:
# Report how many runner types received a price out of all runner types.
priced_runner_count = runner_types_df['vantage_price'].notnull().sum()
print(f"we found {priced_runner_count}/{len(runner_types_df)} prices")

In [None]:
# Runner types that have a price, cheapest first (sorted by vantage_price).
runner_types_df.loc[runner_types_df['vantage_price'].notnull()].sort_values(by='vantage_price')

In [None]:
# Runner types for which no on-demand price could be determined.
runner_types_df.loc[runner_types_df['vantage_price'].isna()]

In [None]:
# Load every job started in the last DAYS days. Rows come from the
# materialized_views.workflow_job_by_started_at view, not the workflow_job table.
DAYS = 7
today = pd.Timestamp.today().date()
seven_days_ago = today - pd.Timedelta(days=DAYS)  # window start: DAYS days ago
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d')

query = f"select * from materialized_views.workflow_job_by_started_at where started_at > DATE('{seven_days_ago_str}')"

workflow_jobs_result = client.query(query)
workflow_jobs = workflow_jobs_result.result_set
# Column names of exactly this result, guaranteed to line up with the rows above.
workflow_jobs_columns = workflow_jobs_result.column_names

In [None]:
# Column names must come from the same relation the rows were read from
# (materialized_views.workflow_job_by_started_at); the workflow_job table's
# `select *` column set/order is not guaranteed to match the view's.
column_names = client.query("select * from materialized_views.workflow_job_by_started_at limit 1").column_names


In [None]:
# Materialize the fetched rows as a DataFrame and report how many were loaded.
workflow_jobs_df = pd.DataFrame(data=workflow_jobs, columns=column_names)
loaded_job_count = len(workflow_jobs_df)
print(f"Found {loaded_job_count} entries in the last {DAYS} days")

In [None]:
# Drop the generic 'self-hosted' label from every job's label list, so that
# labels[0] (used as the runner type when joining prices) is never 'self-hosted'.
def _without_self_hosted(job_labels):
    return [label for label in job_labels if label != 'self-hosted']

workflow_jobs_df['labels'] = workflow_jobs_df['labels'].apply(_without_self_hosted)

In [None]:
# Drop jobs whose label list is now empty (e.g. 'self-hosted' was their only label).
# .copy() makes an independent frame so the column assignments in later cells
# don't trigger SettingWithCopyWarning on a boolean-mask slice.
workflow_jobs_df = workflow_jobs_df[workflow_jobs_df['labels'].map(len) > 0].copy()

In [None]:
# Derive 'group' and 'repo' from the first two '/'-separated parts of dynamoKey,
# 'branch' from the first '/'-separated part of head_branch, and a combined
# 'group/repo/branch' key for grouping.
# The vectorized .str accessors yield NaN (instead of raising IndexError /
# AttributeError) for missing values or keys with too few parts; such rows get
# a NaN group_repo_branch.
dynamo_key_parts = workflow_jobs_df['dynamoKey'].str.split('/')
workflow_jobs_df['group'] = dynamo_key_parts.str[0]
workflow_jobs_df['repo'] = dynamo_key_parts.str[1]
workflow_jobs_df['branch'] = workflow_jobs_df['head_branch'].str.split('/').str[0]
workflow_jobs_df['group_repo_branch'] = workflow_jobs_df['group'].str.cat(
    [workflow_jobs_df['repo'], workflow_jobs_df['branch']], sep='/'
)


In [None]:
# Attach runner metadata and price to each job by matching the job's first label
# against runner_types_df.runner_type. The left join keeps jobs whose runner is
# unknown (their instance/price columns are NaN); validate= fails loudly if
# runner_types_df ever contains duplicate runner_type rows, which would
# otherwise silently duplicate jobs and inflate costs.
workflow_jobs_df = (
    workflow_jobs_df
    .assign(runner_type=workflow_jobs_df['labels'].map(lambda job_labels: job_labels[0]))
    .merge(runner_types_df, on='runner_type', how='left', validate='many_to_one')
)

In [None]:
# Job runtime in hours, measured from started_at to completed_at.
# Negative values are clipped to 0 so they cannot reduce cost totals.
# NOTE(review): presumably jobs that had not completed yet carry a placeholder
# completed_at earlier than started_at — confirm against the table schema.
workflow_jobs_df['duration'] = (
    (workflow_jobs_df['completed_at'] - workflow_jobs_df['started_at']).dt.total_seconds() / 3600
).clip(lower=0)

In [None]:
# Per-job cost = runtime in hours × price (assumed hourly); NaN when either is missing.
workflow_jobs_df['cost'] = workflow_jobs_df['duration'].mul(workflow_jobs_df['vantage_price'])

In [None]:
# Jobs whose runner_type starts with "lf". na=False treats a missing or
# non-string runner_type as non-matching instead of raising AttributeError.
workflow_jobs_df_lf = workflow_jobs_df[workflow_jobs_df['runner_type'].str.startswith('lf', na=False)]

In [None]:
# Total cost of jobs on "lf" runners.
workflow_jobs_df_lf.cost.sum()

In [None]:
# Total runtime hours across all jobs.
workflow_jobs_df.duration.sum()

In [None]:
# Count jobs with and without a price. Summing the boolean mask avoids the
# KeyError that value_counts()[True] / [False] raises when one side is absent.
has_price = workflow_jobs_df['vantage_price'].notnull()
print(f"we have {has_price.sum()} jobs with a price and {(~has_price).sum()} without")


In [None]:
# Count distinct workflow runs (run_id) in the loaded window.
workflow_jobs_df['run_id'].nunique()


In [None]:
# get all where run_id is 10910999394
# workflow_jobs_df[workflow_jobs_df['run_id'] == 10910999394]

In [None]:
# Total cost per workflow, highest first, rounded to cents, showing only
# workflows above $100, rendered with a $ prefix.
(
    workflow_jobs_df
    .groupby('workflow_name')['cost']
    .sum()
    .reset_index()
    .sort_values(by='cost', ascending=False)
    .round(2)
    .query('cost > 100')
    .style.format({'cost': '${:,.2f}'})
)

In [None]:
# Total cost per runner type, highest first, only those above $100.
(
    workflow_jobs_df
    .groupby('runner_type')['cost']
    .sum()
    .reset_index()
    .sort_values(by='cost', ascending=False)
    .round(2)
    .query('cost > 100')
    .style.format({'cost': '${:,.2f}'})
)

In [None]:
# Total cost per job name, highest first, only those above $2000.
(
    workflow_jobs_df
    .groupby('name')['cost']
    .sum()
    .reset_index()
    .sort_values(by='cost', ascending=False)
    .round(2)
    .query('cost > 2000')
    .style.format({'cost': '${:,.2f}'})
)

In [None]:
# Total cost per group/repo/branch, highest first, only those above $500.
(
    workflow_jobs_df
    .groupby('group_repo_branch')['cost']
    .sum()
    .reset_index()
    .sort_values(by='cost', ascending=False)
    .round(2)
    .query('cost > 500')
    .style.format({'cost': '${:,.2f}'})
)

In [None]:
# The ten longest-running jobs.
workflow_jobs_df.sort_values(by='duration', ascending=False).iloc[:10]

In [None]:
# Runner labels (first label of each job) that have no instance_type in
# runner_map — which is also why those jobs have no price.
workflow_jobs_df.loc[workflow_jobs_df['instance_type'].isnull(), 'labels'].map(lambda job_labels: job_labels[0]).unique()

In [None]:
# Total runtime hours per (runner_type, instance_type), longest first.
# groupby drops rows whose instance_type is NaN, so runners missing from
# runner_map do not appear in this table.
(
    workflow_jobs_df
    .groupby(['runner_type', 'instance_type'])['duration']
    .sum()
    .reset_index()
    .sort_values(by='duration', ascending=False)
    .round(2)
    .style.format({'duration': '{:,.2f}h'})
)

In [None]:
# Total runtime hours of jobs on "lf" runners.
workflow_jobs_df_lf['duration'].sum()