In [2]:
import pandas as pd
import numpy as np
import sklearn as skl



In [3]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-hadoop2.7.tgz'


Exception: Unable to find py4j, your SPARK_HOME may not be configured correctly

In [4]:
# Download the PostgreSQL JDBC driver jar that lets Spark talk to Postgres.
# wget saves the jar into the current working directory, while the Spark session
# config looks for it at /content/postgresql-42.2.16.jar
# NOTE(review): those only match when the working directory is /content — confirm.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

'wget' is not recognized as an internal or external command,
operable program or batch file.


In [None]:
# Create (or reuse) the Spark session, with the Postgres JDBC jar on the driver classpath
from pyspark.sql import SparkSession
# NOTE: importing `round` here shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import substring, length, col, expr, to_timestamp, date_format, round

spark = (
    SparkSession.builder
    .appName("LMPT-Forest-Fires")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)



In [None]:
import os
from getpass import getpass

# Connection settings for the project's Postgres database on RDS.
# The host can be overridden with the RDS_HOST environment variable.
connection_string = os.environ.get(
    'RDS_HOST',
    'lmpt-finalproject.coke2w4vs8wf.us-east-2.rds.amazonaws.com',
)
# The password must never live in the notebook: read it from RDS_PASSWORD,
# or prompt for it (input hidden) when the variable is not set.
# A password that was previously committed in this file should be rotated.
password = os.environ.get('RDS_PASSWORD') or getpass('Postgres password: ')
database_name = 'postgres'

# Configure settings for RDS
mode = "append"
jdbc_url = f"jdbc:postgresql://{connection_string}:5432/{database_name}"
# Properties passed to Spark's JDBC reader.
config = {
    "user": "postgres",
    "password": password,
    "driver": "org.postgresql.Driver",
}

In [None]:

# Load the fires_2006to2018 table from Postgres over JDBC into a Spark DataFrame
df = spark.read.jdbc(
    url=jdbc_url,
    table='fires_2006to2018',
    properties=config,
)

In [None]:
# Build the modelling DataFrame from the chosen feature columns.
# Withheld for now: 'fire_number', 'size_class', 'start_for_fire_date'.
fire_df = df.select(
    'calendar_year',
    'fire_start_date',
    'fire_fighting_start_size',
    'bh_fs_date',
    'bh_hectares',
    'weather_conditions_over_fire',
    'true_cause',
)
fire_df.show(10)

In [None]:
# Remove rows whose starting size is the literal string 'NA',
# then convert the starting size column to double.
fire_df = fire_df.filter(fire_df['fire_fighting_start_size'] != 'NA')
fire_df = fire_df.withColumn(
    'fire_fighting_start_size',
    fire_df['fire_fighting_start_size'].cast("double"),
)
fire_df.show()

In [None]:
# Add fire_growth = size when "being held" / size at the start of fire fighting,
# and drop bh_hectares. fire_growth is the variable the model will predict.
fire_df = (
    fire_df
    .withColumn("fire_growth", col("bh_hectares") / col("fire_fighting_start_size"))
    .drop("bh_hectares")
)
fire_df.show()

In [None]:
# Review the (column name, data type) pairs of fire_df after the cast and ratio steps
fire_df.dtypes

In [None]:
# Count the null entries in each column of fire_df.
# This runs one Spark count per column.
Dict_Null = {
    name: fire_df.where(fire_df[name].isNull()).count()
    for name in fire_df.columns
}
Dict_Null

In [None]:
# Count the rows in fire_df (compare against the per-column null counts)
fire_df.count()

In [None]:
# Replace null values with the placeholder string "unknown".
# NOTE(review): a string fill value presumably only affects string columns,
# so nulls in numeric columns would remain — confirm.
fire_df = fire_df.fillna("unknown")
fire_df.show()

In [None]:
# Preview the first 10 rows after the null replacement
fire_df.show(10)

In [None]:
# 1. Remove the last 3 characters of fire_number to identify the location area of the fire (currently disabled)

#fire_df = fire_df.withColumn("fire_number",expr("substring(fire_number, 1, length(fire_number)-3)"))
#fire_df.show()

In [None]:
# 2. Parse fire_start_date as a timestamp and derive a Month column from it
#    (date pattern 'M'), to look for trends by month / season.
from pyspark.sql.functions import to_timestamp, date_format
fire_df = fire_df.withColumn('fire_start_date', to_timestamp(col('fire_start_date')))
fire_df = fire_df.withColumn('Month', date_format(col('fire_start_date'), 'M'))
fire_df.show()

In [None]:
# 3. Ensure fire_start_date is a timestamp; together with bh_fs_date it is used
#    to work out how long the fire burned before being held.
fire_df = fire_df.withColumn(
    'fire_start_date',
    fire_df['fire_start_date'].cast('timestamp'),
)

In [None]:
# Cast bh_fs_date to a timestamp as well, so the fire-held duration can be computed
fire_df = fire_df.withColumn(
    'bh_fs_date',
    fire_df['bh_fs_date'].cast('timestamp'),
)
fire_df.show()

In [None]:
# Check that fire_start_date and bh_fs_date are now reported as timestamp columns
fire_df.dtypes

