# Programming III - Final Project

Create and record a presentation following the instructions below. The presentation should be no more than 20 minutes.

You work at a financial services company whose data intake has grown rapidly in recent years. As a result, much of the data processing, wrangling, and analysis previously done in R and Python (pandas) has become difficult because the datasets are now too large to fit on local computers. You and your team have been asked to report on how the organization can accommodate big data and which tools are available to process, analyze, and model large amounts of data. You will also use big data processing to process and model a transactions dataset on suspicious activity.

## Part 1
The first part of the presentation should address the following and be comprehensible to both technical and non-technical audiences.
<br><br>
* How can the organization process big data?
* What tools can be used and how can analysts work with them in familiar programming languages?
* Given the organization's familiarity with pandas and Python, what differences are there between pandas and PySpark?
* What are the main components involved with processing big data? Where does the computational power for processing big data come from?

The presentation should include specific references to big data tools, the data structures used, and specific references to programming (languages, syntax, techniques) that the organization should be aware of in the adoption of big data processing tools.

## Part 2
The second part of the presentation will use a Databricks notebook where you and your team carry out a big data analysis using Spark via PySpark and the bank_transactions.csv dataset. You will carry out the following steps in a Databricks notebook and briefly explain each step that includes code.

1. <span style="color:green"> **(Done)** </span> Upload the dataset to your Databricks filestore
2. <span style="color:green"> **(Done)** </span> Read in the data and call it df
3. <span style="color:green"> **(Done)** </span> Check the schema of the dataset
4. <span style="color:green"> **(Done)** </span> Create a new column called destination_diff by subtracting oldbalanceDest from newbalanceDest
5. <span style="color:green"> **(Done)** </span> What is the mean amount per transaction by type? Execute this code without adding .show() to the line of code.
6. <span style="color:green"> **(Done)** </span> What is the mean amount per transaction by type? Execute this code with .show() on the end of the line of code. How does the output in #5 and #6 differ? Why does it differ? What does this show us about Spark?
7. <span style="color:green"> **(Done)** </span>Make a SQL table from your df and call it df_table
8. <span style="color:green"> **(Done)** </span>Explain how to run a different programming language in a given cell.
9. <span style="color:green"> **(Done)** </span>Write a SQL query to count the number of transactions by type and return the result in descending order. Generate a bar plot of the results using Databricks’ plot options. Explain the results.
10. <span style="color:green"> **(Done)** </span> Create a new column that labels the transaction as suspicious. The new column should be called label and take the value of 1 if:
 * The oldbalanceOrg is less than 54000 and the type of transaction is a transfer and the new balance is less than or equal to 105, or
 * The oldbalanceOrg is greater than 56900 and the newbalanceOrig is less than or equal to 12, or
 * The oldbalanceOrg is greater than 56900 and the newbalanceOrig is greater than 12 and the amount is greater than 1160000,
11. <span style="color:green"> **(Done)** </span>Otherwise, the value should be 0.
12. <span style="color:green"> **(Done)** </span>Calculate the percentage of transactions labeled suspicious (“label”) out of all transactions
13. <span style="color:green"> **(Done)** </span>Import StringIndexer, OneHotEncoder, and VectorAssembler from pyspark. Explain what each of these PySpark processing tools is used for.
14. <span style="color:green"> **(Done)** </span>Convert the column “type” to a numeric column of indices called typeIdx
15. <span style="color:green"> **(Done)** </span>Encode “typeIdx” to create a binary representation of it called “type_bins”
16. <span style="color:green"> **(Done)** </span>The “type_bins” feature is stored as a SparseVector. What is a SparseVector and why does Spark use it to store data?
  * Assemble the columns into a single vector column with the following input columns: "type_bins","amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "destination_diff". Call the output column “features” and select “features” and “label.”
17. <span style="color:green"> **(Done)** </span>Split the data into train and test sets with 30% held out for testing.
18. <span style="color:green"> **(Done)** </span>Import the DecisionTreeClassifier from pyspark.
19. <span style="color:green"> **(Done)** </span>Instantiate the model
20. <span style="color:green"> **(Done)** </span>Fit the model to training data
21. <span style="color:green"> **(Done)** </span>Make predictions using the test data
22. <span style="color:green"> **(Done)** </span>Evaluate the model using the BinaryClassificationEvaluator by using the predictions to get the Area under the ROC and save this number as a variable called dt_auc.
23. <span style="color:green"> **(Done)** </span>Perform steps 18 through 22 again but use a Random Forest Classifier instead of a Decision Tree. Save the result from the Area under the ROC step as a variable called rf_auc
24. <span style="color:green"> **(Done)** </span>Perform steps 18 through 22 again but use a Gradient Boosted Tree instead of a Decision Tree. Save the result from the Area under the ROC step as a variable called gb_auc
25. <span style="color:green"> **(Done)** </span>Print dt_auc, rf_auc, and gb_auc to summarize each model's performance and recommend the organization move forward with the best model. Summarize your recommendations and future analysis that would be useful.

### 1. Upload the dataset to your Databricks filestore

### 2. Read in the data and call it df

In [0]:
%python 
import pyspark.sql.functions as F
# File location and type
file_location = "/FileStore/tables/bank_transactions.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0


### 3. Check the schema of the dataset

In [0]:
%python
# Print schema
df.printSchema()

In [0]:
%python 
# Change schema (datatype) if needed
#df1 = df.selectExpr("cast(step as int) step",
#    "cast(isFraud as boolean) isFraud",
#    "cast(isFlaggedFraud as boolean) isFlaggedFraud")
#df1.printSchema()
#df1.show()

### 4. Create a new column called destination_diff by subtracting oldbalanceDest from newbalanceDest

In [0]:
%python
# Create new column AS 'destination_diff'
df1 = df.select('*', (df.newbalanceDest - df.oldbalanceDest).alias('destination_diff'))
# show the top rows 
df1.show()

### 5. What is the mean amount per transaction by type? Execute this code without adding .show() to the line of code.

In [0]:
%python
df1_mean = df1.groupBy("type").mean("amount")
display(df1_mean)

type,avg(amount)
TRANSFER,910647.009645492
CASH_IN,168920.2420040967
CASH_OUT,176273.96434614089
PAYMENT,13057.604660187457
DEBIT,5483.665313767132


### 6. What is the mean amount per transaction by type? Execute this code with .show() on the end of the line of code. How does the output in #5 and #6 differ? Why does it differ? What does this show us about Spark?

In [0]:
%python
# .show() is an action: it prints the result and returns None, so there is nothing useful to assign
df1.groupBy("type").mean("amount").show()
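
The difference between steps 5 and 6 comes from Spark's lazy evaluation: a transformation such as `groupBy(...).mean(...)` only records a plan, and an action such as `.show()` triggers the actual computation. The sketch below is a plain-Python analogy, not Spark's API. `LazyFrame`, `group_mean`, `collect`, `log`, and the three sample rows are hypothetical names and values chosen only for illustration.

```python
from collections import defaultdict

log = []  # records when work is actually performed


class LazyFrame:
    """Toy stand-in for a Spark DataFrame: stores steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of pending transformations

    def group_mean(self, key, value):
        # Transformation: nothing is computed here, the step is only recorded.
        def step(rows):
            log.append("group_mean")
            totals = defaultdict(lambda: [0.0, 0])
            for row in rows:
                totals[row[key]][0] += row[value]
                totals[row[key]][1] += 1
            return [{key: k, f"avg({value})": s / n} for k, (s, n) in totals.items()]

        return LazyFrame(self._rows, self._plan + (step,))

    def collect(self):
        # Action: every recorded step runs now.
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return rows


# Hypothetical sample rows, not taken from bank_transactions.csv
rows = [
    {"type": "PAYMENT", "amount": 100.0},
    {"type": "PAYMENT", "amount": 300.0},
    {"type": "TRANSFER", "amount": 50.0},
]

pending = LazyFrame(rows).group_mean("type", "amount")  # plan only
ran_before_action = list(log)                            # still empty
result = pending.collect()                               # computation happens here
print(ran_before_action, result)
```

In this analogy, `pending` plays the role of the DataFrame returned in step 5, and `collect()` plays the role of `.show()` in step 6.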

### 7. Make a SQL table from your df and call it df_table

In [0]:
%python
df1.createOrReplaceTempView("df_table")

### 8. Explain how to run a different programming language in a given cell.

In [0]:
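%python
# A cell runs in the notebook's default language unless its first line is a
# magic command such as %python, %sql, %scala, %r, or %md.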

### 9. Write a SQL query to count the number of transactions by type and return the result in descending order. Generate a bar plot of the results using Databricks’ plot options. Explain the results.

In [0]:
%sql
SELECT COUNT(*) as number, type as transaction_type
FROM df_table
GROUP BY transaction_type
ORDER BY number DESC;


number,transaction_type
2237500,CASH_OUT
2151495,PAYMENT
1399284,CASH_IN
532909,TRANSFER
41432,DEBIT


### 10. Create a new column that labels the transaction as suspicious. The new column should be called label and take the value of 1 if:
 * The oldbalanceOrg is less than 54000 and the type of transaction is a transfer and the new balance is less than or equal to 105, or
 * The oldbalanceOrg is greater than 56900 and the newbalanceOrig is less than or equal to 12, or
 * The oldbalanceOrg is greater than 56900 and the newbalanceOrig is greater than 12 and the amount is greater than 1160000,

In [0]:
%python
display(df)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0


In [0]:
%python
import pyspark.sql.functions as F

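# Each rule is wrapped in its own parentheses so the AND/OR grouping is explicit.
# All columns are taken from df1 (the original mixed in df["amount"]).
df1 = df1.withColumn("label", F.when(
                                     ((df1["oldbalanceOrg"] < 54000) & (df1["type"] == "TRANSFER") & (df1["newbalanceOrig"] <= 105)) |
                                     ((df1["oldbalanceOrg"] > 56900) & (df1["newbalanceOrig"] <= 12)) |
                                     ((df1["oldbalanceOrg"] > 56900) & (df1["newbalanceOrig"] > 12) & (df1["amount"] > 1160000)),
                                     1).otherwise(0))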
display(df1)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,destination_diff,label
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,0.0,0
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,0.0,0
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,0.0,1
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0,-21182.0,0
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0,0.0,0
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0,0.0,0
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0,0.0,0
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0,0.0,0
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0,0.0,0
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0,-1549.2099999999991,0
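
The three conditions can be checked by hand against a few of the rows displayed above. The following is a plain-Python restatement of the rule, not the Spark expression itself. `label_row` is a hypothetical helper, and the three sample rows copy their values from the output above.

```python
def label_row(row):
    """Return 1 if the row meets any of the three suspicious-activity rules, else 0."""
    low_balance_transfer = (
        row["oldbalanceOrg"] < 54000
        and row["type"] == "TRANSFER"
        and row["newbalanceOrig"] <= 105
    )
    drained_account = row["oldbalanceOrg"] > 56900 and row["newbalanceOrig"] <= 12
    large_amount = (
        row["oldbalanceOrg"] > 56900
        and row["newbalanceOrig"] > 12
        and row["amount"] > 1160000
    )
    return 1 if (low_balance_transfer or drained_account or large_amount) else 0


# Values copied from three rows of the displayed output
sample_rows = [
    {"type": "TRANSFER", "amount": 181.0, "oldbalanceOrg": 181.0, "newbalanceOrig": 0.0},
    {"type": "CASH_OUT", "amount": 181.0, "oldbalanceOrg": 181.0, "newbalanceOrig": 0.0},
    {"type": "PAYMENT", "amount": 9839.64, "oldbalanceOrg": 170136.0, "newbalanceOrig": 160296.36},
]

labels = [label_row(row) for row in sample_rows]
print(labels)
```

Only the TRANSFER row is labeled 1, which agrees with the `label` column in the displayed rows.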


### 11. Otherwise, the value should be 0.

In [0]:
%python
display(df1)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,destination_diff,label
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,0.0,0
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,0.0,0
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,0.0,1
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0,-21182.0,0
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0,0.0,0
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0,0.0,0
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0,0.0,0
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0,0.0,0
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0,0.0,0
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0,-1549.2099999999991,0


### 12. Calculate the percentage of transactions labeled suspicious (“label”) out of all transactions

In [0]:
%python
sus = df1.filter(df1["label"] == 1).count()
total = df1.count()
perc = (sus / total) * 100
str(round(perc, 2))

### 13. Import StringIndexer, OneHotEncoder, and VectorAssembler from pyspark. Explain what each of these PySpark processing tools is used for.

In [0]:
%python
## ML Pre-processing
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
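
As a rough picture of what the three tools do: StringIndexer maps each category to a numeric index (by default the most frequent category gets 0), OneHotEncoder turns that index into a binary vector (by default dropping the last category, which is then represented by all zeros), and VectorAssembler concatenates several columns into one feature vector. The sketch below imitates that behavior in plain Python and is not the PySpark implementation. `string_index`, `one_hot_drop_last`, and `assemble` are hypothetical helpers, and the category counts are the ones returned by the step 9 query in this notebook.

```python
# Category counts as reported by the SQL query in step 9
counts = {
    "CASH_OUT": 2237500,
    "PAYMENT": 2151495,
    "CASH_IN": 1399284,
    "TRANSFER": 532909,
    "DEBIT": 41432,
}


def string_index(category_counts):
    """Assign indices by descending frequency, the most frequent category getting 0.0."""
    ordered = sorted(category_counts, key=lambda label: -category_counts[label])
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index into num_categories - 1 slots; the last category is all zeros."""
    size = num_categories - 1
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector


def assemble(*parts):
    """Concatenate vectors and scalar columns into one flat feature list."""
    features = []
    for part in parts:
        if isinstance(part, list):
            features.extend(part)
        else:
            features.append(float(part))
    return features


type_index = string_index(counts)
transfer_bins = one_hot_drop_last(type_index["TRANSFER"], len(counts))
debit_bins = one_hot_drop_last(type_index["DEBIT"], len(counts))
features = assemble(transfer_bins, 181.0, 181.0, 0.0)  # bins, amount, old balance, new balance
print(type_index, transfer_bins, debit_bins, features)
```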

### 14. Convert the column “type” to a numeric column of indices called typeIdx

In [0]:
%python
df1 = StringIndexer(inputCol = "type", outputCol = "typeIdx").fit(df1).transform(df1)

In [0]:
%python
df1.show()

### 15. Encode “typeIdx” to create a binary representation of it called “type_bins”

In [0]:
%python
df1 = OneHotEncoder(inputCol = "typeIdx", outputCol = "type_bins").fit(df1).transform(df1)


In [0]:
%python
display(df1)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,destination_diff,label,typeIdx,type_bins
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,0.0,1,3.0,"List(0, 4, List(3), List(1.0))"
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0,-21182.0,0,0.0,"List(0, 4, List(0), List(1.0))"
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0,0.0,0,1.0,"List(0, 4, List(1), List(1.0))"
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0,-1549.2099999999991,0,4.0,"List(0, 4, List(), List())"


### 16. The “type_bins” feature is stored as a SparseVector. What is a SparseVector and why does Spark use it to store data?
  * Assemble the columns into a single vector column with the following input columns: "type_bins","amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "destination_diff". Call the output column “features” and select “features” and “label.”
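
A sparse vector stores only its length plus the positions and values of its non-zero entries, which saves memory when most entries are zero, as they are in one-hot encoded columns. The sketch below shows the idea in plain Python. It is an illustration of the storage scheme, not the `pyspark.ml.linalg.SparseVector` class, and `to_sparse` and `to_dense` are hypothetical helpers.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, value in enumerate(dense) if value != 0.0]
    values = [dense[i] for i in indices]
    return (len(dense), indices, values)


def to_dense(size, indices, values):
    """Rebuild the full list of entries from the sparse triple."""
    dense = [0.0] * size
    for i, value in zip(indices, values):
        dense[i] = value
    return dense


payment_dense = [0.0, 1.0, 0.0, 0.0]  # one-hot vector with a single 1 at index 1
debit_dense = [0.0, 0.0, 0.0, 0.0]    # dropped last category: no non-zero entries

payment_sparse = to_sparse(payment_dense)
debit_sparse = to_sparse(debit_dense)
print(payment_sparse, debit_sparse)
```

The size, indices, and values in these triples appear to correspond to the last three entries of the `List(0, 4, List(1), List(1.0))` and `List(0, 4, List(), List())` cells shown in the step 15 output.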

In [0]:
%python

In [0]:
%python
modeling_df = VectorAssembler(inputCols = ["type_bins", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "destination_diff"],
                             outputCol = "features").transform(df1).select("features", "label")
modeling_df.show()

### 17. Split the data into train and test sets with 30% held out for testing.

In [0]:
%python
# Fix the seed so the 70/30 split is reproducible across runs
df_train, df_test = modeling_df.randomSplit([0.7, 0.3], seed = 123)
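
`randomSplit` assigns rows by random sampling, so the 70/30 proportions are approximate rather than exact, and it accepts an optional seed to make the split repeatable. The sketch below imitates that behavior with the standard library. It is an analogy under those assumptions, not Spark's sampling code, and `random_split` and the 1000 integer "rows" are hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Send each row to the first or second split according to a per-row random draw."""
    rng = random.Random(seed)
    threshold = weights[0] / sum(weights)  # weights are normalized to proportions
    first, second = [], []
    for row in rows:
        if rng.random() < threshold:
            first.append(row)
        else:
            second.append(row)
    return first, second


rows = list(range(1000))  # stand-in for DataFrame rows
train, test = random_split(rows, [0.7, 0.3], seed=123)
train_again, test_again = random_split(rows, [0.7, 0.3], seed=123)

print(len(train), len(test))        # close to 700 and 300, not necessarily exact
print(train == train_again)         # same seed, same split
```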

### 18. Import the DecisionTreeClassifier from pyspark.

In [0]:
%python
from pyspark.ml.classification import DecisionTreeClassifier

### 19. Instantiate the model

In [0]:
%python
dec_tree = DecisionTreeClassifier(labelCol = "label", featuresCol = "features", seed = 123, maxDepth = 3)

### 20. Fit the model to training data

In [0]:
%python
fitted_mod = dec_tree.fit(df_train)

In [0]:
%python
display(fitted_mod)

treeNode
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":3,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":111086.31999999989,""categories"":null,""feature"":9,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":59901.5,""categories"":null,""feature"":5,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":93.6,""categories"":null,""feature"":6,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":805325.0700000001,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


### 21. Make predictions using the test data

In [0]:
%python
predicts = fitted_mod.transform(df_test)

### 22. Evaluate the model using the BinaryClassificationEvaluator by using the predictions to get the Area under the ROC and save this number as a variable called dt_auc.

In [0]:
%python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#instantiate evaluator object
evaluator = BinaryClassificationEvaluator(labelCol = "label",
                                         rawPredictionCol = "prediction",
                                         metricName = "areaUnderROC")
dt_auc = evaluator.evaluate(predicts)
print(f"Area under ROC: {dt_auc}")
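
Area under the ROC curve can be read as the probability that a randomly chosen positive (suspicious) row is scored higher than a randomly chosen negative row, with ties counting as half. The sketch below computes it that way in plain Python. It illustrates the metric and is not Spark's implementation. `pairwise_auc` and the four toy labels and scores are hypothetical. It also shows why evaluating on hard 0/1 predictions, as `rawPredictionCol = "prediction"` does, can give a different value than evaluating on continuous scores such as the model's `rawPrediction` column.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of positive/negative pairs ranked correctly (ties count 0.5)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical toy data: true labels and continuous model scores
labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.6, 0.1]

# Hard predictions obtained by thresholding the scores at 0.5
hard_predictions = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_from_scores = pairwise_auc(labels, scores)
auc_from_hard = pairwise_auc(labels, hard_predictions)
print(auc_from_scores, auc_from_hard)
```

On this toy data the continuous scores give 0.75 while the thresholded predictions give 0.5, because thresholding throws away the ranking information within each predicted class.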

### 23. Perform steps 18 through 22 again but use a Random Forest Classifier instead of a Decision Tree. Save the result from the Area under the ROC step as a variable called rf_auc

In [0]:
%python
from pyspark.ml.classification import RandomForestClassifier

In [0]:
%python
rand_forest = RandomForestClassifier(labelCol = "label", featuresCol = "features", seed = 123, maxDepth = 3)

In [0]:
%python
fitted_mod = rand_forest.fit(df_train)

In [0]:
%python
predicts = fitted_mod.transform(df_test)

In [0]:
%python
#instantiate evaluator object
evaluator = BinaryClassificationEvaluator(labelCol = "label",
                                         rawPredictionCol = "prediction",
                                         metricName = "areaUnderROC")
rf_auc = evaluator.evaluate(predicts)
print(f"Area under ROC: {rf_auc}")

### 24. Perform steps 18 through 22 again but use a Gradient Boosted Tree instead of a Decision Tree. Save the result from the Area under the ROC step as a variable called gb_auc

In [0]:
%python
from pyspark.ml.classification import GBTClassifier

In [0]:
%python
gbt = GBTClassifier(labelCol = "label", featuresCol = "features", seed = 123, maxDepth = 3)

In [0]:
%python
fitted_mod = gbt.fit(df_train)

In [0]:
%python
predicts = fitted_mod.transform(df_test)

In [0]:
%python
#instantiate evaluator object
evaluator = BinaryClassificationEvaluator(labelCol = "label",
                                         rawPredictionCol = "prediction",
                                         metricName = "areaUnderROC")
gb_auc = evaluator.evaluate(predicts)
print(f"Area under ROC: {gb_auc}")

### 25. Print dt_auc, rf_auc, and gb_auc to summarize each model's performance and recommend the organization move forward with the best model. Summarize your recommendations and future analysis that would be useful.

In [0]:
%python
print(f"Area under Decision Tree ROC: {dt_auc}")
print(f"Area under Random Forest ROC: {rf_auc}")
print(f"Area under Gradient Boosted Tree ROC: {gb_auc}")

##### Of the three models run on this data, the Gradient Boosted Trees model had the highest area under the ROC curve (AUC), at about 0.98. The Decision Tree followed at about 0.97, and the Random Forest scored about 0.87. We therefore recommend that the organization proceed with implementing the Gradient Boosted Trees model to help predict *fraud*.
##### It would be helpful to perform further analysis on all transactions coming from accounts that were found to have at least one fraudulent transaction. Data analysis on these accounts would identify overall trends in fraudulent activity. This information would further help to improve the fraud prediction models and to stop fraudulent activity more rapidly.