In [1]:
# Install pyhive into the conda environment at sys.prefix (the environment
# running this notebook's kernel) so that `from pyhive import hive` in the
# next cell can be resolved.
# NOTE: the `!` line is an IPython shell escape and `{sys.prefix}` is IPython
# variable expansion -- this cell only works inside Jupyter/IPython, not as a
# plain Python script.
import sys
!conda install --yes --prefix {sys.prefix} pyhive

Solving environment: done

# All requested packages already installed.



In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import copy
import re
import datetime as dt
from pyhive import hive
from operator import itemgetter

# Name of the Hive trace table these queries read from.  NOTE(review):
# createHql() spells 'trace_avro_v3' out as a literal rather than reading this
# variable, so changing it here alone does not redirect the query.
hiveTraceTableName = 'trace_avro_v3'

# HiveServer2 status: http://sayonara1a-mnds1-2-prd.eng.sfdc.net:10002/
# Hadoop all-apps status: http://sayonara1a-mnds1-2-prd.eng.sfdc.net:8088/cluster
# Note (Hive session setting): set mapred.job.name = Job: condSql.dailyFieldsNeeded;
def getHiveConnection(host='sayonara1a-mnds1-2-prd.eng.sfdc.net',
                      port=None,
                      username=None,
                      configuration=None):
    """Open a PyHive connection with Hive fetch-task conversion disabled.

    Parameters
    ----------
    host : str, optional
        HiveServer2 host; defaults to the endpoint this notebook was written
        against.
    port, username : optional
        Passed straight to ``hive.connect``; None (the default) is the same as
        not passing them at all.
    configuration : dict, optional
        Extra Hive session settings.  They are merged over the default
        ``{'hive.fetch.task.conversion': 'none'}``, so a caller can override it.

    Returns
    -------
    The connection object returned by ``pyhive.hive.connect``.
    """
    # Build a fresh dict on every call; never share a mutable default.
    sessionConf = {'hive.fetch.task.conversion': 'none'}
    if configuration:
        sessionConf.update(configuration)
    conn = hive.connect(host,
                        port=port,
                        username=username,
                        configuration=sessionConf)
    return conn
