In [1]:
pip install -q findspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init('/home/bigdata/Documents/spark-3.0.0')

In [3]:
#Library

import pandas as pd
import numpy as np
from pyspark.sql.session import SparkSession
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import*
from math import sqrt
import os
from pyspark.sql.types import IntegerType
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, column, expr, when, lit
from pyspark.sql.functions import mean, min, max, sum, round, count, datediff, to_date
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F 
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder.appName('DLG').getOrCreate()

In [5]:
data = spark.read.format("csv").option("header","True").load("/home/bigdata/Data Engineer Test/weather.20160301.csv")

In [6]:
data.show()

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|              0|2016-03-01T00:00:00|            8|       23|      30|     16000|           -99.00|    null|                     8|   BALTASOUND (3002)| 60.7490|  -0.8540|   Orkney & Shetland|SCOTLAND|
|            3005|              0|2016-03-01T00:00:00|            8|       26|      34|      5000|             4.90|    1004

In [7]:
#read second weather dataset
data2 = spark.read.format("csv").option("header","True").load("/home/bigdata/Data Engineer Test/weather.20160201.csv")

In [8]:
data2.show()

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|              0|2016-02-01T00:00:00|           12|        8|    null|     30000|             2.10|     997|                     8|   BALTASOUND (3002)| 60.7490|  -0.8540|   Orkney & Shetland|SCOTLAND|
|            3005|              0|2016-02-01T00:00:00|           10|        2|    null|     35000|             0.10|     997

In [9]:
data.printSchema()

root
 |-- ForecastSiteCode: string (nullable = true)
 |-- ObservationTime: string (nullable = true)
 |-- ObservationDate: string (nullable = true)
 |-- WindDirection: string (nullable = true)
 |-- WindSpeed: string (nullable = true)
 |-- WindGust: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- ScreenTemperature: string (nullable = true)
 |-- Pressure: string (nullable = true)
 |-- SignificantWeatherCode: string (nullable = true)
 |-- SiteName: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)



In [10]:
data.count()

101442

In [11]:
#show schema of the dataset
data2.printSchema()

root
 |-- ForecastSiteCode: string (nullable = true)
 |-- ObservationTime: string (nullable = true)
 |-- ObservationDate: string (nullable = true)
 |-- WindDirection: string (nullable = true)
 |-- WindSpeed: string (nullable = true)
 |-- WindGust: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- ScreenTemperature: string (nullable = true)
 |-- Pressure: string (nullable = true)
 |-- SignificantWeatherCode: string (nullable = true)
 |-- SiteName: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)



In [12]:
#count
data2.count()

93255

In [13]:
# merging two wether data together with no effect on all dataset
data_df = data.unionAll(data2)

In [14]:
#count
data_df.count()

194697

In [15]:
#convert and save merged csv file to parquet file format
data_df.write.parquet("/home/bigdata/parquet/dataset.parquet")

In [16]:
#read dataset.parquet file
df =spark.read.format("parquet").option("header","True").load("/home/bigdata/parquet/dataset.parquet")

In [17]:
df.show()

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|              0|2016-02-01T00:00:00|           12|        8|    null|     30000|             2.10|     997|                     8|   BALTASOUND (3002)| 60.7490|  -0.8540|   Orkney & Shetland|SCOTLAND|
|            3005|              0|2016-02-01T00:00:00|           10|        2|    null|     35000|             0.10|     997

In [18]:
#print schema
df.printSchema()

root
 |-- ForecastSiteCode: string (nullable = true)
 |-- ObservationTime: string (nullable = true)
 |-- ObservationDate: string (nullable = true)
 |-- WindDirection: string (nullable = true)
 |-- WindSpeed: string (nullable = true)
 |-- WindGust: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- ScreenTemperature: string (nullable = true)
 |-- Pressure: string (nullable = true)
 |-- SignificantWeatherCode: string (nullable = true)
 |-- SiteName: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)



In [19]:
#converted ObservationTime from string to Float
df= df.withColumn("ObservationTime", df["ObservationTime"].cast("timestamp"))

In [20]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
import numpy as np
import datetime
from pyspark.sql.functions import year, month, dayofmonth

In [21]:
#removing T from ObservationDate
df = df.withColumn('ObservationDate', regexp_replace(col('ObservationDate'), "T", " "))

In [22]:
df =df.withColumn("ObservationDate", df["ObservationDate"].cast("timestamp"))

In [23]:
#day (yyyy-mm-dd HH:mm:ss)
df.select("ObservationDate").show()

+-------------------+
|    ObservationDate|
+-------------------+
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
|2016-02-01 00:00:00|
+-------------------+
only showing top 20 rows



