# initialize

In [None]:
from __future__ import nested_scopes
from IPython.core.display import display, HTML
# Stretch the notebook container to the full browser width and switch the code
# editor to a 12pt "Courier New" font, so the wide HTML tables rendered later
# in this notebook stay readable.
for _css in ("<style>.container { width:100% !important; }</style>",
             '<style>.CodeMirror{font-family: "Courier New";font-size: 12pt;}</style>'):
    display(HTML(_css))

In [None]:
import pyspark.sql.functions as F
import json
import builtins
from itertools import chain
import seaborn as sns


In [None]:
from pyspark.sql.types import (StructType, StructField, DateType,
    TimestampType, StringType, LongType, IntegerType, DoubleType,FloatType)

from pyspark.sql.functions import pandas_udf, PandasUDFType

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorAssembler

from pyspark.sql.window import Window


In [None]:
import math
from functools import reduce
import re
import collections
from pyspark.ml import Pipeline
import pandas
import numpy
import time
from pandasql import sqldf
import html

# Pandas display settings for this notebook: allow up to 1000 rows / 200 columns
# before truncating, and render floats with thousands separators
# (e.g. 1234.5 -> "1,234.5").
pandas.set_option("display.max_rows", 1000)
pandas.set_option("display.max_columns", 200)
pandas.set_option("display.float_format", '{:,}'.format)

In [None]:
from ipywidgets import IntProgress,Layout
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.lines as mlines


In [None]:
import pyhdfs


import os

# HDFS NameNode address ("host:port") and user name passed to pyhdfs. They can be
# overridden with the HDFS_HOSTS / HDFS_USER environment variables instead of
# editing the notebook; the defaults are the values the notebook was written for.
HDFS_HOSTS = os.environ.get("HDFS_HOSTS", "10.0.2.125:50070")
HDFS_USER = os.environ.get("HDFS_USER", "yuzhou")
fs = pyhdfs.HdfsClient(hosts=HDFS_HOSTS, user_name=HDFS_USER)

# base class

In [None]:
class SparkLog_Analysis:
    """Placeholder for a combined Spark-log analysis.

    The constructor takes an application id, a list of job ids and a client
    collection, but currently ignores all three and stores no state.
    """

    def __init__(self, appid, jobids, clients):
        return None

In [None]:
class Analysis:
    """Base class for log analyses that can be exported as a Chrome trace view.

    Subclasses override ``load_data`` to populate ``self.df`` and
    ``generate_trace_view_list`` to emit their trace events.
    """

    # Base URL under which the trace viewer serves the generated JSON files.
    # Override on the class or on an instance to point at another server.
    trace_viewer_url = "http://xxx:1088/tracing_examples/trace_viewer.html#/tracing/test_data/"

    def __init__(self, file):
        self.file = file
        # Time offset used by subclasses when emitting events; 0 means "not set yet".
        self.starttime = 0
        self.df = None

    def load_data(self):
        """Populate ``self.df``; a no-op in the base class."""
        pass

    def generate_trace_view_list(self, id=0, **kwargs):
        """Return this analysis' trace events as a list of JSON strings.

        Loads data lazily, then emits one ``process_name`` metadata event.

        Args:
            id: trace process id ("pid") of the metadata event.
            **kwargs: ``node`` -- label used in the process name (default "node").
        """
        # `is None`, not `== None`: for a pandas DataFrame `==` is elementwise
        # and using the result in an `if` raises ValueError.
        if self.df is None:
            self.load_data()
        node = kwargs.get('node', "node")
        return [json.dumps({"name": "process_name", "ph": "M", "pid": id, "tid": 0,
                            "args": {"name": " " + node}})]

    def generate_trace_view(self, trace_output, **kwargs):
        """Write ``<trace_output>.json`` in trace-event format and display a viewer link.

        Args:
            trace_output: output path without the ``.json`` suffix; also used to
                build the viewer URL.
            **kwargs: forwarded to ``generate_trace_view_list``.
        """
        traces = list(self.generate_trace_view_list(0, **kwargs))

        output = '''
        {
            "traceEvents": [
        
        ''' + ",\n".join(traces) + '''
            ],
            "displayTimeUnit": "ns"
        }'''

        with open(trace_output + '.json', 'w', encoding='utf-8') as outfile:
            outfile.write(output)

        # Quote and escape the URL so paths with spaces or special characters
        # still produce a valid link.
        url = html.escape(self.trace_viewer_url + trace_output + ".json", quote=True)
        display(HTML('<a href="' + url + '">' + url + '</a>'))
        

# app log analysis

In [None]:
from pyspark.sql.functions import udf
@udf("long")
def isfinish_udf(s):
    """Return 1 if a serialized plan subtree contains only final plans, else 0.

    ``s`` is the JSON text of a plan node's ``children`` array (the call site uses
    ``F.to_json("children")``). Only the subtree rooted at the first element is
    examined. A node counts as not final when its ``simpleString`` contains
    ``isFinalPlan=false`` or its ``children`` field is null. A null or empty
    input yields 0.
    """
    # Imported inside the function so it is self-contained when run on executors.
    import json

    def isfinish(root):
        if "isFinalPlan=false" in root['simpleString'] or root['children'] is None:
            return 0
        return 1 if all(isfinish(c) == 1 for c in root["children"]) else 0

    # A null children column arrives as None; json.loads(None) would raise TypeError.
    if s is None:
        return 0
    children = json.loads(s)
    if not children:
        return 0
    return isfinish(children[0])
    
@pandas_udf("taskid long, start long, dur long, name string", PandasUDFType.GROUPED_MAP)
def time_breakdown(pdf):
    """Lay out one group's metric updates as consecutive, non-overlapping slices.

    ``pdf`` holds the rows of one group (presumably one task -- TODO confirm the
    grouping key at the call site) with columns ``Task ID``, ``Launch Time``,
    ``Finish Time``, ``Update`` and ``mname``. The ``Update`` values are scaled
    down (never up) so that their sum fits between the group's launch and finish
    time. They are then emitted back to back, starting 2 time units after the
    launch (ms, assuming Spark's millisecond timestamps). Slices whose scaled
    length is <= 1 are dropped.

    Returns a frame with columns taskid, start, dur, name.
    """
    columns = ['taskid', 'start', 'dur', 'name']
    launch = pdf['Launch Time'].iloc[0]
    finish = pdf['Finish Time'].iloc[0]
    total_update = pdf['Update'].sum()
    # Shrink factor so the stacked slices do not exceed the wall time; with no
    # recorded update time there is nothing to shrink (avoids dividing by 0).
    ratio = (finish - launch) / total_update if total_update != 0 else 1
    ratio = 1 if ratio > 1 else ratio

    ltime = launch + 2
    rows = []
    for _, row in pdf.iterrows():
        scaled = row["Update"] * ratio
        if scaled > 1:
            dur = int(scaled)
            rows.append([row["Task ID"], ltime, dur, row["mname"]])
            ltime = ltime + dur

    if rows:
        # Named columns so Spark matches the result schema by name.
        return pandas.DataFrame(rows, columns=columns)
    return pandas.DataFrame({'taskid': pandas.Series([], dtype='int64'),
                             'start': pandas.Series([], dtype='int64'),
                             'dur': pandas.Series([], dtype='int64'),
                             'name': pandas.Series([], dtype='str'),
                             })
    
