In [1]:
# Import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
from functools import reduce

In [2]:
# Set view options
# Widen pandas' display limits (500 rows, 50 columns) for inspecting the per-minute frames.
pd.set_option('display.max_rows', 500, 'display.max_columns', 50)

In [3]:
# Initialize spark session
# 3 cores with 8g of executor and driver memory; spark.driver.maxResultSize is also set
# to 8g, presumably because many cells below collect results to the driver with toPandas().
config = pyspark.SparkConf()
config.setAll([
    ('spark.executor.memory', '8g'),
    ('spark.executor.cores', '3'),
    ('spark.cores.max', '3'),
    ('spark.driver.memory', '8g'),
    ('spark.driver.maxResultSize', '8g'),
])

spark = SparkSession.builder.config(conf = config).getOrCreate()

In [4]:
# Read in files as spark dataframes
# The job table is a CSV with a header row. The two CE wrapper logs use ' - ' as the
# field separator, and their six fields are named positionally here.
slurm_jobs = spark.read.csv('../data/fullsample.csv', header = True)

ce5_log, ce6_log = (
    spark.read.option('delimiter', ' - ')
    .csv(log_path)
    .toDF('TIMESTAMP', 'USER', 'RETRY', 'RUNTIME', 'RETURNCODE', 'COMMAND')
    for log_path in ('../data/slurm_wrapper_ce5.log', '../data/slurm_wrapper_ce6.log')
)

In [5]:
# Clean up dataframes
# Job table: normalise STATE, parse timestamps and counts, split EXITCODE
# ("<exitcode>:<signal>") into two ints, and derive a per-job requested-memory total.
# Regex patterns are raw strings so '\d' / '\w' are not treated as (invalid)
# Python escape sequences, which warn at compile time.
slurm_jobs = (slurm_jobs
              # Collapse every "CANCELLED by ..." variant into one state label.
              .withColumn('STATE', F.regexp_replace('STATE', r'CANCELLED by.*', 'CANCELLED by USER'))
              .withColumn('BEGIN', F.to_timestamp(F.col('BEGIN')))
              .withColumn('END', F.to_timestamp(F.col('END')))
              .withColumn('NODES', F.col('NODES').cast('int'))
              .withColumn('CPUS', F.col('CPUS').cast('int'))
              # SIGNAL must be extracted before EXITCODE is overwritten on the next line.
              .withColumn('SIGNAL', F.regexp_extract(F.col('EXITCODE'), r'.*:(\d+)', 1).cast('int'))
              .withColumn('EXITCODE', F.regexp_extract(F.col('EXITCODE'), r'(\d+):.*', 1).cast('int'))
              # REQMEM is either per node ("...Mn", scaled by NODES) or per CPU ("...Mc",
              # scaled by CPUS). Stripping one suffix leaves the other form non-numeric, so
              # its cast is null and coalesce keeps whichever form matched.
              # NOTE(review): any other REQMEM format (e.g. a 'G' unit or no n/c suffix)
              # yields a null REQMEMTOT — confirm the data only contains Mn/Mc.
              .withColumn('REQMEMxNODE', F.regexp_replace('REQMEM', 'Mn', '').cast('long') * F.col('NODES'))
              .withColumn('REQMEMxCPU', F.regexp_replace('REQMEM', 'Mc', '').cast('long') * F.col('CPUS'))
              .withColumn('REQMEMTOT', F.coalesce(F.col('REQMEMxNODE'), F.col('REQMEMxCPU')))
              .withColumn('REQMEMPERCORE', F.col('REQMEMTOT') / F.col('CPUS'))
              .withColumn('USEDMEM', F.regexp_replace('USEDMEM', 'M', '').cast('long'))
              .drop('REQMEMxNODE', 'REQMEMxCPU')
              # Jobs without a parseable END are dropped.
              .filter('END is not null')
)

