In [2]:
!pip install pyspark==3.0
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

Collecting pyspark==3.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[K     |████████████████████████████████| 204.7 MB 39 kB/s s eta 0:00:01        | 147.0 MB 84.1 MB/s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 30.1 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=b8fef102b516c5442a6209b3d83c745881135b1154193abef7b1b7128f8c3522
  Stored in directory: /home/spark/shared/.cache/pip/wheels/4e/c5/36/aef1bb711963a619063119cc032176106827a129c0be20e301
Successfully built pyspark
[31mERROR: sparktspy-nojars 2.0.5.0 has requirement pyspark==3.0.1, but you'll have pyspark 3.0.0 which is incompatible.[0m
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [3]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [4]:
# Import data from github
!git clone https://github.com/wchill/HMP_Dataset.git

Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.[K
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865[K
Receiving objects: 100% (865/865), 1010.96 KiB | 0 bytes/s, done.
Checking out files: 100% (848/848), done.


In [5]:
# View all datasets avaialble in the repository
! ls HMP_Dataset

Brush_teeth	Drink_glass  Getup_bed	  Pour_water	 Use_telephone
Climb_stairs	Eat_meat     impdata.py   README.txt	 Walk
Comb_hair	Eat_soup     Liedown_bed  Sitdown_chair
Descend_stairs	final.py     MANUAL.txt   Standup_chair


In [6]:
# Viewing all datasets avaialble in Brush teeth
! ls HMP_Dataset/Brush_teeth

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


In [7]:
# Importing relevant libraries for creating schema
from pyspark.sql.types import StructType, StructField, IntegerType

# Creating schema with 3 columns as per data
schema = StructType({
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('z', IntegerType(), True),
})

In [8]:
# Import relevant library to help with datapreprocessing
import os

In [9]:
# get list of folders/files in folder HMP_Dataset
file_list = os.listdir('HMP_Dataset')
file_list

['.git',
 '.idea',
 'Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'MANUAL.txt',
 'Pour_water',
 'README.txt',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone',
 'Walk',
 'final.py',
 'impdata.py']

In [10]:
# filter list for folders containing data
file_list_filtered = [s for s in file_list if '.' not in s]
file_list_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone',
 'Walk']

In [11]:
# Store data as a schema
df = None

from pyspark.sql.functions import lit

for category in file_list_filtered:
    data_files = os.listdir('HMP_Dataset/'+category)
    
    for data_file in data_files:
        #print(data_file)
        temp_df = spark.read.option('header', 'false').option('delimiter', ' ').csv('HMP_Dataset/'+category+'/'+data_file, schema = schema)
        temp_df = temp_df.withColumn('class', lit(category))
        temp_df = temp_df.withColumn('source', lit(data_file))
        
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)

In [12]:
# Show schema
df.show()

+---+---+---+-----------+--------------------+
|  z|  x|  y|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|

In [13]:
df.count()

446529

Read/Save into PARQUET

In [None]:
df.write.parquet('HMP.parquet')

#df = spark.read.parquet('HMP.parquet')
#df.createOrReplaceTempView('HMP')

Create a pipeline

In [18]:
df = df.sample(False, 0.01)

In [19]:
df.show()

+---+---+---+-----------+--------------------+
|  z|  x|  y|      class|              source|
+---+---+---+-----------+--------------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|
| 25| 39| 49|Brush_teeth|Accelerometer-201...|
| 32| 49| 12|Brush_teeth|Accelerometer-201...|
| 26| 31| 41|Brush_teeth|Accelerometer-201...|
| 54| 47| 43|Brush_teeth|Accelerometer-201...|
|  2| 33| 41|Brush_teeth|Accelerometer-201...|
|  0| 41| 43|Brush_teeth|Accelerometer-201...|
| 13| 39| 38|Brush_teeth|Accelerometer-201...|
| 12| 39| 34|Brush_teeth|Accelerometer-201...|
| 35| 52| 45|Brush_teeth|Accelerometer-201...|
| 23| 49| 40|Brush_teeth|Accelerometer-201...|
| 31| 55| 37|Brush_teeth|Accelerometer-201...|
| 30| 48| 41|Brush_teeth|Accelerometer-201...|
| 20| 49| 38|

In [21]:
df.count()

454

In [22]:
# Index the class name aka automatic mapping
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'class', outputCol = 'classIndex')
indexed = indexer.fit(df).transform(df)

indexed.show()

+---+---+---+-----------+--------------------+----------+
|  z|  x|  y|      class|              source|classIndex|
+---+---+---+-----------+--------------------+----------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|       5.0|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|       5.0|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|       5.0|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|       5.0|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|       5.0|
| 25| 39| 49|Brush_teeth|Accelerometer-201...|       5.0|
| 32| 49| 12|Brush_teeth|Accelerometer-201...|       5.0|
| 26| 31| 41|Brush_teeth|Accelerometer-201...|       5.0|
| 54| 47| 43|Brush_teeth|Accelerometer-201...|       5.0|
|  2| 33| 41|Brush_teeth|Accelerometer-201...|       5.0|
|  0| 41| 43|Brush_teeth|Accelerometer-201...|       5.0|
| 13| 39| 38|Brush_teeth|Accelerometer-201...|       5.0|
| 12| 39| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 35| 52| 45|B

In [24]:
# One Hot Encode the class index
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol = 'classIndex', outputCol = 'categoryVector')
encoded = encoder.fit(indexed).transform(indexed)

