In [2]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## Read in Data

In [3]:
bme_file_location = "sofia/*bme280sof.csv"
sds_file_location = "sofia/*sds011sof.csv"

file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df_bme = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(bme_file_location)

df_sds = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(sds_file_location)


In [4]:
from pyspark.sql.functions import year, month
from pyspark.sql.functions import to_date
from pyspark.sql.functions import to_timestamp,date_format
from pyspark.sql import functions as F
from pyspark.sql.functions import count, avg
from pyspark.sql.functions import col


df_sds_transformed = df_sds.withColumn('year',year(df_sds.timestamp))\
    .withColumn('month', month(df_sds.timestamp))\
    .withColumn("day", date_format(col("timestamp"), "d"))\
    .withColumn("ts", to_date(col("timestamp")).cast("date"))

df_sds_transformed = df_sds_transformed.groupBy("ts").agg(avg("P1"), avg("P2")).orderBy(["ts"], ascending=True)

df_sds_transformed.show()

+----------+------------------+------------------+
|        ts|           avg(P1)|           avg(P2)|
+----------+------------------+------------------+
|2017-07-01|17.764459663706905| 8.341274009698298|
|2017-07-02| 9.846284524930946| 6.325375406399083|
|2017-07-03| 20.35557791635185|17.195223293020778|
|2017-07-04| 8.984114511906204| 6.868896334621589|
|2017-07-05|10.412705222705204| 7.964031059031034|
|2017-07-06| 10.85810864999049| 8.447780535930221|
|2017-07-07| 9.614079073024804| 7.430200547526521|
|2017-07-08| 12.10184730986929| 9.885236809576535|
|2017-07-09|12.441132935466957|10.319859653725107|
|2017-07-10|14.278580865387667|12.425794746989531|
|2017-07-11|16.458481004748865|13.907630592351836|
|2017-07-12|14.077904752827688|10.800456856017346|
|2017-07-13| 11.50965046888325| 8.878007956805918|
|2017-07-14| 5.461827450735781|  3.10989585931652|
|2017-07-15|10.245437171815821| 7.799760959824183|
|2017-07-16|11.484685678666041|  9.46174505220515|
|2017-07-17| 8.730244358596998|

In [5]:
df_bme_transformed = df_bme.withColumn('year',year(df_bme.timestamp))\
    .withColumn('month', month(df_bme.timestamp))\
    .withColumn("day", date_format(col("timestamp"), "d"))\
    .withColumn("ts", to_date(col("timestamp")).cast("date"))

df_bme_transformed = df_bme_transformed.groupBy("ts").agg(avg("pressure"), avg("temperature"), avg("humidity"))\
    .orderBy(["ts"], ascending=True)

df_bme_transformed.show()

+----------+-----------------+------------------+------------------+
|        ts|    avg(pressure)|  avg(temperature)|     avg(humidity)|
+----------+-----------------+------------------+------------------+
|2017-07-01|94572.18985080464| 33.33327613327619|32.792403355736745|
|2017-07-02|94441.42854684066|28.197254514672572| 44.52180304740427|
|2017-07-03|94668.76243252479| 18.25461707200767| 78.17694325226547|
|2017-07-04|95313.96683276288| 22.32803235375923|  50.4074079911003|
|2017-07-05|95440.82530922632|23.534423652694652|44.841247660928104|
|2017-07-06|95312.02019876736|25.778363851992424| 42.49701185958226|
|2017-07-07|95248.96706425186|27.469182004089852|40.482749797878675|
|2017-07-08|95059.96317789162|  25.7144688644688| 51.47889969005336|
|2017-07-09|95089.78527820377|27.075451422027033| 49.46747614048477|
|2017-07-10| 95128.1010232264|28.758966410703227|44.910974230932034|
|2017-07-11|95059.89666140139|30.580405242122854| 41.59478715493988|
|2017-07-12|94791.26359009359| 30.

