### NOTEBOOK PURPOSE AND INTRODUCTION

Given a dataframe derived from a clustering method (e.g. Kmeans) the below script will profile the cluster groups according to the attached features.

The script will automatically determine which features are categorical, one-hot encode them and then calculate the % of the cluster that are within each category.

It will also determine which features are continuous and calculate averages across the cluster group.

The input file requires each record to be a unique ID, with a single column designating its cluster group. As many additional features / columns as desired can be added.

While not part of the core purpose of the script, there are brief examples of other useful functionality, such as automatic categorical column cleaning and discretising output features into codes.

<b>This script is in PySpark (useful for large datasets that would take too long to process with plain Python). Note there is a separate example notebook for Python with a similar method.</b>

In [0]:
# Dataiku libraries
import dataiku
from dataiku import spark as dkuspark

# Look up who is running the notebook; the identifier is used to label the Spark application
client = dataiku.api_client()
auth_info = client.get_auth_info()
user = auth_info.get('authIdentifier', 'unknown')

# Short recipe name that appears in the Spark application name - change this per recipe
appName = "cluster_profiles"

# -*- coding: utf-8 -*-
import os
import sys

# Spark imports (std lib)
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, functions as f, Window
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer

# Import custom libraries here
import numpy as np
import pandas as pd
import datetime
from fuzzywuzzy import process, fuzz
from operator import add
from functools import reduce

# Environment variables: ignore deprecation warnings and point PYSPARK_PYTHON at the system python3
os.environ.update({
    "PYTHONWARNINGS": "ignore::DeprecationWarning",
    "PYSPARK_PYTHON": "/usr/bin/python3",
})

# Refuse to continue unless the kernel is Python 3.6 and a recipe name has been provided
running_version = tuple(sys.version_info[:2])
if running_version != (3, 6):
    raise Exception("This code must be using the Python36 Spark environment.")
if appName == "":
    raise Exception("Please enter a (short) recipe name for Spark")

# Spark configuration overrides go on this object (none are set by default)
conf = SparkConf()

# Start (or attach to) a YARN-backed Spark session labelled with the user and recipe name
session_label = "{}: Recipe - {} (Py)".format(user, appName)
builder = SparkSession.builder.appName(session_label).master("yarn")
spark = builder.config(conf=conf).getOrCreate()

# Handles derived from the session, used by the cells below
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Show the Spark context as the cell output
sc

In [0]:
# Read recipe inputs
# Load the unprofiled cluster dataset from Dataiku and expose it as a Spark DataFrame.

combined_df_hdfs = dataiku.Dataset("clusters_unprofiled_with_features")
# The dataset handle created on the line above is what gets converted to a DataFrame
combined_df = dkuspark.get_dataframe(sqlContext, combined_df_hdfs)

In [0]:
# Remove any columns whose number of nulls is greater than the threshold share specified below
# Spark DataFrames are immutable and have no `.copy()` method, so a plain reference is sufficient
sample_df = combined_df
null_threshold = 0.25
null_number = sample_df.count() * null_threshold

# One aggregation over the whole table: count the null entries of every column
null_list_collected = (
    sample_df
    .select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in sample_df.columns])
    .collect()
)

# The aggregation yields a single row mapping column name -> null count
null_list = [
    name
    for name, null_count in null_list_collected[0].asDict().items()
    if null_count > null_number
]

print("Dropping columns: ", null_list)

sample_df = sample_df.drop(*null_list)

In [0]:
# Categorical features are the columns that Spark holds as strings
categorical_cols = [name for name, dtype in sample_df.dtypes if dtype == "string"]

# print("Categorical columns: ",categorical_cols)

In [0]:
# Clean all categorical columns by folding near-duplicate values (fuzzy match) into one spelling
cat_cleaned_df = sample_df
# Values listed here are never replaced, even when they closely resemble another value
manual_override_list = ['categories_you_do_not_want_deleted']

for col in categorical_cols:
    values = [row[0] for row in cat_cleaned_df.select(col).distinct().collect()]
    values = [v for v in values if v is not None]
    # Values that survive fuzzy de-duplication; every other value is folded into one of them
    values_dedupe = list(process.dedupe(values, threshold=99))
    values_removed = [v for v in values if v not in values_dedupe]

    # Map each removed value to its closest surviving value
    replacements_dict = {
        removed_value: process.extract(removed_value, values_dedupe, limit=1)[0][0]
        for removed_value in values_removed
    }

    for item, replacement_value in replacements_dict.items():
        if item in manual_override_list:
            continue
        print(f"Column: {col}, replacing: {item} with {replacement_value}")
        cat_cleaned_df = cat_cleaned_df.withColumn(
            col,
            f.when(f.col(col) == item, f.lit(replacement_value)).otherwise(f.col(col)),
        )

In [0]:
# ONE HOT ENCODING OF CATEGORICAL FEATURES
def clean_category(cat):
    """Return `cat` with every "/" turned into "_" and every space turned into "-"."""
    substitutions = str.maketrans({"/": "_", " ": "-"})
    return cat.translate(substitutions)

encoded_df = cat_cleaned_df

# Manual one-hot encoding: one 0/1 indicator column per non-null value of each categorical column
for col in categorical_cols:
    distinct_rows = cat_cleaned_df.select(col).distinct().collect()
    present_categories = [row[0] for row in distinct_rows if row[0] is not None]
    for category in present_categories:
        # Column name is "<original column>_<category with certain chars replaced>"
        indicator_name = f"{col}_{clean_category(category)}"
        is_match = f.col(col) == category
        encoded_df = encoded_df.withColumn(
            indicator_name,
            f.when(is_match, f.lit(1)).otherwise(f.lit(0)),
        )
    # The source column is fully represented by its indicators, so drop it
    encoded_df = encoded_df.drop(col)