# Wrapper logs (same layout for both CE hosts): strip the "user ", "retry ", "time "
# and "returncode " labels, cast to numbers, and reduce COMMAND to the program name
# following "/usr/bin/" (empty string when the pattern does not match).
ce5_log, ce6_log = (
    wrapper_log
    .withColumn('TIMESTAMP', F.to_timestamp(F.col('TIMESTAMP')))
    .withColumn('USER', F.regexp_replace('USER', 'user ', '').cast('int'))
    .withColumn('RETRY', F.regexp_replace('RETRY', 'retry ', '').cast('int'))
    .withColumn('RUNTIME', F.regexp_replace('RUNTIME', 'time ', '').cast('float'))
    .withColumn('RETURNCODE', F.regexp_replace('RETURNCODE', 'returncode ', '').cast('int'))
    .withColumn('COMMAND', F.regexp_extract(F.col('COMMAND'), r'.*/usr/bin/(\w+)', 1))
    for wrapper_log in (ce5_log, ce6_log)
)

In [6]:
# Aggregate started jobs by minute
# Number of jobs whose BEGIN falls in each minute of the fixed date range; minutes
# with no starts are filled with 0.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

started_jobs = (slurm_jobs
                .groupBy(F.date_trunc('minute', F.col('BEGIN')).alias('TIMESTAMP'))
                .agg(F.count(F.col('BEGIN')).alias('STARTEDJOBS'))
                .sort('TIMESTAMP')
                .toPandas()
                .set_index('TIMESTAMP')
                .reindex(idx, fill_value = 0)
)

In [7]:
# Aggregate ended jobs by minute
# Number of jobs whose END falls in each minute of the fixed date range; minutes
# with no completions are filled with 0.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ended_jobs = (slurm_jobs
              .groupBy(F.date_trunc('minute', F.col('END')).alias('TIMESTAMP'))
              .agg(F.count(F.col('END')).alias('ENDEDJOBS'))
              .sort('TIMESTAMP')
              .toPandas()
              .set_index('TIMESTAMP')
              .reindex(idx, fill_value = 0)
)

In [8]:
# Aggregate running jobs by minute
# Each job contributes +1 at BEGIN and -1 at END; a running total ordered by time gives
# the number of running jobs after each event. Jobs with no parsed BEGIN are excluded:
# a null TIMESTAMP sorts first in the window, so their +1 would otherwise be counted
# from the start of the data until their END.
slurm_jobs_timed = slurm_jobs.filter('BEGIN is not null')
slurm_jobs_started = slurm_jobs_timed.select(F.col('BEGIN').alias('TIMESTAMP')).withColumn('RUNNINGJOBS', F.lit(1))
slurm_jobs_ended = slurm_jobs_timed.select(F.col('END').alias('TIMESTAMP')).withColumn('RUNNINGJOBS', F.lit(-1))

running_jobs = (slurm_jobs_started
                .union(slurm_jobs_ended)
                # With orderBy and no explicit frame the window is RANGE ... CURRENT ROW, so every
                # row sharing a timestamp gets the same total and dropDuplicates keeps one per timestamp.
                .withColumn('RUNNINGJOBS', F.sum(F.col('RUNNINGJOBS')).over(Window.orderBy('TIMESTAMP')))
                .select('TIMESTAMP', 'RUNNINGJOBS')
                .dropDuplicates()
                .toPandas()
)

# dropDuplicates gives no row-order guarantee and reindex(method='pad') requires a
# monotonic index, so sort before carrying the latest total forward to each minute.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')
running_jobs = running_jobs.set_index('TIMESTAMP').sort_index().reindex(idx, method = 'pad', fill_value = 0)

In [9]:
# Aggregate requested memory by minute
# Running total of REQMEMTOT: each job adds its request at BEGIN and removes it at END.
# Jobs with no parsed BEGIN are excluded: a null TIMESTAMP sorts first in the window,
# so their request would otherwise be counted from the start of the data until END.
slurm_jobs_timed = slurm_jobs.filter('BEGIN is not null')
slurm_jobs_started = slurm_jobs_timed.select(F.col('BEGIN').alias('TIMESTAMP'), F.col('REQMEMTOT'))
slurm_jobs_ended = slurm_jobs_timed.select(F.col('END').alias('TIMESTAMP'), (F.col('REQMEMTOT') * -1).alias('REQMEMTOT'))

requested_memory = (slurm_jobs_started
                    .union(slurm_jobs_ended)
                    # RANGE ... CURRENT ROW default frame: rows sharing a timestamp get the same total,
                    # so dropDuplicates leaves one row per timestamp.
                    .withColumn('REQMEMTOT', F.sum(F.col('REQMEMTOT')).over(Window.orderBy('TIMESTAMP')))
                    .select('TIMESTAMP', 'REQMEMTOT')
                    .dropDuplicates()
                    .toPandas()
)

