In [2]:
%%configure -f
{ 
    "conf":{
        "spark.pyspark.python": "python3",
        "spark.executor.instances": "10"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1645377711004_0002,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1645377711004_0002,pyspark,idle,Link,Link,✔


In [38]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, TimestampType

In [43]:
def get_dataframe_from_csv(file_name):
    df = (spark.read
      .format("s3selectCSV")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(file_name))

    df = df.na.drop()
    df=df.withColumn("pressure", df["pressure"].cast("float").alias("pressure"))
    df=df.withColumn("temperature", df["temperature"].cast("float").alias("temperature"))
    df=df.withColumn("humidity", df["humidity"].cast("float").alias("humidity"))
    
    return df

In [44]:
def remove_outliers(df):
    df = df.withColumn("relation", col("humidity") / col("temperature"))

    IQRdf = (df.agg(
                 expr("percentile(relation , array(0.25))")[0].alias("lower"),
                 expr("percentile(relation , array(0.75))")[0].alias("upper"),
                 expr("percentile(relation , array(0.50))")[0].alias("median"))
             .withColumn("deviation", (col("upper") - col("lower"))/2))

    df = (df.join(IQRdf)
          .filter(abs(col("relation") - col("median")) >= (col("deviation") * 2.2)))

    df = (df.drop("lower")
          .drop("upper")
          .drop("median")
          .drop("deviation"))
        
    return df

In [45]:
schema = StructType([
    StructField("_c0", IntegerType(), True),
    StructField("sensor_id", IntegerType(), True),
    StructField("location", IntegerType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("pressure", FloatType(), True),
    StructField("temperature", FloatType(), True),
    StructField("humidity", FloatType(), True),
    StructField("relation", DoubleType(), True),
  ])

df = spark.createDataFrame([], schema)

for i in range(2017, 2018):
    for j in range(7, 13):
        file_name = "s3://sofia-air-quality-dataset/{}-{:02d}_bme280sof.csv".format(i, j)
        aux = get_dataframe_from_csv(file_name)
        aux = remove_outliers(aux)
        df = df.union(aux)

root
 |-- _c0: integer (nullable = true)
 |-- sensor_id: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- pressure: float (nullable = true)
 |-- temperature: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- relation: double (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- sensor_id: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- pressure: float (nullable = true)
 |-- temperature: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- relation: double (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- sensor_id: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- tim

In [47]:
df_reg_temp = df = df.withColumn("date", to_date(col("timestamp")))

In [50]:
df_reg_temp = (df.groupby("date")
               .agg(
                   avg("temperature").alias("temperature"),
                   avg("humidity").alias("humidity"),
                   avg("pressure").alias("pressure"))
               .withColumn("relation", col("humidity") / col("temperature"))
               .orderBy("date"))

In [51]:
df_reg_temp.show(n=5)

+----------+------------------+-----------------+-----------------+-----------------+
|      date|       temperature|         humidity|         pressure|         relation|
+----------+------------------+-----------------+-----------------+-----------------+
|2017-07-02|19.377313433594963|81.63064673883999|94004.93950171019|4.212691662267008|
|2017-07-03|17.377409705492607|84.78500034402376|94656.82726140604|4.879035585909282|
|2017-07-04| 16.74652957137202|76.00736052607147| 95123.0164612676|4.538693238030946|
|2017-07-05|15.467403399257337| 74.0439253946078|95447.05046610169|4.787094736157395|
|2017-07-06|15.867656325227305|74.86759029673512| 95290.9246314184|4.718251313378041|
+----------+------------------+-----------------+-----------------+-----------------+
only showing top 5 rows

In [52]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(outputCol="features")
vecAssembler.setInputCols(["relation", "temperature", "humidity"])
vdf = vecAssembler.transform(df_reg_temp)
vdf = vdf.select(["features", "relation", "temperature", "humidity"])
vdf.show(3)

+--------------------+-----------------+------------------+-----------------+
|            features|         relation|       temperature|         humidity|
+--------------------+-----------------+------------------+-----------------+
|[4.21269166226700...|4.212691662267008|19.377313433594963|81.63064673883999|
|[4.87903558590928...|4.879035585909282|17.377409705492607|84.78500034402376|
|[4.53869323803094...|4.538693238030946| 16.74652957137202|76.00736052607147|
+--------------------+-----------------+------------------+-----------------+
only showing top 3 rows

In [53]:
splits = vdf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [54]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = "features", labelCol="relation", maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.9966132411957674,0.0,0.0]
Intercept: 0.017657884882711774

In [55]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.299797
r2: 0.999989

In [56]:
train_df.describe().show()

+-------+------------------+------------------+------------------+
|summary|          relation|       temperature|          humidity|
+-------+------------------+------------------+------------------+
|  count|               122|               122|               122|
|   mean| 5.213800540104589|-3.215223890587103| 82.53706275648601|
| stddev| 88.88530181186259| 33.55123041843445|  9.09932393079846|
|    min|-344.7366089500445|           -142.75|56.456836517399815|
|    max| 487.8574307414297|20.910073170496297|             100.0|
+-------+------------------+------------------+------------------+

In [62]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.show(n=5)
lr_predictions.select("prediction", "relation", "features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="relation",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+--------------------+-----------------+------------------+-----------------+-----------------+
|            features|         relation|       temperature|         humidity|       prediction|
+--------------------+-----------------+------------------+-----------------+-----------------+
|[4.21269166226700...|4.212691662267008|19.377313433594963|81.63064673883999|4.216082176573019|
|[4.87903558590928...|4.879035585909282|17.377409705492607|84.78500034402376|4.880169354065251|
|[4.78709473615739...|4.787094736157395|15.467403399257337| 74.0439253946078| 4.78853988579573|
|[4.24747456791105...| 4.24747456791105|17.773345070825496|75.49183117503851|4.250747280905135|
|[4.09358694552932...|4.093586945529321|19.011199951171875|77.82399993896485| 4.09738083878337|
+--------------------+-----------------+------------------+-----------------+-----------------+
only showing top 5 rows

+-----------------+-----------------+--------------------+
|       prediction|         relation|            fea

In [63]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.444643

In [64]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 7
objectiveHistory: [0.49999999999999994, 0.40102463824382684, 0.00313344096521352, 0.003053440427515304, 0.003044413005872045, 0.0030444099591633213, 0.0030444099591632233]
+--------------------+
|           residuals|
+--------------------+
|-0.00228642559909...|
|-0.00167830570654...|
|-0.00345024444580...|
|-0.00285680856605...|
|-0.00305623382280...|
|-7.27542546576032...|
|-9.03043374882450...|
|-0.00145045441450...|
|-7.81172177928724E-4|
|0.013850198106787559|
|-0.02674376809498...|
|-0.02636471933063...|
|-0.02432425794905...|
|-0.02103869089806...|
|-0.02080963878800468|
|-0.00320971932896...|
|-0.00433729530668...|
|-0.00312124115563...|
|-0.00327323385276...|
|-0.00233209749089...|
+--------------------+
only showing top 20 rows

In [65]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","relation","features").show()

+-------------------+-------------------+--------------------+
|         prediction|           relation|            features|
+-------------------+-------------------+--------------------+
|  4.216082176573019|  4.212691662267008|[4.21269166226700...|
|  4.880169354065251|  4.879035585909282|[4.87903558590928...|
|   4.78853988579573|  4.787094736157395|[4.78709473615739...|
|  4.250747280905135|   4.24747456791105|[4.24747456791105...|
|   4.09738083878337|  4.093586945529321|[4.09358694552932...|
|  4.209623468031279|  4.206211005303238|[4.20621100530323...|
|  4.660072386436954|  4.658190669817039|[4.65819066981703...|
|  4.398058120809037|  4.395286009516174|[4.39528600951617...|
|  4.355782787383221|  4.352867013181054|[4.35286701318105...|
|   4.30011082122218|  4.297005858763474|[4.29700585876347...|
| 14.689088428138156| 14.721287994982093|[14.7212879949820...|
|-25.784419959813867|-25.889760218054548|[-25.889760218054...|
|-1.9270131649691333|-1.9512795630916648|[-1.9512795630

In [66]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'relation')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="relation", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 64.4116

In [67]:
dt_model.featureImportances

SparseVector(3, {0: 0.925, 1: 0.0394, 2: 0.0356})

In [68]:
df_reg_temp.take(1)

[Row(date=datetime.date(2017, 7, 2), temperature=19.377313433594963, humidity=81.63064673883999, pressure=94004.93950171019, relation=4.212691662267008)]

In [69]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol = 'features', labelCol = 'relation', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'relation', 'features').show(5)

+-----------------+-----------------+--------------------+
|       prediction|         relation|            features|
+-----------------+-----------------+--------------------+
|4.378995403194841|4.212691662267008|[4.21269166226700...|
|4.804887123086766|4.879035585909282|[4.87903558590928...|
|4.596032991956885|4.787094736157395|[4.78709473615739...|
|4.620139566424155| 4.24747456791105|[4.24747456791105...|
|4.378995403194841|4.093586945529321|[4.09358694552932...|
+-----------------+-----------------+--------------------+
only showing top 5 rows

In [70]:
gbt_evaluator = RegressionEvaluator(
    labelCol="relation", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 65.2448