# CRAB Spark tape recall history

This jobs is querying `rules_history` table of cmsrucio to answer theses questions:
- How long do tasks stay in “taperecall”?

In [2]:
from datetime import datetime, timedelta, timezone
import os
import time
import pandas as pd

from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_user,
    col, collect_list, concat_ws, greatest, lit, lower, when,
    avg as _avg,
    count as _count,
    hex as _hex,
    max as _max,
    min as _min,
    round as _round,
    sum as _sum,
)
from pyspark.sql.types import (
    StructType,
    LongType,
    StringType,
    StructField,
    DoubleType,
    IntegerType,
)

In [3]:
spark = SparkSession\
        .builder\
        .appName('tape-recall-history')\
        .getOrCreate()
spark

24/10/04 19:20:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
spark.catalog.clearCache()

In [5]:
# arguments
# secret path, also check if file exists
secretpath = os.environ.get('OPENSEARCH_SECRET_PATH', f'{os.getcwd()}/../workdir/secret_opensearch.txt')
if not os.path.isfile(secretpath): 
    raise Exception(f'OS secrets file {secretpath} does not exists')
# if PROD, index prefix will be `crab-*`, otherwise `crab-test-*`
PROD = os.environ.get('PROD', 'false').lower() in ('true', '1', 't')
# FROM_DATE, in strptime("%Y-%m-%d")
START = os.environ.get('START_DATE', None) 
END = os.environ.get('END_DATE', None)

In [6]:
# try to import osearch from current directory, fallback to $PWD/../workdir if not found
try:
    import osearch
except ModuleNotFoundError:
    import sys
    sys.path.insert(0, f'{os.getcwd()}/../workdir')
    import osearch

In [7]:
# variables for run inside notebook
START_DATE = "2020-01-01"
END_DATE = "2024-10-01"
# if cronjob, replace constant with value from env
if START and END:
    START_DATE = START
    END_DATE = END

In [8]:
# index name
index_name = 'tape-recall-history' # always put test index prefix
# use prod index pattern if this execution is for production
if PROD:
    index_name = f'crab-{index_name}'
else:
    index_name = f'crab-test-{index_name}'

In [9]:
# datetime object
start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc)
end_datetime = datetime.strptime(END_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc)
# sanity check
if end_datetime < start_datetime: 
    raise Exception(f"end date ({END_DATE}) is less than start date ({START_DATE})")
start_epochmilis = int(start_datetime.timestamp()) * 1000
end_epochmilis = int(end_datetime.timestamp()) * 1000
yesterday_epoch = int((end_datetime-timedelta(days=1)).timestamp())

In [10]:
# debug
print(START_DATE, 
      END_DATE, 
      index_name,
      sep='\n')

2020-01-01
2024-10-01
crab-test-tape-recall-history


In [52]:
# Import data into spark

HDFS_RUCIO_RULES_HISTORY = f'/project/awg/cms/rucio/{END_DATE}/rules_history/'

print("==============================================="
      , "RUCIO : Rules History"
      , "==============================================="
      , "File Directory:", HDFS_RUCIO_RULES_HISTORY
      , "Work Directory:", os.getcwd()
      , "==============================================="
      , "===============================================", sep='\n')

# we only interest in the rules where state does not change anymore.
# which means, only the rules that already expired.
rucio_rules_history = (
    spark.read.format('avro').load(HDFS_RUCIO_RULES_HISTORY).withColumn('ID', lower(_hex(col('ID'))))
         .select("ID", "ACCOUNT", "NAME", "STATE", "EXPIRES_AT", "UPDATED_AT", "CREATED_AT")
         .filter(f"""\
                  1=1
                  AND ACTIVITY = 'Analysis TapeRecall'
                  AND EXPIRES_AT >= {start_epochmilis}
                  AND EXPIRES_AT < {end_epochmilis}
                  """)
         .cache()
)
rucio_rules_history.createOrReplaceTempView("rules_history")

