# Extraction of FTS transfer error messages

**Objective:** 

 - extract FTS transfer error data
 - explore the data
 - save relevant samples and summaries

### Spark Session 

In [1]:
%%time

# start Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("fts_data_extraction").getOrCreate()
#spark = SparkSession.builder.master("local[*]").appName("fts_data").getOrCreate()
spark

CPU times: user 41.8 ms, sys: 25 ms, total: 66.7 ms
Wall time: 7.56 s


## Import data

**Note:** we consider the period 7/10 - 10/10 (7-10 October 2019) because an issue with one FTS instance is expected to have occurred in this window, and we want our approach to be able to spot it.

In [2]:
%%time 

# FTS data path
path_list = ['/project/monitoring/archive/fts/raw/complete/2019/10/{:0>2}/*'.format(i) for i in range(7,11)]

# load the data in the json file
all_transfers = spark.read.json(path_list)

CPU times: user 368 ms, sys: 102 ms, total: 470 ms
Wall time: 4min 10s
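The path list above zero-pads the day with `'{:0>2}'` and only works within a single month. Below is a minimal sketch of a date-range variant. It assumes the archive keeps the same `base/YYYY/MM/DD/*` layout, and `daily_paths` is a hypothetical helper, not part of the notebook:

```python
from datetime import date, timedelta

BASE = '/project/monitoring/archive/fts/raw/complete'

def daily_paths(start, end, base=BASE):
    """Hypothetical helper: one glob per day from start to end (inclusive),
    assuming the archive is laid out as base/YYYY/MM/DD/*."""
    n_days = (end - start).days + 1
    # '{:%Y/%m/%d}' formats the date via strftime, so month and day are zero-padded
    return ['{}/{:%Y/%m/%d}/*'.format(base, start + timedelta(days=i))
            for i in range(n_days)]

# same four days as path_list above (7/10 to 10/10)
paths = daily_paths(date(2019, 10, 7), date(2019, 10, 10))
print(paths[0])
```

The resulting list can be passed to `spark.read.json` in place of `path_list`, and it also handles periods that cross a month boundary.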


### Basic exploration 

The resulting DataFrame has two top-level struct columns: `data`, which holds the actual transfer record, and `metadata`. The metadata are not relevant to our scope, so we keep only the fields nested under `data`:

In [3]:
# retrieve just data 
all_transfers_data = all_transfers.select("data.*")

all_transfers_data.printSchema()

root
 |-- activity: string (nullable = true)
 |-- block_size: long (nullable = true)
 |-- buf_size: long (nullable = true)
 |-- channel_type: string (nullable = true)
 |-- chk_timeout: long (nullable = true)
 |-- dest_srm_v: string (nullable = true)
 |-- dst_hostname: string (nullable = true)
 |-- dst_se: string (nullable = true)
 |-- dst_site_name: string (nullable = true)
 |-- dst_url: string (nullable = true)
 |-- endpnt: string (nullable = true)
 |-- f_size: long (nullable = true)
 |-- file_id: string (nullable = true)
 |-- file_size: long (nullable = true)
 |-- final_destination: string (nullable = true)
 |-- ipv6: boolean (nullable = true)
 |-- is_recoverable: boolean (nullable = true)
 |-- job_id: string (nullable = true)
 |-- job_state: string (nullable = true)
 |-- latency: long (nullable = true)
 |-- log_link: string (nullable = true)
 |-- nstreams: long (nullable = true)
 |-- operation_time: long (nullable = true)
 |-- remote_access: boolean (nullable = true)
 |-- retry: lon

In [1]:
%%time

n_trans = all_transfers_data.count()
n_vars = len(all_transfers_data.columns)

print("FTS transfer dataset shape:", n_trans, n_vars)


The dataset of all transfers attempted between 7/10 and 10/10 contains ~17.5 million rows and 68 columns. Let us now take a closer look at the variables and their meaning:

### Variables' description

<div class="alert alert-block alert-danger">
<b>Reminder:</b> Check variable relevance to error detection with domain experts and then i) shorten the list (only relevant features with explanations), ii) add reference to documentation, iii) add legend with message/auxiliary variable colors. 
</div>


The following list contains a description of the variables' content:

**Generic information:**

  - "tr_id": "YEAR-MONTH-DAY-HOURMINUTE__sourcese__destse__file_id__job_id",
  - "endpnt": "FTS3 endpoint",
  - "src_srm_v": "Source SRM version, always 2.0 if srm is used",
  - "dest_srm_v": "Destination SRM version, always 2.0 if srm is used",
  - **<font color='green'>"vo"</font>:** "Virtual Organization",
  - "src_url": "Source URL",
  - "dst_url": "Destination URL",
  - **<font color='green'>"src_hostname"</font>:** "Source hostname",
  - **<font color='green'>"dst_hostname"</font>:** "Destination hostname",
  - "src_site_name": "", // Always empty
  - "dst_site_name": "", // Always empty
  - **<font color='green'>"t_channel"</font>:** "source_protocol://source_host__dest_protocol://dest_host",

**Time information:**
  - "timestamp_tr_st": 0, // Timestamp of the whole process start, in milliseconds
  - **<font color='green'>"timestamp_tr_comp"</font>:** 0, // Timestamp of the whole process completion, in milliseconds
  - "timestamp_chk_src_st": 0, // Timestamp when started the validation of the source checksum, in milliseconds
  - "timestamp_chk_src_ended": 0, // Timestamp when finished the validation of the source checksum, in milliseconds
  - "timestamp_checksum_dest_st": 0, // Timestamp when started the validation of the destination checksum, in milliseconds
  - "timestamp_checksum_dest_ended": 0, // Timestamp when finished the validation of the destination checksum, in milliseconds
  - "t_timeout": 0, // Timeout used for the transfer
  - "chk_timeout": 0, // Timeout used for the checksum operations
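The timestamps above are Unix epoch times in milliseconds. The sketch below converts them to UTC datetimes and computes durations in seconds. It assumes a record is available as a plain Python dict with the field names listed above and that a value of 0 means "unset". The helpers `to_utc` and `duration_s` and the toy values are illustrative only:

```python
from datetime import datetime, timezone

def to_utc(ms):
    """Convert an epoch timestamp in milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

def duration_s(record, start_key, end_key):
    """Seconds elapsed between two millisecond timestamps of a record.
    Returns None if either value is missing or 0 (assumed to mean unset)."""
    start, end = record.get(start_key), record.get(end_key)
    if not start or not end:
        return None
    return (end - start) / 1000

# toy record with made-up values
rec = {"timestamp_tr_st": 1570406400000, "timestamp_tr_comp": 1570406412500}
print(to_utc(rec["timestamp_tr_st"]))                          # 2019-10-07 00:00:00+00:00
print(duration_s(rec, "timestamp_tr_st", "timestamp_tr_comp"))  # 12.5
```

In Spark the same difference can be computed column-wise, e.g. `(F.col("timestamp_tr_comp") - F.col("timestamp_tr_st")) / 1000`.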

**Error information:**
  - **<font color='red'>"t_error_code"</font>:** 0, // Error code: an errno value (e.g. ENOENT)
  
  > corresponds to the errno value returned by the url-copy process (e.g. ENOENT)
  
  - **<font color='red'>"tr_error_scope"</font>:** "Error scope, empty if ok",
  
  > 3 possible values: SOURCE, TRANSFER and DESTINATION, depending on where the error happens. SOURCE, for instance, is set if the source file does not exist or the source checksum query fails.
  
  - **<font color='red'>"t_failure_phase"</font>:** "Error phase, empty if ok",
  
  > 3 possible values: TRANSFER_PREPARATION, TRANSFER, TRANSFER_FINALIZATION (they roughly map to the values of tr_error_scope)
  
  - **<font color='red'>"tr_error_category"</font>:** "Error category, empty if ok",
  
  > this is the string representation of t_error_code as returned by the strerror_r function (https://linux.die.net/man/3/strerror_r). In the data it appears uppercased, with spaces replaced by underscores. Possible values are COMMUNICATION_ERROR_ON_SEND, FILE_EXISTS, PERMISSION_DENIED, etc.

  - **<font color='red'>"t_final_transfer_state"</font>:** "Ok|Error|Abort",

  > != "Ok" for errors

  - **<font color='red'>"t__error_message"</font>:** "Error message, empty if ok",
  
  > string error from the storage
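The `tr_error_category` values observed in the data (e.g. `NO_SUCH_FILE_OR_DIRECTORY`, `PERMISSION_DENIED`) look like the `strerror` text for `t_error_code`, uppercased and with spaces replaced by underscores. The sketch below illustrates that mapping. The rule is inferred from the data, not taken from the FTS source. It uses Python's `os.strerror`, which wraps the C `strerror`. Message texts can differ between C libraries, so matches are only expected on a typical Linux/glibc system. `errno_to_category` is a hypothetical helper:

```python
import errno
import os

def errno_to_category(code):
    """Assumed rule: strerror text, uppercased, spaces replaced by underscores."""
    return os.strerror(code).upper().replace(' ', '_')

for code in (errno.ENOENT, errno.EACCES, errno.EEXIST, errno.ENOSPC):
    # e.g. "ENOENT NO_SUCH_FILE_OR_DIRECTORY"
    print(errno.errorcode[code], errno_to_category(code))
```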



**Transfer metrics:**
  - "tr_bt_transfered": 0, // How many bytes have been transferred
  - "nstreams": 0, // How many streams have been used
  - "buf_size": 0, // TCP buffer size used (for backwards compatibility)
  - "tcp_buf_size": 0, // TCP buffer size used
  - "block_size": 0, // Unused

  - "f_size": 0, // File size

  - "time_srm_prep_st": 0, // Timestamp of the start of the SRM GET operation, if any, in milliseconds
  - "time_srm_prep_end": 0, // Timestamp of the completion of the SRM GET operation, if any, in milliseconds
  - "time_srm_fin_st": 0, // Timestamp of the start of the SRM PUT operation, if any, in milliseconds
  - "time_srm_fin_end": 0, // Timestamp of the completion of the SRM PUT operation, if any, in milliseconds

  - "srm_space_token_src": "Source space token, if any",
  - "srm_space_token_dst": "Destination space token, if any",

  - "tr_timestamp_start": 0, // Timestamp of the start of *only the transfer part* (excluding preparation), in milliseconds
  - "tr_timestamp_complete": 0, // Timestamp of the completion of *only the transfer part* (excluding preparation), in milliseconds


  - "channel_type": "urlcopy", // Always
  - "user_dn": "User that submitted the job",
  - "file_metadata": "File metadata set by the user at submission",
  - "job_metadata": "Job metadata set by the user at submission"


  - "retry": 0, // When retries are enabled, which retry is this transfer
  - "retry_max": 0, // When retries are enabled, max number of retries for this transfer
  - "job_m_replica": false, // true if this transfer belongs to a multiple replica job
  - "job_state": "Job state, if known",
  - "is_recoverable": false, // true if FTS3 considers this transfer could be retried (depends on the error code)
  - "ipv6": false, // true if the transfer took place over IPv6
  - "transfer_type": "streamed|3rd pull|3rd push" // How the transfer was done

<div class="alert alert-block alert-danger">
<b>Alert:</b> Some of the variables are not documented!
    
 't_final_transfer_state_flag',
 'file_id',
 'file_size',
 'activity',
 'timestamp_checksum_src_diff',
 'final_destination',
 'srm_finalization_time',
 'job_id',
 'timestamp_checksum_dst_diff',
 'src_se',
 'throughput',
 'operation_time',
 'srm_preparation_time',
 'srm_overhead_percentage',
 'srm_overhead_time',
 'log_link',
 'remote_access',
 'user',
 'latency',
 'dst_se'
 
</div>

### Error extraction

Now we focus on failed transfers only, i.e. records with `t_final_transfer_state_flag == 0`, and split the DataFrame into multiple tables with similar pieces of information:

In [4]:
errors = all_transfers_data.filter(all_transfers_data["t_final_transfer_state_flag"] == 0)

In [5]:
n_errs = errors.count()
n_errs

4716431

### **Messages**

In [22]:
spark.sparkContext.uiWebUrl

'http://fd1182a52b46:5201'

#### Unique messages 

In [23]:
from pyspark.sql import Window
from pyspark.sql import functions as F

error_freq = errors.groupBy("t__error_message").count()
error_freq = error_freq.orderBy(error_freq["count"].desc()).withColumn("percentage", error_freq["count"]/n_errs*100)

windowval = (Window.orderBy(error_freq['count'].desc()).rangeBetween(Window.unboundedPreceding, 0))

error_freq = error_freq.withColumn('cum_perc', F.sum('percentage').over(windowval)).withColumn("msg_id", F.monotonically_increasing_id())
error_freq = error_freq.withColumnRenamed("t__error_message", "message").select("msg_id", "message", "count", "percentage", "cum_perc")

error_freq = error_freq.orderBy(error_freq['count'].desc())
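The frequency table built above contains a count, the percentage of all errors, the cumulative percentage, and an id by rank. The sketch below reproduces that logic in plain Python on a list of messages. It illustrates the logic only and does not replace the Spark job; `freq_table` is a hypothetical helper.

One difference is worth noting. This sketch accumulates row by row. The notebook's `rangeBetween` frame instead gives rows with tied counts the same cumulative value. `Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)` would reproduce the row-by-row behaviour in Spark.

```python
from collections import Counter
from itertools import accumulate

def freq_table(values):
    """Hypothetical helper: (msg_id, value, count, percentage, cum_perc) tuples,
    sorted by count in descending order, mirroring the Spark cells."""
    if not values:
        return []
    total = len(values)                     # plays the role of n_errs
    counts = Counter(values).most_common()  # [(value, count), ...], count desc
    percs = [c / total * 100 for _, c in counts]
    cum = accumulate(percs)                 # row-by-row running sum
    return [(i, v, c, p, cp)
            for i, ((v, c), p, cp) in enumerate(zip(counts, percs, cum))]

for row in freq_table(["A", "B", "A", "C", "A", "B"]):
    print(row)
```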

In [24]:
%%time
pd_distinct_mess = error_freq.toPandas()
pd_distinct_mess.head()

CPU times: user 2.69 s, sys: 259 ms, total: 2.95 s
Wall time: 4min 32s


| | msg_id | message | count | percentage | cum_perc |
|---|---|---|---|---|---|
| 0 | 0 | TRANSFER globus_ftp_client: the server responded with an error 500 500-Command failed. : System error in open: No such file or directory 500-A system call failed: No such file or directory 500 End. | 570137 | 12.088314 | 12.088314 |
| 1 | 1 | TRANSFER globus_ftp_control: gss_init_sec_context failed GSS Major Status: Unexpected Gatekeeper or Service Name globus_gsi_gssapi: Authorization denied: The name of the remote entity (/DC=EU/DC=EGI/C=ZA/O=Hosts/O=University of Cape Town/CN=dtn.ilifu.ac.za), and the expected name for the remote entity (/CN=dtn.ilifu.ac.za) do not match | 324553 | 6.881326 | 18.96964 |
| 2 | 2 | Error on XrdCl::CopyProcess::Run(): [ERROR] Server responded with an error: [3010] Write permission denied | 224141 | 4.752343 | 23.721984 |
| 3 | 3 | DESTINATION SRM_PUT_TURL error on the turl request : [SE][StatusOfPutRequest][SRM_EXCEED_ALLOCATION] Space associated with the space token 2310037 is not enough to hold SURL. | 115402 | 2.446808 | 26.168792 |
| 4 | 4 | TRANSFER an end-of-file was reached globus_xio: An end of file occurred | 113417 | 2.404721 | 28.573512 |


#### Unique tr_error_category 

In [6]:
%%time

from pyspark.sql import Window
from pyspark.sql import functions as F

error_freq = errors.groupBy("tr_error_category").count()
error_freq = error_freq.orderBy(error_freq["count"].desc()).withColumn("percentage", error_freq["count"]/n_errs*100)

windowval = (Window.orderBy(error_freq['count'].desc()).rangeBetween(Window.unboundedPreceding, 0))

error_freq = error_freq.withColumn('cum_perc', F.sum('percentage').over(windowval)).withColumn("msg_id", F.monotonically_increasing_id())
error_freq = error_freq.withColumnRenamed("tr_error_category", "category").select("msg_id", "category", "count", "percentage", "cum_perc")

error_freq = error_freq.orderBy(error_freq['count'].desc())

CPU times: user 26 ms, sys: 5.35 ms, total: 31.4 ms
Wall time: 187 ms


In [7]:
pd_distinct_err_cat = error_freq.toPandas()
pd_distinct_err_cat

| | msg_id | category | count | percentage | cum_perc |
|---|---|---|---|---|---|
| 0 | 0 | COMMUNICATION_ERROR_ON_SEND | 1736678 | 36.821868 | 36.821868 |
| 1 | 1 | NO_SUCH_FILE_OR_DIRECTORY | 1180219 | 25.023561 | 61.845429 |
| 2 | 2 | PERMISSION_DENIED | 667872 | 14.160538 | 76.005967 |
| 3 | 3 | CONNECTION_TIMED_OUT | 240245 | 5.093788 | 81.099755 |
| 4 | 4 | INPUT/OUTPUT_ERROR | 187725 | 3.980234 | 85.07999 |
| 5 | 5 | NO_MESSAGE_OF_DESIRED_TYPE | 140743 | 2.9841 | 88.064089 |
| 6 | 6 | PROTOCOL_NOT_SUPPORTED | 128317 | 2.720638 | 90.784727 |
| 7 | 7 | DISK_QUOTA_EXCEEDED | 115561 | 2.450179 | 93.234906 |
| 8 | 8 | FILE_EXISTS | 112961 | 2.395053 | 95.629958 |
| 9 | 9 | NO_SPACE_LEFT_ON_DEVICE | 101843 | 2.159323 | 97.789282 |


In [8]:
n_cat = 10
top_n_cat = pd_distinct_err_cat.category.head(n_cat)

top_n_cat

0    COMMUNICATION_ERROR_ON_SEND
1      NO_SUCH_FILE_OR_DIRECTORY
2              PERMISSION_DENIED
3           CONNECTION_TIMED_OUT
4             INPUT/OUTPUT_ERROR
5     NO_MESSAGE_OF_DESIRED_TYPE
6         PROTOCOL_NOT_SUPPORTED
7            DISK_QUOTA_EXCEEDED
8                    FILE_EXISTS
9        NO_SPACE_LEFT_ON_DEVICE
Name: category, dtype: object
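Instead of fixing `n_cat = 10` by hand, the cutoff could be chosen as the smallest number of categories whose cumulative percentage reaches a target coverage. The sketch below applies `bisect` to the `cum_perc` values of the 10 rows shown in the table above. `n_for_coverage` is a hypothetical helper. In the notebook the input list would be `pd_distinct_err_cat.cum_perc.tolist()`:

```python
from bisect import bisect_left

def n_for_coverage(cum_perc, target):
    """Smallest n such that the top-n groups cover >= target percent,
    or None if the target is never reached. cum_perc must be non-decreasing."""
    idx = bisect_left(cum_perc, target)  # first index with cum_perc[idx] >= target
    return idx + 1 if idx < len(cum_perc) else None

# cum_perc of the top 10 tr_error_category values (table above)
cum_perc = [36.821868, 61.845429, 76.005967, 81.099755, 85.07999,
            88.064089, 90.784727, 93.234906, 95.629958, 97.789282]
print(n_for_coverage(cum_perc, 90))  # 7
print(n_for_coverage(cum_perc, 95))  # 9
```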

#### Unique High Level Group 

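We parse the message strings to extract the informative part at the beginning of each message. The heuristic works in four steps:
1. Drop tokens of 30 or more characters (paths/URLs).
2. Keep the text before the first `": "`.
3. Truncate that text to at most 7 tokens.
4. If the truncated text still has 7 tokens, retain only the all-caps keywords (e.g. `TRANSFER`, `SRM_PUT_TURL`).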

In [12]:
def strip_paths(string, max_len = 20):
    '''Take a string and drop paths/URLs (i.e. tokens of max_len characters or more)'''
    tks = [x for x in string.split() if len(x) < max_len]
    return ' '.join(tks)

def high_level_grouping_str(error_str, max_len = 7, max_len_tokens=30):
    '''Take an error string and return the meaningful description from the beginning of the message'''
    import re

    # null messages cannot be parsed
    if error_str is None:
        return None

    # strip paths
    error_col = strip_paths(error_str, max_len_tokens)

    # extract group/subgroup (text before the first ": ")
    grouping = error_col.split(": ")[0]

    # keep at most max_len tokens
    grouping = ' '.join(grouping.split()[:max_len])

    # take only capital tokens for long messages and take all for short ones
    if len(grouping.split()) > (max_len - 1):
        grouping = ' '.join(re.findall(r'\b[A-Z]+_?[A-Z]+_?[A-Z]+_?[A-Z]+\b', grouping))

    return grouping

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# transform in user defined function
high_level_grouping_udf = udf(high_level_grouping_str, StringType())

# create high level groups
errors = errors.withColumn("hlg", high_level_grouping_udf("t__error_message"))

# compute
error_freq = errors.groupBy("hlg").count()
error_freq = error_freq.orderBy(error_freq["count"].desc()).withColumn("percentage", error_freq["count"]/n_errs*100)

windowval = (Window.orderBy(error_freq['count'].desc()).rangeBetween(Window.unboundedPreceding, 0))

error_freq = error_freq.withColumn('cum_perc', F.sum('percentage').over(windowval)).withColumn("msg_id", F.monotonically_increasing_id())
error_freq = error_freq.withColumnRenamed("hlg", "high_level_group").select("msg_id", "high_level_group", "count", "percentage", "cum_perc")

error_freq = error_freq.orderBy(error_freq['count'].desc())


In [14]:
%%time
pd_distinct_hlg = error_freq.toPandas()
pd_distinct_hlg

CPU times: user 441 ms, sys: 134 ms, total: 575 ms
Wall time: 4min 55s


| | msg_id | high_level_group | count | percentage | cum_perc |
|---|---|---|---|---|---|
| 0 | 0 | TRANSFER globus_ftp_client | 1305287 | 27.675312 | 27.675312 |
| 1 | 1 | Error on XrdCl::CopyProcess::Run() | 564591 | 11.970725 | 39.646037 |
| 2 | 2 | srm-ifce err | 480659 | 10.191159 | 49.837197 |
| 3 | 3 | DESTINATION SRM_PUTDONE | 373385 | 7.916685 | 57.753882 |
| 4 | 4 | TRANSFER globus_ftp_control | 331525 | 7.029150 | 64.783032 |
| 5 | 5 | globus_ftp_client | 192425 | 4.079886 | 68.862918 |
| 6 | 6 | TRANSFER ERROR | 181285 | 3.843690 | 72.706608 |
| 7 | 7 | DESTINATION SRM_PUT_TURL | 144295 | 3.059411 | 75.766019 |
| 8 | 8 | Protocol not supported or path/url invalid: | 126173 | 2.675180 | 78.441198 |
| 9 | 9 | Error reported from srm_ifce | 120969 | 2.564842 | 81.006040 |


<div class="alert alert-block alert-warning">
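<b>Alert:</b> 86062 error records are not handled properly by the high_level_grouping_udf function and end up with an empty hlg string. They appear to be messages without a <code>": "</code> separator that have at least 7 tokens but no all-caps keyword among the first 7, so the capital-token filter returns nothing. The <b>14 unique strings</b> involved are listed below: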
</div>

In [18]:
missing = errors.filter(errors.hlg=="").select("t__error_message")
pd_missing = missing.toPandas()

In [21]:
import pandas as pd
pd.set_option('display.max_colwidth', None)  # None = never truncate column contents (-1 is deprecated)

pd_missing.t__error_message.unique()

array(['Destination file exists and overwrite is not enabled',
       'Checksum value required if mode is not end to end',
       'User and source checksums do not match',
       'Source and destination checksums do not match',
       'User and destination checksums do not match',
       'Failed to stat file (No such file or directory)',
       '[gfal2_stat][gfal_plugin_statG][gfal_xrootd_statG] Failed to stat file (No such file or directory)',
       'Result Could not connect to server after 1 attempts',
       'Failure Authentication error, reached maximum number of attempts after 1 attempts',
       'Result Domain name resolution failed after 1 attempts',
       '[gfal2_stat][gfal_plugin_statG][davix2gliberr] Result Domain name resolution failed after 1 attempts',
       'Failed to stat file (No route to host)',
       '[gfal2_stat][gfal_plugin_statG][davix2gliberr] Result Could not connect to server after 1 attempts',
       '[gfal2_stat][gfal_plugin_statG][davix2gliberr] Result Au
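The capital-token filter can return nothing for messages without any all-caps keyword. One possible fix is to fall back to the first few tokens when the filter finds nothing. The sketch below implements this as a hypothetical variant; `hlg_with_fallback` and `n_fallback` are not part of the notebook. It re-implements the same steps as `strip_paths` + `high_level_grouping_str` so that it runs on its own:

```python
import re

# same all-caps keyword pattern as in high_level_grouping_str
CAPS_TOKEN = re.compile(r'\b[A-Z]+_?[A-Z]+_?[A-Z]+_?[A-Z]+\b')

def hlg_with_fallback(error_str, max_len=7, max_len_tokens=30, n_fallback=4):
    """Hypothetical variant of high_level_grouping_str: if the capital-token
    filter leaves nothing, return the first n_fallback tokens instead."""
    if error_str is None:
        return None
    # drop path/URL-like tokens (length >= max_len_tokens)
    tokens = [t for t in error_str.split() if len(t) < max_len_tokens]
    # text before the first ": ", truncated to at most max_len tokens
    head = ' '.join(tokens).split(": ")[0].split()[:max_len]
    if len(head) > max_len - 1:
        caps = CAPS_TOKEN.findall(' '.join(head))
        # fallback when no all-caps keyword is present
        return ' '.join(caps) if caps else ' '.join(head[:n_fallback])
    return ' '.join(head)

print(hlg_with_fallback('Destination file exists and overwrite is not enabled'))
```

Messages that already produced a group are unaffected. Only the previously empty groups change, e.g. to "Destination file exists and".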

In [26]:
print("The total number of errors tracked between 7/10 and 10/10 is:", n_errs)
print("The number of unique messages is:", pd_distinct_mess.shape[0])
print("The number of unique high level groups (parsing) is:", pd_distinct_hlg.shape[0])
print("The number of unique categories (FTS) is:", pd_distinct_err_cat.shape[0])

The total number of errors tracked between 7/10 and 10/10 is: 4716431
The number of unique messages is: 271795
The number of unique high level groups (parsing) is: 88
The number of unique categories (FTS) is: 27


#### Save relevant insights

In [45]:
!pwd

/eos/home-l/lclissa/SWAN_projects/rucio-log-clustering/notebooks


In [48]:
# save summary statistics 
pd_distinct_mess.to_csv("t__error_message_stats.csv", index=False)
pd_distinct_err_cat.to_csv("tr_error_category_stats.csv", index=False)
pd_distinct_hlg.to_csv("hlg_stats.csv", index=False)

In [60]:
# save missing high level groups
import numpy as np

np.savetxt("missing_hlg.csv", pd_missing.t__error_message.unique(), header="message missing_hlg", fmt='%s')

##### Clean up environment

In [61]:
%who

F	 SparkSession	 StringType	 Window	 a	 all_transfers	 all_transfers_data	 dist_mess_1	 error_freq	 
errors	 high_level_grouping_str	 high_level_grouping_udf	 missing	 n_cat	 n_errs	 np	 path_list	 pd	 
pd_dist_mess_1	 pd_distinct_err_cat	 pd_distinct_hlg	 pd_distinct_mess	 pd_missing	 selected_category	 spark	 strip_paths	 top_n_cat	 
udf	 windowval	 


In [62]:
del (all_transfers, all_transfers_data, error_freq, path_list, n_cat, top_n_cat, windowval, Window, high_level_grouping_str,
     high_level_grouping_udf, missing, pd_missing)