<a href="https://colab.research.google.com/github/resh1604/SIT742/blob/master/SIT742BankAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SIT742: Modern Data Science 
**(Assessment Task 02: Bank Marketing Data Analytics)**

---
- Materials in this module include resources collected from various open-source online repositories.
- You are free to use, change and distribute this package.

Prepared by **SIT742 Teaching Team**


---

**Project Group Information:**

- Names:
- Student IDs:
- Emails:

---

## 1. Import Spark

In [1]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
#!tar xf spark-2.4.0-bin-hadoop2.7.tgz
#!pip install -q findspark

#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

!pip install wget
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"



In [0]:
# Locate the Spark installation (SPARK_HOME is set in the setup cell) before
# pyspark is imported; the import on the last line relies on this call.
import findspark
findspark.init()
from pyspark.sql import SparkSession 

## 2. Read and check data

In [4]:
# Install the `wget` Python package used by the download cell below
# (the Spark setup cell at the top of the notebook installs it as well).
!pip install wget  



In [0]:
import os

import wget

link_to_data = 'https://github.com/tulip-lab/sit742/raw/master/Assessment/2019/data/bank.csv'

# wget.download() does not overwrite an existing file: a second run saves a
# fresh copy as 'bank (1).csv'.  Download only when bank.csv is missing so the
# cell can be re-run safely; DataSet always ends up holding the local file name.
DataSet = 'bank.csv'
if not os.path.exists(DataSet):
    DataSet = wget.download(link_to_data)

In [6]:
# List the working directory to check that bank.csv and the Spark archive are present.
!ls

'bank (1).csv'	 sample_data		     spark-2.4.0-bin-hadoop2.7.tgz
 bank.csv	 spark-2.4.0-bin-hadoop2.7   spark-2.4.0-bin-hadoop2.7.tgz.1


In [8]:
spark = SparkSession.builder.appName('ml-bank').getOrCreate()

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

# Expected layout of bank.csv, in file order.  This schema is the single source
# of truth for the column types: the file is read with every column as a
# string, and each column is then cast to the type declared here.
schema = StructType([
    StructField("age", IntegerType()),
    StructField("job", StringType()),
    StructField("marital", StringType()),
    StructField("education", StringType()),
    StructField("default", StringType()),
    StructField("balance", IntegerType()),
    StructField("housing", StringType()),
    StructField("loan", StringType()),  # present in the file; was missing from the schema
    StructField("contact", StringType()),
    StructField("day", IntegerType()),
    StructField("month", StringType()),
    StructField("duration", IntegerType()),
    StructField("campaign", IntegerType()),
    StructField("pdays", IntegerType()),
    StructField("previous", IntegerType()),
    StructField("poutcome", StringType()),
    StructField("deposit", StringType())
])

# Import the 'bank.csv' as a Spark dataframe and name it as df
raw_df = spark.read.format("csv").option("header", "true").load("bank.csv")

# One projection instead of seven chained withColumn() calls.  Casting a string
# column to StringType is a no-op, so only the integer columns actually change.
df = raw_df.select([
    raw_df[field.name].cast(field.dataType).alias(field.name)
    for field in schema.fields
])

df

DataFrame[age: int, job: string, marital: string, education: string, default: string, balance: int, housing: string, loan: string, contact: string, day: int, month: string, duration: int, campaign: int, pdays: int, previous: int, poutcome: string, deposit: string]

In [9]:
# Peek at the first record to confirm the columns were parsed and typed.
df.head(1)

[Row(age=59, job='admin.', marital='married', education='secondary', default='no', balance=2343, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=1042, campaign=1, pdays=-1, previous=0, poutcome='unknown', deposit='yes')]

In [10]:
# Check the structure of df: printSchema() lists every column with its
# data type and nullability.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



## 3. Select features

In [12]:
# Keep only the modelling features (plus the 'deposit' outcome) as df2.
columns = [
    'age', 'job', 'marital', 'education', 'default', 'balance', 'housing',
    'loan', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit',
]

