In [1]:
import os

from pyspark.sql import SparkSession

In [2]:
# Obtain the SparkSession used by every later cell: reuses the active session
# if one exists, otherwise creates one (no extra builder configuration is set).
spark = SparkSession.builder.getOrCreate()

In [3]:
# Directory that holds the exercise data files. The location is specific to
# one machine, so it can be overridden with the UTILIZATION_DATA_DIR
# environment variable; the original hard-coded path remains the fallback so
# existing setups keep working unchanged.
data_path = os.environ.get(
    "UTILIZATION_DATA_DIR",
    '/Users/natha/Desktop/bootcamp_repo-1/LinkedIn_Spark_SQL_DataFrames/Exercise Files/Data',
)
file_path_no_header = data_path + "/utilization.csv"

# The file is read without a header row, so Spark assigns positional column
# names (_c0, _c1, ...); inferSchema asks Spark to derive column types from
# the data. The positional names are then replaced with meaningful ones.
df_util = (
    spark.read.format("csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .load(file_path_no_header)
    .withColumnRenamed("_c0", "event_datetime")
    .withColumnRenamed("_c1", "server_id")
    .withColumnRenamed("_c2", "cpu_utilization")
    .withColumnRenamed("_c3", "free_memory")
    .withColumnRenamed("_c4", "session_count")
)

In [4]:
# Register the DataFrame as the temporary view "utilization" so the following
# cells can query it with spark.sql; an existing view of that name is replaced.
df_util.createOrReplaceTempView("utilization")

In [5]:
# Per-server summary of CPU utilization: minimum, maximum and standard
# deviation, one output row per server_id.
# The trailing backslashes continue the string literal across source lines,
# so the query reaches Spark as a single line of SQL.
spark.sql('SELECT server_id, min(cpu_utilization), max(cpu_utilization), stddev(cpu_utilization) \
           FROM utilization \
           GROUP BY server_id').show()

+---------+--------------------+--------------------+-----------------------+
|server_id|min(cpu_utilization)|max(cpu_utilization)|stddev(cpu_utilization)|
+---------+--------------------+--------------------+-----------------------+
|      108|                0.55|                0.95|    0.11563100171171926|
|      101|                 0.6|                 1.0|    0.11651726263197697|
|      103|                0.56|                0.96|    0.11617507884178278|
|      111|                0.36|                0.76|    0.11530221569464483|
|      107|                0.45|                0.85|    0.11597417369783877|
|      100|                0.27|                0.67|     0.1152264191787964|
|      102|                0.56|                0.96|    0.11549678751286807|
|      109|                0.36|                0.76|    0.11574898623219722|
|      105|                0.29|                0.69|    0.11510721467869486|
|      110|                0.35|                0.75|    0.11533

In [6]:
# Window query: keeps every utilization row and adds avg_server_util, the
# average cpu_utilization over all rows that share the row's server_id.
# The window has no ORDER BY, so the same average is repeated on every row of
# a server (unlike GROUP BY, rows are not collapsed).
sql_window = "SELECT event_datetime, server_id, cpu_utilization,  \
         avg(cpu_utilization) OVER (PARTITION BY server_id) avg_server_util \
FROM  \
      utilization"

In [7]:
# Execute the per-server average window query and print the leading rows.
spark.sql(sql_window).show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:16|      101|           0.86|0.7985559999999872|
|03/05/2019 08:11:16|      101|           0.71|0.7985559999999872|
|03/05/2019 08:16:16|      101|           0.77|0.7985559999999872|
|03/05/2019 08:21:16|      101|           0.88|0.7985559999999872|
|03/05/2019 08:26:16|      101|           0.95|0.7985559999999872|
|03/05/2019 08:31:16|      101|           0.88|0.7985559999999872|
|03/05/2019 08:36:16|      101|           0.65|0.7985559999999872|
|03/05/2019 08:41:16|      101|           0.76|0.7985559999999872|
|03/05/2019 08:46:16|      101|           0.76|0.7985559999999872|
|03/05/2019 08:51:16|      101|           0.75|0.7985559999999872|
|03/05/2019 08:56:16|      101|           0.68|0.7985559999999872|
|03/05/2019 09:01:16|      101|           0.61|0.7985559999999

In [8]:
# Extends the previous window query with delta_server_util: the difference
# between a row's cpu_utilization and the average for its server_id.
# The same OVER (PARTITION BY server_id) window is written out twice, once
# for the average column and once inside the delta expression.
sql_window2 = "SELECT event_datetime, server_id, cpu_utilization,  \
         avg(cpu_utilization) OVER (PARTITION BY server_id) avg_server_util, \
         cpu_utilization - avg(cpu_utilization) OVER (PARTITION BY server_id) delta_server_util \
         FROM utilization"

In [9]:
# Execute the average-plus-delta window query and print the leading rows.
spark.sql(sql_window2).show()

+-------------------+---------+---------------+------------------+--------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|   delta_server_util|
+-------------------+---------+---------------+------------------+--------------------+
|03/05/2019 08:06:16|      101|           0.86|0.7985559999999872| 0.06144400000001282|
|03/05/2019 08:11:16|      101|           0.71|0.7985559999999872| -0.0885559999999872|
|03/05/2019 08:16:16|      101|           0.77|0.7985559999999872|-0.02855599999998...|
|03/05/2019 08:21:16|      101|           0.88|0.7985559999999872| 0.08144400000001284|
|03/05/2019 08:26:16|      101|           0.95|0.7985559999999872|  0.1514440000000128|
|03/05/2019 08:31:16|      101|           0.88|0.7985559999999872| 0.08144400000001284|
|03/05/2019 08:36:16|      101|           0.65|0.7985559999999872|-0.14855599999998714|
|03/05/2019 08:41:16|      101|           0.76|0.7985559999999872|-0.03855599999998...|
|03/05/2019 08:46:16|      101| 

In [12]:
# Rolling average of cpu_utilization per server over a sliding frame made of
# the previous, current and next reading (the frame is shorter at the first
# and last row of each server's partition).
#
# event_datetime is displayed as an 'MM/dd/yyyy HH:mm:ss' string, and sorting
# that text is only chronological while all rows fall in the same calendar
# year (month and day compare before the year). The frame is therefore
# ordered by the parsed timestamp rather than by the raw string.
# NOTE(review): assumes event_datetime is loaded as a string in this format —
# confirm against the inferred schema.
sql_window3 = (
    "SELECT event_datetime, server_id, cpu_utilization, "
    "avg(cpu_utilization) OVER ("
    "PARTITION BY server_id "
    "ORDER BY to_timestamp(event_datetime, 'MM/dd/yyyy HH:mm:ss') "
    "ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING"
    ") rolling_avg_server_util "
    "FROM utilization"
)


In [13]:
# Execute the rolling-average window query and print the leading rows.
spark.sql(sql_window3).show()

+-------------------+---------+---------------+-----------------------+
|     event_datetime|server_id|cpu_utilization|rolling_avg_server_util|
+-------------------+---------+---------------+-----------------------+
|03/05/2019 08:06:16|      101|           0.86|     0.7849999999999999|
|03/05/2019 08:11:16|      101|           0.71|     0.7799999999999999|
|03/05/2019 08:16:16|      101|           0.77|     0.7866666666666666|
|03/05/2019 08:21:16|      101|           0.88|     0.8666666666666666|
|03/05/2019 08:26:16|      101|           0.95|     0.9033333333333333|
|03/05/2019 08:31:16|      101|           0.88|     0.8266666666666667|
|03/05/2019 08:36:16|      101|           0.65|     0.7633333333333333|
|03/05/2019 08:41:16|      101|           0.76|     0.7233333333333333|
|03/05/2019 08:46:16|      101|           0.76|     0.7566666666666667|
|03/05/2019 08:51:16|      101|           0.75|                   0.73|
|03/05/2019 08:56:16|      101|           0.68|                 

In [14]:
# Presumably a hand check of the rolling average for the first row of a
# partition, where the frame holds only two values (no preceding row).
# NOTE(review): the operands 0.71 and 0.78 are not the first two
# cpu_utilization values in the displayed query output (0.86 and 0.71) —
# confirm which rows this is meant to verify.
(0.71+0.78)/2

0.745

In [15]:
# Presumably a hand check of the rolling average for an interior row, where
# the frame holds three values (previous, current and next reading).
# NOTE(review): 0.78 and 0.87 do not appear among the cpu_utilization values
# visible in the displayed query output — confirm the source rows.
(0.71+0.78+0.87)/3

0.7866666666666666