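## Installation
1. Install conda (Python 3).
2. `conda install -c conda-forge pyspark`
3. `conda install -c akode jupyter-spark`
4. `conda install -c conda-forge findspark`
5. `conda install -c anaconda nltk`, then download the sentence tokenizer model used by `nltk.tokenize.sent_tokenize`: `python -m nltk.downloader punkt`
6. `conda install -c conda-forge matplotlib`
7. `conda install -c conda-forge scikit-learn` (needed for the DBSCAN clustering)
8. Download the data (for MIMIC: `NOTEEVENTS.csv`, placed at the path set in `f_name`).
9. Choose the settings in the first code cell (MIMIC or Charite).
10. Start and execute the notebook: `jupyter notebook`, or `jupyter nbconvert --ExecutePreprocessor.timeout=0 --to notebook --execute k-anon-venn.ipynb`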

## Access the notebook:
* http://localhost:8888/notebooks

## Access the Spark web UI (job monitoring):
* http://localhost:4040/jobs

In [None]:
## Settings (MIMIC, local run)

# --- Spark environment ---
# Spark installation passed to findspark.init().
spark_dir = "/usr/local/Cellar/apache-spark/2.4.1/libexec/"
# Scratch directory used as spark.local.dir; set to None to keep Spark's default.
temp_dir = "/Volumes/Work/TMP"

# --- Input corpus ---
f_name = '../data/raw/NOTEEVENTS.csv'  # CSV file with one document per row
text_col = "TEXT"                       # column holding the document text
id_col = "SUBJECT_ID"                   # column whose distinct values are counted per n-gram
lang = "english"                        # language for sentence splitting / stemming (e.g. "german")

# Number of input rows to process (None = all).
# Rough guide: 10'000 rows ~ 100 MB, 100'000 rows ~ 1 GB ~ 10 hours.
limit = 100000

# --- Output ---
# Parquet location for the aggregated per-strategy n-gram counts.
results_df_file = "/Users/jspenger/dev/SHK/clinicaltextnormalization/notebooks/results_df08082019.parquet"

In [None]:
import pyspark
import pyspark.ml
import pandas
import nltk
import numpy
import re
import findspark
import pyspark.mllib.linalg.distributed
import itertools
import sklearn.cluster
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

In [None]:
# Render all figures of this notebook at 300 dpi.
matplotlib.rcParams.update({'figure.dpi': 300})

In [None]:
findspark.init(spark_dir)

# Local Spark session on all cores with 8g driver/executor memory and
# spark.driver.maxResultSize=8g; spark.local.dir is only set when a
# temp_dir is configured.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.maxResultSize", "8g")
)
if temp_dir is not None:
    spark = spark.config("spark.local.dir", temp_dir)
spark = spark.getOrCreate()

# Display the effective Spark configuration.
spark.sparkContext.getConf().getAll()

In [None]:
def __remove_empty_tokens(l_s):
    return list(filter(None, l_s))

# Spark UDF: array<string> -> array<string> without empty tokens.
rm_empty_tokens = pyspark.sql.functions.udf(
    __remove_empty_tokens,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)
                
def __sentence_tokenizer(s):
    return nltk.tokenize.sent_tokenize(s)

# Spark UDF: string -> array<string> of sentences.
sentence_tokenizer = pyspark.sql.functions.udf(
    __sentence_tokenizer,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)

def __preprocessor(l_s):
    return [re.sub(r'([^a-zA-Z0-9\s]+)([a-zA-Z0-9])|([a-zA-Z0-9])([^a-zA-Z0-9\s]+)',r'\2\3',s).lower() for s in l_s]

# Spark UDF: array<string> -> array<string> (punctuation-stripped, lowercased).
preprocessor = pyspark.sql.functions.udf(
    __preprocessor,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)

def __non_alpha_replace(l_s):
    return [re.sub(r'[^\w\s#]+', '@', re.sub('[0-9][0-9]*', '#', s)) for s in l_s]

# Spark UDF: array<string> -> array<string> with digits/symbols masked.
non_alpha_replace = pyspark.sql.functions.udf(
    __non_alpha_replace,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)

# Snowball stemmer for the configured language ("english", "german", or any
# other language SnowballStemmer supports). Constructing it here means an
# unsupported `lang` fails immediately with ValueError, instead of leaving
# `stemmer` undefined and failing later inside Spark tasks with a NameError.
stemmer = nltk.stem.snowball.SnowballStemmer(lang)

def __lemmatizer(l_s):
    """Stem every token in *l_s* with the module-level Snowball ``stemmer``.

    Note: despite its name this performs stemming, not dictionary-based
    lemmatization.
    """
    stems = []
    for token in l_s:
        stems.append(stemmer.stem(token))
    return stems

# Spark UDF: array<string> -> array<string> of stems.
lemmatizer = pyspark.sql.functions.udf(
    __lemmatizer,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)

def __ngrams_generator(l_l_s, n):
    return [[" ".join(nls) for nls in zip(*[l_s[i:] for i in range(n)])] for l_s in l_l_s]

# Spark UDF: (array<array<string>>, n) -> array<array<string>> of n-grams.
ngrams_generator = pyspark.sql.functions.udf(
    __ngrams_generator,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.ArrayType(pyspark.sql.types.StringType())),
)

def __nbag_from_ngram(s):
    return " ".join(sorted(s.split(" ")))

# Spark UDF: string n-gram -> string bag-of-words key.
nbag_from_ngram = pyspark.sql.functions.udf(
    __nbag_from_ngram,
    returnType=pyspark.sql.types.StringType(),
)

def __flatten(l_l_s):
    return [s for l_s in l_l_s for s in l_s]

# Spark UDF: array<array<string>> -> array<string>.
flatten = pyspark.sql.functions.udf(
    __flatten,
    returnType=pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()),
)

In [None]:
def _cluster_lexicon(words, labels, core_sample_indices):
    """Build a word -> representative-word mapping from DBSCAN results.

    words[i] has cluster label labels[i] (-1 marks noise and is left
    unmapped); core_sample_indices are the indices (into words) of the DBSCAN
    core samples. Each cluster's representative is its first listed core
    sample. Representatives would map to themselves, so they are omitted.
    """
    # DBSCAN's core_sample_indices_ is the list of core-sample indices, NOT a
    # lookup by cluster label, so group the core samples by label explicitly.
    representative = {}
    for idx in core_sample_indices:
        representative.setdefault(labels[idx], idx)

    lexicon = {
        word: words[representative[label]]
        for word, label in zip(words, labels)
        if label != -1
    }
    # Drop identity entries (keys that are also values, i.e. representatives).
    for key in set(lexicon.values()):
        lexicon.pop(key, None)
    return lexicon


def normalize(dataset=None, dataset_inputCol=None, dataset_outputCol=None):
    """Replace each word by a representative of its word-embedding cluster.

    A Word2Vec model is fit on ``dataset_inputCol`` (array<string> token
    lists). The learned word vectors are clustered with DBSCAN (cosine
    metric, eps=0.1, min_samples=5). Every word that falls into a cluster is
    replaced by a core word of that same cluster.

    Returns ``dataset`` with ``dataset_outputCol`` added. If no word needs
    replacing, the output column is a plain copy of the input column.
    """
    ## train model on text sentences
    model = pyspark.ml.feature.Word2Vec(inputCol=dataset_inputCol, outputCol="EMBEDDING").fit(dataset)
    w2v = model.getVectors().toPandas()
    w2vv = numpy.array(list(w2v['vector'].map(lambda x: x.values)))
    w2vw = list(w2v['word'])

    ## cluster the word embeddings with DBSCAN to find similar words
    dbscan = sklearn.cluster.DBSCAN(metric='cosine', eps=0.1, min_samples=5)
    dbscan.fit(w2vv)

    ## key: clustered word, value: representative core word of its own cluster
    lexicon = _cluster_lexicon(w2vw, dbscan.labels_, dbscan.core_sample_indices_)

    if not lexicon:
        return dataset.withColumn(dataset_outputCol, pyspark.sql.functions.col(dataset_inputCol))
    return dataset.withColumn(
        dataset_outputCol,
        pyspark.sql.functions.udf(
            lambda l_s: [lexicon.get(s, s) for s in l_s],
            pyspark.sql.types.ArrayType(pyspark.sql.types.StringType()))(dataset_inputCol)
    )

In [None]:
# %%time
# Compute n-gram statistics for every enabled normalization strategy and save
# them as parquet (partitioned by STRATEGY) to results_df_file.
results_df = None

## load data
df = spark.read.options(
    mode='FAILFAST',
    multiLine=True,
    escape='"').csv(f_name, header=True)

if limit is not None:
    df = df.limit(limit)

# tokenize sentences: one row per sentence of text_col
df_1 = df.withColumn(
    "df_1",
    pyspark.sql.functions.explode(sentence_tokenizer(text_col)))
# tokenize words
df_2_old = pyspark.ml.feature.Tokenizer(
    inputCol = "df_1",
    outputCol = "df_2_old"
    ).transform(df_1)
# remove empty tokens; persisted because every strategy below starts from df_2
df_2 = df_2_old.withColumn(
    "df_2",
    rm_empty_tokens("df_2_old")).select(id_col, "df_2").persist(pyspark.StorageLevel.OFF_HEAP)

# The steps are cumulative: non_alpha_replace is only tried on preprocessed
# text, lemmatize only after non_alpha_replace, and normalization only after
# lemmatize (see the `continue` guards below).
for preprocess_bool in [True, False]:
    if preprocess_bool:
        df_3 = df_2.withColumn(
            "df_3",
            preprocessor("df_2")).drop("df_2")
    else:
        df_3 = df_2.withColumn(
            "df_3",
            pyspark.sql.functions.col("df_2")).drop("df_2")

    for non_alpha_replace_bool in [True, False]:
        if not preprocess_bool and non_alpha_replace_bool:
            continue
        if non_alpha_replace_bool:
            df_4 = df_3.withColumn(
                "df_4",
                non_alpha_replace("df_3")).drop("df_3")
        else:
            df_4 = df_3.withColumn("df_4", pyspark.sql.functions.col("df_3")).drop("df_3")

        for lemmatize_bool in [True, False]:
            if not non_alpha_replace_bool and lemmatize_bool:
                continue
            if lemmatize_bool:
                df_5 = df_4.withColumn(
                    "df_5",
                    lemmatizer("df_4")).drop("df_4")
            else:
                df_5 = df_4.withColumn(
                    "df_5",
                    pyspark.sql.functions.col("df_4")).drop("df_4")
            for normalization_bool in [True, False]:
                if not lemmatize_bool and normalization_bool:
                    continue
                if normalization_bool:
                    df_6 = normalize(
                        dataset=df_5,
                        dataset_inputCol="df_5",
                        dataset_outputCol="df_6").drop("df_5").persist(pyspark.StorageLevel.OFF_HEAP)
                else:
                    df_6 = df_5.withColumn(
                        "df_6",
                        pyspark.sql.functions.col("df_5")).drop("df_5").persist(pyspark.StorageLevel.OFF_HEAP)

                for n in [1, 2, 4, 8]:
                    # one row per n-gram of each sentence
                    df_7 = pyspark.ml.feature.NGram(
                        n=n,
                        inputCol="df_6",
                        outputCol="NGRAMS"
                        ).transform(df_6)\
                        .withColumn(
                            "df_7",
                            pyspark.sql.functions.explode("NGRAMS")).drop("df_6")

                    for ngram_bool in [True, False]:
                        # Bags (word-sorted n-grams) are only computed for
                        # n > 1 and only for the normalization strategy.
                        if not ngram_bool and n == 1:
                            continue
                        if not normalization_bool and not ngram_bool:
                            continue

                        # STRATEGY label, e.g. " preprocess  non_alpha_replace  2-gram ".
                        # Later cells rebuild these exact strings, so keep the
                        # " token " spacing unchanged.
                        strategy = ""
                        if preprocess_bool:
                            strategy += " preprocess "
                        if non_alpha_replace_bool:
                            strategy += " non_alpha_replace "
                        if lemmatize_bool:
                            strategy += " lemmatize "
                        if normalization_bool:
                            strategy += " normalization "

                        if ngram_bool:
                            strategy += " " + str(n) + "-gram "
                        else:
                            strategy += " " + str(n) + "-bag "
                        if ngram_bool:
                            df_8 = df_7.withColumn(
                                    "df_8",
                                    pyspark.sql.functions.col("df_7")).drop("df_7")
                        else:
                            df_8 = df_7.withColumn("df_8", nbag_from_ngram("df_7")).drop("df_7")

                        # Per n-gram: total occurrences (COUNT) and number of
                        # distinct id_col values (DISTINCTCOUNT). The n-gram
                        # text itself is dropped from the results.
                        df_9 = df_8.groupBy("df_8").agg(
                            pyspark.sql.functions.count("df_8").alias("COUNT"),
                            pyspark.sql.functions.countDistinct("df_8", id_col).alias("DISTINCTCOUNT")
                        ).drop('df_8')

                        result_df = df_9.withColumn("STRATEGY", pyspark.sql.functions.lit(strategy))
                        if results_df is None:
                            results_df = result_df
                        else:
                            results_df = results_df.union(result_df)

results_df.repartition("STRATEGY").write.partitionBy("STRATEGY").format("parquet").save(results_df_file)

In [None]:
%%time

# Reload the aggregated per-strategy n-gram counts written by the pipeline cell.
results_df = spark.read.format("parquet").load(results_df_file)
results_df.show()

In [None]:
# Per-strategy totals (number of n-grams, summed COUNT and DISTINCTCOUNT),
# ordered numerically by the n-gram size found in the STRATEGY label.
results_df.groupBy("STRATEGY").agg(
    pyspark.sql.functions.count("*"),
    pyspark.sql.functions.sum("COUNT"),
    pyspark.sql.functions.sum("DISTINCTCOUNT"),
    ).withColumn("order", pyspark.sql.functions.regexp_extract("STRATEGY", r"(\d+)", 1).cast("int"))\
    .sort(["order", "STRATEGY"])\
    .show(100, False)


In [None]:
%%time

# List every distinct strategy name, untruncated.
results_df.select('STRATEGY').distinct().orderBy("STRATEGY").show(n=1000, truncate=False)

In [None]:
%%time

# Group the sorted strategy names by n-gram size: one list per size in [1, 2, 4, 8].
strategies_df = results_df.select('STRATEGY').distinct().sort("STRATEGY")
strategies_set = [
    [row[0] for row in strategies_df.filter("STRATEGY LIKE '%{}%'".format(size)).collect()]
    for size in [1, 2, 4, 8]
]
print(list(strategies_set))

In [None]:
# translate labels
# Long STRATEGY tokens -> short abbreviations used in the plot legends.
trans_labels = {
    '1-gram': '1g',
    '2-gram': '2g',
    '4-gram': '4g',
    '8-gram': '8g',
    '1-bag': '1gb',
    '2-bag': '2gb',
    '4-bag': '4gb',
    '8-bag': '8gb',
    'preprocess': 'pr',
    'non_alpha_replace': 'su',
    'lemmatize': 'le',
    'normalization': 'wn'}
trans_labels_inv = {v: k for k, v in trans_labels.items()}

# Abbreviate every strategy and sort each group by its abbreviated name, so
# bars and legends appear in a stable order.
strategies_set_mod = [
    sorted(" ".join(trans_labels.get(token, token) for token in s.split()) for s in strategies)
    for strategies in strategies_set
]
# Map back to the exact STRATEGY strings stored in results_df: each token was
# written as " token ", so tokens are separated by two spaces and the string
# is padded with one space on each side.
strategies_set = [
    [" " + "  ".join(trans_labels_inv.get(token, token) for token in s.split()) + " " for s in strategies]
    for strategies in strategies_set_mod
]
strategies_set

In [None]:
# k-corpus-size
# One panel per n-gram size (entries of strategies_set). For each strategy,
# the bar at bin edge k shows how many n-grams have DISTINCTCOUNT >= k.

fig, axs = plt.subplots(2, 2, figsize=(1.618 * 10.0 / 2.3, 10.0 / 2.0), sharex=True, sharey=True)
axs = numpy.ravel(axs)

from pyspark.ml.feature import Bucketizer

bins = [1, 20, 40, 60, 80, 100, float("inf")]
bucketizer = Bucketizer(splits=bins, inputCol="DISTINCTCOUNT", outputCol="buckets")
# Bucket indices produced by the Bucketizer, one per bin. A bucket without
# rows is absent from the grouped result, so every histogram is reindexed
# onto these ids (filling 0) to keep exactly one height per bar.
bucket_ids = numpy.arange(len(bins) - 1, dtype=float)

for i, strategies in enumerate(strategies_set):

    bar_width = 20.0/8.0
    labels = [" ".join(map(lambda x: trans_labels.get(x, x), s.split())) for s in strategies]
    xticks = [edge + float(bar_width) * float(len(labels)) / 2.0 for edge in bins[:-1]]
    xticklabels = [str(x) for x in bins[:-1]]
    offsets = [k * bar_width for k in range(len(labels))]

    for offset, label, strategy in zip(offsets, labels, strategies):
        r = results_df.filter(results_df.STRATEGY == strategy)
        bucketData = bucketizer.transform(r)
        data = bucketData.groupby("buckets", "STRATEGY").agg(
            pyspark.sql.functions.sum("COUNT").alias("COUNTSUM"),
            pyspark.sql.functions.sum("DISTINCTCOUNT").alias("DISTINCTCOUNTSUM"),
            pyspark.sql.functions.count("*").alias("NUMCOUNT")
        ).toPandas()
        hist = data.set_index("buckets")["NUMCOUNT"].reindex(bucket_ids, fill_value=0).values
        # Reverse cumulative sum: the value at bin j counts everything in bins >= j.
        axs[i].bar(numpy.array(bins[:-1]) + offset, numpy.flip(numpy.cumsum(numpy.flip(hist))), width=bar_width, label=label, log=True, align='edge')
    axs[i].legend(bbox_to_anchor=(0,1.02,1,0.2), loc="lower left", mode="expand", borderaxespad=0, ncol=2)
    axs[i].set_xticks(xticks)
    axs[i].set_xticklabels(xticklabels)

axs[0].set_ylabel("corpus size")
axs[2].set_ylabel("corpus size")
axs[2].set_xlabel("k")
axs[3].set_xlabel("k")

plt.tight_layout()
plt.subplots_adjust(top=0.79)
plt.suptitle('k-anonymous corpus size')
plt.show()

In [None]:
# k-corpus-cover
# One panel per n-gram size. For each strategy, the bar at bin edge k shows
# the fraction of all n-gram occurrences (summed COUNT) that belong to
# n-grams with DISTINCTCOUNT >= k.

fig, axs = plt.subplots(2, 2, figsize=(1.618 * 10.0 / 2.3, 10.0 / 2.0), sharey=True)
axs = numpy.ravel(axs)

from pyspark.ml.feature import Bucketizer

bins = [1, 20, 40, 60, 80, 100, float("inf")]
bucketizer = Bucketizer(splits=bins, inputCol="DISTINCTCOUNT", outputCol="buckets")
# Bucket indices produced by the Bucketizer, one per bin. A bucket without
# rows is absent from the grouped result, so every histogram is reindexed
# onto these ids (filling 0) to keep exactly one height per bar.
bucket_ids = numpy.arange(len(bins) - 1, dtype=float)

for i, strategies in enumerate(strategies_set):

    bar_width = 20.0/8.0
    labels = [" ".join(map(lambda x: trans_labels.get(x, x), s.split())) for s in strategies]
    xticks = [edge + float(bar_width) * float(len(labels)) / 2.0 for edge in bins[:-1]]
    xticklabels = [str(x) for x in bins[:-1]]
    offsets = [k * bar_width for k in range(len(labels))]

    for offset, label, strategy in zip(offsets, labels, strategies):
        r = results_df.filter(results_df.STRATEGY == strategy)
        bucketData = bucketizer.transform(r)
        data = bucketData.groupby("buckets", "STRATEGY").agg(
            pyspark.sql.functions.sum("COUNT").alias("COUNTSUM"),
            pyspark.sql.functions.sum("DISTINCTCOUNT").alias("DISTINCTCOUNTSUM"),
            pyspark.sql.functions.count("*").alias("NUMCOUNT")
        ).toPandas()
        hist = data.set_index("buckets")["COUNTSUM"].reindex(bucket_ids, fill_value=0).values
        # Reverse cumulative share: occurrences in bins >= j over all occurrences.
        axs[i].bar(numpy.array(bins[:-1]) + offset, numpy.flip(numpy.cumsum(numpy.flip(hist))) / numpy.sum(hist), width=bar_width, label=label, align='edge')
    axs[i].legend(bbox_to_anchor=(0,1.02,1,0.2), loc="lower left", mode="expand", borderaxespad=0, ncol=2)
    axs[i].set_xticks(xticks)
    axs[i].set_xticklabels(xticklabels)
    axs[i].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(1))

axs[0].set_ylabel("corpus cover")
axs[2].set_ylabel("corpus cover")
axs[2].set_xlabel("k")
axs[3].set_xlabel("k")

plt.tight_layout()
plt.subplots_adjust(top=0.79)
plt.suptitle('k-anonymous corpus cover')
plt.show()

In [None]:
# k-corpus-ratio
# One panel per n-gram size. For each strategy, the bar at bin edge k shows
# the summed COUNT of n-grams with DISTINCTCOUNT >= k, relative to the same
# quantity for the first strategy of the panel (the baseline).

fig, axs = plt.subplots(2, 2, figsize=(1.618 * 10.0 / 2.3, 10.0 / 2.0), sharey=True, sharex=True)
axs = numpy.ravel(axs)

from pyspark.ml.feature import Bucketizer

bins = [1, 20, 40, 60, 80, 100, float("inf")]
bucketizer = Bucketizer(splits=bins, inputCol="DISTINCTCOUNT", outputCol="buckets")
# Bucket indices produced by the Bucketizer, one per bin. A bucket without
# rows is absent from the grouped result, so every histogram is reindexed
# onto these ids (filling 0) to keep exactly one height per bar.
bucket_ids = numpy.arange(len(bins) - 1, dtype=float)

for i, strategies in enumerate(strategies_set):

    bar_width = 20.0/8.0
    labels = [" ".join(map(lambda x: trans_labels.get(x, x), s.split())) for s in strategies]
    xticks = [edge + float(bar_width) * float(len(labels)) / 2.0 for edge in bins[:-1]]
    xticklabels = [str(x) for x in bins[:-1]]
    offsets = [k * bar_width for k in range(len(labels))]

    basic = None  # reverse-cumulative COUNTSUM of the panel's first strategy

    for offset, label, strategy in zip(offsets, labels, strategies):
        r = results_df.filter(results_df.STRATEGY == strategy)
        bucketData = bucketizer.transform(r)
        data = bucketData.groupby("buckets", "STRATEGY").agg(
            pyspark.sql.functions.sum("COUNT").alias("COUNTSUM"),
            pyspark.sql.functions.sum("DISTINCTCOUNT").alias("DISTINCTCOUNTSUM"),
            pyspark.sql.functions.count("*").alias("NUMCOUNT")
        ).toPandas()
        hist = data.set_index("buckets")["COUNTSUM"].reindex(bucket_ids, fill_value=0).values
        if basic is None:
            basic = numpy.flip(numpy.cumsum(numpy.flip(hist)))
        x = numpy.array(bins[:-1]) + offset
        y = numpy.flip(numpy.cumsum(numpy.flip(hist))) / basic
        axs[i].bar(x, y, width=bar_width, label=label, align='edge')
    axs[i].legend(bbox_to_anchor=(0,1.02,1,0.2), loc="lower left", mode="expand", borderaxespad=0, ncol=2)
    axs[i].set_xticks(xticks)
    axs[i].set_xticklabels(xticklabels)

axs[0].set_ylabel("corpus ratio")
axs[2].set_ylabel("corpus ratio")
axs[2].set_xlabel("k")
axs[3].set_xlabel("k")

plt.tight_layout()
plt.subplots_adjust(top=0.79)
plt.suptitle('k-anonymous corpus ratio')
plt.show()

In [None]:
# k-token-ratio
# One panel per n-gram size; n-grams are bucketed by COUNT. The first
# strategy of each panel serves as the baseline (`basic` / `b`).

fig, axs = plt.subplots(2, 2, figsize=(1.618 * 10.0 / 2.3, 10.0 / 2.0), sharex=True)
axs = numpy.ravel(axs)

from pyspark.ml.feature import Bucketizer

bins = [1, 20, 40, 60, 80, 100, float("inf")]
bucketizer = Bucketizer(splits=bins, inputCol="COUNT", outputCol="buckets")
# Bucket indices produced by the Bucketizer, one per bin. A bucket without
# rows is absent from the grouped result, so every histogram is reindexed
# onto these ids (filling 0) to keep exactly one height per bar.
bucket_ids = numpy.arange(len(bins) - 1, dtype=float)

for i, strategies in enumerate(strategies_set):

    bar_width = 20.0/8.0
    labels = [" ".join(map(lambda x: trans_labels.get(x, x), s.split())) for s in strategies]
    xticks = [edge + float(bar_width) * float(len(labels)) / 2.0 for edge in bins[:-1]]
    xticklabels = [str(x) for x in bins[:-1]]
    offsets = [k * bar_width for k in range(len(labels))]

    basic = None  # reverse-cumulative histogram of the baseline strategy
    b = None      # per-bucket histogram of the baseline strategy
    for offset, label, strategy in zip(offsets, labels, strategies):
        r = results_df.filter(results_df.STRATEGY == strategy)
        bucketData = bucketizer.transform(r)
        data = bucketData.groupby("buckets", "STRATEGY").agg(
            pyspark.sql.functions.sum("COUNT").alias("COUNTSUM"),
            pyspark.sql.functions.sum("DISTINCTCOUNT").alias("DISTINCTCOUNTSUM"),
            pyspark.sql.functions.count("*").alias("NUMCOUNT")
        ).toPandas()
        hist = data.set_index("buckets")["NUMCOUNT"].reindex(bucket_ids, fill_value=0).values
        if basic is None:
            basic = numpy.flip(numpy.cumsum(numpy.flip(hist)))
            b = hist
        x = numpy.array(bins[:-1]) + offset
        y1 = numpy.flip(numpy.cumsum(numpy.flip(hist)))
        # accl[0] = 0 and accl[j + 1] = sum over buckets m <= j of (b[m] - hist[m]).
        accl = numpy.concatenate(([0], numpy.cumsum(b - hist)[:-1]))
        y2 = basic + accl
        y = y2 / y1
        axs[i].bar(x, y, width=bar_width, label=label, align='edge')
    axs[i].legend(bbox_to_anchor=(0,1.02,1,0.2), loc="lower left", mode="expand", borderaxespad=0, ncol=2)
    axs[i].set_xticks(xticks)
    axs[i].set_xticklabels(xticklabels)

axs[0].set_ylabel("token ratio")
axs[2].set_ylabel("token ratio")
axs[2].set_xlabel("k")
axs[3].set_xlabel("k")

plt.tight_layout()
plt.subplots_adjust(top=0.79)
plt.suptitle('k-anonymous token ratio')
plt.show()