In [1]:
# Initialise findspark before the pyspark import in the next cell
# (presumably this is what makes the local Spark installation importable — confirm for your setup).
import findspark
findspark.init()

In [2]:
# import package
from pyspark.sql import SparkSession

In [3]:
# Obtain the SparkSession used by every later cell; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Data Analysis with Spark")
    .getOrCreate()
)

In [4]:
# Evaluate the session as the last expression so the notebook renders it as the cell output.
spark

In [5]:
# Read the utilization records from the JSON file into a DataFrame
# (the schema is inferred, since none is supplied).
df_util = spark.read.json("df2.json")

In [6]:
# Preview the first rows of the loaded data to check the columns and value formats.
df_util.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [7]:
# Register df_util as the temporary view "utilization" so the Spark SQL cells
# further down can query it by name.
df_util.createOrReplaceTempView("utilization")

In [8]:
# List the tables/views known to the session to confirm the temporary view was registered.
spark.catalog.listTables()

[Table(name='utilization', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

#### Exploratory Data Analysis with DataFrames

In [9]:
# Summary statistics (count, mean, stddev, min, max) for every column of df_util.
summary_stats = df_util.describe()
summary_stats.show()

+-------+------------------+-------------------+-------------------+------------------+-----------------+
|summary|   cpu_utilization|     event_datetime|        free_memory|         server_id|    session_count|
+-------+------------------+-------------------+-------------------+------------------+-----------------+
|  count|            500000|             500000|             500000|            500000|           500000|
|   mean|0.6205177399999616|               null|0.37912809999999375|             124.5|         69.59616|
| stddev|0.1587517387291305|               null|0.15830931278376148|14.430884120552617|14.85067669635284|
|    min|              0.22|03/05/2019 08:06:14|                0.0|               100|               32|
|    max|               1.0|04/09/2019 01:22:46|               0.78|               149|              105|
+-------+------------------+-------------------+-------------------+------------------+-----------------+



In [10]:
# Correlation coefficient between CPU utilization and free memory.
df_util.corr("cpu_utilization", "free_memory")

-0.47047715730807493

In [11]:
# Correlation coefficient between free memory and the number of sessions.
df_util.corr("free_memory", "session_count")

-0.5008320848876711

In [12]:
# Frequently occurring values of server_id and session_count, one array column per input column.
frequent_values = df_util.freqItems(["server_id", "session_count"])
frequent_values.show()

+--------------------+-----------------------+
| server_id_freqItems|session_count_freqItems|
+--------------------+-----------------------+
|[137, 146, 101, 1...|   [92, 101, 83, 104...|
+--------------------+-----------------------+



In [13]:
# Draw a sample of roughly 5% of the rows without replacement. The fraction is a
# per-row probability, so the resulting row count is approximate, not exactly 5%.
# A fixed seed keeps the sample (and the count shown in the next cell) repeatable
# for the same input data instead of changing on every run.
df_util_sample = df_util.sample(withReplacement=False, fraction=0.05, seed=1)

In [14]:
# Number of rows that ended up in the sample.
df_util_sample.count()

25194

#### Exploratory Data Analysis with Spark SQL

In [15]:
# Overall CPU-utilization statistics (mean, sample stddev, min, max) over the whole view.
cpu_stats_query = "SELECT avg(cpu_utilization), stddev(cpu_utilization), min(cpu_utilization), max(cpu_utilization) FROM utilization"
cpu_stats = spark.sql(cpu_stats_query)
cpu_stats.show()

+--------------------+----------------------------+--------------------+--------------------+
|avg(cpu_utilization)|stddev_samp(cpu_utilization)|min(cpu_utilization)|max(cpu_utilization)|
+--------------------+----------------------------+--------------------+--------------------+
|  0.6205177399999616|          0.1587517387291305|                0.22|                 1.0|
+--------------------+----------------------------+--------------------+--------------------+



In [16]:
# The same CPU-utilization statistics, computed separately for each server_id.
per_server_query = "SELECT server_id, avg(cpu_utilization), stddev(cpu_utilization), min(cpu_utilization), max(cpu_utilization) \
           FROM utilization \
           GROUP BY server_id"
per_server_stats = spark.sql(per_server_query)
per_server_stats.show()

+---------+--------------------+----------------------------+--------------------+--------------------+
|server_id|avg(cpu_utilization)|stddev_samp(cpu_utilization)|min(cpu_utilization)|max(cpu_utilization)|
+---------+--------------------+----------------------------+--------------------+--------------------+
|      112|  0.7153870000000067|         0.11528867845082576|                0.52|                0.92|
|      113|  0.7833319999999914|         0.11544345150353694|                0.58|                0.98|
|      130|  0.5519759999999908|         0.11568834774245991|                0.35|                0.75|
|      126|  0.6784700000000012|         0.11542612970702058|                0.48|                0.88|
|      149|  0.7447700000000035|         0.11543517500295467|                0.54|                0.94|
|      110|  0.5537749999999892|         0.11533251724450215|                0.35|                0.75|
|      136|  0.6052509999999953|         0.11597405743182258|   

In [17]:
# Assign every row to a histogram bucket: cpu_utilization scaled by 100/10 and floored,
# so e.g. 0.57 falls into bucket 5.
bucket_query = "SELECT server_id, FLOOR(cpu_utilization*100/10) AS bucket FROM utilization"
bucketed_rows = spark.sql(bucket_query)
bucketed_rows.show()

+---------+------+
|server_id|bucket|
+---------+------+
|      100|     5|
|      100|     4|
|      100|     5|
|      100|     5|
|      100|     3|
|      100|     4|
|      100|     5|
|      100|     4|
|      100|     5|
|      100|     5|
|      100|     3|
|      100|     6|
|      100|     6|
|      100|     5|
|      100|     2|
|      100|     4|
|      100|     4|
|      100|     6|
|      100|     4|
|      100|     5|
+---------+------+
only showing top 20 rows



In [18]:
# Histogram of CPU utilization: number of rows per bucket, listed in bucket order.
bucket_count_query = "SELECT count(*), FLOOR(cpu_utilization*100/10) AS bucket FROM utilization GROUP BY bucket ORDER BY bucket"
bucket_counts = spark.sql(bucket_count_query)
bucket_counts.show()

+--------+------+
|count(1)|bucket|
+--------+------+
|    8186|     2|
|   37029|     3|
|   68046|     4|
|  104910|     5|
|  116725|     6|
|   88242|     7|
|   56598|     8|
|   20207|     9|
|      57|    10|
+--------+------+



In [19]:
# Minimum, maximum and sample standard deviation of CPU utilization for each server_id.
server_range_query = "SELECT server_id, min(cpu_utilization), max(cpu_utilization), stddev(cpu_utilization) \
           FROM utilization \
           GROUP BY server_id"
server_ranges = spark.sql(server_range_query)
server_ranges.show()

+---------+--------------------+--------------------+----------------------------+
|server_id|min(cpu_utilization)|max(cpu_utilization)|stddev_samp(cpu_utilization)|
+---------+--------------------+--------------------+----------------------------+
|      112|                0.52|                0.92|         0.11528867845082576|
|      113|                0.58|                0.98|         0.11544345150353694|
|      130|                0.35|                0.75|         0.11568834774245991|
|      126|                0.48|                0.88|         0.11542612970702058|
|      149|                0.54|                0.94|         0.11543517500295467|
|      110|                0.35|                0.75|         0.11533251724450215|
|      136|                0.41|                 0.8|         0.11597405743182258|
|      144|                0.47|                0.87|         0.11478654960489501|
|      119|                0.22|                0.62|         0.11516031929842008|
|   

In [20]:
# Window function: attach each server's overall mean CPU utilization to every one of
# its rows, without collapsing the rows the way GROUP BY would.
window_query = "SELECT event_datetime, server_id, cpu_utilization, \
                        avg(cpu_utilization) OVER (PARTITION BY server_id) AS avg_server_util \
                        FROM utilization"
sql_window = spark.sql(window_query)

sql_window.show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:34|      112|           0.71|0.7153870000000067|
|03/05/2019 08:11:34|      112|           0.78|0.7153870000000067|
|03/05/2019 08:16:34|      112|           0.87|0.7153870000000067|
|03/05/2019 08:21:34|      112|           0.82|0.7153870000000067|
|03/05/2019 08:26:34|      112|           0.62|0.7153870000000067|
|03/05/2019 08:31:34|      112|            0.9|0.7153870000000067|
|03/05/2019 08:36:34|      112|           0.89|0.7153870000000067|
|03/05/2019 08:41:34|      112|           0.81|0.7153870000000067|
|03/05/2019 08:46:34|      112|           0.88|0.7153870000000067|
|03/05/2019 08:51:34|      112|           0.89|0.7153870000000067|
|03/05/2019 08:56:34|      112|           0.84|0.7153870000000067|
|03/05/2019 09:01:34|      112|           0.71|0.7153870000000

Selisih antara cpu_utilization pada suatu waktu dengan rata-rata cpu_utilization sepanjang waktu untuk masing-masing server_id.

---
The difference between cpu_utilization at a given time and the mean cpu_utilization over all time, computed separately for each server_id.

In [21]:
# For every row, show the server's overall mean CPU utilization and how far the row's
# own value deviates from that mean (delta_server_util = value - server mean).
delta_query = "SELECT event_datetime, server_id, cpu_utilization, \
                        avg(cpu_utilization) OVER (PARTITION BY server_id) AS avg_server_util, \
                        cpu_utilization - avg(cpu_utilization) OVER (PARTITION BY server_id) AS delta_server_util \
                        FROM utilization"
sql_window1 = spark.sql(delta_query)

sql_window1.show()

+-------------------+---------+---------------+------------------+--------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|   delta_server_util|
+-------------------+---------+---------------+------------------+--------------------+
|03/05/2019 08:06:34|      112|           0.71|0.7153870000000067|-0.00538700000000...|
|03/05/2019 08:11:34|      112|           0.78|0.7153870000000067| 0.06461299999999337|
|03/05/2019 08:16:34|      112|           0.87|0.7153870000000067| 0.15461299999999334|
|03/05/2019 08:21:34|      112|           0.82|0.7153870000000067|  0.1046129999999933|
|03/05/2019 08:26:34|      112|           0.62|0.7153870000000067|-0.09538700000000666|
|03/05/2019 08:31:34|      112|            0.9|0.7153870000000067| 0.18461299999999337|
|03/05/2019 08:36:34|      112|           0.89|0.7153870000000067| 0.17461299999999336|
|03/05/2019 08:41:34|      112|           0.81|0.7153870000000067|  0.0946129999999934|
|03/05/2019 08:46:34|      112| 

Membandingkan cpu_utilization pada suatu waktu dengan rata-rata cpu_utilization dari tiga baris satu waktu sebelum, pada saat itu, dan setelahnya.

---
Compares cpu_utilization at each point in time with the moving average of cpu_utilization over three rows: the previous row, the current row, and the next row.

In [22]:
# Centered moving average of cpu_utilization per server over up to three rows:
# the previous row, the current row and the next row (fewer at the partition edges).
#
# event_datetime is loaded as a string (values look like '03/05/2019 08:06:14'), so
# ordering by the raw column compares text. That only matches chronological order while
# every row falls in the same calendar year; the window therefore orders by the parsed
# timestamp instead. The selected event_datetime column itself is left unchanged.
sql_window2 = spark.sql("""
    SELECT event_datetime, server_id, cpu_utilization,
           avg(cpu_utilization) OVER (
               PARTITION BY server_id
               ORDER BY to_timestamp(event_datetime, 'MM/dd/yyyy HH:mm:ss')
               ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
           ) AS avg_cpu_util
    FROM utilization
""")

In [23]:
# Display the first rows of the moving-average result.
sql_window2.show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|      avg_cpu_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:34|      112|           0.71|             0.745|
|03/05/2019 08:11:34|      112|           0.78|0.7866666666666666|
|03/05/2019 08:16:34|      112|           0.87|0.8233333333333333|
|03/05/2019 08:21:34|      112|           0.82|              0.77|
|03/05/2019 08:26:34|      112|           0.62|0.7799999999999999|
|03/05/2019 08:31:34|      112|            0.9|0.8033333333333333|
|03/05/2019 08:36:34|      112|           0.89|0.8666666666666667|
|03/05/2019 08:41:34|      112|           0.81|              0.86|
|03/05/2019 08:46:34|      112|           0.88|              0.86|
|03/05/2019 08:51:34|      112|           0.89|              0.87|
|03/05/2019 08:56:34|      112|           0.84|0.8133333333333334|
|03/05/2019 09:01:34|      112|           0.71|0.7999999999999

#### Basic Machine Learning with DataFrames

##### Clustering

In [24]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [25]:
# Pack the three numeric metrics into a single vector column called "features",
# in the order listed here; the clustering cells below work on that column.
vectorAssembler = VectorAssembler(
    inputCols=["cpu_utilization", "free_memory", "session_count"],
    outputCol="features",
)

In [26]:
# Apply the assembler: the result keeps every original column and adds the "features" vector.
vcluster_df = vectorAssembler.transform(df_util)
# Preview the assembled rows.
vcluster_df.show()

+---------------+-------------------+-----------+---------+-------------+----------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|        features|
+---------------+-------------------+-----------+---------+-------------+----------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|[0.57,0.51,47.0]|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|[0.47,0.62,43.0]|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|[0.56,0.57,62.0]|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|[0.57,0.56,50.0]|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|[0.35,0.46,43.0]|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|[0.41,0.58,48.0]|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|[0.57,0.35,58.0]|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58| [0.41,0.4,58.0]|

In [29]:
# K-means estimator configured to look for three clusters.
kmeans = KMeans(k=3)

In [30]:
# Fix the random seed of the estimator so that repeated fits start from the same state.
kmeans = kmeans.setSeed(1)

In [31]:
# Fit the clustering model on the assembled feature vectors.
# NOTE(review): session_count ranges over tens while cpu_utilization and free_memory stay
# within 0-1, so session_count presumably dominates the distance — consider scaling features.
kmeans_model = kmeans.fit(vcluster_df)

In [32]:
# Cluster centres; each array follows the assembler's input order
# (cpu_utilization, free_memory, session_count).
kmeans_model.clusterCenters()

[array([ 0.61918113,  0.38080285, 68.75004716]),
 array([ 0.71174897,  0.28808911, 86.87510507]),
 array([ 0.51439668,  0.48445202, 50.49452021])]

In [33]:
# Assign every row to a cluster; the result carries an extra "prediction" column.
predictions = kmeans_model.transform(vcluster_df)

In [34]:
# Preview the rows together with their assigned cluster index.
predictions.show()

+---------------+-------------------+-----------+---------+-------------+----------------+----------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|        features|prediction|
+---------------+-------------------+-----------+---------+-------------+----------------+----------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|[0.57,0.51,47.0]|         2|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|[0.47,0.62,43.0]|         2|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|[0.56,0.57,62.0]|         0|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|[0.57,0.56,50.0]|         2|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|[0.35,0.46,43.0]|         2|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|[0.41,0.58,48.0]|         2|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|[0.57,0.3

##### Regression

In [35]:
from pyspark.ml.regression import LinearRegression

In [38]:
# Assembler for the regression: cpu_utilization is the only predictor,
# wrapped into a one-element "features" vector.
va = VectorAssembler(
    inputCols=["cpu_utilization"],
    outputCol="features",
)

In [39]:
# Build the regression input: the original columns plus the single-feature vector column.
vreg_df = va.transform(df_util)

In [40]:
# Preview the regression input rows.
vreg_df.show()

+---------------+-------------------+-----------+---------+-------------+--------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|features|
+---------------+-------------------+-----------+---------+-------------+--------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|  [0.57]|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|  [0.47]|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|  [0.56]|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|  [0.57]|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|  [0.35]|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|  [0.41]|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|  [0.57]|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|  [0.41]|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|  [0.53]|
|   

In [44]:
# Linear regression estimator: predict session_count (label) from the "features" vector.
lr = LinearRegression(
    featuresCol="features",
    labelCol="session_count",
)

In [45]:
# Fit the regression on the full assembled dataset (no train/test split is made here).
lrModel = lr.fit(vreg_df)

In [48]:
# Fitted coefficient vector; one entry, for the single cpu_utilization feature.
lrModel.coefficients

DenseVector([47.024])

In [49]:
# Fitted intercept of the regression line.
lrModel.intercept

40.41695103548856

In [50]:
# Root-mean-squared error from the model's fit summary. The model was fitted on vreg_df,
# so this is presumably an in-sample figure rather than a held-out estimate — confirm before quoting it.
lrModel.summary.rootMeanSquaredError

12.83799022593107