In [1]:
import pandas as pd
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext




spark =SparkSession\
    .builder\
    .master("local[16]")\
    .appName("solar")\
    .config("spark.driver.memory",'200G')\
    .config('spark.default.parallelism', '16')\
    .getOrCreate()

In [2]:
df = spark.read.csv("final_data.csv", inferSchema =True, header=True)
df.printSchema()

root
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- timezone: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- county_fips_all: string (nullable = true)
 |-- population: double (nullable = true)
 |-- density: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- dni: double (nullable = true)
 |-- res_rate: double (nullable = true)
 |-- annual_kwh_used: double (nullable = true)
 |-- temp_Apr: double (nullable = true)
 |-- temp_Aug: double (nullable = true)
 |-- temp_Dec: double (nullable = true)
 |-- temp_Feb: double (nullable = true)
 |-- temp_Jan: double (nullable = true)
 |-- temp_Jul: double (nullable = true)
 |-- temp_Jun: double (nullable = true)
 |-- temp_Mar: double (nullable = true)
 |-- temp_May: double (nullable = true)
 |-- temp_Nov: double (nullable = tru

In [3]:
label = "dollars_saved"

feature_to_ignore = ["res_rate", "annual_output_w_hrs", "anuual_output_kwh20_sps","annual_output_w_hrs_30_panels","county_fips_all", "percent_current_needs_met", "country_name", "county_name", "city", "state_id", "state_name", "timezone"]
features = [c for c in df.columns if c not in feature_to_ignore + [label]]


In [4]:
features

['zip',
 'lat',
 'lng',
 'population',
 'density',
 'elevation',
 'dni',
 'annual_kwh_used',
 'temp_Apr',
 'temp_Aug',
 'temp_Dec',
 'temp_Feb',
 'temp_Jan',
 'temp_Jul',
 'temp_Jun',
 'temp_Mar',
 'temp_May',
 'temp_Nov',
 'temp_Oct',
 'temp_Sep',
 'pct_cloudy_days_Apr',
 'pct_cloudy_days_Aug',
 'pct_cloudy_days_Dec',
 'pct_cloudy_days_Feb',
 'pct_cloudy_days_Jan',
 'pct_cloudy_days_Jul',
 'pct_cloudy_days_Jun',
 'pct_cloudy_days_Mar',
 'pct_cloudy_days_May',
 'pct_cloudy_days_Nov',
 'pct_cloudy_days_Oct',
 'pct_cloudy_days_Sep',
 'temp_avg',
 'pct_cloudy_days_avg']

In [5]:
df_clean = df.select([c for c in df.columns if c not in feature_to_ignore])
df_clean=df.withColumnRenamed(label,"label")
df_train, df_test = df_clean.randomSplit([0.8, 0.2], 500)
df_train=df_train
df_train.take(1)

[Row(zip=1001, lat=42.06259, lng=-72.62589, timezone='America/New_York', county_name='Hampden', city='Agawam', state_id='MA', state_name='Massachusetts', county_fips_all='25013', population=17312.0, density=581.0, elevation=96.84615384615384, dni=4.776, res_rate=0.1592524036541533, annual_kwh_used=18463.47741, temp_Apr=8.15, temp_Aug=22.75, temp_Dec=-1.15, temp_Feb=-0.3999999999999999, temp_Jan=-4.5, temp_Jul=24.0, temp_Jun=19.75, temp_Mar=1.75, temp_May=16.0, temp_Nov=3.0, temp_Oct=11.2, temp_Sep=18.4, pct_cloudy_days_Apr=70.80000000000001, pct_cloudy_days_Aug=64.85, pct_cloudy_days_Dec=71.65, pct_cloudy_days_Feb=69.15, pct_cloudy_days_Jan=65.3, pct_cloudy_days_Jul=64.25, pct_cloudy_days_Jun=67.19999999999999, pct_cloudy_days_Mar=74.1, pct_cloudy_days_May=70.35, pct_cloudy_days_Nov=74.69999999999999, pct_cloudy_days_Oct=67.25, pct_cloudy_days_Sep=63.65, temp_avg=9.912499999999998, pct_cloudy_days_avg=68.60416666666667, annual_output_w_hrs=168998.8814399332, annual_output_w_hrs_30_pane

In [6]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler 

In [7]:
the_features=[c for c in df.columns if "temp_" in c or "pct_" in c]

In [8]:
other_features=[c for c in features if c not in the_features]

In [9]:
other_features

['zip',
 'lat',
 'lng',
 'population',
 'density',
 'elevation',
 'dni',
 'annual_kwh_used']

In [10]:
assembler = VectorAssembler(inputCols=[c for c in df.columns if "temp_" in c or "pct_" in c], outputCol="temp_pct_features") 

In [11]:
from pyspark.ml.feature import StandardScaler
scaler=StandardScaler(withMean=True,inputCol="temp_pct_features", outputCol="scaled_temp_pct_Features")

In [12]:
pca = PCA( inputCol="temp_pct_features", outputCol="pcaFeatures")
#result.show(3)

In [13]:
assembler2 = VectorAssembler(inputCols=other_features, outputCol="other_features") 

In [14]:
other_scaler=StandardScaler(withMean=True,inputCol="other_features", outputCol="scaled_other_Features")

In [15]:
va = VectorAssembler(inputCols=["pcaFeatures", "scaled_other_Features"], outputCol="model_Features")  

In [16]:
from pyspark.ml.regression import LinearRegression

In [17]:
lr = LinearRegression(maxIter=100,featuresCol='model_Features', labelCol="label")

In [18]:
from pyspark.ml import Pipeline  

In [19]:
pipeline = Pipeline(stages=[assembler, scaler, pca,assembler2,other_scaler,va,lr])

In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [21]:
# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(pca.k, [1,2,3, 6, 12]) \
    .addGrid(lr.regParam, [0.1, 0.3,0.01]) \
    .addGrid(lr.elasticNetParam, [0.2, 0.5,0.8]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel = crossval.setParallelism(4).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 45
train time: 75.36701607704163


In [22]:
prediction = cvModel.transform(df_test)

In [23]:
model1_MSE=prediction.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction.count()
model1_MSE

13777.38681565987

In [24]:
from pyspark.ml.regression import RandomForestRegressor

In [25]:
rf_assembler = VectorAssembler(inputCols=features, outputCol="features") 

In [26]:
rf = RandomForestRegressor(featuresCol="features")

In [27]:
rf_pipeline = Pipeline(stages=[rf_assembler,rf])

In [28]:
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [200]) \
    .addGrid(rf.maxDepth, [7]) \
    .build()

print('len(paramGrid): {}'.format(len(rf_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_rf = crossval.setParallelism(8).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 44.93319225311279


In [29]:
prediction_rf = cvModel_rf.transform(df_test)
model2_MSE=prediction_rf.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_rf.count()
model2_MSE

3789.5524582743283

In [30]:
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [250]) \
    .addGrid(rf.maxDepth, [7]) \
    .build()

print('len(paramGrid): {}'.format(len(rf_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_rf = crossval.setParallelism(8).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 55.75451421737671


In [31]:
prediction_rf = cvModel_rf.transform(df_test)
model2_1_MSE=prediction_rf.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_rf.count()
model2_1_MSE

3739.9367090812552

In [32]:
from pyspark.ml.regression import GBTRegressor

In [33]:
gbt = GBTRegressor(featuresCol="features")

In [34]:
gbt_pipeline = Pipeline(stages=[rf_assembler,gbt])

In [35]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [200]) \
    .addGrid(gbt.maxDepth, [3]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 187.19522190093994


In [36]:
prediction_gbt = cvModel_gbt.transform(df_test)
model3_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model3_MSE

2188.30982639766

In [37]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [200]) \
    .addGrid(gbt.maxDepth, [4]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 250.116375207901


In [38]:
prediction_gbt = cvModel_gbt.transform(df_test)
model4_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model4_MSE

1619.0349551608672

In [39]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [200]) \
    .addGrid(gbt.maxDepth, [5]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 341.1487011909485


In [40]:
prediction_gbt = cvModel_gbt.transform(df_test)
model5_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model5_MSE

1350.7579238804585

In [41]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [225]) \
    .addGrid(gbt.maxDepth, [5]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 402.874489068985


In [42]:
prediction_gbt = cvModel_gbt.transform(df_test)
model6_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model6_MSE

1328.5188606474012

In [43]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [225]) \
    .addGrid(gbt.maxDepth, [7]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 1008.6885416507721


In [44]:
prediction_gbt = cvModel_gbt.transform(df_test)
model7_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model7_MSE

1239.8791954032304

In [45]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [225]) \
    .addGrid(gbt.maxDepth, [9]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 2523.4385073184967


In [46]:
prediction_gbt = cvModel_gbt.transform(df_test)
model8_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model8_MSE

1402.4429009027965

In [47]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [225]) \
    .addGrid(gbt.minInstancesPerNode, [10])\
    .addGrid(gbt.maxDepth, [7]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 741.2755110263824


In [48]:
prediction_gbt = cvModel_gbt.transform(df_test)
model9_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model9_MSE

1245.6296355317386

In [49]:
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [225]) \
    .addGrid(gbt.minInstancesPerNode, [8])\
    .addGrid(gbt.maxDepth, [7]) \
    .build()

print('len(paramGrid): {}'.format(len(gbt_paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
cvModel_gbt = crossval_gbt.setParallelism(16).fit(df_train) # train 4 models in parallel
print("train time:", time.time() - t0)

len(paramGrid): 1
train time: 760.796703338623


In [50]:
prediction_gbt = cvModel_gbt.transform(df_test)
model10_MSE=prediction_gbt.rdd.map(lambda x: (x.label-x.prediction)**2).reduce(lambda x,y: x+y)/prediction_gbt.count()
model10_MSE

1256.5841938024962