# Data Exploration

In [1]:
# Initialize Spark, RERUN EVERY TIME!
from pyspark import SparkConf, SparkContext

SparkContext.setSystemProperty('spark.executor.memory', '50g')

conf = SparkConf()\
        .setMaster("spark://c650mnp06:7077")\
        .setAppName("Sandbox")

sc = SparkContext(conf = conf)

In [2]:
# Index Constants
TRANS_ALLOC = "cast-allocation-ornl"
TRANS_STEP  = "cast-allocation-step-ornl"
STEP_HIST   = "cast-db-csm_step_history-ornl-*"
NODE_HIST   = "cast-db-csm_allocation_node_history-ornl-*"
ALLOC_HIST  = "cast-db-csm_allocation_history-ornl-*"

# Set the Connection Constants
NODES       = "10.7.4.15,10.7.4.17"
PORTS       = "9200"

# Defining Node  History

We want to get some data from the allocation node history table before anything else and leverage Elasticsearch. Step 1 is to determine which fields we want and how we're going to aggregate them (tons of data is great, but skew can be just as if not more interesting) Using this we can start to refine the allocation data to make it easier to run analytics on the allocation data. The following variables are all in the **data** object. The data will be condensed to pivot on the **data.allocation_id**. 

**NOTE:** This is one way of analyzing the data. Other analytics may be written using the raw allocation node data, but I'm interested in a few specific indicators to apply to the overall allocation.

* **memory_usage_max**
    * Should store the Average and the Maximum values.
    * **mem_usage_avg**
    * **mem_usage_max**
* **gpu_usage**
    * Trickier, do we want the total sum or average of all nodes?
    * **gpu_usage_avg**
* **gpfs_read**
    * I think the average across all nodes should be appropriate to get a sense for GPFS usage in general.
    * **gpfs_read_avg**
* **gpfs_write**
    * I think the average across all nodes should be appropriate to get a sense for GPFS usage in general.
    * **gpfs_write_avg**
* **allocation_id**
    * Pivot value, summations and math operations pivot on this data point.
* **cpu_usage**
    * Average should be fine, can show trends. If combined with median, could indicate uneven workload.
    * **cpu_usage_avg**
    * **cpu_usage_median**
* **ib_tx**
    * **ib_tx_avg** 
* **ib_rx**
    * **ib_rx_avg** 
* **energy**
    * **energy_avg**
    * **energy_median**

In [3]:
from datetime import datetime
# Set up the time range
date_format= '%Y-%m-%d'
search_format='date_time_no_millis'

start_time="2019-03-01T01:00:00Z"
end_time="2019-03-02T01:00:00Z"

timerange='''{{
    "lte"    : "{2}",
    "gte"    : "{0}",
    "format" : "{1}"
}}'''.format(start_time, search_format, end_time)
            


In [4]:
# Let's Write a Query!

NHistQuery = '''
{{ 
    "query" : {{
        "range" : {{
             "data.history_time" : {0}
        }}
    }}
}}'''.format(timerange)

#NHistQuery='{ "query":{ "match_all":{ } } }'

print(NHistQuery)
NHistFields = ["data.allocation_id",  "data.ib_tx", "data.ib_rx", "data.memory_usage_max", "data.gpu_usage",
                "data.gpfs_read", "data.gpfs_write", "data.gpu_energy", "data.cpu_usage", "data.energy"]

es_conf = {"es.resource": "{0}".format(NODE_HIST),
          "es.nodes"    : NODES,
          "es.port"     : PORTS,
          "es.query"    : NHistQuery, 
          "es.read.field.include" : ",".join(NHistFields) }



nodeHist = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
                         "org.apache.hadoop.io.NullWritable", \
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",conf=es_conf)
nodeHist.first()


{ 
    "query" : {
        "range" : {
             "data.history_time" : {
    "lte"    : "2019-03-02T01:00:00Z",
    "gte"    : "2019-03-01T01:00:00Z",
    "format" : "date_time_no_millis"
}
        }
    }
}


('C267_2kBUCN44TPi76iy',
 {'data': {'memory_usage_max': 14262140928,
   'gpu_usage': 80018308947,
   'gpfs_write': 43495780352,
   'gpu_energy': 27377712,
   'allocation_id': 192585,
   'ib_rx': 22715380148,
   'gpfs_read': 87735091200,
   'cpu_usage': 228366705621402,
   'ib_tx': 28335591197,
   'energy': 38796902}})

In [5]:
from operator import add

hSchema = (  "allocation_id", ("memory_usage_max", "gpu_usage", "gpfs_write", "cpu_usage", "ib_tx", "ib_rx", "energy"))

def mapObj( o ):
    obj  = o[1]["data"]
    
    # Build the tuple, if the field is None  set to 0.
    oTup = tuple( obj[field] if obj[field] is not None  else 0  for field in hSchema[1]  )
    return (obj["allocation_id"], oTup)
    

def reduceObj(x,y):
    return tuple(map(add, x,y))
      


