# Write the messages to pandas frames

## 1. Load data and filter 

In [91]:
! hdfs dfs -ls hdfs:///cms/users/llayer/

19/08/05 13:50:14 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
Found 22 items
-rw-r--r--   3 llayer zh   81467745 2019-05-08 17:46 hdfs:///cms/users/llayer/actionhist.csv
-rw-r--r--   3 llayer zh   34737856 2019-05-08 16:47 hdfs:///cms/users/llayer/actionshist.h5
-rw-r--r--   3 llayer zh   20926756 2019-03-06 14:07 hdfs:///cms/users/llayer/actionshistory.json
drwxr-xr-x   - llayer zh          0 2019-05-19 18:47 hdfs:///cms/users/llayer/debug0.csv
drwxr-xr-x   - llayer zh          0 2019-05-20 16:48 hdfs:///cms/users/llayer/debug1.csv
drwxr-xr-x   - llayer zh          0 2019-05-20 19:05 hdfs:///cms/users/llayer/debug2.csv
drwxr-xr-x   - llayer zh          0 2019-05-20 19:09 hdfs:///cms/users/llayer/debug3.csv
drwxr-xr-x   - llayer zh          0 2019-05-20 19:23 hdfs:///cms/users/llayer/d

In [None]:
! hdfs dfs -rm -r -skipTrash hdfs:///cms/users/llayer/output*

In [7]:
import filter_messages

In [5]:
reload(filter_messages)

<module 'filter_messages' from 'filter_messages.pyc'>

In [1]:
# Load the data
# Date range to process, written as two integer dates in YYYYMMDD form.
# It is handed to filter_messages.load_data (via join_to_pandas further down).
# NOTE(review): whether the end date is inclusive depends on load_data - confirm.
timerange = [20170101, 20171009]
# Other ranges that can be swapped in by hand:
#timerange = [20180601, 20181101]
#timerange = [20181101, 20190207]
#timerange = [20171011, 20180301]
#timerange = [20180301, 20180601]
# Direct load, disabled here because join_to_pandas performs the load itself:
#avro_rdd = filter_messages.load_data(sc, timerange)

In [2]:
# Filter the tasks - keep only the failing jobs
def get_failing(row):
    """Tell whether a record describes a failed job.

    The record is taken from ``row[0]``; it counts as failing when its
    ``meta_data`` dictionary carries ``jobstate == 'jobfailed'``.  A record
    without ``meta_data`` or without ``jobstate`` is treated as not failing.
    """
    record = row[0]
    job_state = record.get('meta_data', {}).get('jobstate', '')
    return job_state == 'jobfailed'


# Performance metrics exported for every error, grouped by the section of
# step['performance'] they are read from.  The order of the sections and of
# the keys inside them fixes the column order of the value tuple returned by
# map_to_KV, so it must stay in sync with the column names used downstream.
_PERFORMANCE_LAYOUT = (
    ('memory', ('PeakValueRss', 'PeakValueVsize')),
    ('storage', ('writeTotalMB', 'readPercentageOps', 'readAveragekB',
                 'readTotalMB', 'readNumOps', 'readCachePercentageOps',
                 'readMBSec', 'writeTotalSecs', 'readTotalSecs',
                 'readMaxMSec')),
    ('cpu', ('TotalJobCPU', 'NumberOfStreams', 'TotalInitCPU',
             'TotalEventCPU', 'AvgEventCPU', 'EventThroughput',
             'TotalInitTime', 'AvgEventTime', 'NumberOfThreads',
             'MinEventCPU', 'MaxEventTime', 'TotalJobTime', 'TotalLoopCPU',
             'MinEventTime', 'MaxEventCPU')),
)

# Placeholder stored when a metric is absent or None.
_MISSING_METRIC = -1.


def _performance_values(step):
    """Return the step's metrics as a flat list in _PERFORMANCE_LAYOUT order.

    Every expected metric yields exactly one value; metrics (or whole
    sections) that are missing or None are replaced by _MISSING_METRIC.
    """
    performance = step.get('performance') or {}
    values = []
    for section, keys in _PERFORMANCE_LAYOUT:
        metrics = performance.get(section) or {}
        for key in keys:
            value = metrics.get(key)
            values.append(_MISSING_METRIC if value is None else value)
    return values


