In [10]:
# importing the necessary libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Creating a pyspark session with sql

In [11]:
spark = SparkSession.builder.master('local').getOrCreate()

In [12]:
#reading in pyspark df
spark_df = spark.read.csv('./forestfires.csv', header='true', inferSchema='true')

#observing the datatype of df
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [13]:
spark_df.head()

Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [14]:
spark_df.columns

['X',
 'Y',
 'month',
 'day',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'area']

In [15]:
spark_df[['month','day','rain']]

DataFrame[month: string, day: string, rain: double]

In [16]:
d = spark_df.select('rain')

In [29]:
d

DataFrame[rain: double]

In [18]:
spark_df['rain']

Column<b'rain'>

In [30]:
spark_df_months = spark_df.groupBy('month').agg({'area': 'mean'})
spark_df_months

DataFrame[month: string, avg(area): double]

In [31]:
spark_df_months.collect()

[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]

In [32]:
no_rain = spark_df.filter(spark_df['rain'] == 0.0)
some_rain = spark_df.filter(spark_df['rain'] > 0.0)

In [33]:
some_rain

DataFrame[X: int, Y: int, month: string, day: string, FFMC: double, DMC: double, DC: double, ISI: double, temp: double, RH: int, wind: double, rain: double, area: double]

In [36]:
from pyspark.sql.functions import mean

print(no_rain.select(mean('area')).show(),'\n')

print(some_rain.select(mean('area')).show(),'\n')

+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+

None 

+---------+
|avg(area)|
+---------+
|  1.62375|
+---------+

None 



In [28]:
summer_months = spark_df.filter(spark_df['month'].isin(['jun','jul','aug']))
winter_months = spark_df.filter(spark_df['month'].isin(['dec','jan','feb']))

print('summer months fire area', summer_months.select(mean('area')).show())
print('winter months fire areas', winter_months.select(mean('area')).show())

+------------------+
|         avg(area)|
+------------------+
|12.262317596566525|
+------------------+

summer months fire area None
+-----------------+
|        avg(area)|
+-----------------+
|7.918387096774193|
+-----------------+

winter months fire areas None


# Modelling

In [38]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder

In [39]:
fire_df = spark_df.drop('day')
fire_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [40]:
si = StringIndexer(inputCol='month', outputCol='month_num')
model = si.fit(fire_df)
new_df = model.transform(fire_df)

In [41]:
model.labels

['aug',
 'sep',
 'mar',
 'jul',
 'feb',
 'jun',
 'oct',
 'apr',
 'dec',
 'jan',
 'may',
 'nov']

In [42]:
new_df.head(4)

[Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0),
 Row(X=8, Y=6, month='mar', FFMC=91.7, DMC=33.3, DC=77.5, ISI=9.0, temp=8.3, RH=97, wind=4.0, rain=0.2, area=0.0, month_num=2.0)]

In [43]:
new_df.select('month_num').distinct().collect()

[Row(month_num=8.0),
 Row(month_num=0.0),
 Row(month_num=7.0),
 Row(month_num=1.0),
 Row(month_num=4.0),
 Row(month_num=11.0),
 Row(month_num=3.0),
 Row(month_num=2.0),
 Row(month_num=10.0),
 Row(month_num=6.0),
 Row(month_num=5.0),
 Row(month_num=9.0)]

In [45]:
## fitting and transforming the OneHotEncoderEstimator
ohe = feature.OneHotEncoder(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
one_hot_encoded = ohe.fit(new_df).transform(new_df)
one_hot_encoded.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}))

In [46]:
features = ['X',
 'Y',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'month_vec']

target = 'area'

vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

In [47]:
vectorized_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}), features=SparseVector(21, {0: 7.0, 1: 5.0, 2: 86.2, 3: 26.2, 4: 94.3, 5: 5.1, 6: 8.2, 7: 51.0, 8: 6.7, 12: 1.0}))

In [48]:
#instantiating and fitting the model
rf_model = RandomForestRegressor(featuresCol='features', 
                                 labelCol='area', predictionCol='prediction').fit(vectorized_df)

In [49]:
rf_model.featureImportances

SparseVector(21, {0: 0.0269, 1: 0.0524, 2: 0.2038, 3: 0.1466, 4: 0.0556, 5: 0.1352, 6: 0.1785, 7: 0.0739, 8: 0.0992, 10: 0.0001, 11: 0.0183, 13: 0.0085, 14: 0.0, 15: 0.0005, 16: 0.0002, 17: 0.0001, 18: 0.0001, 20: 0.0002})

In [53]:
## generating predictions
predictions = rf_model.transform(vectorized_df).select('area', 'prediction')
predictions.head(5)

[Row(area=0.0, prediction=5.977433834483444),
 Row(area=0.0, prediction=5.2023739971779275),
 Row(area=0.0, prediction=5.162434742791962),
 Row(area=0.0, prediction=6.097564294591873),
 Row(area=0.0, prediction=4.851374424502416)]