def createHql(minDate=None,
              maxDate=None,
              hostnames=None,
              traceTableName='trace_avro_v3'):
    """Build the multi-statement HiveQL script that extracts per-process CPU time.

    The script (re)creates scratch tables named ``et_*``:

    * ``et_timerMap`` maps each end-to-end timer name to a category
      (``cpu_time``, ``wait_time``, ``io_time``, ...).
    * ``et_tmp_bes_17`` holds the 'Interval Metrics' EVENT_STATS rows and
      their ``end2end_time_stats`` map.
    * ``et_tmp_e2e_time`` explodes that map into rows.
    * ``et_tmp_e2e_catMap`` and ``et_tmp_e2e_catMapSum`` categorize and then
      sum the timers per host / trace record / interval / category.

    The script then SELECTs the ``cpu_time`` sums joined with each trace
    record's ``recvCount``, and finally drops the scratch tables.

    Parameters
    ----------
    minDate, maxDate : str or date, optional
        Inclusive bounds on ``file_dt`` (e.g. ``'2018-09-14'``).  A bound left
        as None is omitted from the query rather than rendered as ``'None'``.
    hostnames : list of str, optional
        Logical host names to keep.  A logical host name is the first and
        fourth dash-separated parts of ``label``, joined by '-'.  When None or
        empty, no host filter is emitted.
    traceTableName : str, optional
        Table to read trace records from.  It is interpolated verbatim, so it
        must be a trusted identifier.

    Returns
    -------
    str
        The HiveQL text, with statements separated by ``;``.
    """

    def _quoteHql(value):
        # Escape backslashes and single quotes so a value cannot terminate
        # the surrounding '...' literal (Hive literals use C-style escapes).
        return str(value).replace('\\', '\\\\').replace("'", "\\'")

    hql = '''
set hive.fetch.task.conversion=none;

DROP TABLE IF EXISTS et_timerMap;
DROP TABLE IF EXISTS et_tmp_bes_17;
DROP TABLE IF EXISTS et_tmp_e2e_time;
DROP TABLE IF EXISTS et_tmp_e2e_catMap;
DROP TABLE IF EXISTS et_tmp_e2e_catMapSum;

CREATE TABLE IF NOT EXISTS et_timerMap (categoryName STRING, metricName STRING);
INSERT INTO et_timerMap VALUES 
('cpu_time', 'main'),
('wait_app_time', 'recv'),
('wait_app_time', 'recv_command_wait'),
('wait_app_time', 'send'),
('cpu_time', 'process_msg'),
('cpu_time', 'simple_query'),
('cpu_time', 'parse_message'),
('cpu_time', 'describe_message'),
('cpu_time', 'bind_message'),
('cpu_time', 'execute_message'),
('cpu_time', 'close_message'),
('cpu_time', 'parse'),
('cpu_time', 'qAnalyze'),
('cpu_time', 'rewrite'),
('cpu_time', 'plan'),
('cpu_time', 'spl_overhead'),
('cpu_time', 'spl_body'),
('cpu_time', 'spl_eval_expr'),
('cpu_time', 'spl_eval_expr_trivial'),
('cpu_time', 'spl_eval_expr_simple'),
('cpu_time', 'spl_eval_expr_simple_init'),
('cpu_time', 'spl_eval_expr_simple_run'),
('cpu_time', 'spl_eval_expr_complex'),
('cpu_time', 'spl_compile'),
('cpu_time', 'executor_start'),
('cpu_time', 'executor_start_init_plan'),
('cpu_time', 'executor_start_serviceability'),
('cpu_time', 'executor_run'),
('cpu_time', 'executor_finish'),
('cpu_time', 'executor_end'),
('cpu_time', 'executor_end_serviceability'),
('cpu_time', 'executor_rewind'),
('cpu_time', 'SPI_execute'),
('cpu_time', 'SPI_prepare'),
('cpu_time', 'SPI_open'),
('cpu_time', 'SPI_fetch'),
('cpu_time', 'SPI_close'),
('cpu_time', 'SPI_misc'),
('cpu_time', 'SPI_free'),
('cpu_time', 'SPI_save'),
('cpu_time', 'plancache_getcached'),
('cpu_time', 'plancache_build'),
('cpu_time', 'plancache_revalidate'),
('cpu_time', 'plancache_planmove'),
('cpu_time', 'plancache_psrcmove'),
('cpu_time', 'plancache_publish'),
('cpu_time', 'lsm'),
('cpu_time', 'xlog_cpu_time'),
('wait_time', 'xlog_group_commit_wait'),
('wait_time', 'wait_misc'),
('wait_time', 'wait_sql_lock'),
('wait_time', 'wait_AEAM_lock'),
('wait_time', 'wait_EIRQ_lock'),
('wait_time', 'wait_MXP_lock'),
('io_time', 'store_ext_create'),
('io_time', 'store_ext_delete'),
('io_time', 'store_ext_list'),
('io_time', 'store_ext_stat'),
('io_time', 'store_ext_open'),
('io_time', 'store_ext_seek'),
('io_time', 'store_log_sync_write'),
('io_time', 'store_data_sync_write'),
('io_time', 'store_cat_sync_write'),
('io_time', 'store_seq_read'),
('io_time', 'store_rand_read'),
('io_time', 'store_ext_close'),
('cpu_time', 'tuple_sort'),
('wait_time', 'wait_memstore_throttle'),
('wait_time', 'temp_file_read'),
('wait_time', 'temp_file_write'),
('io_time', 'store_log_async_submit'),
('io_time', 'store_log_async_status'),
('io_time', 'store_data_async_submit'),
('io_time', 'store_data_async_status'),
('io_time', 'store_cat_async_submit'),
('io_time', 'store_cat_async_status'),
('cpu_time', 'lsm_insert'),
('cpu_time', 'lsm_insert_snapshot_check'),
('cpu_time', 'lsm_insert_dup_check_primary'),
('cpu_time', 'lsm_insert_dup_check_secondary'),
('cpu_time', 'lsm_scan'),
('cpu_time', 'lsm_fetch'),
('cpu_time', 'memstore_lock'),
('cpu_time', 'memstore_insert'),
('cpu_time', 'memstore_fetch'),
('cpu_time', 'memstore_scan'),
('cpu_time', 'seq_nextval'),
('wait_time', 'wait_BufReadIO_lock'),
('ext_time', 'cluuid_read'),
('ext_time', 'cluuid_write'),
('ext_time', 'zoo_aset_sync'),
('ext_time', 'zkHandleInit'),
('ext_time', 'zkHandleClose'),
('ext_time', 'znode_get_array'),
('wait_time', 'wait_PlanCacheHash_lock'),
('cpu_time', 'locationcache_invalidate_key'),
('cpu_time', 'locationcache_invalidate_full'),
('cpu_time', 'locationcache_invalidate_era'),
('cpu_time', 'bufmgr_getfreebuf'),
('wait_time', 'wait_ProcArray_lock'),
('wait_time', 'wait_SinvalRead_lock'),
('wait_time', 'wait_SinvalWrite_lock'),
('wait_time', 'wait_LockTable_lock'),
('wait_time', 'wait_Backend_lock'),
('wait_time', 'wait_PlanCacheLRU_lock'),
('wait_time', 'wait_DdlVersionLock'),
('cpu_time', 'ostream_add_record'),
('io_time', 'ostream_open_extent'),
('io_time', 'ostream_close_extent'),
('io_time', 'ostream_persist_dset'),
('cpu_time', 'lsm_ew_add_records'),
('cpu_time', 'lsm_ew_add_to_extent'),
('cpu_time', 'plancache_func_callback'),
('cpu_time', 'plancache_rel_callback'),
('cpu_time', 'plancache_reset'),
('cpu_time', 'locationcache_inval_era_single'),
('cpu_time', 'locationcache_inval_era_bulk'),
('wait_time', 'wait_prefetchQ_lock'),
('cpu_time', 'locationcache_inval_from_bitmap'),
('cpu_time', 'funccache_inval_local'),
('cpu_time', 'funccache_insert'),
('cpu_time', 'funccache_delete'),
('cpu_time', 'funccache_lookup'),
('cpu_time', 'plancache_getspace'),
('cpu_time', 'plancache_getplan'),
('cpu_time', 'plancache_onetimeplan'),
('cpu_time', 'plancache_cachedplan'),
('cpu_time', 'extentmerge_scan'),
('cpu_time', 'daemon_unattributed'),
('wait_time', 'wait_FuncCacheHash_lock'),
('wait_time', 'wait_FuncCacheLRU_lock'),
('cpu_time', 'funccache_drain'),
('cpu_time', 'funccache_getspace'),
('cpu_time', 'funccache_inval_global'),
('cpu_time', 'funccache_evict'),
('cpu_time', 'funccache_lruadd'),
('cpu_time', 'funccache_recycle'),
('cpu_time', 'funccache_publish'),
('cpu_time', 'funccache_free'),
('cpu_time', 'funccache_copy'),
('cpu_time', 'funccache_copy_spi'),
('cpu_time', 'funccache_copy_fail'),
('cpu_time', 'funccache_copy_spi_fail'),
('cpu_time', 'pscan_fetch'),
('cpu_time', 'bloom_check'),
('cpu_time', 'bloom_add'),
('wait_app_time', 'dbms_alert_wait'),
('wait_app_time', 'sfdc_ipc_wait'),
('wait_app_time', 'pg_sleep'),
('cpu_time', 'intervalStats_work'),
('cpu_time', 'intervalStats_rollup'),
('cpu_time', 'intervalStats_package'),
('wait_time', 'intervalStats_wait_retry'),
('wait_time', 'wait_catsrv_lock'),
('io_time', 'catsrv_read'),
('io_time', 'catsrv_write'),
('cpu_time', 'spl_exec_check_plan'),
('cpu_time', 'spl_exec_prepare_plan'),
('wait_time', 'wait_FuncCacheSPI_lock'),
('cpu_time', 'spl_expr_lock_check_plan'),
('cpu_time', 'spl_expr_lock_publish_plan'),
('cpu_time', 'compress'),
('cpu_time', 'decompress')
;

CREATE TABLE IF NOT EXISTS et_tmp_bes_17 AS
SELECT 
    trace_record_id,
    hostname,
    from_unixtime(CAST(event_details['interval_start'] as bigint) DIV CAST(power(10, 9) AS bigint)) AS interval_start,
    rectype,
    per_proc_stats.end2end_time_stats as e2eMap
FROM ''' + traceTableName + '''
WHERE format_mode = 'EVENT_STATS'
 AND instr(rectype, 'Interval Metrics') > 0
'''
    # Optional filters.  Each clause ends with a newline so adjacent fragments
    # can never run together, and absent filters are simply left out.
    if minDate is not None:
        hql += " AND file_dt >= '{0}'\n".format(_quoteHql(minDate))
    if maxDate is not None:
        hql += " AND file_dt <= '{0}'\n".format(_quoteHql(maxDate))
    if hostnames:
        hostClauses = [
            "    (concat(split(label, '-')[0], '-', split(label, '-')[3]) =  '"
            + _quoteHql(hn) + "' )"
            for hn in hostnames
        ]
        hql += ' AND (\n' + '\n   OR '.join(hostClauses) + '\n )\n'

    hql += ''';


CREATE TABLE IF NOT EXISTS et_tmp_e2e_time AS
SELECT 
    a.trace_record_id,
    a.hostname,
    a.interval_start,
    a.rectype,
    IF(a.proc_process_key == 'recvCount', a.proc_process_value, NULL) AS recvCount,
    a.proc_process_key,
    a.proc_process_value
    FROM (
        SELECT 
            trace_record_id,
            hostname,
            interval_start,
            rectype,
            mk AS proc_process_key,
            mv AS proc_process_value
          FROM et_tmp_bes_17
          LATERAL VIEW explode(e2eMap) m AS mk, mv) a
    WHERE instr(a.proc_process_key, 'Count') == 0
       OR a.proc_process_key == 'recvCount';
    
CREATE TABLE IF NOT EXISTS et_tmp_e2e_catMap AS
SELECT 
    l.*,
    r.categoryName,
    r.metricName
  FROM et_tmp_e2e_time as l
  JOIN et_timerMap as r
  ON l.proc_process_key == r.metricName;
  
  
CREATE TABLE IF NOT EXISTS et_tmp_e2e_catMapSum AS
SELECT 
    hostname,
    trace_record_id,
    interval_start,
    rectype,
    categoryName,
    SUM(proc_process_value) as sumTime
FROM et_tmp_e2e_catMap
GROUP BY hostname, trace_record_id, interval_start, rectype, categoryName;

set hive.cli.print.header=true;
SELECT 
    l.hostname AS hostname,
    l.trace_record_id AS trace_record_id,
    l.interval_start AS interval_start,
    substr(l.rectype, length('Interval Metrics ')+1, length(l.rectype)) AS process_type,
    l.categoryName AS cpu_category_name,
    l.sumTime AS sum_time_ns,
    r.recvCount AS recvCount
FROM et_tmp_e2e_catMapSum as l
LEFT OUTER JOIN (
    SELECT trace_record_id,
        recvCount
    FROM et_tmp_e2e_time
    WHERE recvCount is not NULL
) r
ON l.trace_record_id = r.trace_record_id
WHERE l.categoryName == 'cpu_time';

DROP TABLE IF EXISTS et_timerMap;
DROP TABLE IF EXISTS et_tmp_bes_17;
DROP TABLE IF EXISTS et_tmp_e2e_time;
DROP TABLE IF EXISTS et_tmp_e2e_catMap;
DROP TABLE IF EXISTS et_tmp_e2e_catMapSum;
    '''

    return hql

