In [0]:
# this tutorial is based on the content of this blog: https://neptune.ai/blog/apache-spark-tutorial

#Connect the PySpark notebook to Spark master node
# in order to run your python code on a Spark engine, you need to create a spark session
# and create a connection to the master node where you intend to run your python code

# the code below will only run on a platform which has a cluster of machines with apache spark installed
# this cluster will include master and worker nodes
# typically this would be a cloud platform with this kind of installation (a cluster) but you can create
# a 'virtual cluster' within a virtual machine environment such as Docker

from pyspark.sql import SparkSession

#PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a  #distributed environment using Python.

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [0]:
# to get the link to the raw data you have to click on 'raw' in the dataset preview in your online repository
# https://github.com/garthajon/DataScienceColabRepo/blob/main/heart.csv (so click on the 'raw' button here)
# to get the 'raw data' url for your dataset
url = "https://raw.githubusercontent.com/garthajon/DataScienceColabRepo/main/heart.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
path  = SparkFiles.get('heart.csv')
df = spark.read.csv('file:///' + path, header=True, inferSchema= True, sep = ',')
df.show(n=5)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows



In [0]:
# use the display function of the dataframe class to get a view of the data
# the field headings and the datatypes of the fields
display(df)
#  close the spark session
#spark.stop()

age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
37,1,2,130,250,0,1,187,0,3.5,0,0,2,1
41,0,1,130,204,0,0,172,0,1.4,2,0,2,1
56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
57,0,0,120,354,0,1,163,1,0.6,2,0,2,1
57,1,0,140,192,0,1,148,0,0.4,1,0,1,1
56,0,1,140,294,0,0,153,0,1.3,1,0,2,1
44,1,1,120,263,0,1,173,0,0.0,2,0,3,1
52,1,2,172,199,1,1,162,0,0.5,2,0,3,1
57,1,2,150,168,0,1,174,0,1.6,2,0,2,1


Databricks visualization. Run in Databricks to view.

In [0]:
# we can the filter method of the dataframe class to filter on specific categories like a where clause
df.filter((df.age>50) & (df.target=='1')).show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|
| 52|  1|  2|     172| 199|  1|      1|    162|    0|    0.5|    2|  0|   3|     1|
| 57|  1|  2|     150| 168|  0|      1|    174|    0|    1.6|    2|  0|   2|     1|
| 54|  1|  0|     140| 239|  0|      1|    160|    0|    1.2|    2|  0|   2|     1|
| 64|  1|  3|     110| 211|  0|      0|    144|    1|    1.8|    1|  0|   2|

In [0]:
#pySpark also lets you run `group by` operations similarly to how you would do them in Pandas. Let’s see how that can be done.
# remember this is pyspark which is a python API designed for processing big data on a big data distributed computing #platform/cluster specifically using the Apache Spark Engine
from pyspark.sql import functions as F
df.groupBy(["sex"]).agg(
    F.mean("age").alias("Mean Age")
    ).show()

+---+------------------+
|sex|          Mean Age|
+---+------------------+
|  1| 53.75845410628019|
|  0|55.677083333333336|
+---+------------------+



In [0]:
#If you’re coming from the world of SQL, you might be interested in querying the data frame as if it was an SQL table.
#You can do that by using the `SQLContext` to register a temporary SQL table. After that, you can run SQL queries normally.
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
#df.registerTempTable('df_table')
# deprecatiion of registerTempTable in Spark version 2.0 use createOrReplaceTempView instead
df.createOrReplaceTempView('df_table')
df_sql = sqlContext.sql('select age,target,sex,slope,cp from df_table where age>30 ORDER BY age DESC')
df_sql.show()



+---+------+---+-----+---+
|age|target|sex|slope| cp|
+---+------+---+-----+---+
| 77|     0|  1|    2|  0|
| 76|     1|  0|    1|  2|
| 74|     1|  0|    2|  1|
| 71|     1|  0|    2|  1|
| 71|     1|  0|    2|  2|
| 71|     1|  0|    1|  0|
| 70|     1|  1|    2|  1|
| 70|     0|  1|    0|  0|
| 70|     0|  1|    1|  0|
| 70|     0|  1|    1|  2|
| 69|     1|  0|    2|  3|
| 69|     1|  1|    1|  3|
| 69|     0|  1|    1|  2|
| 68|     1|  1|    2|  2|
| 68|     1|  0|    1|  2|
| 68|     0|  1|    1|  2|
| 68|     0|  1|    1|  0|
| 67|     1|  0|    2|  2|
| 67|     1|  0|    1|  2|
| 67|     1|  0|    2|  0|
+---+------+---+-----+---+
only showing top 20 rows



Data preprocessing with Spark
Once you’re done with data exploration, the next step is to convert the data into a format that would be accepted by Spark’s MLlib. In this case, the features need to be transformed into a single vector that will be passed to the machine learning model.

The conventional pyspark dataframe is not a suitable data structure for the Apache Spark machine learning module, we need to create an appropriate ML data structure out of the data frame called a vector and we need a 'vectorassembler' in order to do that.

This can be done using the `VectorAssembler`. Let’s import it and instantiate it using the features in the dataset.
Spark's machine learning library works with vectors not regular data frames, so you need to first create/instantiate a 
vector assembler, and then use the vector assembler to transform your data frame into a vector

In [0]:
 # create the vector assembler which will convert our dataframe into a vector suitable for sparks Machine learning library (ML)