nHistReduced = nodeHist.map(mapObj).reduceByKey(reduceObj)

In [6]:
nHistReduced.first()

(194418,
 (185462554624,
  16747076,
  62380670513152,
  736386358996,
  1007324318967115,
  977201842041714,
  7071035517))

In [7]:
nHistReduced.cache()

PythonRDD[9] at RDD at PythonRDD.scala:53

# Allocation Node History Reduced
At this point we have a full allocation node history.

In [8]:
# Let's Write a Query!

allocQuery = '''
{{ 
    "query" : {{
        "range" : {{
             "data.end_time" : {0}
        }}
    }}
}}'''.format(timerange)

print(allocQuery)
allocFields = ["data.allocation_id"   ,  "data.job_submit_time", "data.begin_time"    , "data.end_time" , 
               "data.projected_memory",   "data.state"         , "data.ssd_min"  , 
               "data.ssd_max"         , "data.user_name"       , "data.num_processors", "data.num_nodes", 
               "data.launch_node_name", "data.time_limit"      , "data.isolated_cores", "data.num_gpus" ]

es_conf = {"es.resource": "{0}".format(ALLOC_HIST),
          "es.nodes"    : NODES,
          "es.port"     : PORTS,
          "es.query"    : allocQuery, 
          "es.read.field.include" : ",".join(allocFields) }



aHist = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
                         "org.apache.hadoop.io.NullWritable", \
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",conf=es_conf)
aHist.first()


{ 
    "query" : {
        "range" : {
             "data.end_time" : {
    "lte"    : "2019-03-02T01:00:00Z",
    "gte"    : "2019-03-01T01:00:00Z",
    "format" : "date_time_no_millis"
}
        }
    }
}


('-ydr_WkBkJgQNSr8-OIf',
 {'data': {'projected_memory': 0,
   'ssd_min': 0,
   'num_processors': 0,
   'time_limit': 7200,
   'num_gpus': 0,
   'user_name': 'walksloud',
   'end_time': '2019-03-01 01:00:08.433065',
   'launch_node_name': 'batch3',
   'begin_time': '2019-03-01 00:59:26.029625',
   'num_nodes': 12,
   'allocation_id': 194312,
   'job_submit_time': '2019-03-01 00:58:55',
   'state': 'complete',
   'isolated_cores': 1,
   'ssd_max': 0}})

In [9]:
allocSchema = ("allocation_id"    , ( "job_submit_time", "begin_time"    , "end_time"      ,
               "projected_memory" ,   "state"          , "ssd_min"       ,
                "ssd_max"         ,   "user_name"      , "num_processors", "num_nodes"     , 
                "launch_node_name",   "time_limit"     , "isolated_cores", "num_gpus" ))

def mapAHistObj( o ):
    obj  = o[1]["data"]
    
    # Build the tuple, if the field is None  set to 0.
    oTup = tuple( obj[field] if obj[field] is not None  else 0  for field in allocSchema[1]  )
    return (obj[allocSchema[0]], oTup)


aHistReduced = aHist.map(mapAHistObj)

In [10]:
jSchema = tuple([allocSchema[0]]) + allocSchema[1] + hSchema[1]

def mapJoin(obj):
    return tuple([obj[0]]) + obj[1][0] + obj[1][1]

joinedHist= aHistReduced.join(nHistReduced).map(mapJoin)
joinedHist.cache()

PythonRDD[21] at RDD at PythonRDD.scala:53

In [46]:
# Import the goods.
import json
from datetime import datetime

import pandas as pd
import matplotlib.pyplot as plt

import numpy as np
import pyspark.ml

from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.util import MLUtils

import pyspark.sql.functions as F


%matplotlib inline 


In [13]:
# Monkey patch  the DataFrame to have a makeDummies function.
def makeDummies(self, colName):
    if self.dummyCols is None:
        self.dummyCols = dict()
        self.dummyCols["_base"] = self.columns
        #del self.dummyCols["_base"][0]
        
        
    categ = self.select(colName).distinct().rdd.flatMap(lambda x: x).collect()
    exps  = [ F.when(F.col(colName) == cat, 1).otherwise(0).alias(str(cat)) for cat in categ ]
    
    self.dummyCols[colName] = categ    
    
    self.dummyCols["_base"].remove(colName)
    
    newDF =  self.select( self.columns  + exps).drop(colName)
    newDF.dummyCols = self.dummyCols
    
    return newDF

def getDummyCols(self, *args):
    if self.dummyCols is None:
        self.dummyCols = dict()

    if len(args) is 0:
        args=self.dummyCols.keys()
        
    dCols = list()
    for col in args :
        dCols += self.dummyCols.get(col, [])
        
    return dCols

def getBase(self):
    if self.dummyCols is not None:
        return self.dummyCols["_base"]
    else:
        return []

def makeDate(self, colName):
    self.select(colName).rdd.map(lambda d : datetime.strptime(d))
    return self.select(self.columns)

