In [1]:
from datetime import datetime, timedelta, time, date
from pyspark.sql import functions as f

In [2]:
class HoursOfTheDay:
    """Iterate over the 24 one-hour windows of a calendar day.

    Each item is a ``(from_us, to_us)`` pair of integer epoch timestamps in
    microseconds. Both ends are inclusive: ``from_us`` is the first microsecond
    of the hour and ``to_us`` is the last one. Consecutive windows are therefore
    contiguous and non-overlapping, which suits an inclusive ``between`` filter.

    Hours are stepped with naive datetime arithmetic from local midnight, and
    ``datetime.timestamp()`` interprets naive datetimes in the process's local
    time zone. On a day with a DST transition, some windows may therefore not
    be exactly one hour long.

    The object is its own iterator, so it can be consumed only once.
    """

    def __init__(self, date):
        # Local midnight at the start of the requested day (naive datetime).
        self.datetime = datetime.combine(date, time.min)
        # Index of the next hour to emit; iteration stops after hour 23.
        self.hour = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.hour > 23:
            raise StopIteration
        start = self.datetime + timedelta(hours=self.hour)
        end = self.datetime + timedelta(hours=self.hour + 1)
        self.hour += 1
        # Hour boundaries fall on whole seconds, so their float timestamps are
        # exact. Deriving the inclusive upper bound as "next start - 1" in
        # integer arithmetic avoids truncating a float such as
        # 1689...599.999999, which can land one microsecond low.
        from_us = self._to_micros(start)
        to_us = self._to_micros(end) - 1
        return from_us, to_us

    @staticmethod
    def _to_micros(moment):
        """Convert a naive (local-time) datetime to integer epoch microseconds."""
        return round(moment.timestamp() * 1000000)

In [3]:
def loadSpans(from_, to):
    """Load spans from the Elasticsearch ``zipkin*`` indices within a time range.

    Reads through the Spark ``es`` data source and keeps only the columns the
    rest of the pipeline uses. ``localEndpoint.serviceName`` and ``name`` are
    combined into a single ``endpoint`` column of the form
    ``<serviceName>_<name>``.

    from_, to: inclusive bounds on the ``timestamp`` column. The caller passes
        epoch microseconds from ``HoursOfTheDay``; this presumes the stored
        span timestamps use the same unit (TODO confirm).

    Relies on a ``spark`` session existing in the notebook's global scope.
    """
    endpoint = f.concat_ws('_', 'localEndpoint.serviceName', 'name').alias('endpoint')
    rawSpans = (spark.read
                     .format("es")
                     .option("es.resource", "zipkin*")
                     .load())
    projected = rawSpans.select('traceId', endpoint, 'duration', 'id',
                                'kind', 'timestamp', 'parentId')
    inWindow = f.col('timestamp').between(from_, to)
    return projected.filter(inWindow)


In [4]:
def filterServerSpans(spans):
    """Return only the rows whose ``kind`` is ``'SERVER'``.

    ``parentId`` and ``kind`` are dropped from the result. The server side is
    identified by its own ``id`` in later joins, so neither column is needed.
    """
    isServer = spans.kind == 'SERVER'
    serverOnly = spans.filter(isServer)
    return serverOnly.drop('parentId', 'kind')

In [5]:
def filterClientSpans(spans):
    """Return only the rows whose ``kind`` is ``'CLIENT'``.

    ``kind`` is dropped. ``parentId`` is kept because client durations are
    later grouped by it.
    """
    isClient = spans.kind == 'CLIENT'
    clientOnly = spans.filter(isClient)
    return clientOnly.drop('kind')

In [6]:
def createClientsDuration(serverSpans, clientSpans):
    """Sum the ``duration`` of client spans per ``parentId``.

    Returns one row per ``parentId`` with the total in ``clients_duration``.

    serverSpans: not used by this function; kept so existing callers that pass
        it keep working.
    clientSpans: client-side spans with ``parentId`` and ``duration`` columns.
    """
    totalDuration = f.sum('duration').alias('clients_duration')
    byParent = clientSpans.groupBy('parentId')
    return byParent.agg(totalDuration)

In [7]:
def craeteServerSpansWithClientsDuration(spans):
    """Attach to each SERVER span the summed duration of its child CLIENT spans.

    Server spans are left-outer-joined (on ``id``) to the per-``parentId``
    totals of client spans. A server span with no matching client spans gets
    ``clients_duration = 0``.

    Only ``clients_duration`` is zero-filled. Other numeric columns such as
    ``duration`` and ``timestamp`` keep any nulls they already had, so that
    missing data is not silently turned into 0 and does not skew later averages.
    """
    serverSpans = filterServerSpans(spans)
    clientSpans = filterClientSpans(spans)
    clientsDuration = createClientsDuration(serverSpans, clientSpans)
    joined = serverSpans.join(clientsDuration,
                              serverSpans.id == clientsDuration.parentId,
                              'left_outer')
    # parentId came from the client side of the join and is redundant with id.
    return (joined.drop('parentId')
                  .na.fill(0, subset=['clients_duration']))


# Correctly spelled alias. The misspelled name above is kept for existing callers.
createServerSpansWithClientsDuration = craeteServerSpansWithClientsDuration

In [8]:
def createAvgDurPerTraceEndpointPairs(serverSpansWithSelfDuration):
    """Average total and self duration per ``(traceId, endpoint)`` pair.

    Returns columns ``traceId``, ``endpoint``, ``avg_duration`` and
    ``avg_self_duration``.
    """
    averages = [f.avg('duration').alias('avg_duration'),
                f.avg('self_duration').alias('avg_self_duration')]
    byTraceAndEndpoint = serverSpansWithSelfDuration.groupBy('traceId', 'endpoint')
    return byTraceAndEndpoint.agg(*averages)

In [10]:
def createEndpointTraces(spans):
    """Build one wide row per trace with per-endpoint average durations.

    Steps:
    1. For each server span, compute ``self_duration = duration - clients_duration``.
    2. Average ``duration`` and ``self_duration`` per ``(traceId, endpoint)``.
    3. Pivot on ``endpoint``, so that each endpoint contributes two columns
       (``avg_self_dur`` and ``avg_dur`` aggregates).

    ``first`` is safe here because step 2 leaves a single row per
    ``(traceId, endpoint)``. Finally, ``dropna()`` removes any trace that lacks
    a value for some pivoted column.

    NOTE(review): ``pivot`` is given no explicit value list, so the set of
    endpoint columns is derived from the data. It may differ between calls,
    for example between hourly windows.
    """
    withClients = craeteServerSpansWithClientsDuration(spans)
    selfDuration = f.col('duration') - f.col('clients_duration')
    withSelf = withClients.withColumn('self_duration', selfDuration)
    perPair = createAvgDurPerTraceEndpointPairs(withSelf)
    wide = (perPair.groupBy('traceId')
                   .pivot('endpoint')
                   .agg(f.first('avg_self_duration').alias('avg_self_dur'),
                        f.first('avg_duration').alias('avg_dur')))
    return wide.dropna()
                        

In [2]:
day = datetime.now().date()  # the local calendar day whose hours are processed below

In [19]:
# Build and save one parquet dataset per hour of `day`, named by the hour's
# start timestamp in epoch microseconds.
for from_, to in HoursOfTheDay(day):
    spans = loadSpans(from_, to)
    endpointTraces = createEndpointTraces(spans)
    # The default save mode ('errorifexists') makes re-running this cell fail
    # once an hour's output exists; overwrite keeps the notebook re-runnable.
    endpointTraces.write.mode('overwrite').parquet('data/%d.parquet' % from_)