In [1]:
msg = "Hello World"
print(msg)

Hello World


In [2]:
# configure spark variables
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

# load up other dependencies
import re
import pandas as pd

import glob

raw_data_files = glob.glob('../ansible/results/bpftrace/server1-1200m-1.out')
raw_data_files

base_df = spark.read.text(raw_data_files)
base_df.printSchema()

base_df.show(10, truncate=False)

21/09/09 19:55:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


root
 |-- value: string (nullable = true)





+-------------------------------------------------------------+
|value                                                        |
+-------------------------------------------------------------+
|Attaching 12 probes...                                       |
|Tracing latency of network stack funtions. Hit Ctrl-C to end.|
|@q1[openssl]: count 140, average 1405, total 196805          |
|@q1[ksoftirqd/169]: count 3, average 3099, total 9297        |
|                                                             |
|@q2[ksoftirqd/169]: count 25, average 863, total 21583       |
|@q2[openssl]: count 870, average 6923, total 6023733         |
|                                                             |
|                                                             |
|@q4[openssl]: count 9809, average 955, total 9369210         |
+-------------------------------------------------------------+
only showing top 10 rows





In [3]:
sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

['Attaching 12 probes...',
 'Tracing latency of network stack funtions. Hit Ctrl-C to end.',
 '@q1[openssl]: count 140, average 1405, total 196805',
 '@q1[ksoftirqd/169]: count 3, average 3099, total 9297',
 '',
 '@q2[ksoftirqd/169]: count 25, average 863, total 21583',
 '@q2[openssl]: count 870, average 6923, total 6023733',
 '',
 '',
 '@q4[openssl]: count 9809, average 955, total 9369210',
 '@q4[ksoftirqd/169]: count 115, average 969, total 111548',
 '',
 '@q5[openssl]: count 74695, average 432, total 32294636',
 '@q5[ksoftirqd/169]: count 811, average 441, total 357987',
 '']

In [4]:
bpftrace_ps_pattern = r'^@(q\d)\[([\d\D]*)\]\D*(\d+)\D*(\d+)\D*(\d+)([\d\D]*)'
ps = [re.search(bpftrace_ps_pattern, item).groups()
           if re.search(bpftrace_ps_pattern, item)
           else None
           for item in sample_logs]
ps

[None,
 None,
 ('q1', 'openssl', '140', '1405', '196805', ''),
 ('q1', 'ksoftirqd/169', '3', '3099', '9297', ''),
 None,
 ('q2', 'ksoftirqd/169', '25', '863', '21583', ''),
 ('q2', 'openssl', '870', '6923', '6023733', ''),
 None,
 None,
 ('q4', 'openssl', '9809', '955', '9369210', ''),
 ('q4', 'ksoftirqd/169', '115', '969', '111548', ''),
 None,
 ('q5', 'openssl', '74695', '432', '32294636', ''),
 ('q5', 'ksoftirqd/169', '811', '441', '357987', ''),
 None]

In [5]:
bpftrace_cs_pattern = r'^@(uc)\D*(\d+)\D*(\d+)\D*(\d+)([\d\D]*)'
cs = [re.search(bpftrace_cs_pattern, item).groups()
           if re.search(bpftrace_cs_pattern, item)
           else None
           for item in sample_logs]
cs

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [6]:
from pyspark.sql.functions import regexp_extract

bpftrace_cs_df = base_df.select(
                                regexp_extract('value', bpftrace_cs_pattern, 2).alias('cs_count'),
                                regexp_extract('value', bpftrace_cs_pattern, 3).alias('cs_average'),
                                regexp_extract('value', bpftrace_cs_pattern, 4).alias('cs_total'),
).filter(' cs_count != ""')

bpftrace_cs_df.show(15)

+--------+----------+--------+
|cs_count|cs_average|cs_total|
+--------+----------+--------+
|    2078|     33403|69412916|
|    2367|      7694|18213407|
|    2001|      6394|12795386|
|    2038|      6929|14122903|
|    2181|      7305|15932257|
|    2273|      6571|14936365|
|    1965|      7080|13912777|
|    2037|      6759|13769099|
|    2119|      6046|12811959|
|    2236|      6316|14124644|
|    2553|     19738|50392032|
|    2032|      7363|14963412|
|    2159|      6375|13764149|
|    2297|      6944|15951080|
|    1988|      6285|12495677|
+--------+----------+--------+
only showing top 15 rows



In [7]:
print(bpftrace_cs_df.toPandas().to_numpy()[0])

['2078' '33403' '69412916']


In [8]:
from pyspark.sql.functions import regexp_extract