In [6]:
combined_df = df_bme_transformed.join(df_sds_transformed, on=['ts'], how='left').orderBy(["ts"], ascending=True)
combined_df.show()
x_train = combined_df.filter(F.col('ts').between("2017-07-01", "2019-01-01"))
#x_test = combined_df.filter(F.col('ts').between("2018-10-02", "2019-05-01"))
x_test = combined_df.filter(F.col('ts') > "2019-01-01")
x_train = x_train.where(col("avg(P1)").isNotNull())
x_test = x_test.where(col("avg(P1)").isNotNull())
# x_train.show(x_train.count(), False)
# x_test.show(x_test.count(), False)
x_train.show()
x_test.show()

+----------+-----------------+------------------+------------------+------------------+------------------+
|        ts|    avg(pressure)|  avg(temperature)|     avg(humidity)|           avg(P1)|           avg(P2)|
+----------+-----------------+------------------+------------------+------------------+------------------+
|2017-07-01|94572.18985080464| 33.33327613327619|32.792403355736745|17.764459663706905| 8.341274009698298|
|2017-07-02|94441.42854684066|28.197254514672572| 44.52180304740427| 9.846284524930946| 6.325375406399083|
|2017-07-03|94668.76243252479| 18.25461707200767| 78.17694325226547| 20.35557791635185|17.195223293020778|
|2017-07-04|95313.96683276288| 22.32803235375923|  50.4074079911003| 8.984114511906204| 6.868896334621589|
|2017-07-05|95440.82530922632|23.534423652694652|44.841247660928104|10.412705222705204| 7.964031059031034|
|2017-07-06|95312.02019876736|25.778363851992424| 42.49701185958226| 10.85810864999049| 8.447780535930221|
|2017-07-07|95248.96706425186|27.4691

## Test Linear Regression

In [14]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

vectorAssembler = VectorAssembler(inputCols = ['avg(pressure)', 'avg(temperature)', 'avg(humidity)'], outputCol = 'features')
features_df = vectorAssembler.transform(combined_df)
features_df = features_df.select(['features', 'avg(P2)'])
test_features_df = vectorAssembler.transform(x_test)
train_features_df = vectorAssembler.transform(x_train)
test_features_df = test_features_df.withColumnRenamed("avg(P2)","label")
train_features_df = train_features_df.withColumnRenamed("avg(P2)","label")

lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(lr.regParam, [0.1, 0.3, 0.5]) \
            .addGrid(lr.elasticNetParam, [.5, .7, .9]) \
            .build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)
