In [None]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import os

import findspark
# Prefer the SPARK_HOME environment variable so the notebook runs on other machines;
# fall back to the original installation path when it is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'))
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BDAS').getOrCreate()

# Importing data which has a header. Schema is automatically configured.
df = spark.read.csv('Dataset.csv', header=True, inferSchema=True)

#Show the data table
df.show()

In [None]:
# Print the inferred schema of the raw CSV as a tree (column names, types, nullability).
df.printSchema()

In [None]:
# Summary statistics, shown in three groups: time, pollutant, and weather/station attributes.
for column_group in (['No', 'Year', 'Month', 'day', 'Hour'],
                     ['PM2-5', 'PM10', 'SO2', 'NO2', 'CO', 'O3'],
                     ['TEMP', 'PRES', 'DEWP', 'RAIN', 'WD', 'WSPM', 'Station']):
    df.select(column_group).describe().show()

In [None]:
#Import package
import matplotlib.pyplot as plt

#Show the relationship between PM10 and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('PM10', 'PM2-5').toPandas()
x = pairs['PM10']
y = pairs['PM2-5']
plt.xlabel('PM10')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order,
# which does not show how the two variables relate.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between SO2 and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('SO2', 'PM2-5').toPandas()
x = pairs['SO2']
y = pairs['PM2-5']
plt.xlabel('SO2')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between NO2 and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('NO2', 'PM2-5').toPandas()
x = pairs['NO2']
y = pairs['PM2-5']
plt.xlabel('NO2')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between CO and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('CO', 'PM2-5').toPandas()
x = pairs['CO']
y = pairs['PM2-5']
plt.xlabel('CO')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between O3 and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('O3', 'PM2-5').toPandas()
x = pairs['O3']
y = pairs['PM2-5']
plt.xlabel('O3')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between TEMP and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('TEMP', 'PM2-5').toPandas()
x = pairs['TEMP']
y = pairs['PM2-5']
plt.xlabel('TEMP')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Show the relationship between WSPM and PM2.5
# Collect only the two needed columns, and only once. Calling df.toPandas() twice
# pulled the entire table to the driver twice.
pairs = df.select('WSPM', 'PM2-5').toPandas()
x = pairs['WSPM']
y = pairs['PM2-5']
plt.xlabel('WSPM')
plt.ylabel('PM2-5')
# Use a scatter plot, not plt.plot: a line plot joins consecutive rows in file order.
plt.scatter(x, y, s=2)