bpftrace_logs_df = base_df.select(
                                regexp_extract('value', bpftrace_ps_pattern, 1).alias('level'),
                                regexp_extract('value', bpftrace_ps_pattern, 2).alias('program'),
                                regexp_extract('value', bpftrace_ps_pattern, 3).alias('pkt_count'),
                                regexp_extract('value', bpftrace_ps_pattern, 4).alias('pkt_average'),
                                regexp_extract('value', bpftrace_ps_pattern, 5).alias('pkt_total')
                                ).filter(' level != "" ')
bpftrace_logs_df.show(15, truncate=True)

+-----+-------------+---------+-----------+---------+
|level|      program|pkt_count|pkt_average|pkt_total|
+-----+-------------+---------+-----------+---------+
|   q1|      openssl|      140|       1405|   196805|
|   q1|ksoftirqd/169|        3|       3099|     9297|
|   q2|ksoftirqd/169|       25|        863|    21583|
|   q2|      openssl|      870|       6923|  6023733|
|   q4|      openssl|     9809|        955|  9369210|
|   q4|ksoftirqd/169|      115|        969|   111548|
|   q5|      openssl|    74695|        432| 32294636|
|   q5|ksoftirqd/169|      811|        441|   357987|
|   q1|      openssl|      293|       1186|   347711|
|   q1|ksoftirqd/169|        8|       2185|    17483|
|   q2|      openssl|     1564|        616|   964075|
|   q2|ksoftirqd/169|       43|        777|    33448|
|   q4|      openssl|    13323|        950| 12666103|
|   q4|ksoftirqd/169|       88|        989|    87061|
|   q5|      openssl|   101414|        430| 43703385|
+-----+-------------+-------

In [9]:
from pyspark.sql.functions import regexp_extract

bpftrace_cs_df = base_df.select(
                                regexp_extract('value', bpftrace_cs_pattern, 2).alias('cs_count'),
                                regexp_extract('value', bpftrace_cs_pattern, 3).alias('cs_average'),
                                regexp_extract('value', bpftrace_cs_pattern, 4).alias('cs_total'),
).filter(' cs_count != ""')

bpftrace_cs_df.show(15)

+--------+----------+--------+
|cs_count|cs_average|cs_total|
+--------+----------+--------+
|    2078|     33403|69412916|
|    2367|      7694|18213407|
|    2001|      6394|12795386|
|    2038|      6929|14122903|
|    2181|      7305|15932257|
|    2273|      6571|14936365|
|    1965|      7080|13912777|
|    2037|      6759|13769099|
|    2119|      6046|12811959|
|    2236|      6316|14124644|
|    2553|     19738|50392032|
|    2032|      7363|14963412|
|    2159|      6375|13764149|
|    2297|      6944|15951080|
|    1988|      6285|12495677|
+--------+----------+--------+
only showing top 15 rows



In [10]:
cs_array = bpftrace_cs_df.toPandas().to_numpy()[0]
print(cs_array)

['2078' '33403' '69412916']


In [11]:
from pyspark.sql.functions import lit

bpftrace_res_df = bpftrace_logs_df.withColumn("cs_count", lit(cs_array[0])
                    ).withColumn("cs_average", lit(cs_array[1])
                    ).withColumn("cs_total", lit(cs_array[2]))
bpftrace_res_df.show(15, truncate=True)

+-----+-------------+---------+-----------+---------+--------+----------+--------+
|level|      program|pkt_count|pkt_average|pkt_total|cs_count|cs_average|cs_total|
+-----+-------------+---------+-----------+---------+--------+----------+--------+
|   q1|      openssl|      140|       1405|   196805|    2078|     33403|69412916|
|   q1|ksoftirqd/169|        3|       3099|     9297|    2078|     33403|69412916|
|   q2|ksoftirqd/169|       25|        863|    21583|    2078|     33403|69412916|
|   q2|      openssl|      870|       6923|  6023733|    2078|     33403|69412916|
|   q4|      openssl|     9809|        955|  9369210|    2078|     33403|69412916|
|   q4|ksoftirqd/169|      115|        969|   111548|    2078|     33403|69412916|
|   q5|      openssl|    74695|        432| 32294636|    2078|     33403|69412916|
|   q5|ksoftirqd/169|      811|        441|   357987|    2078|     33403|69412916|
|   q1|      openssl|      293|       1186|   347711|    2078|     33403|69412916|
|   

In [12]:
raw_data_files = glob.glob('../ansible/results/iperf/server1-1200m-1.out')
raw_data_files

base_df = spark.read.text(raw_data_files)
base_df.printSchema()