HDFS_CRAB_part = f'/project/awg/cms/crab/tasks/{END_DATE}/'
print("==============================================="
      , "CRAB Table"
      , "==============================================="
      , "File Directory:", HDFS_CRAB_part
      , "Work Directory:", os.getcwd()
      , "==============================================="
      , "===============================================", sep='\n')

# do not filter taskdb by create time (TM_START_TIME) because it is possible that rules are created 6 months ago
tasks_df = (
    spark.read.format('avro').load(HDFS_CRAB_part)
         .select("TM_TASKNAME","TM_START_TIME","TM_TASK_STATUS",  'TM_TASKNAME', 'TM_START_TIME', 'TM_TASK_STATUS' , 'TM_DDM_REQID')
         .cache()
)
tasks_df.createOrReplaceTempView("tasks")

RUCIO : Rules History
File Directory:
/project/awg/cms/rucio/2024-10-01/rules_history/
Work Directory:
/eos/home-i00/t/tseethon/SWAN_projects/CRABServer/src/script/Monitor/crab-spark/notebooks
CRAB Table
File Directory:
/project/awg/cms/crab/tasks/2024-10-01/
Work Directory:
/eos/home-i00/t/tseethon/SWAN_projects/CRABServer/src/script/Monitor/crab-spark/notebooks


24/10/04 20:23:43 WARN CacheManager: Asked to cache already cached data.


In [53]:
# rucio append new row to rules_history when the content rules table change (not sure the exact condition)
# We need to get "the latest" row for each rules by:
# - If rule has state "O", select the earliest UPDATED_AT row.
#   For the OK rule, we can calculate number of days using UPDATED_AT-CREATED_AT. 
#   However, there are some posiblility that rucio append new entry with newer UPDATED_AT (For exmple 37fcada73f14439b88558ef792e10276)
# - If not, select the latest UPDATED_AT row.
#   This because the rules still in temporary state, and the rules will go to the end state 
#   (not the real state, but rules_history will not getting new row anymore) after rules is expired 
#   So, we can calculate number of day by EXPIRES_AT-CREATED_AT
#
# Here is the step to translate above condition to SQL (in the buttom-up manner)
# 1. count number of row where the state is 'O'.
# 2. left join the rule history by ID, so each row will have number of state O 
# 3. select the earliest row for "the rule that have state O"=> where clause. this can be done by windows function, sort by UPDATED_AT ascending for each ID, then filter only row_number "1"
# 4. select the latest row for "the rule that does not have state O at all". 
#    This is a bit tricky but can be done by filter out the rule that have number of state O more than zero.
#    which this column already availabe from left join in step 2.
#    For the "select latest row" we do the same way as 4. but sort by UPDATED_AT descending instead.
# 5. merge result from 3. and 4.
# 6. (not) done (yet)! We will calculate number of date in the next step

query = f"""\
--- rn_t then latestupdate_t is simply translation of drop_duplicates(), to get the latest entry of each rules
WITH 
tba_t AS (
SELECT *
FROM rules_history
),
t1 AS (
SELECT ID, 
       SUM(CASE WHEN state = 'O' THEN 1 ELSE 0 END) AS state_o
FROM tba_t
GROUP BY ID
),
t2 AS (
SELECT tba_t.ID AS ID, tba_t.ACCOUNT AS ACCOUNT, tba_t.NAME AS NAME, tba_t.STATE AS STATE, tba_t.EXPIRES_AT AS EXPIRES_AT, tba_t.UPDATED_AT AS UPDATED_AT, tba_t.CREATED_AT AS CREATED_AT,
       t1.state_o AS state_o
FROM tba_t
LEFT JOIN t1 ON tba_t.ID = t1.ID
),
t3 AS (
SELECT *, row_number() over(partition by ID order by UPDATED_AT) as row_num
FROM t2
WHERE STATE = 'O'
), 
r1 AS (
SELECT * FROM t3
WHERE row_num = 1
),
t4 AS (
SELECT *, row_number() over(partition by ID order by UPDATED_AT DESC) as row_num
FROM t2
WHERE STATE != 'O' AND state_o = 0
),
r2 AS (
SELECT * FROM t4
WHERE row_num = 1
),
r_all AS (
SELECT * FROM r1
UNION ALL
SELECT * FROM r2
)
SELECT * 
FROM r_all
"""

