In [None]:
!apt-get install openjdk-8-jdk-headless

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 24 not upgraded.
Need to get 36.5 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu focal/main amd64 libxtst6 amd64 2:1.2.3-1 [12.8 kB]
Get:2 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 openjdk-8-jre-headless amd64 8u362-ga-0ubuntu1~20.04.1 [28.2 MB]
Get:3 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 openjdk-8-jdk-headless amd64 8u362-ga-0ubuntu1~20.04.1 [8,282 kB]
Fetched 36.5

Now download Spark 3.2.1 (prebuilt for Hadoop 2.7).

In [None]:
!wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz


--2023-05-14 10:50:09--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 272637746 (260M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop2.7.tgz’


2023-05-14 10:50:20 (25.9 MB/s) - ‘spark-3.2.1-bin-hadoop2.7.tgz’ saved [272637746/272637746]



Next, we just need to extract the downloaded archive.


In [None]:
# Extract the Spark tarball into the current working directory (expected: /content).
# This produces /content/spark-3.2.1-bin-hadoop2.7, the path SPARK_HOME is set to later.
!tar xf /content/spark-3.2.1-bin-hadoop2.7.tgz


There is one last thing we need to install: the findspark library. It locates the Spark installation on the system so that PySpark can be imported like a regular library.


In [None]:
!pip install -q findspark


In [None]:
import os

# Tell findspark / PySpark where the JDK and the extracted Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.1-bin-hadoop2.7",
)

We need to locate Spark on the system. For that, we import findspark and call `findspark.init()`.

In [None]:
# Locate the Spark installation and make `pyspark` importable.
import findspark
findspark.init()
# Display the Spark home that was found; it should equal the SPARK_HOME set above.
findspark.find()

'/content/spark-3.2.1-bin-hadoop2.7'

Now that all the necessary dependencies are installed in Colab and the environment variables point at Java and Spark, we can import PySpark (along with NumPy and pandas) and run it in the Colab environment.

In [None]:
import pyspark
import numpy as np
import pandas as pd

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores.
spark = SparkSession.builder.master('local[*]').appName('flemingo').getOrCreate()

# Input CSV; it must be uploaded to /content before this cell runs.
# NOTE(review): an earlier comment pointed at a flights dataset
# (https://raw.githubusercontent.com/besherh/BigDataManagement/main/SparkCSV/flights-larger.csv),
# but the columns shown below (userId, userSessionId, platformType, country, ...)
# describe game/user sessions, so that URL is not this file's source.
# TODO: record the real data source here.
DATA_PATH = '/content/data.csv'

# First row is the header, column types are inferred, and the literal 'NA' is read as null.
data = spark.read.csv(DATA_PATH, sep=',', header=True, inferSchema=True, nullValue='NA')

# Preview the first five rows.
data.show(5)


+------+-------------+---------+------------+----------------+----------+-----------------------------+------------------+-------+----+--------------+-----------------------------------------+---+
|userId|userSessionId|teamLevel|platformType|count_gameclicks|count_hits|Number_of_purchase(by a user)|avg_Purchase_price|country|team|Purchase_price|duration_btw_1stplayed_&_purchase(months)|age|
+------+-------------+---------+------------+----------------+----------+-----------------------------+------------------+-------+----+--------------+-----------------------------------------+---+
|   937|         5652|        1|     android|              39|         0|                            1|               1.0|     ZM|  11|             1|                                       25| 56|
|  1623|         5659|        1|      iphone|             129|         9|                            1|              10.0|     SV|  13|            10|                                       29| 75|
|    83|       

## Total number of records

In [None]:
# Number of rows in the raw dataset, before any cleaning.
data.count()

2375

In [None]:
# Remove the 'userId' column: it is a record identifier, not a predictor.
# (Column names are spelled exactly as in the CSV header rather than relying on
# Spark's case-insensitive name resolution.)
data = data.drop('userId')

# Number of records with a missing 'platformType' (this column becomes the label later).
data.filter('platformType IS NULL').count()



0

In [None]:
# Remove records with missing values in any column and get the number of remaining rows
data = data.dropna()
print(data.count())

2363


In [None]:
from pyspark.ml.feature import StringIndexer

# Feature encoding: map each distinct 'country' string to a numeric index in 'country_idx'.
# NOTE(review): country_idx is an arbitrary integer code. A model that treats it as a
# numeric magnitude (e.g. the MLP later on) sees a meaningless ordering; consider one-hot encoding.
indexer = StringIndexer(inputCol='country', outputCol='country_idx')
indexer_model = indexer.fit(data)              # learn the set of country values
indexed_data = indexer_model.transform(data)   # append the 'country_idx' column

# Label encoding: 'platformType' is the prediction target, not another feature.
# Its categories become the numeric class label column 'target'.
label_indexer_model = StringIndexer(inputCol='platformType', outputCol='target').fit(indexed_data)
indexed_data = label_indexer_model.transform(indexed_data)

# Drop the session identifier and the raw string columns that now have numeric versions.
indexed_data = indexed_data.drop('userSessionId', 'platformType', 'country')

indexed_data.show(5)

+---------+----------------+----------+-----------------------------+------------------+----+--------------+-----------------------------------------+---+-----------+------+
|teamLevel|count_gameclicks|count_hits|Number_of_purchase(by a user)|avg_Purchase_price|team|Purchase_price|duration_btw_1stplayed_&_purchase(months)|age|country_idx|target|
+---------+----------------+----------+-----------------------------+------------------+----+--------------+-----------------------------------------+---+-----------+------+
|        1|              39|         0|                            1|               1.0|  11|             1|                                       25| 56|       26.0|   1.0|
|        1|             129|         9|                            1|              10.0|  13|            10|                                       29| 75|       14.0|   0.0|
|        1|             102|        14|                            1|               5.0|  63|             5|                      

In [None]:
from pyspark.ml.feature import VectorAssembler

# The ten predictor columns, in the order they appear in the assembled feature vector.
FEATURE_COLS = [
    'teamLevel', 'count_gameclicks', 'count_hits',
    'Number_of_purchase(by a user)',
    'avg_Purchase_price', 'team',
    'Purchase_price', 'duration_btw_1stplayed_&_purchase(months)', 'age', 'country_idx',
]

# Consolidate the predictor columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol='features')
assembled_data = assembler.transform(indexed_data)

# Keep the (features, target) view and preview it. DataFrame.show() only prints and
# returns None, so the DataFrame must be bound before calling it.
new_data = assembled_data.select('features', 'target')
new_data.show(5, truncate=False)

+-------------------------------------------------+------+
|features                                         |target|
+-------------------------------------------------+------+
|[1.0,39.0,0.0,1.0,1.0,11.0,1.0,25.0,56.0,26.0]   |1.0   |
|[1.0,129.0,9.0,1.0,10.0,13.0,10.0,29.0,75.0,14.0]|0.0   |
|[1.0,102.0,14.0,1.0,5.0,63.0,5.0,10.0,35.0,16.0] |1.0   |
|[1.0,39.0,4.0,1.0,3.0,18.0,3.0,10.0,58.0,83.0]   |1.0   |
|[1.0,90.0,10.0,1.0,3.0,63.0,3.0,30.0,63.0,46.0]  |1.0   |
+-------------------------------------------------+------+
only showing top 5 rows



## Decision Tree


### Train/test split
To objectively assess a Machine Learning model you need to be able to test it on an independent set of data. You can't use the same data that you used to train the model: of course the model will perform (relatively) well on those data!

You will split the data into two components:

- training data (used to train the model) and
- testing data (used to test the model).

In [None]:
# 80:20 train/test split, seeded so the split is repeatable.
train, test = assembled_data.randomSplit([0.8, 0.2], seed=1)

# Sanity check: the training share should come out close to 0.8.
ratio = train.count() / assembled_data.count()

# Only the assembled feature vector and the label are needed for modelling.
train_data, test_data = (split.select('features', 'target') for split in (train, test))

print(ratio)
train_data.show(5)

0.8087177316969953
+--------------------+------+
|            features|target|
+--------------------+------+
|[1.0,8.0,1.0,2.0,...|   2.0|
|[1.0,8.0,1.0,2.0,...|   2.0|
|[1.0,19.0,2.0,1.0...|   1.0|
|[1.0,22.0,1.0,1.0...|   0.0|
|[1.0,27.0,4.0,1.0...|   0.0|
+--------------------+------+
only showing top 5 rows



### Build a Decision Tree
Now that you've split the data into training and testing sets, you can use the training set to fit a Decision Tree model.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# maxBins is passed to the constructor rather than set afterwards with setMaxBins().
# NOTE(review): 216 is presumably raised from the default so that a high-cardinality
# categorical feature (likely country_idx) fits into the bins; confirm it is at
# least the number of distinct countries.
tree = DecisionTreeClassifier(labelCol='target', maxBins=216)
tree_model = tree.fit(train_data)

# Score the held-out data and look at a few predictions with their class probabilities.
prediction = tree_model.transform(test_data)
prediction.select('target', 'prediction', 'probability').show(5, False)

+------+----------+---------------------------------------------------------------------------------------+
|target|prediction|probability                                                                            |
+------+----------+---------------------------------------------------------------------------------------+
|0.0   |0.0       |[0.9240710823909531,0.050080775444264945,0.011308562197092083,0.0,0.014539579967689823]|
|0.0   |0.0       |[0.9240710823909531,0.050080775444264945,0.011308562197092083,0.0,0.014539579967689823]|
|0.0   |0.0       |[0.9240710823909531,0.050080775444264945,0.011308562197092083,0.0,0.014539579967689823]|
|0.0   |0.0       |[0.7,0.3,0.0,0.0,0.0]                                                                  |
|0.0   |0.0       |[0.9240710823909531,0.050080775444264945,0.011308562197092083,0.0,0.014539579967689823]|
+------+----------+---------------------------------------------------------------------------------------+
only showing top 5 rows



### Evaluate the Decision Tree
You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.

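A confusion matrix gives a useful breakdown of predictions versus known values. For a binary problem it has four cells, which represent the counts of:

- True Negatives (TN) — model predicts negative outcome & known outcome is negative
- True Positives (TP) — model predicts positive outcome & known outcome is positive
- False Negatives (FN) — model predicts negative outcome but known outcome is positive
- False Positives (FP) — model predicts positive outcome but known outcome is negative.

Here the target (`platformType`) has five classes (0–4), so the full matrix is 5×5. The four-cell view only applies when one class is singled out as "positive". Overall accuracy must therefore count correct predictions across all classes.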

In [None]:
# Confusion matrix as (target, prediction, count) rows. The label has more than two
# classes, so this is a multi-class matrix.
prediction.groupBy('target', 'prediction').count().show()

# Binary-style cells for class 0 ("negative") vs class 1 ("positive") only.
# Rows involving any other class are ignored by these four counts.
TN = prediction.filter('prediction = 0 AND target = 0').count()
TP = prediction.filter('prediction = 1 AND target = 1').count()
FN = prediction.filter('prediction = 0 AND target = 1').count()
FP = prediction.filter('prediction = 1 AND target = 0').count()

# Accuracy = correctly classified rows / all rows, over every class.
# (TN + TP) / (TN + TP + FN + FP) would only look at classes 0 and 1, and so it
# overstates accuracy for this multi-class target.
total = prediction.count()
correct = prediction.filter('prediction = target').count()
accuracy = correct / total if total else float('nan')
print(accuracy)

+------+----------+-----+
|target|prediction|count|
+------+----------+-----+
|   2.0|       0.0|    2|
|   1.0|       1.0|  142|
|   3.0|       2.0|   20|
|   0.0|       1.0|   20|
|   0.0|       4.0|    1|
|   2.0|       2.0|   47|
|   1.0|       0.0|    6|
|   3.0|       1.0|    3|
|   2.0|       1.0|   16|
|   1.0|       2.0|   11|
|   0.0|       0.0|  161|
|   1.0|       3.0|    1|
|   4.0|       0.0|    1|
|   3.0|       3.0|   11|
|   3.0|       0.0|    1|
|   4.0|       1.0|    9|
+------+----------+-----+

0.9209726443768997


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Precision and recall for the multi-class decision-tree predictions.
# TP/FP/FN from the confusion-matrix cell only compare classes 0 and 1, so the
# support-weighted average of per-class precision and recall over all classes is used instead.
tree_evaluator = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction')
precision = tree_evaluator.setMetricName('weightedPrecision').evaluate(prediction)
recall = tree_evaluator.setMetricName('weightedRecall').evaluate(prediction)
print('precision = {:.2f}\nrecall   = {:.2f}'.format(precision, recall))

precision = 0.88
recall   = 0.96


In [None]:
from re import VERBOSE  # NOTE(review): unused in this cell; likely an editor auto-import, remove if unused elsewhere.
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Network shape: the input layer must match the length of the 'features' vector, and
# the output layer must cover every class index in 'target' (indices start at 0).
# Both sizes are derived from the data instead of being hard-coded.
num_features = train_data.first()['features'].size
num_classes = int(train_data.agg({'target': 'max'}).first()[0]) + 1
layers = [num_features, 10, 15, num_classes]

# Maximum number of optimizer iterations. Spark's MLP classifier has no dropout option.
num_epochs = 500

# Create the MLP and set its parameters.
# NOTE: the default solver is 'l-bfgs', and stepSize is only used by the 'gd' solver,
# so stepSize=0.1 has no effect here. It is kept to preserve the original configuration.
mlp = MultilayerPerceptronClassifier(featuresCol='features', labelCol='target', layers=layers, seed=42,
                                     maxIter=num_epochs, blockSize=500, stepSize=0.1)

# Train the MLP on the training data.
mlp_model = mlp.fit(train_data)

# Make predictions on the test data.
predictions = mlp_model.transform(test_data)

# Evaluate the MLP with overall (all-class) accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol='target', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy = {:.2f}%'.format(accuracy * 100))

Accuracy = 77.88%


In [None]:
!pip install sparkdl

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sparkdl
  Downloading sparkdl-0.2.2-py3-none-any.whl (99 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.7/99.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sparkdl
Successfully installed sparkdl-0.2.2


In [None]:
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define the Keras model
from keras.models import Sequential
from keras.layers import Dense

# NOTE(review): this cell previously used sparkdl's KerasImageFileTransformer. As its name
# and API indicate, it scores image files (a column of file URIs plus a saved model file),
# so it cannot consume the tabular 'features' vectors. The cell also failed with
# ModuleNotFoundError, presumably from that import (confirm). The Keras model is now
# trained directly on the small train/test sets collected to NumPy.

KERAS_EPOCHS = 50  # training passes over the training data for the Keras model


def _to_numpy(df):
    """Collect a DataFrame's 'features'/'target' columns into (X float32, y int64) arrays.

    Every row is pulled to the driver, so this is only suitable for small data such as
    the ~2k-row dataset used here.
    """
    rows = df.select('features', 'target').collect()
    X = np.array([row['features'].toArray() for row in rows], dtype='float32')
    y = np.array([row['target'] for row in rows], dtype='int64')
    return X, y


X_train, y_train = _to_numpy(train_data)
X_test, y_test = _to_numpy(test_data)

# Input width = length of the feature vector; output width covers every class index seen.
num_features = X_train.shape[1]
num_classes = int(max(y_train.max(), y_test.max())) + 1

keras_model = Sequential()
keras_model.add(Dense(10, input_shape=(num_features,), activation='relu'))
keras_model.add(Dense(num_classes, activation='softmax'))
# Integer class labels, so use sparse categorical cross-entropy.
keras_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
keras_model.fit(X_train, y_train, epochs=KERAS_EPOCHS, batch_size=32, verbose=0)
keras_loss, keras_accuracy = keras_model.evaluate(X_test, y_test, verbose=0)
print("Keras Test Accuracy = {}".format(keras_accuracy))

# Define the Spark pipeline. train_data already contains the assembled 'features' column,
# so the only stage needed is the classifier itself.
mlp = MultilayerPerceptronClassifier(featuresCol="features", labelCol="target", predictionCol="prediction",
                                     maxIter=100, layers=[num_features, 50, num_classes], blockSize=128, seed=1234)
pipeline = Pipeline(stages=[mlp])

# Fit the model on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model. The label column in this notebook is 'target' (there is no 'label' column).
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = {}".format(accuracy))

Exception ignored in: <function JavaWrapper.__del__ at 0x7f7fddd7f0a0>
Traceback (most recent call last):
  File "/content/spark-3.2.1-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MultilayerPerceptronClassifier' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7f7fddd7f0a0>
Traceback (most recent call last):
  File "/content/spark-3.2.1-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MultilayerPerceptronClassifier' object has no attribute '_java_obj'


ModuleNotFoundError: ignored