In [0]:
# Convert BooleanType() columns to IntegerType() - required for aggregate function
bool_fix_list = [c.name for c in encoded_df.schema if c.dataType == BooleanType()]
for col in bool_fix_list:
    encoded_df = encoded_df.withColumn(col, f.col(col).cast(IntegerType()))

# Names of the record identifier column and of the cluster label column
indexfield = "unique_identifier"
groupbyfield = "cluster_group"

# Get a list of columns that are boolean in nature (only 0/1 values) that should be summed
# in the aggregate function. The identifier and the cluster label are never treated as
# features: a two-cluster label of 0/1 would otherwise be summed like an indicator.
sum_cols = []
for col in encoded_df.columns:
    if col in (indexfield, groupbyfield):
        continue
    # Three distinct values are enough to rule a column out, so never collect more than that
    values = [row[0] for row in encoded_df.select(col).distinct().limit(3).collect()]
    if values and all(v in (0, 1) for v in values):
        sum_cols.append(col)

# Get list of integer and float columns that will be averaged when aggregated
non_avg_cols = {indexfield, groupbyfield, *sum_cols}
avg_cols = [c.name for c in encoded_df.schema
            if c.dataType in [IntegerType(), DoubleType(), LongType()]
            and c.name not in non_avg_cols]

# Create a dictionary of aggregate levels for each column
agg_dict = {col: "avg" for col in avg_cols}
agg_dict.update({col: "sum" for col in sum_cols})
agg_dict[indexfield] = "count"

print("Features summed: ", sum_cols, "\n")
print("Features averaged: ", avg_cols, "\n")
print("Features counted: ", indexfield)

In [0]:
# Collapse the encoded records to one row per cluster, applying each column's aggregate from agg_dict
records_by_cluster = encoded_df.groupBy(groupbyfield)
grouped_df = records_by_cluster.agg(agg_dict)

In [0]:
# Rename the aggregate output columns, which come back as "<aggregate>(<column>)":
#   avg(x)   -> x_avg
#   sum(x)   -> x
#   count(x) -> x_num
# Matching is anchored on the "<aggregate>(" prefix and the closing ")" so that a feature
# whose own name happens to contain e.g. "avg(" is not renamed by the wrong rule.
rename_rules = (("avg", "_avg"), ("sum", ""), ("count", "_num"))
for agg_name, suffix in rename_rules:
    prefix = f"{agg_name}("
    cols_to_rename = [c for c in grouped_df.columns
                      if c.startswith(prefix) and c.endswith(")")]
    for col in cols_to_rename:
        rename = col[len(prefix):-1]  # strip the "<aggregate>(" wrapper and the trailing ")"
        grouped_df = grouped_df.withColumnRenamed(col, f"{rename}{suffix}")

In [0]:
# Express each summed indicator as a percentage of its cluster's record count
cluster_size = f.col(f"{indexfield}_num")
for col in sum_cols:
    share_of_cluster = f.col(col) * 100 / cluster_size
    grouped_df = grouped_df.withColumn(col, share_of_cluster)
    # Rename in place so the column keeps its position but carries the _pct suffix
    grouped_df = grouped_df.withColumnRenamed(col, f"{col}_pct")

In [0]:
# Size of each cluster as a percentage of all records
count_col = f"{indexfield}_num"
totals_row = grouped_df.agg(f.sum(count_col)).first()
sample_size = totals_row[0]

grouped_df = grouped_df.withColumn(
    f"{indexfield}_pct",
    f.col(count_col) * 100 / sample_size,
)

In [0]:
# Split continuous feature into 5 codes using quintiles, C1 is low, C5 is high
example_discretizer = QuantileDiscretizer(numBuckets=5, inputCol="continuous_feature_example", outputCol="feature_code")

# "keep" places invalid values in an additional bucket instead of failing the transform
grouped_df = example_discretizer.fit(grouped_df).setHandleInvalid("keep").transform(grouped_df)

# Set mapping of code bins
t = {0.0: "C1",
     1.0: "C2",
     2.0: "C3",
     3.0: "C4",
     4.0: "C5"}

# Bucket indices without a code (the additional bucket for invalid values, or null) map to
# null rather than raising KeyError inside the UDF
udf_rename = udf(lambda x: t.get(x), StringType())
grouped_df = grouped_df.withColumn("feature_code", udf_rename("feature_code"))

In [0]:
# Split continuous feature into 3 codes using pre defined limits: low, med, high

# NOTE(review): `num_amenities` is not defined anywhere in the visible notebook, so this line
# raises NameError as written - supply the Column expression to be bucketed before running.
grouped_df = grouped_df.withColumn("num_amenities_avg", num_amenities)

# Split example data into 3 groups: [0, 1200), [1200, 2600), [2600, inf)
splits = [0, 1200, 2600, float("inf")]

bucketizer = Bucketizer(splits=splits, inputCol="num_amenities_avg", outputCol="test_code")

grouped_df = bucketizer.transform(grouped_df)

# Set mapping of code bins
t = {0.0: "low",
     1.0: "med",
     2.0: "high"}

# Bucket indices without a label (e.g. null) map to null rather than raising KeyError in the UDF
udf_rename = udf(lambda x: t.get(x), StringType())
grouped_df = grouped_df.withColumn("test_code", udf_rename("test_code"))
# The bucketed input column is removed from the output
grouped_df = grouped_df.drop("num_amenities_avg")

In [0]:
# Add partition to profiles
# Take the partition date from the first record of the input table
first_row = combined_df.select("date").first()
if first_row is None:
    raise ValueError("Input dataset is empty; cannot determine the 'date' partition value")
date_value = first_row["date"]

# Create date partition column based on value in input table
grouped_df = grouped_df.withColumn("date", f.lit(date_value))