In [58]:
import pandas as pd
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
import concurrent.futures
import os
import time

from tqdm.notebook import tqdm



In [None]:
"""
This notebook cleans the job-accounting data obtained from Slurm.
See the get_slurm_data.py script, which collected the data with the following call
to the Slurm accounting command (sacct):

    # Run the 'sacct' command with job ID and format options
    command = ['sacct', '-j', str(job_id), '--format=Submit,Eligible,Start,End,Elapsed,JobID,JobName,
                                                State,AllocCPUs,TotalCPU,AveRSS,MaxRSS,NodeList']
    result = subprocess.run(command, capture_output=True, text=True)
    """

In [59]:
# Location of the raw Slurm dump (a relative path, resolved against the current working directory).
folder_path = Path('system_analytics_2024') / 'slurm_data'
file_path_parquet_reading_slurm = folder_path.joinpath('slurm_data.parquet.gzip')

# Show every column and every row when displaying frames.
# NOTE(review): with display.max_rows=None, displaying a whole multi-million-row frame
# would try to render all of it — keep using .sample()/.head() for inspection.
for display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(display_option, None)

In [60]:

# Load the raw dump: each row pairs a job_id with the raw sacct text output in `feature`.
df = pd.read_parquet(file_path_parquet_reading_slurm)
df.sample(5)

Unnamed: 0,job_id,feature
4719358,7355160,Submit Eligible ...
1846628,3502766,Submit Eligible ...
3985296,6067874,Submit Eligible ...
219490,513913,Submit Eligible ...
1252957,2443934,Submit Eligible ...


In [61]:

# `feature` holds multi-line sacct output: turn it into a list of lines and record
# how many lines each job produced (the ten most common counts are shown).
df['feature'] = df['feature'].str.split('\n')
df['length_of_feature'] = df['feature'].map(len)
df['length_of_feature'].value_counts().head(10)


length_of_feature
4     2615765
3     2055873
6      334856
7      119681
8        7276
18       4882
5        3568
9        2551
13       2406
12       2046
Name: count, dtype: int64

In [62]:
""" 
I could not get the processing in the next cell to run in parallel with multiprocessing.
It would be great to do so!
"""



# df_len = len(df)
# chunk_size = 500000

# list_1 = list(range(0, df_len, chunk_size))
# if list_1[-1] != df_len:
#     list_1.append(df_len)
    
# fin_list = list(zip(list_1[0:-1], list_1[1:]))




# def heavy_computation(index):
#     #print(len(df)) # this reurns the correct length of the dataframe
#     data_processed = []
#     for n in range(index[0], index[1]):
        
#         len_feature = int(df.iloc[n, :]['length_of_feature'])

#         if len_feature > 3:
#             job_id =int( df.iloc[n, :]['job_id'])
#             query_name = df.iloc[n, :]['feature'][0]
#             signal = df.iloc[n, :]['feature'][2:-1]
#             data = {'job_id': [job_id] * len(signal),
#                     'query_name': [query_name] * len(signal),
#                     'signal': signal}
#             # can we append the dictionary and later turn it into a data feame
#             data_processed.append(pd.DataFrame(data)) 
#     return data_processed
        



# t1 = time.perf_counter()
# # CPU-bound task: heavy computation
# max_workers = min(100, os.cpu_count())
# with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
#     results = list(executor.map(heavy_computation, fin_list))




# t2 = time.perf_counter()

# print(f'Finished in {t2-t1} seconds')
# # Flatten the nested list of DataFrames
# flattened_results = [df for sublist in results for df in sublist]

# # Combine all processed chunks into a single DataFrame
# final_result = pd.concat(flattened_results, ignore_index=True)
    
    


' \nI could not make the process that comee in the next cell as a multiprocessing unit.\nIt would be great to do so!\n'

In [63]:

# Expand every job's sacct output into one row per data line.
#
# Each `feature` entry is the list of lines produced by the split in an earlier cell.
# Element 0 is used as the column header (`query_name`) and elements 2:-1 as the
# data lines (`signal`). NOTE(review): element 1 is presumably sacct's dashed
# separator line and the last element the empty string left after the trailing
# newline — confirm against the raw dump.
#
# This replaces a row-by-row loop that called `df.iloc[n, :]` up to four times per
# row and built one small DataFrame per job. The vectorised form yields the same
# frame (same column order, dtypes, row order, and a 0..k-1 index within each job)
# far faster. It also avoids the `pd.concat([])` ValueError when no job has data lines.
#
# NOTE: `df` is rebound to the expanded frame, so this cell cannot be re-run on its
# own; re-run the cells above it first.

# More than 3 lines guarantees feature[2:-1] has at least one data line.
rows_with_data = df.loc[df['length_of_feature'] > 3, ['job_id', 'feature']]

df = pd.DataFrame({
    'job_id': rows_with_data['job_id'].astype('int64').to_numpy(),
    'query_name': rows_with_data['feature'].map(lambda lines: lines[0]).to_numpy(),
    'signal': rows_with_data['feature'].map(lambda lines: lines[2:-1]).to_numpy(),
}).explode('signal')

# explode() repeats the parent row label for every data line; renumber 0..k-1 within
# each job, matching the index produced by concatenating one DataFrame per job.
df.index = df.groupby(level=0).cumcount().to_numpy()
print(len(df))

# df.to_parquet(folder_path/'slurm_data_half_cleaned.parquet.gzip')



5649412


In [64]:
# Tokenise the header line and each data line on whitespace.
for column in ('query_name', 'signal'):
    df[column] = df[column].str.split()
df.sample(n=5)

Unnamed: 0,job_id,query_name,signal
0,7026386,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-07-16T16:15:39, 2024-07-16T16:15:39, 202..."
0,7596661,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-08-21T10:43:25, 2024-08-21T10:43:25, 202..."
2970,8236849,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-10-23T10:47:13, 2024-10-23T10:47:13, 202..."
0,8109088,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-10-07T15:01:41, 2024-10-07T15:01:42, 202..."
0,4333524,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2023-11-03T16:13:18, 2023-11-03T16:13:18, 202..."


In [65]:

# Count tokens in the header (`query_name`) and in each data line (`signal`).
# Rows whose data-line token count is unusual are filtered in the next cell.
for list_column, count_column in (('query_name', 'length_of_query'),
                                  ('signal', 'length_of_signal')):
    df[count_column] = df[list_column].map(len)
print(*(df[count_column].value_counts()
        for count_column in ('length_of_query', 'length_of_signal')))


length_of_query
13    5649412
Name: count, dtype: int64

length_of_signal
11    4018924
13    1630471
12          9
14          8
Name: count, dtype: int64

In [66]:
# Keep only rows with 11 or 13 tokens (the rare 12/14-token rows are dropped),
# then split them into one independent copy per token layout.
df = df[df['length_of_signal'].isin([11, 13])]
df_11, df_13 = (df[df['length_of_signal'] == n_tokens].copy() for n_tokens in (11, 13))

In [67]:
# Field order requested from sacct (see the --format option quoted at the top of the notebook).
signal_names = ['Submit', 'Eligible', 'Start', 'End', 'Elapsed',
                'JobID', 'JobName', 'State', 'AllocCPUS', 'TotalCPU',
                'AveRSS', 'MaxRSS',
                'NodeList']

# 13-token rows carry every field. 11-token rows are mapped onto every field except
# AveRSS and MaxRSS (signal_names[10:12]).
layouts = (
    (df_13, signal_names),
    (df_11, signal_names[:10] + signal_names[-1:]),
)
for frame, field_names in layouts:
    for position, field in enumerate(field_names):
        frame[field] = frame['signal'].str[position]

# Stack both layouts; the columns df_11 lacks (AveRSS, MaxRSS) become NaN for its rows.
df_cleaned = pd.concat([df_13, df_11], axis=0)
df_cleaned.sample(n=10)