# dropDuplicates gives no row-order guarantee and reindex(method='pad') requires a
# monotonic index, so sort before carrying the latest total forward to each minute.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')
requested_memory = requested_memory.set_index('TIMESTAMP').sort_index().reindex(idx, method = 'pad', fill_value = 0)

In [10]:
# Aggregate used memory by minute
# Running total of USEDMEM: each job adds its recorded usage at BEGIN and removes it at END.
# Jobs with no parsed BEGIN are excluded: a null TIMESTAMP sorts first in the window,
# so their usage would otherwise be counted from the start of the data until END.
slurm_jobs_timed = slurm_jobs.filter('BEGIN is not null')
slurm_jobs_started = slurm_jobs_timed.select(F.col('BEGIN').alias('TIMESTAMP'), F.col('USEDMEM'))
slurm_jobs_ended = slurm_jobs_timed.select(F.col('END').alias('TIMESTAMP'), (F.col('USEDMEM') * -1).alias('USEDMEM'))

used_memory = (slurm_jobs_started
               .union(slurm_jobs_ended)
               # RANGE ... CURRENT ROW default frame: rows sharing a timestamp get the same total,
               # so dropDuplicates leaves one row per timestamp.
               .withColumn('USEDMEM', F.sum(F.col('USEDMEM')).over(Window.orderBy('TIMESTAMP')))
               .select('TIMESTAMP', 'USEDMEM')
               .dropDuplicates()
               .toPandas()
)

# dropDuplicates gives no row-order guarantee and reindex(method='pad') requires a
# monotonic index, so sort before carrying the latest total forward to each minute.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')
used_memory = used_memory.set_index('TIMESTAMP').sort_index().reindex(idx, method = 'pad', fill_value = 0)

In [11]:
# Aggregate used nodes by minute (gross overestimate since jobs can share nodes)
# Running total of NODES: each job adds its node count at BEGIN and removes it at END.
# Jobs with no parsed BEGIN are excluded: a null TIMESTAMP sorts first in the window,
# so their nodes would otherwise be counted from the start of the data until END.
slurm_jobs_timed = slurm_jobs.filter('BEGIN is not null')
slurm_jobs_started = slurm_jobs_timed.select(F.col('BEGIN').alias('TIMESTAMP'), F.col('NODES'))
slurm_jobs_ended = slurm_jobs_timed.select(F.col('END').alias('TIMESTAMP'), (F.col('NODES') * -1).alias('NODES'))

used_nodes = (slurm_jobs_started
              .union(slurm_jobs_ended)
              # RANGE ... CURRENT ROW default frame: rows sharing a timestamp get the same total,
              # so dropDuplicates leaves one row per timestamp.
              .withColumn('NODES', F.sum(F.col('NODES')).over(Window.orderBy('TIMESTAMP')))
              .select('TIMESTAMP', 'NODES')
              .dropDuplicates()
              .toPandas()
)

# dropDuplicates gives no row-order guarantee and reindex(method='pad') requires a
# monotonic index, so sort before carrying the latest total forward to each minute.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')
used_nodes = used_nodes.set_index('TIMESTAMP').sort_index().reindex(idx, method = 'pad', fill_value = 0)

In [12]:
# Aggregate used cpus by minute
# Running total of CPUS: each job adds its CPU count at BEGIN and removes it at END.
# Jobs with no parsed BEGIN are excluded: a null TIMESTAMP sorts first in the window,
# so their CPUs would otherwise be counted from the start of the data until END.
slurm_jobs_timed = slurm_jobs.filter('BEGIN is not null')
slurm_jobs_started = slurm_jobs_timed.select(F.col('BEGIN').alias('TIMESTAMP'), F.col('CPUS'))
slurm_jobs_ended = slurm_jobs_timed.select(F.col('END').alias('TIMESTAMP'), (F.col('CPUS') * -1).alias('CPUS'))

used_cpus = (slurm_jobs_started
             .union(slurm_jobs_ended)
             # RANGE ... CURRENT ROW default frame: rows sharing a timestamp get the same total,
             # so dropDuplicates leaves one row per timestamp.
             .withColumn('CPUS', F.sum(F.col('CPUS')).over(Window.orderBy('TIMESTAMP')))
             .select('TIMESTAMP', 'CPUS')
             .dropDuplicates()
             .toPandas()
)