In [None]:
#Count the null value for each column
from pyspark.sql.functions import isnan, when, count, col
# Build one aggregate per column: count() ignores nulls, and when() is null unless the cell is null.
null_counts = [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
df.select(null_counts).show()

In [None]:
df = df.drop(*['No', 'WD', 'Station'])  # remove the 'No', 'WD' and 'Station' attributes

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
# Remove every row that has a null in any column.
df = df.dropna()

In [None]:
#Count the null value for each column (should now be all zeros)
from pyspark.sql.functions import isnan, when, count, col
per_column = []
for name in df.columns:
    per_column.append(count(when(col(name).isNull(), name)).alias(name))
df.select(per_column).show()

In [None]:
n_rows, n_cols = df.count(), len(df.columns)
print((n_rows, n_cols))  # (rows, columns)

In [None]:
df.describe('PM2-5').show()  # summary statistics for PM2.5

In [None]:
df.describe('PM10').show()  # summary statistics for PM10

In [None]:
df.describe('SO2').show()  # summary statistics for SO2

In [None]:
df.describe('NO2').show()  # summary statistics for NO2

In [None]:
df.describe('CO').show()  # summary statistics for CO

In [None]:
df.describe('O3').show()  # summary statistics for O3

In [None]:
df.describe('RAIN').show()  # summary statistics for RAIN

In [None]:
df.describe('WSPM').show()  # summary statistics for WSPM

In [None]:
#Convert pyspark dataframe to pandas dataframe
#In order to add new attribute
df=df.toPandas()

#Adding a new attribute
def get_grade(value):
    """Return the PM2.5 category label for the concentration ``value``.

    Bands (upper bound inclusive): [0, 12] 'Good', (12, 35] 'Moderate',
    (35, 55] 'Unhealthy for Sensi', (55, 150] 'Unhealthy',
    (150, 250] 'Very Unhealthy', (250, 500] 'Hazardous', > 500 'Beyond Index'.

    Returns None for negative values and for NaN. NaN fails every comparison,
    so it reaches the final branch.
    """
    # Check negatives first. Previously the `value >= 0` test was only on the 'Good'
    # branch, so a negative reading fell through to 'Moderate'.
    if value < 0:
        return None
    if value <= 12:
        return 'Good'
    elif value <= 35:
        return 'Moderate'
    elif value <= 55:
        return 'Unhealthy for Sensi'
    elif value <= 150:
        return 'Unhealthy'
    elif value <= 250:
        return 'Very Unhealthy'
    elif value <= 500:
        return 'Hazardous'
    elif value > 500:
        return 'Beyond Index'
    else:
        return None

#Adding new attribute to the original table
df["PM2-5_Grade"] = df["PM2-5"].apply(get_grade)
df.reset_index(drop=True,inplace=True)

#Convert the pandas dataframe back to a pyspark dataframe so the rest of the
#pipeline can keep using Spark
df=spark.createDataFrame(df)


In [None]:
df.show(n=20)  # preview the first 20 rows, including the new PM2-5_Grade column

In [None]:
# Inspect the schema Spark inferred from the pandas round-trip; the following cells re-cast
# the time columns and CO.
df.printSchema()

In [None]:
#Change data type
from pyspark.sql.types import IntegerType
# Cast each time column from ITSELF. Previously Month, day and Hour were all cast
# from df["Year"], which silently overwrote them with the year value. That also made
# their correlations with PM2.5 identical to Year's.
for time_col in ("Year", "Month", "day", "Hour"):
    df = df.withColumn(time_col, df[time_col].cast(IntegerType()))

In [None]:
# Check the column types after the integer casts of the time columns.
df.printSchema()

In [None]:
# Make CO a double-precision column.
from pyspark.sql.types import DoubleType
co_as_double = df["CO"].cast(DoubleType())
df = df.withColumn("CO", co_as_double)

In [None]:
# Check that CO is now typed as double.
df.printSchema()

In [None]:
# Pearson correlation of PM2.5 with each candidate predictor; the printed label is the column name.
for attr in ["Year", "Month", "day", "Hour", "PM10", "SO2", "NO2", "CO",
             "O3", "TEMP", "PRES", "DEWP", "RAIN", "WSPM"]:
    print(attr + ": ", df.corr("PM2-5", attr))

In [None]:
# Remove attributes that are not carried into the model.
unused_attributes = ['Year', 'Month', 'day', 'Hour', 'PRES', 'RAIN']
df = df.drop(*unused_attributes)

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
# Load the two additional CSV files (each with a header row, schema inferred).
file1, file2 = [spark.read.csv(path, header=True, inferSchema=True) for path in ('1.csv', '2.csv')]

In [None]:
file1.show(n=20)  # first 20 rows of 1.csv

In [None]:
file2.show(n=20)  # first 20 rows of 2.csv

In [None]:
# A bare expression: the notebook displays the DataFrame's repr (column names and types), not its rows.
file1

In [None]:
import pyspark.sql.functions as f

# Stack file2's rows underneath file1's. DataFrame.union matches columns by POSITION, not name.
unionDF = file1.union(other=file2)
unionDF

In [None]:
# Row count of the stacked data; union keeps duplicates, so this should equal file1.count() + file2.count().
unionDF.count()

In [None]:
# Row count of 2.csv (for checking against unionDF.count()).
file2.count()

In [None]:
# Row count of 1.csv (for checking against unionDF.count()).
file1.count()

In [None]:
unionDF.show(n=20)  # first 20 rows of the stacked data

In [None]:
# A bare expression: the notebook displays the DataFrame's repr (column names and types), not its rows.
df

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
import numpy as np
# The log transform is done in pandas, so first collect the Spark DataFrame to the driver.
df = df.toPandas()
# Append PM2-5_log = ln(PM2-5) as a new column.
df = df.assign(**{'PM2-5_log': np.log(df['PM2-5'])})


In [None]:
# np.log(0) does not raise; it returns -inf (with a RuntimeWarning), and negative inputs
# return NaN. Either value would poison the regression. Previously only WSPM == 0 rows
# were removed. Now drop every row with a non-positive value in ANY log-transformed
# column (PM2-5_log included) before taking logs.
log_source_cols = ['PM2-5', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'WSPM']
df = df[(df[log_source_cols] > 0).all(axis=1)].reset_index(drop=True)

df['PM10_log'] = np.log(df['PM10'])
df['SO2_log'] = np.log(df['SO2'])
df['NO2_log'] = np.log(df['NO2'])
df['CO_log'] = np.log(df['CO'])
df['O3_log'] = np.log(df['O3'])
df['WSPM_log'] = np.log(df['WSPM'])

In [None]:
# Hand the log-transformed pandas frame back to Spark for the ML pipeline.
df = spark.createDataFrame(df)

In [None]:
df.show(n=20)  # preview the raw and *_log columns side by side

In [None]:
#Drop the raw attributes now that their *_log versions exist (TEMP and DEWP stay as-is).
# 'PM2-5' is included here once; the former second `df.drop('PM2-5')` was redundant.
df=df.drop('PM2-5','PM10','SO2','NO2','CO','O3', 'WSPM')

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
# Give every model input a uniform "_Transf" suffix (the *_log columns drop "_log").
rename_pairs = [
    ("TEMP", "TEMP_Transf"),
    ("DEWP", "DEWP_Transf"),
    ("PM2-5_log", "PM2-5_Transf"),
    ("PM10_log", "PM10_Transf"),
    ("SO2_log", "SO2_Transf"),
    ("NO2_log", "NO2_Transf"),
    ("CO_log", "CO_Transf"),
    ("O3_log", "O3_Transf"),
    ("WSPM_log", "WSPM_Transf"),
]
for old_name, new_name in rename_pairs:
    df = df.withColumnRenamed(old_name, new_name)

In [None]:
df.show(n=20)  # preview the renamed columns

In [None]:
# NOTE(review): 'PM2-5' was already dropped in an earlier cell, so this is a no-op
# (DataFrame.drop ignores column names that are not in the schema).
df=df.drop('PM2-5')

In [None]:
df.show(n=20)  # preview the first 20 rows (show()'s default)

In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack the eight transformed predictors into one 'features' vector column.
model_inputs = ['TEMP_Transf', 'DEWP_Transf', 'PM10_Transf', 'SO2_Transf',
                'NO2_Transf', 'CO_Transf', 'O3_Transf', 'WSPM_Transf']
vectorAssembler = VectorAssembler(inputCols=model_inputs, outputCol='features')
vector_output = vectorAssembler.transform(df)
vector_output.printSchema()
vector_output.head(1)

In [None]:
# Keep only the assembled feature vector and the label.
vector_output = vector_output.select('features', 'PM2-5_Transf')
first_row = vector_output.head(1)
print(first_row)
vector_output.show(5)

In [None]:
# Pass in the split between training/test as a list.
# This is based on your test-designs,70/30 splits is used.
# A fixed seed makes the split reproducible. Without it, every metric below changes on
# each Restart & Run All.
train_data,test_data = vector_output.randomSplit([0.7,0.3], seed=42)

In [None]:
# Preview the rows of each split (training first, then test).
for split in (train_data, test_data):
    split.show()
# Summary statistics (including the row count) of each split.
for split in (train_data, test_data):
    split.describe().show()

In [None]:
from pyspark.ml.regression import LinearRegression

# Model 1: no regularisation (regParam=0) and maxIter=0.
# NOTE(review): confirm which solver Spark picks here; an iterative solver would stop
# immediately with maxIter=0.
lr1 = LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', regParam=0, maxIter=0)
lr_model = lr1.fit(train_data)

print("Coefficients: " + str(lr_model.coefficients))

In [None]:
# Intercept, then the in-sample fit statistics from the training summary.
print("Intercept: " + str(lr_model.intercept))
training_summary = lr_model.summary
for label, value in (("R2: ", training_summary.r2),
                     ("RMSE: ", training_summary.rootMeanSquaredError)):
    print(label + str(value))

In [None]:
#Set the feature labels (same order as the VectorAssembler inputCols)
feat_labels = ['TEMP_Transf', 'DEWP_Transf','PM10_Transf','SO2_Transf','NO2_Transf','CO_Transf','O3_Transf','WSPM_Transf']

#Show the coefficients, largest first
coefficients = lr_model.coefficients
indices = np.argsort(coefficients.toArray())[::-1]
# Print one line per coefficient. The old loop ran train_data.toPandas().shape[1] times.
# train_data has only 2 columns (features, label), so just two of the eight coefficients
# were listed, and the whole training set was collected to the driver to count them.
# `rank` also avoids shadowing the `f` alias of pyspark.sql.functions.
for rank, idx in enumerate(indices, start=1):
    print("%2d: %-*s %f" % (rank, 30, feat_labels[idx], coefficients[idx]))

In [None]:
#Import package
import matplotlib.pyplot as plt
from matplotlib.pyplot import MultipleLocator
#Coefficient graph for the current lr_model, bars sorted from largest to smallest.
coef_values = lr_model.coefficients.toArray()
order = np.argsort(coef_values)[::-1]
# Derive bar labels from this model's own ordering. The previous hard-coded x_columns
# list only matches one particular ordering of the coefficients.
# Names follow the VectorAssembler inputCols order, without the "_Transf" suffix.
feature_names = ['TEMP', 'DEWP', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'WSPM']
x_columns = np.array([feature_names[i] for i in order])
plt.figure(figsize=(10,6))
plt.title("Coefficients",fontsize = 18)
plt.ylabel("Coefficients level",fontsize = 14)
plt.rcParams['axes.unicode_minus'] = False
# Keep the 0..1 window, but widen it when a coefficient falls outside it
# (a fixed ylim(0, 1) hid negative coefficients entirely).
plt.ylim(min(0, coef_values.min()), max(1, coef_values.max()))
y_major_locator=MultipleLocator(0.1)
ax=plt.gca()
ax.yaxis.set_major_locator(y_major_locator)

positions = np.arange(len(order))
plt.bar(positions, coef_values[order], color='orange', align='center')
plt.xticks(positions, x_columns, fontsize=14)
plt.show()

In [None]:
from pyspark.ml.regression import LinearRegression

# Model 2: no regularisation, up to 10 iterations.
lr2 = LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', regParam=0, maxIter=10)
lr_model = lr2.fit(train_data)

print("Coefficients: " + str(lr_model.coefficients))

In [None]:
#Import package
import matplotlib.pyplot as plt
from matplotlib.pyplot import MultipleLocator
#Coefficient graph for the current lr_model, bars sorted from largest to smallest.
coef_values = lr_model.coefficients.toArray()
# Recompute the ordering for THIS model. The old code reused `indices`, which was
# computed from the first model's coefficients, and a hard-coded label list.
order = np.argsort(coef_values)[::-1]
# Names follow the VectorAssembler inputCols order, without the "_Transf" suffix.
feature_names = ['TEMP', 'DEWP', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'WSPM']
x_columns = np.array([feature_names[i] for i in order])
plt.figure(figsize=(10,6))
plt.title("Coefficients",fontsize = 18)
plt.ylabel("Coefficients level",fontsize = 14)
plt.rcParams['axes.unicode_minus'] = False
# Keep the 0..1 window, but widen it when a coefficient falls outside it
# (a fixed ylim(0, 1) hid negative coefficients entirely).
plt.ylim(min(0, coef_values.min()), max(1, coef_values.max()))
y_major_locator=MultipleLocator(0.1)
ax=plt.gca()
ax.yaxis.set_major_locator(y_major_locator)

positions = np.arange(len(order))
plt.bar(positions, coef_values[order], color='orange', align='center')
plt.xticks(positions, x_columns, fontsize=14)
plt.show()

In [None]:
from pyspark.ml.regression import LinearRegression

# Model 3: regParam=0.3, up to 10 iterations.
lr3 = LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', regParam=0.3, maxIter=10)
lr_model = lr3.fit(train_data)

print("Coefficients: " + str(lr_model.coefficients))

In [None]:
#Import package
import matplotlib.pyplot as plt
from matplotlib.pyplot import MultipleLocator
#Coefficient graph for the current lr_model, bars sorted from largest to smallest.
coef_values = lr_model.coefficients.toArray()
# Recompute the ordering for THIS model. The old code reused `indices`, which was
# computed from the first model's coefficients, and a hard-coded label list.
order = np.argsort(coef_values)[::-1]
# Names follow the VectorAssembler inputCols order, without the "_Transf" suffix.
feature_names = ['TEMP', 'DEWP', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'WSPM']
x_columns = np.array([feature_names[i] for i in order])
plt.figure(figsize=(10,6))
plt.title("Coefficients",fontsize = 18)
plt.ylabel("Coefficients level",fontsize = 14)
plt.rcParams['axes.unicode_minus'] = False
# Keep the 0..1 window, but widen it when a coefficient falls outside it
# (a fixed ylim(0, 1) hid negative coefficients entirely).
plt.ylim(min(0, coef_values.min()), max(1, coef_values.max()))
y_major_locator=MultipleLocator(0.1)
ax=plt.gca()
ax.yaxis.set_major_locator(y_major_locator)

positions = np.arange(len(order))
plt.bar(positions, coef_values[order], color='orange', align='center')
plt.xticks(positions, x_columns, fontsize=14)
plt.show()

In [None]:
#Import package 
from pyspark.ml.regression import RandomForestRegressor

# Fixed seed so the forest, and its importances, are reproducible across runs.
random_forest1 = RandomForestRegressor(numTrees=5,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
rf1 = random_forest1.fit(train_data)

# Print the per-feature importances (a random forest has no coefficients).
print("Feature importances: " + str(rf1.featureImportances))

In [None]:
#Import package 
from pyspark.ml.regression import RandomForestRegressor

# Fixed seed so the forest, and its importances, are reproducible across runs.
random_forest2 = RandomForestRegressor(numTrees=10,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
rf2 = random_forest2.fit(train_data)

# Print the per-feature importances (a random forest has no coefficients).
print("Feature importances: " + str(rf2.featureImportances))

In [None]:
#Import package 
from pyspark.ml.regression import RandomForestRegressor

# Fixed seed so the forest, and its importances, are reproducible across runs.
random_forest3 = RandomForestRegressor(numTrees=100,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
rf3 = random_forest3.fit(train_data)

# Print the per-feature importances (a random forest has no coefficients).
print("Feature importances: " + str(rf3.featureImportances))

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Scores a predictions DataFrame by R2 of its 'prediction' column against the label.
evaluator = RegressionEvaluator(metricName='r2', labelCol="PM2-5_Transf", predictionCol="prediction")

In [None]:
#Linear: the three configurations compared on the held-out test set
lr1= LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', maxIter=0, regParam=0)
lr2= LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', maxIter=10, regParam=0)
lr3= LinearRegression(featuresCol='features', labelCol='PM2-5_Transf', maxIter=10, regParam=0.3)

# Fit the training data
lr_model1 = lr1.fit(train_data)
lr_model2 = lr2.fit(train_data)
lr_model3 = lr3.fit(train_data)

#Prediction
pred1 = lr_model1.transform(test_data)
pred2 = lr_model2.transform(test_data)
pred3 = lr_model3.transform(test_data)

#Evaluation: score each model on its OWN predictions.
# Previously e2 and e3 both evaluated pred1, so all three printed values were model 1's R2.
e1 = evaluator.evaluate(pred1)
e2 = evaluator.evaluate(pred2)
e3 = evaluator.evaluate(pred3)

#Output
print("R2 Model 1: " + str(e1))
print("R2 Model 2: " + str(e2))
print("R2 Model 3: " + str(e3))


In [None]:
#Random forest: fixed seeds so the forests, and therefore the R2 scores, are reproducible
random_forest1 = RandomForestRegressor(numTrees=5,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
random_forest2 = RandomForestRegressor(numTrees=10,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
random_forest3 = RandomForestRegressor(numTrees=100,featuresCol='features', labelCol='PM2-5_Transf', seed=42)

rf1 = random_forest1.fit(train_data)
rf2 = random_forest2.fit(train_data)
rf3 = random_forest3.fit(train_data)

#Prediction
pred1 = rf1.transform(test_data)
pred2 = rf2.transform(test_data)
pred3 = rf3.transform(test_data)

#Evaluation (R2 on the held-out test set)
e1 = evaluator.evaluate(pred1)
e2 = evaluator.evaluate(pred2)
e3 = evaluator.evaluate(pred3)

#Output
print("Random Forest Regression Model 1: " + str(e1))
print("Random Forest Regression Model 2: " + str(e2))
print("Random Forest Regression Model 3: " + str(e3))


In [None]:
# Refit the 100-tree forest with a fixed seed (reproducible) and look at its first predictions.
random_forest3 = RandomForestRegressor(numTrees=100,featuresCol='features', labelCol='PM2-5_Transf', seed=42)
rf3 = random_forest3.fit(train_data)
pred3 = rf3.transform(test_data)
pred3.head(5)