In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import os

import findspark

# Prefer SPARK_HOME from the environment so the notebook is not tied to one
# machine's directory layout; fall back to the original install location.
findspark.init(os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = SparkSession.builder.appName('BDAS').getOrCreate()

In [2]:
# Read every CSV matching the glob into one DataFrame, taking column names
# from the header row. No schema is supplied, so all columns load as strings.
mutiplefile = spark.read.csv("../DataSet/*.csv", header=True)


In [3]:

# Total number of rows across all loaded files.
row_total = mutiplefile.count()
print("Size of the data:", row_total)

Size of the data: 420768


In [4]:
# Expose the raw frame to Spark SQL under the name `dataset`, then keep only
# the rows whose year is later than 2014.
mutiplefile.createOrReplaceTempView('dataset')
recent_rows_query = "SELECT * FROM dataset WHERE year > 2014"
results = spark.sql(recent_rows_query)

results.count()

227520

In [5]:
# Give the abbreviated source columns descriptive names; "PM2.5" also loses
# its dot. The (source name, new name) pairs are applied one at a time.
column_renames = [
    ("DEWP", "DewPointTempeature"),
    ("wd", "WindDirection"),
    ("WSPM", "WindSpend"),
    ("PM2.5", "PM25"),
]
df = results
for source_name, target_name in column_renames:
    df = df.withColumnRenamed(source_name, target_name)
df.show()

+-----+----+-----+---+----+----+----+---+---+----+---+----+----+------------------+----+-------------+---------+-------------+
|   No|year|month|day|hour|PM25|PM10|SO2|NO2|  CO| O3|TEMP|PRES|DewPointTempeature|RAIN|WindDirection|WindSpend|      station|
+-----+----+-----+---+----+----+----+---+---+----+---+----+----+------------------+----+-------------+---------+-------------+
|16105|2015|    1|  1|   0|   3|  21| 10| 16| 400| 54|  -1|1027|               -23|   0|           NW|      0.9|Wanshouxigong|
|16106|2015|    1|  1|   1|   3|  13| 11| 17| 400| 53|   0|1025|             -22.9|   0|           NW|      2.7|Wanshouxigong|
|16107|2015|    1|  1|   2|   3|  13| 10| 15| 400| 55|   0|1027|             -22.9|   0|           NW|      2.4|Wanshouxigong|
|16108|2015|    1|  1|   3|   4|  18| 13| 13| 400| 57|   0|1028|             -24.4|   0|           NW|      2.4|Wanshouxigong|
|16109|2015|    1|  1|   4|   4|  15| 15| 12| 400| 58|   0|1030|             -24.4|   0|           NW|      2.4

In [6]:
from pyspark.sql.functions import col, when

# Turn the literal string 'NA' into a real null in every column so that the
# later na.drop() can remove those rows. All columns are rewritten in one
# projection, keeping their original names and order.
new_df = df.select([
    when(col(name) == 'NA', None).otherwise(col(name)).alias(name)
    for name in df.columns
])
df = new_df

In [7]:
# Discard every row that has a null in any column.
deleted_np = df.dropna()

In [8]:

# Number of rows left after the rows containing nulls were dropped.
deleted_np.count()

214201

In [9]:
# Persist the null-free rows as CSV with a header row. mode="overwrite"
# replaces output left by a previous run; the default save mode raises when
# the target path already exists, which breaks re-running the notebook.
deleted_np.write.csv("../final_dataset/final.csv", header=True, mode="overwrite")

In [10]:
# Explicit schema for re-reading the cleaned CSV with proper types instead of
# strings. Every field is nullable; the order follows the written file.
from pyspark.sql.types import *

_typed_columns = [
    ('No', IntegerType()),
    ('year', IntegerType()),
    ('month', IntegerType()),
    ('day', IntegerType()),
    ('hour', IntegerType()),
    ('PM25', FloatType()),
    ('PM10', FloatType()),
    ('SO2', FloatType()),
    ('NO2', FloatType()),
    ('CO', FloatType()),
    ('O3', FloatType()),
    ('TEMP', FloatType()),
    ('PRES', FloatType()),
    ('DewPointTempeature', FloatType()),
    ('RAIN', FloatType()),
    ('WindDirection', StringType()),
    ('WindSpend', FloatType()),
    ('station', StringType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _typed_columns
])

In [11]:
# Reload the cleaned CSV, this time applying the explicit schema so numeric
# columns come back typed rather than as strings.
df_with_schema = spark.read.csv(
    "../final_dataset/final.csv",
    schema=schema,
    header=True,
)


In [12]:
# Row count after the typed reload; the captured output matches the count
# taken before the CSV was written.
df_with_schema.count()

214201

In [None]:
#outlier(do not need to run)
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

In [None]:
# Outlier detection (optional; the main pipeline does not need this cell).
# For each numeric column, compute the Tukey fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
# from approximate quartiles (relative error 0.05).
cols = ['PM25','SO2','NO2','CO','O3','TEMP','PRES','DewPointTempeature','RAIN','WindSpend']
bounds = {}

# The loop variable is deliberately NOT called `col`: that name is imported
# from pyspark.sql.functions earlier in the notebook, and rebinding it to a
# string here would break any later col("...") call.
for column_name in cols:
    quantiles = df_with_schema.approxQuantile(
        column_name, [0.25, 0.75], 0.05
    )

    IQR = quantiles[1] - quantiles[0]

    bounds[column_name] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]

