In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
        .builder \
        .appName("Spark SQL Query DataFrames") \
        .getOrCreate()

In [3]:
# set data_path
data_path = 'C:/Users/tc18f/Desktop/springboard/Spark/Exercise Files/data'

In [4]:
json_file_path = data_path + '/utilization.json'
df = spark.read.format('json').load(json_file_path)

Py4JJavaError: An error occurred while calling o27.load.
: java.lang.OutOfMemoryError: GC overhead limit exceeded


In [8]:
# since json couldn't be read, let's read the updated csv file instead
csv_file_path = data_path + '/utilization_header_true.csv'
df = spark.read.format('csv').option('header','true').load(csv_file_path)
df.show(10)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
+-------------------+---------+-------

In [9]:
# create a temp view 'utilization'
df.createOrReplaceTempView('utilization')

In [11]:
# use sql query to select view
df_sql = spark.sql('SELECT * FROM utilization LIMIT 10')

In [12]:
# use show() to see
df_sql.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
+-------------------+---------+-------

In [14]:
# check and see if the limit 10 works
df_sql.count()

10

In [21]:
# choose only server id and session count columns viewing only first 10 of them
df_sql = spark.sql('SELECT server_id, session_count FROM utilization LIMIT 10')
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           47|
|      100|           43|
|      100|           62|
|      100|           50|
|      100|           43|
|      100|           48|
|      100|           58|
|      100|           58|
|      100|           62|
|      100|           45|
+---------+-------------+



In [22]:
# query with new named columns
df_sql = spark.sql('SELECT server_id AS sid, session_count AS sc FROM utilization')
df_sql.show(10)

+---+---+
|sid| sc|
+---+---+
|100| 47|
|100| 43|
|100| 62|
|100| 50|
|100| 43|
|100| 48|
|100| 58|
|100| 58|
|100| 62|
|100| 45|
+---+---+
only showing top 10 rows



In [23]:
# query with WHERE to filter the selection
df_sql = spark.sql('SELECT * FROM utilization WHERE server_id = 120')
df_sql.show(10)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:48|      120|           0.66|       0.31|           54|
|03/05/2019 08:11:48|      120|           0.58|       0.38|           64|
|03/05/2019 08:16:48|      120|           0.55|       0.61|           54|
|03/05/2019 08:21:48|      120|            0.7|       0.35|           80|
|03/05/2019 08:26:48|      120|            0.6|       0.39|           71|
|03/05/2019 08:31:48|      120|           0.53|       0.35|           49|
|03/05/2019 08:36:48|      120|           0.73|       0.42|           73|
|03/05/2019 08:41:48|      120|           0.41|        0.6|           72|
|03/05/2019 08:46:48|      120|           0.62|       0.57|           57|
|03/05/2019 08:51:48|      120|           0.67|       0.44|           78|
+-------------------+---------+-------

In [24]:
# query with WHERE and AND
df_sql = spark.sql('SELECT * FROM utilization WHERE server_id=120 AND session_count>70')
df_sql.show(10)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:21:48|      120|            0.7|       0.35|           80|
|03/05/2019 08:26:48|      120|            0.6|       0.39|           71|
|03/05/2019 08:36:48|      120|           0.73|       0.42|           73|
|03/05/2019 08:41:48|      120|           0.41|        0.6|           72|
|03/05/2019 08:51:48|      120|           0.67|       0.44|           78|
|03/05/2019 08:56:48|      120|           0.67|       0.38|           73|
|03/05/2019 09:06:48|      120|            0.5|       0.29|           78|
|03/05/2019 09:26:48|      120|           0.53|       0.57|           73|
|03/05/2019 09:41:48|      120|           0.54|       0.27|           74|
|03/05/2019 10:06:48|      120|           0.63|       0.47|           78|
+-------------------+---------+-------

In [25]:
# query using ORDER BY and DESC
df_sql = spark.sql('SELECT * \
                   FROM utilization \
                   WHERE server_id=120 AND session_count>70 \
                   ORDER BY session_count DESC')
