In [1]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import sys
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("Irish Marine Institute - Data Prep").getOrCreate()

In [3]:
df = spark.read.csv("dataset/IrishNationalTideGaugeNetwork.csv", header=True, inferSchema=True)
df.show(vertical=True)

-RECORD 0------------------------------------
 time                 | UTC                  
 altitude             | m                    
 latitude             | degrees_north        
 longitude            | degrees_east         
 station_id           | null                 
 datasourceid         | null                 
 Water_Level_LAT      | metres               
 Water_Level_OD_Malin | metres               
 QC_Flag              | null                 
-RECORD 1------------------------------------
 time                 | 2006-10-26T13:00:00Z 
 altitude             | 0.0                  
 latitude             | 53.585               
 longitude            | -6.1081              
 station_id           | Skerries Harbour     
 datasourceid         | 11                   
 Water_Level_LAT      | 4.679                
 Water_Level_OD_Malin | 1.82                 
 QC_Flag              | 1                    
-RECORD 2------------------------------------
 time                 | 2006-10-26

In [4]:
df.printSchema()

root
 |-- time: string (nullable = true)
 |-- altitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- datasourceid: integer (nullable = true)
 |-- Water_Level_LAT: string (nullable = true)
 |-- Water_Level_OD_Malin: string (nullable = true)
 |-- QC_Flag: integer (nullable = true)



In [5]:
from pyspark.sql.functions import from_utc_timestamp
from pyspark.sql.functions import col
from pyspark.sql.functions import year, to_date
from pyspark.sql.functions import dayofmonth, month, year,hour, minute

# Convert the string column to a datetime column
df = df.withColumn("Water_Level_LAT", col("Water_Level_LAT").cast("float"))
df = df.withColumn("Water_Level_OD_Malin", col("Water_Level_OD_Malin").cast("float"))
df = df.withColumn("latitude", col("latitude").cast("float"))
df = df.withColumn("longitude", col("longitude").cast("float"))

df = df.withColumn("minute", minute(to_date(df.time)))
df = df.withColumn("hour", hour(to_date(df.time)))
df = df.withColumn("day", dayofmonth(to_date(df.time)))
df = df.withColumn("month", month(to_date(df.time)))
df = df.withColumn("year", year(to_date(df.time)))


df = df.withColumn("day", dayofmonth(to_date(df.time)))
df = df.withColumn("month", month(to_date(df.time)))
df = df.withColumn("year", year(to_date(df.time)))

df = df.filter((df.latitude == '53.585') & (df.longitude == '-6.1081'))
df = df.na.drop()
df.show()
df.printSchema()




+--------------------+--------+--------+---------+----------------+------------+---------------+--------------------+-------+------+----+---+-----+----+
|                time|altitude|latitude|longitude|      station_id|datasourceid|Water_Level_LAT|Water_Level_OD_Malin|QC_Flag|minute|hour|day|month|year|
+--------------------+--------+--------+---------+----------------+------------+---------------+--------------------+-------+------+----+---+-----+----+
|2006-10-26T13:00:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|          4.679|                1.82|      1|     0|   0| 26|   10|2006|
|2006-10-26T14:00:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|          4.939|                2.08|      1|     0|   0| 26|   10|2006|
|2006-10-26T14:06:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|          4.969|                2.11|      1|     0|   0| 26|   10|2006|
|2006-10-26T14:12:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|  

In [6]:
df.dtypes

[('time', 'string'),
 ('altitude', 'string'),
 ('latitude', 'float'),
 ('longitude', 'float'),
 ('station_id', 'string'),
 ('datasourceid', 'int'),
 ('Water_Level_LAT', 'float'),
 ('Water_Level_OD_Malin', 'float'),
 ('QC_Flag', 'int'),
 ('minute', 'int'),
 ('hour', 'int'),
 ('day', 'int'),
 ('month', 'int'),
 ('year', 'int')]

In [7]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="time" ,outputCol="indexedTime")
indexed = indexer.fit(df).transform(df)
indexed.show(vertical=True)

-RECORD 0------------------------------------
 time                 | 2006-10-26T13:00:00Z 
 altitude             | 0.0                  
 latitude             | 53.585               
 longitude            | -6.1081              
 station_id           | Skerries Harbour     
 datasourceid         | 11                   
 Water_Level_LAT      | 4.679                
 Water_Level_OD_Malin | 1.82                 
 QC_Flag              | 1                    
 minute               | 0                    
 hour                 | 0                    
 day                  | 26                   
 month                | 10                   
 year                 | 2006                 
 indexedTime          | 0.0                  
