In [1]:
slurm_file_path ="slurm_expanded.parquet"
prom_file_path = "models/prom_pure_label.parquet"

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, lag, when, sum, min, max, col, expr, collect_list, size
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .appName("ReadParquetFiles") \
    .config("spark.jars", "/home/xiaoyu/mariadb-java-client-3.1.2.jar") \
    .config("spark.driver.memory", "32G") \
    .config("spark.executor.memory", "8G") \
    .getOrCreate()

Picked up _JAVA_OPTIONS: -Xmx26624m
Picked up _JAVA_OPTIONS: -Xmx26624m


23/08/16 00:38:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
slurm_data = spark.read.parquet(slurm_file_path, header=True, inferSchema=True)

                                                                                

In [5]:
prom_data = spark.read.parquet(prom_file_path, header=True, inferSchema=True)

In [7]:
slurm_data = slurm_data.select(*(col(c).alias('slurm_' + c) for c in slurm_data.columns))

In [8]:

slurm_data = slurm_data.withColumn('finishTasks', when(slurm_data['slurm_state'] == 'COMPLETED', 1).otherwise(0))

In [9]:
from pyspark.sql.functions import col

start_time = "2022-06-30 18:00:30"
end_time = "2022-11-01 00:00:00"

slurm_data = slurm_data.filter(
    (col("slurm_start_date") >= start_time) & (col("slurm_end_date") <= end_time)
)

prom_data = prom_data.filter(
    (col("timestamp") >= start_time) & (col("timestamp") <= end_time)
)

In [10]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import broadcast

result = prom_data.join(
    broadcast(slurm_data),
    (prom_data['timestamp'] >= slurm_data['slurm_start_date']) & 
    (prom_data['timestamp'] <= slurm_data['slurm_end_date']) &
    (slurm_data['slurm_node'] == prom_data['node']),
    how='left_outer'
)

grouped = result.groupBy(*prom_data.columns).agg(
    collect_list('slurm_id').alias('jobID'),
    _sum('finishTasks').alias('finishTasks')
)

grouped = grouped.withColumn('jobCount', size('jobID'))

grouped.show()

[Stage 5:>                                                          (0 + 1) / 1]

+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+-----+-----------+--------+
|          timestamp|  node|node_load15|node_load5|node_load1|node_memory_Active_bytes|node_filesystem_device_error-sum|node_netstat_Icmp_InErrors|node_netstat_Tcp_InErrs|node_netstat_Udp_InErrors|nvidia_gpu_power_usage_milliwatts-max|nvidia_gpu_temperature_celsius-max|isAnomaly|__index_level_0__|jobID|finishTasks|jobCount|
+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+-----+-----------+--------+
|2022-06-30 18:00:30|r

                                                                                

In [12]:
from pyspark.sql.functions import sum as _sum, struct, collect_list, size, broadcast

result = prom_data.join(
    broadcast(slurm_data),
    (prom_data['timestamp'] >= slurm_data['slurm_start_date']) & 
    (prom_data['timestamp'] <= slurm_data['slurm_end_date']) &
    (slurm_data['slurm_node'] == prom_data['node']),
    how='left_outer'
)

result = result.withColumn('jobStruct', struct('slurm_id', 'slurm_state'))

grouped = result.groupBy(*prom_data.columns).agg(
    collect_list('jobStruct').alias('jobList'),
    _sum('finishTasks').alias('finishTasks')
)

grouped = grouped.withColumn('jobCount', size('jobList'))

grouped.show(truncate=False)


[Stage 5:>                                                          (0 + 1) / 1]

+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+--------------+-----------+--------+
|timestamp          |node  |node_load15|node_load5|node_load1|node_memory_Active_bytes|node_filesystem_device_error-sum|node_netstat_Icmp_InErrors|node_netstat_Tcp_InErrs|node_netstat_Udp_InErrors|nvidia_gpu_power_usage_milliwatts-max|nvidia_gpu_temperature_celsius-max|isAnomaly|__index_level_0__|jobList       |finishTasks|jobCount|
+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+--------------+-----------+-----

                                                                                

In [13]:
distinct_values = grouped.select('finishTasks', 'jobCount').dropDuplicates()

distinct_values.show()



+-----------+--------+
|finishTasks|jobCount|
+-----------+--------+
|          1|      12|
|          6|      13|
|         10|      14|
|         13|      14|
|          1|      15|
|          4|       8|
|         11|      13|
|          1|       7|
|          4|       9|
|         10|      10|
|          0|      10|
|          4|       7|
|          2|       8|
|          6|      14|
|          0|       4|
|          8|      16|
|          8|       8|
|          0|       5|
|         13|      15|
|          3|       9|
+-----------+--------+
only showing top 20 rows



                                                                                

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max
from pyspark.sql.types import DateType
slurm_data = slurm_data.withColumn('slurm_start_date', slurm_data['slurm_start_date'].cast(TimestampType()))
slurm_data = slurm_data.withColumn('slurm_end_date', slurm_data['slurm_end_date'].cast(TimestampType()))
earliest_start_date = slurm_data.agg(min('slurm_start_date')).collect()[0][0]
latest_end_date = slurm_data.agg(max('slurm_end_date')).collect()[0][0]