# Walk df's own column order so df2 keeps the layout of the source frame;
# any requested name that df does not have is silently skipped.
kept_columns = [name for name in df.columns if name in columns]
df2 = df.select(kept_columns)
df2.head(1)

[Row(age=59, job='admin.', marital='married', education='secondary', default='no', balance=2343, housing='yes', loan='no', campaign=1, pdays=-1, previous=0, poutcome='unknown', deposit='yes')]

In [0]:
# Remove invalid rows/records: na.drop() discards every row that contains a
# null value.  Note this uses the DataFrame API rather than a spark.sql query.
df2 = df2.na.drop()

In [15]:
# Convert categorical features to numeric features using one-hot encoding.
from pyspark.ml import Pipeline

from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, OneHotEncoder

cols = ["job", 
        "marital",
        "education",
        "default",
        "housing",
        "loan",
        "poutcome",
        "deposit"]

# For each categorical column add "<name>Index" (label index from StringIndexer)
# followed by "<name>Vec" (one-hot vector).  OneHotEncoderEstimator replaces the
# OneHotEncoder transformer, which is deprecated in Spark 2.4; both drop the
# last category by default, so the encoded vectors keep the same size.
# The stages are interleaved so the output columns keep the Index/Vec pairing.
encoding_stages = []
for name in cols:
    index_col = "{}Index".format(name)
    vec_col = "{}Vec".format(name)
    encoding_stages.append(StringIndexer(inputCol=name, outputCol=index_col))
    encoding_stages.append(
        OneHotEncoderEstimator(inputCols=[index_col], outputCols=[vec_col]))

# Fit every indexer/encoder in a single pipeline and apply it to df2.
df2 = Pipeline(stages=encoding_stages).fit(df2).transform(df2)

df2.show()

+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+-------------+------------+----------+
|age|        job| marital|education|default|balance|housing|loan|campaign|pdays|previous|poutcome|deposit|jobIndex|        jobVec|maritalIndex|   maritalVec|educationIndex| educationVec|defaultIndex|   defaultVec|housingIndex|   housingVec|loanIndex|      loanVec|poutcomeIndex|  poutcomeVec|depositIndex|depositVec|
+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+-------------+------------+----------+
| 59|     admin.| married|secondary|     no|   23

### 3.1 Normalisation

In [0]:
# Numeric attributes that are packed into a single vector column for scaling.
numeric_variables = [
    'age',
    'balance',
    'pdays',
    'previous'
]


vecAssembler = VectorAssembler(inputCols=numeric_variables, outputCol="nums")

# Assemble from df2 (the cleaned and encoded frame), not from the raw df.
# On a fresh kernel the raw df has no "<name>Index" columns, so the later
# 'poutcomeIndex as label' selection would fail; starting from df2 also makes
# this cell safe to re-run because its input is never overwritten here.
df = vecAssembler.transform(df2)
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+-------------+------------+----------+--------------------+
|age|        job| marital|education|default|balance|housing|loan|campaign|pdays|previous|poutcome|deposit|jobIndex|        jobVec|maritalIndex|   maritalVec|educationIndex| educationVec|defaultIndex|   defaultVec|housingIndex|   housingVec|loanIndex|      loanVec|poutcomeIndex|  poutcomeVec|depositIndex|depositVec|                nums|
+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+-------------+------------+----------+--------

In [0]:
# Min-Max normalisation of the assembled numeric vector using MinMaxScaler.
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(outputCol="scaledNums", inputCol="nums")

# fit() computes the summary statistics and yields a MinMaxScalerModel;
# transform() then appends the rescaled vector as the "scaledNums" column.
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)

df.show()

+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+-------------+------------+----------+--------------------+--------------------+
|age|        job| marital|education|default|balance|housing|loan|campaign|pdays|previous|poutcome|deposit|jobIndex|        jobVec|maritalIndex|   maritalVec|educationIndex| educationVec|defaultIndex|   defaultVec|housingIndex|   housingVec|loanIndex|      loanVec|poutcomeIndex|  poutcomeVec|depositIndex|depositVec|                nums|          scaledNums|
+---+-----------+--------+---------+-------+-------+-------+----+--------+-----+--------+--------+-------+--------+--------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+-------------+----

