
## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. This notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
from pyspark.sql.functions import isnan, when, count, col

In [0]:
# File location and type
file_location = "/FileStore/tables/tips.csv"
file_type = "csv"

# Read through the generic reader so that `file_type` actually selects the
# input format (previously it was assigned but never used).
# `header` and `inferSchema` are CSV reader options: the first row supplies the
# column names, and column types are inferred from the data instead of
# defaulting to string.
df = (
    spark.read.format(file_type)
    .option("header", True)
    .option("inferSchema", True)
    .load(file_location)
)

df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [0]:
# Report the dataset shape as a (row count, column count) tuple.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(244, 7)


In [0]:
# Print the column names and the types inferred while reading the file.
df.printSchema()
# The dependent variable (regression target) in this notebook is total_bill.

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [0]:
## Handling Categorical Feature
from pyspark.ml.feature import StringIndexer # Helps to convert Categorical string features to numerical


In [0]:
# Encode each categorical string column as a numeric index column named
# "<column>_indexed", then preview the result.
categorical_cols = ["sex", "smoker", "day", "time"]
indexed_cols = [name + "_indexed" for name in categorical_cols]

indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_model = indexer.fit(df)
df_r = indexer_model.transform(df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [0]:
def _missing_condition(name, dtype):
    """Return a boolean Column that is true where column `name` is missing.

    A value counts as missing when it is SQL NULL. For float/double columns
    NaN is counted as well. isnan() is only applied to floating-point columns:
    it never matches NULL, and it is not meaningful for string columns.
    """
    is_null = col(name).isNull()
    if dtype in ("float", "double"):
        return is_null | isnan(col(name))
    return is_null


# One output column per input column, holding the number of missing entries.
# when(...) without otherwise() yields NULL for non-matching rows, and count()
# skips NULLs, so only the missing rows are counted.
missing_counts = [
    count(when(_missing_condition(name, dtype), 1)).alias(name)
    for name, dtype in df.dtypes
]
df.select(missing_counts).show()
# A row of zeros means the data has neither NULL nor NaN values.

+----------+---+---+------+---+----+----+
|total_bill|tip|sex|smoker|day|time|size|
+----------+---+---+------+---+----+----+
|         0|  0|  0|     0|  0|   0|   0|
+----------+---+---+------+---+----+----+



In [0]:
from pyspark.ml.feature import VectorAssembler

# VectorAssembler combines the listed input columns into a single vector
# column. Only the independent (predictor) columns are assembled here; the
# target column total_bill is left as its own column.
feature_cols = [
    "tip",
    "size",
    "sex_indexed",
    "smoker_indexed",
    "day_indexed",
    "time_indexed",
]
featureassembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="Independent_Features",
)
output = featureassembler.transform(df_r)

In [0]:
# Preview the assembled DataFrame, including the Independent_Features vector column.
output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|Independent_Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|S

In [0]:
# Keep only what the regressor needs: the feature vector and the target.
model_columns = ["Independent_Features", "total_bill"]
final_df = output.select(*model_columns)
final_df.show()

+--------------------+----------+
|Independent_Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [0]:
from pyspark.ml.regression import RandomForestRegressor

## train test split
# 75% of the rows go to training and 25% to testing. Passing a fixed seed
# makes the random split repeatable, so the metrics reported further down can
# be reproduced on a re-run instead of changing every time.
# NOTE(review): repeatability also assumes the input data and its partitioning
# are unchanged between runs.
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=42)


In [0]:
# Preview the training split (feature vector and total_bill target).
train_data.show()

+--------------------+----------+
|Independent_Features|total_bill|
+--------------------+----------+
|(6,[0,1],[1.25,2.0])|     10.07|
|(6,[0,1],[1.25,2.0])|     10.51|
|(6,[0,1],[1.45,2.0])|      9.55|
|(6,[0,1],[1.47,2.0])|     10.77|
|(6,[0,1],[1.75,2.0])|     17.82|
|(6,[0,1],[1.97,2.0])|     12.02|
| (6,[0,1],[2.0,2.0])|     12.69|
| (6,[0,1],[2.0,2.0])|     13.37|
| (6,[0,1],[2.0,3.0])|     16.31|
|(6,[0,1],[2.24,3.0])|     16.04|
|(6,[0,1],[2.34,4.0])|     17.81|
| (6,[0,1],[2.5,4.0])|     18.35|
|(6,[0,1],[2.64,3.0])|     17.59|
|(6,[0,1],[2.72,2.0])|     13.28|
|(6,[0,1],[3.15,3.0])|     20.08|
|(6,[0,1],[3.27,2.0])|     17.78|
|(6,[0,1],[3.35,3.0])|     20.65|
| (6,[0,1],[3.6,3.0])|     24.06|
|(6,[0,1],[3.76,2.0])|     18.24|
|(6,[0,1],[4.08,2.0])|     17.92|
+--------------------+----------+
only showing top 20 rows



In [0]:
# Preview the held-out test split (feature vector and total_bill target).
test_data.show()