print("最早的开始日期:", earliest_start_date)
print("最晚的结束日期:", latest_end_date)

最早的开始日期: 2022-06-30 18:00:36
最晚的结束日期: 2022-10-31 23:56:15


In [9]:
from pyspark.sql.functions import min, max
from pyspark.sql.types import TimestampType
prom_data = prom_data.withColumn('timestamp', prom_data['timestamp'].cast(TimestampType()))

earliest_timestamp = prom_data.agg(min('timestamp')).collect()[0][0]
latest_timestamp = prom_data.agg(max('timestamp')).collect()[0][0]

time_span = latest_timestamp - earliest_timestamp

print("最早的时间戳:", earliest_timestamp)
print("最晚的时间戳:", latest_timestamp)
print("时间跨度:", time_span)



最早的时间戳: 2022-06-30 18:00:30
最晚的时间戳: 2022-11-01 00:00:00
时间跨度: 123 days, 5:59:30


                                                                                

In [35]:
prom_data.show()



+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+
|          timestamp|  node|node_load15|node_load5|node_load1|node_memory_Active_bytes|node_filesystem_device_error-sum|node_netstat_Icmp_InErrors|node_netstat_Tcp_InErrs|node_netstat_Udp_InErrors|nvidia_gpu_power_usage_milliwatts-max|nvidia_gpu_temperature_celsius-max|isAnomaly|__index_level_0__|
+-------------------+------+-----------+----------+----------+------------------------+--------------------------------+--------------------------+-----------------------+-------------------------+-------------------------------------+----------------------------------+---------+-----------------+
|2022-08-02 01:00:30|r12n20|       3.25|      2.43|       4.4|               4.58505E9|                

                                                                                

In [11]:
grouped.write.parquet('prom_integrated.parquet')

                                                                                

In [13]:
grouped.write.parquet('prom_integrated_v2.parquet')

                                                                                

In [None]:
slurm_data['state'] = slurm_data['state'].replace({'CANCELLED by .*': 'CANCELLED'}, regex=True)

In [None]:
prom_data['jobCount'] = prom_data['jobID'].apply(len)

In [None]:
states = ['TIMEOUT', 'COMPLETED', 'CANCELLED', 'FAILED', 'OUT_OF_MEMORY', 'NODE_FAIL']

In [None]:
for state in states:
    prom_data[state] = 0

def count_states(job_ids, slurm, prom_data_row):
    job_states = slurm[slurm['id'].isin(job_ids)]['state']
    for state in job_states:
        if state in states:
            prom_data_row[state] += 1
    return prom_data_row

prom_data = prom_data.apply(lambda row: count_states(row['jobID'], slurm_data, row), axis=1)

In [None]:
prom_data.to_parquet('prom_inter.parquet')

In [41]:
prom_data

Unnamed: 0,id,timestamp,node,node_time_seconds,node_load15,surfsara_power_usage,up,node_netstat_Tcp_OutSegs,node_netstat_Tcp_InErrs,node_context_switches_total,...,node_disk_io_now-sum,node_rapl_package_joules_total-sum,node_network_receive_drop_total-sum,jobID,TIMEOUT,COMPLETED,CANCELLED,FAILED,OUT_OF_MEMORY,NODE_FAIL
1972309,10530208,2022-06-30 16:00:30,r11n5,1.656610e+09,16.08,192.0,1.0,1.001440e+11,0.0,2.568690e+11,...,0.0,187752.55,0.0,[],0,0,0,0,0,0
1972589,10530209,2022-06-30 16:01:00,r11n5,1.656610e+09,16.07,192.0,1.0,1.001450e+11,0.0,2.568690e+11,...,0.0,191318.44,0.0,[],0,0,0,0,0,0
1972869,10530210,2022-06-30 16:01:30,r11n5,1.656610e+09,16.07,192.0,1.0,1.001460e+11,0.0,2.568690e+11,...,0.0,194886.42,0.0,[],0,0,0,0,0,0
1973149,10530211,2022-06-30 16:02:00,r11n5,1.656610e+09,16.07,192.0,1.0,1.001480e+11,0.0,2.568690e+11,...,0.0,198454.06,0.0,[],0,0,0,0,0,0
1973429,10530212,2022-06-30 16:02:30,r11n5,1.656610e+09,16.07,192.0,1.0,1.001480e+11,0.0,2.568690e+11,...,0.0,201958.90,0.0,[],0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96535621,147665620,2022-11-22 11:18:30,r11n5,1.669120e+09,5.92,92.0,1.0,2.411190e+09,0.0,1.449280e+11,...,0.0,4180.07,0.0,[],0,0,0,0,0,0
96535841,147665621,2022-11-22 11:19:00,r11n5,1.669120e+09,5.92,92.0,1.0,2.411330e+09,0.0,1.449320e+11,...,0.0,5824.30,0.0,[],0,0,0,0,0,0
96536061,147665622,2022-11-22 11:19:30,r11n5,1.669120e+09,5.89,92.0,1.0,2.411400e+09,0.0,1.449370e+11,...,0.0,7205.37,0.0,[],0,0,0,0,0,0
96536281,147665623,2022-11-22 11:20:00,r11n5,1.669120e+09,5.77,92.0,1.0,2.411660e+09,0.0,1.449420e+11,...,0.0,9073.11,0.0,[],0,0,0,0,0,0


