# Set Up

In [None]:
# Azure Blob Storage settings used by the mount cell
MOUNTPOINT = "/mnt/termpaper"

# Unmount first if something is already mounted at this mount point
if any(MOUNTPOINT == mnt.mountPoint for mnt in dbutils.fs.mounts()):
    dbutils.fs.unmount(MOUNTPOINT)

STORAGE_ACCOUNT = "cs777"
CONTAINER = "termpaper"
# The SAS token is read from a Databricks secret scope, not hard-coded
SASTOKEN = dbutils.secrets.get(scope="tomo0530", key="storageaccess")

# wasbs source URL of the container and the matching SAS config key
SOURCE = f"wasbs://{CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/"
URI = f"fs.azure.sas.{CONTAINER}.{STORAGE_ACCOUNT}.blob.core.windows.net"

In [None]:
# Mount the Azure Blob Storage container on Databricks
try:
    dbutils.fs.mount(
        source=SOURCE,
        mount_point=MOUNTPOINT,
        extra_configs={URI: SASTOKEN},
    )
except Exception as e:
    # An already-mounted directory is acceptable; anything else is re-raised
    if "Directory already mounted" not in str(e):
        raise e
print("Success.")

In [None]:
# Paths of the three dataset files under the mount point
file1 = f"{MOUNTPOINT}/a9a.txt"
file2 = f"{MOUNTPOINT}/kddb-raw-libsvm.t.bz2"
file3 = f"{MOUNTPOINT}/kdda.bz2"

In [None]:
from functools import wraps
import math
import time
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd

from sklearn.datasets import load_svmlight_file
from sklearn.feature_selection import chi2
from sklearn.feature_selection import SelectKBest

import pyspark.sql
from pyspark.sql import SparkSession, Column
from pyspark.ml.feature import ChiSqSelector
from pyspark.sql.functions import *
from pyspark.ml.linalg import *

In [None]:
# Functions for this notebook
def read_libsvm_data(filepath: str) -> Tuple[pd.DataFrame, np.ndarray]:
    """
    Loads a libsvm file and densifies its feature matrix
    :param filepath: the file path; '/dbfs' is prepended before reading
    :return: the dense feature dataframe and the target
    """
    features, target = load_svmlight_file('/dbfs' + filepath)
    return pd.DataFrame(features.toarray()), target


