# Machine Learning with Spark

Machine learning is a process for extracting patterns from your data using statistics, linear algebra, and numerical optimization. This notebook focuses on supervised learning, specifically univariate linear regression, which models the relationship between one independent (explanatory) variable and one dependent variable.
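To make "univariate linear regression" concrete, here is a minimal plain-Python sketch of the least-squares line through a handful of points. It does not use Spark, the helper name `fit_line` is hypothetical, and the (bedrooms, price) pairs are made up for illustration; they are not taken from the Airbnb dataset.

```python
# Plain-Python illustration of univariate least squares (no Spark involved).
def fit_line(xs, ys):
    """Return (slope, intercept) minimizing squared error for y ~ slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # slope = covariance(x, y) / variance(x)
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    slope = cov_xy / var_x
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Made-up points that lie exactly on price = 50 * bedrooms + 100
bedrooms = [1, 2, 3, 4]
prices = [150, 200, 250, 300]
slope, intercept = fit_line(bedrooms, prices)
print(f"price = {slope:.2f} * bedrooms + {intercept:.2f}")
```

Because the made-up points sit exactly on a line, the fit recovers that line; real data will scatter around it.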

### Dataset

In [0]:
# List the dataset directory and the files it contains
display(dbutils.fs.ls('/databricks-datasets/learning-spark-v2/sf-airbnb/'))

path,name,size,modificationTime
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/README-sf-airbnb.md,README-sf-airbnb.md,1064,1575931327000
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/lr-pipeline-model/,lr-pipeline-model/,0,1665977670565
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean-100p.parquet/,sf-airbnb-clean-100p.parquet/,0,1665977670565
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/,sf-airbnb-clean.parquet/,0,1665977670565
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-numeric.csv,sf-airbnb-numeric.csv,554979,1588481870000
dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv,sf-airbnb.csv,34234636,1575931330000


In [0]:
filePath = "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)

In [0]:
# A bare expression is only displayed when it is the last line of a cell, so print it
print(airbnbDF.columns)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").show(5)

### Creating Training and Test Datasets

Before we begin feature engineering and modeling, we will divide our data set into two groups: train and test. Depending on the size of your data set, your train/test ratio may vary, but many data scientists use 80/20 as a standard train/test split.

In [0]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"train: {trainDF.count()} - test: {testDF.count()}")
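The counts printed above are typically close to, but not exactly, 80% and 20% of the data, because each row is assigned to a split at random. The plain-Python sketch below mimics that idea; it is not Spark's implementation, and `split_rows` is a hypothetical helper operating on an ordinary list.

```python
import random

def split_rows(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a seeded random draw."""
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_fraction
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for 1000 records
train, test = split_rows(rows, 0.8, seed=42)
print(f"train: {len(train)} - test: {len(test)}")
```

Rerunning with the same seed on the same input yields the same split, which is why the notebook fixes `seed=42`.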

In [0]:
# Cache the training set, since it is reused many times during EDA, model training, etc.
trainDF.cache()
# cache() is lazy, so run an action to trigger the caching eagerly
trainDF.count()

In [0]:
# This cell runs faster than the one above because the DataFrame is already cached
trainDF.count()
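The caching behavior used above (marking the data with `cache()`, then materializing it with an action) can be illustrated with a toy class in plain Python. This is only a sketch of the idea under simplified assumptions; `LazyDataset` is hypothetical and does not reflect how Spark stores cached data.

```python
class LazyDataset:
    """Toy stand-in for a DataFrame: rows are only produced when an action runs."""

    def __init__(self, compute):
        self._compute = compute    # function that produces the rows
        self._use_cache = False
        self._cached_rows = None
        self.compute_calls = 0     # how many times the rows were recomputed

    def cache(self):
        # Only marks the dataset for caching; nothing is computed here.
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.compute_calls += 1
        rows = self._compute()
        if self._use_cache:
            self._cached_rows = rows
        return rows

    def count(self):
        # An action: forces the rows to be produced (or read from the cache).
        return len(self._rows())

ds = LazyDataset(lambda: [x * x for x in range(5)])
ds.cache()
calls_after_cache = ds.compute_calls  # cache() alone did no work
first = ds.count()                    # computes the rows and stores them
second = ds.count()                   # served from the cache, no recomputation
print(calls_after_cache, first, second, ds.compute_calls)
```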

### Preparing Features with Transformers

This step prepares the data to build a linear regression model predicting price given the number of bedrooms. Linear regression requires that all the input features are contained within a single vector in your DataFrame. Thus, we need to transform our data.

[VectorAssembler](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html#vectorassembler) takes a list of input columns and creates a new DataFrame with an additional column, which we will call `features`. It combines the values of those input columns into a single vector.
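As a rough mental model, assembling can be pictured with rows represented as plain Python dictionaries. The helper `assemble` below is hypothetical and is not the Spark API; the rows are made-up values, and two input columns are used only to show how several values end up in one vector.

```python
def assemble(rows, input_cols, output_col):
    """Return new rows with an extra column holding the input values as one list."""
    return [
        {**row, output_col: [float(row[col]) for col in input_cols]}
        for row in rows
    ]

# Made-up rows for illustration
rows = [
    {"bedrooms": 1, "bathrooms": 1.0, "price": 200.0},
    {"bedrooms": 3, "bathrooms": 2.5, "price": 450.0},
]
assembled = assemble(rows, ["bedrooms", "bathrooms"], "features")
for row in assembled:
    print(row["bedrooms"], row["features"], row["price"])
```

The original rows are left unchanged and the result carries every original column plus `features`, which mirrors how a transformer returns a new DataFrame.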

In [0]:
from pyspark.ml.feature import VectorAssembler

vecAssember = VectorAssembler(inputCols=['bedrooms'], outputCol='features')

In [0]:
vecTrainDF = vecAssember.transform(trainDF)
vecTrainDF.select(['bedrooms', 'features', 'price']).show(3)

In [0]:
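# Same number of rows; the assembled DataFrame has one extra column (features)
print(f"trainDF: {trainDF.count()} rows, {len(trainDF.columns)} columns")
print(f"vecTrainDF: {vecTrainDF.count()} rows, {len(vecTrainDF.columns)} columns")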

### Using an Estimator to Build the Model

With `VectorAssembler`, our data is now prepared and transformed into the format that our linear regression model expects. In Spark, [LinearRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html?highlight=linearregression#linearregression) is a type of estimator: it takes in a DataFrame and returns a Model.

Estimators **learn** parameters from your data, expose a `.fit()` method, and are eagerly evaluated (i.e., they kick off Spark jobs), whereas transformers are lazily evaluated.
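The estimator/transformer contract can be sketched in plain Python. The classes below are hypothetical and deliberately simpler than linear regression: the "model" just predicts the mean training price. They only illustrate that `fit()` learns a parameter from data and returns an object whose `transform()` applies it.

```python
class MeanPriceModel:
    """Transformer: applies an already-learned parameter to rows."""

    def __init__(self, mean_price):
        self.mean_price = mean_price

    def transform(self, rows):
        # Adds a prediction column without modifying the input rows
        return [{**row, "prediction": self.mean_price} for row in rows]


class MeanPriceEstimator:
    """Estimator: learns a parameter from data and returns a model."""

    def fit(self, rows):
        prices = [row["price"] for row in rows]
        return MeanPriceModel(sum(prices) / len(prices))


# Made-up training rows for illustration
train_rows = [{"price": 100.0}, {"price": 200.0}, {"price": 300.0}]
model = MeanPriceEstimator().fit(train_rows)      # learning happens here
predictions = model.transform([{"price": 150.0}])  # no learning, only applying
print(model.mean_price, predictions)
```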

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='price')
lrModel = lr.fit(vecTrainDF)

In [0]:
print("price = {m:.2f} * bedrooms + {b:.2f}".format(
    m=lrModel.coefficients[0], b=lrModel.intercept))

### Creating a Pipeline

If we want to apply our model to our test set, then we need to prepare that data in the same way as the training set (i.e., pass it through the vector assembler). Oftentimes data preparation pipelines will have multiple steps, and it becomes cumbersome to remember not only which steps to apply, but also the ordering of the steps. 

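Spark's Pipeline API addresses this: you specify the stages in order, and Spark applies them for you. In Spark, Pipelines are estimators, whereas PipelineModels (fitted Pipelines) are transformers.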
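A plain-Python sketch of this idea follows. It is not Spark's implementation, all class names are hypothetical, and the rows are made up. Fitting walks the stages in order, replaces each estimator with the model it produces, and returns an object that contains only transformers.

```python
class AddFeatures:
    """Transformer stage: wraps the 'bedrooms' value in a one-element list."""
    def transform(self, rows):
        return [{**r, "features": [float(r["bedrooms"])]} for r in rows]

class RateModel:
    """Fitted stage: predicts price as rate * number of bedrooms."""
    def __init__(self, rate):
        self.rate = rate
    def transform(self, rows):
        return [{**r, "prediction": self.rate * r["features"][0]} for r in rows]

class RateEstimator:
    """Estimator stage: learns the average price per bedroom."""
    def fit(self, rows):
        total_price = sum(r["price"] for r in rows)
        total_bedrooms = sum(r["features"][0] for r in rows)
        return RateModel(total_price / total_bedrooms)

class ToyPipelineModel:
    """Fitted pipeline: a transformer that applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class ToyPipeline:
    """Unfitted pipeline: an estimator whose fit() returns a ToyPipelineModel."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)   # estimator becomes a fitted model
            fitted.append(stage)
            rows = stage.transform(rows)  # output feeds the next stage
        return ToyPipelineModel(fitted)

train_rows = [{"bedrooms": 1, "price": 100.0}, {"bedrooms": 3, "price": 300.0}]
test_rows = [{"bedrooms": 2, "price": 250.0}]
toy_model = ToyPipeline([AddFeatures(), RateEstimator()]).fit(train_rows)
preds = toy_model.transform(test_rows)  # test rows get the same preparation
print(preds)
```

The test rows never pass through `AddFeatures` by hand; the fitted pipeline applies the same preparation that the training rows received.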

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssember, lr])
pipelineModel = pipeline.fit(trainDF)

In [0]:
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)