## 4. Unsupervised learning

### 4.1 K-means

In [0]:
# Clustering input: the scaled numeric vector, exposed under the column name
# "features" that the ML estimators read by default.
df2 = df.select(df["scaledNums"].alias("features"))
df2.show()

+--------------------+
|            features|
+--------------------+
|[0.53246753246753...|
|[0.49350649350649...|
|[0.29870129870129...|
|[0.48051948051948...|
|[0.46753246753246...|
|[0.31168831168831...|
|[0.49350649350649...|
|[0.54545454545454...|
|[0.24675324675324...|
|[0.12987012987012...|
|[0.25974025974025...|
|[0.15584415584415...|
|[0.14285714285714...|
|[0.36363636363636...|
|[0.16883116883116...|
|[0.22077922077922...|
|[0.18181818181818...|
|[0.40259740259740...|
|[0.29870129870129...|
|[0.40259740259740...|
+--------------------+
only showing top 20 rows



In [0]:
# Unsupervised learning on df2 with k-means.
# The whole of df2 serves as both training and evaluation data; the cell
# reports the Silhouette score of the resulting clustering.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Two clusters, fixed seed for a repeatable initialisation.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df2)

# Assign every row to its cluster.
predictions = model.transform(df2)

# Score the clustering with the evaluator's default metric.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Report the learned cluster centres, one per line.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.5585376851872408
Cluster Centers: 
[0.47145453 0.09911939 0.05347191 0.01343832]
[0.20494051 0.09284213 0.06561405 0.01487672]


### 4.2 PCA

In [0]:
# Project the features onto the first two principal components, the
# coordinates needed for a scatter plot of the data distribution.

from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Centre and scale the features first and keep the result on df2.
standardizer = StandardScaler(inputCol='features',
                              outputCol='std_features',
                              withMean=True,
                              withStd=True)
df2 = standardizer.fit(df2).transform(df2)

# Keep the top two components of the standardised features.
pca = PCA(k=2, inputCol="std_features", outputCol="pcaFeatures")
model = pca.fit(df2)

projected = model.transform(df2)
result = projected.select("pcaFeatures")
result.show(truncate=False)

+------------------------------------------+
|pcaFeatures                               |
+------------------------------------------+
|[0.5024480111418868,1.2876640294358876]   |
|[0.5696986581689144,0.6104554195913101]   |
|[0.6016201800130624,-0.014764689663041394]|
|[0.5155958010388783,1.0788875953172095]   |
|[0.5745372759476424,0.521809213003675]    |
|[0.6279346675693007,-0.23110756666275342] |
|[0.5509097950203947,0.7809033950435555]   |
|[0.541400103465537,0.956675889966385]     |
|[0.6483246401447347,-0.5279590366276407]  |
|[0.5632651836351033,0.04229805083768032]  |
|[0.6418723077642554,-0.4470493296857937]  |
|[0.6695321744365385,-0.8769786751215933]  |
|[0.6762477902496722,-0.9602768250315613]  |
|[0.6005935071664849,0.10642765655551936]  |
|[0.6560190590894179,-0.7320152703983291]  |
|[0.5646760896940652,0.18612872525217847]  |
|[0.6541382844941198,-0.6925776186302129]  |
|[0.5995466760232857,0.1830514255072168]   |
|[0.6307010318926183,-0.2785790720509119]  |
|[0.595334

## 5. Supervised learning

In [0]:
# Supervised-learning input: scaled numeric vector as "features" and the
# indexed 'poutcome' column as "label".
# NOTE(review): the label is poutcomeIndex, not depositIndex - confirm this is
# the intended prediction target for the task.
df2 = df.select(
    df["scaledNums"].alias("features"),
    df["poutcomeIndex"].alias("label"),
)
df2.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.53246753246753...|  0.0|
|[0.49350649350649...|  0.0|
|[0.29870129870129...|  0.0|
|[0.48051948051948...|  0.0|
|[0.46753246753246...|  0.0|
|[0.31168831168831...|  0.0|
|[0.49350649350649...|  0.0|
|[0.54545454545454...|  0.0|
|[0.24675324675324...|  0.0|
|[0.12987012987012...|  0.0|
|[0.25974025974025...|  0.0|
|[0.15584415584415...|  0.0|
|[0.14285714285714...|  0.0|
|[0.36363636363636...|  0.0|
|[0.16883116883116...|  0.0|
|[0.22077922077922...|  0.0|
|[0.18181818181818...|  0.0|
|[0.40259740259740...|  0.0|
|[0.29870129870129...|  0.0|
|[0.40259740259740...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [0]:
# 70/30 random split with a fixed seed, then report the size of each part.
train, test = df2.randomSplit([0.7, 0.3], seed = 742)
for caption, part in (("Training Dataset Count: ", train),
                      ("Test Dataset Count: ", test)):
    print(caption + str(part.count()))

Training Dataset Count: 7822
Test Dataset Count: 3340


### 5.1 LogisticRegression

In [0]:
# Logistic Regression

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

# Hyper-parameters shared by both estimators below.
lr_params = dict(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Estimator with the default family, fitted on the training split.
lr = LogisticRegression(**lr_params)
lrModel = lr.fit(train)

# Same settings with the multinomial family requested explicitly.
mlr = LogisticRegression(family="multinomial", **lr_params)
mlrModel = mlr.fit(train)

In [0]:
# Examine the fitted parameters of the multinomial logistic regression:
# its coefficient matrix and its intercept vector.
coefficient_matrix = mlrModel.coefficientMatrix
intercept_vector = mlrModel.interceptVector
print("Multinomial coefficients: " + str(coefficient_matrix))
print("Multinomial intercepts: " + str(intercept_vector))

Multinomial coefficients: 4 X 4 CSCMatrix
(0,1) -3.3467
Multinomial intercepts: [1.8584058718122476,-0.29277621249950403,-0.4539798385650064,-1.111649820747737]


### 5.2 Decision tree

In [0]:
# Decision tree

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Classifier reading the "features" / "label" columns of the split frames.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Fit on the training split (no indexer stages are involved here).
model = dt.fit(train)

# Predict on the held-out split and show a few example rows.
predictions = model.transform(test)
predictions.select("prediction", "label", "features").show(5)

# Accuracy on the test split, reported as an error rate.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g " % test_error)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.0,0.0777958228...|
|       0.0|  0.0|[0.0,0.0789883135...|
|       2.0|  2.0|[0.0,0.0846668408...|
|       0.0|  0.0|[0.01298701298701...|
|       0.0|  0.0|[0.01298701298701...|
+----------+-----+--------------------+
only showing top 5 rows

Test Error = 0.115868 


### 5.3 NaiveBayes

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Multinomial naive Bayes with a smoothing value of 1.0.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)

# Fit on the training split.
model = nb.fit(train)

# Predict on the test split and display the resulting rows.
predictions = model.transform(test)
predictions.show()

# Accuracy of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",
                                              labelCol="label",
                                              predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0777958228...|  0.0|[-0.4026581009425...|[0.75514649558636...|       0.0|
|[0.0,0.0789883135...|  0.0|[-0.4043642558999...|[0.75526616923914...|       0.0|
|[0.0,0.0846668408...|  2.0|[-1.3648044693040...|[0.58428306177620...|       0.0|
|[0.01298701298701...|  0.0|[-0.4115222093901...|[0.75684612238886...|       0.0|
|[0.01298701298701...|  0.0|[-0.4122209204679...|[0.75689490078114...|       0.0|
|[0.01298701298701...|  0.0|[-0.4125784005542...|[0.75691985456091...|       0.0|
|[0.02597402597402...|  0.0|[-0.4109780919298...|[0.75788310163379...|       0.0|
|[0.02597402597402...|  0.0|[-0.4118555430507...|[0.75794417488597...|       0.0|
|[0.02597402597402...|  0.0|[-0.4122292722319...|[0.75797018433582...|       0.0|
|[0.025974025974