# dropDuplicates gives no row-order guarantee and reindex(method='pad') requires a
# monotonic index, so sort before carrying the latest total forward to each minute.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')
used_cpus = used_cpus.set_index('TIMESTAMP').sort_index().reindex(idx, method = 'pad', fill_value = 0)

In [13]:
# Aggregate all commands by minute
# Per-minute count of wrapper log lines on each CE host, zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_all_commands, ce6_all_commands = (
    wrapper_log
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('COMMANDS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [14]:
# Aggregate sbatch commands by minute
# Per-minute count of sbatch invocations on each CE host, zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_sbatch_commands, ce6_sbatch_commands = (
    wrapper_log
    .filter('COMMAND = "sbatch"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('SBATCH_COMMANDS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [15]:
# Aggregate scontrol commands by minute
# Per-minute count of scontrol invocations on each CE host, zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_scontrol_commands, ce6_scontrol_commands = (
    wrapper_log
    .filter('COMMAND = "scontrol"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('SCONTROL_COMMANDS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [16]:
# Aggregate user 9204 sbatch commands by minute
# Per-minute count of sbatch invocations by user 9204 on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_user_9204_sbatch_commands, ce6_user_9204_sbatch_commands = (
    wrapper_log
    .filter('USER = 9204')
    .filter('COMMAND = "sbatch"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('USER_9204_SBATCH_COMMANDS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [17]:
# Aggregate all timeouts by minute
# A "timeout" here is a wrapper call with RUNTIME > 15 (units presumably seconds — confirm)
# and a positive RETURNCODE. Counted per minute on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_all_timeouts, ce6_all_timeouts = (
    wrapper_log
    .filter('RUNTIME > 15')
    .filter('RETURNCODE > 0')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('TIMEOUTS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [18]:
# Aggregate sbatch timeouts by minute
# sbatch calls with RUNTIME > 15 and a positive RETURNCODE, counted per minute on each
# CE host and zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_sbatch_timeouts, ce6_sbatch_timeouts = (
    wrapper_log
    .filter('COMMAND = "sbatch"')
    .filter('RUNTIME > 15')
    .filter('RETURNCODE > 0')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('SBATCH_TIMEOUTS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [31]:
# Aggregate scontrol timeouts by minute
# scontrol calls with RUNTIME > 15 and a positive RETURNCODE, counted per minute on each
# CE host and zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_scontrol_timeouts, ce6_scontrol_timeouts = (
    wrapper_log
    .filter('COMMAND = "scontrol"')
    .filter('RUNTIME > 15')
    .filter('RETURNCODE > 0')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('SCONTROL_TIMEOUTS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [20]:
# Aggregate user 9204 sbatch timeouts by minute
# User 9204's sbatch calls with RUNTIME > 15 and a positive RETURNCODE, counted per
# minute on each CE host and zero-filled over the fixed date range.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_user_9204_sbatch_timeouts, ce6_user_9204_sbatch_timeouts = (
    wrapper_log
    .filter('USER = 9204')
    .filter('COMMAND = "sbatch"')
    .filter('RUNTIME > 15')
    .filter('RETURNCODE > 0')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.count(F.col('TIMESTAMP')).alias('USER_9204_SBATCH_TIMEOUTS'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [21]:
# Aggregate all runtimes by minute
# Sum of RUNTIME over all wrapper calls in each minute on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_all_runtimes, ce6_all_runtimes = (
    wrapper_log
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.sum(F.col('RUNTIME')).alias('RUNTIMES'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [22]:
# Aggregate sbatch runtimes by minute
# Sum of RUNTIME over sbatch calls in each minute on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_sbatch_runtimes, ce6_sbatch_runtimes = (
    wrapper_log
    .filter('COMMAND = "sbatch"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.sum(F.col('RUNTIME')).alias('SBATCH_RUNTIMES'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [32]:
# Aggregate scontrol runtimes by minute
# Sum of RUNTIME over scontrol calls in each minute on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_scontrol_runtimes, ce6_scontrol_runtimes = (
    wrapper_log
    .filter('COMMAND = "scontrol"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.sum(F.col('RUNTIME')).alias('SCONTROL_RUNTIMES'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [24]:
# Aggregate user 9204 sbatch runtimes by minute
# Sum of RUNTIME over user 9204's sbatch calls in each minute on each CE host, zero-filled.
idx = pd.date_range('2020-10-01', '2021-10-08', freq = '1min', name = 'TIMESTAMP')

ce5_user_9204_sbatch_runtimes, ce6_user_9204_sbatch_runtimes = (
    wrapper_log
    .filter('USER = 9204')
    .filter('COMMAND = "sbatch"')
    .groupBy(F.date_trunc('minute', F.col('TIMESTAMP')).alias('TIMESTAMP'))
    .agg(F.sum(F.col('RUNTIME')).alias('USER_9204_SBATCH_RUNTIMES'))
    .sort('TIMESTAMP')
    .toPandas()
    .set_index('TIMESTAMP')
    .reindex(idx, fill_value = 0)
    for wrapper_log in (ce5_log, ce6_log)
)

In [25]:
# Merge slurm time series data together and write to csv
# All frames were reindexed onto the same per-minute index, so successive index-aligned
# merges produce one wide frame with one column per metric.
slurm_dfs = [started_jobs, ended_jobs, running_jobs, requested_memory, used_memory, used_nodes, used_cpus]

slurm = reduce(lambda acc, frame: acc.merge(frame, left_index = True, right_index = True), slurm_dfs)
slurm.to_csv('../data/slurm_time_series.csv', index = True)

In [None]:
# Merge ce5 time series data together and write to csv
# Command counts, timeouts and runtimes for ce5, joined on the shared per-minute index.
ce5_dfs = [ce5_all_commands, ce5_sbatch_commands, ce5_scontrol_commands, ce5_user_9204_sbatch_commands,
           ce5_all_timeouts, ce5_sbatch_timeouts, ce5_scontrol_timeouts, ce5_user_9204_sbatch_timeouts,
           ce5_all_runtimes, ce5_sbatch_runtimes, ce5_scontrol_runtimes, ce5_user_9204_sbatch_runtimes]

ce5 = reduce(lambda acc, frame: acc.merge(frame, left_index = True, right_index = True), ce5_dfs)
ce5.to_csv('../data/ce5_time_series.csv', index = True)

In [None]:
# Merge ce6 time series data together and write to csv
# Command counts, timeouts and runtimes for ce6, joined on the shared per-minute index.
ce6_dfs = [ce6_all_commands, ce6_sbatch_commands, ce6_scontrol_commands, ce6_user_9204_sbatch_commands,
           ce6_all_timeouts, ce6_sbatch_timeouts, ce6_scontrol_timeouts, ce6_user_9204_sbatch_timeouts,
           ce6_all_runtimes, ce6_sbatch_runtimes, ce6_scontrol_runtimes, ce6_user_9204_sbatch_runtimes]

ce6 = reduce(lambda acc, frame: acc.merge(frame, left_index = True, right_index = True), ce6_dfs)
ce6.to_csv('../data/ce6_time_series.csv', index = True)

In [None]:
# Merge ce5 and ce6 time series data together and write to csv
# Element-wise sum of the two hosts' frames, aligned on index and column labels.
ce5_ce6 = ce5.add(ce6)
ce5_ce6.to_csv('../data/ce5_ce6_time_series.csv', index = True)

In [None]:
# Merge slurm, ce5, and ce6 time series data together and write to csv
# Column order: slurm metrics, combined CE totals, then per-host copies prefixed CE5_ / CE6_.
dfs = [slurm, ce5_ce6, ce5.add_prefix('CE5_'), ce6.add_prefix('CE6_')]

slurm_ce5_ce6 = reduce(lambda acc, frame: acc.merge(frame, left_index = True, right_index = True), dfs)
slurm_ce5_ce6.to_csv('../data/slurm_ce5_ce6_time_series.csv', index = True)

In [None]:
# Merge ce5 and ce6 log files, sort, and write to csv
# Rows from both cleaned wrapper logs, ordered by TIMESTAMP then RUNTIME, written
# as CSV without the pandas index.
combined_wrapper_log = ce5_log.union(ce6_log).sort('TIMESTAMP', 'RUNTIME')
ce5_ce6_log = combined_wrapper_log.toPandas()

ce5_ce6_log.to_csv('../data/slurm_wrapper_ce5_ce6.log', index = False)