In [1]:
import collections
import os
from datetime import datetime

import altair as alt
import plotly.express as px
import polars as pl
from scipy.stats import f_oneway
# Remove Altair's default row limit on inline data so the large session frames
# below can be charted.
alt.data_transformers.disable_max_rows()
# Presumably placeholders: set to the directory and file name of the SQLite job database.
ROOT_PATH = 'individual_root_path'
DB_NAME = 'individual_db_name'

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


In [2]:
# Connection URI passed to pl.read_database_uri by every query below.
# os.path.join adds the separator only when ROOT_PATH lacks a trailing one, so
# both 'dir' and 'dir/' work. Plain concatenation silently produced 'dirfile'.
# An absolute ROOT_PATH yields the 'sqlite:///abs/path' form.
root_path = ROOT_PATH
db_path = DB_NAME
connection_string = 'sqlite://' + os.path.join(root_path, db_path)

In [None]:
# Derived subquery over the raw `total` table, aliased `t`, spliced into
# `FROM {BASE_TABLE}` by the queries below. It adds two per-job durations in
# seconds, computed from the timestamp columns as JULIANDAY difference * 86400:
#   run_s  = end_time - start_time
#   wait_s = start_time - submit_time
BASE_TABLE = """(SELECT job_id
, user_id
, queue_name
, group_id
, job_name
, nodes
, node_type
, submit_time
, start_time
, end_time
, requeue
, used_walltime
, ROUND((JULIANDAY(end_time)-JULIANDAY(start_time)) * 86400) AS run_s
, ROUND((JULIANDAY(start_time)-JULIANDAY(submit_time)) * 86400) AS wait_s
FROM total) AS t
"""

In [None]:
# Shared row filter appended after `WHERE 1=1` in every query; each line starts
# with AND so it can be concatenated directly. Keeps non-requeued jobs on the
# 'normal' queue with positive wait and run durations, a known user and job name,
# and a used_walltime below the timestamp-derived run_s.
BASE_CONDITION = """AND requeue = 0
AND queue_name = 'normal'
AND wait_s > 0
AND start_time > submit_time
AND end_time > start_time
AND user_id IS NOT NULL
AND job_name IS NOT NULL
AND used_walltime < run_s
"""

## Comparison Between JOB and BoT

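We aim to determine whether the same job — i.e. the same (user_id, job_name) pair — includes both normal jobs and BoT (Bag of Tasks) jobs. A BoT is a session of two or more submissions of the pair, each made less than 60 seconds after the previous one; a normal job is a single-submission session.  
Through this, we highlight how difficult it is to analyze normal jobs and BoT jobs separately.  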
	•	Total number of jobs: 118,754  
	•	Jobs with BOT_TASKS_COUNT > 0 (a): 8,117  
	•	Jobs with NORMAL_JOB_COUNT > 0 (b): 184,505  
	•	Jobs with both (a) and (b): 4,951  

However, when considering the total computing time (calculated as used_walltime × node_count, summed over jobs):  
	•	BOT_TASKS_COUNT total computing time: 2.8907 × 10¹⁰  
	•	NORMAL_JOB_COUNT total computing time: 5.8115 × 10¹⁰  

Based on wall time:  
	•	BOT_TASKS_COUNT wall time: 9.4485 × 10⁹  
	•	NORMAL_JOB_COUNT wall time: 1.1477 × 10¹⁰  

Regarding the distribution of the time between consecutive submissions of the same (user_id, job_name) pair:  
	•	Out of 653,088 submissions, 550,162 (84.2%) were submitted within 1,440 minutes (24 hours).  
	•	477,845 submissions (73.2%) occurred within 60 minutes.  
	•	369,712 submissions (56.6%) occurred within 1 minute.  
	•	392,981 submissions (60.2%) occurred within 2 minutes.  

In [None]:
# Sessionize submissions by submit-time proximity. Within each (user_id, job_name)
# pair, ordered by submit_time, a job submitted less than SUBMIT_DELAY_TIME seconds
# after the previous submission joins the current session. Otherwise it starts a
# new one; this always happens for the pair's first job, whose gap is NULL.
# Per pair, sessions with more than one job are aggregated as Bag-of-Tasks
# (BOT_TASKS_*) and single-job sessions as normal jobs (NORMAL_JOB_*).
# Computing time is used_walltime * node_count, and node_count is the length of
# the `nodes` JSON array (presumably the allocated nodes; verify).
SUBMIT_DELAY_TIME = 60  # seconds; compared with the rounded submit-time gap in seconds
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_submit_time AS (
SELECT 
    base.*
    , LAG(base.submit_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ASC) AS prev_submit_time
FROM base
), base_with_session_flag AS (
SELECT 
    base_with_prev_submit_time.* 
    , CASE 
        WHEN ROUND((JULIANDAY(submit_time)-JULIANDAY(prev_submit_time)) * 86400) < {SUBMIT_DELAY_TIME} THEN 0 
        ELSE 1 
    END AS is_new_session
FROM base_with_prev_submit_time
), base_with_session_id AS (
SELECT 
    base_with_session_flag.*
    , SUM(is_new_session) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT 
    user_id
    , job_name
    , session_id
    , count(1) AS session_length
    , sum(used_walltime) AS total_wall_time
    , sum(node_count) AS node_count_sum
    , sum(used_walltime * node_count) AS total_computing_time
FROM base_with_session_id
GROUP BY 1, 2, 3
)
SELECT 
    user_id
    , job_name
    , SUM(iif(session_length>1, session_length, 0)) AS BOT_TASKS_JOBS_COUNT
    , SUM(iif(session_length>1, 1, 0)) AS BOT_TASKS_COUNT
    , ROUND(SUM(iif(session_length>1, total_wall_time, 0))) AS BOT_TASKS_WALL_TIME
    , ROUND(SUM(iif(session_length>1, total_computing_time, 0))) AS BOT_TASKS_COMPUTING_TIME
    , SUM(iif(session_length=1, 1, 0)) AS NORMAL_JOB_COUNT
    , ROUND(SUM(iif(session_length=1, total_wall_time, 0))) AS NORMAL_JOB_WALL_TIME
    , ROUND(SUM(iif(session_length=1, total_computing_time, 0))) AS NORMAL_JOB_COMPUTING_TIME
    , COUNT(1) AS TOTAL_JOB_COUNT
    , SUM(node_count_sum) AS node_count_sum
FROM session
GROUP BY 1, 2
"""
job_bot_compare = pl.read_database_uri(query, connection_string)

In [None]:
# Pairs whose single-job (normal) sessions have positive total wall time.
job_bot_compare.filter(pl.col('NORMAL_JOB_WALL_TIME')>0)

In [None]:
# Headline totals behind the markdown summary: normal/BoT session counts, node-count
# sums, number of pairs in each category, and total wall/computing time.
# NOTE(review): the disabled line below sums 'session_length', which is not a
# column of job_bot_compare.
# print(job_bot_compare.filter(pl.col('NORMAL_JOB_WALL_TIME')>0).filter(pl.col('BOT_TASKS_WALL_TIME')>0).select(pl.sum('session_length')))
print(job_bot_compare.filter(pl.col('NORMAL_JOB_WALL_TIME')>0).select(pl.sum('NORMAL_JOB_COUNT')))
print(job_bot_compare.filter(pl.col('BOT_TASKS_WALL_TIME')>0).select(pl.sum('BOT_TASKS_COUNT')))
print(job_bot_compare.filter(pl.col('NORMAL_JOB_WALL_TIME')>0).select(pl.sum('node_count_sum')))
print(job_bot_compare.filter(pl.col('BOT_TASKS_WALL_TIME')>0).select(pl.sum('node_count_sum')))
print(len(job_bot_compare.filter(pl.col('NORMAL_JOB_WALL_TIME')>0)))
print(len(job_bot_compare.filter(pl.col('BOT_TASKS_WALL_TIME')>0)))
print(len(job_bot_compare))
print(job_bot_compare.select(pl.sum('NORMAL_JOB_WALL_TIME')))
print(job_bot_compare.select(pl.sum('BOT_TASKS_WALL_TIME')))
print(job_bot_compare.select(pl.sum('NORMAL_JOB_COMPUTING_TIME')))
print(job_bot_compare.select(pl.sum('BOT_TASKS_COMPUTING_TIME')))
# Results recorded from earlier runs (kept for reference):
# 2.8907e10
# 5.8115e10
# 234587 + 12252 : 120s, 7.60 items, 7.16 items
# 235349 + 10138 : 60s, 3.92 items, 3.97 items
# 239186
# 1124123
# 245661


In [None]:
# Spot-check pairs whose BoT sessions contain exactly 3 jobs in total.
job_bot_compare.filter(pl.col('BOT_TASKS_JOBS_COUNT')==3)

In [None]:
# Gap between consecutive submissions of the same (user_id, job_name) pair, in
# minutes truncated to one decimal (CAST(... * 10 AS int) / 10.0). Each pair's
# first submission has no predecessor (NULL gap) and is dropped.
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_submit_time AS (
SELECT 
    base.*
    , LAG(base.submit_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ASC) AS prev_submit_time
FROM base
), base_with_time_diff AS (
SELECT 
    base_with_prev_submit_time.* 
    , CAST((JULIANDAY(submit_time)-JULIANDAY(prev_submit_time)) * 1440 * 10 AS int) / 10.0 AS submit_time_diff 
FROM base_with_prev_submit_time
)
SELECT submit_time_diff
FROM base_with_time_diff
WHERE submit_time_diff IS NOT NULL
"""
submit_time_diff = pl.read_database_uri(query, connection_string)

In [None]:
# Empirical CDF of the submit-time gap (minutes), restricted to gaps under 60 minutes.
alt.Chart(submit_time_diff.filter(pl.col('submit_time_diff')<60)).transform_window(
    ecdf="cume_dist()",
    sort=[{"field": "submit_time_diff"}],
).mark_line(
    interpolate="step-after"
).encode(
    x="submit_time_diff:Q",
    y="ecdf:Q"
)

In [None]:
# Gaps shorter than 2 minutes.
submit_time_diff.filter(pl.col('submit_time_diff')<2)

## SESSION_JOB Comparison

Distribution of the number of sessions per (user_id, job_name) pair:  
	•	99.4% of pairs have 20 or fewer sessions.  
	•	98.5% of pairs have 10 or fewer sessions.  
	•	Pairs with 112 or more sessions also appear occasionally. Each such session count belongs to a single (user_id, job_name) pair; these are typically periodic jobs that produce log_wrf and log_real logs.  
	•	Exceptionally, there are outlier pairs with 1,853 and 1,224 sessions, which are treated as anomalous data points.  

Number of jobs per session:  
	•	99.1% of sessions contain 20 jobs or fewer.  
	•	98.4% of sessions contain 10 jobs or fewer.  
	•	Exceptionally, there were cases where the number of jobs per session reached 3,764 and 3,288. These jobs were queries generated by the ad_system.  
	•	Sessions with 250 or more jobs also occur occasionally, but each such job count appears in at most four sessions.  

⸻

Comparison of session-wise sums:  
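	•	Comparison was conducted for the per-session sums of wait_s, run_s, total_s, and think_time. Here think_time is the gap between the end of a session and the first submission of the next session of the same pair (`session_gap_time` in the code).  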
	•	It was found that in most sessions, think_time dominates the total_time.  
	•	When think_time is excluded, wait_time and run_time show proportionality.  
	•	The correlation coefficient between total_time and think_time is extremely high, reaching 0.999.  
	•	The correlation between run_s and wait_s is about 0.118, indicating a weak linear relationship.  

Although the Pearson correlation coefficient between run_time and wait_time is low, linear regression analysis shows that the gap until the next session increases as wait_time and run_time increase.  
If we can analyze sessions of jobs in advance, it becomes possible to predict the timing of the next session and prepare in advance accordingly.  

⸻

Comparison of session-wise total_computing_time sums  

Example:  
	•	user_3: “e1165a14” – “case1insul”  

In [None]:
# One row per session, ordered by submit_time within each (user_id, job_name) pair.
# - A job continues the current session when it is submitted before prev_end_time,
#   the latest end_time among the pair's earlier jobs.
# - Otherwise it starts a new session; this always happens for the pair's first
#   job, whose prev_end_time is NULL.
# - session_total_wait_time is the first job's full wait_s plus, for each later
#   job, only the time it waited after prev_end_time (clamped at 0).
# - session_total_run_time = (last_end_time - first_submit_time) - total wait.
# - session_gap_time = next session's first submit - this session's last end.
# - session_total_time = next session's first submit - this session's first submit.
# Gap and total time are -1 for a pair's last session, so the *_for_total
# percentages are only meaningful where session_total_time > 0.
# NOTE(review): `prev_end_time > submit_time` compares the stored timestamp
# values directly; this assumes a uniform, lexically sortable format. TODO confirm.
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_end_time AS (
SELECT 
    base.*
    , LAG(base.max_end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ASC) AS prev_end_time
FROM base
), base_with_session_flag AS (
SELECT 
    base_with_prev_end_time.*
    , CASE 
        WHEN prev_end_time > submit_time THEN 0 
        ELSE 1 
    END AS is_new_session
    , MAX(ROUND((JULIANDAY(start_time)-JULIANDAY(prev_end_time)) * 86400), 0) AS additional_wait_s
FROM base_with_prev_end_time
), base_with_session_id AS (
SELECT 
    base_with_session_flag.*
    , CASE WHEN
        is_new_session == 1 THEN wait_s
        ELSE additional_wait_s
    END AS new_wait_s
    , SUM(is_new_session) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT 
    user_id
    , job_name
    , session_id
    , count(1) AS session_length
    , count(1) AS session_length_for_sum
    , SUM(new_wait_s) AS session_total_wait_time
    , sum(used_walltime) * 1.0 AS total_wall_time
    , sum(used_walltime * node_count) * 1.0 AS total_computing_time
    , MIN(submit_time) AS first_submit_time
    , MIN(start_time) AS first_start_time
    , MAX(end_time) AS last_end_time
    , MIN(node_count) AS min_node_count
    , MAX(node_count) AS max_node_count
FROM base_with_session_id
GROUP BY 1, 2, 3
), session_with_rn AS (
SELECT 
    *
    , row_number() over (PARTITION BY user_id, job_name ORDER BY first_submit_time ASC) AS rn
    , ROUND((JULIANDAY(last_end_time)-JULIANDAY(first_submit_time)) * 86400) - session_total_wait_time AS session_total_run_time
FROM session
), session_with_think_time AS (
SELECT 
    t1.*
    , CASE WHEN
        t2.rn IS NULL THEN -1
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.last_end_time)) * 86400) 
    END AS session_gap_time
    , CASE WHEN
        t2.rn IS NULL THEN -1 
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.first_submit_time)) * 86400)
    END AS session_total_time
FROM session_with_rn AS t1 
    LEFT JOIN session_with_rn AS t2 
    ON t1.rn+1 = t2.rn AND t1.user_id = t2.user_id AND t1.job_name = t2.job_name
)
SELECT
    *
    , session_total_wait_time AS total_wait_time
    , session_total_time AS total_time
    , session_total_run_time AS total_execution_time
    , session_total_wait_time/session_total_time*100 AS wait_for_total
    , session_gap_time/session_total_time*100 AS gap_for_total
    , session_total_run_time/session_total_time*100 AS execution_for_total
FROM session_with_think_time
WHERE 1=1
"""
session_job = pl.read_database_uri(query, connection_string)

In [None]:
# One row per session.
session_job

In [None]:
# Per-job rows with the running max end_time per (user_id, job_name): the `base`
# CTE of the session query, pulled out on its own for inspection.
tt = f"""WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
)
SELECT *
FROM base
"""
data_tt = pl.read_database_uri(tt, connection_string)

In [None]:
# Among sessions that have a successor (session_gap_time is -1 for a pair's last
# session), count1 is how many there are and count2 is how many are followed by
# a gap of at least 10x their own wait + execution time.
count1 = session_job.filter(pl.col('session_gap_time') > -1).height
count2 = session_job.filter(
    (pl.col('session_gap_time') > -1)
    & (pl.col('session_gap_time') >= (pl.col('total_wait_time') + pl.col('total_execution_time')) * 10)
).height

In [None]:
session_job.filter(pl.col('user_id')=='e1165a14').filter(pl.col('session_length')>1)

In [None]:
unique_set = collections.defaultdict(int)
for row in session_job.filter(pl.col('session_id')>=100).rows(named=True):
    unique_set[(row['user_id'], row['job_name'])] = max(unique_set[(row['user_id'], row['job_name'])], row['session_id'])

In [None]:
# user_id, job_name 별 세션 개수
session_count = session_job.group_by('user_id', 'job_name').len()
# alt.Chart(session_count.filter(pl.col('len')<21)).mark_bar().encode(
#     alt.X("len:N", axis=alt.Axis(title='Session Count')),
#     alt.Y("count()", axis=alt.Axis(title='Unique (User Id, Job Name) Count'))
# )

In [None]:
# Share of all jobs that fall into sessions of each length (jobs per session).
# The denominator is the total job count derived from the data. It replaces the
# hard-coded 1121803, which went stale whenever the filters changed, so the ratios
# always sum to 1 and the cumulative curve built from them ends at 1.
new_user_job2 = session_job.group_by('session_length').agg(pl.col("session_length_for_sum").sum())
new_user_job2 = new_user_job2.with_columns(
    (pl.col("session_length_for_sum") / pl.col("session_length_for_sum").sum()).alias("ratio")
)
new_user_job2 = new_user_job2.with_columns(pl.lit("User-Job Pair").alias("Grouping Condition"))

In [None]:
# Cumulative share of jobs by session length (1 to 1000 jobs per session).
# `cumulative_wheat` is the running sum of `ratio` in increasing session_length order.
job_count_figure2 = alt.Chart(
    new_user_job2.filter(pl.col('session_length').is_between(1, 1000))
).mark_line().properties(
    width=400,
    height=130,
).transform_window(
    sort=[{'field': 'session_length'}],
    cumulative_wheat='sum(ratio)'
).encode(
    alt.X("session_length:Q", axis=alt.Axis(title='Job Count', labelFontSize=13, titleFontSize=13, labelAngle=0, values=list(range(0, 1001, 100)))),
    alt.Y("cumulative_wheat:Q", axis=alt.Axis(title='Probability', labelFontSize=13, titleFontSize=13)),
    alt.Color("Grouping Condition").legend(
        orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0,
        direction='horizontal', titleOrient='left', labelFontWeight='bold',
    ),
)

In [None]:
# Overlay the cumulative job-share curve with two sibling figures.
# NOTE(review): `job_count_figure` and `group_count_figure` are not defined in any
# cell before this point, so this cell and the save below raise NameError on a
# fresh kernel unless they come from elsewhere. TODO confirm where they are built.
job_count_figure2 + job_count_figure + group_count_figure

In [None]:
(job_count_figure2 + job_count_figure + group_count_figure).save('job_count.pdf', ppi=1600)

In [None]:
# Job share of single-job sessions.
new_user_job2.filter(pl.col('session_length')==1)

In [None]:
# Long-format records (session_count, category, proportion) for the charts below.
distribution_data = []
# Per (user_id, job_name) pair: number of sessions (`len`), plus the column sums
# over the pair's sessions, joined side by side.
session_count = session_job.group_by(['user_id', 'job_name']).len()
session_sum = session_job.group_by(['user_id', 'job_name']).sum()
joined = session_count.join(session_sum, on=['user_id', 'job_name'])
def add_data_to_dict(session_length, category, proportion, list_v):
    """Append one long-format chart record to ``list_v`` (mutated in place).

    The record has keys ``session_count`` (set to ``session_length``),
    ``category`` and ``proportion``, in that order. Returns None.
    """
    list_v.append({
        'session_count': session_length,
        'category': category,
        'proportion': proportion,
    })


In [None]:
# session_lengths maps each session count to the number of (user_id, job_name)
# pairs with that many sessions. The loop then adds one 'count' record per session
# count with its percentage share of all pairs.
# The group size gets an explicit name: the grouping key in `session_count` is
# already called 'len', so an unnamed size column would collide with it or depend
# on version-specific GroupBy.count() naming. The unused `tmp` accumulator is gone.
session_lengths = {}
total = 0
for k, v in session_count.group_by('len').agg(pl.len().alias('n_pairs')).rows():
    session_lengths[k] = v
    total += v
for k in sorted(session_lengths.keys()):
    add_data_to_dict(k, 'count', session_lengths[k]/total*100, distribution_data)
    # Stops after the first session count above 20 (that count is still recorded).
    if k>20: break
# Findings (cumulative share of pairs by session count):
# - 99.4% of pairs have 20 or fewer sessions; 98.5% have 10 or fewer.
# - Exceptionally, pairs with 1853 and 1224 sessions exist; these are outliers.
# - Pairs with 112 or more sessions also appear occasionally; each such case is a
#   single unique (user_id, job_name) pair and can likewise be treated as an exception.
# - Around 250 pairs have 100 or more sessions.


In [None]:
def _sum_by_session_count(frame, value_column):
    """Sum ``value_column`` of ``frame`` per session count (column ``len``).

    Returns a ``defaultdict(int)`` mapping session count -> summed value, and the
    grand total accumulated in the same row order.
    """
    sums = collections.defaultdict(int)
    grand_total = 0
    for row in frame.rows(named=True):
        sums[row['len']] += row[value_column]
        grand_total += row[value_column]
    return sums, grand_total


def _record_session_count_shares(sums, grand_total, category, max_session_count=20):
    """Append share and per-session-average records for ``category`` to distribution_data.

    For each session count k (ascending) it appends two records:
    - ``category``: sums[k] as a percentage of ``grand_total``;
    - ``AVG(category)``: sums[k] divided by the number of pairs with k sessions
      (``session_lengths[k]``) and by k, i.e. an average per session.
    Stops after the first k above ``max_session_count``.
    """
    for k in sorted(sums):
        add_data_to_dict(k, category, sums[k]/grand_total*100, distribution_data)
        add_data_to_dict(k, f'AVG({category})', sums[k]/session_lengths[k]/k, distribution_data)
        if k > max_session_count:
            break


# Wall time and computing time per pair session count. This was previously two
# copy-pasted loops with an unused `tmp` accumulator.
session_lengths2, total = _sum_by_session_count(joined, 'total_wall_time')
_record_session_count_shares(session_lengths2, total, 'total_wall_time')
session_lengths3, total = _sum_by_session_count(joined, 'total_computing_time')
_record_session_count_shares(session_lengths3, total, 'total_computing_time')

In [None]:
# Long-format frame of all records collected so far. This chart plots the
# percentage shares ('count', 'total_wall_time', 'total_computing_time') against
# the pair's session count.
dd = pl.DataFrame(distribution_data)
new_dd = dd.filter(pl.col("category").is_in(["count", "total_computing_time", "total_wall_time"]))

distribution1 = alt.Chart(new_dd).mark_line().properties(
    width=400,
    height=130,
).encode(
    alt.X("session_count:N", axis=alt.Axis(title='Session Count', labelFontSize=13, values=[1,5,10,15,20], titleFontSize=13, labelAngle=0)),
    alt.Y("proportion:Q", axis=alt.Axis(title='Proportion(%)', labelFontSize=13, titleFontSize=13)),
    # labelExpr maps the raw category keys to display names in the legend.
    alt.Color("category", sort=['total_wall_time', 'total_computing_time', 'count']).legend(orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0, labelFontWeight = 'bold'
                                 , direction='horizontal', titleOrient='left', labelExpr="{'count':'Count', 'total_wall_time':'Wall Time', 'total_computing_time':'Total Computing Time'}[datum.label]")
)

In [None]:
count_over_100 = 0
for i in session_lengths:
    if i >= 100:
        count_over_100 += (i*session_lengths[i])
        # print(i,session_lengths[i])

In [None]:
count_over_100

In [None]:
# Average wall time and computing time per session, over pairs with 100+ sessions.
total_wall_time_over_100 = sum(
    wall for n_sessions, wall in session_lengths2.items() if n_sessions >= 100
)
print('total_wall_time', total_wall_time_over_100/count_over_100)
total_computing_time_over_100 = sum(
    computing for n_sessions, computing in session_lengths3.items() if n_sessions >= 100
)
print('total_computing_time', total_computing_time_over_100/count_over_100)

In [None]:
for i in distribution_data:
    if i['session_count'] == 1 and 'AVG' in i['category']:
        print(i)

In [None]:
distribution1.save('proportion.pdf', ppi=1600)

In [None]:
# Per-session averages (the AVG(...) categories) divided by fixed reference values
# and relabelled '<category>_ratio'.
# The filter reads `dd`, which holds every category. At this point `new_dd`
# contains only 'count'/'total_wall_time'/'total_computing_time', so filtering it
# for the AVG categories returned empty frames, unless a later cell that rebuilds
# `new_dd` from the AVG categories had already been run (hidden state).
# NOTE(review): the divisors look like the session_count == 1 averages printed
# earlier. TODO confirm, and consider deriving them from `dd`.
ratio_total_wall_time = dd.filter(pl.col('category')=='AVG(total_wall_time)').with_columns(
    (pl.col('category')+'_ratio').alias('category2'),
    (pl.col('proportion')/40844.523282).alias('proportion2'),
).drop('proportion','category').rename({'category2':'category','proportion2':'proportion'})

ratio_total_computing_time = dd.filter(pl.col('category')=='AVG(total_computing_time)').with_columns(
    (pl.col('category')+'_ratio').alias('category2'),
    (pl.col('proportion')/180571.105693).alias('proportion2'),
).drop('proportion','category').rename({'category2':'category','proportion2':'proportion'})

In [None]:
# Stack both rescaled series into one long frame.
# NOTE(review): new_dd2 is not referenced by any later cell in this view.
new_dd2 = pl.concat([ratio_total_wall_time, ratio_total_computing_time])

In [None]:
# Records for session counts up to 20 (the charted range).
dd.filter(pl.col('session_count')<=20)

In [None]:
dd = pl.DataFrame(distribution_data)
new_dd = dd.filter(pl.col("category").is_in(["AVG(total_computing_time)", "AVG(total_wall_time)"]))
distribution2 = alt.Chart(new_dd).mark_line().properties(
    width=400,
    height=130
).transform_calculate(
    hour='datum.proportion/60/60/24',
).encode(
    alt.X("session_count:N", axis=alt.Axis(title='Session Count', labelFontSize=13, values=[1,5,10,15,20], titleFontSize=13, labelAngle=0)),
    alt.Y("hour:Q", axis=alt.Axis(title='Time(Day)', labelFontSize=13, titleFontSize=13, labelAngle=-90)),
    alt.Color("category", sort=['total_wall_time', 'total_computing_time']).legend(orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0, labelFontWeight = 'bold'
                                 , direction='horizontal', titleOrient='left', labelExpr="{'count':'count', 'AVG(total_wall_time)':'Wall Time', 'AVG(total_computing_time)':'Total Computing Time', 'AVG(total_computing_time)_ratio':'Total Computing Time(Ratio)'}[datum.label]")
)

In [None]:
distribution2

In [None]:
distribution2.save('average_for_session_count.pdf', ppi=1600)

### Statistical Analysis

In [None]:
# Per-(user_id, job_name) summary built on the same sessionization as the
# session_job query: number of sessions (max session_id), summed wall and
# computing time, and those sums divided by the pair's total job count (per-job
# averages). The rn/gap/total-time CTEs are carried along but their columns are
# not used in the final SELECT.
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_end_time AS (
SELECT 
    base.*
    , LAG(base.max_end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ASC) AS prev_end_time
FROM base
), base_with_session_flag AS (
SELECT 
    base_with_prev_end_time.*
    , CASE 
        WHEN prev_end_time > submit_time THEN 0 
        ELSE 1 
    END AS is_new_session
    , MAX(ROUND((JULIANDAY(start_time)-JULIANDAY(prev_end_time)) * 86400), 0) AS additional_wait_s
FROM base_with_prev_end_time
), base_with_session_id AS (
SELECT 
    base_with_session_flag.*
    , CASE WHEN
        is_new_session == 1 THEN wait_s
        ELSE additional_wait_s
    END AS new_wait_s
    , SUM(is_new_session) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT 
    user_id
    , job_name
    , session_id
    , count(1) AS session_length
    , count(1) AS session_length_for_sum
    , SUM(new_wait_s) AS session_total_wait_time
    , sum(used_walltime) * 1.0 AS total_wall_time
    , sum(used_walltime * node_count) * 1.0 AS total_computing_time
    , MIN(submit_time) AS first_submit_time
    , MIN(start_time) AS first_start_time
    , MAX(end_time) AS last_end_time
    , MIN(node_count) AS min_node_count
    , MAX(node_count) AS max_node_count
FROM base_with_session_id
GROUP BY 1, 2, 3
), session_with_rn AS (
SELECT 
    *
    , row_number() over (PARTITION BY user_id, job_name ORDER BY first_submit_time ASC) AS rn
    , ROUND((JULIANDAY(last_end_time)-JULIANDAY(first_submit_time)) * 86400) - session_total_wait_time AS session_total_run_time
FROM session
), session_with_think_time AS (
SELECT 
    t1.*
    , CASE WHEN
        t2.rn IS NULL THEN -1
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.last_end_time)) * 86400) 
    END AS session_gap_time
    , CASE WHEN
        t2.rn IS NULL THEN -1 
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.first_submit_time)) * 86400)
    END AS session_total_time
FROM session_with_rn AS t1 
    LEFT JOIN session_with_rn AS t2 
    ON t1.rn+1 = t2.rn AND t1.user_id = t2.user_id AND t1.job_name = t2.job_name
)
SELECT
    user_id
    , job_name
    , MAX(session_id) AS session_count
    , SUM(total_wall_time) AS total_wall_time
    , SUM(total_computing_time) AS total_computing_time
    , SUM(total_wall_time) / SUM(session_length) AS avg_total_wall_time
    , SUM(total_computing_time) / SUM(session_length) AS avg_total_computing_time
FROM session_with_think_time
WHERE 1=1
GROUP BY 1, 2
"""
stats = pl.read_database_uri(query, connection_string)

In [None]:
# One-way ANOVA: does the per-job average wall time differ between pairs with one
# session and pairs with two sessions? f_oneway was never imported before (it now
# comes from scipy.stats in the import cell). It is given 1-D arrays, because a
# one-column DataFrame becomes an (n, 1) array and yields length-1 arrays instead
# of scalar F and p values.
f_oneway(
    stats.filter(pl.col('session_count') == 1)['avg_total_wall_time'].to_numpy(),
    stats.filter(pl.col('session_count') == 2)['avg_total_wall_time'].to_numpy(),
)

In [None]:
# Number of sessions for each session length (jobs per session). Prints each
# length with its session count, the running total, and the cumulative percentage.
# Uses GroupBy.len(), as the other grouping cells do, instead of GroupBy.count(),
# whose output has changed across Polars versions.
job_lengths = {}
total = 0
for k, v in session_job.group_by('session_length').len().rows():
    job_lengths[k] = v
    total += v
tmp = 0
for k in sorted(job_lengths.keys()):
    tmp += job_lengths[k]
    print(k, job_lengths[k], tmp, tmp/total*100)

# Findings:
# - 99.1% of sessions contain 20 or fewer jobs; 98.4% contain 10 or fewer.
# - Exceptionally, sessions with 3764 and 3288 jobs were present.
# - Sessions with 250 or more jobs also appear occasionally, but each such job
#   count occurred in at most 4 sessions.

In [None]:
# Pairwise scatter matrix of the session time components (x on a log scale).
# Fixes:
# - 'session_think_time' is not a session_job column; the time until the next
#   session is 'session_gap_time'.
# - color='Origin:N' referenced a field that does not exist here and was removed.
# - Each pair's last session has the -1 sentinel in session_total_time and
#   session_gap_time, which a log axis cannot show, so those rows are excluded.
alt.Chart(session_job.filter(pl.col('session_total_time') > 0)).mark_circle().encode(
    alt.X(alt.repeat("column"), type='quantitative').scale(type="log"),
    alt.Y(alt.repeat("row"), type='quantitative'),
).properties(
    width=150,
    height=150
).repeat(
    row=['session_total_time', 'session_total_run_time', 'session_total_wait_time', 'session_gap_time'],
    column=['session_gap_time', 'session_total_wait_time', 'session_total_run_time', 'session_total_time']
).interactive()



### Distribution of wait, run, think, total 

In [None]:
# Summary statistics of per-session execution and wait time.
session_job.select('session_total_run_time', 'session_total_wait_time').describe()

In [None]:
# Same for total time and gap, excluding rows with the -1 sentinel (each pair's last session).
session_job.filter(pl.col('session_total_time')>0).select('session_total_time', 'session_gap_time').describe()

In [None]:
# Long-format (value, time_column) frame of the four session time metrics under
# short labels: TWT = total wait, TET = total execution, SGT = session gap,
# TT = total time. Wait time is kept as is; the others keep only positive values,
# which also drops the -1 sentinel of each pair's last session.
name_maps = {
    'total_execution_time':'TET',
    'session_gap_time':'SGT',
    'total_time':'TT'
}
metric_frames = [
    session_job.select('total_wait_time')
    .with_columns(pl.lit('TWT').alias("time_column"))
    .rename({'total_wait_time': 'value'})
]
for source_column, short_name in name_maps.items():
    metric_frames.append(
        session_job.select(source_column)
        .filter(pl.col(source_column) > 0)
        .with_columns(pl.lit(short_name).alias("time_column"))
        .rename({source_column: 'value'})
    )
start = pl.concat(metric_frames)

In [None]:
# Horizontal box-style summary per time metric on a log x-axis, built from
# aggregates of `value` per `time_column`:
#   rule = min..max, bar = q1..q3, white tick = median, black dot = mean.
base1 = alt.LayerChart(data=start).transform_aggregate(
    min="min(value)",
    max="max(value)",
    mean="mean(value)",
    median="median(value)",
    q1="q1(value)",
    q3="q3(value)",
    groupby=["time_column"]
).encode(
    alt.Y("time_column:N", axis=alt.Axis(title='Time Metric', labelFontSize=12, titleFontSize=12), sort=["TWT", "TET", "SGT", "TT"]),
    alt.X("value:Q", axis=alt.Axis(title='Time(Seconds)', labelFontSize=12, titleFontSize=12, values=[1,10,100,1000,10000,100000,1000000,10000000])).scale(zero=False, type='log'),
).add_layers(
    alt.Chart().mark_rule().encode(x='min:Q', x2='max:Q'),
    alt.Chart().mark_bar(width=15).encode(alt.Color("time_column:N").legend(None), x='q1:Q', x2='q3:Q'),
    alt.Chart().mark_tick(color='white', width=20).encode(x='median:Q'),
    alt.Chart().mark_circle(color='black').encode(x='mean:Q')
).configure(
    # SI-prefix number formatting (e.g. 1k, 1M) for axis labels.
    numberFormat='.2s'
)

In [None]:
base1

In [None]:
base1.save('time_distribution.pdf', ppi=1600)

In [None]:
# total_wait_time > 1M seconds: print the job name of every multi-job session.
# session_job.filter(pl.col('total_wait_time')>1_000_000).filter(pl.col('session_length')>1).filter(pl.col('total_wall_time')<100000).sum()
for job_name in session_job.filter(
    (pl.col('total_wait_time') > 1_000_000) & (pl.col('session_length') > 1)
).get_column('job_name'):
    print(job_name)

In [None]:
# total_execution_time > 1M seconds: total jobs across such sessions that have
# fewer than 100 jobs.
cnt = (
    session_job
    .filter((pl.col('total_execution_time') > 1000000) & (pl.col('session_length') < 100))
    .get_column('session_length')
    .sum()
)
print(cnt)

In [None]:
# session_gap_time > 10M seconds: total jobs across sessions followed by such a gap.
cnt = (
    session_job
    .filter(pl.col('session_gap_time') > 10_000_000)
    .get_column('session_length')
    .sum()
)
print(cnt)

In [None]:
# Session time metrics compared pairwise below, in plotting order.
times = [
    'total_wait_time',
    'total_execution_time',
    'session_gap_time',
    'total_time',
    # 'total_wall_time',
    # 'total_computing_time',
]
# Short labels: total wait / total execution / session gap / total time.
name_maps = dict(zip(times, ['TWT', 'TET', 'SGT', 'TT']))
def get_corr(df, time1, time2):
    """Pearson correlation between columns ``time1`` and ``time2`` of the polars frame ``df``.

    Only rows where both values are strictly positive are used (this excludes,
    e.g., the -1 sentinel of a pair's last session). Returns the scalar value.
    """
    positive_rows = df.filter((pl.col(time1) > 0) & (pl.col(time2) > 0))
    return positive_rows.select(pl.corr(time1, time2)).item()

# Full pairwise correlation table (long format) over all sessions.
values = {
    'time1': [],
    'time2': [],
    'corr': [],
}
for time1 in times:
    for time2 in times:
        values['time1'].append(name_maps[time1])
        values['time2'].append(name_maps[time2])
        values['corr'].append(get_corr(session_job, time1, time2))
corr_df = pl.DataFrame(values)

In [None]:
# Heatmap of the pairwise correlations over all sessions.
time_distribution_base2 = alt.Chart(corr_df).mark_rect().encode(
    alt.X("time1:N", axis=alt.Axis(title='Times', labelFontSize=13, titleFontSize=0), sort=["TWT", "TET", "SGT", "TT"]),
    alt.Y("time2:N", axis=alt.Axis(title='Times', labelFontSize=13, titleFontSize=0), sort=["TWT", "TET", "SGT", "TT"]),
    alt.Color('corr:Q', bandPosition=0.5).legend(orient="right", labelFontSize=13, labelLimit=200, titleFontSize=13)
).properties(
title = "Entire Dataset"
)

In [None]:
time_distribution_base2

### Clustering

In [None]:
from sklearn.cluster import KMeans, DBSCAN
import matplotlib.pyplot as plt

In [None]:
# Elbow plot: k-means SSE (inertia) for k = 1..9 on the per-session time shares.
# Fixes:
# - The previous version read `session_job_kmeans`, which is only defined by the
#   next cell, so it failed on a fresh kernel.
# - It selected 'run_for_total'/'think_for_total', which do not exist; the session
#   query names them 'execution_for_total'/'gap_for_total'.
# Uses the same rows as the clustering cell: positive session_total_time only,
# which drops the -1 sentinel of each pair's last session.
elbow_features = session_job.filter(pl.col('session_total_time') > 0).select(
    'wait_for_total', 'execution_for_total', 'gap_for_total'
)
sse = {}
for k in range(1, 10):
    kmeans = KMeans(n_clusters=k, init='k-means++', max_iter=1000, random_state=0).fit(elbow_features)
    sse[k] = kmeans.inertia_  # sum of squared distances of samples to their closest cluster center
fig, ax = plt.subplots()
ax.plot(list(sse.keys()), list(sse.values()))
ax.set_xlabel("Number of cluster")
ax.set_ylabel("SSE")
plt.show()

In [None]:
# Final k-means model (k = 3) on the per-session wait / execution / gap shares (%).
# Only rows with positive session_total_time are used, which drops the -1
# sentinel of each pair's last session.
# NOTE(review): later cells hard-code names per cluster id (0 = high wait,
# 1 = high gap, 2 = high execution); this relies on the random_state=0 fit
# reproducing the same ids. Re-check after any data change.
session_job_kmeans = session_job.filter(pl.col('session_total_time')>0)
kmeans = KMeans(n_clusters=3, init='k-means++', max_iter=300, random_state=0)
kmeans.fit(session_job_kmeans.select('wait_for_total', 'execution_for_total', 'gap_for_total'))

In [None]:
# Attach each session's cluster label.
session_with_label = session_job_kmeans.with_columns(pl.Series(name="label", values=kmeans.labels_))

In [None]:
# One box plot per k-means cluster of the four session time metrics (log y-axis).
# The numeric prefix of each category ('0_total_wait_time', ...) fixes the x order.
# Loop temporaries have their own names. The previous version reassigned the
# notebook-level `start` and `base1`, silently replacing the overall distribution
# frame and the chart that an earlier cell saves to time_distribution.pdf.
box_plots = []
for cluster_id in range(3):
    cluster_sessions = session_with_label.filter(pl.col('label') == cluster_id)
    metric_frames = [
        cluster_sessions.select('total_wait_time')
        .with_columns(pl.lit('0_total_wait_time').alias("time_column"))
        .rename({'total_wait_time': 'value'})
    ]
    other_metrics = [
        'total_execution_time',
        'session_gap_time',
        'total_time',
        # 'total_wall_time',
        # 'total_computing_time',
    ]
    for order, time_column in enumerate(other_metrics, start=1):
        # Positive values only: required by the log scale, and drops the -1 sentinel.
        metric_frames.append(
            cluster_sessions.select(time_column)
            .filter(pl.col(time_column) > 0)
            .with_columns(pl.lit(f'{order}_{time_column}').alias("time_column"))
            .rename({time_column: 'value'})
        )
    cluster_box_plot = alt.Chart(pl.concat(metric_frames)).mark_boxplot(extent="min-max").encode(
        alt.X("time_column:N", axis=alt.Axis(title='Times')),
        alt.Y("value:Q", axis=alt.Axis(title='Value Distribution')).scale(zero=False, type='log'),
        alt.Color("time_column:N").legend(None),
    ).properties(
        height=200
    )
    box_plots.append(cluster_box_plot)

In [None]:
session_with_label.filter(pl.col('label')==1)

In [None]:
value_with_label = {}
for i in range(3):
    value_with_label[i] = {
    'time1': [],
    'time2': [],
    'corr':[]
    }
    session_with_label_filter = session_with_label.filter(pl.col('label')==i)
    for t1 in range(len(times)):
        for t2 in range(len(times)):
            time1 = times[t1]
            time2 = times[t2]
            value_with_label[i]['time1'].append(name_maps[time1])
            value_with_label[i]['time2'].append(name_maps[time2])
            value_with_label[i]['corr'].append(get_corr(session_with_label_filter, time1, time2))

In [None]:
corrs = []
titles = ["High Wait", "High Gap", "High Execution"]
for i in range(3):
    corr_df = pl.DataFrame(value_with_label[i])
    base = alt.Chart(corr_df).mark_rect().encode(
    alt.X("time1:N", axis=alt.Axis(title='Times', labelFontSize=13, titleFontSize=0), sort=["TWT", "TET", "SGT", "TT"]),
    alt.Y("time2:N", axis=alt.Axis(title='Times', labelFontSize=13, titleFontSize=0), sort=["TWT", "TET", "SGT", "TT"]),
    alt.Color('corr:Q')
    ).properties(
    title = titles[i]
    )
    corrs.append(base)

In [None]:
index = 0
for i in range(len(value_with_label[index]['time1'])):
    print(value_with_label[index]['time1'][i], value_with_label[index]['time2'][i], value_with_label[index]['corr'][i])

In [None]:
(time_distribution_base2 | corrs[0] | corrs[2]).save("correlations.pdf", ppi=1600)

In [None]:
# corrs[0] | corrs[2]
for i in range(3):
    (box_plots[i] | corrs[i]).show()

In [None]:
# Ternary scatter of each session's wait / execution / gap shares, colored by
# k-means cluster (very low opacity because of the number of points).
fig = px.scatter_ternary(session_job_kmeans, 
                         a="wait_for_total", 
                         b="execution_for_total",
                         c="gap_for_total", 
                         opacity=0.01, 
                         color=kmeans.labels_,
                        title="Session Clustering over the Time Ratio")
fig.update(layout_coloraxis_showscale=False)
# fig.write_image('clustering.png')

In [None]:
session_job_kmeans = session_job_kmeans.with_row_index()

In [None]:
kmeans_label = pl.DataFrame(kmeans.labels_).rename({'column_0':'label'}).with_row_index()

In [None]:
session_job_kmeans = session_job_kmeans.join(kmeans_label, on='index')

In [None]:
session_job_kmeans = session_job_kmeans.with_columns(
    pl.col("index").cast(pl.String),
)

In [None]:
session_job_kmeans.filter(pl.col('label')==2)

In [None]:
# Execution share vs. wait share per session (%), colored by cluster.
# The legend renames cluster ids 0/1/2 to HWC/HGC/HEC (high wait / gap /
# execution); this assumes the id-to-cluster mapping of the current fit. Confirm.
ratio_chart = alt.Chart(session_job_kmeans).mark_circle(opacity=0.01).encode(
    alt.X("execution_for_total:Q", axis=alt.Axis(title='TET / TT (%)', labelFontSize=13, titleFontSize=16, labelAngle=0)),
    alt.Y("wait_for_total:Q", axis=alt.Axis(title='TWT / TT (%)', labelFontSize=13, titleFontSize=16, values=[0,20,40,60,80,100])),
    alt.Color("label:N").legend(title='cluster',orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0
                                 , direction='horizontal', titleOrient='left', symbolType = 'circle', symbolOpacity=1, labelFontWeight = 'bold'
                                 , labelExpr="{'0':'HWC', '1':'HGC', '2':'HEC'}[datum.label]"
                               )
).properties(
    width=300,
    height=120)

In [None]:
ratio_chart

In [None]:
ratio_chart.save('session_clustering.png', ppi=800)

In [None]:
# NOTE(review): scratch cell — this appears to be an Altair gallery example (percent of the
# work-force working as welders, from the vega_datasets jobs dataset) and is unrelated to the
# session analysis; the data is referenced by URL. Consider deleting it. It also binds the
# global name `data`, which the later "cases" loop rebinds to a dict.
from altair.expr import datum
from vega_datasets import data

source = data.jobs.url

alt.Chart(source).mark_line().encode(
    alt.X('year:O'),
    alt.Y('perc:Q', axis=alt.Axis(format='%')),
    color=alt.Color('sex:N'),
).properties(
    title='Percent of work-force working as Welders'
).transform_filter(
    datum.job == 'Welder'
).configure_legend(symbolSize=80, symbolStrokeWidth=20)

In [None]:
# Interactive scatter of per-session wait vs. run time with a linear-regression line overlaid.
# NOTE(review): session_job is defined in a cell not shown here.
base = (
    alt.Chart(session_job)
    .mark_circle(size=60)
    .encode(x='session_total_wait_time', y='session_total_run_time')
    .interactive()
)
regression_line = (
    base.transform_regression('session_total_wait_time', 'session_total_run_time')
    .mark_line()
    .encode(color=alt.value("#db646f"))
)
base + regression_line

In [None]:
# Empirical CDF (step line) of think_for_total across sessions.
# NOTE(review): think_for_total is not produced by the session queries shown in this section, so it
# must come from an earlier definition of session_job_kmeans — confirm. The `text` channel likely
# has no visible effect on a line mark, and this cell only assigns the chart (no output).
session_total_time_cdf = alt.Chart(session_job_kmeans).transform_window(
    ecdf="cume_dist()",
    sort=[{"field": "think_for_total"}],
).mark_line(
    interpolate="step-after"
).encode(
    x="think_for_total:Q",
    y="ecdf:Q",
    text="think_for_total"
)

In [None]:
def chart_with_regression(df, x, y):
    """Faint scatter of column `y` against column `x` with a regression line overlaid.

    Prints the correlation of the two columns (a 1x1 polars frame from ``pl.corr``)
    before returning the layered, interactive chart.
    """
    points = (
        alt.Chart(df)
        .mark_circle(size=30, opacity=0.01)
        .encode(x=x, y=y)
        .interactive()
    )
    fit_line = points.transform_regression(x, y).mark_line().encode(color=alt.value("#db646f"))
    print(df.select(pl.corr(x, y)))
    return points + fit_line

In [None]:
# Per-cluster scatter of session run time vs. think time; the correlation is printed for each cluster.
# NOTE(review): session_with_label is defined in a cell not shown here.
for i in range(3):
    chart_with_regression(session_with_label.filter(pl.col('label')==i), 'session_total_run_time', 'session_think_time').show()

In [None]:
# Display the labelled session frame.
session_with_label

## User Level Analysis

user_1 : x2957a01  
user_2 : x2956a03  

In [None]:
# User-level sessionization. For each user, jobs are ordered by submit_time; max_end_time is a
# running MAX(end_time), and prev_end_time is that value lagged by one row. A job continues the
# current session when it was submitted before prev_end_time, otherwise it starts a new one
# (session_id is the running sum of those flags). new_wait_s is the job's full wait for the first
# job of a session and, for later jobs, only the part of the start delay after prev_end_time
# (clamped at 0). Per session the query aggregates job counts, wait, wall/computing time and
# node counts; session_total_run_time is the session span minus its total wait. Each session is
# joined to the user's next session (rn + 1) to get the gap and the span to the next session.
# NOTE(review): a user's last session has no successor, so session_gap_time and
# session_total_time are the sentinel -1, and the *_for_total percentages for those rows are not
# meaningful (a zero session_total_time yields NULL). Filter them out before analysis.
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY user_id ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_end_time AS (
SELECT 
    base.*
    , LAG(base.max_end_time) OVER (PARTITION BY user_id ORDER BY submit_time ASC) AS prev_end_time
FROM base
), base_with_session_flag AS (
SELECT 
    base_with_prev_end_time.*
    , CASE 
        WHEN prev_end_time > submit_time THEN 0 
        ELSE 1 
    END AS is_new_session
    , MAX(ROUND((JULIANDAY(start_time)-JULIANDAY(prev_end_time)) * 86400), 0) AS additional_wait_s
FROM base_with_prev_end_time
), base_with_session_id AS (
SELECT 
    base_with_session_flag.*
    , CASE WHEN
        is_new_session == 1 THEN wait_s
        ELSE additional_wait_s
    END AS new_wait_s
    , SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT 
    user_id
    , session_id
    , count(distinct job_name) As job_count
    , count(distinct job_name) As job_count_for_sum
    , GROUP_CONCAT(distinct job_name) AS job_names
    , count(1) AS session_length
    , count(1) AS session_length_for_sum
    , SUM(new_wait_s) AS session_total_wait_time
    , sum(used_walltime) * 1.0 AS total_wall_time
    , sum(used_walltime * node_count) * 1.0 AS total_computing_time
    , MIN(submit_time) AS first_submit_time
    , MIN(start_time) AS first_start_time
    , MAX(end_time) AS last_end_time
    , MIN(node_count) AS min_node_count
    , MAX(node_count) AS max_node_count
FROM base_with_session_id
GROUP BY 1, 2
), session_with_rn AS (
SELECT 
    *
    , row_number() over (PARTITION BY user_id ORDER BY first_submit_time ASC) AS rn
    , ROUND((JULIANDAY(last_end_time)-JULIANDAY(first_submit_time)) * 86400) - session_total_wait_time AS session_total_run_time
FROM session
), session_with_think_time AS (
SELECT 
    t1.*
    , CASE WHEN
        t2.rn IS NULL THEN -1
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.last_end_time)) * 86400) 
    END AS session_gap_time
    , CASE WHEN
        t2.rn IS NULL THEN -1 
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.first_submit_time)) * 86400)
    END AS session_total_time
FROM session_with_rn AS t1 
    LEFT JOIN session_with_rn AS t2 
    ON t1.rn+1 = t2.rn AND t1.user_id = t2.user_id
)
SELECT
    *
    , session_total_wait_time AS total_wait_time
    , session_total_time AS total_time
    , session_total_run_time AS total_execution_time
    , session_total_wait_time/session_total_time*100 AS wait_for_total
    , session_gap_time/session_total_time*100 AS gap_for_total
    , session_total_run_time/session_total_time*100 AS execution_for_total
FROM session_with_think_time
WHERE 1=1
"""
# One row per (user_id, session_id).
user_job = pl.read_database_uri(query, connection_string)

In [None]:
# Earlier, non-cumulative version of the job-count figure: total jobs per session length (1-100).
# It is superseded by the cumulative chart further below, which reassigns job_count_figure.
# NOTE: new_user_job is built in the following cell; run that cell before this one.
job_count_figure = alt.Chart(new_user_job.filter(pl.col('session_length')>0).filter(pl.col('session_length')<=100)).mark_line().properties(
    width=400,
    height=130,
).encode(
    # Session length on x (the field was previously misspelled "session_lengtht", matching no column).
    alt.X("session_length:N", axis=alt.Axis(title='Number of jobs in a session', labelFontSize=13, titleFontSize=13, labelAngle=0, values=[10*i for i in range(10)])),
    # Jobs falling in sessions of that length (previously session_length was plotted against itself).
    alt.Y("session_length_for_sum:Q", axis=alt.Axis(title='Job count', labelFontSize=13, titleFontSize=13)),
).configure(
    numberFormat='.2s'
)

In [None]:
# Jobs per session length, normalized by the total job count so `ratio` is a probability mass over
# session lengths. The total is derived from the data (it was hard-coded as 1121803), so the
# normalization stays correct if BASE_CONDITION or the database changes.
total_user_jobs = user_job['session_length_for_sum'].sum()
new_user_job = user_job.group_by('session_length').agg(pl.col("session_length_for_sum").sum())
new_user_job = new_user_job.with_columns((pl.col("session_length_for_sum") / total_user_jobs).alias("ratio"))
new_user_job = new_user_job.with_columns(pl.lit("User").alias("Grouping Condition"))

In [None]:
# Cumulative share of jobs by session length (sessions of 1-1,000 jobs), user-level grouping.
job_count_figure = alt.Chart(new_user_job.filter(pl.col('session_length')<=1000).filter(pl.col('session_length')>=1)).mark_line().properties(
    width=400,
    height=130,
).transform_window(
    # Running sum of the per-length ratio in increasing session_length order gives the empirical CDF.
    # (Renamed from `cumulative_wheat`, apparently a leftover from the Altair example this was adapted from.)
    sort=[{'field': 'session_length'}],
    cumulative_ratio='sum(ratio)'
).encode(
    alt.X("session_length:Q", axis=alt.Axis(title='Job Count', labelFontSize=13, titleFontSize=13, labelAngle=0, values=[100*i for i in range(11)])),
    alt.Y("cumulative_ratio:Q", axis=alt.Axis(title='Probability', labelFontSize=13, titleFontSize=13)),
    alt.Color("Grouping Condition").legend(orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0
                                 , direction='horizontal', titleOrient='left', labelFontWeight = 'bold')    
)

In [None]:
# Display the cumulative job-count chart.
job_count_figure

In [None]:
# Export the cumulative job-count chart to PNG.
job_count_figure.save('job_count.png', ppi=1600)

In [None]:
# Jobs (and their share) that ran in single-job sessions.
new_user_job.filter(pl.col('session_length')==1)

In [None]:
# Print the first five sessions (11+ distinct job names) whose job-name list mentions "openmp".
count = 0
for row in user_job.filter(pl.col('job_count') >= 11).rows(named=True):
    if "openmp" not in row['job_names'].lower():
        continue
    print(row['job_names'])
    print(row)
    count += 1
    if count == 5:
        break

In [None]:
# Mean pairwise job-name similarity (cal_distance) for every session with 2+ distinct job names.
# NOTE: cal_distance is defined in a later cell; run that cell before this one.
cases_100 = []  # sessions with 100+ distinct job names
cases_50 = []   # all sessions with 2+ distinct job names (shares the same dict objects as cases_100)
for row in user_job.filter(pl.col('job_count')>=2).rows(named=True):
    # Named `record`, not `data`, so it does not shadow the vega_datasets `data` module imported earlier.
    record = {'distance':cal_distance(row['job_names']), 'job_count':row['job_count']}
    if row['job_count'] >= 100:
        cases_100.append(record)
    cases_50.append(record)

In [None]:
# Inspect one entry of the 100+ sessions after ordering them by mean name similarity.
# NOTE(review): 83 looks like an interactively chosen index; it is not derived from the data.
sorted(cases_100, key=lambda x: x['distance'])[83]

In [None]:
# Number of sessions with 100+ distinct job names.
len(cases_100)

In [None]:
# Tag every session with its size bucket: '100+' distinct job names or '2-99'.
for case in cases_50:
    case['over_100'] = '100+' if case['job_count'] >= 100 else '2-99'
for case in cases_100:
    case['over_100'] = '100+'

In [None]:
cases_100_df = pl.DataFrame(cases_100)
cases_50_df = pl.DataFrame(cases_50)


def _distance_ecdf_chart(frame):
    """Step-line ECDF of the per-session `distance` values, colored by the `over_100` bucket.

    Note: `distance` is the mean Levenshtein ratio (a similarity, 1.0 = identical names),
    despite the axis title.
    """
    return alt.Chart(frame).transform_window(
        ecdf="cume_dist()",
        sort=[{"field": "distance"}],
    ).mark_line(
        interpolate="step-after"
    ).encode(
        alt.X("distance:Q", axis=alt.Axis(title='Levenshtein Distance Ratio', labelFontSize=14, titleFontSize=14, labelAngle=0)),
        alt.Y("ecdf:Q", axis=alt.Axis(title='Probability', labelFontSize=14, titleFontSize=14)),
        alt.Color("over_100:N").legend(orient="top", labelFontSize=14, labelLimit=200, titleFontSize=0,
                                       direction='horizontal', titleOrient='left', labelFontWeight='bold'),
    ).properties(
        height=110,
        width=300,
    )


cases_100_graph = _distance_ecdf_chart(cases_100_df)
# cases_50 also contains the 100+ sessions; keep only the 2-99 bucket so the two curves are disjoint.
cases_50_graph = _distance_ecdf_chart(cases_50_df.filter(pl.col('over_100') == '2-99'))

In [None]:
# Overlay the 2-99 and 100+ ECDF curves and export them to PDF.
alt.layer(cases_50_graph, cases_100_graph).save('distance.pdf')

In [None]:
from Levenshtein import distance, ratio
def cal_distance(items, similarity=None):
    """Mean pairwise similarity of the comma-separated names in ``items``.

    Despite the name, the default measure is ``Levenshtein.ratio``, a similarity in which
    higher means more alike (1.0 = identical).

    Args:
        items: comma-separated names, e.g. a GROUP_CONCAT(distinct job_name) value.
        similarity: optional callable ``(a, b) -> float`` used instead of ``Levenshtein.ratio``.
            It exists so the function can be reused or tested with another measure.

    Returns:
        The mean of ``similarity(a, b)`` over all unordered pairs of names, or ``nan`` when
        there are fewer than two names (the mean over zero pairs is undefined; previously
        this raised ZeroDivisionError).
    """
    if similarity is None:
        similarity = ratio  # imported from Levenshtein in this cell
    names = items.split(',')
    scores = [similarity(a, b) for i, a in enumerate(names) for b in names[i + 1:]]
    if not scores:
        return float('nan')
    return sum(scores) / len(scores)

In [None]:
# Sanity check of Levenshtein.ratio on a small example.
ratio('abc', 'adef')

In [None]:
# Single-job sessions with session_id greater than 3.
user_job.filter(pl.col('session_length')==1).filter(pl.col('session_id')>3)

In [None]:
# Sessions of the example user_1 named in the markdown above.
user_job.filter(pl.col('user_id')=='x2957a01')

In [None]:
# Sessions of the example user_2 named in the markdown above.
user_job.filter(pl.col('user_id')=='x2956a03')

In [None]:
# Total number of jobs that belong to multi-job sessions.
user_job.filter(pl.col('session_length')>1).select(pl.sum('session_length'))

In [None]:
# Number of sessions per distinct-job-name count, joined with the per-count column sums.
job_count = user_job.group_by('job_count').len()
job_sum = user_job.group_by('job_count').sum()
job_joined = job_count.join(job_sum, on=['job_count'])

In [None]:
# Cumulative tally over job_count: `total` counts sessions, `tmp` accumulates k * (sessions with k names).
# NOTE(review): the print is commented out, so this cell has no output; also tmp/total*100 divides a
# name-weighted sum by a session count, so it is not a percentage — confirm the intended denominator.
job_lengths = collections.defaultdict(int)
total = 0
for row in job_joined.rows(named=True):
    k, v = row['job_count'], row['len']
    job_lengths[k] += v
    total += v
tmp = 0
for k in sorted(job_lengths.keys()):
    tmp += (job_lengths[k] * k)
    # print(k, job_lengths[k], tmp, tmp/total*100)

## Group-Level Analysis

In [None]:
PARTITION_KEY = "group_id"
# Group-level sessionization: the same steps as the user-level query, partitioned by PARTITION_KEY
# instead of user_id. Within a partition, a job continues the current session when it was submitted
# before the latest end_time of earlier-submitted jobs; otherwise it starts a new session.
# Gap / total-time columns (which need each session's successor) are not computed at this level.
# The successor self-join that used to feed them was therefore dropped: all its columns were
# commented out, and since rn is unique per partition it matched at most one row per session,
# so it contributed neither rows nor columns.
query = f"""
WITH base AS (
SELECT
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY {PARTITION_KEY} ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_end_time AS (
SELECT
    base.*
    , LAG(base.max_end_time) OVER (PARTITION BY {PARTITION_KEY} ORDER BY submit_time ASC) AS prev_end_time
FROM base
), base_with_session_flag AS (
SELECT
    base_with_prev_end_time.*
    , CASE
        WHEN prev_end_time > submit_time THEN 0
        ELSE 1
    END AS is_new_session
    , MAX(ROUND((JULIANDAY(start_time)-JULIANDAY(prev_end_time)) * 86400), 0) AS additional_wait_s
FROM base_with_prev_end_time
), base_with_session_id AS (
SELECT
    base_with_session_flag.*
    , CASE WHEN
        is_new_session == 1 THEN wait_s
        ELSE additional_wait_s
    END AS new_wait_s
    , SUM(is_new_session) OVER (PARTITION BY {PARTITION_KEY} ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT
    {PARTITION_KEY}
    , session_id
    , count(distinct job_name) AS job_count
    , count(distinct job_name) AS job_count_for_sum
    , GROUP_CONCAT(distinct job_name) AS job_names
    , count(1) AS session_length
    , count(1) AS session_length_for_sum
    , SUM(new_wait_s) AS session_total_wait_time
    , sum(used_walltime) * 1.0 AS total_wall_time
    , sum(used_walltime * node_count) * 1.0 AS total_computing_time
    , MIN(submit_time) AS first_submit_time
    , MIN(start_time) AS first_start_time
    , MAX(end_time) AS last_end_time
    , MIN(node_count) AS min_node_count
    , MAX(node_count) AS max_node_count
FROM base_with_session_id
GROUP BY 1, 2
), session_with_rn AS (
SELECT
    *
    , row_number() over (PARTITION BY {PARTITION_KEY} ORDER BY first_submit_time ASC) AS rn
    , ROUND((JULIANDAY(last_end_time)-JULIANDAY(first_submit_time)) * 86400) - session_total_wait_time AS session_total_run_time
FROM session
)
SELECT *
FROM session_with_rn
WHERE 1=1
"""
# One row per (PARTITION_KEY, session_id).
group_job = pl.read_database_uri(query, connection_string)


In [None]:
# Cumulative share of jobs by session length (sessions of 1-1,000 jobs), group-level grouping.
# NOTE: new_group_job is built in a later cell; run that cell before this one.
group_count_figure = alt.Chart(new_group_job.filter(pl.col('session_length')<=1000).filter(pl.col('session_length')>=1)).mark_line().properties(
    width=400,
    height=130,
).transform_window(
    # Running sum of the per-length ratio in increasing session_length order gives the empirical CDF.
    # (Renamed from `cumulative_wheat`, apparently a leftover from the Altair example this was adapted from.)
    sort=[{'field': 'session_length'}],
    cumulative_ratio='sum(ratio)'
).encode(
    alt.X("session_length:Q", axis=alt.Axis(title='Job Count', labelFontSize=13, titleFontSize=13, labelAngle=0, values=[100*i for i in range(11)])),
    alt.Y("cumulative_ratio:Q", axis=alt.Axis(title='Probability', labelFontSize=13, titleFontSize=13)),
    alt.Color("Grouping Condition").legend(orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0
                                 , direction='horizontal', titleOrient='left', labelFontWeight = 'bold')    
)

In [None]:
# Jobs (and their share) in single-job sessions at group level.
# NOTE(review): new_group_job is built in a later cell; run that cell first.
new_group_job.filter(pl.col('session_length')==1)

In [None]:
# Jobs per session length at group level, normalized by the total job count so `ratio` is a
# probability mass over session lengths. The total is derived from the data (it was hard-coded
# as 1121803), so the normalization stays correct if BASE_CONDITION or the database changes.
total_group_jobs = group_job['session_length_for_sum'].sum()
new_group_job = group_job.group_by('session_length').agg(pl.col("session_length_for_sum").sum())
new_group_job = new_group_job.with_columns((pl.col("session_length_for_sum") / total_group_jobs).alias("ratio"))
new_group_job = new_group_job.with_columns(pl.lit("Group").alias("Grouping Condition"))

In [None]:
# Group-level per-length job counts, restricted to session lengths below 1,000.
new_group_job.filter(pl.col('session_length')<1000)

## Analysis Considering node_count

•	In most cases, the jobs within a session have the same node_count.  
•	The number of sessions where node_count remains consistent is 390,046, while sessions with varying node_count total 2,395 (about 99.4% consistent overall).  
•	Among sessions with two or more jobs, there are 40,612 consistent sessions and 2,395 inconsistent ones.  
•	Thus, among multi-job sessions, 94.4% maintain the same node_count across all jobs (single-job sessions are trivially consistent).  

⸻

Distributions by node_count  
	•	The distributions of wait_time, run_time, and computing_time tend to increase as node_count increases.  
	•	Conversely, wall_time, think_time, and total_time tend to decrease with larger node_count.  

In [None]:
# Sessionization per (user_id, job_name). A job continues the current session when it was submitted
# before the latest end_time of earlier-submitted jobs with the same user and job name; otherwise
# it starts a new session. Each session records wait/run/gap aggregates and its min/max
# node_count; have_same_node_count is 'Identical' when min and max agree, else 'Dynamic'.
# Unlike the user-level query, total_wall_time / total_computing_time here are sums of run_s
# (end - start) rather than used_walltime.
# NOTE(review): the last session per (user_id, job_name) has no successor, so session_gap_time
# and session_total_time are the sentinel -1, and its *_for_total percentages are not meaningful.
query = f"""
WITH base AS (
SELECT 
    t.*
    , json_array_length(nodes) AS node_count
    , MAX(end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_end_time
FROM {BASE_TABLE}
WHERE 1=1
{BASE_CONDITION}
), base_with_prev_end_time AS (
SELECT 
    base.*
    , LAG(base.max_end_time) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ASC) AS prev_end_time
FROM base
), base_with_session_flag AS (
SELECT 
    base_with_prev_end_time.*
    , CASE 
        WHEN prev_end_time > submit_time THEN 0 
        ELSE 1 
    END AS is_new_session
    , MAX(ROUND((JULIANDAY(start_time)-JULIANDAY(prev_end_time)) * 86400), 0) AS additional_wait_s
FROM base_with_prev_end_time
), base_with_session_id AS (
SELECT 
    base_with_session_flag.*
    , CASE WHEN
        is_new_session == 1 THEN wait_s
        ELSE additional_wait_s
    END AS new_wait_s
    , SUM(is_new_session) OVER (PARTITION BY user_id, job_name ORDER BY submit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM base_with_session_flag
), session AS (
SELECT 
    user_id
    , job_name
    , session_id
    , count(1) AS session_length
    , SUM(new_wait_s) AS session_total_wait_time
    , SUM(run_s) AS total_wall_time
    , SUM(run_s * node_count) AS total_computing_time
    , MIN(submit_time) AS first_submit_time
    , MIN(start_time) AS first_start_time
    , MAX(end_time) AS last_end_time
    , MIN(node_count) AS min_node_count
    , MAX(node_count) AS max_node_count
FROM base_with_session_id
GROUP BY 1, 2, 3
), session_with_rn AS (
SELECT 
    *
    , row_number() over (PARTITION BY user_id, job_name ORDER BY first_submit_time ASC) AS rn
    , ROUND((JULIANDAY(last_end_time)-JULIANDAY(first_submit_time)) * 86400) - session_total_wait_time AS session_total_run_time
FROM session
), session_with_think_time AS (
SELECT 
    t1.*
    , CASE WHEN
        t2.rn IS NULL THEN -1
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.last_end_time)) * 86400) 
    END AS session_gap_time
    , CASE WHEN
        t2.rn IS NULL THEN -1 
        ELSE ROUND((JULIANDAY(t2.first_submit_time)-JULIANDAY(t1.first_submit_time)) * 86400)
    END AS session_total_time
FROM session_with_rn AS t1 
    LEFT JOIN session_with_rn AS t2 
    ON t1.rn+1 = t2.rn AND t1.user_id = t2.user_id AND t1.job_name = t2.job_name
)
SELECT
    *
    , session_total_wait_time AS total_wait_time
    , session_total_time AS total_time
    , session_total_run_time AS total_execution_time
    , session_total_wait_time/session_total_time*100 AS wait_for_total
    , session_gap_time/session_total_time*100 AS gap_for_total
    , session_total_run_time/session_total_time*100 AS execution_for_total
    , CASE WHEN min_node_count = max_node_count THEN 'Identical'
    ELSE 'Dynamic' END AS have_same_node_count
FROM session_with_think_time
WHERE 1=1
"""
# One row per (user_id, job_name, session_id).
session_job_with_node = pl.read_database_uri(query, connection_string)

In [None]:
# Multi-job sessions whose jobs did not all use the same node count.
# have_same_node_count holds the strings 'Identical' / 'Dynamic' (see the query's CASE), so compare
# against 'Dynamic'; the previous `== False` compared a string column with a boolean and could not
# select these rows.
session_job_with_node.filter(pl.col('session_length')>1).filter(pl.col('have_same_node_count')=='Dynamic')

In [None]:
# For each session length, the share (%) of sessions whose node count is 'Identical' vs. 'Dynamic'.
session_job_with_tmp = collections.defaultdict(int)      # (length, flag) -> count, then -> fraction
session_job_with_tmp_key = collections.defaultdict(int)  # length -> number of sessions
for row in session_job_with_node.rows(named=True):
    length = row['session_length']
    session_job_with_tmp[(length, row['have_same_node_count'])] += 1
    session_job_with_tmp_key[length] += 1
# Normalize each (length, flag) count by the number of sessions of that length.
for key in session_job_with_tmp:
    session_job_with_tmp[key] /= session_job_with_tmp_key[key[0]]
df_tmp = [
    {'session_length': length, 'ratio': share * 100, 'have_same_node_count': flag}
    for (length, flag), share in session_job_with_tmp.items()
]
session_job_df = pl.DataFrame(df_tmp)

In [None]:
# Identical vs. dynamic node-count shares for sessions of exactly 20 jobs.
session_job_df.filter(pl.col('session_length')==20)

In [None]:
# Stacked share (%) of sessions with identical vs. dynamic node counts, for session lengths 2-20.
node_count_identical = alt.Chart(session_job_df.filter(pl.col('session_length')>1).filter(pl.col('session_length')<21).rename({"have_same_node_count": "Node Count"})).mark_bar(width=12).properties(
    width=400,
    height=130,
).encode(
    # x is session_length (jobs per session), not a node count; the axis was previously titled
    # 'Node Count'. 'Job Count' matches the session-length axis of the job-count figures.
    alt.X('session_length:Q', axis=alt.Axis(title='Job Count', labelFontSize=13, titleFontSize=15), scale=alt.Scale(domain=[2, 20])),
    alt.Y('ratio:Q', axis=alt.Axis(title='Ratio(%)', labelFontSize=13, titleFontSize=15
         )),
    alt.Color("Node Count:N").legend(orient="top", labelFontSize=13, labelLimit=200, titleFontSize=0
    , direction='horizontal', titleOrient='left', columnPadding=40, labelFontWeight = 'bold')
)

In [None]:
# Display the identical/dynamic node-count chart.
node_count_identical

In [None]:
# Export the identical/dynamic node-count chart to PDF.
node_count_identical.save('node_count_identical.pdf', ppi=1600)

In [None]:
# Cumulative tally of sessions by minimum node count.
# NOTE(review): session_job_with_node_filter is not defined in any cell shown here (hidden state),
# and the print is commented out, so this cell currently has no visible output.
node_count_lengths = collections.defaultdict(int)
total = 0
for row in session_job_with_node_filter.group_by('min_node_count').len().rows(named=True):
    k, v = row['min_node_count'], row['len']
    node_count_lengths[k] += v
    total += v
tmp = 0
for k in sorted(node_count_lengths.keys()):
    tmp += node_count_lengths[k]
    # print(k, node_count_lengths[k], tmp, tmp/total*100)

In [None]:
# Number of sessions per minimum node count (<= 16), drawn from the filtered session frame.
# NOTE(review): session_job_with_node_filter is not defined in any cell shown here.
node_count_distribution = alt.Chart(session_job_with_node_filter.filter(pl.col('min_node_count')<=16)).mark_line(point=True).properties(
    width=400,
    height=100,
).encode(
    alt.X("min_node_count:N", axis=alt.Axis(title='Node Count', labelFontSize=13, values=[1,2,4,8,16,32], titleFontSize=13, labelAngle=0)),
    alt.Y("count()", axis=alt.Axis(title='Count', labelFontSize=13, titleFontSize=13)),
).configure(
    numberFormat='.2s'
)

In [None]:
# Multi-job (user, job_name) sessions.
session_job_with_node.filter(pl.col('session_length')>1)

In [None]:
# Same chart on the unfiltered session_job_with_node frame (displayed, not stored).
alt.Chart(session_job_with_node.filter(pl.col('min_node_count')<=16)).mark_line(point=True).properties(
    width=400,
    height=100,
).encode(
    alt.X("min_node_count:N", axis=alt.Axis(title='Node Count', labelFontSize=13, values=[1,2,4,8,16,32], titleFontSize=13, labelAngle=0)),
    alt.Y("count()", axis=alt.Axis(title='Count', labelFontSize=13, titleFontSize=13)),
).configure(
    numberFormat='.2s'
)

In [None]:
# Display the filtered node-count distribution chart.
node_count_distribution

In [None]:
# Export the node-count distribution chart. The original call had an unterminated string literal
# (a SyntaxError); the file name follows the other figure exports — adjust it if needed.
node_count_distribution.save('node_count_distribution.pdf', ppi=1600)

In [None]:
def get_corr_node_count(df, time1):
    """Correlation (``pl.corr``) between `min_node_count` and column `time1`.

    Only rows with a strictly positive `time1` are used, which drops the -1 sentinel values.
    Returns the scalar correlation.
    """
    positive_rows = df.filter(pl.col(time1) > 0)
    return positive_rows.select(pl.corr('min_node_count', time1)).item()

# Correlation of node count with each time metric.
# NOTE(review): session_job_with_node_filter is not defined in any cell shown here.
times = ['total_wait_time', 'total_execution_time', 'total_wall_time', 'total_computing_time','session_gap_time', 'total_time']
for time in times:
    print(time, get_corr_node_count(session_job_with_node_filter, time))

In [None]:
# Export six node-count panels side by side.
# NOTE(review): `plots` is not defined in any cell shown here (hidden state); define it before running.
(plots[0] | plots[1] | plots[2] | plots[3] | plots[4] | plots[5]).save('node_count.pdf', ppi=200)

## Prediction Modeling

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.linear_model import LinearRegression, RANSACRegressor

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

In [None]:
# Summary statistics (including 5th/95th percentiles) for cluster-0 sessions.
session_with_label.filter(pl.col('label')==0).describe(percentiles=[0.05, 0.95])

In [None]:
# Temporal split: sessions first submitted before the cutoff date train the model, the rest test it.
# NOTE(review): the cutoff is compared against first_submit_time as stored (lexicographic if it is an
# ISO-8601 text column — confirm). The name `time` reuses the earlier loop variable.
time = '2024-05-01'
train = session_with_label.filter(pl.col('first_submit_time')<time)
# .filter(pl.col('session_gap_time')>=165.0).filter(pl.col('session_gap_time')<=101959.0)
test = session_with_label.filter(pl.col('first_submit_time')>=time)
# .filter(pl.col('session_gap_time')>=165.0).filter(pl.col('session_gap_time')<=101959.0)

In [None]:
# Display the test split.
test

In [None]:
# Fit one regressor per (cluster label, feature set) to predict session_gap_time and report R^2 / MSE.
labels = [0,2]
# Feature sets compared: wait + execution time, execution time only, wait time only.
variables = [['total_wait_time', 'total_execution_time'], ['total_execution_time'], ['total_wait_time']]
# models = [LinearRegression(), RandomForestRegressor(n_estimators=100, random_state=42), xgb.XGBRegressor(n_estimators=100, learning_rate=0.1, random_state=42)]
models = [xgb.XGBRegressor(n_estimators=100, learning_rate=0.1, random_state=42)]
# Test-set predictions keyed by (label, variable_index); consumed by get_graph.
# NOTE(review): with more than one model the key does not include the model, so later models overwrite
# earlier predictions. The target also includes the -1 "no next session" sentinel — confirm it should
# be excluded from training/evaluation.
y_preds = {}
for label in labels:
    # The per-cluster split depends only on the label, so filter once rather than per model/feature set.
    labeld = train.filter(pl.col('label')==label)
    test_labeld = test.filter(pl.col('label')==label)
    y_train = labeld.select('session_gap_time')
    y_test = test_labeld.select('session_gap_time')
    for model in models:
        for variable_index, feature_columns in enumerate(variables):
            X_train = labeld.select(feature_columns)
            X_test = test_labeld.select(feature_columns)
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            mse = mean_squared_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            y_preds[(label,variable_index)] = y_pred
            # MSE was previously computed but never reported.
            print(label, variable_index, r2, mse)

In [None]:
# Export the actual-vs-predicted gap-time charts for clusters 0 (high wait) and 2 (high execution).
# NOTE(review): get_graph is defined in the next cell; run that cell before this one.
get_graph(0).save('prediction_high_wait.pdf', ppi=100)
get_graph(2).save('prediction_high_execution.pdf', ppi=100)
# get_graph(0).save('prediction_whole_time.pdf', ppi=400)

In [None]:
def get_graph(label, variable_index=0):
    """Actual vs. predicted session gap time for one cluster, with a red y = x reference line.

    Reads the module-level ``test`` split and the ``y_preds`` dict filled by the training cell,
    which is keyed by ``(label, variable_index)``.

    Args:
        label: cluster label whose test rows and predictions are plotted.
        variable_index: which feature set's predictions to plot (index into ``variables``).
            Defaults to 0, the chart this function always returned. The previous version built a
            chart for every feature set and discarded all but the first.

    Returns:
        A configured Altair chart limited to pairs where both the actual and the predicted
        value lie strictly between 0 and 200000.
    """
    actual = test.filter(pl.col('label')==label).select('session_gap_time').to_dicts()
    predicted = y_preds[(label, variable_index)]
    # Test rows and predictions are in the same order (both come from the same filtered split).
    label_yy = [
        {'y_test': actual[i]['session_gap_time'], 'y_predict': predicted[i]}
        for i in range(len(actual))
    ]
    df = pl.DataFrame(label_yy).filter(pl.col('y_predict')<200000).filter(pl.col('y_test')<200000).filter(pl.col('y_test')>0).filter(pl.col('y_predict')>0)
    points = alt.Chart(df).mark_circle(size=20,opacity=0.1).encode(
        x='y_test',
        y='y_predict',
    ).interactive()
    identity_line = alt.Chart(df).mark_line(color='red').encode(
        x='y_test',
        y='y_test',
    ).interactive()
    return (points+identity_line).encode(
        x=alt.X(axis=alt.Axis(title='Session Gap Time (Actual)', labelFontSize=22, titleFontSize=22, values=[0,50000,100000,150000,200000])),
        y=alt.Y(axis=alt.Axis(title='Session Gap Time (Prediction)', labelFontSize=22, titleFontSize=22, values=[0,50000,100000,150000,200000], labelAngle=-90))
    ).configure(
        numberFormat='.2s'
    )