In [1]:
!which python

/home/07.000504/Documents/poetry-envs/pyspark-ds-toolbox-Fn-Rjt-3-py3.7/bin/python


In [2]:
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, StructField, StructType, StringType
from pyspark.ml.linalg import VectorUDT

import pyspark.ml.feature as FF
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier


In [3]:
from pyspark_ds_toolbox.ml.eval import get_p1

In [4]:
spark = SparkSession.builder\
                .appName('Ml-Pipes') \
                .master('local[1]') \
                .config('spark.executor.memory', '3G') \
                .config('spark.driver.memory', '3G') \
                .config('spark.memory.offHeap.enabled', 'true') \
                .config('spark.memory.offHeap.size', '3G') \
                .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

21/12/03 17:52:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
def read_data(file): 
    return pd.read_stata("https://raw.github.com/scunning1975/mixtape/master/" + file)

df = read_data('nsw_mixtape.dta')
df = pd.concat((df, read_data('cps_mixtape.dta')))
df.reset_index(level=0, inplace=True)

df = spark.createDataFrame(df.drop(columns=['data_id']))\
    .withColumn('age2', F.col('age')**2)\
    .withColumn('age3', F.col('age')**3)\
    .withColumn('educ2', F.col('educ')**2)\
    .withColumn('educ_re74', F.col('educ')*F.col('re74'))\
    .withColumn('u74', F.when(F.col('re74')==0, 1).otherwise(0))\
    .withColumn('u75', F.when(F.col('re75')==0, 1).otherwise(0))

features=['age', 'age2', 'age3', 'educ', 'educ2', 'marr', 'nodegree', 'black', 'hisp', 're74', 're75', 'u74', 'u75', 'educ_re74']
assembler = FF.VectorAssembler(inputCols=features, outputCol='features')
pipeline = Pipeline(stages = [assembler])
df_assembled = pipeline.fit(df).transform(df)

In [6]:
train_size=0.8
train, test = df_assembled.randomSplit([train_size, (1-train_size)], seed=12345)

model = GBTClassifier(labelCol='treat')
p = Pipeline(stages=[model])
p_fitted = p.fit(train)

df_predicted = p_fitted.transform(test).withColumn('probability', get_p1(F.col('probability')))

df_predicted.printSchema()
v = df_predicted.filter(F.col('index')==3).select('probability').collect()[0][0]
m = df_predicted.select('probability').toPandas().probability.mean()



root
 |-- index: long (nullable = true)
 |-- treat: double (nullable = true)
 |-- age: double (nullable = true)
 |-- educ: double (nullable = true)
 |-- black: double (nullable = true)
 |-- hisp: double (nullable = true)
 |-- marr: double (nullable = true)
 |-- nodegree: double (nullable = true)
 |-- re74: double (nullable = true)
 |-- re75: double (nullable = true)
 |-- re78: double (nullable = true)
 |-- age2: double (nullable = true)
 |-- age3: double (nullable = true)
 |-- educ2: double (nullable = true)
 |-- educ_re74: double (nullable = true)
 |-- u74: integer (nullable = false)
 |-- u75: integer (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: float (nullable = true)
 |-- prediction: double (nullable = false)





In [7]:
import os
# from psutil import virtual_memory
from pyspark import SparkConf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F, SparkSession, types as T, Window

import operator

import pyspark
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType

In [8]:
schema = StructType([
  StructField('id', IntegerType(), True),
  StructField('feature', StringType(), True),
  StructField('shap', FloatType(), True)
  ])

df = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)


In [9]:
def get_features_permutations(
        df,
        feature_names,
        output_col='features_permutations'
):
    """
    Creates a column for the ordered features and then shuffles it.
    The result is a dataframe with a column `output_col` that contains:
    [feat2, feat4, feat3, feat1],
    [feat3, feat4, feat2, feat1],
    [feat1, feat2, feat4, feat3],
    ...
    """
    return df.withColumn(
        output_col,
        F.shuffle(
            F.array(*[F.lit(f) for f in feature_names])
        )
    )