def transposeGroupedByDataFrame(df,
                                groupby=None,
                                splitby=None,
                                fieldNameToSplit=None):
    """Pivot one value column so each distinct ``splitby`` value becomes a column.

    Rows are keyed by the ``groupby`` columns plus ``splitby``.  The
    ``fieldNameToSplit`` values are then unstacked on ``splitby``, and missing
    combinations are filled with 0.0.

    Parameters
    ----------
    df : pandas.DataFrame
        Input rows.
    groupby : list of str
        Key columns kept as ordinary columns in the result (at least one).
    splitby : str
        Column whose distinct values become the new column names.
    fieldNameToSplit : str
        Column whose values populate the new columns.

    Returns
    -------
    pandas.DataFrame
        The ``groupby`` columns followed by one column per ``splitby`` value,
        with a fresh RangeIndex and rows ordered by the ``groupby`` key.

    Raises
    ------
    ValueError
        If ``groupby`` is empty.  pandas also raises ValueError when a
        ``groupby`` + ``splitby`` combination occurs more than once.
    """
    metaFieldNames = list(groupby) if groupby else []
    if not metaFieldNames:
        raise ValueError('transposeGroupedByDataFrame needs at least one groupby column')

    # Hierarchical index: every groupby column, then the column to split by.
    index = pd.MultiIndex.from_arrays(
        [df[fn].tolist() for fn in metaFieldNames] + [df[splitby].tolist()],
        names=metaFieldNames + [splitby])

    tempDf = pd.DataFrame(df[fieldNameToSplit]).set_index(index)
    tDf = tempDf.unstack(level=splitby, fill_value=0.0)

    # After unstacking, the remaining index levels are exactly the groupby
    # keys, so move all of them back into columns.
    tDf = tDf.reset_index()

    # Columns are now 2-tuples: (groupbyName, '') for the reset keys and
    # (fieldNameToSplit, splitValue) for the pivoted data.  Keep the
    # informative half of each directly, with no string/regex round-trip.
    tDf.columns = [value if value != '' else name for name, value in tDf.columns]

    return tDf