df_sql.show(10)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 23:26:48|      120|           0.72|       0.64|           80|
|03/06/2019 11:31:48|      120|           0.61|       0.25|           80|
|03/06/2019 04:21:48|      120|           0.46|       0.55|           80|
|03/06/2019 10:56:48|      120|           0.65|       0.45|           80|
|03/06/2019 08:26:48|      120|            0.6|       0.39|           80|
|03/05/2019 19:16:48|      120|           0.58|       0.56|           80|
|03/06/2019 08:51:48|      120|           0.65|       0.31|           80|
|03/05/2019 08:21:48|      120|            0.7|       0.35|           80|
|03/06/2019 10:01:48|      120|           0.44|       0.41|           80|
|03/05/2019 19:01:48|      120|           0.54|       0.55|           80|
+-------------------+---------+-------

In [27]:
# query using GROPUBY BY
df_sql = spark.sql('SELECT server_id, count(*) \
                   FROM utilization \
                   WHERE session_count>70 \
                   GROUP BY server_id \
                   ORDER BY count(*) DESC')
df_sql.show(10)

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      101|    9808|
|      113|    9418|
|      145|    9304|
|      103|    8744|
|      102|    8586|
|      133|    8583|
|      108|    8375|
|      149|    8288|
|      137|    8248|
|      148|    8027|
+---------+--------+
only showing top 10 rows



In [30]:
# more selections with agg
df_sql = spark.sql('SELECT server_id, min(session_count), round(avg(session_count),2) AS avg_session_count, max(session_count) \
                   FROM utilization \
                   WHERE session_count>70 \
                   GROUP BY server_id \
                   ORDER BY count(*) DESC')
df_sql.show(10)

+---------+------------------+-----------------+------------------+
|server_id|min(session_count)|avg_session_count|max(session_count)|
+---------+------------------+-----------------+------------------+
|      101|               100|            87.67|                99|
|      113|               100|            86.96|                99|
|      145|               100|            86.98|                99|
|      103|               100|            85.76|                99|
|      102|               100|            85.71|                99|
|      133|               100|            85.47|                99|
|      108|               100|            85.12|                99|
|      149|                71|            84.96|                99|
|      137|                71|            85.01|                99|
|      148|                71|             84.7|                99|
+---------+------------------+-----------------+------------------+
only showing top 10 rows



In [31]:
# create another table so we can join them
df_util = df

In [32]:
# create a temp view 'utilization'
df_util.createOrReplaceTempView('utilization')

In [33]:
# load the server name csv
df_server = spark.read.csv(data_path + '/server_name.csv', header=True)

In [34]:
df_server.show(10)

+---------+-----------+
|server_id|server_name|
+---------+-----------+
|      100| 100 Server|
|      101| 101 Server|
|      102| 102 Server|
|      103| 103 Server|
|      104| 104 Server|
|      105| 105 Server|
|      106| 106 Server|
|      107| 107 Server|
|      108| 108 Server|
|      109| 109 Server|
+---------+-----------+
only showing top 10 rows



In [35]:
df_server.createOrReplaceTempView('server_name')

In [37]:
df_count = spark.sql("SELECT DISTINCT server_id FROM utilization ORDER BY server_id")
df_count.show(10)

+---------+
|server_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
|      105|
|      106|
|      107|
|      108|
|      109|
+---------+
only showing top 10 rows



In [39]:
spark.sql("SELECT min(server_id), max(server_id) FROM utilization").show()

+--------------+--------------+
|min(server_id)|max(server_id)|
+--------------+--------------+
|           100|           149|
+--------------+--------------+



In [41]:
# joining the two tables
df_join = spark.sql("SELECT u.server_id, sn.server_name, u.session_count \
                    FROM utilization u \
                    INNER JOIN server_name sn \
                    ON sn.server_id = u.server_id")
df_join.show(10)

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      100| 100 Server|           47|
|      100| 100 Server|           43|
|      100| 100 Server|           62|
|      100| 100 Server|           50|
|      100| 100 Server|           43|
|      100| 100 Server|           48|
|      100| 100 Server|           58|
|      100| 100 Server|           58|
|      100| 100 Server|           62|
|      100| 100 Server|           45|
+---------+-----------+-------------+
only showing top 10 rows

