<a href="https://colab.research.google.com/github/websitecreatr99/Pyspark_WOE_IV/blob/main/WOE_IV_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!java --version

openjdk 11.0.17 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu220.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu220.04, mixed mode, sharing)


In [6]:
!pip install pyspark[pandas_on_spark]==3.2.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark[pandas_on_spark]==3.2.1
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.0/199.0 KB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853643 sha256=0da1a0bb5fe5b2fd7cd88246a173ff6dee35cb294472ca78df59f3a37b510a2b
  Stored in directory: /root/.cache/pip/wheels/58/94/83/915c9059e4b038e2d43a6058f307fe1c3e8536e5745f3b23b7
Successfully built pyspark
Installing collected pac

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [8]:
spark = SparkSession.builder \
               .appName('SparkByExamples.com') \
               .getOrCreate()

In [9]:
data = spark.read.csv("/content/Grid-Table-table694.csv", header=True)

In [10]:
type(data)

pyspark.sql.dataframe.DataFrame

In [11]:
data.show()

+-------+---+----+-----+-----+----+----+-------+
|capsule|age|race|dpros|dcaps| psa| vol|gleason|
+-------+---+----+-----+-----+----+----+-------+
|      0| 80|   1|    2|    1| 1.4|   0|      6|
|      0| 72|   1|    3|    2| 6.7|   0|      7|
|      0| 70|   1|    1|    2| 4.9|   0|      6|
|      0| 76|   2|    2|    1|51.2|  20|      7|
|      0| 69|   1|    1|    1|12.3|55.9|      6|
|      1| 71|   1|    3|    2| 3.3|   0|      8|
|      0| 68|   2|    4|    2|31.9|   0|      7|
|      0| 61|   2|    4|    2|66.7|27.2|      7|
|      0| 69|   1|    1|    1| 3.9|  24|      7|
|      0| 68|   2|    1|    2|  13|   0|      6|
|      1| 68|   2|    4|    2|   4|   0|      7|
|      1| 72|   1|    2|    2|21.2|   0|      7|
|      1| 72|   1|    4|    2|22.7|   0|      9|
|      1| 65|   1|    4|    2|  39|   0|      7|
|      0| 75|   1|    1|    1| 7.5|   0|      5|
|      0| 73|   1|    2|    1| 2.6|   0|      5|
|      0| 75|   2|    1|    1| 2.5|   0|      5|
|      0| 70|   1|  

In [34]:
import math

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


class WOE_IV(object):
    def __init__(self, df: DataFrame, cols_to_woe: [str], label_column: str, good_label: str):
        self.df = df
        self.cols_to_woe = cols_to_woe
        self.label_column = label_column
        self.good_label = good_label
        self.fit_data = {}

    def fit(self):
        for col_to_woe in self.cols_to_woe:
            total_good = self.compute_total_amount_of_good()
            total_bad = self.compute_total_amount_of_bad()
            
            woe_df = self.df.select(col_to_woe)
            categories = woe_df.distinct().collect()
            for category_row in categories:
                category = category_row[col_to_woe]
                good_amount = self.compute_good_amount(col_to_woe, category)
                bad_amount = self.compute_bad_amount(col_to_woe, category)

                good_amount = good_amount if good_amount != 0 else 0.5
                bad_amount = bad_amount if bad_amount != 0 else 0.5

                good_dist = good_amount / total_good
                bad_dist = bad_amount / total_bad

                self.build_fit_data(col_to_woe, category, good_dist, bad_dist)

    def transform(self, df: DataFrame):
        def _encode_woe(col_to_woe_):
            return F.coalesce(
                *[F.when(F.col(col_to_woe_) == category, F.lit(woe_iv['woe']))
                  for category, woe_iv in self.fit_data[col_to_woe_].items()]
            )

        for col_to_woe, woe_info in self.fit_data.items():
            df = df.withColumn(col_to_woe + '_woe', _encode_woe(col_to_woe))
        return df

    def compute_total_amount_of_good(self):
        return self.df.select(self.label_column).filter(F.col(self.label_column) == self.good_label).count()

    def compute_total_amount_of_bad(self):
        return self.df.select(self.label_column).filter(F.col(self.label_column) != self.good_label).count()

    def compute_good_amount(self, col_to_woe: str, category: str):
        return self.df.select(col_to_woe, self.label_column)\
                      .filter(
                            (F.col(col_to_woe) == category) & (F.col(self.label_column) == self.good_label)
                      ).count()

    def compute_bad_amount(self, col_to_woe: str, category: str):
        return self.df.select(col_to_woe, self.label_column)\
                      .filter(
                            (F.col(col_to_woe) == category) & (F.col(self.label_column) != self.good_label)
                      ).count()

    def build_fit_data(self, col_to_woe, category, good_dist, bad_dist):
        woe_info = {
            category: {
                'woe': math.log(good_dist / bad_dist),
                'iv':  math.log(good_dist / bad_dist) * (good_dist - bad_dist)
            }
        }

        if col_to_woe not in self.fit_data:
            self.fit_data[col_to_woe] = woe_info
        else:
            self.fit_data[col_to_woe].update(woe_info)

    def compute_iv(self):
        iv_dict = {}

        for woe_col, categories in self.fit_data.items():
            iv_dict[woe_col] = 0
            for category, woe_iv in categories.items():
                iv_dict[woe_col] += woe_iv['iv']
        return iv_dict

In [35]:
from pyspark.sql import SparkSession

# from woe import WOE_IV


if __name__ == '__main__':
    # spark = SparkSession.builder.appName('woe-encoding').getOrCreate()

    df = spark.read.csv("/content/Grid-Table-table694.csv", header=True)

    # cols_to_woe = ['col_a', 'col_b']
    cols_to_woe = ['capsule','age','race','dpros','dcaps', 'psa', 'vol','gleason']
    woe = WOE_IV(df, cols_to_woe, 'capsule', 0)

    # woe encoding
    woe.fit()
    encoded_df = woe.transform(df)

    # information value
    ivs = woe.compute_iv()

In [36]:
print(ivs)

{'capsule': 11.813946865670689, 'age': 0.2646040114629034, 'race': 0.0097655015227033, 'dpros': 0.46969877049741027, 'dcaps': 0.24883754658468282, 'psa': 1.0633445755222402, 'vol': 0.41973615975490813, 'gleason': 1.1571232573722214}
