<img src="images/cads-logo.png" style="height: 100px;" align=left> <img src="images/apache_spark.png" style="height: 20%;width:20%" align=right>

# Apache Spark DataFrames and Spark SQL

Apache Spark is a platform for distributed data processing, and it is particularly well-suited for dealing with massive data sets, that is, data sets that do not readily fit within the memory or storage capacity of a single server.

Apache Spark has a modular architecture. The core platform is called Spark Core, and several modules (for example, Spark SQL, Structured Streaming, MLlib, and GraphX) run on top of it.

In this notebook, we will mostly learn about DataFrames and work with Spark SQL. Apache Spark supports multiple languages, including:
- Scala
- Python
- Java
- R

Apache Spark's core data structure is the Resilient Distributed Dataset (RDD). An RDD is a low-level, fault-tolerant collection of records that Spark splits into partitions and distributes across the nodes of the cluster. However, working directly with RDDs is cumbersome, so data scientists and data engineers usually prefer the Spark DataFrame abstraction, which is built on top of RDDs.

We are particularly interested in DataFrames. A DataFrame is a distributed collection of data organized into rows and named columns, and its schema describes the name and data type of each column. In this respect, DataFrames are analogous to tables in a relational database. Not only are DataFrames easier to work with than RDDs, they are also better suited to complex operations, because Spark's Catalyst optimizer can plan and optimize DataFrame queries before running them.

There are two main ways of working with DataFrames. The first is the DataFrame API, which is built around calling methods on DataFrame objects. The second is Spark SQL, which lets us run SQL queries against DataFrames that have been registered as tables (temporary views).

### Set Up Apache Spark on Jupyter
To start working with DataFrames, we first need a `SparkSession`, which is the entry point to the DataFrame and SQL APIs. Each `SparkSession` wraps a `SparkContext`, which represents the connection to the running cluster (it is available as `spark.sparkContext`).

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=72dd77aedc1957162585d959e716b5ce062aa5c012cf26b9e6908aecc46a7c0b
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
# Mount Google Drive so the notebook can read the data files directly (instead of uploading them manually)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%cd "/content/drive/MyDrive/UM Lecture/CADS/13 BDA with Apache Spark 2day"

/content/drive/MyDrive/UM Lecture/CADS/13 BDA with Apache Spark 2day


In [None]:
# Quick check that the CSV file is readable
import pandas as pd
pd.read_csv("data/server_names.csv")

Unnamed: 0,server_id,server_name
0,100,Server 100
1,101,Server 101
2,102,Server 102
3,103,Server 103
4,104,Server 104
5,105,Server 105
6,106,Server 106
7,107,Server 107
8,108,Server 108
9,109,Server 109


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

The previous line of code returns an existing `SparkSession` if there's already one in the environment, or creates a new one if necessary.

In [None]:
spark

### Make the data set folder accessible

In the following cells, we are going to load a file called `location_temp.csv`, a time-series file that contains the locations of sensors and the temperatures they recorded at particular points in time.

In [None]:
import os
MAIN_DIRECTORY = os.getcwd()
MAIN_DIRECTORY

'/content/drive/My Drive/UM Lecture/CADS/13 BDA with Apache Spark 2day'

In [None]:
file_path = MAIN_DIRECTORY + "/data/location_temp.csv"
file_path

'/content/drive/My Drive/UM Lecture/CADS/13 BDA with Apache Spark 2day/data/location_temp.csv'

## Get Started with Spark DataFrames
To create a Spark DataFrame by loading a CSV file, we can use `spark.read` (a `DataFrameReader`) as follows.

In [None]:
df1 = spark.read.format('csv').option('header', 'true').load(file_path)

# header='true' tells Spark to use the first row as column names

We can use the `head(n)` method to return the first `n` rows as a list of `Row` objects. If `n` is omitted, `head()` returns only the first row, as a single `Row` rather than a list.

In [None]:
df1.head(5)

[Row(event_date='03/04/2019 19:48:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 19:53:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 19:58:06', location_id='loc0', temp_celcius='28'),
 Row(event_date='03/04/2019 20:03:06', location_id='loc0', temp_celcius='30'),
 Row(event_date='03/04/2019 20:08:06', location_id='loc0', temp_celcius='27')]

If we want to display the data in tabular format, we can use the `.show(n)` method, where `n` is the number of rows to display (20 by default).

In [None]:
# print in table format
df1.show(5)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
+-------------------+-----------+------------+
only showing top 5 rows



To find the number of rows in the DataFrame, we can use the `count()` method.

In [None]:
# number of rows
df1.count()

500000

Another useful DataFrame API method is `printSchema()`, which prints the schema in tree format.

In [None]:
df1.printSchema()

# Without inferSchema, every column is read as a string

root
 |-- event_date: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- temp_celcius: string (nullable = true)



In [None]:
# inferSchema='true' makes Spark detect column types (temp_celcius becomes an integer)
df1 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(file_path)

In [None]:
df1.show(5)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
+-------------------+-----------+------------+
only showing top 5 rows



### Rename column names

Now, let's load another file. In the data folder, we have another file called `utilization.csv`. This file does not have a header row, so Spark cannot take the column names from the file. However, the `inferSchema` option can still infer each column's data type automatically. The following cells show how we can work with this type of CSV file.

In [None]:
file_path = MAIN_DIRECTORY + "/data/utilization.csv"
file_path

'/content/drive/My Drive/UM Lecture/CADS/13 BDA with Apache Spark 2day/data/utilization.csv'

In [None]:
df2 = spark.read.format('csv').option('header','false').option('inferSchema','true').load(file_path)

In [None]:
df2.count()

500000

In [None]:
df2.show(5)

+-------------------+---+----+----+---+
|                _c0|_c1| _c2| _c3|_c4|
+-------------------+---+----+----+---+
|03/05/2019 08:06:14|100|0.57|0.51| 47|
|03/05/2019 08:11:14|100|0.47|0.62| 43|
|03/05/2019 08:16:14|100|0.56|0.57| 62|
|03/05/2019 08:21:14|100|0.57|0.56| 50|
|03/05/2019 08:26:14|100|0.35|0.46| 43|
+-------------------+---+----+----+---+
only showing top 5 rows



As you can see, the rows have no meaningful column names. Because we set `header` to `false`, Spark generated default names following the pattern `_c0`, `_c1`, ..., `_c4`.

Spark allows us to rename columns with the `withColumnRenamed()` method.

In [None]:
df2 = df2.withColumnRenamed('_c0','event_datetime')\
.withColumnRenamed('_c1','server_id')\
.withColumnRenamed('_c2','cpu_utilization')\
.withColumnRenamed('_c3','free_memory')\
.withColumnRenamed('_c4','session_count')

Here is the new DataFrame in the tabular format.

In [None]:
df2.show(5)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
+-------------------+---------+---------------+-----------+-------------+
only showing top 5 rows



In [None]:
df2.printSchema()

root
 |-- event_datetime: string (nullable = true)
 |-- server_id: integer (nullable = true)
 |-- cpu_utilization: double (nullable = true)
 |-- free_memory: double (nullable = true)
 |-- session_count: integer (nullable = true)



Another useful DataFrame API method is `describe()`, which computes basic statistics for numeric and string columns.

These include count, mean, stddev, min, and max.

In [None]:
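# Statistics for a single column
df2.describe('cpu_utilization').show()

# Several columns can be passed as separate arguments or as a list,
# e.g. df2.describe('cpu_utilization', 'free_memory')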

+-------+-------------------+
|summary|    cpu_utilization|
+-------+-------------------+
|  count|             500000|
|   mean| 0.6205177399999797|
| stddev|0.15875173872913106|
|    min|               0.22|
|    max|                1.0|
+-------+-------------------+



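If no columns are given, `describe()` computes statistics for all numeric and string columns. For a string column such as `event_datetime`, `mean` and `stddev` are `null`, and `min`/`max` are determined by lexicographic (alphabetical) order.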

In [None]:
# Statistics for all columns
df2.describe().show()

+-------+-------------------+------------------+-------------------+-------------------+------------------+
|summary|     event_datetime|         server_id|    cpu_utilization|        free_memory|     session_count|
+-------+-------------------+------------------+-------------------+-------------------+------------------+
|  count|             500000|            500000|             500000|             500000|            500000|
|   mean|               null|             124.5| 0.6205177399999797|0.37912809999999125|          69.59616|
| stddev|               null|14.430884120552715|0.15875173872913106|0.15830931278376184|14.850676696352798|
|    min|03/05/2019 08:06:14|               100|               0.22|                0.0|                32|
|    max|04/09/2019 01:22:46|               149|                1.0|               0.78|               105|
+-------+-------------------+------------------+-------------------+-------------------+------------------+



### Load a JSON file into a DataFrame
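In the following cell, we load a JSON file into a DataFrame with `spark.read`. By default, Spark expects the JSON Lines format, in which each line of the file is a separate, complete JSON object; a file containing a single multi-line JSON document needs the `multiLine` option set to `true`.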

In [None]:
file_path = MAIN_DIRECTORY + '/data/utilization.json'

In [None]:
df3 = spark.read.format('json').load(file_path)

In [None]:
df3.show(5)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.77|03/16/2019 17:21:40|       0.22|      115|           58|
|           0.53|03/16/2019 17:26:40|       0.23|      115|           64|
|            0.6|03/16/2019 17:31:40|       0.19|      115|           82|
|           0.46|03/16/2019 17:36:40|       0.32|      115|           60|
|           0.77|03/16/2019 17:41:40|       0.49|      115|           84|
+---------------+-------------------+-----------+---------+-------------+
only showing top 5 rows



In [None]:
df3.count()

500000

Notice that we did not have to rename any columns. That is because a JSON record is made of key-value pairs, and Spark uses the keys as column names. For example, the first row corresponds to a record with `cpu_utilization` equal to 0.77, `free_memory` equal to 0.22, and `server_id` equal to 115. Notice also that the columns appear in alphabetical order: when Spark infers a schema from JSON, it sorts the field names.

The `columns` attribute returns the DataFrame's column names as a Python list.

In [None]:
# return column names - attribute, not method (no '()')
col_names = df3.columns
col_names

['cpu_utilization',
 'event_datetime',
 'free_memory',
 'server_id',
 'session_count']

In [None]:
type(col_names)

list

In [None]:
col_names[0]

'cpu_utilization'

Sometimes we want to work with only a subset of the data. For example, this DataFrame has 500,000 rows. Although that is not a huge number, it may be more than you want to work with at any particular time, and you would rather work with a sample. To do that, you can use the `sample()` method.

In [None]:
df3_sample = df3.sample(withReplacement=False, fraction=0.1)

In [None]:
df3_sample.count()

49821
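Why 49,821 and not exactly 50,000? With `withReplacement=False`, Spark decides for each row independently whether to keep it, with probability `fraction` (Bernoulli sampling), so the sample size only approximates `fraction * count`. The plain-Python sketch below is not Spark code; it mimics that behaviour, and the helper name `bernoulli_sample` is made up for illustration.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = range(500_000)
for seed in (1, 2, 3):
    sample = bernoulli_sample(rows, fraction=0.1, seed=seed)
    # Each run lands close to, but usually not exactly on, 50,000 rows
    print(seed, len(sample))
```

Because each row is considered once, no row can be picked twice, which matches sampling without replacement.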

The DataFrame API provides a method called `sort()` to sort the rows by one or more columns.

In [None]:
df3_sorted = df3.sort('server_id', ascending=False)  # descending server_id
df3_sorted.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.62|03/05/2019 09:37:44|       0.17|      149|           89|
|           0.62|03/05/2019 08:57:44|       0.26|      149|           89|
|           0.61|03/05/2019 09:32:44|        0.1|      149|           70|
|           0.75|03/05/2019 08:32:44|       0.19|      149|           84|
|           0.82|03/05/2019 08:52:44|       0.13|      149|           72|
|           0.68|03/05/2019 09:12:44|       0.24|      149|           76|
|           0.63|03/05/2019 09:27:44|       0.07|      149|           83|
|            0.9|03/05/2019 08:12:44|       0.34|      149|           85|
|           0.83|03/05/2019 08:27:44|       0.42|      149|           73|
|           0.67|03/05/2019 08:42:44|       0.16|      149|           88|
|           0.91|03/05/2019 08:47:44| 

If we want to sort the rows by more than one column, we can pass a list of columns and a matching list of sort orders (`1`/`True` for ascending, `0`/`False` for descending).

In [None]:
# event_datetime descending, then server_id ascending
df3_sorted = df3.sort(['event_datetime', 'server_id'], ascending=[0, 1])
df3_sorted.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.74|04/09/2019 01:22:46|       0.19|      149|           85|
|           0.83|04/09/2019 01:22:44|       0.21|      148|           69|
|            0.4|04/09/2019 01:22:41|       0.42|      147|           65|
|           0.62|04/09/2019 01:22:39|       0.13|      146|           92|
|           0.77|04/09/2019 01:22:37|       0.12|      145|           88|
|           0.78|04/09/2019 01:22:35|       0.46|      144|           64|
|           0.37|04/09/2019 01:22:33|        0.4|      143|           59|
|           0.77|04/09/2019 01:22:31|       0.27|      142|           68|
|           0.44|04/09/2019 01:22:29|       0.59|      141|           54|
|           0.63|04/09/2019 01:22:28|       0.21|      140|           60|
|           0.67|04/09/2019 01:22:25| 

### Filtering using DataFrame API

Now, let's take a look at how we can use the DataFrame API to filter rows in a DataFrame.

One of the DataFrames that we have created is `df1`, which stores a location ID and a temperature measurement for each point in time.

In [None]:
df1.show(5)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
+-------------------+-----------+------------+
only showing top 5 rows



If we want to filter rows based on their `location_id`, we can use the `filter()` method. `filter(condition)` keeps only the rows that satisfy the given condition; it plays the same role as a `WHERE` clause in SQL.

In [None]:
df1.filter(df1['location_id']=='loc9').show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:08|       loc9|          35|
|03/04/2019 19:53:08|       loc9|          35|
|03/04/2019 19:58:08|       loc9|          35|
|03/04/2019 20:03:08|       loc9|          30|
|03/04/2019 20:08:08|       loc9|          31|
|03/04/2019 20:13:08|       loc9|          30|
|03/04/2019 20:18:08|       loc9|          31|
|03/04/2019 20:23:08|       loc9|          35|
|03/04/2019 20:28:08|       loc9|          30|
|03/04/2019 20:33:08|       loc9|          36|
|03/04/2019 20:38:08|       loc9|          32|
|03/04/2019 20:43:08|       loc9|          32|
|03/04/2019 20:48:08|       loc9|          31|
|03/04/2019 20:53:08|       loc9|          30|
|03/04/2019 20:58:08|       loc9|          35|
|03/04/2019 21:03:08|       loc9|          31|
|03/04/2019 21:08:08|       loc9|          37|
|03/04/2019 21:13:08|       loc9|          31|
|03/04/2019 2

If we want to count the rows for a specific `location_id`, we can chain the `count()` method after the filter.

In [None]:
df1.filter(df1['location_id'] == 'loc9').count()

1000

Sometimes we only need to list one or two columns; in this case, we can use the `select()` method, which projects a set of expressions and returns a new DataFrame. Let's take a look at how it works.

In [None]:
df1.select(['location_id','temp_celcius']).show()

# can also use without list
# df1.select('location_id','temp_celcius').show()

+-----------+------------+
|location_id|temp_celcius|
+-----------+------------+
|       loc0|          29|
|       loc0|          27|
|       loc0|          28|
|       loc0|          30|
|       loc0|          27|
|       loc0|          27|
|       loc0|          27|
|       loc0|          29|
|       loc0|          32|
|       loc0|          35|
|       loc0|          32|
|       loc0|          28|
|       loc0|          28|
|       loc0|          32|
|       loc0|          34|
|       loc0|          33|
|       loc0|          27|
|       loc0|          28|
|       loc0|          33|
|       loc0|          28|
+-----------+------------+
only showing top 20 rows



### Aggregation using DataFrame API

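Now, let's look at aggregation with the DataFrame API. In the following cell, we use the `groupBy()` method, which groups the DataFrame by the specified columns so that we can run aggregations on each group. Calling `max()` without arguments computes the maximum of every numeric column in each group; here, that is only `temp_celcius`.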

In [None]:
df1.groupBy('location_id').max().show()

+-----------+-----------------+
|location_id|max(temp_celcius)|
+-----------+-----------------+
|     loc196|               36|
|     loc226|               32|
|     loc463|               30|
|     loc150|               39|
|     loc292|               36|
|     loc311|               31|
|      loc22|               35|
|     loc351|               35|
|     loc370|               36|
|     loc419|               36|
|      loc31|               32|
|     loc305|               34|
|      loc82|               34|
|      loc90|               30|
|     loc118|               31|
|     loc195|               34|
|     loc208|               33|
|      loc39|               32|
|      loc75|               30|
|     loc228|               34|
+-----------+-----------------+
only showing top 20 rows



If we want to sort the DataFrame, we can use `orderBy()` (an alias of `sort()`), which returns a new DataFrame sorted by the specified column(s).

In [None]:
df1.orderBy('location_id').show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 21:23:06|       loc0|          28|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 21:18:06|       loc0|          33|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 2

To calculate the average temperature at each location, we can use the `agg()` method. Here, we pass it a dictionary that maps a column name to the name of an aggregate function.

In [None]:
df1.groupBy('location_id').agg({'temp_celcius':'mean'}).show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|     loc196|           29.225|
|     loc226|           25.306|
|     loc463|           23.317|
|     loc150|           32.188|
|     loc292|           29.159|
|     loc311|           24.308|
|      loc22|           28.251|
|     loc351|           28.194|
|     loc370|            29.14|
|     loc419|           29.141|
|      loc31|           25.196|
|     loc305|           27.314|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|     loc195|            27.25|
|     loc208|           26.206|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc228|           27.295|
+-----------+-----------------+
only showing top 20 rows



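We can also sort the aggregated result. The first cell below orders it by `location_id` in ascending order, and the second in descending order (`ascending=False`). Because `location_id` is a string, the order is lexicographic: `loc1` is followed by `loc10` and `loc100`, not by `loc2`.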

In [None]:
df1.groupBy('location_id').agg({'temp_celcius':'mean'}).orderBy('location_id').show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|       loc0|           29.176|
|       loc1|           28.246|
|      loc10|           25.337|
|     loc100|           27.297|
|     loc101|           25.317|
|     loc102|           30.327|
|     loc103|           25.341|
|     loc104|           26.204|
|     loc105|           26.217|
|     loc106|           27.201|
|     loc107|           33.268|
|     loc108|           32.195|
|     loc109|           24.138|
|      loc11|           25.308|
|     loc110|           26.239|
|     loc111|           31.391|
|     loc112|           33.359|
|     loc113|           30.345|
|     loc114|           29.261|
|     loc115|           23.239|
+-----------+-----------------+
only showing top 20 rows



In [None]:
df1.groupBy('location_id').agg({'temp_celcius':'mean'}).orderBy('location_id', ascending = False).show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|      loc99|           33.243|
|      loc98|           32.235|
|      loc97|           31.404|
|      loc96|           28.138|
|      loc95|           33.212|
|      loc94|           25.259|
|      loc93|           24.282|
|      loc92|           33.333|
|      loc91|           30.406|
|      loc90|           23.216|
|       loc9|           32.201|
|      loc89|           30.218|
|      loc88|           25.268|
|      loc87|           31.262|
|      loc86|           23.332|
|      loc85|           28.378|
|      loc84|           26.211|
|      loc83|           26.257|
|      loc82|           27.355|
|      loc81|           23.349|
+-----------+-----------------+
only showing top 20 rows



### Data Sampling

Sometimes we may want to use sampling, particularly when we have large data sets and are doing exploratory analysis to get a high-level understanding of what the data looks like. Sampling makes such quick operations much cheaper. Now, let's see how to take a random subset of the data. In PySpark, the `sample()` method returns a sampled subset of the DataFrame. Its first parameter, `withReplacement`, is a boolean. We usually set it to `False`, which means that once a row has been drawn it is not put back, so the sample never contains the same row twice. Its second parameter, `fraction`, is the fraction of rows to sample, in the range [0.0, 1.0]. An optional third parameter, `seed`, makes the sample reproducible.

In [None]:
df1_sample = df1.sample(False,0.1)

In [None]:
df1_sample.count()

49820

Now, let's run some simple descriptive statistics on our sample. 

In [None]:
df1_sample.groupBy('location_id').agg({'temp_celcius':'mean'}).show(10)

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|     loc196|             29.25|
|     loc226|25.303370786516854|
|     loc463| 23.07070707070707|
|     loc150|32.029411764705884|
|     loc292|  29.6734693877551|
|     loc311|              24.4|
|      loc22|28.181818181818183|
|     loc351|28.240506329113924|
|     loc370| 29.09009009009009|
|     loc419| 29.15686274509804|
+-----------+------------------+
only showing top 10 rows



Now, let's compare these results with those from the original DataFrame `df1`, which has 500,000 rows.

In [None]:
df1.groupBy('location_id').agg({'temp_celcius':'mean'}).show(10)

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|     loc196|           29.225|
|     loc226|           25.306|
|     loc463|           23.317|
|     loc150|           32.188|
|     loc292|           29.159|
|     loc311|           24.308|
|      loc22|           28.251|
|     loc351|           28.194|
|     loc370|            29.14|
|     loc419|           29.141|
+-----------+-----------------+
only showing top 10 rows



As you can see, the averages computed from the 10% sample are very close to those from the full data set. For example, the sample mean for `loc196` is 29.25 versus 29.225 for all rows, and for `loc226` it is 25.30 versus 25.306. How close the estimate gets depends on the size of the sample we draw: smaller samples give noisier estimates.

### Save Data from DataFrame

Sometimes, after working with DataFrames, creating new ones, running calculations, and sampling, we want to save our results. To do this, we can use the DataFrame's `write` attribute (a `DataFrameWriter`) and call its `csv()` method with an output path. This saves the DataFrame to disk in CSV format. By default, the write fails if the output path already exists.

In [None]:
df1.write.csv("df1.csv")

# Spark writes one part file per partition of the DataFrame (2 partitions here)

Now, let's look at what Spark wrote:

In [None]:
!ls df1.csv/

part-00000-0a892874-30e9-4334-a503-df61a20413fe-c000.csv  _SUCCESS
part-00001-0a892874-30e9-4334-a503-df61a20413fe-c000.csv


Notice that `df1.csv` is not a single file. It is a directory containing two files with the `.csv` extension. This is because of the way Apache Spark works internally: it splits DataFrames into subsets called partitions, and each partition is written to its own file. In this case, there were two partitions. Spark also writes a `_SUCCESS` marker file to indicate that the job completed successfully.

To write the DataFrame in JSON format, you can use the following code.

In [None]:
df1.write.json('df1.json')

In [None]:
!ls df1.json

part-00000-5570dacc-ece7-41d8-a5f1-7968a13414b0-c000.json  _SUCCESS
part-00001-5570dacc-ece7-41d8-a5f1-7968a13414b0-c000.json


In [None]:
# On Windows, Spark also needs Hadoop's winutils.exe (with the HADOOP_HOME
# environment variable set) to write files locally; Colab and Linux do not.

## Querying DataFrames with SQL

Up to now, we've been using the Spark DataFrame API to work with DataFrames. Now, we're going to switch gears and we're going to work with SQL. In particular, we're going to use Spark SQL for working with DataFrames.

In this part, we will use the data from `utilization.json` (already loaded as `df3`), which records, for each server and point in time, the CPU utilization, the amount of free memory, and the number of sessions currently connected to that server.

In [None]:
df3.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.77|03/16/2019 17:21:40|       0.22|      115|           58|
|           0.53|03/16/2019 17:26:40|       0.23|      115|           64|
|            0.6|03/16/2019 17:31:40|       0.19|      115|           82|
|           0.46|03/16/2019 17:36:40|       0.32|      115|           60|
|           0.77|03/16/2019 17:41:40|       0.49|      115|           84|
|           0.62|03/16/2019 17:46:40|       0.31|      115|           73|
|           0.71|03/16/2019 17:51:40|       0.54|      115|           67|
|           0.67|03/16/2019 17:56:40|       0.54|      115|           83|
|           0.72|03/16/2019 18:01:40|       0.26|      115|           68|
|           0.62|03/16/2019 18:06:40|       0.52|      115|           60|
|           0.58|03/16/2019 18:11:40| 

In [None]:
df_util = df3

In [None]:
df_util.count()

500000

To query a DataFrame with SQL, we first register it as a temporary view by calling `createOrReplaceTempView()` on the DataFrame and passing a name for the view. The view exists only for the lifetime of the current `SparkSession`. Let's do it.

In [None]:
df_util.createOrReplaceTempView('utilization')
# A temp view is a read-only name for the DataFrame; no data is copied

We can now query the view `utilization` by passing a SQL string to `spark.sql()`, which returns the result as a new DataFrame.

In [None]:
df_sql = spark.sql('SELECT * FROM utilization LIMIT 10')

In [None]:
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.77|03/16/2019 17:21:40|       0.22|      115|           58|
|           0.53|03/16/2019 17:26:40|       0.23|      115|           64|
|            0.6|03/16/2019 17:31:40|       0.19|      115|           82|
|           0.46|03/16/2019 17:36:40|       0.32|      115|           60|
|           0.77|03/16/2019 17:41:40|       0.49|      115|           84|
|           0.62|03/16/2019 17:46:40|       0.31|      115|           73|
|           0.71|03/16/2019 17:51:40|       0.54|      115|           67|
|           0.67|03/16/2019 17:56:40|       0.54|      115|           83|
|           0.72|03/16/2019 18:01:40|       0.26|      115|           68|
|           0.62|03/16/2019 18:06:40|       0.52|      115|           60|
+---------------+-------------------+-

If we want to project on specific columns, we can do it in the following way.

In [None]:
# Select specific columns
spark.sql('SELECT server_id,free_memory FROM utilization LIMIT 10').show()

+---------+-----------+
|server_id|free_memory|
+---------+-----------+
|      115|       0.22|
|      115|       0.23|
|      115|       0.19|
|      115|       0.32|
|      115|       0.49|
|      115|       0.31|
|      115|       0.54|
|      115|       0.54|
|      115|       0.26|
|      115|       0.52|
+---------+-----------+



### Filtering DataFrames with SQL
Next, we are going to look at how to filter DataFrames with a `WHERE` clause in Spark SQL.
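The following small sketch shows the order in which `WHERE`, `ORDER BY`, and `LIMIT` are applied. It runs the same kind of query in SQLite through Python's built-in `sqlite3` module. SQLite is only a local stand-in for Spark SQL here. The twenty rows are the server 115 and server 149 samples printed in this notebook, not the full data set.

```python
import sqlite3

# Sample (server_id, session_count) pairs copied from the server 115 and 149 outputs
rows = (
    [(115, s) for s in [58, 64, 82, 60, 84, 73, 67, 83, 68, 60]]
    + [(149, s) for s in [66, 85, 84, 81, 73, 84, 92, 88, 71, 72]]
)

conn = sqlite3.connect(":memory:")  # SQLite stands in for Spark SQL in this sketch
conn.execute("CREATE TABLE utilization (server_id INTEGER, session_count INTEGER)")
conn.executemany("INSERT INTO utilization VALUES (?, ?)", rows)

# WHERE keeps the matching rows, ORDER BY sorts them, and LIMIT then truncates the result
top = conn.execute(
    "SELECT server_id, session_count FROM utilization "
    "WHERE session_count > 70 AND server_id = 149 "
    "ORDER BY session_count DESC LIMIT 3"
).fetchall()
conn.close()
print(top)  # [(149, 92), (149, 88), (149, 85)]
```

`LIMIT` is applied after sorting, so the query returns the three largest session counts. Without an `ORDER BY`, `LIMIT` in Spark SQL returns an arbitrary subset of the matching rows.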

In [None]:
spark.sql('SELECT * FROM utilization WHERE server_id = 149 LIMIT 10').show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.74|03/05/2019 08:07:44|       0.27|      149|           66|
|            0.9|03/05/2019 08:12:44|       0.34|      149|           85|
|           0.59|03/05/2019 08:17:44|       0.19|      149|           84|
|            0.6|03/05/2019 08:22:44|       0.08|      149|           81|
|           0.83|03/05/2019 08:27:44|       0.42|      149|           73|
|           0.75|03/05/2019 08:32:44|       0.19|      149|           84|
|            0.9|03/05/2019 08:37:44|       0.14|      149|           92|
|           0.67|03/05/2019 08:42:44|       0.16|      149|           88|
|           0.91|03/05/2019 08:47:44|       0.31|      149|           71|
|           0.82|03/05/2019 08:52:44|       0.13|      149|           72|
+---------------+-------------------+-----------+---------+-------------+

In [None]:
spark.sql('SELECT server_id as SID,session_count as SC \
FROM utilization WHERE session_count > 50 LIMIT 10').show()

+---+---+
|SID| SC|
+---+---+
|115| 58|
|115| 64|
|115| 82|
|115| 60|
|115| 84|
|115| 73|
|115| 67|
|115| 83|
|115| 68|
|115| 60|
+---+---+



In [None]:
spark.sql('SELECT server_id,session_count \
FROM utilization  WHERE session_count > 70 AND server_id = 120 \
ORDER BY session_count DESC \
LIMIT 10').show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
+---------+-------------+



### Aggregating DataFrames with SQL

When we work with SQL in databases, we often use it to perform aggregations, and the same holds true in Spark. Let's write some basic queries against the DataFrame and do a few simple aggregations.
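Conceptually, a `GROUP BY` query with a `WHERE` clause first filters the rows and then collapses each group into one output row. The plain-Python sketch below is not Spark code. It reproduces `SELECT server_id, count(*) ... WHERE session_count > 70 GROUP BY server_id ORDER BY count(*)` on small samples copied from the server 115 and server 149 outputs printed earlier in this notebook.

```python
from collections import Counter

# Sample (server_id, session_count) pairs copied from the server 115 and 149 outputs
rows = (
    [(115, s) for s in [58, 64, 82, 60, 84, 73, 67, 83, 68, 60]]
    + [(149, s) for s in [66, 85, 84, 81, 73, 84, 92, 88, 71, 72]]
)

# WHERE session_count > 70 runs first, then the surviving rows are counted per server_id
counts = Counter(server_id for server_id, session_count in rows if session_count > 70)

# ORDER BY count(*): sort the (server_id, count) pairs by count, ascending
ordered = sorted(counts.items(), key=lambda pair: pair[1])
print(ordered)  # [(115, 4), (149, 9)]
```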

In [None]:
spark.sql('SELECT count(*) FROM utilization').show()

+--------+
|count(1)|
+--------+
|  500000|
+--------+



In [None]:
spark.sql('SELECT count(*) FROM utilization WHERE session_count > 70').show()

+--------+
|count(1)|
+--------+
|  239659|
+--------+



In [None]:
spark.sql('SELECT server_id,count(*) FROM utilization \
WHERE session_count > 70 \
GROUP BY server_id').show()

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      112|    7425|
|      113|    9418|
|      130|    2891|
|      126|    6365|
|      149|    8288|
|      110|    2826|
|      136|    4316|
|      144|    6220|
|      116|    1167|
|      145|    9304|
|      143|     144|
|      107|    5646|
|      146|    7072|
|      103|    8744|
|      139|    7383|
|      114|    2128|
|      115|    5284|
|      104|    7366|
|      120|    2733|
|      128|    3719|
+---------+--------+
only showing top 20 rows



In [None]:
spark.sql('SELECT server_id,count(*) FROM utilization \
WHERE session_count > 70 \
GROUP BY server_id \
ORDER BY count(*)').show()

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      143|     144|
|      100|     391|
|      105|    1110|
|      116|    1167|
|      135|    1654|
|      147|    1783|
|      132|    2048|
|      114|    2128|
|      134|    2147|
|      120|    2733|
|      110|    2826|
|      125|    2843|
|      130|    2891|
|      111|    3093|
|      141|    3097|
|      109|    3129|
|      129|    3222|
|      117|    3605|
|      128|    3719|
|      136|    4316|
+---------+--------+
only showing top 20 rows



In [None]:
spark.sql("SELECT server_id, min(session_count), max(session_count), count(*), \
round(avg(session_count),2) \
FROM utilization \
WHERE session_count > 70 \
GROUP BY server_id \
ORDER BY count(*) DESC").show()

+---------+------------------+------------------+--------+----------------------------+
|server_id|min(session_count)|max(session_count)|count(1)|round(avg(session_count), 2)|
+---------+------------------+------------------+--------+----------------------------+
|      101|                71|               105|    9808|                       87.67|
|      113|                71|               103|    9418|                       86.96|
|      145|                71|               103|    9304|                       86.98|
|      103|                71|               101|    8744|                       85.76|
|      102|                71|               101|    8586|                       85.71|
|      133|                71|               100|    8583|                       85.47|
|      108|                71|               100|    8375|                       85.12|
|      149|                71|                99|    8288|                       84.96|
|      137|                71|  

### Joining DataFrames with SQL

One of the most useful features of SQL is the ability to join tables. We can join in Spark SQL as well.

First, we are going to create another temporary table based on the `server_names.csv` file.

In [None]:
file_path = MAIN_DIRECTORY + '/data/server_names.csv'

In [128]:
df_servers = spark.read.csv(file_path,header = True)

In [131]:
df_servers.createOrReplaceTempView('server_names')

Before joining, let's quickly check the `server_id` values in the `utilization` table.

In [135]:
spark.sql("SELECT DISTINCT server_id FROM utilization ORDER BY server_id").show(5)

+---------+
|server_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
+---------+
only showing top 5 rows



Now, let's see what the minimum and maximum values of `server_id` are.

In [132]:
spark.sql('SELECT min(server_id), max(server_id) FROM utilization ').show()

+--------------+--------------+
|min(server_id)|max(server_id)|
+--------------+--------------+
|           100|           149|
+--------------+--------------+



Now, let's join the two tables on `server_id`.
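An `INNER JOIN` keeps only the pairs of rows whose join keys match on both sides. The plain-Python sketch below illustrates that rule with a hash join, which is one of several strategies Spark can choose (others include sort-merge and broadcast joins). All rows are hypothetical, including the unmatched server IDs 150 and 999.

```python
# Hypothetical rows: utilization is (server_id, session_count) and
# server_names is (server_id, server_name); IDs 150 and 999 have no partner
utilization = [(115, 58), (115, 64), (149, 66), (999, 10)]
server_names = [(115, "Server 115"), (149, "Server 149"), (150, "Server 150")]


def inner_join(left, right):
    """Hash join: index `right` by key, then probe the index with every row of `left`."""
    names_by_id = {}
    for server_id, name in right:
        names_by_id.setdefault(server_id, []).append(name)
    joined = []
    for server_id, session_count in left:
        # A left row with no matching key produces no output row at all
        for name in names_by_id.get(server_id, []):
            joined.append((server_id, name, session_count))
    return joined


joined = inner_join(utilization, server_names)
print(joined)
```

Server 999 has no name and server 150 has no readings, so neither appears in the result. A `LEFT JOIN` would instead keep server 999 with a null name.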

In [134]:
spark.sql("SELECT u.server_id, sn.server_name, u.session_count \
from utilization u \
INNER JOIN server_names sn \
ON sn.server_id = u.server_id").show()

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      115| Server 115|           58|
|      115| Server 115|           64|
|      115| Server 115|           82|
|      115| Server 115|           60|
|      115| Server 115|           84|
|      115| Server 115|           73|
|      115| Server 115|           67|
|      115| Server 115|           83|
|      115| Server 115|           68|
|      115| Server 115|           60|
|      115| Server 115|           60|
|      115| Server 115|           62|
|      115| Server 115|           78|
|      115| Server 115|           66|
|      115| Server 115|           89|
|      115| Server 115|           76|
|      115| Server 115|           87|
|      115| Server 115|           62|
|      115| Server 115|           67|
|      115| Server 115|           58|
+---------+-----------+-------------+
only showing top 20 rows



### De-Duplicating with DataFrame API

Spark provides several ways to de-duplicate data in DataFrames, so let's take a look at how to do that. In this part, we will also learn how to create small data sets to work with in the Jupyter Notebook session. Before doing anything, please restart the Jupyter kernel. A restart discards `df_util` and the `utilization` view, so you need to re-create them before running the later sections that use them.

In [137]:
from pyspark import SparkContext
from pyspark.sql import Row

In [138]:
sc = SparkContext.getOrCreate()

`sc` holds the `SparkContext`, the entry point to Spark's lower-level RDD API. We want to create a DataFrame by hand, so we use the `parallelize()` method. It distributes a local Python collection (here, a list of `Row` objects) across the cluster as an RDD. When Spark reads data from files, it partitions the data automatically. Because we are creating this data manually, we call `parallelize()` explicitly.

In [141]:
rdd = sc.parallelize([Row(server_name = 'Server 101',cpu_utilization = 85,session_count = 80),
                     Row(server_name = 'Server 101',cpu_utilization = 80,session_count = 90),
                     Row(server_name = 'Server 102',cpu_utilization = 85,session_count = 80),
                     Row(server_name = 'Server 102',cpu_utilization = 85,session_count = 80)])

In [142]:
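from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # rdd.toDF() below needs an active SparkSession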

`toDF()` turns the parallelized RDD into a DataFrame and takes the column names from the `Row` fields.

In [143]:
df_dup = rdd.toDF()

In [144]:
df_dup.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| Server 101|             85|           80|
| Server 101|             80|           90|
| Server 102|             85|           80|
| Server 102|             85|           80|
+-----------+---------------+-------------+



Now, we are going to drop duplicates. To do that, we use the `drop_duplicates()` method, an alias of `dropDuplicates()`. It returns a new DataFrame with duplicate rows removed and can optionally consider only certain columns.

In [145]:
df_dup.drop_duplicates().show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| Server 101|             85|           80|
| Server 102|             85|           80|
| Server 101|             80|           90|
+-----------+---------------+-------------+



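We can also treat rows as duplicates when they match on only some of the columns, by passing a list of column names. Spark then keeps one row for each distinct combination of values in those columns. Let's take a look at the following example.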
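The plain-Python sketch below mimics what `drop_duplicates()` computes. Rows are compared on the chosen columns (all columns by default), and one row is kept per distinct key. The sketch keeps the first occurrence. Spark's documentation does not say which of the matching rows survives, so don't rely on that detail. The four rows are the ones in `df_dup`.

```python
rows = [
    {"server_name": "Server 101", "cpu_utilization": 85, "session_count": 80},
    {"server_name": "Server 101", "cpu_utilization": 80, "session_count": 90},
    {"server_name": "Server 102", "cpu_utilization": 85, "session_count": 80},
    {"server_name": "Server 102", "cpu_utilization": 85, "session_count": 80},
]


def drop_duplicates(rows, subset=None):
    """Keep one row per distinct key; the key is the values of `subset` (default: all columns)."""
    if not rows:
        return []
    columns = subset if subset is not None else list(rows[0])
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key not in seen:  # in this sketch, the first occurrence wins
            seen.add(key)
            kept.append(row)
    return kept


print(len(drop_duplicates(rows)))                                      # 3
print(len(drop_duplicates(rows, ["server_name"])))                     # 2
print(len(drop_duplicates(rows, ["server_name", "cpu_utilization"])))  # 3
```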

In [146]:
df_dup.drop_duplicates(['server_name']).show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| Server 101|             85|           80|
| Server 102|             85|           80|
+-----------+---------------+-------------+



In [147]:
df_dup.drop_duplicates(['server_name','cpu_utilization']).show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| Server 101|             80|           90|
| Server 101|             85|           80|
| Server 102|             85|           80|
+-----------+---------------+-------------+



### Working with null values

It is not uncommon for data to be missing from a DataFrame. When we work with SQL, we are used to working with nulls. Spark DataFrames also represent missing values as `null`. The methods for handling them are grouped under `DataFrame.na` (for example, `na.drop()` and `na.fill()`), which is why missing values are often called NAs. In this section, we are going to look at how to work with NAs and nulls using DataFrames and Spark SQL.

In this section, we import a couple of things we have not seen before. Let's take a look at them.

In [148]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

Now, we are going to add a column and set all of its values to null. To do that, we use the `lit()` function, which creates a column from a literal (constant) value. Combined with `withColumn()`, it adds a new column with the same value in every row. `lit(None)` on its own produces a column of `NullType`, so we cast it to `StringType()` to give the column a concrete string type.

In [153]:
df = rdd.toDF()
df_na = df.withColumn('na_col',lit(None).cast(StringType()))
df_na.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| Server 101|             85|           80|  null|
| Server 101|             80|           90|  null|
| Server 102|             85|           80|  null|
| Server 102|             85|           80|  null|
+-----------+---------------+-------------+------+



One thing we can do is replace every null with a fixed value, using the `fillna()` method (an alias of `na.fill()`). Because the value here is a string, only string columns are filled.
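The PySpark `fillna()` documentation states that columns whose data type does not match the fill value are ignored. The plain-Python sketch below illustrates that rule: a string value fills nulls only in string columns. The `schema` dictionary is a hypothetical, simplified stand-in for a Spark schema, and the sketch does not reproduce all of Spark's type rules.

```python
def fill_na(rows, value, schema):
    """Replace None with `value` only in columns whose declared type matches `value`.

    `schema` maps column name -> Python type; it is a simplified, hypothetical
    stand-in for a Spark schema.
    """
    filled = []
    for row in rows:
        filled.append({
            col: value if cell is None and isinstance(value, schema[col]) else cell
            for col, cell in row.items()
        })
    return filled


schema = {"server_name": str, "session_count": int, "na_col": str}
rows = [
    {"server_name": "Server 101", "session_count": None, "na_col": None},
    {"server_name": "Server 102", "session_count": 80, "na_col": None},
]
filled = fill_na(rows, "A", schema)
print(filled)  # na_col becomes "A"; the integer column keeps its None
```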

In [151]:
df_na.fillna('A').show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| Server 101|             85|           80|     A|
| Server 101|             80|           90|     A|
| Server 102|             85|           80|     A|
| Server 102|             85|           80|     A|
+-----------+---------------+-------------+------+



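Next, let's use `union()` to stack the filled and the unfilled versions into one DataFrame, so that it contains each row once with `A` and once with null. Note that `union()` matches columns by position, not by name.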

In [154]:
df_union = df_na.fillna('A').union(df_na)

In [155]:
df_union.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| Server 101|             85|           80|     A|
| Server 101|             80|           90|     A|
| Server 102|             85|           80|     A|
| Server 102|             85|           80|     A|
| Server 101|             85|           80|  null|
| Server 101|             80|           90|  null|
| Server 102|             85|           80|  null|
| Server 102|             85|           80|  null|
+-----------+---------------+-------------+------+



Now we can drop only the rows that contain nulls. By default, `na.drop()` removes every row that has a null in any column (`how='any'`).
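The `how`, `thresh`, and `subset` parameters of `na.drop()` control which rows count as incomplete. The plain-Python sketch below follows the rules in the PySpark documentation. `how='any'` drops a row containing any null, and `how='all'` drops it only if every value is null. `thresh` keeps rows with at least that many non-null values and overrides `how`. The sketch only checks for `None` and is an illustration, not Spark's implementation.

```python
def na_drop(rows, how="any", thresh=None, subset=None):
    """Return the rows that na.drop() would keep under the documented rules."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in columns)
        if thresh is not None:
            keep = non_null >= thresh          # thresh overrides how
        elif how == "any":
            keep = non_null == len(columns)    # drop if any value is null
        else:
            keep = non_null > 0                # drop only if all values are null
        if keep:
            kept.append(row)
    return kept


rows = [
    {"server_name": "Server 101", "na_col": "A"},
    {"server_name": "Server 101", "na_col": None},
    {"server_name": None, "na_col": None},
]
print(len(na_drop(rows)))             # 1
print(len(na_drop(rows, how="all")))  # 2
print(len(na_drop(rows, thresh=1)))   # 2
```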

In [156]:
df_union.na.drop().show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| Server 101|             85|           80|     A|
| Server 101|             80|           90|     A|
| Server 102|             85|           80|     A|
| Server 102|             85|           80|     A|
+-----------+---------------+-------------+------+



Let's see how to do the same thing with Spark SQL.

In [157]:
df_union.createOrReplaceTempView('na_table')

In [160]:
# same result as na.drop() here, because na_col is the only column that contains nulls
spark.sql("SELECT * FROM na_table WHERE na_col IS NOT NULL").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| Server 101|             85|           80|     A|
| Server 101|             80|           90|     A|
| Server 102|             85|           80|     A|
| Server 102|             85|           80|     A|
+-----------+---------------+-------------+------+



## Exploratory Data Analysis with DataFrame API

The DataFrame API also provides tools for higher-level tasks such as exploratory data analysis (EDA). In this section, we are going to use it for some basic EDA on the utilization DataFrame. First, let's take a look at this DataFrame.

In [163]:
df_util.show(10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.77|03/16/2019 17:21:40|       0.22|      115|           58|
|           0.53|03/16/2019 17:26:40|       0.23|      115|           64|
|            0.6|03/16/2019 17:31:40|       0.19|      115|           82|
|           0.46|03/16/2019 17:36:40|       0.32|      115|           60|
|           0.77|03/16/2019 17:41:40|       0.49|      115|           84|
|           0.62|03/16/2019 17:46:40|       0.31|      115|           73|
|           0.71|03/16/2019 17:51:40|       0.54|      115|           67|
|           0.67|03/16/2019 17:56:40|       0.54|      115|           83|
|           0.72|03/16/2019 18:01:40|       0.26|      115|           68|
|           0.62|03/16/2019 18:06:40|       0.52|      115|           60|
+---------------+-------------------+-----------+---------+-------------+

In [164]:
df_util.count()

500000

One of the useful methods for doing exploratory data analysis is `.describe()`. Let's see how it works.

In [166]:
df_util.describe().show()

+-------+------------------+-------------------+------------------+----------------+-----------------+
|summary|   cpu_utilization|     event_datetime|       free_memory|       server_id|    session_count|
+-------+------------------+-------------------+------------------+----------------+-----------------+
|  count|            500000|             500000|            500000|          500000|           500000|
|   mean|0.6205177399999874|               null|0.3791280999999869|           124.5|         69.59616|
| stddev| 0.158751738729129|               null|0.1583093127837621|14.4308841205532|14.85067669635288|
|    min|              0.22|03/05/2019 08:06:14|               0.0|             100|               32|
|    max|               1.0|04/09/2019 01:22:46|              0.78|             149|              105|
+-------+------------------+-------------------+------------------+----------------+-----------------+



`.describe()` returns another DataFrame with summary statistics. Its first column, `summary`, names each statistic (count, mean, stddev, min, max). It is followed by one column for each column of the original DataFrame, so the same statistics are reported for every column. The `event_datetime` column shows `null` for mean and stddev because it is stored as text, and its min and max are determined by comparing the values as text.
Using `.describe()` is an excellent way to get a high-level view of what a data set looks like.
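As a rough plain-Python equivalent of one numeric column of the `describe()` output, the sketch below computes the same five statistics. Spark's `stddev` is the sample standard deviation (divisor n - 1), which is what `statistics.stdev` computes. Nulls (`None`) are skipped, as they are when Spark counts and averages a column. The input list is hypothetical.

```python
import statistics


def describe(values):
    """count, mean, sample stddev, min and max of a numeric column, skipping None."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present) if present else None,
        # stdev uses n - 1, so it is undefined for fewer than two values
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }


summary = describe([2, 4, 4, 4, 5, 5, 7, 9, None])
print(summary)
```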

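Another statistic we often want to know is whether two variables are correlated. `df.stat.corr()` computes the Pearson correlation coefficient between two numeric columns, which is currently the only method it supports.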
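The Pearson coefficient that `stat.corr()` returns is the covariance of the two columns divided by the product of their standard deviations, which gives a value between -1 and 1. A plain-Python sketch of that formula follows. The sample pairs are the first ten server 115 rows printed earlier in this notebook, so the value will differ from the full-data result.

```python
import math


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length with at least two values")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # The 1/(n - 1) factors of covariance and standard deviation cancel, so plain sums suffice
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


cpu = [0.77, 0.53, 0.6, 0.46, 0.77, 0.62, 0.71, 0.67, 0.72, 0.62]
free_memory = [0.22, 0.23, 0.19, 0.32, 0.49, 0.31, 0.54, 0.54, 0.26, 0.52]
print(pearson(cpu, free_memory))
```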

In [168]:
df_util.stat.corr('cpu_utilization','free_memory')

-0.4704771573080708

In [169]:
df_util.stat.corr('session_count','free_memory')

-0.5008320848876533

Sometimes we want to know which items occur most often in a column.

The `.freqItems()` method finds frequent items for one or more columns of a DataFrame.
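The PySpark documentation describes `freqItems()` as finding frequent items "possibly with false positives". It uses a frequent-element counting algorithm by Karp, Schenker, and Papadimitriou, with a default `support` of 1%. The plain-Python sketch below shows the general idea with a Misra-Gries style counter. It is an illustration, not Spark's code. An item that makes up more than the `support` fraction of the input is guaranteed to survive, but other items may also appear in the result.

```python
def frequent_items(items, support=0.01):
    """Misra-Gries style pass: items above `support` of the input are guaranteed to be kept."""
    max_counters = int(1 / support)
    counters = {}
    for item in items:
        if item in counters:
            counters[item] += 1
        elif len(counters) < max_counters:
            counters[item] = 1
        else:
            # No free counter: decrement every counter and forget the ones that reach zero
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return list(counters)


data = ["a"] * 40 + ["b"] * 35 + [f"x{i}" for i in range(25)]
print(frequent_items(data, support=0.3))  # always contains "a" and "b"
```

With the default support of 1%, the sketch keeps up to 100 counters. A column with only 50 distinct values therefore never triggers a decrement, and every value is returned.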

In [173]:
df_util.freqItems(['server_id']).show(truncate=False)

# truncate=False prints full cell values instead of cutting them at 20 characters
# freqItems() is approximate: the list may contain false positives and is not sorted by frequency

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|server_id_freqItems                                                                                                                                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[137, 146, 101, 110, 119, 128, 104, 131, 122, 140, 113, 149, 134, 125, 116, 107, 142, 124, 133, 106, 115, 127, 118, 136, 100, 109, 145, 139, 130, 121, 148, 103, 112, 147, 129, 138, 120, 132, 141, 105, 123, 114, 126, 144, 135, 108, 117, 102,

We can create a result-set that shows some basic statistics for one of the columns by using Spark SQL. Let's do it.

In [178]:
spark.sql("SELECT min(cpu_utilization), max(cpu_utilization),avg(cpu_utilization), \
           stddev(cpu_utilization) FROM utilization ").show()

+--------------------+--------------------+--------------------+-----------------------+
|min(cpu_utilization)|max(cpu_utilization)|avg(cpu_utilization)|stddev(cpu_utilization)|
+--------------------+--------------------+--------------------+-----------------------+
|                0.22|                 1.0|  0.6205177399999874|      0.158751738729129|
+--------------------+--------------------+--------------------+-----------------------+



And if we want to group the result-set by `server_id`, we can write the following query.

In [182]:
spark.sql("SELECT server_id, min(cpu_utilization) as MIN, max(cpu_utilization) as MAX, \
           round(avg(cpu_utilization),2) as AVG, \
           round(stddev(cpu_utilization),2) as  STDDEV FROM utilization \
           GROUP BY server_id").show()

+---------+----+----+----+------+
|server_id| MIN| MAX| AVG|STDDEV|
+---------+----+----+----+------+
|      112|0.52|0.92|0.72|  0.12|
|      113|0.58|0.98|0.78|  0.12|
|      130|0.35|0.75|0.55|  0.12|
|      126|0.48|0.88|0.68|  0.12|
|      149|0.54|0.94|0.74|  0.12|
|      110|0.35|0.75|0.55|  0.12|
|      136|0.41| 0.8|0.61|  0.12|
|      144|0.47|0.87|0.67|  0.11|
|      119|0.22|0.62|0.42|  0.12|
|      116| 0.3| 0.7| 0.5|  0.12|
|      145|0.58|0.98|0.78|  0.11|
|      124|0.24|0.64|0.44|  0.12|
|      143|0.26|0.66|0.46|  0.12|
|      107|0.45|0.85|0.65|  0.12|
|      146| 0.5| 0.9| 0.7|  0.11|
|      103|0.56|0.96|0.76|  0.12|
|      139|0.51|0.91|0.72|  0.12|
|      138|0.24|0.64|0.44|  0.12|
|      114|0.33|0.73|0.53|  0.12|
|      115|0.44|0.84|0.64|  0.12|
+---------+----+----+----+------+
only showing top 20 rows



Now, we are going to calculate statistics on buckets, the building blocks of a histogram. Rather than looking at each server individually, we assign every CPU utilization measurement to a range and count how many measurements fall into each range. Since `cpu_utilization` is a fraction between 0 and 1, `floor(cpu_utilization * 100 / 10)` maps 0-9% to bucket 0, 10-19% to bucket 1, and so on up to 90-99% in bucket 9. A utilization of exactly 100% lands in its own bucket, 10. Let's do it.
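The bucket number is plain arithmetic, so it can be checked outside Spark. The plain-Python sketch below applies the same `floor(cpu_utilization * 100 / 10)` expression to the first fifteen `cpu_utilization` values printed by the next cell. It then counts the readings per bucket, which is what the later `GROUP BY Bucket` query does on the full table.

```python
import math
from collections import Counter


def bucket(cpu_utilization):
    """Same arithmetic as floor(cpu_utilization * 100 / 10) in the Spark SQL query."""
    return math.floor(cpu_utilization * 100 / 10)


sample = [0.77, 0.53, 0.6, 0.46, 0.77, 0.62, 0.71, 0.67,
          0.72, 0.62, 0.58, 0.51, 0.54, 0.84, 0.65]
counts = Counter(bucket(v) for v in sample)
for b in sorted(counts):
    print(b, counts[b])
```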

In [184]:
# bucketing
spark.sql("SELECT event_datetime,server_id,floor(cpu_utilization *100/10) as Bucket,cpu_utilization FROM utilization").show()

+-------------------+---------+------+---------------+
|     event_datetime|server_id|Bucket|cpu_utilization|
+-------------------+---------+------+---------------+
|03/16/2019 17:21:40|      115|     7|           0.77|
|03/16/2019 17:26:40|      115|     5|           0.53|
|03/16/2019 17:31:40|      115|     6|            0.6|
|03/16/2019 17:36:40|      115|     4|           0.46|
|03/16/2019 17:41:40|      115|     7|           0.77|
|03/16/2019 17:46:40|      115|     6|           0.62|
|03/16/2019 17:51:40|      115|     7|           0.71|
|03/16/2019 17:56:40|      115|     6|           0.67|
|03/16/2019 18:01:40|      115|     7|           0.72|
|03/16/2019 18:06:40|      115|     6|           0.62|
|03/16/2019 18:11:40|      115|     5|           0.58|
|03/16/2019 18:16:40|      115|     5|           0.51|
|03/16/2019 18:21:40|      115|     5|           0.54|
|03/16/2019 18:26:40|      115|     8|           0.84|
|03/16/2019 18:31:40|      115|     6|           0.65|
|03/16/201

So far, we have listed which bucket each server's CPU utilization falls into at a particular time. Next, we count how many measurements fall into each bucket.

In [185]:
spark.sql("SELECT count(*), FLOOR(cpu_utilization*100/10) as Bucket \
           FROM utilization GROUP BY Bucket \
           ORDER BY Bucket ").show()

+--------+------+
|count(1)|Bucket|
+--------+------+
|    8186|     2|
|   37029|     3|
|   68046|     4|
|  104910|     5|
|  116725|     6|
|   88242|     7|
|   56598|     8|
|   20207|     9|
|      57|    10|
+--------+------+



## Timeseries Analysis

In this section, we are going to work with time-series data, which pairs a set of measurements with the timestamp at which they were taken. First, let's take a look at the utilization table again.

In [186]:
spark.sql("SELECT * FROM utilization ").show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.77|03/16/2019 17:21:40|       0.22|      115|           58|
|           0.53|03/16/2019 17:26:40|       0.23|      115|           64|
|            0.6|03/16/2019 17:31:40|       0.19|      115|           82|
|           0.46|03/16/2019 17:36:40|       0.32|      115|           60|
|           0.77|03/16/2019 17:41:40|       0.49|      115|           84|
|           0.62|03/16/2019 17:46:40|       0.31|      115|           73|
|           0.71|03/16/2019 17:51:40|       0.54|      115|           67|
|           0.67|03/16/2019 17:56:40|       0.54|      115|           83|
|           0.72|03/16/2019 18:01:40|       0.26|      115|           68|
|           0.62|03/16/2019 18:06:40|       0.52|      115|           60|
|           0.58|03/16/2019 18:11:40| 

Sometimes we might want to compare a value within a group. For example, we would like to compare the current CPU utilization for a server to the average CPU utilization of just that server, not the entire population.

We can do that with a window function. In SQL, window functions are specified with an `OVER (PARTITION BY ...)` clause. The clause computes the aggregate separately for each partition (here, each server) while keeping every row. Let's see how to use it.
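A windowed `avg(...) OVER (PARTITION BY server_id)` differs from `GROUP BY server_id` in one key way: every input row is kept, and each row is paired with the aggregate of its own partition. The plain-Python sketch below illustrates this on a few readings copied from this notebook's outputs. It also computes each row's difference from its partition average.

```python
from collections import defaultdict

# A few (server_id, cpu_utilization) readings copied from this notebook's outputs
rows = [(112, 0.71), (112, 0.78), (112, 0.87), (115, 0.77), (115, 0.53)]


def avg_over_partition(rows):
    """Attach each partition's average, and the row's delta from it, to every row."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for server_id, cpu in rows:
        totals[server_id] += cpu
        counts[server_id] += 1
    result = []
    for server_id, cpu in rows:
        avg = totals[server_id] / counts[server_id]
        result.append((server_id, cpu, avg, cpu - avg))  # one output row per input row
    return result


result = avg_over_partition(rows)
for row in result:
    print(row)
```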

In [190]:
#windowing function - OVER(PARTITION BY _____)
spark.sql("SELECT event_datetime, server_id, cpu_utilization, \
          avg(cpu_utilization) OVER (PARTITION BY server_id) as avg_server_util \
          FROM utilization ").show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:34|      112|           0.71|0.7153870000000067|
|03/05/2019 08:11:34|      112|           0.78|0.7153870000000067|
|03/05/2019 08:16:34|      112|           0.87|0.7153870000000067|
|03/05/2019 08:21:34|      112|           0.82|0.7153870000000067|
|03/05/2019 08:26:34|      112|           0.62|0.7153870000000067|
|03/05/2019 08:31:34|      112|            0.9|0.7153870000000067|
|03/05/2019 08:36:34|      112|           0.89|0.7153870000000067|
|03/05/2019 08:41:34|      112|           0.81|0.7153870000000067|
|03/05/2019 08:46:34|      112|           0.88|0.7153870000000067|
|03/05/2019 08:51:34|      112|           0.89|0.7153870000000067|
|03/05/2019 08:56:34|      112|           0.84|0.7153870000000067|
|03/05/2019 09:01:34|      112|           0.71|0.7153870000000

The result still has one row per measurement, each with its own timestamp and CPU utilization. However, `avg_server_util` repeats the same value on every row of a partition: about 0.7154 for server ID 112 in this part of the result-set.

Next, we want to calculate how far each CPU utilization measurement is from the average for its server.

In [196]:
spark.sql("SELECT event_datetime, server_id, cpu_utilization, \
          avg(cpu_utilization) OVER (PARTITION BY server_id) as avg_server_util, \
          cpu_utilization - avg(cpu_utilization) OVER (PARTITION BY server_id) as delta_server_util\
          FROM utilization").show()



+-------------------+---------+---------------+------------------+--------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|   delta_server_util|
+-------------------+---------+---------------+------------------+--------------------+
|03/05/2019 08:06:34|      112|           0.71|0.7153870000000067|-0.00538700000000...|
|03/05/2019 08:11:34|      112|           0.78|0.7153870000000067| 0.06461299999999337|
|03/05/2019 08:16:34|      112|           0.87|0.7153870000000067| 0.15461299999999334|
|03/05/2019 08:21:34|      112|           0.82|0.7153870000000067|  0.1046129999999933|
|03/05/2019 08:26:34|      112|           0.62|0.7153870000000067|-0.09538700000000666|
|03/05/2019 08:31:34|      112|            0.9|0.7153870000000067| 0.18461299999999337|
|03/05/2019 08:36:34|      112|           0.89|0.7153870000000067| 0.17461299999999336|
|03/05/2019 08:41:34|      112|           0.81|0.7153870000000067|  0.0946129999999934|
|03/05/2019 08:46:34|      112| 

That is one of the operations that we can do with the windowing functions. We can compare a particular value in a row to a value of some aggregate function applied to a sub-set of rows.

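Window functions can also look at the neighborhood of a row. For example, we might want a sliding-window average: the average of the last three values, or the average of the previous, current, and next values. The frame clause `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` computes the latter. Note that `event_datetime` is stored as a string in `MM/dd/yyyy HH:mm:ss` format, so `ORDER BY event_datetime` sorts it as text. That matches chronological order only as long as all readings share the same year. Converting the column with `to_timestamp()` would make the ordering robust. Let's do it.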
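The frame `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` averages the previous, current, and next rows of each partition in `event_datetime` order. At the edges of a partition, the frame simply contains fewer rows. The plain-Python sketch below applies that rule to the first five server 112 readings from the windowing output above. Its first four averages should match the first four `avg_server_util` values that the next query prints: 0.745, about 0.7867, about 0.8233, and 0.77.

```python
def centered_moving_average(values):
    """Average each value with its immediate neighbours (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)."""
    averages = []
    for i in range(len(values)):
        frame = values[max(0, i - 1):i + 2]  # slicing stops at the list edges
        averages.append(sum(frame) / len(frame))
    return averages


# First five server 112 readings, already sorted by event_datetime
server_112 = [0.71, 0.78, 0.87, 0.82, 0.62]
avgs = centered_moving_average(server_112)
print(avgs)
```

The last average here covers only two readings because the sample stops at five rows. On the full table, that row's frame also includes the following reading.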

In [200]:
spark.sql("SELECT event_datetime, server_id, cpu_utilization, \
          avg(cpu_utilization) OVER (PARTITION BY server_id ORDER BY event_datetime \
                                      ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as avg_server_util \
                                      FROM utilization").show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:34|      112|           0.71|             0.745|
|03/05/2019 08:11:34|      112|           0.78|0.7866666666666666|
|03/05/2019 08:16:34|      112|           0.87|0.8233333333333333|
|03/05/2019 08:21:34|      112|           0.82|              0.77|
|03/05/2019 08:26:34|      112|           0.62|0.7799999999999999|
|03/05/2019 08:31:34|      112|            0.9|0.8033333333333333|
|03/05/2019 08:36:34|      112|           0.89|0.8666666666666667|
|03/05/2019 08:41:34|      112|           0.81|              0.86|
|03/05/2019 08:46:34|      112|           0.88|              0.86|
|03/05/2019 08:51:34|      112|           0.89|              0.87|
|03/05/2019 08:56:34|      112|           0.84|0.8133333333333334|
|03/05/2019 09:01:34|      112|           0.71|0.7999999999999

#### Great Job!