In [12]:
def getHiveConn(host='fliang-wsl3.internal.salesforce.com',
                port=10000,
                username='fliang'):
    """Open a PyHive ``hive.Connection`` to a HiveServer2 instance.

    The defaults reproduce the endpoint originally hard-coded here.  Pass
    ``host``/``port``/``username`` to target another server without editing
    the function.

    Returns
    -------
    pyhive.hive.Connection
    """
    conn = hive.Connection(host=host, port=port, username=username)
    return conn

# Try the connection and show what came back.
myConn = getHiveConn()
# print(...) with a single argument behaves the same on Python 2 and 3.
print(myConn)

TTransportException: Could not connect to any of [('10.9.103.79', 10000)]

In [8]:
# hive query isn't currently working from the jupyter notebook at this time (9/17/2018)
# NOTE(review): createHql() returns a multi-statement script (SET / DROP /
# CREATE / SELECT separated by ';').  A single pd.read_sql() call through PyHive
# is unlikely to accept that, which may be why this path fails.  The script is
# also written to cpuBuckets.sql so it can be run elsewhere (e.g. kpclient).
#
# This cell fetches (or loads) per-process cpu_time sums.  For each logical
# host and each interval, it picks the physical node with the largest recvCount
# ("master") and plots that node's per-process %CPU as a stacked area chart.
inputFilename = None #'2018-09-10.cpuBuckets.tsv' # = None if want to fetch from hive
noisy = True
numCores = {'gs1-phx.ops.sfdc.net': 40,
            'cs999-dfw.ops.sfdc.net': 40 # will = 80 after 9/15 when hyper threading was enabled
           }

