## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Churn Classification').getOrCreate()

In [2]:
# File location and type
# file_location = "/FileStore/tables/customer_churn.csv"
file_location = "./customer_churn.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Onboard_date: string, Location: string, Company: string, Churn: int]

In [3]:
df.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [4]:
# Create a view or table

temp_table_name = "customer_churn_csv"

df.createOrReplaceTempView(temp_table_name)

In [5]:
spark.sql('select * from customer_churn_csv').show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [6]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "customer_churn_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

In [8]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [9]:
assembler = VectorAssembler(inputCols=[
 'Age',
 'Total_Purchase',
 'Years',
 'Num_Sites'],outputCol='features').transform(df)



In [10]:
from pyspark.ml.classification import LogisticRegression

In [11]:
log_reg = LogisticRegression(featuresCol='features',labelCol='Churn')

In [12]:
train_data,test_data = assembler.randomSplit([0.7,0.3])

In [13]:
fit_logreg = log_reg.fit(train_data)

In [28]:
fit_logreg.summary.predictions.describe().show()


+-------+-------------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+-------------------+-------------------+
|summary|        Names|              Age|    Total_Purchase|   Account_Manager|             Years|         Num_Sites|       Onboard_date|            Location|             Company|              Churn|         prediction|
+-------+-------------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+-------------------+-------------------+
|  count|          641|              641|               641|               641|               641|               641|                641|                 641|                 641|                641|                641|
|   mean|         null|41.65834633385335|10116.063057722306|0.4836193447737909| 5.266708268330728| 8.578783151326054|   

In [14]:
preds_and_labels = fit_logreg.evaluate(test_data)

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [31]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Churn')

In [33]:
auc = my_eval.evaluate(preds_and_labels.predictions)

In [35]:
auc

0.6901643248392475

In [None]:
df_eval = spark.read.csv('./new_customers.csv',header=True,inferSchema=True)
df_eval.show()

# Predict New Data

In [40]:
df_new = spark.read.csv('./new_customers.csv',header=True,inferSchema=True)

In [41]:
final_lr_model = log_reg.fit(assembler)

In [43]:
test_new_customers = VectorAssembler(inputCols=[
 'Age',
 'Total_Purchase',
 'Years',
 'Num_Sites'],outputCol='features').transform(df_new)


In [44]:
final_results = final_lr_model.transform(test_new_customers)

In [47]:
final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

