In [2]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test1').getOrCreate()



In [3]:
df = spark.read.csv('data of air quality/finaldataset.csv', inferSchema=True, header=True)

In [4]:
#df.show()
df.printSchema()
df=df.drop("Date")
finaldf = df.drop("AQI", 'AQI_Bucket')
finaldf.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- PM25: double (nullable = true)
 |-- NO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- NOx: double (nullable = true)
 |-- NH3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- Benzene: double (nullable = true)
 |-- Toluene: double (nullable = true)
 |-- AQI: integer (nullable = true)
 |-- AQI_Bucket: integer (nullable = true)
 |-- ChangeStrategyOrNot: integer (nullable = true)
 |-- CityIndex: integer (nullable = true)

root
 |-- PM25: double (nullable = true)
 |-- NO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- NOx: double (nullable = true)
 |-- NH3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- Benzene: double (nullable = true)
 |-- Toluene: double (nullable = true)
 |-- ChangeStrategyOrNot: integer (nullable = true)
 |-- CityInde

In [5]:
dataregression = finaldf.drop('ChangeStrategyOrNot')

In [6]:
from pyspark.ml.regression import LinearRegression

In [7]:
lr = LinearRegression(featuresCol='features', labelCol='label', predictionCol='prediction')

In [8]:
from pyspark.ml.feature import RFormula

ff = RFormula(
    formula = 'PM25 ~ .',
    featuresCol = 'features',
    labelCol = 'label'
    )

vecf = ff.fit(dataregression).transform(dataregression)
df2vecf = vecf.select('features','label')
df2vecf.show(truncate = False)

+------------------------------------------------------+-----+
|features                                              |label|
+------------------------------------------------------+-----+
|[7.2,1.27,10.65,25.63,0.56,4.22,2.81,0.01,0.08,15.0]  |31.21|
|[7.19,0.91,10.37,29.16,0.57,4.46,0.18,0.0,0.0,15.0]   |38.39|
|[7.14,1.07,10.48,28.95,0.57,4.53,0.41,0.0,0.0,15.0]   |43.23|
|[7.09,0.36,9.73,28.41,0.48,4.63,0.3,0.0,0.0,15.0]     |33.82|
|[5.63,2.32,8.09,23.98,0.5,4.71,13.02,0.13,0.68,15.0]  |27.14|
|[3.07,2.14,3.41,24.57,0.48,4.84,6.03,0.25,1.34,15.0]  |27.32|
|[3.0,1.48,5.24,23.42,0.47,5.04,8.76,0.24,1.19,15.0]   |31.76|
|[2.97,1.31,4.97,23.41,0.48,5.3,9.96,0.26,1.11,15.0]   |43.8 |
|[3.01,0.83,4.64,24.85,0.49,5.32,6.43,0.27,1.15,15.0]  |35.48|
|[3.01,0.88,4.62,24.44,0.49,5.63,8.62,49.66,50.11,15.0]|51.27|
|[2.93,0.13,3.65,18.55,0.37,5.41,5.64,13.07,13.69,15.0]|33.24|
|[2.98,0.16,3.71,20.24,0.32,5.63,5.73,62.93,62.45,15.0]|35.34|
|[2.96,0.14,3.66,20.21,0.37,6.17,5.59,0.33,0.33,15.0]  

In [9]:
(trainingData, testData) = df2vecf.randomSplit([0.7, 0.3])

In [10]:
lrModel = lr.fit(trainingData)

In [11]:
test_results = lrModel.evaluate(testData)

In [12]:
test_results.residuals.show()
print("RMSE: {}".format(test_results.rootMeanSquaredError))

+-------------------+
|          residuals|
+-------------------+
| 14.917306604707733|
|-12.645443638250466|
| -4.262193403844485|
| 15.526301877404087|
| 13.813013706426704|
|-3.7423562625965587|
| 12.227319422173785|
| 20.887768479020153|
|-11.751651025408648|
|  29.25397385776315|
| 10.026975868892102|
| 31.523921214370226|
|  2.404143478063645|
|  22.84792418207185|
| 30.352177704393675|
| 20.514472444091048|
| 12.756218841305696|
| -7.465769514567864|
| -2.333538738193411|
| 17.189253419272845|
+-------------------+
only showing top 20 rows