def calculate_shapley_values(
        df,
        model,
        row_of_interest,
        feature_names,
        features_col='features',
        column_to_examine='anomalyScore'
):
    """
    # Based on the algorithm described here:
    # https://christophm.github.io/interpretable-ml-book/shapley.html#estimating-the-shapley-value
    # And on Baskerville's implementation for IForest/ AnomalyModel here:
    # https://github.com/equalitie/baskerville/blob/develop/src/baskerville/util/model_interpretation/helpers.py#L235
    """
    schema = StructType([
        StructField('id', IntegerType(), True),
        StructField('feature', StringType(), True),
        StructField('shap', FloatType(), True)
    ])

    results = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)

    features_perm_col = 'features_permutations'
    # spark = get_spark_session()
    marginal_contribution_filter = F.avg('marginal_contribution').alias('shap_value')
    # broadcast the row of interest and ordered feature names
    ROW_OF_INTEREST_BROADCAST = spark.sparkContext.broadcast(row_of_interest)
    ORDERED_FEATURE_NAMES = spark.sparkContext.broadcast(feature_names)

    # persist before continuing with calculations
    if not df.is_cached:
        df = df.persist()

    # get permutations
    features_df = get_features_permutations(
        df,
        feature_names,
        output_col=features_perm_col
    )

    # set up the udf - x-j and x+j need to be calculated for every row
    def calculate_x(
            feature_j, z_features, curr_feature_perm
    ):
        """
        The instance  x+j is the instance of interest,
        but all values in the order before feature j are
        replaced by feature values from the sample z
        The instance  x−j is the same as  x+j, but in addition
        has feature j replaced by the value for feature j from the sample z
        """
        x_interest = ROW_OF_INTEREST_BROADCAST.value
        ordered_features = ORDERED_FEATURE_NAMES.value
        x_minus_j = list(z_features).copy()
        x_plus_j = list(z_features).copy()
        f_i = curr_feature_perm.index(feature_j)
        after_j = False
        for f in curr_feature_perm[f_i:]:
            # replace z feature values with x of interest feature values
            # iterate features in current permutation until one before j
            # x-j = [z1, z2, ... zj-1, xj, xj+1, ..., xN]
            # we already have zs because we go row by row with the udf,
            # so replace z_features with x of interest
            f_index = ordered_features.index(f)
            new_value = x_interest[f_index]
            x_plus_j[f_index] = new_value
            if after_j:
                x_minus_j[f_index] = new_value
            after_j = True

        # minus must be first because of lag
        return Vectors.dense(x_minus_j), Vectors.dense(x_plus_j)

    udf_calculate_x = F.udf(calculate_x, T.ArrayType(VectorUDT()))

    # persist before processing
    features_df = features_df.persist()

    for f in feature_names:
        # x column contains x-j and x+j in this order.
        # Because lag is calculated this way:
        # F.col('anomalyScore') - (F.col('anomalyScore') one row before)
        # x-j needs to be first in `x` column array so we should have:
        # id1, [x-j row i,  x+j row i]
        # ...
        # that with explode becomes:
        # id1, x-j row i
        # id1, x+j row i
        # ...
        # to give us (x+j - x-j) when we calculate marginal contribution
        # Note that with explode, x-j and x+j for the same row have the same id
        # This gives us the opportunity to use lag with
        # a window partitioned by id
        x_df = features_df.withColumn('x', udf_calculate_x(
            F.lit(f), features_col, features_perm_col
        )).persist()

        # Calculating SHAP values for f
        x_df = x_df.selectExpr(
            'id', f'explode(x) as {features_col}'
        ).cache()
        x_df = model.transform(x_df).withColumn('probability', get_p1(F.col('probability')))

        # marginal contribution is calculated using a window and a lag of 1.
        # the window is partitioned by id because x+j and x-j for the same row
        # will have the same id
        x_df = x_df.withColumn(
            'marginal_contribution',
            F.col(column_to_examine) - F.lag(F.col(column_to_examine), 1).over(Window.partitionBy('id').orderBy('id'))
        )
        # calculate the average
        x_df = x_df.filter(x_df.marginal_contribution.isNotNull())
        
        temp = pd.DataFrame.from_dict({
            'id': [row_of_interest['id']],
            'feature': [f],
            'shap_value': [x_df.select(marginal_contribution_filter).first().shap_value]
        })
        temp = spark.createDataFrame(temp)
        # results[f] = x_df.select(
        #     marginal_contribution_filter
        # ).first().shap_value
        # x_df.unpersist()
        print(f'Marginal Contribution for feature: {f} = {x_df.select(marginal_contribution_filter).first().shap_value}')
        # break
        results = results.union(temp)
        #del x_df
 
    # ordered_results = sorted(
    #     results.items(),
    #     key=operator.itemgetter(1),
    #     reverse=True
    # )
    return results

In [13]:
a = calculate_shapley_values(
    df = df_predicted.withColumnRenamed('index', 'id'),
    model = p_fitted,
    row_of_interest = df_predicted.filter(F.col('index')==3).withColumnRenamed('index', 'id').first(),
    feature_names = features,
    features_col='features',
    column_to_examine='probability'
)
type(a)



Marginal Contribution for feature: age = 0.0036901990057246347




Marginal Contribution for feature: age2 = 0.0005360889033629344




Marginal Contribution for feature: age3 = 0.00030300699254374596




Marginal Contribution for feature: educ = 0.009800358720959015




Marginal Contribution for feature: educ2 = 0.0004174735999100252




Marginal Contribution for feature: marr = -0.0029881995673345253




Marginal Contribution for feature: nodegree = 0.000807448134790344




Marginal Contribution for feature: black = -0.004834027502177084




Marginal Contribution for feature: hisp = 0.004992575887268543




Marginal Contribution for feature: re74 = -0.009940294177895085




Marginal Contribution for feature: re75 = -0.016294625012973768




Marginal Contribution for feature: u74 = 0.006179875643123063




Marginal Contribution for feature: u75 = 0.0025265846272611732




Marginal Contribution for feature: educ_re74 = -0.000784117700148887




pyspark.sql.dataframe.DataFrame

In [14]:
a.show(5)

+---+-------+--------------------+
| id|feature|                shap|
+---+-------+--------------------+
|  3|    age|0.003690199005724...|
|  3|   age2|5.360889033629344E-4|
|  3|   age3|3.030069925437459...|
|  3|   educ|0.009800358720959015|
|  3|  educ2|4.174735999100252E-4|
+---+-------+--------------------+
only showing top 5 rows



In [15]:
print(df_predicted.select('probability').toPandas().probability.mean() + a.select(F.sum('shap')).collect()[0][0])

print(f'{v}')



0.04829068469926582
0.04364077374339104




In [None]:
print(df_predicted.select('probability').toPandas().probability.mean() + a.select(F.sum('shap')).collect()[0][0])

print(f'{v}')



0.052278824448785434
0.043646443635225296




In [84]:
v

0.043646443635225296