def stop_watch(func):
    """
    Measures the function execution time and prints it
    :param func: the function to be timed
    :return: a wrapper that calls func, prints the elapsed seconds and
             returns func's result (nothing is printed if func raises)
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so the measured
        # interval cannot be skewed by system clock adjustments the way a
        # time.time() difference can
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start
        print(f"It took {elapsed_time} seconds to execute {func.__name__}.")
        return result

    return wrapper


@stop_watch
def feature_selection_chi2_sklearn(X: pd.DataFrame, y: np.ndarray,
                                   k: int) -> None:
    """
    Selects features based on chi square test by scikit-learn
    and prints the selected column labels
    :param X: the feature dataframe
    :param y: the target array
    :param k: the number of features selected
    :return: None
    """
    selector = SelectKBest(chi2, k=k)
    # Only the fitted support mask is used below; fit_transform would also
    # build a reduced copy of X that is thrown away, inflating the timing
    selector.fit(X, y)
    selected_columns = X.columns[selector.get_support()]
    print('Selected attribute: ', selected_columns.to_list())


def read_libsvm_data_by_spark(filepath: str) -> pyspark.sql.DataFrame:
    """
    Loads a libsvm file through Spark's "libsvm" data source
    :param filepath: the file path
    :return: the spark dataframe
    """
    session = SparkSession.builder.appName("feature_selection").getOrCreate()
    libsvm_reader = session.read.format("libsvm")
    return libsvm_reader.load(filepath)


@stop_watch
def feature_selection_chi2_sparkml(df: pyspark.sql.DataFrame, k: int) -> None:
    """
    Fits Spark ML's ChiSqSelector and prints the selected feature indices
    :param df: the spark dataframe
    :param k: the number of features selected
    :return: None
    """
    fitted = ChiSqSelector(
        numTopFeatures=k,
        outputCol="selectedFeatures",
    ).fit(df)
    print('Selected attribute: ', fitted.selectedFeatures)


@udf("map<long, double>")
def vector_as_mapUDF(v: Vector) -> Optional[Dict[int, float]]:
    """
    Extracts indices and values from SparseVector or DenseVector
    :param v: the vector value of one row (the UDF body receives the cell
              value, not the Column object)
    :return: the map of feature index to value; None when v is neither a
             SparseVector nor a DenseVector
    """
    if isinstance(v, SparseVector):
        # Only the explicitly stored entries of a sparse vector are emitted
        return dict(zip(v.indices.tolist(), v.values.tolist()))
    if isinstance(v, DenseVector):
        # Dense vectors are keyed by position
        return dict(enumerate(v.values.tolist()))
    # Any other input yields no map, as the original fall-through did
    return None


@udf("double")
def chi2UDF(observed: float, freq: float,
            total: float) -> Optional[float]:
    """
    Calculates one term of the chi squared statistic
    :param observed: the observed value
    :param freq: the frequency multiplied with total to get the expectation
    :param total: the total value
    :return: (observed - freq * total)^2 / (freq * total), or None when
             the expected value is zero and the term is undefined
    """
    expected = freq * total
    if expected == 0:
        # Dividing by a zero expectation would raise ZeroDivisionError
        # inside the Python worker and fail the job; report no value instead
        return None
    return math.pow(observed - expected, 2) / expected


@stop_watch
def feature_selection_chi2_sparse(df: pyspark.sql.DataFrame, k: int) -> None:
    """
    Selects features based on chi square test over the paper's idea,
    working only on the entries stored in the feature vectors, and prints
    the k features with the largest statistic
    :param df: the spark dataframe with "label" and "features" columns
    :param k: the number of features selected
    :return: None
    """
    # sum of each feature over all rows
    feature_sum = (df
                   .select(explode(vector_as_mapUDF("features"))
                           .alias("feature", "value"))
                   .groupBy(col("feature"))
                   .sum("value")
                   .withColumnRenamed("sum(value)", "total"))

    # observed statistics: sum of each feature within each label
    observed = (df
                .select("label", explode(vector_as_mapUDF("features"))
                        .alias("feature", "value"))
                .groupBy(col("label"), col("feature"))
                .sum("value")
                .withColumnRenamed("sum(value)", "observed"))

    # class (label) probabilities
    # NOTE: `sum` here is pyspark.sql.functions.sum (star import), not the
    # builtin
    class_count = df.groupBy("label").count()
    total_count = class_count.agg(sum("count")).first()[0]
    class_prob = (class_count
                  .select(col("label"),
                          (col("count") / total_count).alias("freq")))

    # join the three dataframes; only (label, feature) pairs that actually
    # occur in the data survive these inner joins
    oe = (observed
          .join(class_prob, "label")
          .join(feature_sum, "feature"))

    # compute the chi-squared value per feature and sort by it.
    # A (label, feature) pair that never occurs has observed == 0, so its
    # term (0 - expected)^2 / expected equals expected = freq * total.
    # Those pairs are absent from `oe`, so their combined contribution,
    # total * (1 - sum of freq over the labels present), is added back here
    # instead of materialising the missing rows.
    chisquare = (oe
                 .withColumn("chi2",
                             chi2UDF(col("observed"), col("freq"),
                                     col("total")))
                 .groupBy("feature")
                 .agg((sum("chi2")
                       + first("total") * (lit(1.0) - sum("freq")))
                      .alias("sum(chi2)"))
                 .sort(desc("sum(chi2)")))

    # Take the top k after the sort: collect_list gives no ordering
    # guarantee and would pull every feature to the driver before slicing
    top_features = [row["feature"] for row in chisquare.limit(k).collect()]

    # print the selected features
    print('Selected attribute: ', top_features)


# Experiment 1 (single node)

In [None]:
# Set the number of features selected; this k is passed to every
# feature-selection call in the experiment cells below
k = 5

## Dataset 1
Rows: 32,561 Features: 122

In [None]:
# Scikit-Learn: load dataset 1 as a dense dataframe plus target, then run
# the timed chi2 selection (the load itself is outside the timed call)
X, y = read_libsvm_data(file1)
feature_selection_chi2_sklearn(X, y, k)

In [None]:
# Read dataset 1 as a Spark DataFrame (reused by the two Spark cells below)
df = read_libsvm_data_by_spark(file1)

In [None]:
# Spark ML: timed ChiSqSelector fit on dataset 1
feature_selection_chi2_sparkml(df, k)

In [None]:
# Sparse version: timed DataFrame-based chi2 selection on dataset 1
feature_selection_chi2_sparse(df, k)

## Dataset 2
Rows: 748,401 Features: 1,163,024

In [None]:
# Read dataset 2 as a Spark DataFrame (replaces the previous df)
df = read_libsvm_data_by_spark(file2)

In [None]:
# Spark ML: timed ChiSqSelector fit on dataset 2
feature_selection_chi2_sparkml(df, k)

In [None]:
# Sparse version: timed DataFrame-based chi2 selection on dataset 2
feature_selection_chi2_sparse(df, k)

## Dataset 3
Rows: 8,407,752 Features: 20,216,830

In [None]:
# Read dataset 3 as a Spark DataFrame (replaces the previous df)
df = read_libsvm_data_by_spark(file3)

In [None]:
# Spark ML: timed ChiSqSelector fit on dataset 3
feature_selection_chi2_sparkml(df, k)

In [None]:
# Sparse version: timed DataFrame-based chi2 selection on dataset 3
feature_selection_chi2_sparse(df, k)

# Experiment 2 (multi node)

## Dataset 2

In [None]:
# Experiment 2: read dataset 2 again as a Spark DataFrame
df = read_libsvm_data_by_spark(file2)

In [None]:
# Experiment 2, Spark ML: timed ChiSqSelector fit on dataset 2
feature_selection_chi2_sparkml(df, k)

In [None]:
# Experiment 2, sparse version: timed chi2 selection on dataset 2
feature_selection_chi2_sparse(df, k)

## Dataset 3

In [None]:
# Experiment 2: read dataset 3 again as a Spark DataFrame
df = read_libsvm_data_by_spark(file3)

In [None]:
# Experiment 2, Spark ML: timed ChiSqSelector fit on dataset 3
feature_selection_chi2_sparkml(df, k)

In [None]:
# Experiment 2, sparse version: timed chi2 selection on dataset 3
feature_selection_chi2_sparse(df, k)