In [2]:
# Section must be included at the beginning of each new notebook. Remember to change the app name. 
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('zli786-Iteration4').getOrCreate()

In [52]:
# Input the dataset
df = spark.read.options(header='True', inferSchema='True').csv("./dataset1.csv")

In [53]:
# show the visualise of the dataset
df.show()

+---+----+-----+---+----+-----+-----+----+----+---+----+----+------+-----+----+---+----+---------+
| No|year|month|day|hour|PM2.5| PM10| SO2| NO2| CO|  O3|TEMP|  PRES| DEWP|RAIN| wd|WSPM|  station|
+---+----+-----+---+----+-----+-----+----+----+---+----+----+------+-----+----+---+----+---------+
|  1|2013|    3|  1|   0|  3.0|  6.0|13.0| 7.0|300|85.0|-2.3|1020.8|-19.7| 0.0|  E| 0.5|Changping|
|  2|2013|    3|  1|   1|  3.0|  3.0| 6.0| 6.0|300|85.0|-2.5|1021.3|-19.0| 0.0|ENE| 0.7|Changping|
|  3|2013|    3|  1|   2|  3.0|  3.0|22.0|13.0|400|74.0|-3.0|1021.3|-19.9| 0.0|ENE| 0.2|Changping|
|  4|2013|    3|  1|   3|  3.0|  6.0|12.0| 8.0|300|81.0|-3.6|1021.8|-19.1| 0.0|NNE| 1.0|Changping|
|  5|2013|    3|  1|   4|  3.0|  3.0|14.0| 8.0|300|81.0|-3.5|1022.3|-19.4| 0.0|  N| 2.1|Changping|
|  6|2013|    3|  1|   5|  3.0|  3.0|10.0|17.0|400|71.0|-4.5|1022.6|-19.5| 0.0|NNW| 1.7|Changping|
|  7|2013|    3|  1|   6|  4.0|  6.0|12.0|22.0|500|65.0|-4.5|1023.4|-19.5| 0.0|NNW| 1.8|Changping|
|  8|2013|

In [54]:
# print the columns name of the dataset
df.columns

['No',
 'year',
 'month',
 'day',
 'hour',
 'PM2.5',
 'PM10',
 'SO2',
 'NO2',
 'CO',
 'O3',
 'TEMP',
 'PRES',
 'DEWP',
 'RAIN',
 'wd',
 'WSPM',
 'station']