# note that when this cell was first run, the VectorAssembler class could not be imported because the numpy
# package was missing (this was given in the error message), i therefore had to open the dockerfile image
#definition of the jupyter image and add numpy to the pip install statement in that file
# then delete the container, then delete the jupyter image, recreate the jupyter image and recreate the container
# note that when you create your VectorAssembler, you state the list of what your 'feature' columns
# will be, and the feature columns are the columns used to predict the predicted (or in this case target)
# column, in other words your feature columns are together used for the machine learning classification 
#output
from pyspark.ml.feature import VectorAssembler
feat_cols = ['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal']
vec_assember = VectorAssembler(inputCols = feat_cols, outputCol='features' )

In [0]:
# next we use the vector assembler object to create a vector from the dataframe
final_data = vec_assember.transform(df)

In [0]:
# the take function of the vector object can be used to view part of the vector
# you can pass a numerical parameter to specify how many items in the vector than you want to view
#You will notice a vector known as `features` that contains all the values of the features.
# note below that although the attribute 'target' is included in the vector
# it is not included in the list of features (the features vector), this is because when you 
# assembled the vector you did not state in your 'features' list that target should be in your features
# list
# 'target' is the last attribute/column and this is actually the classification field that the
# the model will try to correctly classify by using the features
final_data.take(1)

Out[10]: [Row(age=63, sex=1, cp=3, trestbps=145, chol=233, fbs=1, restecg=0, thalach=150, exang=0, oldpeak=2.3, slope=0, ca=0, thal=1, target=1, features=DenseVector([63.0, 1.0, 3.0, 145.0, 233.0, 1.0, 0.0, 150.0, 0.0, 2.3, 0.0, 0.0, 1.0]))]

In [0]:
# take the vector and use the function randomsplit  to split the data into a training vector and a test vector
# splitting the parts 0.7:0.3
# the first argument of the randomsplit function is the weighting which must add up to 1
# the second argyument of the randomsplitfunction is the 'seed' which is the size of the sample
# in this case 42% of the dataset
training,testing = final_data.randomSplit([0.7,0.3],seed=42)

In [0]:
# import the logistic regression model/algorithm from the machine learning package/library
from pyspark.ml.classification import LogisticRegression

In [0]:
# next create an instance of the model, specifying as your two paramters the label/classification 
# column i.e. which field the model should be attempting to classify
# and secondly the set of features that the model should use to effect the classification/prediction
lr = LogisticRegression(labelCol='target',featuresCol='features')

In [0]:
# now train the model to classify the 'target' classifiction correctly
# note that the training dataset has 'known'/established values
# and since we know the values/classification and are using known/actual classifications to train
# the model or for the model to 'learn to classify from' this is an example of
# 'supervised' machine learning because the model actually known classifications to work from
# another common word for training a supervised machine learning model to classify is called 
#'fitting the data' - note how we use the training portion of the vector to effect the training
lrModel = lr.fit(training)

In [0]:
#next we use the test portion of the data to actually effect a classification/prediction
# note that the value in the output predicted column, resulting from this classification attempt
# after the training has taken place (once the model has been trained) should match the classification
# value in the 'target' column
predictions = lrModel.transform(testing)

In [0]:
# now we can have a look at the output of the prediction of the model from the test dataset/vector
# and eyeball how accurate the prediction is 
predictions.select('target','prediction','probability','age','sex').show()

+------+----------+--------------------+---+---+
|target|prediction|         probability|age|sex|
+------+----------+--------------------+---+---+
|     1|       1.0|[0.02332762256461...| 34|  1|
|     1|       1.0|[0.08090313549404...| 35|  1|
|     1|       1.0|[0.36018428647252...| 37|  1|
|     1|       0.0|[0.52121020436607...| 38|  1|
|     1|       1.0|[0.03356685381595...| 39|  0|
|     0|       0.0|[0.75379044530973...| 39|  1|
|     1|       1.0|[0.09877461243623...| 39|  1|
|     1|       1.0|[0.01989559218277...| 41|  0|
|     1|       1.0|[0.03963078542695...| 41|  0|
|     0|       1.0|[0.37342653283289...| 41|  1|
|     1|       1.0|[0.10774051357158...| 41|  1|
|     1|       1.0|[0.24595054276821...| 41|  1|
|     1|       1.0|[0.18754189946980...| 42|  0|
|     1|       1.0|[0.01137506205122...| 42|  0|
|     1|       1.0|[0.21846468460838...| 42|  1|
|     1|       1.0|[0.19741108994433...| 42|  1|
|     1|       1.0|[0.08272556985828...| 42|  1|
|     1|       1.0|[

In [0]:
# create a dataframe from the predictions output with two columns to use to pass through a binary classification evaluator
# use SQL to create the dataframe in order to rename columns and cast the column type to double
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
predictions.registerTempTable('df_table2')
# note the binary classification evaluator expects a column called 'label' as being the actual classification to evaluate your
# prediction against
# and it must be of data type double
dataset = sqlContext.sql('select cast(target as double) as ''label'',prediction from df_table2 where age>30 ORDER BY age DESC')
dataset.show()




+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
+-----+----------+
only showing top 20 rows



In [0]:
#Since this is binary classification, the `BinaryClassificationEvaluator` function
#can be used to evaluate the model (the output is 1 or 0 so is binary so therefore
# the binary classifier evaluator is suitable to evaluate this model)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
# the binary classification evaluator uses the 'label' column and the 'prediction' column to evaluate the success of the prediction
evaluator.setRawPredictionCol("prediction")
#evaluator.evaluate(predictions)
evaluator.evaluate(dataset)

Out[18]: 0.8693877551020409