## SPARK SQL

### Create SparkSession object

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from time import time

# Obtain the session used by every cell below: getOrCreate() hands back an
# already-running session when there is one, otherwise it starts a new one.
spark = (
    SparkSession.builder
    .appName("Data Source API using PySpark Demo")
    .getOrCreate()
)

In [44]:
# Read the utilization records from JSON into a DataFrame, then preview ten rows.
df = spark.read.json(
    "../Data/utilization.json"
)
df.show(n=10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-----------+---------+-------------+

To query `df` with SQL, first register it as a temporary view; the view name is what the SQL statements refer to.

In [9]:
# Expose the DataFrame to the SQL engine under the name `utilization`
# (replaces any temp view already registered under that name).
df.createOrReplaceTempView("utilization")

In [13]:
# spark.sql() returns a DataFrame, so the result can be kept and reused
# by later cells; here it holds the first ten rows of the view.
df_sql = spark.sql(
    """SELECT * FROM utilization LIMIT 10"""
)
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-----------+---------+-------------+

In [14]:
# Number of rows in the query result; the query above used LIMIT 10.
df_sql.count()

10

In [15]:
# Project just two columns instead of SELECT *.
(
    spark
    .sql("""SELECT server_id, session_count FROM utilization LIMIT 10""")
    .show()
)

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           47|
|      100|           43|
|      100|           62|
|      100|           50|
|      100|           43|
|      100|           48|
|      100|           58|
|      100|           58|
|      100|           62|
|      100|           45|
+---------+-------------+



### Filtering in SQL

In [17]:
# Restrict the rows to a single server and preview five of them.
(
    spark
    .sql("""SELECT * FROM utilization WHERE server_id = 120""")
    .show(n=5)
)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.66|03/05/2019 08:06:48|       0.31|      120|           54|
|           0.58|03/05/2019 08:11:48|       0.38|      120|           64|
|           0.55|03/05/2019 08:16:48|       0.61|      120|           54|
|            0.7|03/05/2019 08:21:48|       0.35|      120|           80|
|            0.6|03/05/2019 08:26:48|       0.39|      120|           71|
+---------------+-------------------+-----------+---------+-------------+
only showing top 5 rows



In [20]:
# Keep only the rows whose session_count exceeds 70.
(
    spark
    .sql("""SELECT server_id, session_count FROM utilization WHERE session_count > 70""")
    .show(n=10)
)

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           72|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
+---------+-------------+
only showing top 10 rows



In [22]:
# Combine both predicates with AND: server 120 and more than 70 sessions.
(
    spark
    .sql("""SELECT server_id, session_count FROM utilization WHERE server_id = 120 AND session_count > 70""")
    .show(n=10)
)

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           71|
|      120|           73|
|      120|           72|
|      120|           78|
|      120|           73|
|      120|           78|
|      120|           73|
|      120|           74|
|      120|           78|
+---------+-------------+
only showing top 10 rows



In [27]:
# Same filter as the previous cell, now sorted so the busiest readings come first.
# A triple-quoted string already spans lines, so no backslash continuations
# are needed; SQL ignores the extra newlines and indentation.
spark.sql("""
    SELECT server_id, session_count
    FROM utilization
    WHERE server_id = 120 AND session_count > 70
    ORDER BY session_count DESC
""").show(10)

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
+---------+-------------+
only showing top 10 rows



### Aggregations

In [29]:
# Total number of rows in the view, returned as a one-row DataFrame.
(
    spark
    .sql("""SELECT COUNT(*) AS total_count FROM utilization""")
    .show()
)

+-----------+
|total_count|
+-----------+
|     500000|
+-----------+



In [30]:
# Count only the rows that pass the session_count > 70 filter.
(
    spark
    .sql("""SELECT COUNT(*) AS total_count FROM utilization WHERE session_count > 70""")
    .show()
)

+-----------+
|total_count|
+-----------+
|     239659|
+-----------+



In [32]:
# One count per server; without ORDER BY the row order is whatever Spark produces.
(
    spark
    .sql("""SELECT server_id, COUNT(*) AS total_count FROM utilization GROUP BY server_id""")
    .show(n=10)
)

+---------+-----------+
|server_id|total_count|
+---------+-----------+
|      112|      10000|
|      113|      10000|
|      130|      10000|
|      126|      10000|
|      149|      10000|
|      110|      10000|
|      136|      10000|
|      144|      10000|
|      119|      10000|
|      116|      10000|
+---------+-----------+
only showing top 10 rows



In [34]:
# Per-server row counts, listed from the highest server_id downwards.
# A triple-quoted string already spans lines, so no backslash continuations
# are needed; SQL ignores the extra newlines and indentation.
spark.sql("""
    SELECT server_id, COUNT(*) AS total_count
    FROM utilization
    GROUP BY server_id
    ORDER BY server_id DESC
""").show(10)

+---------+-----------+
|server_id|total_count|
+---------+-----------+
|      149|      10000|
|      148|      10000|
|      147|      10000|
|      146|      10000|
|      145|      10000|
|      144|      10000|
|      143|      10000|
|      142|      10000|
|      141|      10000|
|      140|      10000|
+---------+-----------+
only showing top 10 rows



In [42]:
# Min, max and rounded average session_count for each server, ordered by server_id.
# Written as a triple-quoted string like the other multi-line queries: with
# backslash continuations in a plain string, the SQL tokens on either side of
# a line break are only kept apart by the next line's indentation.
spark.sql("""
    SELECT server_id,
           MIN(session_count),
           MAX(session_count),
           ROUND(AVG(session_count), 2) AS avg_session_count
    FROM utilization
    GROUP BY server_id
    ORDER BY server_id
""").show(10)

+---------+------------------+------------------+-----------------+
|server_id|min(session_count)|max(session_count)|avg_session_count|
+---------+------------------+------------------+-----------------+
|      100|                37|                72|            54.35|
|      101|                70|               105|            87.33|
|      102|                66|               101|            83.22|
|      103|                66|               101|            83.58|
|      104|                61|                96|            78.75|
|      105|                39|                74|            56.88|
|      106|                32|                67|            49.97|
|      107|                55|                90|             72.7|
|      108|                65|               100|            82.26|
|      109|                46|                81|            63.96|
+---------+------------------+------------------+-----------------+
only showing top 10 rows



### Joining data in SQL

In [83]:
# Load the server-name lookup file: the first line supplies the column names
# and column types are inferred from the data. Preview ten rows.
df2 = spark.read.csv(
    "../Data/server_name.csv",
    header=True,
    inferSchema=True,
)
df2.show(n=10)

+---------+-----------+
|server_id|server_name|
+---------+-----------+
|      100| 100 Server|
|      101| 101 Server|
|      102| 102 Server|
|      103| 103 Server|
|      104| 104 Server|
|      105| 105 Server|
|      106| 106 Server|
|      107| 107 Server|
|      108| 108 Server|
|      109| 109 Server|
+---------+-----------+
only showing top 10 rows



In [84]:
# Register the lookup DataFrame as the temp view `server_name` so SQL can join against it.
df2.createOrReplaceTempView("server_name")

In [86]:
# Attach each utilization row's server name by inner-joining the two
# temp views on server_id.
(
    spark
    .sql("""SELECT u.server_id, s.server_name, u.session_count
            FROM utilization AS u
            INNER JOIN server_name AS s
            ON u.server_id=s.server_id""")
    .show(n=10)
)

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      100| 100 Server|           47|
|      100| 100 Server|           43|
|      100| 100 Server|           62|
|      100| 100 Server|           50|
|      100| 100 Server|           43|
|      100| 100 Server|           48|
|      100| 100 Server|           58|
|      100| 100 Server|           58|
|      100| 100 Server|           62|
|      100| 100 Server|           45|
+---------+-----------+-------------+
only showing top 10 rows