In [54]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area')

In [55]:
# evaluating r^2
evaluator.evaluate(predictions,{evaluator.metricName: 'r2'})

0.7609797252412228

In [56]:
# evaluating mean absolute error
evaluator.evaluate(predictions,{evaluator.metricName: 'mae'})

13.077691276090865

# Building a pipeline

In [59]:
# importing relevant libraries
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml import Pipeline

In [61]:
## instantiating all necessary estimator objects

string_indexer = StringIndexer(inputCol='month', outputCol='month_num', handleInvalid='keep')
one_hot_encoder = OneHotEncoder(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
vector_assember = VectorAssembler(inputCols=features, outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features', labelCol='area')
stages = [string_indexer, one_hot_encoder, vector_assember, random_forest]

# instantiating the pipeline with all them estimator objects
pipeline = Pipeline(stages=stages)

In [62]:
pipeline

Pipeline_0a5a647ba14f

# Cross Validation in Pyspark

In [69]:
# creating parameter grid

params = ParamGridBuilder().addGrid(random_forest.maxDepth, [5, 10, 15]).addGrid(random_forest.numTrees, \
                                                                                 [20 ,50, 100]).build()

In [70]:
print('total combinations of parameters: ', len(params))

params[0]

total combinations of parameters:  9


{Param(parent='RandomForestRegressor_14254a68ddcc', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
 Param(parent='RandomForestRegressor_14254a68ddcc', name='numTrees', doc='Number of trees to train (>= 1).'): 20}

In [71]:
# instantiating the evaluator by which we will measure our model's performance
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area', metricName = 'mae')

#instantiating crossvalidator estimator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=reg_evaluator, parallelism=4)

In [72]:
# fitting crossvalidator
cross_validated_model = cv.fit(fire_df)

In [73]:
cross_validated_model.avgMetrics

[20.84051309268642,
 20.077284373491054,
 20.455921571582255,
 21.642818951994684,
 20.747862793367844,
 21.01789121507841,
 21.79324342845812,
 20.859052438344907,
 21.08425083950355]

In [74]:
predictions = cross_validated_model.transform(spark_df)
predictions.select('prediction', 'area').show(300)

+------------------+-------+
|        prediction|   area|
+------------------+-------+
| 6.078023670569959|    0.0|
| 4.898224556740818|    0.0|
| 5.441774803766447|    0.0|
|  5.80959458690686|    0.0|
|   6.1581017064693|    0.0|
|6.0817908137348935|    0.0|
|3.8519022365484967|    0.0|
| 7.933736747028446|    0.0|
|10.681791069614963|    0.0|
| 6.256824389404297|    0.0|
|5.6904006127150835|    0.0|
| 6.226400426708835|    0.0|
| 6.942929649927971|    0.0|
| 9.005892503470223|    0.0|
| 82.00938575158094|    0.0|
| 6.873589388672895|    0.0|
| 5.316270240176767|    0.0|
| 6.243557822004782|    0.0|
| 4.992480147079991|    0.0|
| 5.225887891069408|    0.0|
| 10.29237684316396|    0.0|
| 4.432525738741523|    0.0|
| 5.494708782800936|    0.0|
| 8.323393928518794|    0.0|
|6.4404824494674155|    0.0|
| 5.076066052167882|    0.0|
|  6.13658071939946|    0.0|
|10.870854306751108|    0.0|
| 59.35637233093607|    0.0|
| 9.597347911073143|    0.0|
|44.940947323835616|    0.0|
| 6.1774814089

In [75]:
type(cross_validated_model.bestModel)

pyspark.ml.pipeline.PipelineModel

In [76]:
cross_validated_model.bestModel.stages

[StringIndexerModel: uid=StringIndexer_be9c0efe0294, handleInvalid=keep,
 OneHotEncoderModel: uid=OneHotEncoder_99d27c6f46a0, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1,
 VectorAssembler_7a24b1dd0997,
 RandomForestRegressionModel: uid=RandomForestRegressor_14254a68ddcc, numTrees=50, numFeatures=22]

In [84]:
optimal_rf_model = cross_validated_model.bestModel.stages[3]

In [85]:
optimal_rf_model.featureImportances

SparseVector(22, {0: 0.0904, 1: 0.0805, 2: 0.1846, 3: 0.1244, 4: 0.1309, 5: 0.0579, 6: 0.0788, 7: 0.0706, 8: 0.1478, 9: 0.0005, 10: 0.0017, 11: 0.0186, 12: 0.0003, 13: 0.0071, 14: 0.0001, 15: 0.0049, 16: 0.0004, 17: 0.0, 18: 0.0002, 19: 0.0, 20: 0.0002})

In [86]:
optimal_rf_model.getNumTrees

50