def map_to_KV(row):
    """Turn one record into ``(key, value)`` pairs, one per failing step.

    Parameters
    ----------
    row : sequence
        ``row[0]`` is the record: a dict with a ``"task"`` entry and an
        optional ``'steps'`` list.  Each step may carry ``'errors'``,
        ``'site'``, ``'name'`` and ``'performance'``.

    Returns
    -------
    list of ``((task, exit_code, site), res)``
        One pair per step that has at least one error.  ``exit_code`` is the
        first error code of that step (the first error code per step is
        assumed to be the correct one) and ``site`` is that step's site.
        ``res`` is a tuple of 32 parallel lists holding one entry per error
        of the whole record: exit codes, messages (line breaks replaced by
        spaces), error types, step indices, step names, followed by the
        metrics in ``_PERFORMANCE_LAYOUT`` order.  All pairs produced from
        one record share the same ``res``.

    Notes
    -----
    * The key is built from the first exit code of each failing step, so the
      exit code and the site of a key always belong to the same step.
    * Every metric column gets exactly one value per error (``-1.`` when the
      metric is missing or None).  This keeps all columns the same length,
      also for older records that lack some metrics such as
      ``NumberOfStreams`` or ``TotalInitCPU``.
    """
    rec = row[0]
    task = rec["task"]
    steps = rec.get('steps', [])

    # One entry per failing step (used for the keys).
    exit_code_first = []
    sites = []

    # One entry per error (used for the values).
    exit_codes = []
    error_msg = []
    error_type = []
    steps_counter = []
    names = []
    n_metrics = sum(len(keys) for _, keys in _PERFORMANCE_LAYOUT)
    metric_columns = [[] for _ in range(n_metrics)]

    for counter, step in enumerate(steps):
        errors = step.get('errors') or []
        if not errors:
            continue

        exit_code_first.append(errors[0]['exitCode'])
        sites.append(step.get('site', ''))

        # Step-level data is identical for all errors of the step, so it is
        # looked up once and repeated per error.
        step_name = step.get('name', '')
        step_metrics = _performance_values(step)

        for error in errors:
            exit_codes.append(error['exitCode'])
            steps_counter.append(counter)
            names.append(step_name)
            error_type.append(error['type'])
            error_msg.append(error['details'].replace("\n", " ").replace('\r', ' '))
            for column, value in zip(metric_columns, step_metrics):
                column.append(value)

    res = (exit_codes, error_msg, error_type, steps_counter, names) + tuple(metric_columns)

    return [((task, e, s), res) for e, s in zip(exit_code_first, sites)]


def map_to_frame_format(row):
    """Expand one ``(key, columns)`` pair into flat per-entry rows.

    ``row[0]`` is a key tuple and ``row[1]`` a sequence of parallel columns.
    The columns are transposed and the key is prepended to every resulting
    tuple, so each returned row reads ``key + (col0[i], col1[i], ...)``.
    Rows are produced up to the length of the shortest column.
    """
    key_part, columns = row[0], row[1]

    flat_rows = []
    for entry in zip(*columns):
        flat_rows.append(key_part + entry)
    return flat_rows

In [3]:
def filter_wmentries(wmarchive_entries):
    """Build a Spark DataFrame of error rows from an RDD of records.

    Steps:
      1. keep only the records accepted by ``get_failing``;
      2. expand each of them with ``map_to_KV`` into keyed pairs;
      3. collapse pairs that share a key, keeping a single value per key
         (whichever one the reduction sees first);
      4. flatten every remaining pair with ``map_to_frame_format``;
      5. convert the rows to a DataFrame with the column names below.
    """
    column_names = [
        "task_name", "error", "site", "exit_codes", "error_msg", "error_type",
        "steps_counter", "names", "peakvaluerss", "peakvaluevsize",
        "writeTotalMB", "readPercentageOps", "readAveragekB", "readTotalMB",
        "readNumOps", "readCachePercentageOps", "readMBSec", "writeTotalSecs",
        "readTotalSecs", "readMaxMSec", "TotalJobCPU", "NumberOfStreams",
        "TotalInitCPU", "TotalEventCPU", "AvgEventCPU", "EventThroughput",
        "TotalInitTime", "AvgEventTime", "NumberOfThreads", "MinEventCPU",
        "MaxEventTime", "TotalJobTime", "TotalLoopCPU", "MinEventTime",
        "MaxEventCPU",
    ]

    # Failing records only, expanded to (key, value) pairs.
    keyed_failures = wmarchive_entries.filter(get_failing).flatMap(map_to_KV)

    # Drop redundant keys: the second value of every pair is discarded.
    unique_per_key = keyed_failures.reduceByKey(lambda kept, _discarded: kept)

    flat_rows = unique_per_key.flatMap(map_to_frame_format)
    return flat_rows.toDF(column_names)

## 2. Join with the actionshistory keys and write to pandas

In [4]:
from pyspark import SQLContext, StorageLevel
from pyspark.sql.types import IntegerType

# SQL entry point built on the notebook's SparkContext `sc`.
sql = SQLContext(sc)

# Read the labelled actions-history CSV; its first line holds the column names.
_csv_reader = sql.read.format("com.databricks.spark.csv").option("header", "true")
labeled_failing_tasks = _csv_reader.load("actionshistory_300719.csv")