Unnamed: 0,job_id,query_name,signal,length_of_query,length_of_signal,Submit,Eligible,Start,End,Elapsed,JobID,JobName,State,AllocCPUS,TotalCPU,AveRSS,MaxRSS,NodeList
0,4799751,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-01-08T13:01:20, 2024-01-08T13:01:21, 202...",13,11,2024-01-08T13:01:20,2024-01-08T13:01:21,2024-01-08T13:12:25,2024-01-08T13:12:40,00:00:15,4799528_158,france_nc+,COMPLETED,16,00:00:00,,,tcn468
0,8018778,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-10-01T09:02:53, 2024-10-01T09:02:53, 202...",13,11,2024-10-01T09:02:53,2024-10-01T09:02:53,2024-10-01T09:03:17,2024-10-01T17:01:56,07:58:39,8018778,NMes3_rad+,COMPLETED,16,4-21:16:07,,,tcn194
0,7082974,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-07-22T12:11:41, 2024-07-22T12:11:41, 202...",13,11,2024-07-22T12:11:41,2024-07-22T12:11:41,2024-07-22T12:11:58,2024-07-22T12:14:25,00:02:27,7082974,4b5d1e31-+,COMPLETED,32,00:00:00,,,tcn193
3,7748757,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-09-07T10:08:32, 2024-09-07T10:08:32, 202...",13,13,2024-09-07T10:08:32,2024-09-07T10:08:32,2024-09-07T10:08:32,2024-09-07T10:16:01,00:07:29,7748757.0,vasp_gam,COMPLETED,32,03:51:49,85599552,90632K,tcn206
3,8234916,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-10-21T20:06:32, 2024-10-21T20:06:32, 202...",13,13,2024-10-21T20:06:32,2024-10-21T20:06:32,2024-10-21T20:06:32,2024-10-22T06:06:30,09:59:58,8234916.0,vasp_gam,CANCELLED,192,79-04:24:+,232230373,261230K,tcn655
5,8070823,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-10-04T05:20:46, 2024-10-04T05:20:46, 202...",13,13,2024-10-04T05:20:46,2024-10-04T05:20:46,2024-10-04T05:20:46,2024-10-04T07:08:31,01:47:45,8070823.2,python,COMPLETED,18,01:44:09,8818438K,8818438K,gcn7
0,6636839,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-06-14T09:30:44, 2024-06-14T09:30:44, 202...",13,11,2024-06-14T09:30:44,2024-06-14T09:30:44,2024-06-14T09:31:14,2024-06-14T10:06:50,00:35:36,6636839,bash,COMPLETED,16,00:00:00,,,tcn118
0,6775358,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-06-27T02:57:57, 2024-06-27T02:57:57, 202...",13,11,2024-06-27T02:57:57,2024-06-27T02:57:57,2024-06-27T02:57:58,2024-06-27T03:07:19,00:09:21,6775358,j.2008120+,COMPLETED,96,00:00:00,,,tcn1174
0,7825812,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-09-13T11:36:41, 2024-09-13T11:36:52, 202...",13,11,2024-09-13T11:36:41,2024-09-13T11:36:52,2024-09-13T11:37:08,2024-09-13T11:40:10,00:03:02,7825754_55,run.sh,CANCELLED+,16,03:03.603,,,tcn5
1612,8001373,"[Submit, Eligible, Start, End, Elapsed, JobID,...","[2024-09-29T23:57:55, 2024-09-29T23:57:55, 202...",13,13,2024-09-29T23:57:55,2024-09-29T23:57:55,2024-09-29T23:57:55,2024-09-29T23:58:08,00:00:13,8001373_537+,batch,FAILED,16,00:00.276,1681K,1681K,tcn306


In [68]:
# Drop the helper/raw columns (and JobName), and rename sacct's JobID so it is not
# confused with the job_id column it was queried by.
columns_to_drop = ['query_name', 'signal', 'length_of_query', 'length_of_signal', 'JobName']
df_cleaned = (
    df_cleaned
    .drop(columns=columns_to_drop)
    .rename(columns={"JobID": "Slurm_job_id"})
)

: 

In [42]:
# NOTE(review): in the saved notebook this cell shows execution count 42, lower than the
# cells above it (58-68), so the file on disk may predate the latest cleaning run.
# Re-run this cell after the ones above before relying on the output.
output_path = folder_path / 'slurm_data_preprocessed.parquet.gzip'
df_cleaned.to_parquet(output_path, compression='gzip')