In [44]:
prom_data['COMPLETED'].unique()

array([0, 1, 5, 4, 3, 2, 8, 7, 6, 9])

In [43]:
r11n5_slurm['start_date'] = r11n5_slurm['start_date'].dt.round('30S')
r11n5_slurm['end_date'] = r11n5_slurm['end_date'].dt.ceil('30S')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  r11n5_slurm['start_date'] = r11n5_slurm['start_date'].dt.round('30S')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  r11n5_slurm['end_date'] = r11n5_slurm['end_date'].dt.ceil('30S')


In [45]:
def count_jobs_start(row, r11n5_slurm):
    return r11n5_slurm[r11n5_slurm['start_date'] == row['timestamp']].shape[0]

prom_data['jobCount_30s'] = prom_data.apply(lambda row: count_jobs_start(row, r11n5_slurm), axis=1)


In [47]:
states = ['TIMEOUT', 'COMPLETED', 'CANCELLED', 'FAILED', 'OUT_OF_MEMORY', 'NODE_FAIL']

for state in states:
    prom_data[state + '_30s'] = 0

def count_states_30s(row, r11n5_slurm):
    job_states_30s = r11n5_slurm[r11n5_slurm['end_date'] == row['timestamp']]['state']
    for state in job_states_30s:
        if state in states:
            row[state + '_30s'] += 1
    return row

prom_data = prom_data.apply(lambda row: count_states_30s(row, r11n5_slurm), axis=1)

In [48]:
prom_data.to_parquet('integrated_r11n5.parquet')


In [49]:
prom_data

Unnamed: 0,id,timestamp,node,node_time_seconds,node_load15,surfsara_power_usage,up,node_netstat_Tcp_OutSegs,node_netstat_Tcp_InErrs,node_context_switches_total,...,FAILED,OUT_OF_MEMORY,NODE_FAIL,jobCount_30s,TIMEOUT_30s,COMPLETED_30s,CANCELLED_30s,FAILED_30s,OUT_OF_MEMORY_30s,NODE_FAIL_30s
1972309,10530208,2022-06-30 16:00:30,r11n5,1.656610e+09,16.08,192.0,1.0,1.001440e+11,0.0,2.568690e+11,...,0,0,0,0,0,0,0,0,0,0
1972589,10530209,2022-06-30 16:01:00,r11n5,1.656610e+09,16.07,192.0,1.0,1.001450e+11,0.0,2.568690e+11,...,0,0,0,0,0,0,0,0,0,0
1972869,10530210,2022-06-30 16:01:30,r11n5,1.656610e+09,16.07,192.0,1.0,1.001460e+11,0.0,2.568690e+11,...,0,0,0,0,0,0,0,0,0,0
1973149,10530211,2022-06-30 16:02:00,r11n5,1.656610e+09,16.07,192.0,1.0,1.001480e+11,0.0,2.568690e+11,...,0,0,0,0,0,0,0,0,0,0
1973429,10530212,2022-06-30 16:02:30,r11n5,1.656610e+09,16.07,192.0,1.0,1.001480e+11,0.0,2.568690e+11,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96535621,147665620,2022-11-22 11:18:30,r11n5,1.669120e+09,5.92,92.0,1.0,2.411190e+09,0.0,1.449280e+11,...,0,0,0,0,0,0,0,0,0,0
96535841,147665621,2022-11-22 11:19:00,r11n5,1.669120e+09,5.92,92.0,1.0,2.411330e+09,0.0,1.449320e+11,...,0,0,0,0,0,0,0,0,0,0
96536061,147665622,2022-11-22 11:19:30,r11n5,1.669120e+09,5.89,92.0,1.0,2.411400e+09,0.0,1.449370e+11,...,0,0,0,0,0,0,0,0,0,0
96536281,147665623,2022-11-22 11:20:00,r11n5,1.669120e+09,5.77,92.0,1.0,2.411660e+09,0.0,1.449420e+11,...,0,0,0,0,0,0,0,0,0,0


In [50]:
prom_data['COMPLETED_30s'].unique()

array([0, 1, 2, 3, 4])