base_df.show(10, truncate=False)

root
 |-- value: string (nullable = true)

+-----+
|value|
+-----+
+-----+



21/09/09 19:55:54 WARN DataSource: All paths were ignored:
  


In [13]:
sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

iperf_s_pattern = r'[\d\D]*-([\d.]*)\D*([\d.]*)\D*([\d.]*)\D*([\d.]*)\D*([\d/]*)\D*([\d.]*)([\d\D]*)'
cs = [re.search(iperf_s_pattern, item).groups()
           if re.search(iperf_s_pattern, item)
           else None
           for item in sample_logs]
cs

[]

In [14]:
raw_data_files = glob.glob('../ansible/results/openssl/server1-1200m-1.out')
raw_data_files

base_df = spark.read.text(raw_data_files)
base_df.printSchema()

base_df.show(10, truncate=False)

21/09/09 19:55:54 WARN DataSource: All paths were ignored:
  


root
 |-- value: string (nullable = true)

+-----+
|value|
+-----+
+-----+



In [15]:
sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

openssl_s_pattern = r'\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d.]*)([\d\D]*)'
cs = [re.search(openssl_s_pattern, item).groups()
           if re.search(openssl_s_pattern, item)
           else None
           for item in sample_logs]
cs

openssl_logs_df = base_df.select(
                                regexp_extract('value', openssl_s_pattern, 6).alias('us_time')
                                ).filter(' us_time != "" ')

openssl_logs_df.show(5)
cs_array = openssl_logs_df.toPandas().to_numpy()

us_time = 0.0
for a in cs_array:
    us_time += float(a[0])

us_time

+-------+
|us_time|
+-------+
+-------+



0.0

In [16]:
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from functools import reduce

# schema = StructType([])
# results_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

bpftrace_ps_pattern = r'^@(q\d)\[([\d\D]*)\]\D*(\d+)\D*(\d+)\D*(\d+)([\d\D]*)'
bpftrace_cs_pattern = r'^@(uc)\D*(\d+)\D*(\d+)\D*(\d+)([\d\D]*)'
iperf_s_pattern = r'[\d\D]*-([\d.]*)\D*([\d.]*)\D*([\d.]*)\D*([\d.]*)\D*([\d/]*)\D*([\d.]*)([\d\D]*)'
openssl_s_pattern = r'\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d]*)\D*([\d.]*)([\d\D]*)'


def readFiletoDF(path) -> DataFrame:
    raw_data_files = glob.glob(path)
    raw_data_files
    return spark.read.text(raw_data_files)

def readBpftracePSLogstoDF(df: DataFrame) -> DataFrame:
    bpftrace_logs_df = df.select(
                                regexp_extract('value', bpftrace_ps_pattern, 1).alias('level'),
                                regexp_extract('value', bpftrace_ps_pattern, 2).alias('program'),
                                regexp_extract('value', bpftrace_ps_pattern, 3).alias('pkt_count'),
                                regexp_extract('value', bpftrace_ps_pattern, 4).alias('pkt_average'),
                                regexp_extract('value', bpftrace_ps_pattern, 5).alias('pkt_total')
                                ).filter(' level != "" ')
    return bpftrace_logs_df

def concateBpftraceCSLogstoDF(df: DataFrame, ps_df: DataFrame) -> DataFrame:
    bpftrace_cs_df = base_df.select(
                                regexp_extract('value', bpftrace_cs_pattern, 2).alias('cs_count'),
                                regexp_extract('value', bpftrace_cs_pattern, 3).alias('cs_average'),
                                regexp_extract('value', bpftrace_cs_pattern, 4).alias('cs_total'),
                                ).filter(' cs_count != ""')

    cs_array = bpftrace_cs_df.toPandas().to_numpy()[0]

    bpftrace_res_df = ps_df.withColumn("cs_count", lit(cs_array[0])
                    ).withColumn("cs_average", lit(cs_array[1])
                    ).withColumn("cs_total", lit(cs_array[2]))
    
    return bpftrace_res_df

def concateIperfLogsToDF(df: DataFrame, in_df: DataFrame) -> DataFrame:
    iperf_logs_df = df.select(
                                regexp_extract('value', iperf_s_pattern, 1).alias('intval'),
                                regexp_extract('value', iperf_s_pattern, 2).alias('transfer'),
                                regexp_extract('value', iperf_s_pattern, 3).alias('rx_bandwidth'),
                                regexp_extract('value', iperf_s_pattern, 4).alias('jitter'),
                                regexp_extract('value', iperf_s_pattern, 6).alias('pkt_loss')
                                ).filter(' intval != "" ')
    
    cs_array = iperf_logs_df.toPandas().to_numpy()[0]

    iperf_res_df = in_df.withColumn("duration", lit(cs_array[0])
                    ).withColumn("rx_gbytes", lit(cs_array[1])
                    ).withColumn("rx_bandwidth", lit(cs_array[2])
                    ).withColumn("jitter", lit(cs_array[3])
                    ).withColumn("pkt_loss", lit(cs_array[4]))

    iperf_res_df.show(5)
    return iperf_res_df