cvModel = cv.fit(train_features_df)
besty = cvModel.bestModel
print("  ElasticNetParam:", besty._java_obj.parent().getElasticNetParam())
print("  RegParam:", besty._java_obj.parent().getRegParam())
test_predictions = besty.transform(test_features_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction")
r2 = evaluator.evaluate(test_predictions)
print(r2)

  ElasticNetParam: 0.5
  RegParam: 0.5
13.373296421326268
-0.6713492547504689


In [15]:
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(glr.family, ['gaussian', 'Gamma'])\
            .addGrid(glr.regParam, [0.1, 0.3, 0.5]) \
            .build()
cv = CrossValidator(estimator=glr, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(train_features_df)
besty = cvModel.bestModel
print("  family:", besty._java_obj.parent().getFamily())
print("  RegParam:", besty._java_obj.parent().getRegParam())
test_predictions = besty.transform(test_features_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction")
r2 = evaluator.evaluate(test_predictions)
print(r2)

  family: gaussian
  RegParam: 0.5
13.628717015104831
-0.7358021349522113


In [16]:
from pyspark.ml.regression import FMRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler

featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(train_features_df)
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.9)
pipeline = Pipeline(stages=[featureScaler, fm])
model = pipeline.fit(train_features_df)
test_predictions = model.transform(test_features_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

12.597106349168207


In [17]:
from pyspark.ml.regression import IsotonicRegression

IR = IsotonicRegression()
param_grid = ParamGridBuilder() \
            .build()
cv = CrossValidator(estimator=IR, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(train_features_df)
besty = cvModel.bestModel
test_predictions = besty.transform(test_features_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

15.62787164726288


In [18]:
gbt = GBTRegressor(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(gbt.maxDepth, [5, 10, 15]) \
            .addGrid(gbt.maxBins, [16]) \
            .build()
cv = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(train_features_df)
besty = cvModel.bestModel
print("  max depth:", besty._java_obj.parent().getMaxDepth())
print("  max bins:", besty._java_obj.parent().getMaxBins())
test_predictions = besty.transform(test_features_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

  max depth: 5
  max bins: 16
16.878079695058698


## Window Function Work

In [19]:
from pyspark.sql.window import Window
w = Window.orderBy("ts")
df1_train = x_train.withColumn("prev_avg(P2)", F.lag("avg(P2)").over(w))
df1_train.drop("avg(P1)").show()
df1_test = x_test.withColumn("prev_avg(P2)", F.lag("avg(P2)").over(w))
df1_test.drop("avg(P1)").show()

+----------+-----------------+------------------+------------------+------------------+------------------+
|        ts|    avg(pressure)|  avg(temperature)|     avg(humidity)|           avg(P2)|      prev_avg(P2)|
+----------+-----------------+------------------+------------------+------------------+------------------+
|2017-07-01|94572.18985080464| 33.33327613327619|32.792403355736745| 8.341274009698298|              null|
|2017-07-02|94441.42854684066|28.197254514672572| 44.52180304740427| 6.325375406399083| 8.341274009698298|
|2017-07-03|94668.76243252479| 18.25461707200767| 78.17694325226547|17.195223293020778| 6.325375406399083|
|2017-07-04|95313.96683276288| 22.32803235375923|  50.4074079911003| 6.868896334621589|17.195223293020778|
|2017-07-05|95440.82530922632|23.534423652694652|44.841247660928104| 7.964031059031034| 6.868896334621589|
|2017-07-06|95312.02019876736|25.778363851992424| 42.49701185958226| 8.447780535930221| 7.964031059031034|
|2017-07-07|95248.96706425186|27.4691

In [35]:
df_lags_train = x_train.select("ts", "avg(P2)")
df_lags_test = x_test.select("ts", "avg(P2)")
df_lags_train.show()
for i in range(1, 8): 
    df_lags_train = df_lags_train.withColumn("P1_lag_"+str(i), F.lag(F.col('avg(P2)'), i).over(w))
    
    df_lags_test = df_lags_test.withColumn("P1_lag_"+str(i), F.lag(F.col('avg(P2)'), i).over(w))


+----------+------------------+
|        ts|           avg(P2)|
+----------+------------------+
|2017-07-01| 8.341274009698298|
|2017-07-02| 6.325375406399083|
|2017-07-03|17.195223293020778|
|2017-07-04| 6.868896334621589|
|2017-07-05| 7.964031059031034|
|2017-07-06| 8.447780535930221|
|2017-07-07| 7.430200547526521|
|2017-07-08| 9.885236809576535|
|2017-07-09|10.319859653725107|
|2017-07-10|12.425794746989531|
|2017-07-11|13.907630592351836|
|2017-07-12|10.800456856017346|
|2017-07-13| 8.878007956805918|
|2017-07-14|  3.10989585931652|
|2017-07-15| 7.799760959824183|
|2017-07-16|  9.46174505220515|
|2017-07-17|  7.05922492028453|
|2017-07-18| 9.619662928016169|
|2017-07-19| 8.447159080747532|
|2017-07-20| 8.262260148432995|
+----------+------------------+
only showing top 20 rows



In [36]:
lag_feature_df_train = df_lags_train.select("P1_lag_1", "P1_lag_2", "P1_lag_3", "P1_lag_4", "P1_lag_5", "P1_lag_6", "P1_lag_7", "avg(P2)") 
lag_feature_df_train = lag_feature_df_train.where(col("P1_lag_7").isNotNull()) 
lag_feature_df_test = df_lags_test.select("P1_lag_1", "P1_lag_2", "P1_lag_3", "P1_lag_4", "P1_lag_5", "P1_lag_6", "P1_lag_7","avg(P2)") 
lag_feature_df_test = lag_feature_df_test.where(col("P1_lag_7").isNotNull())

## Lagging P1 values from the past 7 days

In [37]:
# vectorAssembler = VectorAssembler(inputCols = ["P1_lag_1", "P1_lag_2", "P1_lag_3", "P1_lag_4", "P1_lag_5", "P1_lag_6", "P1_lag_7","P1_lag_8", "P1_lag_9", "P1_lag_10", "P1_lag_11", "P1_lag_12", "P1_lag_13", "P1_lag_14"], outputCol = 'features')
vectorAssembler = VectorAssembler(inputCols = ["P1_lag_1", "P1_lag_2", "P1_lag_3", "P1_lag_4", "P1_lag_5", "P1_lag_6", "P1_lag_7"], outputCol = 'features')
lag_df_train = vectorAssembler.transform(lag_feature_df_train)
lag_df_train = lag_df_train.select(['features', 'avg(P2)'])
lag_df_test = vectorAssembler.transform(lag_feature_df_test)
lag_df_test = lag_df_test.select(['features', 'avg(P2)'])

lag_df_test = lag_df_test.withColumnRenamed("avg(P2)","label")
lag_df_train = lag_df_train.withColumnRenamed("avg(P2)","label")

lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(lr.regParam, [0.1, 0.3, 0.5]) \
            .addGrid(lr.elasticNetParam, [.5, .7, .9]) \
            .build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(lag_df_train)
besty = cvModel.bestModel
print("  ElasticNetParam:", besty._java_obj.parent().getElasticNetParam())
print("  RegParam:", besty._java_obj.parent().getRegParam())
test_predictions = besty.transform(lag_df_test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction")
r2 = evaluator.evaluate(test_predictions)
print(r2)

  ElasticNetParam: 0.9
  RegParam: 0.5
5.37711055703234
0.06921662035409137


In [38]:
glr = GeneralizedLinearRegression(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(glr.family, ['gaussian', 'Gamma'])\
            .addGrid(glr.regParam, [0.1, 0.3, 0.5]) \
            .build()
cv = CrossValidator(estimator=glr, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(lag_df_train)
besty = cvModel.bestModel
print("  family:", besty._java_obj.parent().getFamily())
print("  RegParam:", besty._java_obj.parent().getRegParam())
test_predictions = besty.transform(lag_df_test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction")
r2 = evaluator.evaluate(test_predictions)
print(r2)

  family: gaussian
  RegParam: 0.5
5.709785426738006
-0.04951889904381068


In [39]:
gbt = GBTRegressor(featuresCol = 'features', labelCol='label', maxIter=50)
param_grid = ParamGridBuilder() \
            .addGrid(gbt.maxDepth, [5, 10, 15]) \
            .addGrid(gbt.maxBins, [16]) \
            .build()
cv = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(lag_df_train)
besty = cvModel.bestModel
print("  max depth:", besty._java_obj.parent().getMaxDepth())
print("  max bins:", besty._java_obj.parent().getMaxBins())
test_predictions = besty.transform(lag_df_test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

  max depth: 5
  max bins: 16
12.839929143771244


In [40]:
from pyspark.ml.regression import IsotonicRegression
IR = IsotonicRegression()
param_grid = ParamGridBuilder() \
            .build()
cv = CrossValidator(estimator=IR, estimatorParamMaps=param_grid, evaluator=RegressionEvaluator(), numFolds=5).setParallelism(8)

cvModel = cv.fit(lag_df_train)
besty = cvModel.bestModel
test_predictions = besty.transform(lag_df_test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction")
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

5.751204596565843


## Lagging P1 values from past  7 days AND lagging temps / pressures / humidities

In [3]:
zoo

NameError: name 'zoo' is not defined

In [None]:
# convert them into long rows with the lag information next...
         # groupBy sensor?
         # get averages for each day for each sensor
         # take the past 7 days lag information for variables
         # https://www.slideshare.net/SparkSummit/time-series-analytics-with-spark-spark-summit-east-talk-by-simon-ouellette
        

In [12]:
# https://medium.com/@sergey.ivanchuk/practical-pyspark-window-function-examples-cb5c7e1a3c41