In [24]:
#WindDirection converted from string to float
df= df.withColumn("WindDirection", df["WindDirection"].cast(FloatType()))

In [25]:
#WindSpeed converted from string to integer
df= df.withColumn("WindSpeed", df["WindSpeed"].cast(IntegerType()))

In [26]:
#visibilty converted from from to integer
df= df.withColumn("Visibility", df["Visibility"].cast(IntegerType()))

In [27]:
#ScreenTemperature converted from string to float
df= df.withColumn("ScreenTemperature", df["ScreenTemperature"].cast(FloatType()))

In [28]:
#Pressure converted from string to integer
df= df.withColumn("Pressure", df["Pressure"].cast(IntegerType()))

In [29]:
#Latitude converted from string to Float
df= df.withColumn("Latitude", df["Latitude"].cast(FloatType()))

In [30]:
#Longitude converted from string to Float
df= df.withColumn("Longitude", df["Longitude"].cast(FloatType()))

In [31]:
#print schema after converting datatype
df.printSchema()

root
 |-- ForecastSiteCode: string (nullable = true)
 |-- ObservationTime: timestamp (nullable = true)
 |-- ObservationDate: timestamp (nullable = true)
 |-- WindDirection: float (nullable = true)
 |-- WindSpeed: integer (nullable = true)
 |-- WindGust: string (nullable = true)
 |-- Visibility: integer (nullable = true)
 |-- ScreenTemperature: float (nullable = true)
 |-- Pressure: integer (nullable = true)
 |-- SignificantWeatherCode: string (nullable = true)
 |-- SiteName: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)



In [32]:
#fill missing value integer with zero --no effect on result
df =df.na.fill(0)

In [33]:
#show df dataset
df.show()

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|           null|2016-02-01 00:00:00|         12.0|        8|    null|     30000|              2.1|     997|                     8|   BALTASOUND (3002)|  60.749|   -0.854|   Orkney & Shetland|SCOTLAND|
|            3005|           null|2016-02-01 00:00:00|         10.0|        2|    null|     35000|              0.1|     997

In [34]:
#create a temporary view using DataFrame
df.createOrReplaceTempView("weather")

In [35]:
#order by Descending ScreenTemperature to observe hottest day and region
maxtemp_df =spark.sql("select Region , ObservationDate, ScreenTemperature from weather order by ScreenTemperature DESC")

In [36]:
#show hottest Region, Day and Temperature
maxtemp_df.show()

+--------------------+-------------------+-----------------+
|              Region|    ObservationDate|ScreenTemperature|
+--------------------+-------------------+-----------------+
|Highland & Eilean...|2016-03-17 00:00:00|             15.8|
|  South West England|2016-02-21 00:00:00|             15.6|
|Highland & Eilean...|2016-03-16 00:00:00|             15.5|
|       West Midlands|2016-03-01 00:00:00|             15.4|
|Highland & Eilean...|2016-03-16 00:00:00|             15.3|
|  South West England|2016-02-21 00:00:00|             15.3|
|Highland & Eilean...|2016-03-16 00:00:00|             15.3|
|     East of England|2016-02-21 00:00:00|             15.2|
|Highland & Eilean...|2016-03-17 00:00:00|             15.2|
|       East Midlands|2016-02-21 00:00:00|             15.2|
|  South West England|2016-02-21 00:00:00|             15.2|
|  South West England|2016-02-21 00:00:00|             15.1|
|  South West England|2016-02-21 00:00:00|             15.1|
|London & South Ea...|20

In [37]:
#hottest day(highest temperature), Hottest Region and Date
maxtemp_df.show(1)

+--------------------+-------------------+-----------------+
|              Region|    ObservationDate|ScreenTemperature|
+--------------------+-------------------+-----------------+
|Highland & Eilean...|2016-03-17 00:00:00|             15.8|
+--------------------+-------------------+-----------------+
only showing top 1 row



In [38]:
#Hotest day Temperature
maxtemp_df.select("ScreenTemperature").show(1)

+-----------------+
|ScreenTemperature|
+-----------------+
|             15.8|
+-----------------+
only showing top 1 row



In [39]:
#hottest day (yyyy-mm-dd HH:mm:ss)
maxtemp_df.select("ObservationDate").show(1)

+-------------------+
|    ObservationDate|
+-------------------+
|2016-03-17 00:00:00|
+-------------------+
only showing top 1 row



In [40]:
#hottest Region
maxtemp_df.select("Region").show(1)

+--------------------+
|              Region|
+--------------------+
|Highland & Eilean...|
+--------------------+
only showing top 1 row

