In [31]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [32]:
# import package
from pyspark.sql import SparkSession

In [33]:
spark = SparkSession.builder.getOrCreate()

In [34]:
spark

In [35]:
data_path = "D:/DE Projects/Data-Engineering-Projects-Notes/Data Engineering Projects/Spark SQL Tutorials/Data"

file_path = data_path + "/location_temp.csv"

In [36]:
# Reading csv file using PySpark

df1 = spark.read.format("csv").option("header", "true").load(file_path)

In [37]:
df1.head(5)

[Row(event_date='03/04/2019 19:48:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 19:53:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 19:58:06', location_id='loc0', temp_celcius='28'),
 Row(event_date='03/04/2019 20:03:06', location_id='loc0', temp_celcius='30'),
 Row(event_date='03/04/2019 20:08:06', location_id='loc0', temp_celcius='27')]

In [38]:
df1.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [39]:
# Get the number of rows in the dataframe
df1.count()

500000

In [40]:
# Without the header reading the csv
file_path_no_header = data_path + "/utilization.csv"
#df2 = spark.read.format("csv").option("header", "fales").option("inferSchema", "true").load(file_path_no_header)
df2 = spark.read.csv(file_path_no_header, 
  header=False, inferSchema=True)
df2.count()  

500000

In [42]:
# Renaming the column names in pyspark
df2 = df2.withColumnRenamed("_c0", "event_datetime") \
        .withColumnRenamed("_c1", "server_id") \
        .withColumnRenamed("_c2", "cpu_utilization") \
        .withColumnRenamed("_c3", "free_memory") \
        .withColumnRenamed("_c4", "session_count") 

In [43]:
df2.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
|03/05/2019 08:56:14|      100|       

In [44]:
# Reading json file
json_path = data_path + "/utilization.json"
df_json = spark.read.format("json").load(json_path)
df_json.count()

500000

In [45]:
df_json.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [46]:
# Print the columns
df_json.columns

['cpu_utilization',
 'event_datetime',
 'free_memory',
 'server_id',
 'session_count']

In [47]:
# Create a sample dataframe
df_json_sample  = df_json.sample(False,fraction=0.1)

In [48]:
df_json_sample.count()

50163

In [49]:
# sorting the dataframe by a column
df_json_sort = df_json.sort('event_datetime')

In [50]:
# Adding filter method
df1_fil = df1.filter(df1["location_id"] == "loc0").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [51]:
# Get the count of filtered samples
df1_fil2 = df1.filter(df1["location_id"] == "loc0").count()

1000

In [53]:
# Group by count of locations
df1.groupBy("location_id").count().show()

+-----------+-----+
|location_id|count|
+-----------+-----+
|      loc22| 1000|
|      loc31| 1000|
|      loc82| 1000|
|      loc90| 1000|
|     loc118| 1000|
|      loc39| 1000|
|      loc75| 1000|
|     loc122| 1000|
|      loc24| 1000|
|      loc30| 1000|
|     loc105| 1000|
|      loc96| 1000|
|     loc102| 1000|
|      loc18| 1000|
|      loc27| 1000|
|     loc143| 1000|
|      loc43| 1000|
|     loc123| 1000|
|      loc15| 1000|
|      loc48| 1000|
+-----------+-----+
only showing top 20 rows



In [55]:
# order by
df1.orderBy("location_id").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 21:23:06|       loc0|          28|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 21:18:06|       loc0|          33|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 2

In [56]:
# Get the mean of the temperature by locationwise
df1.groupBy("location_id").agg({'temp_celcius' : 'mean'}).show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|      loc22|           28.251|
|      loc31|           25.196|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc122|            32.36|
|      loc24|           31.109|
|      loc30|           30.211|
|     loc105|           26.217|
|      loc96|           28.138|
|     loc102|           30.327|
|      loc18|           30.198|
|      loc27|           31.239|
|     loc143|           28.213|
|      loc43|           32.196|
|     loc123|           23.424|
|      loc15|           32.171|
|      loc48|           30.244|
+-----------+-----------------+
only showing top 20 rows



In [57]:
df1.groupBy("location_id").agg({'temp_celcius' : 'max'}).show()

+-----------+-----------------+
|location_id|max(temp_celcius)|
+-----------+-----------------+
|       loc0|               36|
|       loc1|               35|
|      loc10|               32|
|     loc100|               34|
|     loc101|               32|
|     loc102|               37|
|     loc103|               32|
|     loc104|               33|
|     loc105|               33|
|     loc106|               34|
|     loc107|               40|
|     loc108|               39|
|     loc109|               31|
|      loc11|               32|
|     loc110|               33|
|     loc111|               38|
|     loc112|               40|
|     loc113|               37|
|     loc114|               36|
|     loc115|               30|
+-----------+-----------------+
only showing top 20 rows



In [58]:
df1.count()

500000

In [59]:
df1_s1 = df1.sample(fraction=0.1, withReplacement=False)

In [60]:
df1_s1.count()

49756

In [61]:
df1_s1.groupBy("location_id").agg({'temp_celcius' : 'mean'}).show()

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|      loc22|28.216867469879517|
|      loc31|             25.51|
|      loc82| 27.27956989247312|
|      loc90| 23.22222222222222|
|     loc118|24.385964912280702|
|      loc39|24.971153846153847|
|      loc75|             23.54|
|     loc122| 32.28703703703704|
|      loc24|31.010416666666668|
|      loc30|             29.83|
|     loc105| 26.33695652173913|
|      loc96| 27.84536082474227|
|     loc102| 30.16842105263158|
|      loc18|              30.0|
|      loc27| 31.04385964912281|
|     loc143|28.155339805825243|
|      loc43|32.472527472527474|
|     loc123|23.242105263157896|
|      loc15| 32.12765957446808|
|      loc48| 30.11578947368421|
+-----------+------------------+
only showing top 20 rows



In [62]:
df1_s1.groupBy("location_id").agg({'temp_celcius' : 'mean'}).orderBy("location_id").show()

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|       loc0|29.568627450980394|
|       loc1|27.895833333333332|
|      loc10|25.129032258064516|
|     loc100| 27.30275229357798|
|     loc101|25.844660194174757|
|     loc102| 30.16842105263158|
|     loc103| 25.37142857142857|
|     loc104|26.285714285714285|
|     loc105| 26.33695652173913|
|     loc106|27.524271844660195|
|     loc107|             33.36|
|     loc108|31.946236559139784|
|     loc109|  24.0561797752809|
|      loc11| 25.10958904109589|
|     loc110|26.419047619047618|
|     loc111|31.205607476635514|
|     loc112|33.285714285714285|
|     loc113|30.353535353535353|
|     loc114|             29.25|
|     loc115|23.593023255813954|
+-----------+------------------+
only showing top 20 rows



In [63]:
df1.groupBy("location_id").agg({'temp_celcius' : 'mean'}).orderBy("location_id").show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|       loc0|           29.176|
|       loc1|           28.246|
|      loc10|           25.337|
|     loc100|           27.297|
|     loc101|           25.317|
|     loc102|           30.327|
|     loc103|           25.341|
|     loc104|           26.204|
|     loc105|           26.217|
|     loc106|           27.201|
|     loc107|           33.268|
|     loc108|           32.195|
|     loc109|           24.138|
|      loc11|           25.308|
|     loc110|           26.239|
|     loc111|           31.391|
|     loc112|           33.359|
|     loc113|           30.345|
|     loc114|           29.261|
|     loc115|           23.239|
+-----------+-----------------+
only showing top 20 rows

