<a rel="license" href="http://creativecommons.org/licenses/by-nc-nd/4.0/"> <img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-nd/4.0/88x31.png"/> </a> <br/> This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-nd/4.0/"> Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. </a>

### Spark Simple Random Forest Model Tutorial
### Sohel Khan

#### This is an introductory tutorial

### The data comes from the UCI Machine Learning Repository: http://archive.ics.uci.edu/ml/

In [4]:
import pandas as pd

# Red-wine quality data from the UCI repository; the file is semicolon-delimited
# and its first row holds the column names.
target_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
wine = pd.read_csv(target_url, sep=";", header=0)

In [5]:
# Preview the first five rows of the pandas DataFrame.
wine.head(n=5)

### Convert data from Pandas DataFrame to PySpark SQL DataFrame

In [7]:
# Convert the pandas DataFrame into a Spark SQL DataFrame.
# NOTE(review): `sqlContext` is not created anywhere in this notebook; presumably the
# runtime (PySpark shell or hosted notebook) injects it -- confirm before running elsewhere.
raw_data_df = sqlContext.createDataFrame(wine)

In [8]:
# Number of rows in the Spark DataFrame.
raw_data_df.count()

In [9]:
# Print the first three rows of the Spark DataFrame.
raw_data_df.show(n=3)

### Explore the Data

In [11]:
# Each row is one wine sample. The `quality` column is the value used as the label
# later in this notebook; the remaining columns are candidate features.
print(raw_data_df.count())
raw_data_df.show(3)


In [12]:
# Preview three rows of a few columns together with the quality score.
(raw_data_df
 .select("fixed acidity", "residual sugar", "alcohol", "quality")
 .show(3))

### Transform Data to LabeledPoint

In [14]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.sql import functions as sql_functions

def create_label_point_function(df):
    """Convert a wine DataFrame into a DataFrame of LabeledPoints.

    Args:
        df: Spark DataFrame containing at least the columns "quality",
            "alcohol", "density" and "fixed acidity".

    Returns:
        The DataFrame produced by ``toDF()`` on an RDD of ``LabeledPoint``
        objects, where the label is "quality" and the features are
        ("alcohol", "density", "fixed acidity"), in that order.
    """
    selected_df = df.select("quality", "alcohol", "density", "fixed acidity")

    # Go through `.rdd` explicitly: calling `.map` straight on a DataFrame is only a
    # Spark 1.x shorthand for `.rdd.map`, whereas `.rdd.map` works on both 1.x and
    # later releases and matches how the squared-error cell maps over rows.
    return (selected_df
            .rdd
            .map(lambda row: LabeledPoint(row[0], row[1:]))
            .toDF())

  
label_points_df = create_label_point_function(raw_data_df)

# Fetch the first row once: every `first()` call is a separate Spark action, and a
# single fetch guarantees the features and the label belong to the same row.
first_point = label_points_df.first()
first_point_features = first_point.features
first_point_label = first_point.label
# `!s` applies str() to each value, which is what the print statement did.
print('{0!s} {1!s}'.format(first_point_features, first_point_label))

In [15]:
# Show three labeled points; long cell values are truncated for display.
label_points_df.show(n=3, truncate=True)

### Perform "feature engineering"
[PolynomialExpansion](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.PolynomialExpansion)

In [17]:
from pyspark.ml.feature import PolynomialExpansion
# Feature transformation: expand the "features" vector to degree-2 polynomial
# terms and write the result to a new "polyFeatures" column.
polynomial_expansion = PolynomialExpansion(
    degree=2,
    inputCol="features",
    outputCol="polyFeatures",
)

### Sample the Data: Training, validation, and test sets

Use the [randomSplit method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit)

In [19]:

# 80% training / 10% validation / 10% test; the seed is handed to randomSplit so the
# sampling is seeded rather than left to chance.
weights = [.8, .1, .1]
seed = 42
train_data_df, val_data_df, test_data_df = label_points_df.randomSplit(weights, seed)

# Mark each split for caching, since each one is used by more than one action below.
train_data_df.cache()
val_data_df.cache()
test_data_df.cache()

n_train = train_data_df.count()
n_val = val_data_df.count()
n_test = test_data_df.count()

# Sanity check: the three split sizes and their sum, followed by the size of the
# unsplit data for comparison.
print('{0} {1} {2} {3}'.format(n_train, n_val, n_test, n_train + n_val + n_test))
print(label_points_df.count())

### Let's try a Simple Random Forest Model
https://spark.apache.org/docs/latest/ml-classification-regression.html

https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-trees

In [21]:
from pyspark.ml.regression import RandomForestRegressor

# Reuse the notebook's split seed for the forest as well: a random forest draws random
# samples while building its trees, so without a seed the fitted model (and the RMSE
# reported below) can differ from run to run.
random_forest_model = RandomForestRegressor(featuresCol='polyFeatures', seed=seed)

### Execute Pipelined Model

[Pipeline](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline)

In [23]:
from pyspark.ml import Pipeline

# Create the pipeline: polynomial feature expansion followed by the random forest.
pipeline = Pipeline(stages=[polynomial_expansion, random_forest_model])

# Develop the pipeline model with the training data.
pipeline_model = pipeline.fit(train_data_df)

# Predict with the test data.
predictions_df = pipeline_model.transform(test_data_df)


### Evaluate the Model

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator
# Score the test-set predictions with root-mean-squared error.
evaluator = RegressionEvaluator()
rmse_test_pipeline = evaluator.evaluate(
    predictions_df,
    params={evaluator.metricName: "rmse"},
)
print('RMSE for test data set using pipelines: {0:.3f}'.format(rmse_test_pipeline))

In [26]:
def squared_error(label, prediction):
    """Return the squared difference between a label and its prediction.

    Args:
        label: The observed value.
        prediction: The predicted value.

    Returns:
        ``(label - prediction) ** 2`` converted to a Python float.
    """
    residual = label - prediction
    return float(pow(residual, 2))

In [27]:
# Bring label and prediction back to the driver with a single Spark action so the three
# arrays below are built from the same rows in the same order. predictions_df is never
# cached in this notebook, so separate collects may each re-run the pipeline transform.
prediction_rows = predictions_df.select('label', 'prediction').collect()

# One-element inner lists keep the (n, 1) shape that collecting a single column produced.
predictions = np.asarray([[row.prediction] for row in prediction_rows])
actual = np.asarray([[row.label] for row in prediction_rows])
# Per-row squared error, shape (n,).
error = np.asarray([squared_error(row.label, row.prediction) for row in prediction_rows])

In [28]:
# `!s` applies str() to each value, which is what the print statement did, while the
# call form matches the RMSE print earlier in the notebook.
print('prediction maximum: {0!s}'.format(max(predictions)))
print('actual maximum: {0!s}'.format(max(actual)))
print('maximum error: {0!s}'.format(max(error)))

### Draw Simple Plot

In [30]:
import matplotlib.pyplot as plt
# One x position per test-set row; predictions in red, actual quality in green.
x_points = np.arange(0, len(predictions))
figsize = (10.5, 6)
fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
ax.scatter(x_points, predictions, c='r', label='Predictions')
ax.scatter(x_points, actual, c='g', label='Actual')
ax.set_xlabel('Wine bottle #')
ax.set_ylabel(r'Quality')
ax.legend()
display(fig)


### What went wrong?