In [None]:
# Find how long each fire burned between fire_start_date and bh_fs_date.
# Both columns were cast to timestamp in the preceding cells, so they are used
# directly; re-parsing them with a time-only "HH:mm:ss.SSS" pattern was
# redundant and did not describe the data.
# Casting a timestamp to long gives whole seconds, so the difference is in seconds.
fire_df = (
    fire_df
    .withColumn(
        "DiffInSeconds",
        col("bh_fs_date").cast("long") - col("fire_start_date").cast("long"),
    )
    .withColumn("DiffInMinutes", col("DiffInSeconds") / 60)
    .withColumn("DiffInHours", col("DiffInSeconds") / 3600)
)
fire_df.show(truncate=False)

In [None]:
# StringIndexer for the weather_conditions_over_fire column
# (writes the result to weather_conditions_over_fireIndex; this is label indexing, not one-hot encoding)
from pyspark.ml.feature import StringIndexer
weather_conditions_over_fire_indexer = StringIndexer(
    inputCol="weather_conditions_over_fire",
    outputCol="weather_conditions_over_fireIndex",
)

# Fit the indexer on fire_df and preview the frame with the index column added
ec_df = (
    weather_conditions_over_fire_indexer
    .fit(fire_df)
    .transform(fire_df)
)
ec_df.show()

In [None]:
# StringIndexer for the true_cause column (writes the result to true_cause_fireIndex)
true_cause_indexer = StringIndexer(
    inputCol="true_cause",
    outputCol="true_cause_fireIndex",
)

# Fit the indexer on fire_df and preview the frame with the index column added
ec_df1 = (
    true_cause_indexer
    .fit(fire_df)
    .transform(fire_df)
)
ec_df1.show()

In [None]:
# Pipeline that runs both categorical indexers in order
from pyspark.ml import Pipeline

pipeline = Pipeline(
    stages=[weather_conditions_over_fire_indexer, true_cause_indexer]
)

                    

In [None]:
# Fit the indexing pipeline on fire_df and apply it, adding both index columns
df_transformed = (
    pipeline
    .fit(fire_df)
    .transform(fire_df)
)
df_transformed.show()

In [None]:
# Keep only the model inputs: drop the raw categorical columns (their index
# columns stay), the date columns, Month, and the seconds/minutes durations
# (DiffInHours is kept).
final_df = df_transformed.drop(
    "weather_conditions_over_fire",
    "true_cause",
    "Month",
    "DiffInSeconds",
    "DiffInMinutes",
    "fire_start_date",
    "bh_fs_date",
)
final_df.show(10)

In [None]:
# Turn +/- infinity into NaN, then filter the three numeric model columns
# against NaN before the data goes into the model.
# NOTE(review): this relies on how Spark compares a column with NaN (and on null
# comparisons being filtered out) — confirm that the rows removed are the intended ones.
final_df = (
    final_df
    .replace([np.inf, -np.inf], np.nan)
    .filter(col("fire_growth") != np.nan)
    .filter(col("DiffInHours") != np.nan)
    .filter(col("fire_fighting_start_size") != np.nan)
)
final_df.show()

In [None]:
# Decide on features and label:
#   label    -> fire_growth (ratio of the "being held" size to the starting size)
#   features -> every other column left in final_df
from sklearn.model_selection import train_test_split

# Collect the Spark DataFrame to pandas ONCE and split it there. Two separate
# toPandas() calls would run the Spark job twice and do not guarantee the same
# row order, which could pair features with the wrong labels.
model_pdf = final_df.toPandas()

# Output labels (kept as a one-column DataFrame)
y = model_pdf[["fire_growth"]]

# Features data
X = model_pdf.drop(columns=["fire_growth"])

# Split the preprocessed data into a training & test dataset
# (fixed random_state so the split is reproducible)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=78)

In [None]:
# Check the shape of the feature matrix X: (rows, feature columns)
X.shape

In [None]:
# Check the shape of the label frame y; its row count should match X
y.shape

In [None]:
# Create an unfitted scikit-learn LinearRegression estimator with default settings
from sklearn.linear_model import LinearRegression
model = LinearRegression()

In [None]:
# Standardize the features: the scaler is fitted on the training split only
# and then applied to both splits.
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_scaler = scaler.fit(X_train)

# Scaled copies of the training and test features
X_trained_scaled, X_test_scaled = (
    X_scaler.transform(split) for split in (X_train, X_test)
)



In [None]:
# Train the model on the standardized training features.
# The model must be fitted on scaled data because predictions are made on
# X_test_scaled; fitting on raw X_train would mix feature scales.
model.fit(X_trained_scaled, y_train)

In [None]:
# Predict the target for the scaled test features; the predictions are shown as the cell output
y_pred = model.predict(X_test_scaled)
y_pred

In [None]:
# Print the fitted model's coefficients (coef_) followed by its intercept (intercept_)
print(model.coef_)
print(model.intercept_)

In [None]:
# Calculate the R squared value of the predictions (y_pred) against the held-out labels (y_test)
from sklearn.metrics import r2_score
r2_score(y_test, y_pred)

In [None]:

# Print the shapes of the training and testing sets, in the order
# X_train, X_test, y_train, y_test.
# X & y_train are 75% & X & y_test are 25%
for split in (X_train, X_test, y_train, y_test):
    print(split.shape)