-RECORD 1------------------------------------
 time                 | 2006-10-26T14:00:00Z 
 altitude             | 0.0                  
 latitude             | 53.585               
 longitude            | -6.1081              
 station_id           | Skerries H

In [8]:
fd_1 = indexed.selectExpr("cast(time as string)dateTime",
                          "cast(minute as int)minute",
                          "cast(hour as int)hour",
                          "cast(day as int)day",
                          "cast(month as int)month",
                          "cast(year as int)year",
                          "cast(Water_Level_LAT as float)label",
                          "cast(Water_Level_OD_Malin as float) Water_Level_OD_Malin")

tran_fd_1 = fd_1.filter((fd_1.year < 2020))
test_fd_1 = fd_1.filter((fd_1.year > 2019))

In [9]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['minute','hour','day','month','year'],
                                  handleInvalid="skip",
                                  outputCol ='features')

In [10]:
vhouse_df_tran = vectorAssembler.transform(tran_fd_1)
trainingData = vhouse_df_tran.select("features", "label")
trainingData.show()

vhouse_df_test = vectorAssembler.transform(test_fd_1)
testData = vhouse_df_test.select("features", "label")
testData.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,26.0,10....|4.679|
|[0.0,0.0,26.0,10....|4.939|
|[0.0,0.0,26.0,10....|4.969|
|[0.0,0.0,26.0,10....|4.859|
|[0.0,0.0,26.0,10....|4.809|
|[0.0,0.0,26.0,10....|4.789|
|[0.0,0.0,26.0,10....|4.739|
|[0.0,0.0,26.0,10....|4.669|
|[0.0,0.0,26.0,10....|4.609|
|[0.0,0.0,26.0,10....|4.579|
|[0.0,0.0,26.0,10....|4.499|
|[0.0,0.0,26.0,10....|4.479|
|[0.0,0.0,26.0,10....|4.359|
|[0.0,0.0,26.0,10....|4.289|
|[0.0,0.0,26.0,10....|4.259|
|[0.0,0.0,26.0,10....|4.139|
|[0.0,0.0,26.0,10....|4.049|
|[0.0,0.0,26.0,10....|3.969|
|[0.0,0.0,26.0,10....|3.859|
|[0.0,0.0,26.0,10....|3.809|
+--------------------+-----+
only showing top 20 rows

+--------------------+-------+
|            features|  label|
+--------------------+-------+
|[0.0,0.0,3.0,11.0...|-29.904|
|[0.0,0.0,3.0,11.0...|-29.904|
|[0.0,0.0,3.0,11.0...|-29.904|
|[0.0,0.0,3.0,11.0...|-29.899|
|[0.0,0.0,3.0,11.0...|-29.904|
|[0.0,0.0,3.0,11.0...|  3.097|

In [11]:
from pyspark.ml.regression import LinearRegression
gbt = GBTRegressor(featuresCol="features", maxIter=30, maxDepth = 11)
# Chain indexer and GBT in a Pipeline
#pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model1 = gbt.fit(trainingData)
# Make predictions.
tdata2 = model1.transform(testData)

NameError: name 'GBTRegressor' is not defined

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [None]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",
                               handleInvalid="skip").fit(trSet)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = trSet.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=30, maxDepth = 11)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model0 = pipeline.fit(trainingData)
# Make predictions.
tdata2 = model0.transform(testData)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(tdata2)
print("R Squared on test data = %g" % rmse)
gbtModel = model0.stages[1]
print(gbtModel) 
# summary only 8)Getting Important Features
model0.stages[1].featureImportances
model0.stages[-1].trees

In [None]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae= evaluator.evaluate(tdata2)
print("MAE on test data = %g" % mae)


In [None]:
sf03 = tdata2.toPandas()
import seaborn
import matplotlib.pyplot as plt
from scipy.stats import *
seaborn.set(style="whitegrid", font_scale = 1.8)
fig, ax = plt.subplots()
seaborn.set(color_codes=True)
seaborn.set(rc={'figure.figsize':(20, 10)})
seaborn.regplot(x="label", y="prediction", fit_reg=False, ax=ax,data = sf03,scatter_kws={"color": "b"});
seaborn.regplot(x="label", y="prediction",scatter=False, ax=ax, data = sf03 , line_kws={"color": "red"});


In [None]:
tdata2.show()