# The "error" column is one of the join keys used later on, so it is cast to
# an integer type here.
_error_as_int = labeled_failing_tasks["error"].cast(IntegerType())
labeled_failing_tasks = labeled_failing_tasks.withColumn("error", _error_as_int)

In [8]:
def join_to_pandas( timerange, store = True):
    """Load one time range, join it with the labelled tasks and return pandas.

    Parameters
    ----------
    timerange : sequence of two items
        Start and end of the range; passed unchanged to
        ``filter_messages.load_data`` and used (as ``timerange[0]`` and
        ``timerange[1]``) to build the output file name.
    store : bool, optional
        When true (the default), the resulting frame is also written to
        ``messages_filtered_<start>_<end>.h5`` in the current working
        directory under the HDF key ``'frame'``.

    Returns
    -------
    pandas.DataFrame
        The filtered entries joined with ``labeled_failing_tasks`` on
        ``task_name``, ``error`` and ``site``.  The whole join result is
        collected to the driver by ``toPandas()``.
    """
    avro_rdd = filter_messages.load_data(sc, timerange)
    df = filter_wmentries(avro_rdd)
    df_pandas = df.join(labeled_failing_tasks, ['task_name', 'error', 'site']).toPandas()

    if store:
        filename = 'messages_filtered_{}_{}.h5'.format(timerange[0], timerange[1])
        # Pass the key by keyword: positional arguments after the path are
        # deprecated in recent pandas releases.
        df_pandas.to_hdf(filename, key='frame')

    return df_pandas

In [9]:
df_pandas = join_to_pandas(timerange, store = True)

[20170101, 20171009]
['hdfs:///cms/wmarchive/avro/fwjr/2017/01/01', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/02', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/03', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/04', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/05', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/06', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/07', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/08', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/09', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/10', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/11', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/12', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/13', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/14', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/15', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/16', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/17', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/18', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/19', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/20', 'hdfs:///cms/wmarchive/avro/fwjr/2017/01/21', 'hdfs:///cms

your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->unicode,key->block2_values] [items->['task_name', 'site', 'error_msg', 'error_type', 'names', '_c0']]

  return pytables.to_hdf(path_or_buf, key, self, **kwargs)


In [11]:
# Ranges that were previously selected one at a time through `timerange`:
#timerange = [20170101, 20171009]
#timerange = [20180601, 20181101]
#timerange = [20181101, 20190207]
#timerange = [20171011, 20180301]
#timerange = [20180301, 20180601]
# (start, end) pairs of integer dates, listed in chronological order, to be
# processed in one go by the loop in the next cell.
# NOTE(review): the first chunk is disabled - presumably because that range
# was already processed by the single join_to_pandas(timerange) call above.
time_chunks =  [
                #(20170101, 20171009),
                (20171011, 20180301),
                (20180301, 20180601),
                (20180601, 20181101),
                (20181101, 20190207),
                (20190207, 20190801),   
                ]

In [12]:
# Run the load/filter/join pipeline for every chunk; with store=True each
# chunk is written to its own HDF5 file by join_to_pandas.
# NOTE(review): `df_pandas` is overwritten on every iteration, so only the
# frame of the last chunk is still available after the loop.  The loop
# variable `time` would also shadow a `time` module if one were imported.
for time in time_chunks:
    df_pandas = join_to_pandas(time, store = True)

(20171011, 20180301)
['hdfs:///cms/wmarchive/avro/fwjr/2017/10/11', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/12', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/13', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/14', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/15', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/16', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/17', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/18', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/19', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/20', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/21', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/22', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/23', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/24', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/25', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/26', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/27', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/28', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/29', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/30', 'hdfs:///cms/wmarchive/avro/fwjr/2017/10/31', 'hdfs:///cms

(20180601, 20181101)
['hdfs:///cms/wmarchive/avro/fwjr/2018/06/01', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/02', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/03', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/04', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/05', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/06', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/07', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/08', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/09', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/10', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/11', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/12', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/13', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/14', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/15', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/16', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/17', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/18', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/19', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/20', 'hdfs:///cms/wmarchive/avro/fwjr/2018/06/21', 'hdfs:///cms

(20190207, 20190801)
['hdfs:///cms/wmarchive/avro/fwjr/2019/02/07', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/08', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/09', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/10', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/11', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/12', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/13', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/14', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/15', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/16', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/17', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/18', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/19', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/20', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/21', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/22', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/23', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/24', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/25', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/26', 'hdfs:///cms/wmarchive/avro/fwjr/2019/02/27', 'hdfs:///cms

In [None]:
"""
https://gist.github.com/joshlk/871d58e01417478176e7
import pandas as pd

def _map_to_pandas(rdds):
    #Needs to be here due to pickling issues 
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    
    #Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    #repartitioned if `n_partitions` is passed.
    #:param df:              pyspark.sql.DataFrame
    #:param n_partitions:    int or None
    #:return:                pandas.DataFrame
    
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

"""