-
Notifications
You must be signed in to change notification settings - Fork 821
/
TabularSHAP.scala
101 lines (82 loc) · 3.7 KB
/
TabularSHAP.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.
package com.microsoft.azure.synapse.ml.explainers
import breeze.stats.distributions.RandBasis
import com.microsoft.azure.synapse.ml.core.schema.DatasetExtensions
import com.microsoft.azure.synapse.ml.logging.FeatureNames
import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.ComplexParamsReadable
import org.apache.spark.ml.param.shared.HasInputCols
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
/**
  * Kernel SHAP explainer for tabular data.
  *
  * Builds on [[KernelSHAPBase]] and is configured with a set of input (feature) columns and a
  * background dataset. Every row to explain is paired with every background row, and a
  * `KernelSHAPTabularSampler` produces the perturbed samples for each pair.
  *
  * @param uid unique identifier of this stage
  */
class TabularSHAP(override val uid: String)
  extends KernelSHAPBase(uid)
    with HasInputCols
    with HasBackgroundData {

  logClass(FeatureNames.Explainers)

  /** Creates an instance whose uid is randomly generated from the "TabularSHAP" prefix. */
  def this() = this(Identifiable.randomUID("TabularSHAP"))

  /** Sets the names of the feature columns. */
  def setInputCols(values: Array[String]): this.type = set(inputCols, values)

  /** Overload of `setInputCols` taking a `Seq`; delegates to the `Array` variant. */
  def setInputCols(values: Seq[String]): this.type = setInputCols(values.toArray)

  /**
    * Generates the samples DataFrame for the rows in `df`.
    *
    * The input columns are packed into one struct per instance row and one struct per background
    * row. The two sides are cross joined, a UDF draws the effective number of samples for each
    * (instance, background) pair, and the resulting array is exploded into one row per sample.
    *
    * @param df               rows to explain
    * @param idCol            name of the row id column, passed through to the output
    * @param coalitionCol     output name for the coalition field of each sample
    * @param weightCol        output name for the weight field of each sample
    * @param targetClassesCol name of the target classes column, passed through to the output
    * @return a DataFrame with the id column, the expanded sample feature columns, the coalition
    *         column, the weight column and the target classes column, in that order
    * @throws Exception if an input column cannot be found in the schema of `df`
    */
  override protected def createSamples(df: DataFrame,
                                       idCol: String,
                                       coalitionCol: String,
                                       weightCol: String,
                                       targetClassesCol: String): DataFrame = {
    val instanceStructCol = DatasetExtensions.findUnusedColumnName("instance", df)
    val backgroundStructCol = DatasetExtensions.findUnusedColumnName("background", df)

    val featureNames = getInputCols
    // Same struct expression for both sides; only the alias differs.
    val featureStruct = struct(featureNames.map(col): _*)

    val instances = df.select(col(idCol), col(targetClassesCol), featureStruct.alias(instanceStructCol))
    val background = getBackgroundData.select(featureStruct.alias(backgroundStructCol))

    val effectiveNumSamples = KernelSHAPBase.getEffectiveNumSamples(getNumSamplesOpt, featureNames.length)

    // Schema of a single sample: the input columns' fields, taken from df in inputCols order.
    val sampleType = StructType(featureNames.map { feature =>
      df.schema.fields.find(_.name == feature) match {
        case Some(field) => field
        case None =>
          throw new Exception(s"Column $feature not found in schema ${df.schema.simpleString}")
      }
    })

    // Read into a local so the function below refers to a plain value
    // (presumably to keep `this` out of the UDF closure - confirm).
    val infWeightVal = getInfWeight

    val drawSamples = (instanceRow: Row, backgroundRow: Row) => {
      val sampler = new KernelSHAPTabularSampler(instanceRow, backgroundRow, effectiveNumSamples, infWeightVal)
      for (_ <- 1 to effectiveNumSamples) yield {
        // A fresh RandBasis.mt0 is put in implicit scope for every draw
        // (presumably picked up implicitly by the sampler - confirm).
        implicit val randBasis: RandBasis = RandBasis.mt0
        sampler.sample
      }
    }
    val samplesUdf = UDFUtils.oldUdf(drawSamples, getSampleSchema(sampleType))

    val samplesCol = DatasetExtensions.findUnusedColumnName("samples", df)
    val exploded = instances
      .crossJoin(background)
      .withColumn(samplesCol, explode(samplesUdf(col(instanceStructCol), col(backgroundStructCol))))

    val sampleStruct = col(samplesCol)
    exploded.select(
      col(idCol),
      expr(s"$samplesCol.$sampleField.*"),
      sampleStruct.getField(coalitionField).alias(coalitionCol),
      sampleStruct.getField(weightField).alias(weightCol),
      col(targetClassesCol)
    )
  }

  /**
    * Validates the input schema. After the checks of the parent class, and only when background
    * data has been set, requires every input column to have structurally equal data types
    * (nullability ignored) in the input schema and in the background dataset.
    *
    * @param schema schema of the instances to explain
    */
  override def validateSchema(schema: StructType): Unit = {
    super.validateSchema(schema)

    if (get(backgroundData).nonEmpty) {
      val featureNames = getInputCols
      val backgroundSchema = getBackgroundData.schema

      for (feature <- featureNames) {
        val instanceType = schema(feature).dataType
        val backgroundType = backgroundSchema(feature).dataType
        require(
          DataType.equalsStructurally(instanceType, backgroundType, ignoreNullability = true),
          s"Field $feature has type $instanceType from input instance, but type $backgroundType " +
            s"from background dataset."
        )
      }
    }
  }
}
/**
  * Companion object of [[TabularSHAP]]. It mixes in `ComplexParamsReadable`, which presumably
  * supplies the reader used to load persisted instances - confirm against `ComplexParamsReadable`.
  */
object TabularSHAP extends ComplexParamsReadable[TabularSHAP]