<h2>Análise de traços de execução</h2>

<h3>Installs de bibliotecas necessárias</h3>

In [1]:
%pip install findspark pyspark_dist_explore pyspark

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\322010\AppData\Local\Programs\Python\Python310\python.exe -m pip install --upgrade pip' command.


<h4>Primeiro utilizamos o findspark para que o ambiente saiba onde o spark está localizado.</h4>

In [2]:
import findspark

In [3]:
findspark.init()

<h4>Imports(têm de ser feitos após o init do findspark)</h4>

In [28]:
import pyspark
from pyspark_dist_explore import hist
import time
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
import math
import sys,os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType,StructField, LongType, IntegerType,FloatType

<h4>Inicializamos uma seção Spark, ou pegamos a que está atualmente em execução</h4>

In [5]:
spark=SparkSession.builder.appName("Sessao").getOrCreate()

In [6]:
sc = spark.sparkContext

<h4>Definimos um schema para o RDD</h4>

In [21]:
instance_events_schema = StructType([ \
    StructField("time",LongType(),True), \
    StructField("type",IntegerType(),True), \
    StructField("collection_id",IntegerType(),False), \
    StructField("priority", IntegerType(), True), \
    StructField("instance_index", IntegerType(), False), \
    StructField("cpu_resource_request", FloatType(), True), \
    StructField("memory_resource_request", FloatType(), True) \
  ])

<h4>Carregamos um arquivo CSV em um RDD(sem cabeçalho e com o schema definido)</h4>

In [22]:
rddCE = spark.read.option("header","true").schema(instance_events_schema).csv("instance_events/instance_events-000000000000.csv")

<h4>Média dos requerimentos de utilização de memória(por tipo de "coisa")</h4>

<p>Ao persistir o RDD, operações subsequentes reutilizarão os dados relativos ao RDD em operações que o envolvam, diminuindo drasticamente o tempo de execução das mesmas</p>

In [23]:
rddCE.persist()

DataFrame[time: bigint, type: int, collection_id: int, priority: int, instance_index: int, cpu_resource_request: float, memory_resource_request: float]

<p>Essa é executada sem cache</p>

In [10]:
rddCE.orderBy('type').groupBy('type').agg({"`memory_resource_request`":'avg'}).show()

+----+----------------------------+
|type|avg(memory_resource_request)|
+----+----------------------------+
|   0|        0.003215060709829252|
|   1|         0.01933770519928046|
|   2|        0.003345685608592...|
|   3|        0.003335462585395...|
|   4|        0.003473938670384...|
|   5|        0.020445465992991966|
|   6|        0.004764337917852076|
|   7|        0.002935408097730315|
|   8|        0.005776905272776365|
|   9|        0.003988534368734...|
|  10|        0.008756889907134162|
+----+----------------------------+



<p>Essa é executada COM cache</p>

In [11]:
rddCE.orderBy('type').groupBy('type').agg({"`cpu_resource_request`":'avg'}).show()

+----+-------------------------+
|type|avg(cpu_resource_request)|
+----+-------------------------+
|   0|     0.009525861089444346|
|   1|     0.009540171341634771|
|   2|     0.009649705901209166|
|   3|     0.009805624388920475|
|   4|     0.009645259130290452|
|   5|     0.008741734786721873|
|   6|     0.014509926217036775|
|   7|     0.009677616500338168|
|   8|     0.018229468438387228|
|   9|     0.013241837398293013|
|  10|     0.011053578572176771|
+----+-------------------------+



<h4>filtra os eventos de tasks com tipo=3(submissão), com valores de tempo dentro do intervalo observado no traço (0<t<MAXINT>>)</h4>

In [24]:
rddCE = rddCE.filter((rddCE.time.isNotNull()) & (rddCE.type==3) & (rddCE.time>0) & (rddCE.time<sys.maxsize))

Converte microssegundo em hora:

In [13]:
def microToHour(x):
    return math.floor(x/3.6e+9)

Define o objeto udf, que pode ser utilizado pra aplicar a função microToHour no RDD

In [14]:
udf_hours = udf(lambda x:microToHour(x),IntegerType())

cria-se nova coluna com o tempo em horas e adiciona-a ao RDD

In [15]:
rddWithH = rddCE.withColumn("hour",udf_hours(col("time")))

Acha as máximas e mínimas das horas

In [25]:
maxTime = rddCE.agg({'time':'max'}).collect()[0][0]
minTime = rddCE.agg({'time':'min'}).collect()[0][0]
maxTime,minTime

(604799625804, 600777673)

In [27]:
rddCE.agg({'time':'count'}).collect()[0][0]/(microToHour(maxTime)-microToHour(minTime))

11904.419161676647

Encontra a quantidade de tasks submetidas por hora, ao iterar por todos os arquivos

In [34]:
minTime = sys.maxsize
maxTime = 0
type3Count = 0
for filename in os.listdir("instance_events"):
    f = os.path.join("instance_events",filename)
    rddCE = spark.read.option("header","true").schema(instance_events_schema).csv(f)
    rddCE.persist()
    rddCE = rddCE.filter((rddCE.time.isNotNull()) & (rddCE.type==3) & (rddCE.time>0) & (rddCE.time<sys.maxsize))
    type3Count = type3Count+rddCE.agg({'time':'count'}).collect()[0][0]
    if rddCE.agg({'time':'count'}).collect()[0][0]>0:
        maxTime = max(maxTime,rddCE.agg({'time':'max'}).collect()[0][0])
        minTime = min(minTime,rddCE.agg({'time':'min'}).collect()[0][0])
    
minTime,maxTime,type3Count/(microToHour(maxTime)-microToHour(minTime))

(600001531, 604799934699, 429141.46706586826)

In [33]:
rddCE.agg({'time':'count'}).collect()[0][0]

0