### Setting the data location

There are two ways to access Azure Blob storage: account keys and shared access signatures (SAS).

Here we access the storage account with its account access key.

In [None]:
storage_account_name = "account-name"
storage_account_access_key = "access-key"
container = 'Container-name'

In [None]:
# Paths to the two input files, built from the account and container variables above
file_location = f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/Locationcleaned.csv"
file_location_1 = f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/GrowTimeSeries (1).csv"

In [None]:
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)
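For comparison with the account-key setup above, the SAS option uses a different, per-container configuration key. The sketch below builds the Spark configuration entry for either method, following the Hadoop Azure (WASB) connector key patterns `fs.azure.account.key.<account>.blob.core.windows.net` and `fs.azure.sas.<container>.<account>.blob.core.windows.net`. The helper names `wasbs_auth_conf` and `wasbs_url` are hypothetical and not part of any library.

```python
def wasbs_auth_conf(account, container, account_key=None, sas_token=None):
    """Return the Spark conf entry for WASB access via an account key or a SAS token.

    Hypothetical helper: pass exactly one of account_key or sas_token.
    """
    if (account_key is None) == (sas_token is None):
        raise ValueError("pass exactly one of account_key or sas_token")
    host = f"{account}.blob.core.windows.net"
    if account_key is not None:
        # Account key: one setting covers every container in the storage account
        return {f"fs.azure.account.key.{host}": account_key}
    # SAS token: configured per container, limited by the token's permissions and expiry
    return {f"fs.azure.sas.{container}.{host}": sas_token}


def wasbs_url(account, container, path):
    """Build a wasbs:// URL for a blob path inside the container (hypothetical helper)."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path.lstrip('/')}"
```

In the notebook, the returned entries would be applied with `spark.conf.set(key, value)` for each item, e.g. `wasbs_auth_conf(storage_account_name, container, sas_token=my_sas_token)`, where `my_sas_token` is a placeholder for a token you generate yourself.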

### Read the data

Now that we have defined the file location and access configuration, we can create a DataFrame. Notice that we use an *option* to infer the schema from the file (and another to treat the first line as a header). We could also set an explicit schema if we already have one.

First, let's create a DataFrame in Python.

In [None]:
# Loading the time series data
df_timeseries = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location_1)
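If the schema is known in advance, it can be passed explicitly instead of using `inferSchema`, which saves Spark an extra pass over the data. This is a sketch under loud assumptions: the four column names are the ones this notebook uses, but the column order, the types, and the idea that the file contains only these four columns are hypothetical and must be checked against the real CSV header. `TIMESERIES_SCHEMA` and `read_timeseries` are illustrative names.

```python
# HYPOTHETICAL schema: names come from this notebook; order, types and the
# "only four columns" assumption are NOT confirmed by the source data.
TIMESERIES_SCHEMA = "Serial STRING, sReadingType STRING, sActualValue DOUBLE, sTime TIMESTAMP"


def read_timeseries(spark, path, schema=TIMESERIES_SCHEMA):
    """Read the time series CSV with a fixed schema instead of inferSchema.

    With header=true the first line is skipped; by default (enforceSchema=true)
    Spark applies the given schema to the columns by position, not by name.
    """
    return (
        spark.read
        .schema(schema)            # DataFrameReader.schema accepts a DDL string
        .option("header", "true")
        .csv(path)
    )
```

Usage in the notebook would be `df_timeseries = read_timeseries(spark, file_location_1)`.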

### Data cleaning for timeseries data

Now that our data is loaded into a DataFrame, we can begin cleaning. First, we drop rows with null values, then keep only the relevant rows (i.e. we exclude BatteryLevel and Light readings), since we only deal with SoilMoisture and AirTemperature. We also truncate the serial numbers to 18 characters and drop soil moisture readings below 0.

In [None]:
# Dropping null values from df_timeseries dataset and saving as a new dataset
df_timeseries_cleaned = df_timeseries.dropna()

In [None]:
# Removing records with Reading Type = BatteryLevel & Light
df_timeseries_cleaned = df_timeseries_cleaned.filter((df_timeseries_cleaned.sReadingType != "BatteryLevel") & (df_timeseries_cleaned.sReadingType != "Light") )

In [None]:
# Cleaning the Serial column: keep only its first 18 characters (substring is 1-based)
from pyspark.sql.functions import substring
df_timeseries_cleaned = df_timeseries_cleaned.withColumn("Serial", substring(df_timeseries_cleaned["Serial"], 1, 18))

In [None]:
# Dropping rows where the soil moisture reading is below 0
from pyspark.sql.functions import col

df_timeseries_cleaned = df_timeseries_cleaned.filter(~((col("sReadingType") == 'SoilMoisture') & (col("sActualValue") < 0)))
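The cleaning rules applied so far can be restated as a plain-Python function, which makes them easy to unit-test on a few hand-made records without a cluster. This is an illustrative sketch, not part of the pipeline; the record layout (dicts keyed by the notebook's column names) and the names `clean_record` / `clean_records` are assumptions.

```python
EXCLUDED_TYPES = {"BatteryLevel", "Light"}
SERIAL_LENGTH = 18  # same prefix length as substring(Serial, 1, 18)


def clean_record(record):
    """Apply the notebook's cleaning rules to one record; return None if it is dropped."""
    # dropna(): drop the record if any field is null
    if any(value is None for value in record.values()):
        return None
    # Exclude reading types that are not analysed here
    if record["sReadingType"] in EXCLUDED_TYPES:
        return None
    # Negative soil moisture readings are treated as invalid
    if record["sReadingType"] == "SoilMoisture" and record["sActualValue"] < 0:
        return None
    # Spark's substring is 1-based, so substring(Serial, 1, 18) matches Python's [:18]
    return {**record, "Serial": record["Serial"][:SERIAL_LENGTH]}


def clean_records(records):
    """Clean a list of records, keeping only those that survive every rule."""
    return [r for r in (clean_record(rec) for rec in records) if r is not None]
```

Note that only SoilMoisture has a lower bound here; a negative AirTemperature reading is kept.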

In [None]:
from pyspark.sql import functions as F

# Find the earliest and latest sTime values to see how many years the data spans.
# A single aggregation computes both; F.min/F.max avoid shadowing Python's built-in min() and max().
time_range = df_timeseries_cleaned.agg(F.min("sTime").alias("min_time"), F.max("sTime").alias("max_time")).collect()[0]
min_val, max_val = time_range["min_time"], time_range["max_time"]
# The five most recent records, for a quick look at the end of the series
max_vals = df_timeseries_cleaned.orderBy(F.desc("sTime")).limit(5).collect()
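Once `min_val` and `max_val` are on the driver, the spread of the data can be summarised in plain Python. This sketch assumes `sTime` was inferred as a timestamp, so both values arrive as `datetime` objects; if it was inferred as a string, it would need parsing first. The function name `describe_time_span` is hypothetical.

```python
from datetime import datetime


def describe_time_span(min_time: datetime, max_time: datetime) -> dict:
    """Summarise how much time separates the earliest and latest readings."""
    span = max_time - min_time
    return {
        "first_year": min_time.year,
        "last_year": max_time.year,
        # Number of calendar years touched by the data (inclusive)
        "calendar_years": max_time.year - min_time.year + 1,
        # Elapsed time expressed in average-length years
        "elapsed_years": round(span.days / 365.25, 2),
    }
```

Usage: `describe_time_span(min_val, max_val)`. The two measures can differ a lot: data from late 2018 to early 2019 touches two calendar years but covers well under one elapsed year.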


### Creating and Cleaning the Location DataFrame.

The sensor location data is read into the df_locations DataFrame.

Because the recorded coordinates of a single sensor vary slightly from record to record, Latitude and Longitude are averaged per sensor and then floored to whole degrees to simplify processing.

**Note:** A preliminary cleaning has already been applied to the location data; the file loaded into the DataFrame is aptly named `Locationcleaned.csv`.

In [None]:
# Reading the location data
df_locations = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location)

In [None]:
# The location of the same sensor varies slightly between records, so average it per sensor
from pyspark.sql.functions import avg
df_locations = df_locations.groupBy("Serial").agg(avg("Latitude").alias("Latitude"), avg("Longitude").alias("Longitude"))

In [None]:
# Flooring the coordinates to whole degrees to make them easier to work with
# (floor rounds toward negative infinity, so a longitude of -3.2 becomes -4)
import pyspark.sql.functions as func
df_locations = df_locations.withColumn("Latitude", func.floor(df_locations["Latitude"])).withColumn("Longitude", func.floor(df_locations["Longitude"]))
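The two location steps (per-sensor average, then floor) can be sketched in plain Python to make the rounding behaviour explicit. The function name `sensor_grid_cells` and the `(serial, latitude, longitude)` tuple layout are hypothetical; the sketch also assumes there are no null coordinates, whereas Spark's `avg` simply skips nulls.

```python
import math
from collections import defaultdict


def sensor_grid_cells(rows):
    """Average each sensor's coordinates, then floor them to whole degrees.

    Mirrors groupBy("Serial").agg(avg(...)) followed by floor(...). Like Spark's
    floor, math.floor rounds toward negative infinity.
    """
    sums = defaultdict(lambda: [0.0, 0.0, 0])  # serial -> [lat_sum, lon_sum, count]
    for serial, lat, lon in rows:
        acc = sums[serial]
        acc[0] += lat
        acc[1] += lon
        acc[2] += 1
    return {
        serial: (math.floor(lat_sum / n), math.floor(lon_sum / n))
        for serial, (lat_sum, lon_sum, n) in sums.items()
    }
```

For example, two readings of one sensor at longitudes -3.1 and -3.3 average to -3.2 and floor to -4; `int()` would truncate toward zero and give -3, so it is not a drop-in replacement for `floor`. Flooring to whole degrees groups sensors into cells of about one degree (roughly 111 km of latitude).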

In [None]:
df_locations.display()

| Serial | Latitude | Longitude |
|---|---|---|
| PI040307AD5I204404 | 47 | 19 |
| PI040298AA3L032108 | 40 | 25 |
| PI040297AD5I207604 | 56 | -4 |
| PI040298AD5I213075 | 37 | -9 |
| PI040297AD5I204851 | 48 | 15 |
| PI040307AD5I203424 | 47 | 20 |
| PI040297AD5I206513 | 48 | 16 |
| PI040298AD5I213682 | 56 | -4 |
| PI040307AD5I204370 | 41 | 26 |
| PI040298AD5I207511 | 38 | -9 |


### Creating a Single Dataframe from both location and time series data.

In [None]:
# Merging location and time series data on the sensor serial number
df = (df_locations.join(df_timeseries_cleaned, df_locations.Serial == df_timeseries_cleaned.Serial, 'inner')
      .select(df_timeseries_cleaned.Serial, df_locations.Latitude, df_locations.Longitude,
              df_timeseries_cleaned.sReadingType, df_timeseries_cleaned.sActualValue, df_timeseries_cleaned.sTime))

In [None]:
# Renaming columns
df = df.withColumnRenamed("sReadingType", "SensorType").withColumnRenamed("sActualValue","Reading").withColumnRenamed("sTime","SensorTime")


In [None]:
from pyspark.sql.functions import date_format

# Creating a DateHour column by truncating SensorTime to the hour (the result is a string)
df = df.withColumn('DateHour', date_format('SensorTime', 'yyyy-MM-dd HH:00:00'))
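The `date_format` pattern buckets each timestamp into its hour by zeroing the minutes and seconds. The plain-Python equivalent below is a sketch for checking that behaviour; the function name `date_hour` is hypothetical. Spark formats in the session time zone (`spark.sql.session.timeZone`), and this sketch assumes UTC, which is consistent with the sample output shown below. If a timestamp rather than a string is preferred, `pyspark.sql.functions.date_trunc("hour", "SensorTime")` is an alternative.

```python
from datetime import datetime, timezone


def date_hour(sensor_time: datetime) -> str:
    """Python equivalent of date_format(SensorTime, 'yyyy-MM-dd HH:00:00'), assuming UTC."""
    if sensor_time.tzinfo is None:
        # A naive datetime would be interpreted in the machine's local time zone
        raise ValueError("expected a timezone-aware datetime")
    return sensor_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00:00")
```

Two readings taken at 22:30:05 and 22:59:59 on the same day land in the same `DateHour` bucket, which is what makes hour-level grouping possible later.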

In [None]:
# Removing exact duplicate rows
df = df.dropDuplicates()

In [None]:
df.display()

| Serial | Latitude | Longitude | SensorType | Reading | SensorTime | DateHour |
|---|---|---|---|---|---|---|
| PI040297AD5I203550 | 56 | -3 | AirTemperature | 7.58 | 2018-10-21T19:09:06.000Z | 2018-10-21 19:00:00 |
| PI040297AD5I203550 | 56 | -3 | AirTemperature | 8.26 | 2018-10-21T20:24:06.000Z | 2018-10-21 20:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 24.78 | 2019-01-05T13:00:05.000Z | 2019-01-05 13:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 27.88 | 2019-01-05T22:30:05.000Z | 2019-01-05 22:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 25.68 | 2019-01-07T07:00:05.000Z | 2019-01-07 07:00:00 |
| PI040297AD5I203550 | 56 | -3 | AirTemperature | 14.54 | 2018-10-24T12:08:56.000Z | 2018-10-24 12:00:00 |
| PI040297AD5I203550 | 56 | -3 | AirTemperature | 11.36 | 2018-10-24T18:53:56.000Z | 2018-10-24 18:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 27.48 | 2019-01-11T03:15:05.000Z | 2019-01-11 03:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 27.59 | 2019-01-11T04:15:05.000Z | 2019-01-11 04:00:00 |
| PI040297AA3J001785 | 47 | 18 | SoilMoisture | 33.75 | 2019-01-13T16:09:12.000Z | 2019-01-13 16:00:00 |


### Combining AirTemperature and SoilMoisture readings.

Two separate DataFrames, one for AirTemperature and one for SoilMoisture, are created and then joined so that each row holds both an AirTemperature and a SoilMoisture value.

In [None]:
# Splitting df into two DataFrames: AirTemperature readings and SoilMoisture readings
Airtemp = df.filter(df.SensorType == 'AirTemperature')
Airtemp = Airtemp.withColumnRenamed("Reading", "AirTempReading")
Soilmoisture = df.filter(df.SensorType == 'SoilMoisture')
Soilmoisture = Soilmoisture.withColumnRenamed("Reading", "SoilMoistureReading")

In [None]:
# Joining the AirTemperature and SoilMoisture DataFrames so that both readings appear in the same row.
# Note: the only join key is SensorTime, so readings are paired by exact timestamp, even across different sensors.
data = (Airtemp.join(Soilmoisture, Airtemp.SensorTime == Soilmoisture.SensorTime, 'inner')
        .select(Airtemp.Serial, Airtemp.Latitude, Airtemp.Longitude, Airtemp.AirTempReading, Soilmoisture.SoilMoistureReading, Airtemp.DateHour))
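Because this join matches only on `SensorTime`, it pairs readings that share an exact timestamp regardless of which sensor produced them. If the intent is instead to pair each sensor's own readings within the same hour, one alternative (an assumption, not the method used in this notebook) is to join on both `Serial` and `DateHour`. The plain-Python sketch below illustrates that pairing rule on small hand-made records; the function name `pair_by_sensor_hour` and the tuple layout are hypothetical.

```python
from collections import defaultdict


def pair_by_sensor_hour(air_rows, soil_rows):
    """Inner-join air temperature and soil moisture readings on (Serial, DateHour).

    Each row is a (serial, date_hour, reading) tuple. As in a Spark inner join,
    every air reading is paired with every soil reading that has the same key,
    and keys present on only one side are dropped.
    """
    soil_by_key = defaultdict(list)
    for serial, date_hour, moisture in soil_rows:
        soil_by_key[(serial, date_hour)].append(moisture)

    pairs = []
    for serial, date_hour, air_temp in air_rows:
        # .get() does not insert missing keys into the defaultdict
        for moisture in soil_by_key.get((serial, date_hour), []):
            pairs.append((serial, date_hour, air_temp, moisture))
    return pairs
```

In PySpark the same rule could be written as `Airtemp.join(Soilmoisture, on=["Serial", "DateHour"], how="inner")`, followed by a select that takes Latitude and Longitude from one side. Whether each sensor actually reports both reading types within the same hour should be checked against the data before relying on this.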

In [None]:
# Saving the DataFrame as CSV (with a header row) to blob storage; Spark writes a Full_data directory of part files.
# Further analysis is done on the stored data.
data.write.format('csv').option('header', 'true').save(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/Full_data")