+--------------------+----------+
|Independent_Features|total_bill|
+--------------------+----------+
|(6,[0,1],[2.01,2.0])|     20.23|
|(6,[0,1],[2.31,3.0])|     18.69|
| (6,[0,1],[3.0,2.0])|      14.0|
| (6,[0,1],[3.0,4.0])|     20.45|
|(6,[0,1],[3.18,2.0])|     19.82|
|(6,[0,1],[3.39,2.0])|     11.61|
| (6,[0,1],[5.0,3.0])|     31.27|
|(6,[0,1],[7.58,4.0])|     39.42|
|[1.5,2.0,0.0,1.0,...|     15.69|
|[1.5,2.0,1.0,0.0,...|     26.41|
|[1.5,2.0,1.0,0.0,...|     10.65|
|[1.58,2.0,0.0,1.0...|     13.42|
|[1.92,1.0,0.0,1.0...|      8.58|
|[1.96,2.0,0.0,0.0...|     15.04|
|[2.0,2.0,0.0,0.0,...|     13.13|
|[2.0,2.0,1.0,0.0,...|     10.33|
|[2.0,2.0,1.0,0.0,...|     14.15|
|[2.0,2.0,1.0,1.0,...|      13.0|
|[2.01,2.0,1.0,1.0...|     12.74|
|[2.02,2.0,0.0,1.0...|     15.48|
+--------------------+----------+
only showing top 20 rows



In [0]:
# Configure a random forest that predicts total_bill from the assembled
# feature vector. The fixed seed makes the forest's internal randomness
# repeatable, so the fitted model can be reproduced on a re-run.
rf = RandomForestRegressor(
    featuresCol="Independent_Features",
    labelCol="total_bill",
    seed=42,
)

# fit() returns a new fitted model object; `rf` itself stays an unfitted estimator.
model = rf.fit(train_data)

In [0]:
# Display the fitted model's summary repr as the cell output.
model

Out[20]: RandomForestRegressionModel: uid=RandomForestRegressor_34ed77c339c7, numTrees=20, numFeatures=6

In [0]:
# Feature importances belong to the fitted model returned by rf.fit(), not to
# the unfitted RandomForestRegressor estimator `rf`.
# Vector indices follow the VectorAssembler input order:
# 0=tip, 1=size, 2=sex_indexed, 3=smoker_indexed, 4=day_indexed, 5=time_indexed.
model.featureImportances

Out[16]: SparseVector(6, {0: 0.544, 1: 0.2711, 2: 0.0374, 3: 0.0514, 4: 0.0625, 5: 0.0337})

In [0]:
# Apply the fitted model to the held-out test split. The result keeps the
# input columns and adds a `prediction` column, as the output below shows.
predictions = model.transform(test_data)
predictions.show()

+--------------------+----------+------------------+
|Independent_Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[2.01,2.0])|     20.23|14.279193088299644|
|(6,[0,1],[2.31,3.0])|     18.69|17.031495403757937|
| (6,[0,1],[3.0,2.0])|      14.0| 19.89963293381151|
| (6,[0,1],[3.0,4.0])|     20.45|25.567680452314164|
|(6,[0,1],[3.18,2.0])|     19.82| 19.77713556979109|
|(6,[0,1],[3.39,2.0])|     11.61| 19.77713556979109|
| (6,[0,1],[5.0,3.0])|     31.27|29.959806411857784|
|(6,[0,1],[7.58,4.0])|     39.42|39.103845321345325|
|[1.5,2.0,0.0,1.0,...|     15.69|15.605338883424688|
|[1.5,2.0,1.0,0.0,...|     26.41|12.361961979854946|
|[1.5,2.0,1.0,0.0,...|     10.65|12.492343609228618|
|[1.58,2.0,0.0,1.0...|     13.42|12.454633850931678|
|[1.92,1.0,0.0,1.0...|      8.58|12.255269614820566|
|[1.96,2.0,0.0,0.0...|     15.04|13.834372945750621|
|[2.0,2.0,0.0,0.0,...|     13.13| 14.61502076321094|
|[2.0,2.0,1.0,0.0,...|     10.33|13.1640671313

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the test-set predictions against the true total_bill values with
# several regression metrics. Each entry pairs a RegressionEvaluator metric
# name with the label printed next to its value.
metric_labels = [
    ("rmse", "Root Mean Squared Error (RMSE) on test data = "),
    ("r2", "R Squared (R2) on test data ="),
    # This line was previously mislabelled as "R Squared (mse)".
    ("mse", "Mean Squared Error (MSE) on test data ="),
]

for metric_name, label in metric_labels:
    evaluator = RegressionEvaluator(
        labelCol="total_bill",
        predictionCol="prediction",
        metricName=metric_name,
    )
    print(label, evaluator.evaluate(predictions))

Root Mean Squared Error (RMSE) on test data =  6.425795297611247
R Squared (R2) on test data = 0.4175188162303233
R Squared (mse) on test data = 41.29084520680281