encoded.show()

+---+---+---+-----------+--------------------+----------+--------------+
|  z|  x|  y|      class|              source|classIndex|categoryVector|
+---+---+---+-----------+--------------------+----------+--------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 25| 39| 49|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 32| 49| 12|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 26| 31| 41|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
| 54| 47| 43|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|
|  2| 33| 41|Brush_teeth|Accelerometer-201...|     

In [25]:
# Turn x,y,z into a single vector
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['x','y','z'], outputCol = 'features')
vectorAssembled = vectorAssembler.transform(encoded)

vectorAssembled.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+
|  z|  x|  y|      class|              source|classIndex|categoryVector|        features|
+---+---+---+-----------+--------------------+----------+--------------+----------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,39.0,24.0]|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,42.0,38.0]|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,45.0,22.0]|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[52.0,39.0,31.0]|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,41.0,34.0]|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[53.0,30.0,37.0]|
| 25| 39| 49|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[39.0,49.0,25.0]|
| 32| 49| 12|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,12.0,32.0]|
| 26| 31| 

In [26]:
# Normalize the data
from pyspark.ml.feature import Normalizer

normaliser = Normalizer(inputCol = 'features', outputCol = 'normalizedFeatures')
normalised = normaliser.transform(vectorAssembled)

normalised.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  z|  x|  y|      class|              source|classIndex|categoryVector|        features|  normalizedFeatures|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,39.0,24.0]|[0.73061124874900...|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,42.0,38.0]|[0.69004302254196...|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,45.0,22.0]|[0.69928680572414...|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[52.0,39.0,31.0]|[0.72208294496276...|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,41.0,34.0]|[0.71194521064855...|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[53.0,30.0,37.0]|[0.74375435432843...|
|

In [27]:
# Creating a pipeline for preprocessing data
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [indexer,encoder,vectorAssembler,normaliser])

model = pipeline.fit(df)

prediction = model.transform(df)

prediction.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  z|  x|  y|      class|              source|classIndex|categoryVector|        features|  normalizedFeatures|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,39.0,24.0]|[0.73061124874900...|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,42.0,38.0]|[0.69004302254196...|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[49.0,45.0,22.0]|[0.69928680572414...|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[52.0,39.0,31.0]|[0.72208294496276...|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[54.0,41.0,34.0]|[0.71194521064855...|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|       5.0|(13,[5],[1.0])|[53.0,30.0,37.0]|[0.74375435432843...|
|

ETL

In [None]:
import os
import fnmatch

d = 'HMP_Dataset/'

# filter list for all folders containing data (folders that don't start with .)
file_list_filtered = [s for s in os.listdir(d) if os.path.isdir(os.path.join(d,s)) & ~fnmatch.fnmatch(s, '.*')]

from pyspark.sql.functions import lit

#create pandas data frame for all the data

df = None

for category in file_list_filtered:
    data_files = os.listdir('HMP_Dataset/'+category)
    
    #create a temporary pandas data frame for each data file
    for data_file in data_files:
        #print(data_file)
        temp_df = spark.read.option("header", "false").option("delimiter", " ").csv('HMP_Dataset/'+category+'/'+data_file,schema=schema)
        
        #create a column called "source" storing the current CSV file
        temp_df = temp_df.withColumn("source", lit(data_file))
        
        #create a column called "class" storing the current data folder
        temp_df = temp_df.withColumn("class", lit(category))
        
        #append to existing data frame list
        #data_frames = data_frames + [temp_df]
                                                                                                             
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)

In [None]:
df.write.parquet('hmp.parquet')

SKLearn Pipeline

In [None]:
!pip install ibex

In [None]:
from ibex.sklearn.preprocessing import StandardScaler
from ibex.sklearn.preprocessing import LabelEncoder
from ibex.sklearn.preprocessing import OneHotEncoder

from ibex import trans

pipeline = (trans(LabelEncoder(), in_cols='class') + 
    trans(StandardScaler(), in_cols=['x', 'y', 'z']) + 
    trans(OneHotEncoder(), in_cols=['functiontransformer_0'][0]) + 
    trans(None, in_cols='source')
)

df_scaled = pipeline.fit_transform(df)

Linear Regression

In [28]:
#register a corresponding query table
df.createOrReplaceTempView('df')
df_energy = spark.sql("""
select sqrt(sum(x*x)+sum(y*y)+sum(z*z)) as label, class from df group by class
""")      
df_energy.createOrReplaceTempView('df_energy')
df_join = spark.sql('select * from df inner join df_energy on df.class=df_energy.class')
df_join.show()

+---+---+---+-----------+--------------------+-----------------+-----------+
|  z|  x|  y|      class|              source|            label|      class|
+---+---+---+-----------+--------------------+-----------------+-----------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 37| 53| 30|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 25| 39| 49|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 32| 49| 12|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 26| 31| 41|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|
| 54| 47| 43|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|

In [29]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])

In [30]:
model = pipeline.fit(df_join)

In [31]:
prediction = model.transform(df_join)

In [32]:
prediction.show()

+---+---+---+-----------+--------------------+-----------------+-----------+----------------+--------------------+------------------+
|  z|  x|  y|      class|              source|            label|      class|        features|       features_norm|        prediction|
+---+---+---+-----------+--------------------+-----------------+-----------+----------------+--------------------+------------------+
| 24| 49| 39|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|[49.0,39.0,24.0]|[0.4375,0.3482142...| 397.3754999617832|
| 38| 54| 42|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|[54.0,42.0,38.0]|[0.40298507462686...|386.46797661102596|
| 22| 49| 45|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|[49.0,45.0,22.0]|[0.42241379310344...|404.66685982211425|
| 31| 52| 39|Brush_teeth|Accelerometer-201...|385.8924720696168|Brush_teeth|[52.0,39.0,31.0]|[0.42622950819672...| 390.7518308169358|
| 34| 54| 41|Brush_teeth|Accelerometer-201...|385.892472069616

In [33]:
model.stages[2].summary.r2

0.02054481784295581

Logistic Regression with split

In [34]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [35]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [36]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [37]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer,lr])

In [None]:
model = pipeline.fit(df_train)
prediction = model.transform(df_train)

In [None]:
prediction.printSchema()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
binEval.evaluate(prediction)

In [None]:
prediction = model.transform(df_test)
binEval.evaluate(prediction)

SupportVector Machine

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

indexer = StringIndexer(inputCol="class", outputCol="label")
encoder = OneHotEncoder(inputCol="label", outputCol="labelVec")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [None]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10, regParam=0.1)

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer,lsvc])

In [None]:
df.createOrReplaceTempView('df')
df_two_class = spark.sql("select * from df where class in ('Use_telephone','Standup_chair')")

splits = df_two_class.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [None]:
model = pipeline.fit(df_train)
prediction = model.transform(df_train)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(prediction)

In [None]:
prediction = model.transform(df_test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(prediction)