def concateOpensslLogsToDF(df: DataFrame, in_df: DataFrame) -> DataFrame:
    openssl_logs_df = df.select(
                                regexp_extract('value', openssl_s_pattern, 6).alias('us_time')
                                ).filter(' us_time != "" ')
    
    cs_array = openssl_logs_df.toPandas().to_numpy()

    us_time = 0.0
    for a in cs_array:
        us_time += float(a[0])

    us_time

    openssl_res_df = in_df.withColumn("us_time", lit(us_time))

    return openssl_res_df

df_list = []

for x in range(1, 3):
    path = '../ansible/results/bpftrace/server1-{}m-{}.out'.format(1000 + 200*x, x)
    base_df = readFiletoDF(path)
    # base_df.show(5, truncate=False)
    ps_df = readBpftracePSLogstoDF(base_df)
    ps_cs_df = concateBpftraceCSLogstoDF(base_df, ps_df)
    df_1 = ps_cs_df.withColumn("tx_bandwidth", lit(1000 + 100*x))

    path = '../ansible/results/iperf/server1-{}m-{}.out'.format(1000 + 200*x, x)
    base_df = readFiletoDF(path)
    # base_df.show(5, truncate=False)
    df_2 = concateIperfLogsToDF(base_df, df_1)

    path = '../ansible/results/openssl/server1-{}m-{}.out'.format(1000 + 200*x, x)
    base_df = readFiletoDF(path)
    # base_df.show(5, truncate=False)
    df_3 = concateOpensslLogsToDF(base_df, df_2)

    df_list.append(df_3)

results_df = reduce(lambda x, y: x.union(y), df_list)
# results_df.show(5, truncate=False)
results_df.printSchema()


+-----+-------------+---------+-----------+---------+--------+----------+--------+------------+--------+---------+------------+------+--------+
|level|      program|pkt_count|pkt_average|pkt_total|cs_count|cs_average|cs_total|tx_bandwidth|duration|rx_gbytes|rx_bandwidth|jitter|pkt_loss|
+-----+-------------+---------+-----------+---------+--------+----------+--------+------------+--------+---------+------------+------+--------+
|   q1|      openssl|      140|       1405|   196805|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|
|   q1|ksoftirqd/169|        3|       3099|     9297|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|
|   q2|ksoftirqd/169|       25|        863|    21583|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|
|   q2|      openssl|      870|       6923|  6023733|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|  

In [17]:
from pyspark.sql import SparkSession

results_df.show(5)
results_df.createOrReplaceTempView("LOGS")


df2=spark.sql("select level, sum(pkt_count) as count, avg(rx_gbytes) as rx_bytes from LOGS group by level, tx_bandwidth")
df2.show(15, truncate=True)

+-----+-------------+---------+-----------+---------+--------+----------+--------+------------+--------+---------+------------+------+--------+------------------+
|level|      program|pkt_count|pkt_average|pkt_total|cs_count|cs_average|cs_total|tx_bandwidth|duration|rx_gbytes|rx_bandwidth|jitter|pkt_loss|           us_time|
+-----+-------------+---------+-----------+---------+--------+----------+--------+------------+--------+---------+------------+------+--------+------------------+
|   q1|      openssl|      140|       1405|   196805|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|47.910000000000004|
|   q1|ksoftirqd/169|        3|       3099|     9297|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|47.910000000000004|
|   q2|ksoftirqd/169|       25|        863|    21583|    2078|     33403|69412916|        1100|    59.9|     8.37|        1.20| 0.012|    0.16|47.910000000000004|
|   q2|      openssl| 



+-----+---------+-----------------+
|level|    count|         rx_bytes|
+-----+---------+-----------------+
|   q5|5891511.0|8.369999999999992|
|   q2|  38620.0|9.759999999999993|
|   q1|  20221.0|9.759999999999993|
|   q4| 891927.0|9.759999999999993|
|   q5|6991567.0|9.759999999999993|
|   q4| 787347.0|8.369999999999992|
|   q2|  92641.0|8.369999999999992|
|   q1|  17340.0|8.370000000000003|
+-----+---------+-----------------+



