In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType
import header as hdr

spark = SparkSession.builder.master("local[*]")\
        .appName("SparkByExamples.com")\
        .getOrCreate()

21/11/19 22:29:15 WARN Utils: Your hostname, pasokon resolves to a loopback address: 127.0.1.1; using 192.168.1.96 instead (on interface wlo1)
21/11/19 22:29:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/11/19 22:29:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
def get_dataframe(table_name):
    header = hdr.get_table_header(table_name)
    schema = StructType()

    for h in header:
        schema.add(h, StringType(), True)

    return spark.read.option('delimiter', ',').format("csv")\
            .schema(schema).load(f"{table_name}.csv").fillna('NA')

In [3]:
df = get_dataframe('machine_events')
df.printSchema()

root
 |-- time: string (nullable = false)
 |-- machine_id: string (nullable = false)
 |-- event_type: string (nullable = false)
 |-- platform_id: string (nullable = false)
 |-- cpus: string (nullable = false)
 |-- memory: string (nullable = false)



In [4]:
df.groupBy('cpus').count().show()



+----+-----+
|cpus|count|
+----+-----+
|0.25|  510|
|  NA|   32|
| 0.5|35015|
|   1| 2223|
+----+-----+





In [5]:
task_events = get_dataframe('task_events')
task_events.printSchema()

root
 |-- time: string (nullable = false)
 |-- missing_info: string (nullable = false)
 |-- job_id: string (nullable = false)
 |-- task_index: string (nullable = false)
 |-- machine_id: string (nullable = false)
 |-- event_type: string (nullable = false)
 |-- user: string (nullable = false)
 |-- scheduling_class: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- cpu_request: string (nullable = false)
 |-- memory_request: string (nullable = false)
 |-- disk_space_request: string (nullable = false)
 |-- different_machines_restriction: string (nullable = false)



In [17]:
import pyspark.sql.functions as F

task_per_event = task_events.groupBy('job_id').count()
task_per_event.select(
    F.round(F.mean('count'),2).alias('mean'), F.round(F.stddev('count'), 2).alias('std'),
    F.min('count').alias('min'), F.max('count').alias('max')
).show()



+-----+------+---+-----+
| mean|   std|min|  max|
+-----+------+---+-----+
|91.96|844.02|  1|46037|
+-----+------+---+-----+