class App_Log_Analysis(Analysis):
    def __init__(self, file, jobids):
        """Create an analysis for the Spark event log at ``file``.

        ``jobids`` optionally restricts the analysis to those jobs; the ids are
        stored as strings. ``None`` means no restriction.
        """
        Analysis.__init__(self, file)
        if jobids is None:
            self.jobids = []
        else:
            self.jobids = list(map(str, jobids))
        self.df = None
        # Trace process ids recorded while generating executor trace events.
        self.pids = []
        
    def load_data(self):
        """Parse the Spark event log ``self.file`` and build the per-task frames.

        Uses the notebook-global ``spark`` session. Sets, among others:

        * ``appid`` -- application id, or a placeholder if the log has none;
        * ``dfacc`` -- driver accumulator updates joined with the metric metadata
          of the SQL plans, or ``None`` when either is missing;
        * ``queryplans`` / ``allmetrics`` / ``metricscollect`` -- final SQL plans,
          every metric they declare, and the timing metrics among them;
        * ``parallelism``, ``executor_cores``, ``executor_instances``,
          ``taskcpus``, ``batchsize`` -- from ``Spark Properties`` (default 1);
        * ``realexecutors``, ``df_job``, ``failed_stages`` and the speculative
          task/stage counters;
        * ``df`` -- task start/end rows joined with their job (and query index),
          optionally restricted to ``self.jobids``; cached;
        * ``criticaltasks`` -- ``[task id, start, end]`` lists from a back-walk
          starting at the last-finishing task.
        """
        print("load data ", self.file)
        jobids = self.jobids
        df = spark.read.json(self.file)

        if 'App ID' in df.columns:
            self.appid = df.where("`App ID` is not null").collect()[0]["App ID"]
        else:
            self.appid = "Application-00000000"

        accum_filter = "Event='org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates'"
        if df.where(accum_filter).count() > 0:
            self.dfacc = df.where(accum_filter).select(F.col("executionId").alias("queryid"), F.explode("accumUpdates"))
        else:
            self.dfacc = None

        if "sparkPlanInfo" in df.columns:
            plan_filter = ("(Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart'"
                           " or Event='org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate')"
                           " and (sparkPlanInfo.nodeName!='AdaptiveSparkPlan'"
                           " or sparkPlanInfo.simpleString='AdaptiveSparkPlan isFinalPlan=true')")
            self.queryplans = df.where(plan_filter).select(F.col("executionId").alias("queryid"), 'physicalPlanDescription', "sparkPlanInfo.*")
        else:
            self.queryplans = None

        # Metric metadata declared by plan nodes: [accumulatorId, metricType, name, nodeName].
        # Initialised unconditionally so these attributes are always defined.
        self.allmetrics = []
        self.metricscollect = []
        amsdf = None
        if self.queryplans is not None:
            # Keep only plans whose whole tree is final (see isfinish_udf).
            self.queryplans = self.queryplans.where(isfinish_udf(F.to_json("children")) == 1)
            seen = set()

            def get_metric(root):
                for m in root["metrics"]:
                    if m['accumulatorId'] not in seen:
                        seen.add(m['accumulatorId'])
                        self.allmetrics.append([m['accumulatorId'], m["metricType"], m['name'], root["nodeName"]])
                if root['children'] is not None:
                    for c in root["children"]:
                        get_metric(c)

            for plan in self.queryplans.collect():
                get_metric(plan)

            # createDataFrame cannot infer a schema from an empty list.
            if self.allmetrics:
                amsdf = (spark.createDataFrame(self.allmetrics)
                         .withColumnRenamed("_1", "ID").withColumnRenamed("_2", "type")
                         .withColumnRenamed("_3", "Name").withColumnRenamed("_4", "nodeName"))

            timing_prefixes = ("totaltime_", "scan time", "shuffle write time", "shuffle spill time")
            # Exact-name exclusion; `x not in ("totaltime_collectbatch")` would be a
            # substring test against a plain string rather than tuple membership.
            self.metricscollect = [m for m in self.allmetrics
                                   if m[1] in ('nsTiming', 'timing')
                                   and m[2].startswith(timing_prefixes)
                                   and m[2] != "totaltime_collectbatch"]

        if self.dfacc is not None:
            if amsdf is not None:
                self.dfacc = self.dfacc.select("queryid", (F.col("col")[0]).alias("ID"), (F.col("col")[1]).alias("Update")).join(amsdf, on=["ID"])
            else:
                # No plan metrics to resolve the accumulator ids against.
                self.dfacc = None

        config = df.select("`Spark Properties`.*").where("`spark.app.id` is not null").limit(1).collect()
        configdic = config[0].asDict()

        def conf_int(key):
            # Missing (or null) settings default to 1.
            value = configdic.get(key)
            return int(value) if value is not None else 1

        self.parallelism = conf_int('spark.sql.shuffle.partitions')
        self.executor_cores = conf_int('spark.executor.cores')
        self.executor_instances = conf_int('spark.executor.instances')
        self.taskcpus = conf_int('spark.task.cpus')
        self.batchsize = conf_int('spark.sql.execution.arrow.maxRecordsPerBatch')

        self.realexecutors = df.where(~F.isnull(F.col("Executor ID"))).select("Executor ID").distinct().count()

        jobstart_events = df.where("Event='SparkListenerJobStart'")
        if "spark.sql.execution.id" in jobstart_events.select("Properties.*").columns:
            queryid_col = F.col("Properties.`spark.sql.execution.id`").alias("queryid")
        else:
            queryid_col = F.lit(0).alias("queryid")
        df_jobstart = jobstart_events.select("Job ID", "Submission Time", queryid_col, "Stage IDs")
        df_jobend = df.where("Event='SparkListenerJobEnd'").select("`Job ID`", "Completion Time")
        df_job = (df_jobstart.join(df_jobend, "Job ID")
                  .withColumnRenamed("Submission Time", "job_start_time")
                  .withColumnRenamed("Completion Time", "job_stop_time"))
        self.df_job = df_job

        jobstage = df_job.select("*", F.explode("Stage IDs").alias("Stage ID"))
        task = df.where("(Event='SparkListenerTaskEnd' or Event='SparkListenerTaskStart') ").select("Event", "Stage ID", "task info.*", "task metrics.*")

        self.failed_stages = [str(l['Stage ID']) for l in task.where("Failed='true'").select("Stage ID").distinct().collect()]

        self.speculativetask = task.where("speculative = 'true'").count()
        self.speculativekilledtask = task.where("speculative = true and killed='true'").count()
        self.speculativestage = task.where("speculative = true and killed='true'").select("`Stage ID`").distinct().count()

        # Keep only tasks that have an end event. NOTE(review): this condition drops
        # a task only if it is BOTH failed and killed; if the intent is to drop
        # failed OR killed tasks it should use `and` -- confirm before changing,
        # it affects every downstream figure.
        validtsk = task.where("Event = 'SparkListenerTaskEnd' and (Failed<>'true' or killed<>'true')").select("`Task ID`")
        task = task.join(validtsk, on='Task ID', how='inner')

        taskjob = task.select(
            "Host", "`Event`", "`Launch Time`", "`Executor ID`", "`Task ID`", "`Finish Time`",
            "`Stage ID`", "`Input Metrics`.`Bytes Read`", "`Disk Bytes Spilled`", "`Memory Bytes Spilled`",
            "`Shuffle Read Metrics`.`Local Bytes Read`", "`Shuffle Read Metrics`.`Remote Bytes Read`",
            "`Shuffle Write Metrics`.`Shuffle Bytes Written`", "`Executor Deserialize Time`",
            "`Shuffle Read Metrics`.`Fetch Wait Time`", "`Executor Run Time`", "`Shuffle Write Metrics`.`Shuffle Write Time`",
            "`Result Serialization Time`", "`Getting Result Time`", "`JVM GC Time`", "`Executor CPU Time`",
            "Accumulables", "Peak Execution Memory",
            # Start events have Finish Time 0; use the launch time as their event time.
            F.when(task['Finish Time'] == 0, task['Launch Time']).otherwise(task['Finish Time']).alias('eventtime')
        ).join(jobstage, "Stage ID").where("`Finish Time` is null or `Finish Time` <=job_stop_time+5")

        self.df = taskjob

        if len(jobids) > 0:
            self.df = self.df.where('`Job ID` in ({:s})'.format(','.join(jobids)))

        queryids = self.df.select(F.col("queryid").astype(IntegerType())).distinct().where("queryid is not null").orderBy("queryid").toPandas()

        self.query_num = len(queryids)
        if self.query_num > 0:
            # real_queryid = 1-based rank of the SQL execution id.
            queryidx = queryids.reset_index()
            queryidx['index'] = queryidx['index'] + 1
            # 103 queries is treated as a TPC-DS run; tpcds_query_map is assumed to
            # be defined elsewhere in the notebook.
            if self.query_num == 103:
                queryidx['index'] = queryidx['index'].map(tpcds_query_map)
            qidx = spark.createDataFrame(queryidx).withColumnRenamed("index", "real_queryid")
            self.df = self.df.join(qidx, on="queryid", how="left")
            if self.dfacc is not None:
                self.dfacc = self.dfacc.join(qidx, on="queryid", how='left')
            if self.queryplans is not None:
                self.queryplans = self.queryplans.join(qidx, "queryid", how="right")

        self.df = self.df.fillna(0)
        self.df = self.df.withColumn('Executor ID', F.when(F.col("Executor ID") == "driver", 1).otherwise(F.col("Executor ID")))
        self.df.cache()

        # Critical path over task-end events.
        dfx = self.df.where("Event='SparkListenerTaskEnd'").select("Stage ID", "Launch Time", "Finish Time", "Task ID")
        dfxpds = dfx.toPandas()
        dfxpds.columns = [l.replace(" ", "_") for l in dfxpds.columns]
        # pandasql presumably resolves `dfxpds` from this frame's variables, so it
        # must stay a local of this method.
        dfxpds_ods = sqldf('''select * from dfxpds order by finish_time desc''')
        self.criticaltasks = self._critical_path_tasks(dfxpds_ods)

    @staticmethod
    def _critical_path_tasks(tasks):
        """Walk back from the last-finishing task to build an approximate critical path.

        ``tasks`` must be sorted by ``Finish_Time`` descending and provide the
        ``Task_ID``, ``Launch_Time`` and ``Finish_Time`` columns. Starting at the
        task that finished last, repeatedly take the next task (in that order)
        that finished before the current one launched, with 2 units of slack.
        Its end is clipped to just before that launch. Returns a list of
        ``[task id, start, end]``; empty input gives ``[]``.
        """
        total_row = len(tasks)
        if total_row == 0:
            return []
        task_id = tasks["Task_ID"].to_numpy()
        launch = tasks["Launch_Time"].to_numpy()
        finish = tasks["Finish_Time"].to_numpy()

        launchtime = launch[0]
        criticaltasks = [[task_id[0], launchtime, finish[0]]]
        # Start after the seed task so it can never be matched against itself.
        idx = 1
        while True:
            while idx < total_row and not (finish[idx] - 2 < launchtime):
                idx += 1
            if idx >= total_row:
                break
            cur_finish = finish[idx]
            if cur_finish >= launchtime:
                cur_finish = launchtime - 1
            launchtime = launch[idx]
            criticaltasks.append([task_id[idx], launchtime, cur_finish])
            # Move past the appended task: one shorter than the 2-unit slack would
            # otherwise match again and loop forever.
            idx += 1
        return criticaltasks

    def get_physical_plan(appals, **kwargs):
        """Render SQL physical plans as HTML tables in the notebook.

        For every plan, the operator nodes are listed with their detail text. The
        nodes come from the final adaptive plan (``+- == Final Plan ==``) or from
        a plain ``== Physical Plan ==`` section. Expression ids and comparison /
        IN literals are always stripped from the details.

        With desensitization on, the following are also masked:

        * scanned table names;
        * ``Location`` lines;
        * column names, replaced by a type/operator tag.

        A second table lists the function expressions found in the Project /
        Aggregate / Window nodes.

        Keyword args:
            queryid: render only the plan whose ``real_queryid`` equals this.
            shownops: operator names to display (``None`` shows every node).
            desensitization: mask identifiers (default True).

        Returns:
            The collected plan rows (``[]`` when the log has no SQL plans).
        """
        if appals.df is None:
            appals.load_data()
        if appals.queryplans is None:
            return []
        queryid = kwargs.get('queryid', None)
        shownops = kwargs.get("shownops", ['ArrowRowToColumnarExec', 'ColumnarToRow', 'RowToArrowColumnar', 'ArrowColumnarToRow', 'Filter', 'HashAggregate', 'Project', 'SortAggregate', 'SortMergeJoin', 'window'])
        desensitization = kwargs.get('desensitization', True)

        def get_fields(colss):
            """Split a comma-separated field list, ignoring commas inside parentheses."""
            lvls = 0
            colns = []
            ks = ""
            for c in colss:
                if c == "," and lvls == 0:
                    colns.append(ks)
                    ks = ""
                    continue
                if c == " " and ks == "":
                    continue
                if c == "(":
                    lvls += 1
                if c == ")":
                    lvls -= 1
                ks += c
            if ks != "":
                colns.append(ks)
            return colns

        def get_column_names(s, opname, resultname, prefix, columns, funcs):
            """Record the expressions of an ``opname`` node and the aliases they define.

            Fields on the node's ``resultname`` line are appended to
            ``funcs[opname + str(len(columns))]``. Each ``... AS name`` alias not
            seen before is mapped to ``prefix`` in ``columns``.
            """
            if not re.search(r" " + opname + " ", s[0]):
                return
            for v in s[1].split("\n"):
                if not v.startswith(resultname):
                    continue
                cols = re.search(r"\[([^0-9].+)\]", v)
                if not cols:
                    continue
                colns = get_fields(cols.group(1))
                funcs.setdefault(opname + str(len(columns)), []).extend(colns)
                for c in colns:
                    if " AS " in c:
                        c = re.sub(r"#\d+L*", "", c)
                        colname = re.search(r" AS (.+)", c).group(1)
                        if colname not in columns:
                            columns[colname] = prefix

        def mask_literals(desc):
            """Strip expression ids and blank out literal values in a node description."""
            desc = re.sub(r"#\d+L*", r"", desc)
            desc = re.sub(r"= [^)]+", r"=", desc)
            # `*` so whole IN-lists are masked, not only single-character ones.
            desc = re.sub(r"IN \([^)]*\)", r"IN ()", desc)
            desc = re.sub(r"In\([^)]*\)", r"In()", desc)
            desc = re.sub(r"EqualTo\(([^,]+),[^)]+\)", r"EqualTo(\1,)", desc)
            # Site-specific sensitive keyword(s); add further keyword replacements here.
            desc = re.sub(r"搜索广告", r"xxx", desc)
            return desc

        def parse_nodes(plan):
            """Map node id ``"(N)"`` to ``[tree line, masked detail text]`` for one plan."""
            nodes = {}
            lines = plan.split("\n")
            nlines = len(lines)
            for idx in range(nlines):
                l = lines[idx]
                if l == '+- == Final Plan ==':
                    # Tree lines of the final adaptive plan end with their "(N)" id.
                    while l != '+- == Initial Plan ==' and idx + 1 < nlines:
                        idx += 1
                        l = lines[idx]
                        if not l.endswith(")"):
                            break
                        m = re.search(r"\(\d+\)$", l)
                        if m:
                            nodes[m.group(0)] = [l]
                if l == "== Physical Plan ==":
                    while idx + 1 < nlines and not lines[idx + 1].startswith("("):
                        idx += 1
                        l = lines[idx]
                        if not l.endswith(")"):
                            break
                        m = re.search(r"\(\d+\)$", l)
                        if m:
                            nodes[m.group(0)] = [l]
                if l.startswith("("):
                    m = re.search(r"^\(\d+\)", l)
                    if m and m.group(0) in nodes:
                        desc = ""
                        while l.strip() != "":
                            desc += l + "\n"
                            idx += 1
                            if idx >= nlines:
                                break
                            l = lines[idx]
                        nodes[m.group(0)].append(mask_literals(desc))
            return nodes

        def highlight(text, keys, columns):
            """Replace each column name (literal text, longest first) by its colored tag."""
            for c in keys:
                v = columns[c]
                if v.startswith(("array", "map", "struct")):
                    repl = '<span style="color:red;background-color:yellow">' + html.escape(v) + "</span>"
                else:
                    repl = "<font color=#33cc33>" + html.escape(v) + "</font>"
                # Literal replacement: column names may contain regex metacharacters.
                text = text.replace(c, repl)
            return text

        def node_shown(text):
            return shownops is None or any(" " + k + " " in text for k in shownops)

        if queryid is None:
            plans = appals.queryplans.select('real_queryid', 'physicalPlanDescription').collect()
        else:
            plans = appals.queryplans.where(f"real_queryid='{queryid}'").select("physicalPlanDescription").collect()

        for pr in range(len(plans)):
            plan = plans[pr]['physicalPlanDescription']
            if plan is None:
                continue
            nodes = parse_nodes(plan)
            # Nodes without a detail section get an empty one so s[1] always exists.
            for s in nodes.values():
                if len(s) < 2:
                    s.append("")

            tables = {}
            columns = {}
            functions = {}
            for s in nodes.values():
                p = re.search(r"Scan arrow [^.]*\.([^ ]+)", s[0])
                if p:
                    tn = p.group(1)
                    if tn not in tables:
                        tables[tn] = "table"
                    if desensitization:
                        s[0] = s[0].replace(tn, tables[tn])
                        s[1] = s[1].replace(tn, tables[tn])
                    for v in s[1].split("\n"):
                        if v.startswith("ReadSchema"):
                            cols = re.search(r"<(.*)>", v)
                            if cols:
                                for c in cols.group(1).split(","):
                                    cts = c.split(":")
                                    ct = cts[0]
                                    if ct not in columns:
                                        # "name:type" -> "<type>_", anything else -> "c_".
                                        columns[ct] = cts[1] + "_" if len(cts) == 2 else "c_"
                        if v.startswith("Location") and desensitization:
                            s[1] = s[1].replace(v + "\n", "")

                get_column_names(s, "Project", "Output", "proj_", columns, functions)
                get_column_names(s, "HashAggregate", "Results", "shagg_", columns, functions)
                get_column_names(s, "SortAggregate", "Results", "stagg_", columns, functions)
                get_column_names(s, "ColumnarConditionProject", "Arguments", "cproj_", columns, functions)
                get_column_names(s, "ColumnarHashAggregate", "Results", "cshagg_", columns, functions)
                get_column_names(s, "Window", "Arguments", "window_", columns, functions)

            # Put a name before any earlier name it contains, so longer names are
            # substituted before their substrings.
            keys = []
            for k1 in columns:
                for k in range(len(keys)):
                    if keys[k] in k1:
                        keys.insert(k, k1)
                        break
                else:
                    keys.append(k1)

            for s in nodes.values():
                s[1] = html.escape(s[1])
                if desensitization:
                    s[1] = highlight(s[1], keys, columns)

            htmls = ['''<table style="table-layout:fixed;max-width: 100%;">''']
            qid = plans[pr]['real_queryid'] if queryid is None else queryid
            htmls.append(f"<tr><td colspan=2>{qid}</td></tr>")
            for l in nodes.values():
                if not node_shown(l[0]):
                    continue
                htmls.append("<tr>")
                htmls.append('<td width=33%><div align="left" style="font-family:Courier New;overflow-wrap: anywhere">')
                htmls.append(l[0].replace(" ", "_")
                             .replace("ColumnarToRow", "<font color=blue>ColumnarToRow</font>")
                             .replace("RowToArrowColumnar", "<font color=blue>RowToArrowColumnar</font>")
                             .replace("ArrowColumnarToRow", "<font color=blue>ArrowColumnarToRow</font>")
                             .replace("ArrowRowToColumnar", "<font color=blue>ArrowRowToColumnar</font>")
                             )
                htmls.append("</div></td>")
                htmls.append('<td width=66%><div align="left" style="font-family:Courier New;overflow-wrap: anywhere">')
                lsx = []
                for t in l[1].split("\n"):
                    cols = re.search(r"\[([^0-9].+)\]", t)
                    if cols:
                        colns = get_fields(cols.group(1))
                        t = re.sub(r"\[([^0-9].+)\]", "", t)
                        t += "[" + '<span style="background-color:#ededed;">;</span>'.join(colns) + "]"
                    if ":" in t:
                        lsx.append(re.sub(r'^([^:]+:)', r'<font color=blue>\1</font>', t))
                    else:
                        lsx.append(t)
                htmls.append("<br>".join(lsx))
                htmls.append("</div></td>")
                htmls.append("</tr>")
            htmls.append("</table>")
            display(HTML("\n".join(htmls)))

            for k, v in functions.items():
                functions[k] = [l for l in v if "(" in l]
            # NOTE(review): applied even when desensitization is False -- confirm intended.
            for f in functions.values():
                for idx in range(len(f)):
                    f[idx] = highlight(f[idx], keys, columns)

            funchtml = "<table>"
            for k, v in functions.items():
                # Keys are "<operator><n>"; compare the operator name itself. A
                # substring test for " op " can never match keys without spaces.
                if shownops is not None and k.rstrip("0123456789") not in shownops:
                    continue
                funchtml += "<tr><td width=10%>" + k + '</td><td width=90%><table style="width:100%;table-layout:fixed">'
                for f in v:
                    funchtml += '<tr><td width=100% ><div align="left" style="font-family:Courier New">' + f + "</div></td></tr>"
                funchtml += "</table></td></tr>"
            funchtml += "</table>"
            display(HTML(funchtml))

        return plans
        
    def get_physical_allnodes(appals, **kwargs):
        """Parse the final adaptive plan of each query into per-node descriptions.

        Keyword args:
            queryid: only parse the plan whose ``real_queryid`` equals this
                (default: all plans).

        Returns:
            ``{plan index: {"(N)": [tree line, detail text]}}``. Only plans with
            a ``+- == Final Plan ==`` section yield nodes. Plans with no
            description map to an empty dict, and ``{}`` is returned when the
            log has no SQL plans.
        """
        if appals.df is None:
            appals.load_data()
        if appals.queryplans is None:
            return {}
        queryid = kwargs.get('queryid', None)

        if queryid is None:
            plans = appals.queryplans.select('real_queryid', 'physicalPlanDescription').collect()
        else:
            plans = appals.queryplans.where(f"real_queryid='{queryid}'").select("physicalPlanDescription").collect()

        allnodes = {}
        for pr, row in enumerate(plans):
            nodes = allnodes[pr] = {}
            plan = row['physicalPlanDescription']
            if plan is None:
                continue
            lines = plan.split("\n")
            nlines = len(lines)
            for idx in range(nlines):
                l = lines[idx]
                if l == '+- == Final Plan ==':
                    # Tree lines of the final plan end with their "(N)" node id.
                    while l != '+- == Initial Plan ==' and idx + 1 < nlines:
                        idx += 1
                        l = lines[idx]
                        if not l.endswith(")"):
                            break
                        m = re.search(r"\(\d+\)$", l)
                        if m:
                            nodes[m.group(0)] = [l]
                if l.startswith("("):
                    m = re.search(r"^\(\d+\)", l)
                    if m and m.group(0) in nodes:
                        # The detail section runs until the next empty line (or plan end).
                        desc = ""
                        while l != "":
                            desc += l + "\n"
                            idx += 1
                            if idx >= nlines:
                                break
                            l = lines[idx]
                        nodes[m.group(0)].append(desc)
        return allnodes
        
        
    def get_basic_state(appals, history_server="http://sr525:18080"):
        """Display a summary table of the application's settings and totals.

        Shows a link to the application in the Spark history server, then a
        table with:

        * the main executor/shuffle settings;
        * the real executor count, red when it differs from
          ``executor.instances``;
        * the failed stages, per query;
        * speculative task counts;
        * rounded column totals of ``get_query_time(plot=False)`` (defined
          elsewhere; presumably returns a pandas DataFrame).

        Args:
            history_server: base URL of the Spark history server for the link.
        """
        if appals.df is None:
            appals.load_data()
        app_url = html.escape(history_server + "/history/" + appals.appid, quote=True)
        display(HTML('<a href="' + app_url + '">' + app_url + '</a>'))

        ok_color = "#000000"
        bad_color = "#c0392b"
        spec_color = "#87b00c"
        errorcolor = ok_color if appals.executor_instances == appals.realexecutors else bad_color
        # The failure row is highlighted when there actually are failed stages.
        failcolor = bad_color if appals.failed_stages else ok_color

        qtime = appals.get_query_time(plot=False)
        sums = qtime.sum()
        if len(appals.failed_stages) > 0:
            failed_rows = appals.df.where("`Stage ID` in (" + ",".join(appals.failed_stages) + ")").select("real_queryid", "Stage ID").distinct().collect()
            failure = "<br>".join("query: " + str(l["real_queryid"]) + "|stage: " + str(l["Stage ID"]) for l in failed_rows)
        else:
            failure = ""

        def strong(value, color=None):
            if color is None:
                return f"<strong>{value}</strong>"
            return f'<span style="color:{color}"><strong>{value}</strong></span>'

        rows = [
            ("appid", strong(appals.appid, ok_color)),
            ("executor.instances", strong(appals.executor_instances, ok_color)),
            ("executor.cores", strong(appals.executor_cores, ok_color)),
            ("shuffle.partitions", strong(appals.parallelism, ok_color)),
            ("batch size", strong(appals.batchsize, ok_color)),
            ("real executors", strong(appals.realexecutors, errorcolor)),
            ("Failed Tasks", strong(failure, failcolor)),
            ("Speculative Tasks", strong(appals.speculativetask, spec_color)),
            ("Speculative Killed Tasks", strong(appals.speculativekilledtask, spec_color)),
            ("Speculative Stage", strong(appals.speculativestage, spec_color)),
        ]
        # (label, column of the get_query_time() frame) pairs, shown as totals.
        totals = [("runtime", "runtime"), ("disk spilled", "disk spilled"), ("memspilled", "memspilled"),
                  ("local_read", "local_read"), ("remote_read", "remote_read"), ("shuffle_write", "shuffle_write"),
                  ("task run time", "run_time"), ("ser_time", "ser_time"), ("f_wait_time", "f_wait_time"),
                  ("gc_time", "gc_time"), ("input read", "input read"), ("acc_task_time", "acc_task_time")]
        rows.extend((label, strong(round(sums[col], 2))) for label, col in totals)

        body = "\n".join(
            f'<tr><td style="width:135px">{label}</td><td style="width:351px">{cell}</td></tr>'
            for label, cell in rows)
        display(HTML('<table border="1" cellpadding="1" cellspacing="1" style="width:500px">\n<tbody>\n'
                     + body + '\n</tbody>\n</table>'))
        
    def generate_trace_view_list_exec(self,id=0,**kwargs):
        """Build Chrome-trace events for every task, including a per-phase breakdown.

        Each executor becomes a trace "process" (pid = Executor ID*100 + a per-host
        offset of 100000, 200000, ... assigned in host-name order) and concurrently
        running tasks are packed into thread slots of that process. For every
        finished task this emits one 'X' event spanning the whole task plus one 'X'
        event per non-zero phase (sched delay, deserialize, read, executor, write,
        serialize, result). If the phases add up to more than the task duration they
        are scaled down proportionally; otherwise the remainder is attributed to
        'sched delay'. A 'GC Time' event overlays the executor phase, and with
        ``showcpu=True`` a CPU% counter ('C' events) is emitted for it.

        Args:
            id: unused; kept for interface compatibility with Analysis.
            **kwargs: ``showcpu`` (bool, default False) emits CPU% counters;
                ``shownodes`` (list of host names) restricts the hosts shown.
                All kwargs are also passed to ``Analysis.generate_trace_view_list``.

        Returns:
            list[str]: JSON-encoded trace events. Also updates ``self.starttime``
            and appends executor pids to ``self.pids``.
        """
        # Called for its side effect of loading self.df when needed; the metadata
        # events it returns are not used here.
        Analysis.generate_trace_view_list(self,**kwargs)
        showcpu=kwargs.get('showcpu',False)
        shownodes=kwargs.get("shownodes",None)
        
        showdf=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df
        
        events=showdf.toPandas()
        coretrack={}
        trace_events=[]
        starttime=self.starttime
        exec_hosts={}
        hostsdf=showdf.select("Host").distinct().orderBy("Host")
        ended_event=set()
        
        # Give every host its own pid range: 100000, 200000, ...
        hostid=100000
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                pid=int(l['Executor ID'])*100+hostid
                self.pids.append(pid)
                stime=l['Launch Time']
                # The task's end was already replayed (start and finish share a
                # timestamp); ignore its start.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # Reuse the first free thread slot of this executor, else open a new one.
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    # No slot holds this task: its end came before its start.
                    ended_event.add(tsk)
                    continue
                # If another task started on this executor within 2 ms before this one
                # finished, clip the finish to that start and move that task into the
                # freed slot so it continues on the same thread row.
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                if starttime==0:
                    starttime=l['Launch Time']
                    print(f'applog start time: {starttime}')

                sstime=l['Launch Time']-starttime

                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['Job ID'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })

                des_time=l['Executor Deserialize Time']
                read_time=l['Fetch Wait Time']
                exec_time=l['Executor Run Time']
                # Shuffle Write Time is reported in ns; convert to ms.
                write_time=math.floor(l['Shuffle Write Time']/1000000)
                ser_time=l['Result Serialization Time']
                getrst_time=l['Getting Result Time']
                durtime=fintime-sstime-starttime

                times=[0,des_time,read_time,exec_time,write_time,ser_time,getrst_time]
                time_names=['sched delay','deserialize time','read time','executor time','write time','serialize time','result time']
                evttime=sum(times)
                # evttime>0 guard: the finish clipping above can make durtime negative,
                # and then an all-zero breakdown would divide by zero.
                if evttime>durtime and evttime>0:
                    times=[math.floor(v*1.0*durtime/evttime) for v in times]
                else:
                    times[0]=durtime-evttime

                # Lay the phases out back-to-back from the task's start.
                esstime=sstime
                for k in range(0,len(times)):
                    if times[k]>0:
                        trace_events.append({
                             'tid':pid+int(t),
                             'ts':esstime,
                             'dur':times[k],                
                             'pid':pid,
                             'ph':'X',
                             'name':time_names[k]})
                        if k==3:
                            trace_events.append({
                                 'tid':pid+int(t),
                                 'ts':esstime,
                                 'dur':l['JVM GC Time'],
                                 'pid':pid,
                                 'ph':'X',
                                 'name':'GC Time'})
                            if showcpu:
                                trace_events.append({
                                     'tid':pid+int(t),
                                     'ts':esstime,
                                     'pid':pid,
                                     'ph':'C',
                                     'name':'cpu% {:d}'.format(pid+int(t)),
                                     'args':{'value':l['Executor CPU Time']/1000000.0/times[k]}})
                                trace_events.append({
                                     'tid':pid+int(t),
                                     'ts':esstime+times[k],
                                     'pid':pid,
                                     'ph':'C',
                                     'name':'cpu% {:d}'.format(pid+int(t)),
                                     'args':{'value':0}})
                        esstime=esstime+times[k]
        self.starttime=starttime
        return [json.dumps(l) for l in trace_events]

    def generate_trace_view_list(self,id=0,**kwargs):
        """Build Chrome-trace events for every task of the application.

        Each executor becomes a trace "process" (pid = Executor ID*100 + a per-host
        offset of 100000, 200000, ... assigned in host-name order) and concurrently
        running tasks are packed into thread slots of that process. Every finished
        task yields one 'X' event named ``stg<Stage ID>`` with I/O sizes (MB) in
        its args.

        With ``showcpu=True`` and a non-empty ``self.metricscollect``:
          * each task is broken into its accumulable timing metrics, scaled to fit
            the task's elapsed time;
          * an instant event marks each query's earliest task finish.

        Unless ``show_criticalshow_time_metric_path=False`` is passed, the
        critical-path track from ``generate_critical_patch_traceview`` is appended.
        That track uses a pid just below the next unused host offset, so it sorts
        after all executors.

        Args:
            id: unused; kept for interface compatibility with Analysis.
            **kwargs:
                ``showcpu`` (bool, default False): emit the metric breakdown.
                ``shownodes`` (list of host names): restrict the hosts shown.
                ``show_criticalshow_time_metric_path`` (bool, default True):
                append the critical-path track.
                All kwargs are also passed to ``Analysis.generate_trace_view_list``.

        Returns:
            list[str]: JSON-encoded trace events. Also sets ``self.starttime``,
            ``self.tskmap`` (task id -> {'pid','tid'}) and appends to ``self.pids``.
        """
        # Called for its side effect of loading self.df when needed; the metadata
        # events it returns are not used here.
        Analysis.generate_trace_view_list(self,**kwargs)
        showcpu=kwargs.get('showcpu',False)
        shownodes=kwargs.get("shownodes",None)
        
        showdf=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df
        
        # Replay in event-time order; for equal eventtime, later finish first.
        showdf=showdf.orderBy(["eventtime", "Finish Time"], ascending=[1, 0])
        
        events=showdf.drop("Accumulables").toPandas()
        coretrack={}
        trace_events=[]
        starttime=self.starttime
        exec_hosts={}
        hostsdf=showdf.select("Host").distinct().orderBy("Host")
        ended_event=set()
        
        # Give every host its own pid range: 100000, 200000, ...
        hostid=100000
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000
        # Captured now: `hostid` is reassigned per event below, which previously made
        # the critical-path pid depend on whichever host the last event came from.
        critical_path_pid=hostid-1

        tskmap={}
        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                pid=int(l['Executor ID'])*100+hostid
                self.pids.append(pid)
                stime=l['Launch Time']
                # The task's end was already replayed (start and finish share a
                # timestamp); ignore its start.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # Reuse the first free thread slot of this executor, else open a new one.
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']
                
                tids=coretrack[pid]
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    # No slot holds this task: its end came before its start.
                    ended_event.add(tsk)
                    continue
                # If another task started on this executor within 2 ms before this one
                # finished, clip the finish to that start and move that task into the
                # freed slot so it continues on the same thread row.
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                if starttime==0:
                    starttime=l['Launch Time']
                    print(f'applog start time: {starttime}')

                sstime=l['Launch Time']-starttime

                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['Job ID'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })
                tskmap[tsk]={'pid':pid,'tid':pid+int(t)}

        self.starttime=starttime
        self.tskmap=tskmap
        output=[json.dumps(l) for l in trace_events]
        
        df=self.df
        
        if showcpu and len(self.metricscollect)>0:
            metricscollect=self.metricscollect
            metrics_explode=df.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
            m1092=metrics_explode.select(F.col("Executor ID"),F.col("`Stage ID`"),"`Task ID`",F.col("`Finish Time`"),F.col("`Launch Time`"),(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),"metrics.*").where(F.col("ID").isin([l[0] for l in metricscollect]))
            metric_name_df = spark.createDataFrame(metricscollect)
            metric_name_df=metric_name_df.withColumnRenamed("_1","ID")
            metric_name_df=metric_name_df.withColumnRenamed("_2","unit")
            metric_name_df=metric_name_df.withColumnRenamed("_3","mname")

            met_df=m1092.join(metric_name_df,on="ID")
            # nsTiming metrics are in nanoseconds; convert to milliseconds.
            met_df=met_df.withColumn("Update",F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0))
            met_df=met_df.where("Update>1")

            # Scale a task's metrics down when they add up to more than its elapsed time.
            metdfx=met_df.groupBy("Task ID","elapsedtime").agg(F.sum("Update").alias("totalCnt"))
            taskratio=metdfx.withColumn("ratio",F.when(F.col("totalCnt")<F.col("elapsedtime"),1).otherwise(F.col("elapsedtime")/F.col("totalCnt"))).select("Task ID","ratio")
            met_df=met_df.join(taskratio,on="Task ID")
            met_df=met_df.withColumn("Update",F.col("Update")*F.col("ratio"))

            # Lay metrics out back-to-back inside the task, largest first.
            w = (Window.partitionBy('Task ID').orderBy(F.desc("Update")).rangeBetween(Window.unboundedPreceding, 0))
            met_df=met_df.withColumn('cum_sum', F.sum('Update').over(w))

            met_df=met_df.withColumn("starttime",F.col("Launch Time")+F.col("cum_sum")-F.col("Update"))

            tskmapdf = spark.createDataFrame(pandas.DataFrame(self.tskmap).T.reset_index())
            met_df=met_df.join(tskmapdf,on=[met_df["Task ID"]==tskmapdf["index"]])

            rstdf=met_df.select(
                F.col("tid"),
                F.round(F.col("starttime")-self.starttime,0).alias("ts"),
                F.round(F.col("Update"),0).alias("dur"),
                F.col("pid"),
                F.lit("X").alias("ph"),
                F.col("mname").alias("name")
            ).where(F.col("ts").isNotNull()).orderBy('ts')

            output.extend(rstdf.toJSON().collect())

            # Instant marker at each query's earliest task finish.
            qtime=df.where("Event='SparkListenerTaskEnd'").groupBy("real_queryid").agg(F.min("Finish Time").alias("time"))
            output.extend(qtime.select(
                F.lit("i").alias("ph"),
                (F.col("time")-starttime).alias('ts'),
                F.lit(0).alias("pid"),
                F.lit(0).alias("tid"),
                F.lit("p").alias("s")
            ).toJSON().collect())
        
        # NOTE: the kwarg name looks garbled, but it is kept as-is for existing callers.
        if kwargs.get("show_criticalshow_time_metric_path",True):
            output.extend(self.generate_critical_patch_traceview(critical_path_pid))
        
        return output

    def generate_critical_patch_traceview(self,pid):
        """Build Chrome-trace events for the critical-path track.

        All events go to thread 38 of trace process ``pid``:

        * One 'X' event per critical task, named ``stg<Stage ID>``. It spans the
          launch/finish values stored in ``self.criticaltasks``, with the start
          shifted by +1 and the end unchanged. ``args`` holds the task id,
          executor id and host.
        * One 'X' event per query, named ``qry<real_queryid>``. It spans the
          query's earliest task launch to its latest task finish.
        * One 'X' event per collected timing metric of each critical task
          (excluding 'totaltime_collectbatch'). These are laid out back-to-back,
          largest first, from the task's launch time. They are scaled down when
          the metrics add up to more than the task's elapsed time minus 2.
        * A ``process_name`` metadata event labelling ``pid`` "critical path".

        Args:
            pid: trace process id to use for the critical-path track.

        Returns:
            list[str]: JSON-encoded trace events; timestamps are relative to
            ``self.starttime``.
        """
        if self.df is None:
            self.load_data()
        traces=[]
        # Finished tasks that belong to a query.
        df=self.df.where("Event='SparkListenerTaskEnd' and real_queryid is not null")
        # self.criticaltasks is assumed to be rows of (task id, launch, finish);
        # the column assignment below requires exactly three fields per row.
        criticaltasks=self.criticaltasks
        cripds=pandas.DataFrame(criticaltasks)
        cripds.columns=['task_id',"launch","finish"]
        cridf=spark.createDataFrame(cripds)
        df_ctsk=df.join(cridf,on=[F.col("task_id")==F.col("Task ID")],how="inner")
        # One bar per critical task.
        traces.extend(df_ctsk.select(F.lit(38).alias("tid"),
                      (F.col("launch")-F.lit(self.starttime)+1).alias("ts"),
                      (F.col("finish")-F.col("launch")-1).alias("dur"),
                      F.lit(pid).alias("pid"),
                      F.lit("X").alias("ph"),
                      F.concat(F.lit("stg"),F.col("Stage ID")).alias("name"),
                      F.struct(
                          F.col("Task ID").alias('taskid'),
                          F.col("Executor ID").astype(IntegerType()).alias('exec_id'),
                          F.col("Host").alias("host"),
                          ).alias("args")
                        ).toJSON().collect())
        # One bar per query: earliest launch to latest finish over all its tasks.
        traces.extend(df.groupBy("real_queryid").agg(F.max("Finish Time").alias("finish"),F.min("Launch Time").alias("launch")).select(
                        F.lit(38).alias("tid"),
                      (F.col("launch")-F.lit(self.starttime)).alias("ts"),
                      (F.col("finish")-F.col("launch")).alias("dur"),
                      F.lit(pid).alias("pid"),
                      F.lit("X").alias("ph"),
                      F.concat(F.lit("qry"),F.col("real_queryid")).alias("name")).toJSON().collect())


        # Presumably (ID, unit, name, node) tuples, going by the renames below — TODO confirm.
        metricscollect=self.metricscollect

        # Explode each critical task's accumulables and keep only the collected metric ids.
        metrics_explode=df_ctsk.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
        m1092=metrics_explode.select(F.col("Executor ID"),F.col("`Stage ID`"),"`Task ID`",F.col("`Finish Time`"),F.col("`Launch Time`"),(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),"metrics.*").where(F.col("ID").isin([l[0] for l in metricscollect]))
        metric_name_df = spark.createDataFrame(metricscollect)
        metric_name_df=metric_name_df.withColumnRenamed("_1","ID")
        metric_name_df=metric_name_df.withColumnRenamed("_2","unit")
        metric_name_df=metric_name_df.withColumnRenamed("_3","mname")
        metric_name_df=metric_name_df.withColumnRenamed("_4","node")

        metric_name_df=metric_name_df.where("mname <> 'totaltime_collectbatch'")

        met_df=m1092.join(metric_name_df,on="ID")
        # nsTiming metrics are in nanoseconds; convert to milliseconds.
        met_df=met_df.withColumn("Update",F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0))
        
        # A grouped pandas UDF (time_breakdown) was tried here but hung; the window
        # functions below replace it.
        #tmbk=met_df.groupBy('Task ID').apply(time_breakdown)
        
        # ratio = (elapsed - 2) / sum of the task's metrics, capped at 1, so the
        # scaled metrics fit inside the task bar.
        w=Window.partitionBy('Task ID')
        met_df1=met_df.withColumn("sum_update",F.sum("Update").over(w))
        met_df2=met_df1.withColumn("ratio",(F.col("Finish Time")-F.col("Launch Time")-2)/F.col("sum_update"))
        met_df3=met_df2.withColumn("ratio",F.when(F.col("ratio")>1,1).otherwise(F.col("ratio")))
        met_df4=met_df3.withColumn("update_ratio",F.floor(F.col("ratio")*F.col("Update")))
        # Drop metrics too short to draw.
        met_df5=met_df4.where(F.col("update_ratio")>2)
        # Running sum, largest metric first, gives each metric's end offset; its start
        # is that offset minus its own duration, relative to Launch Time.
        w = (Window.partitionBy('Task ID').orderBy(F.desc("update_ratio")).rowsBetween(Window.unboundedPreceding, Window.currentRow))
        met_df6=met_df5.withColumn('ltime_dur', F.sum('update_ratio').over(w))
        met_df8=met_df6.withColumn("ltime",F.col("ltime_dur")+F.col("Launch Time")-F.col("update_ratio"))

        # Shrink each metric bar by 1 at the start.
        tmbk=met_df8.withColumn("taskid",F.col("Task ID")).withColumn("start",F.col("ltime")+F.lit(1)).withColumn("dur",F.col("update_ratio")-F.lit(1)).withColumn("name",F.col("mname"))
        
        
        traces.extend(tmbk.select(
                        F.lit(38).alias("tid"),
                      (F.col("start")-F.lit(self.starttime)).alias("ts"),
                      (F.col("dur")).alias("dur"),
                      F.lit(pid).alias("pid"),
                      F.lit("X").alias("ph"),
                      F.col("name").alias("name")).toJSON().collect())
        traces.append(json.dumps({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"critical path"}
                      }))
        return traces    
    
    def show_Stage_histogram(apps,stageid,bincount=15,cluster=False):
        """Plot task elapsed-time and input-size distributions for one stage.

        Draws two figures:

        1. Histograms of task elapsed time (s) and input size (MB), each with
           ``bincount`` buckets.
        2. A per-host scatter of input vs elapsed time, plus per-host violin plots
           of both.

        The input size is the "input size in bytes" / "size of files read"
        accumulables plus Bytes Read, Local Bytes Read and Remote Bytes Read.

        Args:
            stageid: Spark stage id to analyse.
            bincount: number of histogram buckets. This argument used to be
                ignored (15 was hard-coded), so 15 is the default.
            cluster: when True, also run a 2-cluster KMeans on (input, elapsedtime).
                It plots the clusters and the per-host task count of cluster 1.
                This was previously unreachable code.
        """
        if apps.df is None:
            apps.load_data()
        
        # Per-task input bytes reported through accumulables.
        inputsize = apps.df.where("`Stage ID`={:d}".format(stageid)).select("Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
                      .select("Stage ID","Executor ID", "Task ID","col.*") \
                      .where("Name='input size in bytes' or Name='size of files read'") \
                      .groupBy("Task ID") \
                      .agg((F.sum("Update")).alias("input read"))

        stage_df=apps.df.where("`Stage ID`={:d} and event='SparkListenerTaskEnd'".format(stageid) )\
                        .join(inputsize,on=["Task ID"],how="left")\
                        .fillna(0) \
                        .select(F.col('Host'), 
                                F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime'),
                                F.round((F.col('`input read`')+F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`'))/1024/1024,2).alias('input'))
        stage_df=stage_df.cache()

        fig, axs = plt.subplots(figsize=(30, 5),nrows=1, ncols=2)
        for ax,col,label in ((axs[0],'elapsedtime','elapsed time'),(axs[1],'input','input data')):
            binSides, binCounts = stage_df.select(col).rdd.flatMap(lambda x: x).histogram(bincount)
            N = len(binCounts)
            ax.bar(numpy.arange(N)+0.5, binCounts, 0.5, color='b')
            ax.set_ylabel('Frequencies')
            ax.set_title('stage{:d} {:s} breakdown'.format(stageid,label))
            ax.set_xticks(numpy.arange(N+1))
            ax.set_xticklabels([builtins.round(l,2) for l in binSides])

        outpds=stage_df.toPandas()

        fig, axs = plt.subplots(nrows=1, ncols=3, sharey=False,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 1, 1]})
        plt.subplots_adjust(wspace=0.01)

        for name, group in outpds.groupby('Host'):
            axs[0].plot(group.input, group.elapsedtime, marker='o', linestyle='', ms=5, label=name)
        axs[0].set_xlabel('input size (MB)')
        axs[0].set_ylabel('elapsed time (s)')
        axs[0].legend()

        # Share the elapsed-time y axis between the scatter and its violin plot.
        # NOTE(review): GrouperView.join is deprecated in newer matplotlib; Axes.sharey is the replacement.
        axs[0].get_shared_y_axes().join(axs[0], axs[1])

        sns.violinplot(y='elapsedtime', x='Host', data=outpds,palette=['g'],ax=axs[1])
        sns.violinplot(y='input', x='Host', data=outpds,palette=['g'],ax=axs[2])

        if cluster:
            vecAssembler = VectorAssembler(inputCols=["input",'elapsedtime'], outputCol="features").setHandleInvalid("skip")
            new_df = vecAssembler.transform(stage_df)
            kmeans = KMeans(k=2, seed=1)  # 2 clusters here
            model = kmeans.fit(new_df.select('features'))
            transformed = model.transform(new_df)

            outpds=transformed.select('Host','elapsedtime','input','prediction').toPandas()

            fig, axs = plt.subplots(nrows=1, ncols=2, sharey=False,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 1]})
            plt.subplots_adjust(wspace=0.01)

            for name, group in outpds.groupby('prediction'):
                axs[0].plot(group.input, group.elapsedtime, marker='o', linestyle='', ms=5, label=name)
            axs[0].legend()

            bars=transformed.where('prediction=1').groupBy("Host").count().toPandas()

            axs[1].bar(bars['Host'], bars['count'], 0.4, color='coral')
            axs[1].set_title('cluster=1')

        plt.show()
        
    def show_Stages_hist(apps,**kwargs):
        """Plot each stage's cumulative share of task time, then histogram the hottest.

        Stages are ordered by summed task time (descending) and their cumulative
        share of the total is plotted. ``show_Stage_histogram`` is then called for
        every stage up to and including the first whose cumulative share is not
        <= ``threshold``.

        Keyword Args:
            bincount: histogram buckets forwarded to show_Stage_histogram (default 15).
            threshold: cumulative-share cut-off (default 0.9).
            queryid: int or list of real_queryid values to restrict to (default: all).
        """
        if apps.df is None:
            apps.load_data()

        bincount=kwargs.get("bincount",15)
        threshold=kwargs.get("threshold",0.9)

        query=kwargs.get("queryid",None)
        if query and type(query)==int:
            query = [query,]
        scope=apps.df.where(F.col("real_queryid").isin(query)) if query else apps.df

        task_ends=scope.where("event='SparkListenerTaskEnd'" )
        summed_duration=F.sum(F.col('Finish Time')-F.col('Launch Time')).alias('total_time')
        grand_total=task_ends.agg(summed_duration).collect()[0]['total_time']
        per_stage=task_ends.groupBy('`Stage ID`').agg(summed_duration).orderBy('total_time', ascending=False).toPandas()
        per_stage['acc_total'] = per_stage['total_time'].cumsum()/grand_total
        per_stage=per_stage.reset_index()

        fig, ax = plt.subplots(figsize=(30, 5))
        ax.plot(per_stage['index'],per_stage['acc_total'],'b.-')
        ax.set_xticks(per_stage['index'])
        ax.set_xticklabels(per_stage['Stage ID'])
        ax.set_xlabel('stage')
        ax.grid(which='major', axis='x')
        plt.show()

        # Take stages in order, stopping after the first one past the threshold.
        hot_stages=[]
        for share,stage in zip(per_stage['acc_total'],per_stage['Stage ID']):
            hot_stages.append(stage)
            if not share<=threshold:
                break
        for stage in hot_stages:
            apps.show_Stage_histogram(stage,bincount)
            
    def get_hottest_stages(apps,**kwargs):
        """Rank stages by estimated wall-clock time and optionally display them.

        Stages are grouped by (Stage ID, Job ID, real_queryid). For each group the
        summed task time in seconds is divided by
        min(task count, executor_instances*executor_cores/taskcpus).

        Keyword Args:
            threshold: stages whose cumulative share is <= threshold are bar-plotted
                (default 0.9).
            plot: draw the bar chart and display a table coloured by queryid
                (default True).
            queryid: int or list of real_queryid values; by default all tasks with a
                non-null queryid are used.

        Returns:
            pandas.DataFrame sorted by total_time descending. Columns:
            index, Stage ID, Job ID, real_queryid, queryid, total_time,
            stdev_time (s), acc_total (cumulative share) and total (share).
        """
        # Bind matplotlib locally; the file's imports only bind pyplot/lines aliases.
        import matplotlib
        if apps.df is None:
            apps.load_data()
        
        threshold=kwargs.get("threshold",0.9)
        plot=kwargs.get("plot",True)
        
        query=kwargs.get("queryid",None)
        if query and type(query)==int:
            query = [query,]
        df=apps.df.where(F.col("real_queryid").isin(query)) if query else apps.df.where("queryid is not NULL")

        # Number of tasks that can run concurrently in the cluster.
        slots=apps.executor_instances*apps.executor_cores/apps.taskcpus
        stage_time=df.where("event='SparkListenerTaskEnd'" ).groupBy('`Stage ID`','Job ID','real_queryid').agg(
            F.sum(F.col('Finish Time')-F.col('Launch Time')).alias('total_time'),
            F.stddev(F.col('Finish Time')/1000-F.col('Launch Time')/1000).alias('stdev_time'),
            F.count("*").alias("cnt"),
            F.first('queryid').astype(IntegerType()).alias('queryid')
            )\
            .select('`Stage ID`','Job ID','real_queryid','queryid',
                    (F.col("total_time")/1000/(F.when(F.col("cnt")>F.lit(slots),F.lit(slots)).otherwise(F.col("cnt")))).alias("total_time"),
                    F.col("stdev_time")
                   ).orderBy('total_time', ascending=False).toPandas()

        totaltime=stage_time['total_time'].sum()
        stage_time['acc_total'] = stage_time['total_time'].cumsum()/totaltime
        stage_time['total'] = stage_time['total_time']/totaltime
        stage_time=stage_time.reset_index()

        # .copy(): adding a column to a filtered view raises SettingWithCopyWarning.
        shownstage=stage_time.loc[stage_time['acc_total'] <=threshold].copy()
        shownstage['stg']=shownstage['real_queryid'].astype(str)+'_'+shownstage['Job ID'].astype(str)+'_'+shownstage['Stage ID'].astype(str)
        if plot:
            shownstage.plot.bar(x="stg",y="total",figsize=(30,8))

        norm = matplotlib.colors.Normalize(vmin=0, vmax=max(stage_time.queryid))
        cmap = matplotlib.cm.get_cmap('brg')
        def setbkcolor(row):
            # One identical style string per cell, coloured by the row's queryid.
            rgba=cmap(norm(row['queryid']))
            return ['background-color:rgba({:d},{:d},{:d},1); color:white'.format(int(rgba[0]*255),int(rgba[1]*255),int(rgba[2]*255))]*len(row)

        if plot:
            display(stage_time.style.apply(setbkcolor,axis=1).format({"total_time":lambda x: '{:,.2f}'.format(x),"acc_total":lambda x: '{:,.2%}'.format(x),"total":lambda x: '{:,.2%}'.format(x)}))
        
        return stage_time

    def scatter_elapsetime_input(apps,stageid):
        """Scatter-plot task input (MB) against elapsed time (s) for one stage.

        Input here is Bytes Read + Local Bytes Read + Remote Bytes Read;
        accumulable input metrics are not included.
        """
        if apps.df is None:
            apps.load_data()
        elapsed_s=F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime')
        input_mb=F.round((F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`'))/1024/1024,2).alias('input')
        predicate="`Stage ID`={:d} and event='SparkListenerTaskEnd'".format(stageid)
        tasks=apps.df.where(predicate).select(elapsed_s,input_mb).toPandas()
        tasks.plot.scatter('input','elapsedtime',figsize=(30, 5))

    def get_critical_path_stages(self):
        """Return critical-path tasks that ran longer than 10 s, longest first.

        Joins ``self.criticaltasks`` with the TaskEnd events. Its rows are assumed
        to be (task id, launch, finish); the column assignment requires three fields.

        Returns:
            pandas.DataFrame with real_queryid, elapsed (s), Host, Executor ID,
            Stage ID, Task ID, file read and shuffle read. The read columns are in
            MB of 10**6 bytes, rounded.
        """
        # Same lazy-load guard as the other analysis methods; previously missing here.
        if self.df is None:
            self.load_data()
        df=self.df.where("Event='SparkListenerTaskEnd'")
        cripds=pandas.DataFrame(self.criticaltasks)
        cripds.columns=['task_id',"launch","finish"]
        cridf=spark.createDataFrame(cripds)
        df_ctsk=df.join(cridf,on=[F.col("task_id")==F.col("Task ID")],how="inner")
        df_ctsk=df_ctsk.withColumn("elapsed",(F.col("Finish Time")-F.col("Launch Time"))/1000)
        return df_ctsk.where("elapsed>10").orderBy(F.desc("elapsed")).select("real_queryid",F.round("elapsed",2).alias("elapsed"),"Host","executor ID","Stage ID","Task ID",F.round(F.col("Bytes Read")/1000000,0).alias("file read"),F.round((F.col("Local Bytes Read")+F.col("Remote Bytes Read"))/1000000,0).alias("shuffle read")).toPandas()
        
    def show_time_metric(self,**kwargs):
        """Break down accumulated task time into timing metrics per executor.

        For the selected finished tasks:

        * Each collected accumulable metric is summed per executor, using
          ``self.metricscollect``. That is presumably (ID, unit, name, node)
          tuples, going by the renames below. 'nsTiming' values are converted
          ns -> ms.
        * Two extra rows are added: "idle" (query wall time * cores - task time)
          and "not_counted" (task time not covered by any metric). Both are
          clamped at 0.
        * Everything is converted to seconds per executor core.

        Optionally a per-executor stacked bar and an overall stacked bar are plotted.

        Keyword Args:
            shownodes: restrict to these hosts.
            queryid: int or list of real_queryid values; by default all tasks with
                a non-null queryid are used.
            taskids: restrict to these task ids. Cores and executors are then
                treated as 1 and the per-executor chart is suppressed.
            showexecutor: draw the per-executor chart (default True).
            plot: draw any charts (default True).

        Returns:
            pandas.DataFrame with a single row (index 0). Each column is named
            "<pct>%_<metric>" and holds seconds per core averaged over executors.
            "idle" is recomputed from the summed task time.
        """
        import matplotlib
        import matplotlib.font_manager as font_manager
        if self.df is None:
            self.load_data()
        shownodes=kwargs.get("shownodes",None)
        query=kwargs.get("queryid",None)
        plot=kwargs.get("plot",True)
        taskids=kwargs.get("taskids",None)
        
        if query and type(query)==int:
            query = [query,]
        
        showexecutor=kwargs.get("showexecutor",True) if not taskids else False
        queryid = query[0] if query else 0
        
        df=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df
        df=df.where(F.col("real_queryid").isin(query)) if query else df.where("queryid is not NULL")

        df=df.where(F.col("Task ID").isin(taskids)) if taskids else df

        # A hand-picked task set is normalised per single core.
        exec_cores=1 if taskids else self.executor_cores
        execs=1 if taskids else self.executor_instances

        metricscollect=self.metricscollect

        metrics_explode=df.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
        m1092=metrics_explode.select(F.col("Executor ID"),F.col("`Stage ID`"),"`Task ID`",F.col("`Finish Time`"),F.col("`Launch Time`"),(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),"metrics.*").where(F.col("ID").isin([l[0] for l in metricscollect]))
        metric_name_df = spark.createDataFrame(metricscollect)
        metric_name_df=metric_name_df.withColumnRenamed("_1","ID")
        metric_name_df=metric_name_df.withColumnRenamed("_2","unit")
        metric_name_df=metric_name_df.withColumnRenamed("_3","mname")
        metric_name_df=metric_name_df.withColumnRenamed("_4","node")

        runtime=metrics_explode.agg(F.round(F.max("Finish Time")/1000-F.min("Launch Time")/1000,2).alias("runtime")).collect()[0]["runtime"]

        met_df=m1092.join(metric_name_df,on="ID")
        met_df=met_df.withColumn("Update",F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0))
        outpdf=met_df.groupBy("`Executor ID`","mname").sum("Update").orderBy("Executor ID").toPandas()

        met_time_cnt=df.where("Event='SparkListenerTaskEnd'")
        exectime=met_time_cnt.groupBy("Executor ID").agg((F.max("Finish Time")-F.min("Launch Time")).alias("totaltime"),F.sum(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("tasktime"))

        # Sum of per-query wall times (first launch to last finish).
        totaltime_query=met_time_cnt.groupBy("real_queryid").agg((F.max("Finish Time")-F.min("Launch Time")).alias("totaltime")).agg(F.sum("totaltime").alias("totaltime")).collect()
        totaltime_query=totaltime_query[0]["totaltime"]
        
        pdf=exectime.toPandas()
        exeids=set(outpdf['Executor ID'])
        outpdfs=[outpdf[outpdf["Executor ID"]==l] for l in exeids]
        tasktime=pdf.set_index("Executor ID").to_dict()['tasktime']

        def comb(l,r):
            """Add executor ``r``'s metric totals plus idle/not_counted as a column of ``l``."""
            execid=list(r['Executor ID'])[0]
            col="val_"+execid
            lp=r[['mname','sum(Update)']]
            lp.columns=["mname",col]
            idle=totaltime_query*exec_cores-tasktime[execid]
            nocount=tasktime[execid]-sum(lp[col])
            if idle<0:
                idle=0
            if nocount<0:
                nocount=0
            # pandas.concat instead of DataFrame.append (removed in pandas 2.0).
            extra=pandas.DataFrame([{"mname":"idle",col:idle},{"mname":"not_counted",col:nocount}])
            lp=pandas.concat([lp,extra])
            if l is not None:
                return pandas.merge(lp, l,on=["mname"],how='outer')
            else:
                return lp

        rstpdf=None
        for l in outpdfs:
            rstpdf=comb(rstpdf,l)
            
        # ms -> s per core.
        for l in [l for l in rstpdf.columns if l!="mname"]:
            rstpdf[l]=rstpdf[l]/1000/exec_cores
    
        rstpdf=rstpdf.sort_values(by="val_"+list(exeids)[0],axis=0,ascending=False)
        if showexecutor and plot:
            rstpdf.set_index("mname").T.plot.bar(stacked=True,figsize=(30,8))
        pdf_sum=pandas.DataFrame(rstpdf.set_index("mname").T.sum())
        totaltime=totaltime_query/1000
        pdf_sum[0]=pdf_sum[0]/(execs)
        # .loc rather than chained `pdf_sum[0]["idle"]=`, which is lost under copy-on-write.
        pdf_sum.loc["idle",0]=(totaltime_query-sum(tasktime.values())/execs/exec_cores)/1000
        pdf_sum=pdf_sum.sort_values(by=0,axis=0,ascending=False)
        pdf_sum=pdf_sum.T
        pdf_sum.columns=["{:>2.0f}%_{:s}".format(pdf_sum[l][0]/totaltime*100,l) for l in pdf_sum.columns]
        matplotlib.rcParams['font.sans-serif'] = "monospace"
        matplotlib.rcParams['font.family'] = "monospace"
        if plot:
            ax=pdf_sum.plot.bar(stacked=True,figsize=(30,8))
            font = font_manager.FontProperties(family='monospace',
                                               style='normal', size=14)
            ax.legend(prop=font,loc=4)
            plt.title("{:s} q{:d} executors={:d} cores_per_executor={:d} parallelism={:d} sumtime={:.0f} runtime={:.0f}".format(self.file.split("/")[2],queryid,self.executor_instances,self.executor_cores,self.parallelism,totaltime,runtime),fontdict={'fontsize':24})
        return pdf_sum

    def show_critical_path_time_breakdown(self,**kwargs):
        """Run ``show_time_metric`` restricted to the critical-path tasks.

        Each row of ``self.criticaltasks`` is expected to start with a numpy
        scalar task id; ``.item()`` turns it into a plain Python int for Spark.

        Keyword arguments (e.g. ``plot``, ``shownodes``) are forwarded to
        ``show_time_metric``. They were previously ignored silently. ``taskids``
        is always overridden with the critical-path task ids.
        """
        if self.df is None:
            self.load_data()
        options=dict(kwargs)
        options["taskids"]=[l[0].item() for l in self.criticaltasks]
        return self.show_time_metric(**options)
    
    def get_spark_config(self):
        """Read the event log at ``self.file`` and return its Spark properties.

        Side effects:
            * sets ``self.appid`` from the first row that carries an "App ID";
            * sets pandas display options to show all rows and columns with a
              max column width of 100000.

        Returns:
            pandas.DataFrame indexed by property name, with a single column 0. It is
            built from the first row whose ``Properties`` contain ``spark.app.id``.
        """
        log=spark.read.json(self.file)
        app_row=log.where("`App ID` is not null").collect()[0]
        self.appid=app_row["App ID"]
        for option,value in (('display.max_rows', None),
                             ('display.max_columns', None),
                             ('display.max_colwidth', 100000)):
            pandas.set_option(option,value)
        props=log.select("Properties.*").where("`spark.app.id` is not null").limit(1)
        return props.toPandas().T
    
    def get_app_name(self):
        """Display the application's ``spark.app.name`` as a large red HTML banner.

        Raises KeyError if the configuration has no ``spark.app.name`` entry.
        """
        from html import escape
        cfg=self.get_spark_config()
        # cfg is indexed by property name with a single column labelled 0; look the
        # value up by label/position explicitly instead of relying on pandas'
        # deprecated integer-key fallback (``series[0]`` on a string index).
        app_name=cfg.loc['spark.app.name'].iloc[0]
        # The name is user-supplied text: escape it so '<', '&' etc. render literally.
        display(HTML("<font size=5 color=red>" + escape(str(app_name))+"</font>"))
        
        
    def get_query_time(self,**kwargs):
        """Build a per-query summary table: runtime, input, spill, shuffle and task times.

        Keyword args:
            queryid: one ``real_queryid`` (int) or a list of them. Without it, every
                event whose ``queryid`` is not NULL is summarised.
            showtable: if False, return the pandas frame immediately (default True).
            plot: if True (default) and showtable is True, display the frame as a
                styled HTML table (bars for task/run/fetch-wait time, gradients for
                input read and shuffle write).

        Returns:
            pandas DataFrame indexed by ``real_queryid``. Byte columns are GiB,
            ``output rows`` is in billions, and most time columns are summed
            seconds divided by ``self.parallelism``; ``acc_task_time`` is summed
            task seconds divided by executors*cores/task.cpus.
        """
        if self.df is None:
            self.load_data()
        queryid=kwargs.get("queryid",None)
        showtable=kwargs.get("showtable",True)
        plot=kwargs.get("plot",True)

        if queryid and type(queryid)==int:
            queryid=[queryid,]

        if queryid:
            events=self.df.where(F.col("real_queryid").isin(queryid))
        else:
            events=self.df.where("queryid is not NULL")

        def gib(colname, alias):
            # byte counter summed over all rows, in GiB rounded to 2 decimals
            return F.round(F.sum(colname)/1024/1024/1024,2).alias(alias)

        def per_slot_seconds(colname, alias):
            # millisecond counter summed over all rows, spread over every task slot
            return F.round(F.sum(colname)/1000/self.parallelism,2).alias(alias)

        stage_lists=(events.select("real_queryid","Stage ID").distinct()
                           .orderBy("Stage ID")
                           .groupBy("real_queryid")
                           .agg(F.collect_list("Stage ID").alias("stages"))
                           .orderBy("real_queryid"))
        task_time=(events.where("Event='SparkListenerTaskEnd'")
                         .groupBy("real_queryid")
                         .agg(F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,2).alias("acc_task_time")))

        # one row per (task event, accumulator update)
        accumulables=(events.select("real_queryid","Stage ID","Executor ID","Task ID",F.explode("Accumulables"))
                            .select("real_queryid","Stage ID","Executor ID","Task ID","col.*"))
        input_read=(accumulables.where("Name='input size in bytes' or Name='size of files read'")
                                .groupBy("real_queryid")
                                .agg(gib("Update","input read"))
                                .orderBy("real_queryid"))
        if self.dfacc is not None:
            # fall back to the separately collected accumulators when the task
            # events carry no input-size metric for a query
            input_read_v1=(self.dfacc.where("Name='size of files read'")
                                     .groupBy("real_queryid")
                                     .agg(gib("Update","input read v1"))
                                     .orderBy("real_queryid"))
            input_read=(input_read.join(input_read_v1,on="real_queryid",how="outer")
                                  .withColumn("input read",F.coalesce(F.col("input read"),F.col("input read v1")))
                                  .drop("input read v1"))
        output_rows=(accumulables.where("Name='number of output rows'")
                                 .groupBy("real_queryid")
                                 .agg(F.round(F.sum("Update")/1000000000,2).alias("output rows")))

        per_query=(input_read
                   .join(task_time.join(stage_lists,on="real_queryid",how="left"),on="real_queryid",how="left")
                   .join(output_rows,on='real_queryid',how="left"))

        summary=(events.groupBy("real_queryid").agg(
                     F.round(F.max("job_stop_time")/1000-F.min("job_start_time")/1000,2).alias("runtime"),
                     gib("Disk Bytes Spilled","disk spilled"),
                     gib("Memory Bytes Spilled","memspilled"),
                     gib("Local Bytes Read","local_read"),
                     gib("Remote Bytes Read","remote_read"),
                     gib("Shuffle Bytes Written","shuffle_write"),
                     per_slot_seconds("Executor Deserialize Time","deser_time"),
                     per_slot_seconds("Executor Run Time","run_time"),
                     per_slot_seconds("Result Serialization Time","ser_time"),
                     per_slot_seconds("Fetch Wait Time","f_wait_time"),
                     per_slot_seconds("JVM GC Time","gc_time"),
                     F.round(F.max("Peak Execution Memory")/1000000000*self.executor_instances*self.executor_cores,2).alias("peak_mem"),
                     F.max("queryid").alias("queryid"))
                 .join(per_query,"real_queryid",how="left")
                 .orderBy("real_queryid")
                 .toPandas()
                 .set_index("real_queryid"))
        for colname, value in (("executors",self.executor_instances),
                               ("core/exec",self.executor_cores),
                               ("task.cpus",self.taskcpus),
                               ('parallelism',self.parallelism)):
            summary[colname]=value

        if not showtable:
            return summary

        bar_templates={
            'acc_task_time': 'background-image: linear-gradient(to right,#5fba7d {:f}%,white {:f}%)',
            'run_time': 'background-image: linear-gradient(to right,#5fba7d {:f}%,white {:f}%)',
            'f_wait_time': 'background-image: linear-gradient(to right,#d65f5f {:f}%,white {:f}%)',
        }

        def bar_styles(frame):
            # CSS per cell: a horizontal bar proportional to the column's share of runtime
            styles=pandas.DataFrame('', index=frame.index, columns=frame.columns)
            for colname, template in bar_templates.items():
                pct=frame[colname]/frame['runtime']*100
                styles[colname]=pct.apply(lambda p, t=template: t.format(p,p))
            return styles

        palette=sns.light_palette("green", as_cmap=True)
        if plot:
            display(summary.style.apply(bar_styles, axis=None).background_gradient(cmap=palette,subset=['input read', 'shuffle_write']))

        return summary
    
    def get_query_time_metric(self):
        """Call ``show_time_metric`` once per SQL query, without the per-executor view.

        Iterates the distinct ``real_queryid`` values of events with a non-NULL
        ``queryid`` (NULL ids skipped) in ascending order. The ``queryid`` keyword
        is used because that is how the other callers in this class pass a query
        to ``show_time_metric``; across this class that keyword names a
        ``real_queryid`` value. The previous ``query=`` keyword did not match.
        """
        if self.df is None:
            self.load_data()
        rows=self.df.where("queryid is not NULL").select("real_queryid").distinct().collect()
        for qid in sorted(r["real_queryid"] for r in rows if r["real_queryid"] is not None):
            self.show_time_metric(queryid=qid,showexecutor=False)
            
    def getOperatorCount(self):
        """Count how often each physical-plan operator occurs in each query plan.

        Returns:
            pandas DataFrame with one row per operator ``nodeName`` (sorted) and
            one column per ``real_queryid``; each cell is the number of plan nodes
            with that name in that query (0 when absent).
        """
        if self.df is None:
            self.load_data()
        queryplans=self.queryplans.collect()

        # Column order: query ids seen in the task events (sorted), then any ids
        # that only appear in a plan. Every id is pre-seeded so counting never
        # hits a missing key. (The event log used to be re-read here for nothing.)
        list_queryid=[]
        for r in self.df.select("real_queryid").distinct().orderBy("real_queryid").collect():
            if r.real_queryid not in list_queryid:
                list_queryid.append(r.real_queryid)
        for plan in queryplans:
            if plan['real_queryid'] not in list_queryid:
                list_queryid.append(plan['real_queryid'])

        qps={}
        def count_nodes(execid,node):
            # depth-first walk of the plan tree, one increment per node
            name=node["nodeName"]
            if name not in qps:
                qps[name]={l:0 for l in list_queryid}
            qps[name][execid]=qps[name][execid]+1
            for child in node["children"] or []:
                count_nodes(execid,child)

        for plan in queryplans:
            count_nodes(plan['real_queryid'],plan)

        return pandas.DataFrame(qps).T.sort_index(axis=0)
    
    def get_query_plan(self,**kwargs):
        """Render the physical plan(s) as an HTML table annotated with task metrics.

        Keyword args:
            queryid: a real_queryid (int or str) or a list of them.
            stageid: a stage id (int) or a list/tuple/set of them. Only operators
                attributed to these stages are printed. The query is taken from the
                first matching event.
            outputstage: optional list; one dict per printed operator is appended.
            show_plan_only: if True, print the compact 5-column table.
            show_simple_string: if True, append each node's HTML-escaped
                ``simpleString`` under its name.
            plot: if True (default), display the HTML in the notebook.

        With neither ``queryid`` nor ``stageid``, every query plan and every stage
        is shown. Stage times then come from all events with a non-NULL queryid.

        Each operator is attributed to the stage that reported its first metric.
        Operators without a recorded metric inherit their parent's stage. The
        longest stages are highlighted (red->green by relative time), taken in
        descending order until they cover more than 90% of the summed stage time.

        Returns:
            The HTML string, or "" (after displaying an error) when the selected
            tasks have zero total elapsed time.
        """
        from html import escape

        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)
        stageid=kwargs.get("stageid",None)

        outputstage=kwargs.get("outputstage",None)

        show_plan_only=kwargs.get("show_plan_only",False)
        show_simple_string=kwargs.get("show_simple_string",False)

        plot=kwargs.get("plot",True)

        # matplotlib.cm.get_cmap was deprecated in 3.7 and removed in 3.9: use the
        # colormap registry when available, fall back for older releases.
        if hasattr(matplotlib,"colormaps"):
            palette=matplotlib.colormaps['tab20']
        else:
            palette=matplotlib.cm.get_cmap('tab20')
        colors=["#{:02x}{:02x}{:02x}".format(int(l[0]*255),int(l[1]*255),int(l[2]*255)) for l in palette.colors]

        # None means "no stage filter": every operator row is printed.
        shown_stageid=None
        if queryid is not None:
            if type(queryid)==int or type(queryid)==str:
                queryid = [queryid,]
            shown_stageid = [l["Stage ID"] for l in self.df.where(F.col("real_queryid").isin(queryid)).select("Stage ID").distinct().collect()]
        if stageid is not None:
            if type(stageid)==int:
                shown_stageid = [stageid,]
            elif isinstance(stageid,(list,tuple,set)):
                shown_stageid = list(stageid)
            queryid = [l["real_queryid"] for l in self.df.where(F.col("`Stage ID`").isin(shown_stageid)).select("real_queryid").limit(1).collect()]

        if queryid:
            plans_df=self.queryplans.where(F.col("real_queryid").isin(queryid))
            scoped=self.df.where(F.col("real_queryid").isin(queryid))
        else:
            # same "all SQL queries" scope as get_query_time
            plans_df=self.queryplans
            scoped=self.df.where("queryid is not NULL")
        queryplans=plans_df.orderBy("real_queryid").collect()

        # accumulator id -> (stage id, summed update, stddev of updates)
        dfmetric=self.df.where("Event='SparkListenerTaskEnd'").select("queryid","real_queryid","Stage ID","Job ID",F.explode("Accumulables").alias("metric")).select("*","metric.*").select("Stage ID","ID","Update").groupBy("ID","Stage ID").agg(F.round(F.sum("Update"),1).alias("value"),F.round(F.stddev("Update"),1).alias("stdev")).collect()
        accid2stageid={l.ID:(l["Stage ID"],l["value"],l["stdev"]) for l in dfmetric}

        # per stage: summed task seconds spread over all task slots, task-time stddev, task count
        stagetime=scoped.where(F.col("Event")=='SparkListenerTaskEnd').groupBy("Stage ID").agg(
            F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,1).alias("elapsed time"),
            F.round(F.stddev(F.col("Finish Time")-F.col("Launch Time"))/1000,1).alias("time stdev"),
            F.count(F.col("Task ID")).alias("partitions")
            ).orderBy(F.desc("elapsed time")).collect()

        apptotaltime=reduce(lambda x,y: x+y['elapsed time'], stagetime,0)
        if apptotaltime==0:
            display(HTML("<font size=4 color=red>Error, totaltime is 0 </font>"))
            return ""

        stagemap={l["Stage ID"]:l["elapsed time"] for l in stagetime}
        stage_time_stdev_map={l["Stage ID"]:l["time stdev"] for l in stagetime}
        stagepartmap={l["Stage ID"]:l["partitions"] for l in stagetime}

        # longest stages first, until they cover >90% of the total
        keystage=[]
        keystagetime=[]
        subtotal=0
        for s in stagetime:
            subtotal=subtotal+s['elapsed time']
            keystage.append(s['Stage ID'])
            keystagetime.append(s['elapsed time'])
            if subtotal/apptotaltime>0.9:
                break
        # "RRGG" hex: more red / less green the closer a stage is to the longest one
        keystagetime=["{:02x}{:02x}".format(int(255*l/keystagetime[0]),255-int(255*l/keystagetime[0])) for l in keystagetime if keystagetime[0]>0]
        keystagemap=dict(zip(keystage,keystagetime))
        outstr=[]
        def print_plan(real_queryid,level,node,parent_stageid):
            metrics=node["metrics"]
            if metrics is not None and len(metrics)>0 and metrics[0]["accumulatorId"] in accid2stageid:
                stageid=accid2stageid[int(metrics[0]["accumulatorId"])][0]
            else:
                stageid=parent_stageid

            if shown_stageid is None or stageid in shown_stageid:
                fontcolor=f"color:#{keystagemap[stageid]}00;font-weight:bold" if stageid in keystagemap else "color:#000000"
                stage_elapsed=stagemap.get(stageid,0)
                stageParts=stagepartmap.get(stageid,0)
                stage_time_stdev=stage_time_stdev_map.get(stageid,0)

                input_rowcntstr=""
                output_rowcntstr=""
                timestr=""
                timename=""
                input_columnarbatch=""
                output_columnarbatch=""
                output_row_batch=""
                other_metric_name_str=""
                other_metric_str=""

                outputrows=0
                outputbatches=0
                if metrics is not None:
                    for m in metrics:

                        if m["accumulatorId"] not in accid2stageid:
                            continue

                        value=accid2stageid[m["accumulatorId"]][1]
                        stdev_value=accid2stageid[m["accumulatorId"]][2]
                        stdev_value=0 if stdev_value is None else stdev_value
                        if m["metricType"] in ['nsTiming','timing']:
                            # 'timing' is milliseconds, 'nsTiming' nanoseconds -> seconds
                            totaltime=value/1000 if  m["metricType"] == 'timing' else value/1000000000
                            stdev_value=stdev_value/1000 if  m["metricType"] == 'timing' else stdev_value/1000000000
                            timename=timename+m["name"]+"<br>"
                            timeratio= 0  if stage_elapsed==0 else totaltime/self.executor_instances/self.executor_cores*self.taskcpus/stage_elapsed*100
                            timeratio_query = totaltime/self.executor_instances/self.executor_cores*self.taskcpus/apptotaltime*100
                            if timeratio > 10 or timeratio_query>10:
                                timestr=timestr+"<font style='background-color:#ffff42'>{:.2f}s ({:.1f}%, {:.1f}%, {:.2f})</font><br>".format(totaltime,timeratio, timeratio_query,stdev_value)
                            else:
                                timestr=timestr+"{:.2f}s ({:.1f}%, {:.1f}%, {:.2f})<br>".format(totaltime,timeratio, timeratio_query,stdev_value)
                        elif m["name"]=="number of output rows":
                            output_rowcntstr="{:,.1f}".format(value/1000/1000)+" M"
                            outputrows=value
                        elif m["name"] in ["number of output columnar batches","number of output batches","output_batches"]:
                            output_columnarbatch="{:,d}".format(int(value))
                            outputbatches=value
                        elif m["name"]=="number of input rows":
                            input_rowcntstr="{:,.1f}".format(value/1000/1000)+" M"
                        elif m["name"] in ["number of input batches","number of Input batches","input_batches"]:
                            input_columnarbatch="{:,d}".format(int(value))
                        else:
                            other_metric_name_str=other_metric_name_str+m["name"]+"<br>"
                            if value>1000000000:
                                other_metric_str=other_metric_str+"{:,.1f} G (stdev: {:,.1f})<br>".format(value/1000000000,stdev_value/1000000000)
                            elif value>1000000:
                                other_metric_str=other_metric_str+"{:,.1f} M (stdev: {:,.1f})<br>".format(value/1000000,stdev_value/1000000)
                            elif value>1000:
                                other_metric_str=other_metric_str+"{:,.1f} K (stdev: {:,.1f})<br>".format(value/1000,stdev_value/1000)
                            else:
                                other_metric_str=other_metric_str+"{:,d} (stdev: {:,.1f})<br>".format(int(value),stdev_value)

                if outputrows>0 and outputbatches>0:
                    output_row_batch="{:,d}".format(int(outputrows/outputbatches))

                nodenamestr=node["nodeName"] or ""
                # row<->columnar conversions are highlighted as likely overhead
                if nodenamestr in ['ColumnarToRow','RowToArrowColumnar','ArrowColumnarToRow','ArrowRowToColumnarExec']:
                    nodename='<span style="color: green; background-color: #ffff42">'+nodenamestr+'</span>'
                else:
                    nodename=nodenamestr
                indent=" ".join(["|_" for l in range(0,level)])
                if outputstage is not None:
                    outputstage.append({"queryid":real_queryid,"stageid":stageid,"stagetime":stage_elapsed,"stageParts":stageParts,"nodename":nodenamestr,"output_rowcnt":outputrows,"nodename_level":indent + " " + nodenamestr})
                if not show_plan_only:
                    nodestr= indent + " " + nodename
                    if show_simple_string :
                        # simpleString contains e.g. "struct<...>" read schemas: escape
                        # it so the browser does not swallow them as unknown tags.
                        simstr=escape(node['simpleString'] or "")
                        nodestr = nodestr + "<br>\n" +  simstr

                    outstr.append(f"<tr><td style='{fontcolor}'>{stageid}</td>"+
                                  f"<td style='{fontcolor}'> {stage_elapsed}({stage_time_stdev}) </td>"+
                                  f"<td style='{fontcolor}'> {stageParts} </td>"+
                                  f"<td style='text-align:left; background-color:{colors[stageid % 20]}'>" + nodestr + f"</td>"+
                                  f"<td style='{fontcolor}'> {input_rowcntstr} </td>"+
                                  f"<td style='{fontcolor}'> {input_columnarbatch} </td>"+
                                  f"<td style='{fontcolor}'> {output_rowcntstr} </td>"+
                                  f"<td style='{fontcolor}'> {output_columnarbatch} </td>"+
                                  f"<td style='{fontcolor}'> {output_row_batch} </td>"+
                                  f"<td style='{fontcolor}'> {timename} </td>"+
                                  f"<td style='{fontcolor}'>{timestr}</td>"+
                                  f"<td style='{fontcolor}'> {other_metric_name_str} </td>"+
                                  f"<td style='{fontcolor}'>{other_metric_str}</td></tr>")
                else:
                    outstr.append(f"<tr><td style='{fontcolor}'>{stageid}</td>"+
                                  f"<td style='{fontcolor}'> {stage_elapsed} </td>"+
                                  f"<td style='{fontcolor}'> {stageParts} </td>"+
                                  f"<td style='text-align:left; background-color:{colors[stageid % 20]}'>" + indent + " " + nodename + f"</td>"+
                                  f"<td style='{fontcolor}'> {output_rowcntstr} </td></tr>")

            if node["children"] is not None:
                for c in node["children"]:
                    print_plan(real_queryid, level+1,c,stageid)

        for c in queryplans:
            outstr.append("<font color=red size=4>"+str(c['real_queryid'])+"</font><table>")
            if not show_plan_only:
                outstr.append('''<tr>
                                    <td>stage id</td>
                                    <td>stage time</td>
                                    <td>partions</td>
                                    <td>operator</td>
                                    <td>input rows</td>
                                    <td>input batches</td>
                                    <td>output rows</td>
                                    <td>output batches</td>
                                    <td>output rows/batch</td>
                                    <td width=150>time metric name</td>
                                    <td width=200>time(%stage,%total,stdev)</td>
                                    <td width=150>other metric name</td>
                                    <td width=130>value</td>
                                </tr>''')
            else:
                outstr.append('''<tr>
                                    <td>stage id</td>
                                    <td>stage time</td>
                                    <td>partions</td>
                                    <td>operator</td>
                                    <td>output rows</td>
                                </tr>''')

            print_plan(c['real_queryid'],0,c,0)
            outstr.append("</table>")
        if plot:
            display(HTML(" ".join(outstr)))
        return " ".join(outstr)
    
    def get_metric_output_rowcnt(self, **kwargs):
        """Per-operator totals of the "number of output rows" metric.

        Thin wrapper: forwards all keyword args to ``get_metric_rowcnt``.
        """
        metric_name = "number of output rows"
        return self.get_metric_rowcnt(metric_name, **kwargs)
        
    def get_metric_input_rowcnt(self, **kwargs):
        """Per-operator totals of the "number of input rows" metric.

        Thin wrapper: forwards all keyword args to ``get_metric_rowcnt``.
        """
        metric_name = "number of input rows"
        return self.get_metric_rowcnt(metric_name, **kwargs)
        
    def get_metric_rowcnt(self,rowname, **kwargs):
        """Sum a row-count SQL metric per plan operator.

        Args:
            rowname: metric name (e.g. "number of output rows") or a list of names.
                For each plan node, the first of its metrics whose name matches is used.

        Keyword args:
            queryid: real_queryid (int) or list. Restricts the result and selects
                the detailed return shape.
            stageid: stage id (int) or list. When given, only those stages are kept.
                Previously this argument was parsed but never applied.

        Returns:
            None (after printing a message) when no plan node carries the metric.
            With ``queryid``: a pandas DataFrame with one row per (query, job,
            stage, accumulator). It holds ``total_row`` (millions), ``nodename``
            and ``stage time`` (summed task seconds / executors / cores * task.cpus).
            Without ``queryid``: an operator-name x real_queryid DataFrame of
            ``total_row`` (millions), 0 where an operator is absent from a query.
        """
        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)
        stageid=kwargs.get("stageid",None)

        if queryid and type(queryid)==int:
            queryid = [queryid,]

        if stageid and type(stageid)==int:
            stageid = [stageid,]

        queryplans = self.queryplans.where(F.col("real_queryid").isin(queryid)).orderBy("real_queryid").collect() if queryid else self.queryplans.orderBy("real_queryid").collect()
        qps=[]

        rownames=rowname if type(rowname)==list else [rowname,]
        def get_child(execid,node):
            # collect [nodeName, real_queryid, accumulatorId] for matching metrics
            if node['metrics'] is not None:
                outputrows=[x for x in node["metrics"] if "name" in x and x["name"] in rownames]
                if len(outputrows)>0:
                    qps.append([node["nodeName"],execid,outputrows[0]['accumulatorId']])
            if node["children"] is not None:
                for c in node["children"]:
                    get_child(execid,c)
        for c in queryplans:
            get_child(c['real_queryid'],c)

        if len(qps)==0:
            print("Metric ",rowname," is not found. ")
            return None
        stagetime=self.df.where("Event='SparkListenerTaskEnd'").groupBy("Stage ID").agg(F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,2).alias("stage time"))
        dfmetric=self.df.where("Event='SparkListenerTaskEnd'").select("queryid","real_queryid","Stage ID","Job ID",F.explode("Accumulables").alias("metric")).select("*","metric.*").drop("metric")
        numrowmetric=spark.createDataFrame(qps)
        numrowmetric=numrowmetric.withColumnRenamed("_1","metric").withColumnRenamed("_2","real_queryid").withColumnRenamed("_3","metricid")
        dfmetric_rowcnt=dfmetric.join(numrowmetric.drop("real_queryid"),on=[F.col("metricid")==F.col("ID")],how="right")
        stagemetric=dfmetric_rowcnt.groupBy("queryid","real_queryid","Job ID","Stage ID","metricid").agg(F.round(F.sum("Update")/1000000,2).alias("total_row"),F.max("metric").alias("nodename")).join(stagetime,"Stage ID")
        if stageid:
            stagemetric=stagemetric.where(F.col("Stage ID").isin(stageid))

        if queryid:
            return stagemetric.where(F.col("real_queryid").isin(queryid)).orderBy("Stage ID").toPandas()
        else:
            noderow=stagemetric.groupBy("real_queryid","nodename").agg(F.round(F.sum("total_row"),2).alias("total_row")).orderBy("nodename").collect()
            out={}
            qids=set([r.real_queryid for r in noderow])
            for r in noderow:
                if r.nodename not in out:
                    out[r.nodename]={c:0 for c in qids}
                out[r.nodename][r.real_queryid]=r.total_row
            return pandas.DataFrame(out).T
    
    def get_query_info(self,queryid):
        """Show every per-query report for ``queryid``, each under a big red heading.

        Sections, in order: time stats (``get_query_time``), stage stats, query
        plan, stage histograms, time breakdown (``show_time_metric``), and the
        input/output row counts per operator. Returns nothing.
        """
        def heading(title):
            display(HTML(f"<font color=red size=7 face='Courier New'><b> {title} </b></font>",))

        heading("time stat info")
        self.get_query_time(queryid=queryid)
        heading("stage stat info")
        display(self.get_stage_stat(queryid=queryid))
        heading("query plan")
        self.get_query_plan(queryid=queryid)
        heading("stage hist info")
        self.show_Stages_hist(queryid=queryid)
        heading("time info")
        display(self.show_time_metric(queryid=queryid))
        heading("operator and rowcount")
        display(self.get_metric_input_rowcnt(queryid=queryid))
        display(self.get_metric_output_rowcnt(queryid=queryid))
        
    def get_app_info(self,**kwargs):
        """Show the full application report: history link, query times,
        operator counts, operator row counts and the time breakdown.

        Keyword args:
            history_server: base URL of the Spark history server used for the
                link (default "http://sr525:18080", the previously hard-coded
                host). It is removed from ``kwargs`` before forwarding.
            All other keyword args are forwarded to ``get_query_time``,
            ``get_metric_input_rowcnt``, ``get_metric_output_rowcnt`` and
            ``show_time_metric``.
        """
        if self.df is None:
            self.load_data()
        history_server=kwargs.pop("history_server","http://sr525:18080")

        def heading(title):
            display(HTML(f"<font color=red size=7 face='Courier New'><b> {title} </b></font>",))

        def show_heatmap(pdf):
            # colour all cells on one shared scale spanning the whole table;
            # helpers return None when their metric is missing
            if pdf is None:
                return
            display(pdf.style.apply(background_gradient,
                   cmap='OrRd',
                   m=pdf.min().min(),
                   M=pdf.max().max(),
                   low=0,
                   high=1))

        heading(self.appid)
        url=f"{history_server}/history/{self.appid}"
        display(HTML(f"<a href={url}>{url}</a>"))
        heading("query time")
        self.get_query_time(**kwargs)
        heading("operator count")
        show_heatmap(self.getOperatorCount())

        heading("operator input row count")
        show_heatmap(self.get_metric_input_rowcnt(**kwargs))
        heading("operator output row count")
        show_heatmap(self.get_metric_output_rowcnt(**kwargs))
        self.show_time_metric(**kwargs)
        
    def get_stage_stat(self,**kwargs):
        """Per-stage task statistics as a pandas DataFrame, ordered by stage id.

        Keyword args:
            queryid: real_queryid (int) or list. Without it, every task of an
                SQL query (non-NULL ``queryid``) is included. This matches
                ``get_query_time``. Previously ``isin(None)`` silently filtered out
                every row.

        Columns: Job ID, Stage ID, elapsed time (summed task seconds / executors
        / cores * task.cpus), spill/shuffle sizes in GiB, several summed times in
        seconds, and ``input read`` (GiB).
        """
        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)

        if queryid and type(queryid)==int:
            queryid = [queryid,]

        scoped=self.df.where(F.col("real_queryid").isin(queryid)) if queryid else self.df.where("queryid is not NULL")
        df=scoped.where(F.col("Event")=='SparkListenerTaskEnd')

        inputsize = df.select("real_queryid","Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
                      .select("real_queryid","Stage ID","Executor ID", "Task ID","col.*") \
                      .where("Name='input size in bytes' or Name='size of files read'") \
                      .groupBy("Stage ID") \
                      .agg(F.round(F.sum("Update")/1024/1024/1024,2).alias("input read"))

        return df.groupBy("Job ID","Stage ID").agg(
            F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,1).alias("elapsed time"),
            F.round(F.sum(F.col("Disk Bytes Spilled"))/1024/1024/1024,1).alias("disk spilled"),
            F.round(F.sum(F.col("Memory Bytes Spilled"))/1024/1024/1024,1).alias("mem spilled"),
            F.round(F.sum(F.col("Local Bytes Read"))/1024/1024/1024,1).alias("local read"),
            F.round(F.sum(F.col("Remote Bytes Read"))/1024/1024/1024,1).alias("remote read"),
            F.round(F.sum(F.col("Shuffle Bytes Written"))/1024/1024/1024,1).alias("shuffle write"),
            F.round(F.sum(F.col("Executor Deserialize Time"))/1000,1).alias("deseri time"),
            F.round(F.sum(F.col("Fetch Wait Time"))/1000,1).alias("fetch wait time"),
            # Shuffle Write Time and Executor CPU Time are divided by 1e9 (nanoseconds -> seconds)
            F.round(F.sum(F.col("Shuffle Write Time"))/1000000000,1).alias("shuffle write time"),
            F.round(F.sum(F.col("Result Serialization Time"))/1000,1).alias("seri time"),
            F.round(F.sum(F.col("Getting Result Time"))/1000,1).alias("get result time"),
            F.round(F.sum(F.col("JVM GC Time"))/1000,1).alias("gc time"),
            F.round(F.sum(F.col("Executor CPU Time"))/1000000000,1).alias("exe cpu time")
            ).join(inputsize,on=["Stage ID"],how="left").orderBy("Stage ID").toPandas()
    
    def get_metrics_by_node(self,node_name):
        """Aggregate every SQL metric of plan nodes named ``node_name``, per stage.

        Each matching plan node gets a sequential ``nodeID`` in traversal order
        over ``self.queryplans``. Accumulator updates for the node's metrics are
        grouped by (real_queryid, nodeID, Stage ID). For every metric name ``m``
        the result has the columns:
            - ``m``: the sum, or the mean when the name starts with "avg";
            - ``m_mean``;
            - ``m_stddev``.
        Per-stage ``partnum`` (task count) and ``ElapsedTime`` (summed task
        seconds) are joined on.

        Returns a Spark DataFrame.

        Raises:
            ValueError: if no node named ``node_name`` carries any metric. An
                explicit message replaces the opaque createDataFrame failure.
        """
        if self.df is None:
            self.load_data()

        metrics=self.queryplans.collect()
        coalesce=[]
        metricsid=[0]
        def get_metric(root):
            if root['nodeName']==node_name:
                metricsid[0]=metricsid[0]+1
                # plan nodes may have metrics=None (other methods guard this too)
                for l in root["metrics"] or []:
                    coalesce.append([l['accumulatorId'],l["metricType"],l['name'],root["nodeName"],metricsid[0]])
            if root["children"] is not None:
                for c in root["children"]:
                    get_metric(c)
        for c in metrics:
            get_metric(c)
        if not coalesce:
            raise ValueError(f"no '{node_name}' plan node with metrics found in the query plans")

        df=self.df.select("queryid","real_queryid",'Stage ID','Task ID','Job ID',F.explode("Accumulables"))
        df=df.select("*","col.*")
        metricdf=spark.createDataFrame(coalesce)
        metricdf=metricdf.withColumnRenamed("_1","ID").withColumnRenamed("_2","Unit").withColumnRenamed("_3","metricName").withColumnRenamed("_4","nodeName").withColumnRenamed("_5","nodeID")
        df=df.join(metricdf,on=["ID"],how="right")
        shufflemetric=set(l[2] for l in coalesce)
        metricdfs=[df.where(F.col("Name")==l).groupBy("real_queryid","nodeID","Stage ID").agg(F.stddev("Update").alias(l+"_stddev"),F.mean("Update").alias(l+"_mean"),F.mean("Update").alias(l) if l.startswith("avg") else F.sum("Update").alias(l)) for l in shufflemetric]

        stagetimedf=self.df.where("Event='SparkListenerTaskEnd'").groupBy("Stage ID").agg(F.count("*").alias("partnum"),F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000,2).alias("ElapsedTime"))

        nodemetric=reduce(lambda x,y: x.join(y, on=['nodeID',"Stage ID","real_queryid"],how="full"),metricdfs)
        return nodemetric.join(stagetimedf,on="Stage ID")
    
    
    def get_coalesce_batch_row_cnt(self,**kwargs):
        """Plot rows-per-batch before and after CoalesceBatches for large stages.

        Keyword args:
            min_rows: keep only (query, stage) groups whose CoalesceBatches
                "number of output rows" exceeds this value (default 4000, the
                previously hard-coded threshold).

        Returns the pandas DataFrame that was plotted. Its columns include
        ``rows``, ``row/input_batch``, ``row/out_batch`` and ``stage``
        ("<real_queryid>_<Stage ID>"). Nothing is plotted when it is empty.
        The CoalesceBatches metrics are assumed to be named "input_batches" and
        "output_batches" — TODO confirm against the plugin's metric names.

        Side effect: sets the global pandas float display format.
        """
        min_rows=kwargs.get("min_rows",4000)
        stagesum=self.get_metrics_by_node("CoalesceBatches")

        pandas.options.display.float_format = '{:,}'.format

        stagesum=stagesum.withColumnRenamed("number of output rows","rows")

        coalescedf = stagesum.orderBy("real_queryid",'Stage ID').where(F.col("rows")>min_rows).toPandas()

        coalescedf["row/input_batch"] = coalescedf["rows"]/coalescedf["input_batches"]
        coalescedf["row/out_batch"] = coalescedf["rows"]/coalescedf["output_batches"]
        coalescedf['stage']=coalescedf["real_queryid"].astype(str)+"_"+coalescedf['Stage ID'].astype(str)

        # plotting an empty frame raises and the query-label helper needs >=1 row
        if coalescedf.empty:
            return coalescedf

        ax=coalescedf.plot(y=["row/input_batch","row/out_batch"],figsize=(30,8),style="-*")
        coalescedf.plot(ax=ax,y=['rows'],secondary_y=['rows'],style="k_")
        self.print_real_queryid(ax,coalescedf)

        return coalescedf
    
    def print_real_queryid(self,ax,dataset):
        """Hide the x ticks of ``ax`` and mark query boundaries along the x axis.

        ``dataset['real_queryid']`` is read in row order. Rows are assumed to be
        plotted at x = 0..n-1 (default RangeIndex) and grouped by query. A green
        vertical line is drawn between consecutive query groups. Each group's id,
        including the last one (previously it was never labelled), is written
        just below the axis, roughly centred under the group. An empty dataset
        only hides the ticks.
        """
        ax.axes.get_xaxis().set_ticks([])

        real_queryid=list(dataset['real_queryid'])
        if not real_queryid:
            return
        ymin, ymax = ax.get_ybound()
        label_y=ymin-(ymax-ymin)/20

        def label(qid,left,right):
            # -0.25 nudges the text left so its centre sits near the group centre
            ax.text(left+(right-left)/2-0.25,label_y,f"{qid}",size=20)

        current=real_queryid[0]
        lastx=0
        for idx,v in enumerate(real_queryid):
            if v!=current:
                boundary=idx-1+0.5
                ax.axvline(boundary,color="green")
                label(current,lastx,boundary)
                current=v
                lastx=boundary
        # the final group has no following boundary: close it at the last row
        label(current,lastx,len(real_queryid)-0.5)

    def get_shuffle_stat(self,**kwargs):
        """Plot ColumnarExchange shuffle statistics and per-stage task spread.

        Keyword args:
            shuffle_size: only exchanges whose ``shuffle bytes written`` is larger
                than this many bytes are kept (default 1000000).
            queryid: when given, only exchanges of this ``real_queryid`` are kept.

        Returns:
            tuple ``(shuffle_pdf, dfx)``: the per-exchange pandas frame behind the
            shuffle plots (``shuffle bytes written`` and ``data size`` divided by
            1e9, the written-bytes mean/stddev divided by 1e6), and the per-stage
            (real_queryid, Stage ID) aggregates of task elapsed time and input.
        """
        if self.df is None:
            self.load_data()

        shufflesize=kwargs.get("shuffle_size",1000000)
        queryid=kwargs.get("queryid",None)

        exchangedf=self.get_metrics_by_node("ColumnarExchange")
        # materialize once; mapdf and reducerdf below are both derived from it
        exchangedf.cache()
        exchangedf.count()
        # rows carrying a split time are treated as the map (write) side of the
        # exchange, the remaining rows as the reduce (read) side
        mapdf=exchangedf.where("totaltime_split is not null").select("nodeID",F.col("Stage ID").alias("map_stageid"),"real_queryid",F.floor(F.col("totaltime_split")/F.col("totaltime_split_mean")).alias("map_partnum"),"totaltime_compress","totaltime_computepid","totaltime_split","shuffle write time","shuffle spill time",'shuffle records written','data size','shuffle bytes written','shuffle bytes written_mean','shuffle bytes written_stddev','shuffle bytes spilled','number of input rows')
        reducerdf=exchangedf.where("totaltime_split is null").select("nodeID",F.col("Stage ID").alias("reducer_stageid"),"real_queryid",'local blocks read','local bytes read',F.floor(F.col("records read")/F.col("records read_mean")).alias("reducer_partnum"),(F.col('avg read batch num rows')/10).alias("avg read batch num rows"),'remote bytes read','records read','remote blocks read',(F.col("number of output rows")/F.col("records read")).alias("avg rows per split recordbatch"))
        shuffledf=mapdf.join(reducerdf,on=["nodeID","real_queryid"],how="full")
        if queryid is not None:
            shuffledf=shuffledf.where(F.col("real_queryid")==queryid)
        # drop small exchanges: keep only those that wrote more than shuffle_size bytes
        shuffle_pdf=shuffledf.where(F.col("shuffle bytes written")>shufflesize).orderBy("real_queryid","map_stageid","nodeID").toPandas()
        shuffle_pdf["shuffle bytes written"]=shuffle_pdf["shuffle bytes written"]/1000000000
        shuffle_pdf["data size"]=shuffle_pdf["data size"]/1000000000
        shuffle_pdf["shuffle bytes written_mean"]=shuffle_pdf["shuffle bytes written_mean"]/1000000
        shuffle_pdf["shuffle bytes written_stddev"]=shuffle_pdf["shuffle bytes written_stddev"]/1000000
        ax=shuffle_pdf.plot(y=["avg read batch num rows",'avg rows per split recordbatch'],figsize=(30,8),style="b-*",title="average batch size after split")
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf["split_ratio"]=shuffle_pdf["records read"]/shuffle_pdf['shuffle records written']
        ax=shuffle_pdf.plot(y=["split_ratio","records read"],secondary_y=["records read"],figsize=(30,8),style="-*",title="Split Ratio")
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf["compress_ratio"]=shuffle_pdf["data size"]/shuffle_pdf['shuffle bytes written']
        ax=shuffle_pdf.plot(y=["shuffle bytes written","compress_ratio"],secondary_y=["compress_ratio"],figsize=(30,8),style="-*",title="compress ratio")
        self.print_real_queryid(ax,shuffle_pdf)
        # shufflewritepdf aliases shuffle_pdf: columns added to one appear in both
        shufflewritepdf=shuffle_pdf
        ax=shufflewritepdf.plot.bar(y=["shuffle write time","shuffle spill time","totaltime_compress","totaltime_split","totaltime_computepid"],stacked=True,figsize=(30,8),title="split time + shuffle write time vs. shuffle bytes written")
        ax=shufflewritepdf.plot(ax=ax,y=["shuffle bytes written"],secondary_y=["shuffle bytes written"],style="-*")
        self.print_real_queryid(ax,shufflewritepdf)

        # Collect the metrics of every ColumnarExchange node in the query plans;
        # each exchange gets a running number (metricsid) in traversal order.
        metrics=self.queryplans.collect()
        coalesce=[]
        metricsid=[0]
        def get_metric(root):
            if root['nodeName']=="ColumnarExchange":
                metricsid[0]=metricsid[0]+1
                for l in root["metrics"]:
                    coalesce.append([l['accumulatorId'],l["metricType"],l['name'],root["nodeName"],metricsid[0],root["simpleString"]])
            if root["children"] is not None:
                for c in root["children"]:
                    get_metric(c)
        for c in metrics:
            get_metric(c)

        # tps[exchange number][data type] = number of output fields of that type,
        # parsed from the "[OUTPUT] List(...)" part of the node's simpleString
        tps={}
        for r in coalesce:
            rx=re.search(r"\[OUTPUT\] List\((.*)\)",r[5])
            if rx:
                if r[4] not in tps:
                    tps[r[4]]={}
                    fds=rx.group(1).split(", ")
                    for f in fds:
                        if not re.search(r":(.+Type)",f):
                            print(fds)
                        else:
                            tp=re.search(r":(.+Type)",f).group(1)
                            if tp not in tps[r[4]]:
                                tps[r[4]][tp]=1
                            else:
                                tps[r[4]][tp]+=1
        if len(tps)>0:
            typedf=pandas.DataFrame(tps).T.reset_index()
            typedf=typedf.fillna(0)
            # NOTE(review): joins the exchange number from tps onto nodeID; confirm these ids line up
            shuffle_pdf=pandas.merge(shuffle_pdf,typedf,left_on="nodeID",right_on="index")
            shufflewritepdf=shuffle_pdf
            ax=shufflewritepdf.plot.bar(y=["number of input rows"],stacked=True,figsize=(30,8),title="rows vs. shuffle data type")
            ax=shufflewritepdf.plot(ax=ax,y=list(typedf.columns[1:]),secondary_y=list(typedf.columns[1:]),style="-o")
            self.print_real_queryid(ax,shufflewritepdf)
            ax=shufflewritepdf.plot.bar(y=["totaltime_split"],stacked=True,figsize=(30,8),title="split time vs. shuffle data type")
            ax=shufflewritepdf.plot(ax=ax,y=list(typedf.columns[1:]),secondary_y=list(typedf.columns[1:]),style="-o")
            self.print_real_queryid(ax,shufflewritepdf)

        shufflewritepdf.plot(x="shuffle bytes written",y=["shuffle write time","totaltime_split"],figsize=(30,8),style="*")
        shufflewritepdf["avg shuffle batch size after split"]=shufflewritepdf["shuffle bytes written"]*1000000/shufflewritepdf['records read']
        shufflewritepdf["avg batch size after split"]=shufflewritepdf["data size"]*1000000/shufflewritepdf['records read']
        ax=shufflewritepdf.plot(y=["avg shuffle batch size after split","avg batch size after split","shuffle bytes written"],secondary_y=["shuffle bytes written"],figsize=(30,8),style="-*",title="avg batch KB after split")
        self.print_real_queryid(ax,shufflewritepdf)
        shufflewritepdf["avg batch# per splitted partition"]=shufflewritepdf['records read']/(shufflewritepdf['local blocks read']+shufflewritepdf['remote blocks read'])
        ax=shufflewritepdf.plot(y=["avg batch# per splitted partition",'records read'],secondary_y=['records read'],figsize=(30,8),style="-*",title="avg batch# per splitted partition")
        self.print_real_queryid(ax,shufflewritepdf)
        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('shuffle write bytes with stddev')
        ax.errorbar(x=shuffle_pdf.index,y=shuffle_pdf['shuffle bytes written_mean'], yerr=shuffle_pdf['shuffle bytes written_stddev'], linestyle='None', marker='o')
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf['record batch per mapper per reducer']=shuffle_pdf['records read']/(shuffle_pdf["map_partnum"]*shuffle_pdf['reducer_partnum'])
        ax=shuffle_pdf.plot(y=["record batch per mapper per reducer"],figsize=(30,8),style="b-*",title="record batch per mapper per reducer")
        self.print_real_queryid(ax,shuffle_pdf)

        # per-task input: file-scan accumulables plus shuffle/local/remote bytes read
        inputsize = self.df.select("Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
              .select("Stage ID","Executor ID", "Task ID","col.*") \
              .where("Name='input size in bytes' or Name='size of files read'") \
              .groupBy("Task ID") \
              .agg((F.sum("Update")).alias("input read"))
        stageinput=self.df.where("event='SparkListenerTaskEnd'" )\
                                .join(inputsize,on=["Task ID"],how="left")\
                                .fillna(0) \
                                .select(F.col('Host'), F.col("real_queryid"),F.col('Stage ID'),F.col('Task ID'),
                                        F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime'),
                                        F.round((F.col('`input read`')+F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`'))/1024/1024,2).alias('input'))
        # per stage: mean, stddev and distance of max/min from the mean, used as error bars
        baisstage=stageinput.groupBy("real_queryid","Stage ID").agg(F.mean("elapsedtime").alias("elapsed"),F.mean("input").alias("input"),
                                                            (F.stddev("elapsedtime")).alias("elapsedtime_err"),
                                                            (F.stddev("input")).alias("input_err"),
                                                            (F.max("elapsedtime")-F.mean("elapsedtime")).alias("elapsed_max"),
                                                            (F.mean("elapsedtime")-F.min("elapsedtime")).alias("elapsed_min"),
                                                            (F.max("input")-F.mean("input")).alias("input_max"),
                                                            (F.mean("input")-F.min("input")).alias("input_min")).orderBy("real_queryid","Stage ID")
        dfx=baisstage.toPandas()
        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('input size')
        ax.errorbar(x=dfx.index,y=dfx['input'], yerr=dfx['input_err'], fmt='ok', ecolor='red', lw=3)
        ax.errorbar(x=dfx.index,y=dfx['input'],yerr=[dfx['input_min'],dfx['input_max']],
                     fmt='.k', ecolor='gray', lw=1)
        self.print_real_queryid(ax,dfx)

        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('stage time')

        ax.errorbar(x=dfx.index,y=dfx['elapsed'], yerr=dfx['elapsedtime_err'], fmt='ok', ecolor='red', lw=5)
        ax.errorbar(x=dfx.index,y=dfx['elapsed'],yerr=[dfx['elapsed_min'],dfx['elapsed_max']],
                     fmt='.k', ecolor='gray', lw=1)

        self.print_real_queryid(ax,dfx)
        return (shuffle_pdf,dfx)
    
    def get_stages_w_odd_partitions(appals,**kwargs):
        """List stages whose task count is not a multiple of the task-slot count.

        The slot count is executor_cores * executor_instances / taskcpus. Each
        returned row holds the stage's summed (Finish Time - Launch Time)/1000
        as "elapsed time" and its number of finished tasks as "partitions",
        ordered by "elapsed time" descending.
        """
        if appals.df is None:
            appals.load_data()
        task_ends=appals.df.where("Event='SparkListenerTaskEnd'")
        task_seconds=(F.sum(F.col('Finish Time')-F.col('Launch Time'))/1000).alias("elapsed time")
        per_stage=task_ends.groupBy("Stage ID","real_queryid").agg(task_seconds,F.count('*').alias('partitions'))
        slots=appals.executor_cores*appals.executor_instances/appals.taskcpus
        odd_stages=per_stage.where(F.col("partitions")%slots!=0)
        return odd_stages.orderBy(F.desc("elapsed time")).toPandas()
   
    def get_scaned_column_v1(appals,queryids=range(1,23)):
        """Return the columns read by every "Scan arrow" node of each query.

        Args:
            queryids: ``real_queryid`` values to inspect (default 1..22). Queries
                with no plan in ``appals.queryplans`` are skipped.

        Returns:
            list of ``[queryid, "col1,col2,..."]``, one entry per scan node, with
            the column names taken from the node's ``metadata['ReadSchema']``.
        """
        def get_scans(node,scans):
            if node['nodeName'].startswith("Scan arrow"):
                scans.append(node)
            # leaf nodes may carry None instead of an empty children list
            for c in (node['children'] or []):
                get_scans(c,scans)

        alltable=[]
        for qid in queryids:
            plans=appals.queryplans.where("real_queryid="+str(qid)).collect()
            if not plans:
                continue
            scans=[]
            get_scans(plans[0],scans)
            for s in scans:
                fields=re.split(r'[<>]',s['metadata']['ReadSchema'])[1]
                # split only on commas outside parentheses so types such as
                # decimal(12,2) do not produce bogus column names
                cols=[f.split(":")[0] for f in re.split(r",(?![^()]*\))",fields)]
                alltable.append([qid,",".join(cols)])
        return alltable
    
    def get_scaned_column_v2(appals,queryids=range(1,23)):
        """Return the output columns of every ColumnarBatchScan node of each query.

        Args:
            queryids: ``real_queryid`` values to inspect (default 1..22). Queries
                with no plan in ``appals.queryplans`` are skipped.

        Returns:
            list of ``[queryid, "col1,col2,..."]``, one entry per scan node; the
            names come from the first ``[...]`` list in the node's simpleString,
            with everything from '#' onwards removed from each entry.
        """
        def get_scans(node,scans):
            if node['nodeName'].startswith("ColumnarBatchScan"):
                scans.append(node)
            # leaf nodes may carry None instead of an empty children list
            for c in (node['children'] or []):
                get_scans(c,scans)

        alltable=[]
        for qid in queryids:
            plans=appals.queryplans.where("real_queryid="+str(qid)).collect()
            if not plans:
                continue
            scans=[]
            get_scans(plans[0],scans)
            for s in scans:
                attrs=re.split(r"[\[\]]",s['simpleString'])[1]
                alltable.append([qid,",".join([l.split("#")[0] for l in attrs.split(",")])])
        return alltable
    
    def compare_query(appals,queryid,appbaseals):
        """Display a stage-by-stage comparison of one query between two applications.

        Stages returned by ``get_stage_stat`` are paired by position (i-th stage
        of ``appals`` with i-th stage of ``appbaseals``). Then, starting from the
        slowest stage of ``appals``, the output-row metrics and query plans of
        each pair are shown until the stages shown cover at least 90% of the
        total stage time of ``appals``. Stages without a counterpart are reported
        but not compared.
        """
        print(f"~~~~~~~~~~~~~~~~~~~~~~~~~~~~Query{queryid}~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        appals.show_critical_path_time_breakdown(queryid=queryid)
        s1=appals.get_stage_stat(queryid=queryid)
        s2=appbaseals.get_stage_stat(queryid=queryid)
        ls=s1[['Stage ID','elapsed time']]
        ls.columns=['l sid','l time']
        rs=s2[['Stage ID','elapsed time']]
        rs.columns=['r sid','r time']
        js=ls.join(rs)
        js['gap']=js['r time'] - js['l time']
        js['gap']=js['gap'].round(2)
        display(js)
        display(s1)
        display(s2)
        # pair stages by position; surplus stages on the longer side stay unpaired
        stagesmap={}
        for x in range(0,min(len(s1),len(s2))):
            stagesmap[int(s1['Stage ID'].iloc[x])]=s2['Stage ID'].iloc[x]
        totaltime=sum(s1['elapsed time'])
        acctime=0
        s1time=s1.sort_values("elapsed time",ascending=False,ignore_index=True)
        ldfx=appals.get_metric_output_rowcnt(queryid=queryid)
        rdfx=appbaseals.get_metric_output_rowcnt(queryid=queryid)

        for x in range(0,len(s1time)):
            sid1=int(s1time['Stage ID'][x])
            print(f"============================================================")
            if sid1 in stagesmap:
                sid2=int(stagesmap[sid1])
                display(ldfx[ldfx['Stage ID']==sid1])
                # filter each frame with its own mask
                display(rdfx[rdfx['Stage ID']==sid2])
                print(f" Gazelle  Query {queryid}  Stage {sid1}")
                appals.get_query_plan(stageid=sid1,show_simple_string=True)
                print(f" Photon  Query {queryid}  Stage {sid2}")
                appbaseals.get_query_plan(stageid=sid2,show_simple_string=True)
            else:
                print(f" Gazelle  Query {queryid}  Stage {sid1} has no matching stage in the base application")
            acctime+=s1time['elapsed time'][x]
            if acctime/totaltime>=0.9:
                break

In [None]:
# Spark configuration keys that comp_spark_conf ignores when diffing two
# applications (presumably values that differ per run: ids, hosts, ports,
# paths, job-scoped properties).
notlist=['resource.executor.cores',
 'spark.app.id',
 'spark.app.initial.file.urls',
 'spark.app.name',
 'spark.app.startTime',
 'spark.driver.port',
 'spark.job.description',
 'spark.jobGroup.id',
 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
 'spark.rdd.scope',
 'spark.sql.execution.id',
 '__fetch_continuous_blocks_in_batch_enabled',
 'spark.driver.appUIAddress',
 'spark.driver.host',
 'spark.driver.extraClassPath',
 'spark.eventLog.dir',
 'spark.executorEnv.CC',
 'spark.executorEnv.LD_LIBRARY_PATH',
 'spark.executorEnv.LD_PRELOAD',
 'spark.executorEnv.LIBARROW_DIR',
 'spark.files',
 'spark.history.fs.logDirectory',
 'spark.sql.warehouse.dir',
 'spark.yarn.appMasterEnv.LD_PRELOAD',
 'spark.yarn.dist.files'
]
def comp_spark_conf(app0,app1):
    """Return the Spark configuration entries that differ between two applications.

    Each app provides ``get_spark_config()`` and ``appid``. The config frames
    are joined on their index (the config key); the value column is expected
    to be labelled 0, so after the join the value columns are named
    "0"+appid[-8:] for each app. Values are compared case-insensitively, and
    keys listed in the module-level ``notlist`` are ignored. The join is an
    outer join, so a key set in only one application is reported too (with NaN
    on the other side).
    """
    pdf_sparkconf_0=app0.get_spark_config()
    pdf_sparkconf_1=app1.get_spark_config()
    suffix0=app0.appid[-8:]
    suffix1=app1.appid[-8:]
    pdfc=pdf_sparkconf_0.join(pdf_sparkconf_1,how="outer",lsuffix=suffix0,rsuffix=suffix1)
    col0="0"+suffix0
    col1="0"+suffix1
    pdfc[col0]=pdfc[col0].str.lower()
    pdfc[col1]=pdfc[col1].str.lower()

    pdfc['comp']=(pdfc[col0]==pdfc[col1])
    return pdfc.loc[(~pdfc['comp']) & (~pdfc.index.isin(notlist))]

# MISC

In [None]:
def show_rst(pdrst):
    """Display ``pdrst`` as a styled HTML table with per-application links.

    Every column except 'app_id' is rendered; cells of the 'elapsed time'
    column use the red 'tg-phtq_v' style. 'app_id' must hold an object with an
    ``appid`` attribute, used to build a Spark history-server link and a
    trace-viewer link for each row. Header and cell text is HTML-escaped so
    values containing '<' or '&' cannot break the table.
    """
    # named `out` (not `html`) so the `html` module stays usable for escaping
    out='''<style type="text/css">
    .tg  {border-collapse:collapse;border-spacing:0;border-color:#aabcfe;}
    .tg td{font-family:Courier New;font-size:18px;padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;border-color:#aabcfe;color:#669;background-color:#e8edff;}
    .tg th{font-family:Courier New;font-size:18px;font-weight:normal;padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;border-color:#aabcfe;color:#039;background-color:#b9c9fe;}
    .tg .tg-phtq{background-color:#D2E4FC;border-color:inherit;text-align:left;vertical-align:top}
    .tg .tg-c3ow{border-color:inherit;text-align:center;vertical-align:top}
    .tg .tg-0pky{border-color:inherit;text-align:left;vertical-align:top}
    .tg .tg-phtq_v{background-color:#D2E4FC;border-color:inherit;text-align:left;vertical-align:top; color:#FF0000}
    .tg .tg-0pky_v{border-color:inherit;text-align:left;vertical-align:top; color:#FF0000}
    </style>
    <table class="tg">
      <tr>
    '''
    cols=[l for l in pdrst.columns if l!='app_id']
    out=out+''.join(['<th class="tg-c3ow">{:s}</th>'.format(html.escape(str(l))) for l in cols])
    out=out+'<th class="tg-c3ow">spark log</th><th class="tg-c3ow">trace_view</th></tr>'
    for _, r in pdrst.iterrows():
        appid=html.escape(str(r['app_id'].appid))
        out=out+"<tr>"
        out=out+"".join(['<td class="{:s}">{:s}</td>'.format('tg-phtq' if l!='elapsed time' else 'tg-phtq_v', html.escape(str(r[l]))) for l in cols])
        out=out+'''<td class="tg-phtq"><a href="http://10.1.2.107:18080/history/{:s}/jobs"> {:s}</a></td><td class="tg-phtq"><a href=http://sr525:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{:s}.json> {:s}</a></td>'''.format(appid,appid,appid,appid)
        out=out+"</tr>"
    out=out+"</table>"
    display(HTML(out))

In [None]:
def reduce_metric(pdrst,slave_id,metric,core,agg_func):
    """Add one column per aggregation in ``agg_func`` for ``metric``.

    For each row, ``row['app_id'].get_reduce_metric(slave_id, metric, core,
    agg_func)`` is called; the first row of that result supplies the value of
    column ``get_alias_name(metric, func)`` for every ``func`` in agg_func.
    ``pdrst`` itself is modified (a temporary 'rst' column and the new metric
    columns are written into it); the returned frame is ``pdrst`` minus 'rst'.
    """
    def fetch_result(row):
        return row['app_id'].get_reduce_metric(slave_id,metric,core,agg_func)

    pdrst['rst']=pdrst.apply(fetch_result,axis=1)
    for func in agg_func:
        pdrst[get_alias_name(metric,func)]=pdrst.apply(
            lambda row,func=func: row['rst'].iloc[0][get_alias_name(metric,func)],
            axis=1)
    return pdrst.drop(columns=['rst'])

In [None]:
def clean_data(rsta):
    """Clean the 'emon_mem_bw' column of ``rsta`` in place; returns None.

    Rows are addressed with ``.loc``, so a default 0..n-1 index is assumed.
    For each row i, a value above 40000 is kept only if none of the existing
    rows i..i+4 is below 40000; otherwise it is replaced by 0. A value of
    40000 or less is replaced by False.
    """
    # NOTE(review): every reading <= 40000 is overwritten with False (== 0);
    # confirm that is intended rather than leaving the value unchanged.
    n_rows=len(rsta)
    for i in range(0,n_rows):
        m=rsta.loc[i,'emon_mem_bw']
        if m>40000:
            # look ahead over this row and the next four (within bounds)
            for j in range(i,i+5):
                if j>=n_rows:
                    break
                if rsta.loc[j,'emon_mem_bw']<40000:
                    m=0
                    break
            rsta.loc[i,'emon_mem_bw']=m
        else:
            rsta.loc[i,'emon_mem_bw']=False

In [None]:
def background_gradient(s, m, M, cmap='PuBu', low=0, high=0):
    """Return one 'background-color: #rrggbb' CSS string per element of ``s``.

    Values are normalised over [m - (M-m)*low, M + (M-m)*high] and mapped
    through colormap ``cmap`` (a name or Colormap). Presumably used with a
    pandas Styler ``apply`` call.
    """
    import matplotlib
    from matplotlib import colors
    rng = M - m
    norm = colors.Normalize(m - (rng * low),
                            M + (rng * high))
    normed = norm(s.values)
    registry = getattr(matplotlib, "colormaps", None)
    if registry is not None and hasattr(registry, "get_cmap"):
        # matplotlib >= 3.6; matplotlib.cm.get_cmap was removed in 3.9
        colormap = registry.get_cmap(cmap)
    else:
        colormap = plt.cm.get_cmap(cmap)
    c = [colors.rgb2hex(x) for x in colormap(normed)]
    return ['background-color: {:s}'.format(color) for color in c]

# TPCDS query map

In [None]:
# TPC-DS query ordinal -> query name. Queries 14, 23, 24 and 39 come in two
# variants (a/b), so the 99 TPC-DS queries expand to ordinals 1..103.
def _build_tpcds_query_lines():
    names = []
    for q in range(1, 100):
        base = "q{:02d}".format(q)
        names.extend([base + "a", base + "b"] if q in (14, 23, 24, 39) else [base])
    return [("" if idx == 1 else "    ") + "{}\t{}".format(idx, name)
            for idx, name in enumerate(names, start=1)]

m = _build_tpcds_query_lines()
tpcds_query_map = dict((int(num), name) for num, name in (line.strip().split("\t") for line in m))
del _build_tpcds_query_lines