In [55]:
# Print the data type in the dataset
df.printSchema()

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- PM2.5: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- CO: integer (nullable = true)
 |-- O3: double (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- PRES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- RAIN: double (nullable = true)
 |-- wd: string (nullable = true)
 |-- WSPM: double (nullable = true)
 |-- station: string (nullable = true)



In [56]:
# Count the size of the dataset
df.count()

35064

In [57]:
# Rename the dataset's target column, from PM2.5 to Target
df1 = df.withColumnRenamed("PM2.5","Target")
df1.columns

['No',
 'year',
 'month',
 'day',
 'hour',
 'Target',
 'PM10',
 'SO2',
 'NO2',
 'CO',
 'O3',
 'TEMP',
 'PRES',
 'DEWP',
 'RAIN',
 'wd',
 'WSPM',
 'station']

In [58]:
# Generated the descriptive statistics of the dataset
# To perform better layout, separate the columns into three parts
# Time attributes 
df1.describe([
    'year',
    'month',
    'day',
    'hour',
]).show()

+-------+------------------+-----------------+------------------+-----------------+
|summary|              year|            month|               day|             hour|
+-------+------------------+-----------------+------------------+-----------------+
|  count|             35064|            35064|             35064|            35064|
|   mean| 2014.662559890486|6.522929500342231|15.729637234770705|             11.5|
| stddev|1.1772134318242622|3.448752360047857|  8.80021752943156|6.922285262428006|
|    min|              2013|                1|                 1|                0|
|    max|              2017|               12|                31|               23|
+-------+------------------+-----------------+------------------+-----------------+



In [59]:
# The concentration of Inorganic air pollutants, and PM
df1.describe([
    'Target',
    'PM10',
    'SO2',
    'NO2',
    'CO',
    'O3',
]).show()

+-------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|summary|           Target|             PM10|               SO2|               NO2|                CO|                O3|
+-------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|  count|            34290|            34482|             34436|             34397|             33543|             34460|
|   mean|71.09974336541265|94.65787077315701|14.958905587176204| 44.18208550745705|1152.3013445428255|57.940002617527554|
| stddev| 72.3269261250205|83.44173842092754|20.975331415701525|29.519796285531235|1103.0562821491628| 54.31667439264105|
|    min|              2.0|              2.0|            0.2856|            1.8477|               100|            0.2142|
|    max|            882.0|            999.0|             310.0|             226.0|             10000|             429.0|
+-------+---------------

In [60]:
# The Influencing factors in the dataset
df1.describe([
    'TEMP',
    'PRES',
    'DEWP',
    'RAIN',
    'wd',
    'WSPM',
    'station'
]).show()

+-------+------------------+------------------+------------------+-------------------+-----+------------------+---------+
|summary|              TEMP|              PRES|              DEWP|               RAIN|   wd|              WSPM|  station|
+-------+------------------+------------------+------------------+-------------------+-----+------------------+---------+
|  count|             35011|             35014|             35011|              35013|34924|             35021|    35064|
|   mean| 13.68611128792636|1007.7602777935998|1.5054954157264813|0.06036614971581961| null|1.8538362696667647|     null|
| stddev|11.365312950567468|10.225663530494572|13.822098888069776| 0.7528993068240762| null|1.3098083299251684|     null|
|    min|             -16.6|             982.4|             -35.1|                0.0|    E|               0.0|Changping|
|    max|              41.4|            1036.5|              27.2|               52.1|  WSW|              10.0|Changping|
+-------+---------------

In [61]:
# Remove the null value
df2 = df1.na.drop()
df2.count()

32681

In [62]:
# Import VectorAssembler and Vectors to vectorization the data
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [71]:
# The input columns are the feature column names, and the output column is the feature.
assembler = VectorAssembler(
    inputCols=["month", "hour", "PM10", "SO2", 'NO2',
               "CO", "O3", "TEMP", "DEWP", "RAIN", "WSPM"],
    outputCol="Features")

In [72]:
# Now that we've created the assembler variable, let's actually transform the data.
df3 = assembler.transform(df2)

In [73]:
# Using print schema, the features output column has been added. 
df3.printSchema()
# The "features" column is a dense vector that combines the various features as expected.
df3.head(1)

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- Target: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- CO: integer (nullable = true)
 |-- O3: double (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- PRES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- RAIN: double (nullable = true)
 |-- wd: string (nullable = true)
 |-- WSPM: double (nullable = true)
 |-- station: string (nullable = true)
 |-- Features: vector (nullable = true)



[Row(No=1, year=2013, month=3, day=1, hour=0, Target=3.0, PM10=6.0, SO2=13.0, NO2=7.0, CO=300, O3=85.0, TEMP=-2.3, PRES=1020.8, DEWP=-19.7, RAIN=0.0, wd='E', WSPM=0.5, station='Changping', Features=DenseVector([3.0, 0.0, 6.0, 13.0, 7.0, 300.0, 85.0, -2.3, -19.7, 0.0, 0.5]))]

In [74]:
# Let's select two columns (the feature and predictor).
# This is now in the appropriate format to be processed by Spark.
dataset = df3.select("Features","Target")
dataset.show()

+--------------------+------+
|            Features|Target|
+--------------------+------+
|[3.0,0.0,6.0,13.0...|   3.0|
|[3.0,1.0,3.0,6.0,...|   3.0|
|[3.0,2.0,3.0,22.0...|   3.0|
|[3.0,3.0,6.0,12.0...|   3.0|
|[3.0,4.0,3.0,14.0...|   3.0|
|[3.0,5.0,3.0,10.0...|   3.0|
|[3.0,6.0,6.0,12.0...|   4.0|
|[3.0,7.0,6.0,25.0...|   3.0|
|[3.0,8.0,25.0,13....|   9.0|
|[3.0,9.0,29.0,5.0...|  11.0|
|[3.0,10.0,10.0,3....|   9.0|
|[3.0,11.0,3.0,4.0...|   3.0|
|[3.0,12.0,6.0,4.0...|   3.0|
|[3.0,13.0,101.0,5...|   3.0|
|[3.0,14.0,60.0,5....|   9.0|
|[3.0,15.0,34.0,6....|   3.0|
|[3.0,16.0,28.0,5....|   3.0|
|[3.0,17.0,25.0,19...|   6.0|
|[3.0,18.0,17.0,40...|   4.0|
|[3.0,19.0,19.0,37...|   7.0|
+--------------------+------+
only showing top 20 rows



In [75]:
# To avoid overfit, do a randomised 70/30 split. 
train_data,test_data = dataset.randomSplit([0.7,0.3])

In [76]:
# The descriptive statistics of training data.
train_data.describe().show()

# The descriptive statistics of testing data.
test_data.describe().show()

+-------+-----------------+
|summary|           Target|
+-------+-----------------+
|  count|            22934|
|   mean|69.96537455306532|
| stddev|70.81669153788606|
|    min|              3.0|
|    max|            581.0|
+-------+-----------------+

+-------+-----------------+
|summary|           Target|
+-------+-----------------+
|  count|             9747|
|   mean|71.12868574946138|
| stddev|71.27837864606425|
|    min|              3.0|
|    max|            662.0|
+-------+-----------------+