tmprules = spark.sql(query)
tmprules.show(10, False)
tmprules.createOrReplaceTempView("tmprules")

+--------------------------------+--------+----------------------------------------------------------------------------------------------------------------------------------+-----+-------------+-------------+-------------+-------+-------+-------+-------+
|ID                              |ACCOUNT |NAME                                                                                                                              |STATE|EXPIRES_AT   |UPDATED_AT   |CREATED_AT   |state_s|state_o|state_r|row_num|
+--------------------------------+--------+----------------------------------------------------------------------------------------------------------------------------------+-----+-------------+-------------+-------------+-------+-------+-------+-------+
|0006907403fe4e9486a3aed69406d8f9|tihsu   |/DoublePhoton_FlatPt-1To100-gun/Phase2Spring23DIGIRECOMiniAOD-PU200_Trk1GeV_131X_mcRun4_realistic_v5-v1/GEN-SIM-DIGI-RAW-MINIAOD  |O    |1726479827000|1726134227000|1725642766000|0      |1    

In [78]:
# Calculate number of days, for state O, UPDATED_AT-CREATED_AT, otherwise EXPIRES_AT-CREATED_AT
# then enrich the data with the crab taskdb table by join rule ID with TM_DDM_REQID column
# need to apply windows function again to select only the rule id with the latest crab tasks

query = f"""\
WITH 
calc_days_t AS (
SELECT ID, ACCOUNT, NAME, STATE, EXPIRES_AT, UPDATED_AT, CREATED_AT,
       CASE 
           WHEN STATE = 'O' THEN ceil((UPDATED_AT-CREATED_AT)/86400000)  
           ELSE ceil((EXPIRES_AT-CREATED_AT)/86400000)
       END AS DAYS
FROM tmprules
),
join_t AS (
SELECT 
    calc_days_t.ID AS ID, 
    calc_days_t.ACCOUNT AS ACCOUNT, 
    calc_days_t.NAME AS NAME, 
    calc_days_t.STATE AS STATE, 
    calc_days_t.DAYS AS DAYS, 
    calc_days_t.EXPIRES_AT AS EXPIRES_AT, 
    calc_days_t.UPDATED_AT AS UPDATED_AT, 
    calc_days_t.CREATED_AT AS CREATED_AT, 
    tasks.TM_TASKNAME AS TM_TASKNAME,
    IFNULL(tasks.TM_START_TIME, 0) AS TM_START_TIME, 
    tasks.TM_TASK_STATUS AS TM_TASK_STATUS
FROM calc_days_t
LEFT JOIN tasks ON calc_days_t.ID = tasks.TM_DDM_REQID
),
window_t AS (
SELECT ID, ACCOUNT, NAME, STATE, DAYS, EXPIRES_AT, UPDATED_AT, CREATED_AT, TM_TASKNAME, TM_START_TIME, TM_TASK_STATUS, 
       row_number() OVER (PARTITION BY ID ORDER BY TM_START_TIME DESC) AS row_num
FROM join_t 
),
uniqueid_t AS (
SELECT *
FROM window_t 
WHERE row_num = 1
), 
finalize_t AS (
SELECT ID, ACCOUNT, NAME, STATE, DAYS, EXPIRES_AT, UPDATED_AT, CREATED_AT, TM_TASKNAME, IFNULL(TM_START_TIME, 0) as TM_START_TIME, TM_TASK_STATUS, 
       EXPIRES_AT AS timestamp,
       'tape_recall_history' AS type
FROM uniqueid_t 
)
SELECT ID, COUNT(*) AS C
FROM finalize_t
GROUP BY ID
ORDER BY C DESC
"""