RMSE: 39.316678423220374


In [13]:
print("Coefficients: {}".format(str(lrModel.coefficients))) # For each feature...
print('\n')
print("Intercept:{}".format(str(lrModel.intercept)))

Coefficients: [0.7622371061467684,0.6616363337852524,0.007712805540991076,0.6580277872132876,4.521110406723865,0.6103366644868007,0.3667111356689157,-0.7512829564552937,0.8156184710030225,0.7083833345415311]


Intercept:-21.701324651280956


In [14]:
trainingSummary = lrModel.summary

In [15]:
trainingSummary.residuals.show()

# Print Root Mean Squared Error. 
print("RMSE: {}".format(trainingSummary.rootMeanSquaredError))

# Print R-Squared.
print("r2: {}".format(trainingSummary.r2))

+-------------------+
|          residuals|
+-------------------+
|  22.96072928322856|
|  8.609417021618818|
| 14.217939114583672|
|   16.3309019813588|
| -3.493708619318639|
|  87.62391879465443|
| -8.876755892888756|
| 7.2274066459122075|
| 15.025330904375911|
| 10.106236804202549|
| 21.684815021262096|
|  16.85719480146028|
| -3.844233306715765|
| -4.543269913432496|
| 13.908253759167597|
| 17.491126754492647|
|  17.70179413348473|
| -2.438083492289813|
|-3.5843945155505352|
| -11.21422388631622|
+-------------------+
only showing top 20 rows

RMSE: 36.85891723626681
r2: 0.6993210489948706


In [16]:
#Advanced

In [17]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [18]:
assembler = VectorAssembler(
    inputCols=["PM25", "NO", 
               "NO2",'NOx',"NH3","CO","SO2","O3","Benzene","Toluene","CityIndex"],
    outputCol="features")

In [19]:
output = assembler.transform(dataregression)

In [20]:
final_data = output.select("features",'PM25')

In [21]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [22]:
lr = LinearRegression(labelCol='PM25')
lrModel = lr.fit(train_data)
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [0.9999999999999997,-1.205256668120019e-15,-1.873913885598802e-15,1.8665948207085526e-15,3.502763130821986e-16,-1.0706039909933274e-14,3.0320280824286152e-15,4.3690832600247235e-16,5.716638585077843e-16,-5.427314473469501e-16,-3.873336085171711e-15] Intercept: -7.135581406111148e-15


In [23]:
test_results = lrModel.evaluate(test_data)

In [24]:
# Interesting results! This shows the difference between the predicted value and the test data.
test_results.residuals.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))
#RMSE, on average, the discrepancy between the actual and predicted result

+--------------------+
|           residuals|
+--------------------+
|3.752553823233029...|
|-8.43769498715119...|
|1.065814103640150...|
|3.108624468950438...|
|2.131628207280300...|
|1.332267629550187...|
|7.105427357601002...|
|-2.93098878501041...|
|1.865174681370263...|
|-3.19744231092045...|
|1.065814103640150...|
|3.197442310920451...|
|4.174438572590588...|
|2.664535259100375...|
|-8.88178419700125...|
|1.065814103640150...|
|4.352074256530613...|
|-8.88178419700125...|
|-2.66453525910037...|
|2.753353101070388...|
+--------------------+
only showing top 20 rows

RSME: 4.951631379167478e-14


In [25]:
# We can also get the R2 value. 
print("R2: {}".format(test_results.r2))
#R2 percentage of variance account 

R2: 1.0


In [26]:
final_data.describe().show()

+-------+------------------+
|summary|              PM25|
+-------+------------------+
|  count|              6674|
|   mean|62.302305963440304|
| stddev| 67.65959569304867|
|    min|              0.99|
|    max|            685.36|
+-------+------------------+