In [None]:
# Outlier detection (optional): build one boolean flag column per measured
# column, named "<column>_0", that is true when the value lies outside the
# fences stored in `bounds`. 'No' is kept as the row identifier.
outlier_flags = []
for c in cols:
    lower, upper = bounds[c][0], bounds[c][1]
    is_outside = (df_with_schema[c] < lower) | (df_with_schema[c] > upper)
    outlier_flags.append(is_outside.alias(c + '_0'))

outliers = df_with_schema.select('No', *outlier_flags)

outliers.show()

In [None]:
# Outlier detection (optional): attach the "<column>_0" outlier flags to the
# full typed frame.
# The flags are computed on the same rows rather than joined back on 'No'.
# 'No' appears to be a per-file row index (NOTE(review): confirm against the
# source CSVs), so with several station files loaded together a join on 'No'
# alone can pair a row with flags from other stations and multiply rows.
# When 'No' is unique this produces the same columns and rows as the join.
df_outliers = df_with_schema.select('*', *[
    (
        (df_with_schema[c] < bounds[c][0]) | (df_with_schema[c] > bounds[c][1])
    ).alias(c + '_0') for c in cols
])


In [None]:
# Outlier detection (optional; does not need to be run): preview the frame
# that carries the per-column outlier flags.
df_outliers.show()

In [None]:
# Outlier detection (optional): keep only the rows where none of the
# "<column>_0" flags is set, i.e. no measured value lies outside its fences.
flag_names = [
    'PM25_0',
    'SO2_0',
    'NO2_0',
    'CO_0',
    'O3_0',
    'TEMP_0',
    'PRES_0',
    'DewPointTempeature_0',
    'RAIN_0',
    'WindSpend_0',
]
no_outlier = ~df_outliers[flag_names[0]]
for flag in flag_names[1:]:
    no_outlier = no_outlier & ~df_outliers[flag]
clean_df = df_outliers.filter(no_outlier)

In [None]:
# Outlier detection (optional; does not need to be run): preview the
# outlier-free rows and display how many remain.
clean_df.show()
clean_df.count()

In [None]:
# Outlier detection (optional): remove the helper flag columns together with
# the 'No' and 'PM10' columns from the outlier-free frame.
flag_columns = ["PM25_0", "SO2_0", 'NO2_0', 'CO_0', 'O3_0', 'TEMP_0', 'PRES_0',
                'DewPointTempeature_0', 'RAIN_0', 'WindSpend_0']
unwanted_columns = flag_columns + ["No", "PM10"]
final_data = clean_df.drop(*unwanted_columns)

In [13]:
# Main path (no outlier removal): drop the 'No' and 'PM10' columns from the
# typed frame and preview the result.
final_data = df_with_schema.drop("No", "PM10")
final_data.show()

+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+
|year|month|day|hour|PM25| SO2|  NO2|    CO|  O3|TEMP|  PRES|DewPointTempeature|RAIN|WindDirection|WindSpend|      station|
+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+
|2015|    1|  1|   0| 3.0|10.0| 16.0| 400.0|54.0|-1.0|1027.0|             -23.0| 0.0|           NW|      0.9|Wanshouxigong|
|2015|    1|  1|   1| 3.0|11.0| 17.0| 400.0|53.0| 0.0|1025.0|             -22.9| 0.0|           NW|      2.7|Wanshouxigong|
|2015|    1|  1|   2| 3.0|10.0| 15.0| 400.0|55.0| 0.0|1027.0|             -22.9| 0.0|           NW|      2.4|Wanshouxigong|
|2015|    1|  1|   3| 4.0|13.0| 13.0| 400.0|57.0| 0.0|1028.0|             -24.4| 0.0|           NW|      2.4|Wanshouxigong|
|2015|    1|  1|   4| 4.0|15.0| 12.0| 400.0|58.0| 0.0|1030.0|             -24.4| 0.0|           NW|      2.4|Wanshouxigong|
|2015|  