# run query or read file
inDf = None
graphicMinTime = None # dt.datetime.strptime('09/13/2018 13:18:49', '%m/%d/%Y %H:%M:%S')
graphicMaxTime = None # dt.datetime.strptime('09/13/2018 15:10:32', '%m/%d/%Y %H:%M:%S')
# DPI used when saving the charts.  Matplotlib's Agg renderer rejects images of
# 2**16 pixels or more per side, so the figure width is capped using this DPI.
savefigDpi = 100

if inputFilename is None:
    minDate = '2018-09-14' # these only matter when running hive query from jupyter notebook; else uses file contents
    maxDate = '2018-09-14'
    hql = createHql(minDate=minDate,
                    maxDate=maxDate,
                    hostnames=['cs999-dfw.ops.sfdc.net', 'gs1-phx.ops.sfdc.net'])
    # write the query to disk in case we need to execute in kpclient
    with open('cpuBuckets.sql', 'w') as fp:
        fp.write(hql)

    # connect and get data; always release the connection, even on failure
    myConn = getHiveConnection()
    print(myConn)
    try:
        inDf = pd.read_sql(hql, myConn)
    finally:
        myConn.close()

    dataFilename = '{0}_{1}.cpuRawData.tsv'.format(minDate, maxDate)
    inDf.to_csv(dataFilename, sep='\t', index=False)
