<a href="https://colab.research.google.com/github/nandini-mazumdar/learning-spark-again/blob/main/Intro_to_Spark_Chap_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

#Spark SQL

##Reading Data

In [None]:
df = spark.read.json('/content/drive/MyDrive/Colab Notebooks/LinkedIn_Learining_Intro_to_Spark_Data/utilization.json')
df.show(10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-

In [None]:
df.count()

500000

In [None]:
df.createOrReplaceTempView("utilization")

In [None]:
df_sql = spark.sql("Select * From utilization Limit 20")

In [None]:
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [None]:
spark.sql("Select server_id, session_count From utilization Limit 10").show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           47|
|      100|           43|
|      100|           62|
|      100|           50|
|      100|           43|
|      100|           48|
|      100|           58|
|      100|           58|
|      100|           62|
|      100|           45|
+---------+-------------+



In [None]:
df_sql_2 = spark.sql("Select server_id as s_id, session_count as s_count From Utilization")

In [None]:
df_sql_2.count()

500000

In [None]:
df_sql_2.show(10)

+----+-------+
|s_id|s_count|
+----+-------+
| 100|     47|
| 100|     43|
| 100|     62|
| 100|     50|
| 100|     43|
| 100|     48|
| 100|     58|
| 100|     58|
| 100|     62|
| 100|     45|
+----+-------+
only showing top 10 rows



##Filtering

In [None]:
df_sql_3 = spark.sql("Select * From utilization Where server_id = 120")
df_sql_3.show(10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.66|03/05/2019 08:06:48|       0.31|      120|           54|
|           0.58|03/05/2019 08:11:48|       0.38|      120|           64|
|           0.55|03/05/2019 08:16:48|       0.61|      120|           54|
|            0.7|03/05/2019 08:21:48|       0.35|      120|           80|
|            0.6|03/05/2019 08:26:48|       0.39|      120|           71|
|           0.53|03/05/2019 08:31:48|       0.35|      120|           49|
|           0.73|03/05/2019 08:36:48|       0.42|      120|           73|
|           0.41|03/05/2019 08:41:48|        0.6|      120|           72|
|           0.62|03/05/2019 08:46:48|       0.57|      120|           57|
|           0.67|03/05/2019 08:51:48|       0.44|      120|           78|
+---------------+-------------------+-

In [None]:
df_sql_3.count()

10000

In [None]:
df_sql_4 = spark.sql("Select server_id, session_count From utilization Where session_count >= 70")
df_sql_4.show(10)
df_sql_4.count()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           70|
|      100|           71|
|      100|           70|
|      100|           71|
|      100|           71|
|      100|           70|
|      100|           70|
|      100|           71|
|      100|           72|
|      100|           72|
+---------+-------------+
only showing top 10 rows



252968

In [None]:
df_sql_5 = spark.sql("Select server_id, session_count From utilization \
                      Where session_count >= 70 \
                      And server_id = 120 \
                      Order by session_count Desc")
df_sql_5.show(10)
df_sql_5.count()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
+---------+-------------+
only showing top 10 rows



3012

##Aggregation

In [None]:
df_count = spark.sql("Select Count(*) From utilization")
df_count.show()

+--------+
|count(1)|
+--------+
|  500000|
+--------+



In [None]:
df_sql = spark.sql("Select server_id, Count(*) as sc \
                    From utilization \
                    Where session_count > 70 \
                    group by server_id \
                    Order by sc Desc")
df_sql.show()

+---------+----+
|server_id|  sc|
+---------+----+
|      101|9808|
|      113|9418|
|      145|9304|
|      103|8744|
|      102|8586|
|      133|8583|
|      108|8375|
|      149|8288|
|      137|8248|
|      148|8027|
|      123|7918|
|      118|7913|
|      112|7425|
|      139|7383|
|      104|7366|
|      121|7084|
|      142|7084|
|      146|7072|
|      126|6365|
|      144|6220|
+---------+----+
only showing top 20 rows



In [None]:
df_sql_2 = spark.sql("Select server_id, count(*) as total_sc, \
                    min(session_count) as min_sc, \
                    round(avg(session_count),2) as avg_sc, \
                    max(session_count) as max_sc \
                    From utilization \
                    Where session_count > 70 \
                    group by server_id \
                    Order by count(*) Desc")
df_sql_2.show()

+---------+--------+------+------+------+
|server_id|total_sc|min_sc|avg_sc|max_sc|
+---------+--------+------+------+------+
|      101|    9808|    71| 87.67|   105|
|      113|    9418|    71| 86.96|   103|
|      145|    9304|    71| 86.98|   103|
|      103|    8744|    71| 85.76|   101|
|      102|    8586|    71| 85.71|   101|
|      133|    8583|    71| 85.47|   100|
|      108|    8375|    71| 85.12|   100|
|      149|    8288|    71| 84.96|    99|
|      137|    8248|    71| 85.01|    99|
|      148|    8027|    71|  84.7|    99|
|      123|    7918|    71| 84.53|    98|
|      118|    7913|    71| 84.66|    98|
|      112|    7425|    71| 83.55|    97|
|      139|    7383|    71| 83.33|    96|
|      104|    7366|    71| 83.35|    96|
|      121|    7084|    71| 82.89|    95|
|      142|    7084|    71|  82.9|    95|
|      146|    7072|    71| 82.95|    95|
|      126|    6365|    71| 81.56|    93|
|      144|    6220|    71| 81.38|    92|
+---------+--------+------+------+

##Joining Tables

In [None]:
df_util = spark.sql("Select Count(*) From utilization")
df_util.show()

+--------+
|count(1)|
+--------+
|  500000|
+--------+



In [None]:
df_server = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/LinkedIn_Learining_Intro_to_Spark_Data/server_name.csv', header=True, inferSchema=True)
df_server.count()

50

In [None]:
df_server.createOrReplaceTempView('server')

In [None]:
df_serv = spark.sql('Select count(*) From server')
df_serv.show()

+--------+
|count(1)|
+--------+
|      50|
+--------+



In [None]:
spark.sql('Select distinct(server_id) From utilization Order by server_id').show()

+---------+
|server_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
|      105|
|      106|
|      107|
|      108|
|      109|
|      110|
|      111|
|      112|
|      113|
|      114|
|      115|
|      116|
|      117|
|      118|
|      119|
+---------+
only showing top 20 rows



In [None]:
spark.sql('Select min(server_id), max(server_id) From utilization').show()

+--------------+--------------+
|min(server_id)|max(server_id)|
+--------------+--------------+
|           100|           149|
+--------------+--------------+



In [None]:
df_join = spark.sql('Select u.server_id, s.server_name, u.session_count\
                      From utilization u\
                      Inner Join server s\
                      On s.server_id = u.server_id')
df_join.show()

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      100| 100 Server|           47|
|      100| 100 Server|           43|
|      100| 100 Server|           62|
|      100| 100 Server|           50|
|      100| 100 Server|           43|
|      100| 100 Server|           48|
|      100| 100 Server|           58|
|      100| 100 Server|           58|
|      100| 100 Server|           62|
|      100| 100 Server|           45|
|      100| 100 Server|           47|
|      100| 100 Server|           60|
|      100| 100 Server|           57|
|      100| 100 Server|           44|
|      100| 100 Server|           47|
|      100| 100 Server|           66|
|      100| 100 Server|           65|
|      100| 100 Server|           66|
|      100| 100 Server|           42|
|      100| 100 Server|           63|
+---------+-----------+-------------+
only showing top 20 rows



##De-duplicate data

In [None]:
from pyspark.sql import Row

In [None]:
sc = spark.sparkContext

In [None]:
df_dup = sc.parallelize([Row(server_name='101 server',cpu_utilization=85,session_count=80),\
                         Row(server_name='101 server',cpu_utilization=80,session_count=90),\
                         Row(server_name='102 server',cpu_utilization=85,session_count=80),\
                         Row(server_name='102 server',cpu_utilization=85,session_count=80)]).toDF()
df_dup.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 server|             85|           80|
| 101 server|             80|           90|
| 102 server|             85|           80|
| 102 server|             85|           80|
+-----------+---------------+-------------+



In [None]:
df_dup.dropDuplicates().show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 server|             85|           80|
| 102 server|             85|           80|
| 101 server|             80|           90|
+-----------+---------------+-------------+



In [None]:
df_dup.dropDuplicates(['server_name','session_count']).show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 server|             80|           90|
| 102 server|             85|           80|
| 101 server|             85|           80|
+-----------+---------------+-------------+



##Missing NA values

In [None]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

In [None]:
df_miss = sc.parallelize([Row(server_name='101 server',cpu_utilization=85,session_count=80),\
                          Row(server_name='101 server',cpu_utilization=80,session_count=90),\
                          Row(server_name='102 server',cpu_utilization=85,session_count=40),\
                          Row(server_name='103 server',cpu_utilization=70,session_count=80),\
                          Row(server_name='104 server',cpu_utilization=60,session_count=80)]).toDF()
df_miss.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 server|             85|           80|
| 101 server|             80|           90|
| 102 server|             85|           40|
| 103 server|             70|           80|
| 104 server|             60|           80|
+-----------+---------------+-------------+



In [None]:
df_na = df_miss.withColumn('na_col', lit(None).cast(StringType()))
df_na.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|  null|
| 101 server|             80|           90|  null|
| 102 server|             85|           40|  null|
| 103 server|             70|           80|  null|
| 104 server|             60|           80|  null|
+-----------+---------------+-------------+------+



###1. Dataframe API

In [None]:
# replace Nulls

df_na.fillna('A').show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|     A|
| 101 server|             80|           90|     A|
| 102 server|             85|           40|     A|
| 103 server|             70|           80|     A|
| 104 server|             60|           80|     A|
+-----------+---------------+-------------+------+



In [None]:
df_2 = df_na.fillna('A').union(df_na)

In [None]:
df_2.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|     A|
| 101 server|             80|           90|     A|
| 102 server|             85|           40|     A|
| 103 server|             70|           80|     A|
| 104 server|             60|           80|     A|
| 101 server|             85|           80|  null|
| 101 server|             80|           90|  null|
| 102 server|             85|           40|  null|
| 103 server|             70|           80|  null|
| 104 server|             60|           80|  null|
+-----------+---------------+-------------+------+



In [None]:
df_2.na.drop().show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|     A|
| 101 server|             80|           90|     A|
| 102 server|             85|           40|     A|
| 103 server|             70|           80|     A|
| 104 server|             60|           80|     A|
+-----------+---------------+-------------+------+



###2. Spark SQL

In [None]:
df_2.createOrReplaceTempView('na_table')

In [None]:
spark.sql("Select * From na_table").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|     A|
| 101 server|             80|           90|     A|
| 102 server|             85|           40|     A|
| 103 server|             70|           80|     A|
| 104 server|             60|           80|     A|
| 101 server|             85|           80|  null|
| 101 server|             80|           90|  null|
| 102 server|             85|           40|  null|
| 103 server|             70|           80|  null|
| 104 server|             60|           80|  null|
+-----------+---------------+-------------+------+



In [None]:
spark.sql("Select * From na_table\
            Where na_col IS NULL").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|  null|
| 101 server|             80|           90|  null|
| 102 server|             85|           40|  null|
| 103 server|             70|           80|  null|
| 104 server|             60|           80|  null|
+-----------+---------------+-------------+------+



In [None]:
spark.sql("Select * From na_table\
            Where na_col IS NOT NULL").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 server|             85|           80|     A|
| 101 server|             80|           90|     A|
| 102 server|             85|           40|     A|
| 103 server|             70|           80|     A|
| 104 server|             60|           80|     A|
+-----------+---------------+-------------+------+