In [14]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
def NationalQuanlityRemark(value):
    """Translate a numeric reading into an air-quality remark.

    Each band is inclusive at its upper edge: up to 50 is 'Good', up to 100
    "Satisfactory", up to 200 'Moderate', up to 300 'Poor', up to 400
    'Very Poor'. Anything that fits none of the bands is 'Severe'.
    """
    upper_edges = (
        (50, 'Good'),
        (100, "Satisfactory"),
        (200, 'Moderate'),
        (300, 'Poor'),
        (400, 'Very Poor'),
    )
    # Bands are checked in ascending order, so the first edge the value does
    # not exceed decides the remark.
    for edge, remark in upper_edges:
        if value <= edge:
            return remark
    return 'Severe'

In [15]:
# Wrap the remark function as a string-returning Spark UDF and use it to
# derive the "NQR" column from the PM25 readings.
udfsomefunc = F.udf(NationalQuanlityRemark, returnType=StringType())
pm25_remark = udfsomefunc(final_data["PM25"])
NQR = final_data.withColumn("NQR", pm25_remark)
NQR.show()

+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+------------+
|year|month|day|hour|PM25| SO2|  NO2|    CO|  O3|TEMP|  PRES|DewPointTempeature|RAIN|WindDirection|WindSpend|      station|         NQR|
+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+------------+
|2015|    1|  1|   0| 3.0|10.0| 16.0| 400.0|54.0|-1.0|1027.0|             -23.0| 0.0|           NW|      0.9|Wanshouxigong|        Good|
|2015|    1|  1|   1| 3.0|11.0| 17.0| 400.0|53.0| 0.0|1025.0|             -22.9| 0.0|           NW|      2.7|Wanshouxigong|        Good|
|2015|    1|  1|   2| 3.0|10.0| 15.0| 400.0|55.0| 0.0|1027.0|             -22.9| 0.0|           NW|      2.4|Wanshouxigong|        Good|
|2015|    1|  1|   3| 4.0|13.0| 13.0| 400.0|57.0| 0.0|1028.0|             -24.4| 0.0|           NW|      2.4|Wanshouxigong|        Good|
|2015|    1|  1|   4| 4.0|15.0| 12.0| 400

In [16]:
def season(value):
    """Return the season name for a calendar month number.

    Mapping: 4-5 -> 'Spring', 6-8 -> "Summer", 9-11 -> 'Autumn', every other
    number -> 'Winter'. A missing month (None) yields None instead of raising.

    The Autumn band now includes 9; the previous `value > 9` test sent
    September to the fall-through 'Winter' branch.
    NOTE(review): March still maps to 'Winter' under these bounds - confirm
    that is intended.
    """
    if value is None:
        return None
    if 4 <= value < 6:
        return 'Spring'
    if 6 <= value < 9:
        return "Summer"
    if 9 <= value <= 11:
        return 'Autumn'
    return 'Winter'

In [17]:
# Derive the "Season" column from the calendar month. The UDF was previously
# fed the "year" column, whose values never fall inside a month band, so
# every row came out as 'Winter'.
Seasonfunc = F.udf(season, StringType())
new_df = NQR.withColumn("Season", Seasonfunc("month"))
new_df.show()

+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+------------+------+
|year|month|day|hour|PM25| SO2|  NO2|    CO|  O3|TEMP|  PRES|DewPointTempeature|RAIN|WindDirection|WindSpend|      station|         NQR|Season|
+----+-----+---+----+----+----+-----+------+----+----+------+------------------+----+-------------+---------+-------------+------------+------+
|2015|    1|  1|   0| 3.0|10.0| 16.0| 400.0|54.0|-1.0|1027.0|             -23.0| 0.0|           NW|      0.9|Wanshouxigong|        Good|Winter|
|2015|    1|  1|   1| 3.0|11.0| 17.0| 400.0|53.0| 0.0|1025.0|             -22.9| 0.0|           NW|      2.7|Wanshouxigong|        Good|Winter|
|2015|    1|  1|   2| 3.0|10.0| 15.0| 400.0|55.0| 0.0|1027.0|             -22.9| 0.0|           NW|      2.4|Wanshouxigong|        Good|Winter|
|2015|    1|  1|   3| 4.0|13.0| 13.0| 400.0|57.0| 0.0|1028.0|             -24.4| 0.0|           NW|      2.4|Wanshouxigong|        Good|

In [None]:
# Count the rows whose air-quality remark is 'Severe'.
is_severe = new_df["NQR"] == 'Severe'
df_0 = new_df.where(is_severe)
df_0.count()

In [18]:
# Persist the fully preprocessed frame as CSV with a header row.
# mode="overwrite" replaces output left by a previous run; the default save
# mode raises when the target path already exists, which breaks re-running
# the notebook.
new_df.write.csv("../final_dataset/finishedpreprocessing.csv", header=True, mode="overwrite")

In [None]:
# Row count of the final preprocessed frame.
new_df.count()

In [None]:
# Print column names and types of the final frame, including the derived
# "NQR" and "Season" columns.
new_df.printSchema()