else:
    inDf = pd.read_csv(inputFilename, sep='\t')
if noisy:
    print(inDf.columns.values)

# Keep only the cpu rows.  .copy() makes df an independent frame, so adding a
# column below does not write into a view of inDf (SettingWithCopyWarning).
df = inDf.loc[inDf['cpu_category_name'] == 'cpu_time', :].copy()
allDaemon = np.unique(df['process_type'])
# logical hostname = 1st and 4th dash-separated parts of the physical hostname
logicalHostnames = ['-'.join(itemgetter(0, 3)(physHost.split('-')))
                    for physHost in df['hostname'].values]
df['logicalHostname'] = logicalHostnames


for lhn in np.unique(df['logicalHostname']):
    hdf = df.loc[df['logicalHostname'] == lhn]
    procNames = np.unique(hdf['process_type'])
    tt = dt.datetime.strptime(np.min(hdf['interval_start'].values), '%Y-%m-%d %H:%M:%S')
    minDateStr = dt.datetime.strftime(tt, '%Y-%m-%d')

    # determine master at each minute: the physical node whose
    # 'Backend processes' row has the largest recvcount
    rcdf = hdf.loc[hdf['process_type'] == 'Backend processes']
    if len(rcdf) == 0:
        print('WARNING: no Backend processes rows for {0}; skipping'.format(lhn))
        continue
    hostnames = np.unique(rcdf['hostname'])
    recvCountDf = transposeGroupedByDataFrame(rcdf,
                                              groupby=['logicalHostname', 'interval_start'],
                                              splitby='hostname',
                                              fieldNameToSplit='recvcount')
    recvCountDf['maxRecvCount'] = recvCountDf[hostnames].max(axis=1)
    recvCountDf['maxAtIndex'] = np.argmax(np.array(recvCountDf[hostnames]), axis=1)

    # Index this host's rows by (hostname, interval_start, process_type) once,
    # instead of re-filtering hdf for every (time slice, process) pair.
    sumTimesByKey = {}
    for rowHost, rowTime, rowProc, rowSumTime in zip(hdf['hostname'],
                                                     hdf['interval_start'],
                                                     hdf['process_type'],
                                                     hdf['sum_time_ns']):
        sumTimesByKey.setdefault((rowHost, rowTime, rowProc), []).append(rowSumTime)

    # find values of interest from the master node for each time slice
    final = {'interval_start': [],
             'master_nodename': []}
    for pn in procNames:
        final[pn] = []
    for i, t in enumerate(recvCountDf['interval_start']):
        if graphicMinTime is not None or graphicMaxTime is not None:
            tt = dt.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')
            if graphicMinTime is not None and tt < graphicMinTime:
                continue
            if graphicMaxTime is not None and tt > graphicMaxTime:
                continue

        mn = hostnames[recvCountDf['maxAtIndex'].iloc[i]]
        for pn in procNames:
            sumTimes = sumTimesByKey.get((mn, t, pn), [])
            if len(sumTimes) > 1:
                print('WARNING: Found more than one process time value for 1 time x 1 process_name')
            # Always append a scalar (0 when absent, else the first value);
            # appending an array would corrupt the DataFrame built below.
            final[pn].append(sumTimes[0] if sumTimes else 0)
        final['interval_start'].append(t)
        final['master_nodename'].append(mn)
        if noisy and i % 10 == 0:
            print('{0} {1}'.format(i, t))

    # convert to df and add percent cpu of all available time / min
    finalDf = pd.DataFrame(final)
    if len(finalDf) == 0:
        print('WARNING: no intervals for {0} inside the graphic time window; skipping'.format(lhn))
        continue

    yList = []
    yStatForOrderingLegend = {}
    for pn in procNames:
        pcpu = '%CPU[{0}]'.format(pn)
        # convert ns / min to % of all available cores / min
        finalDf[pcpu] = 100.0 * finalDf[pn] / (numCores[lhn] * 60.0 * 10.0**9.0)
        yStatForOrderingLegend[pn] = np.percentile(finalDf[pcpu], 50.0)
    finalDataFilename = '{0}.{1}.cpuPerDaemon_data.tsv'.format(minDateStr, lhn)
    finalDf.to_csv(finalDataFilename, sep='\t', index=False)

    # add to Y in decreasing order of median %CPU
    legendKey = []
    sortedProcNames = sorted(yStatForOrderingLegend.items(), key=itemgetter(1), reverse=True)
    for pn, pc in sortedProcNames:
        yList.append(finalDf['%CPU[{0}]'.format(pn)].values)
        legendKey.append(pn)

    # create graphics
    x = finalDf['interval_start']
    numX = len(x)
    y = np.vstack(yList)

    # About 1 inch per 20 intervals: at least 1 inch, and small enough that the
    # saved image stays below 2**16 pixels wide at savefigDpi.
    imageWidthInches = min(max(1, numX // 20), (2 ** 16 - 1) // savefigDpi)
    fig = plt.figure(figsize=(imageWidthInches, 10))
    ax = plt.subplot(111)
    ax.stackplot(x, y, labels=legendKey)
    plt.grid(True, 'major', 'y', ls='--', lw=.5, c='k', alpha=.3)
    plt.grid(True, 'major', 'x', ls='--', lw=.5, c='k', alpha=.3)
    plt.xticks(range(0, numX, 10), x.iloc[::10], rotation=90)
    minY = np.min(y)
    maxY = np.max(y)
    if maxY > 600:
        # clip rare spikes so the bulk of the data stays readable
        percentileLevelY = np.percentile(y, 99.99)
        if percentileLevelY < 100.0:
            plt.ylim(top=100.0)
        elif percentileLevelY > 600.0:
            plt.ylim(top=min(np.percentile(y, 99.9), 600.0))
        else:
            plt.ylim(top=percentileLevelY)

    if minY < 0:
        plt.ylim(bottom=0)
    plt.title('Percent CPU: {0}'.format(lhn))
    print('{0} {1} {2}'.format(lhn, np.min(x), np.max(x)))

    # Shrink axis to make room for legend and x-axis labels
    box = ax.get_position()
    ax.set_position([box.x0, box.y0, box.width * 0.8, box.height * 0.85])

    # Put a legend to the right of the current axis
    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))

    # add text indicating the number of cores used to convert to percentage values
    #plt.text(np.min(x), 0.90*np.max(y), '#cores={0}'.format(numCores[lhn]), fontsize=10)

    imFilename = '{0}.{1}.cpuPerDaemon.jpg'.format(minDateStr, lhn)
    plt.savefig(imFilename, dpi=savefigDpi)
    plt.show()
    # release the (potentially very wide) figure before the next host
    plt.close(fig)
    

TTransportException: Could not connect to any of [('10.253.212.74', 10000)]