pyspark.sql.dataframe.DataFrame.makeDummies = makeDummies
pyspark.sql.dataframe.DataFrame.dummyCols   = None
pyspark.sql.dataframe.DataFrame.makeDate    = makeDate
pyspark.sql.dataframe.DataFrame.getDummyCols= getDummyCols
pyspark.sql.dataframe.DataFrame.getBase     = getBase

In [36]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)
jDF = joinedHist.toDF(jSchema)
jDF.write.mode("overwrite").option("header", "true").csv("/tmp/joinDataFrame.csv")

In [37]:
uJDF = jDF.makeDate("job_submit_time")\
    .makeDate("begin_time")\
    .makeDate("end_time")\
    .makeDummies("launch_node_name")\
    .makeDummies("state")\
    .makeDummies("user_name")

In [38]:
# NOTE DON'T RUN THIS TWICE, IT KILLS THE DUMMY COLUMN LOGIC!
#uJDF.getDummyCols("_base","user_name", "account","launch_node_name","state","job_type"))
featureCols = uJDF.getDummyCols()
uJDF = uJDF.drop()

In [44]:
features    = uJDF.rdd.map(lambda row: row[4:])
featureCols = uJDF.columns[4:]
scaler      = StandardScaler(withMean=true, withStd=True, inputCol="feature", outputCol="scaledFeature")
#features    = StandardScaler.transform(features)
corr_matrix = Statistics.corr(features, method="pearson") #  Retrieve a matrix
pdJDF       = pd.DataFrame(corr_matrix)

pdJDF.index,pdJDF.columns = featureCols,featureCols

In [45]:
features.head().features


AttributeError: 'PipelinedRDD' object has no attribute 'head'

In [31]:
len(uJDF.getDummyCols())

0

In [30]:
featureCols

['allocation_id',
 'job_submit_time',
 'begin_time',
 'end_time',
 'projected_memory',
 'ssd_min',
 'ssd_max',
 'num_processors',
 'num_nodes',
 'time_limit',
 'isolated_cores',
 'num_gpus',
 'memory_usage_max',
 'gpu_usage',
 'gpfs_write',
 'cpu_usage',
 'ib_tx',
 'ib_rx',
 'energy',
 'batch1',
 'batch4',
 'batch2',
 'batch5',
 'batch3',
 'complete',
 'mcclurej',
 'pfliu',
 'esuchyta',
 'merzky1',
 'psvirin',
 'brenaud',
 'kmehta',
 'mpapadak',
 'shku',
 'rta',
 'cnegre',
 'wqzhang',
 'gustavj',
 'walkup',
 'q5p',
 'tmaier',
 'peller',
 'msandov1',
 'wanghy',
 'rsankar',
 'bee',
 'wangy',
 'nwedi',
 'maxpkatz',
 'jaharris',
 'panitkin',
 'jyc',
 'hshan',
 'z8j',
 'candy',
 'ia4021',
 'cosdis',
 'xsy',
 'brettin',
 'kiran92',
 'dappelh',
 'kngott',
 'havenith',
 'amueller',
 'gbarca',
 'bvilasen',
 'luissua',
 'tjcw',
 'lld',
 'carson16',
 'naughton',
 'kathirrk',
 'pgrete',
 'chunli',
 'arghyac',
 'khv',
 'azamat',
 'ames',
 'o0d',
 'tpapathe',
 'daddison',
 'glaser',
 'jbao',
 'dwrig

In [27]:
uJDF.columns

['allocation_id',
 'job_submit_time',
 'begin_time',
 'end_time',
 'projected_memory',
 'ssd_min',
 'ssd_max',
 'num_processors',
 'num_nodes',
 'time_limit',
 'isolated_cores',
 'num_gpus',
 'memory_usage_max',
 'gpu_usage',
 'gpfs_write',
 'cpu_usage',
 'ib_tx',
 'ib_rx',
 'energy',
 'batch1',
 'batch4',
 'batch2',
 'batch5',
 'batch3',
 'complete',
 'mcclurej',
 'pfliu',
 'esuchyta',
 'merzky1',
 'psvirin',
 'brenaud',
 'kmehta',
 'mpapadak',
 'shku',
 'rta',
 'cnegre',
 'wqzhang',
 'gustavj',
 'walkup',
 'q5p',
 'tmaier',
 'peller',
 'msandov1',
 'wanghy',
 'rsankar',
 'bee',
 'wangy',
 'nwedi',
 'maxpkatz',
 'jaharris',
 'panitkin',
 'jyc',
 'hshan',
 'z8j',
 'candy',
 'ia4021',
 'cosdis',
 'xsy',
 'brettin',
 'kiran92',
 'dappelh',
 'kngott',
 'havenith',
 'amueller',
 'gbarca',
 'bvilasen',
 'luissua',
 'tjcw',
 'lld',
 'carson16',
 'naughton',
 'kathirrk',
 'pgrete',
 'chunli',
 'arghyac',
 'khv',
 'azamat',
 'ames',
 'o0d',
 'tpapathe',
 'daddison',
 'glaser',
 'jbao',
 'dwrig