# Machine Learning in Spark

In [5]:
import pyspark

# Needed when running locally. getOrCreate() reuses an already-running session
# if there is one, otherwise it starts a new one.
session_builder = pyspark.sql.SparkSession.builder
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator

In [3]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

This example assumes that we have a holdout validation dataset somewhere else, so we don't need to perform a train-test split; we only need to perform cross-validation.

Follow [these instructions](https://docs.databricks.com/data/data.html#import-data-1) to import `US_births_2000-2014_SSA.csv` into Databricks

In [6]:
# Databricks FileStore location of the uploaded CSV; point this at your own
# copy of the file when running Spark locally.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('/FileStore/tables/US_births_2000_2014_SSA-daa0e.csv')
)

AnalysisException: 'Path does not exist: file:/FileStore/tables/US_births_2000_2014_SSA-daa0e.csv;'

In [7]:
# Take three rows in Spark *before* converting, so only those rows are
# collected to the driver instead of the entire DataFrame.
df.limit(3).toPandas()

Unnamed: 0,year,month,date_of_month,day_of_week,births
0,2000,1,1,6,9083
1,2000,1,2,7,8006
2,2000,1,3,1,11363


In [8]:
# Inspect the (column name, type) pairs of the DataFrame as it was read from the CSV.
df.dtypes

In [9]:
# The CSV was read without schema inference, so cast each numeric column to int
# explicitly. withColumn with an existing name replaces that column in place.
for column_name in ('births', 'day_of_week', 'date_of_month', 'month', 'year'):
    df = df.withColumn(column_name, df[column_name].cast('int'))

In [10]:
# One-hot encode the two calendar columns into 'date_vec' and 'day_vec'.
ohe = feature.OneHotEncoderEstimator(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)

# The encoder is an estimator: fit it first, then apply the fitted model.
ohe_model = ohe.fit(df)
one_hot_encoded = ohe_model.transform(df)
one_hot_encoded.head()

Note the 'SparseVector' we've created!

In [12]:
# Feed the model the one-hot vectors ('date_vec', 'day_vec') produced by the
# encoder rather than the raw integer columns; otherwise the encoding step is
# computed but never used. The same list is reused by the pipeline's
# VectorAssembler, which also runs after a one-hot encoding stage.
features = ['year', 'month', 'date_vec', 'day_vec']

target = 'births'

# Concatenate the feature columns into the single vector column Spark ML expects.
vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

The Vector Assembler is often what we want when we're building a model in Spark. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)

In [14]:
# List the column names after assembly; the assembler's output column is 'features'.
vectorized_df.columns

In [15]:
# Configure the regressor first, then fit it on the assembled feature vectors.
rf_estimator = RandomForestRegressor(
    featuresCol='features',
    labelCol='births',
    predictionCol="prediction",
)
rf_model = rf_estimator.fit(vectorized_df)

In [16]:
# Score the same DataFrame the model was fit on (in-sample predictions) and
# keep only the label and the predicted value.
scored_df = rf_model.transform(vectorized_df)
predictions = scored_df.select("births", "prediction")
predictions.head(3)

Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark.

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='births', predictionCol='prediction')

# Pick the metric per call through a param map instead of changing the evaluator.
r2_override = {evaluator.metricName: "r2"}
evaluator.evaluate(predictions, r2_override)

In [19]:
# Same evaluator and predictions, now scored with mean absolute error.
mae_override = {evaluator.metricName: "mae"}
evaluator.evaluate(predictions, mae_override)

In [20]:
# Declare the encode -> assemble -> regress steps as unfitted pipeline stages.
one_hot_encoder = OneHotEncoderEstimator(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True,
)
# NOTE(review): `vector_assember` is a typo for "assembler"; the name is kept
# so any other cell referring to it keeps working.
vector_assember = VectorAssembler(inputCols=features, outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features', labelCol='births')

# Order matters: each stage consumes columns produced by the previous ones.
stages = [one_hot_encoder, vector_assember, random_forest]
pipeline = Pipeline(stages=stages)

Note: The stages in a pipeline can be either *Transformers* or *Estimators*. An Estimator is fit on a DataFrame to produce a Transformer.

In [22]:
# Show the parameters this estimator exposes; maxDepth and numTrees are the
# two tuned in the parameter grid.
random_forest.params

In [23]:
# 3 depths x 3 forest sizes = 9 parameter combinations to search.
params = (
    ParamGridBuilder()
    .addGrid(random_forest.maxDepth, [5, 10, 15])
    .addGrid(random_forest.numTrees, [20, 50, 100])
    .build()
)

In [24]:
# Evaluator used by cross-validation: mean absolute error on 'births'.
reg_evaluator = RegressionEvaluator(
    labelCol='births',
    predictionCol='prediction',
    metricName='mae',
)

In [25]:
# Cross-validate the whole pipeline over the parameter grid, training up to
# four models at a time. numFolds is not set, so Spark's default applies.
cv_settings = {
    'estimator': pipeline,
    'estimatorParamMaps': params,
    'evaluator': reg_evaluator,
    'parallelism': 4,
}
cv = CrossValidator(**cv_settings)

In [26]:
# Print the first five rows as a plain-text table.
df.show(n=5)

In [27]:
# limit() is a lazy transformation: this cell only displays the resulting
# DataFrame object; it does not materialize any rows.
df.limit(1000)

In [28]:
# Fit on a 1000-row sample to keep the grid search quick; cache it because
# cross-validation reads the same rows many times.
training_sample = df.limit(1000).cache()
cross_validated_model = cv.fit(training_sample)

In [29]:
# Average cross-validation metric for each entry of `params`, in the same
# order. The evaluator's metric is MAE, so lower is better.
cross_validated_model.avgMetrics

In [30]:
# A bare trailing dot (left over from tab-completion) is a SyntaxError and
# breaks "Run All". List the model's public attributes explicitly instead.
[name for name in dir(cross_validated_model) if not name.startswith('_')]

## .bestModel

In [32]:
# The fitted stages of the best pipeline found by cross-validation, in the
# order they were declared in `stages`.
cross_validated_model.bestModel.stages

In [33]:
# stages[2] is the fitted random forest (third entry of `stages`).
# NOTE(review): getNumTrees appears to be exposed as a property on PySpark tree
# ensemble models, so the missing call parentheses look intentional. Confirm
# against the installed Spark version before "fixing" it.
cross_validated_model.bestModel.stages[2].getNumTrees

## Challenge

Look at [this documentation](https://docs.databricks.com/data/databricks-datasets.html) to find large datasets that come pre-loaded on DBFS (Databricks file system).  Choose one, and build an ML model based on it.

In [35]:
# `display` and `dbutils` are not imported anywhere in this notebook; they come
# from the Databricks runtime, so this cell only runs there.
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0


In [36]:
# List the contents of the Rdatasets folder (Databricks-only: relies on the
# runtime-provided `display` and `dbutils`).
display(dbutils.fs.ls("/databricks-datasets/Rdatasets/"))

path,name,size
dbfs:/databricks-datasets/Rdatasets/LICENSE.txt,LICENSE.txt,58
dbfs:/databricks-datasets/Rdatasets/README.md,README.md,767
dbfs:/databricks-datasets/Rdatasets/data-001/,data-001/,0


In [37]:
# Python's built-in open() reads from the driver's local filesystem, where
# Databricks mounts DBFS under /dbfs. The DBFS path therefore needs that
# prefix; the bare "/databricks-datasets/..." path does not exist locally.
with open("/dbfs/databricks-datasets/Rdatasets/README.md") as f:
    x = f.read()

print(x)

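^ Note: Python's built-in `open` does not understand DBFS paths directly. On Databricks, DBFS is mounted on the driver at `/dbfs`, so the file has to be opened as `/dbfs/databricks-datasets/...`; opening the bare `/databricks-datasets/...` path raises `FileNotFoundError`.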