tmpdf = spark.sql(query)
tmpdf.show(10, False)


+--------------------------------+---+
|ID                              |C  |
+--------------------------------+---+
|0006907403fe4e9486a3aed69406d8f9|1  |
|002657a392fe46799db39696ef9acf04|1  |
|007fa987e8b94ae98ade337e2ae9e412|1  |
|0201b5b36a9546a79ebfb6ac3899b7c5|1  |
|028a5ca0f941415e8061447ade99402f|1  |
|058c85a3008d46f1b9b27dae1ff2d3ad|1  |
|059bd3a99b3b41b9ae5c7522daee676e|1  |
|05c7603ed8594a7ab6da0872015fd2a7|1  |
|05eb729cbaf641189c41892adcbb4cf3|1  |
|0639080709c2405e85be4562fc48485c|1  |
+--------------------------------+---+
only showing top 10 rows



In [79]:
tmpdf.count()

319

In [None]:
query = f"""\
repeated_ids AS (
    SELECT ID
    FROM rules_history
    GROUP BY ID
    HAVING COUNT(*) > 2
),
tba_t AS (
SELECT *
FROM rules_history
)
SELECT * FROM tba_t
"""

tmpdf = spark.sql(query)
tmpdf.show(100, False)

In [None]:
spark.sql("""\
SELECT * FROM rules_history
WHERE ID = '37fcada73f14439b88558ef792e10276'
""").show(10, False)

In [None]:
tmpdf.count()

In [None]:
docs = tmpdf.toPandas().to_dict('records')

In [None]:
schema = {
        "settings": {"index": {"number_of_shards": "1", "number_of_replicas": "1"}},
        "mappings": {
            "properties": {
                "ID": {"ignore_above": 2048, "type": "keyword"},
                "ACCOUNT": {"ignore_above": 2048, "type": "keyword"},
                "NAME": {"ignore_above": 2048, "type": "keyword"},
                "STATE": {"ignore_above": 2048, "type": "keyword"},
                "DAYS": {"type": "long"},
                "EXPIRES_AT": {"format": "epoch_millis", "type": "date"},
                "UPDATED_AT": {"format": "epoch_millis", "type": "date"},
                "CREATED_AT": {"format": "epoch_millis", "type": "date"},
                "TM_TASKNAME": {"ignore_above": 2048, "type": "keyword"},
                "TM_START_TIME": {"format": "epoch_millis", "type": "date"},
                "TM_TASK_STATUS": {"ignore_above": 2048, "type": "keyword"},
                "type": {"ignore_above": 2048, "type": "keyword"},
                "timestamp": {"format": "epoch_millis", "type": "date"},
            }

        }

    }


In [None]:
import importlib
importlib.reload(osearch)

In [None]:
osearch.send_os(docs, index_name, schema, secretpath, yesterday_epoch)

In [None]:
# Add a single doc to es everyday to check if pipeline is running successfully.
# This is need because we did not have rule that expires everyday
# Remember to filter it out in grafana (For example `NOT ID:00000000000000000` in lucene query)
day = start_datetime
monitoring_docs = []
while day < end_datetime:
    milisec = int(day.timestamp())*1000
    doc = {
        "ID": '00000000000000000',
        "ACCOUNT": 'cmscrab',
        "NAME": '/Pipeline/Monitoring/AOD',
        "STATE": 'P',
        "DAYS": -1,
        "EXPIRES_AT": milisec,
        "UPDATED_AT": milisec,
        "CREATED_AT": milisec,
        "TM_TASKNAME": '240000_000000:cmscrab_crab_20240000_000000',
        "TM_START_TIME": milisec,
        "TM_TASK_STATUS": 'PLACEHOLDER',
        "type": 'tape_recall_history',
        "timestamp": milisec,

    }
    monitoring_docs.append(doc)
    day += timedelta(days=1)
osearch.send_os(monitoring_docs, index_name, schema, secretpath, timestamp_str)