In [1]:
import findspark
 
findspark.init('/home/jacobs_cloud/spark-3.0.0-bin-hadoop2.7')

In [2]:
#import modules and set the configuration
import pyspark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
conf = pyspark.SparkConf().setAppName("DLGtestapp").setMaster("local")

In [3]:
# start spark context session, sql context
sc = pyspark.SparkContext(conf=conf)
sql_con = SQLContext(sc)



In [4]:
#read in data as dataframe and change ScreenTemperature datatype to 'double'
df = sql_con.read.csv('weather*', inferSchema = False, header = True) 
df = df.withColumn('ScreenTemperature',
        col('ScreenTemperature').cast('double'))

In [5]:
#count number of rows
df.count()

194697

In [6]:
#check datatypes
df.dtypes

[('ForecastSiteCode', 'string'),
 ('ObservationTime', 'string'),
 ('ObservationDate', 'string'),
 ('WindDirection', 'string'),
 ('WindSpeed', 'string'),
 ('WindGust', 'string'),
 ('Visibility', 'string'),
 ('ScreenTemperature', 'double'),
 ('Pressure', 'string'),
 ('SignificantWeatherCode', 'string'),
 ('SiteName', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Region', 'string'),
 ('Country', 'string')]

In [7]:
#display head
df.show(5)

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|              0|2016-03-01T00:00:00|            8|       23|      30|     16000|            -99.0|    null|                     8|   BALTASOUND (3002)| 60.7490|  -0.8540|   Orkney & Shetland|SCOTLAND|
|            3005|              0|2016-03-01T00:00:00|            8|       26|      34|      5000|              4.9|    1004

In [8]:
#write dataframe to parquet
df.write.parquet("weather.parquet")

In [9]:
#read in resulting parquet file
weather_parquetFile = sql_con.read.parquet("weather.parquet")

In [10]:
#display head
weather_parquetFile.show(5)

+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|ForecastSiteCode|ObservationTime|    ObservationDate|WindDirection|WindSpeed|WindGust|Visibility|ScreenTemperature|Pressure|SignificantWeatherCode|            SiteName|Latitude|Longitude|              Region| Country|
+----------------+---------------+-------------------+-------------+---------+--------+----------+-----------------+--------+----------------------+--------------------+--------+---------+--------------------+--------+
|            3002|              0|2016-03-01T00:00:00|            8|       23|      30|     16000|            -99.0|    null|                     8|   BALTASOUND (3002)| 60.7490|  -0.8540|   Orkney & Shetland|SCOTLAND|
|            3005|              0|2016-03-01T00:00:00|            8|       26|      34|      5000|              4.9|    1004

In [11]:
#check if all rows were imported 
weather_parquetFile.count() == df.count()

True

In [12]:
#create weather SQL table from parquetfile dataframe
weather_parquetFile.createOrReplaceTempView("weather")

In [13]:
#check if we have null values for any row in the Region column
null_regions = sql_con.sql("SELECT COUNT(Region) FROM weather WHERE Region IS NULL")
null_regions.show()

+-------------+
|count(Region)|
+-------------+
|            0|
+-------------+



In [14]:
#findthe max temperature and display the corresponding Region and Date
query = sql_con.sql("SELECT DISTINCT ScreenTemperature as Temp, ObservationDate as Date, Region\
                    FROM weather WHERE ScreenTemperature = (SELECT MAX(ScreenTemperature) FROM weather)")

query.show(truncate = False)

+----+-------------------+----------------------+
|Temp|Date               |Region                |
+----+-------------------+----------------------+
|15.8|2016-03-17T00:00:00|Highland & Eilean Siar|
+----+-------------------+----------------------+



In [15]:
#stop spark session
sc.stop()