# Policy Laplace on Spark

In [1]:
import pyspark
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Reuse the already-active SparkSession if there is one; otherwise create a
# new session with the default builder configuration.
spark = SparkSession.builder.getOrCreate()

In [2]:
# Input CSV, addressed relative to the notebook's working directory.
filepath = "../../differentially-private-set-union/data/clean_askreddit.csv"

# Read the comma-separated file (header row present, column types inferred)
# and discard every row that has a null in any column.
reddit = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(filepath)
    .dropna()
)

In [3]:
import token_utils as ut

# Upper bound on how many distinct tokens tokenize() keeps per call; the same
# value is passed to PolicyLaplace further down in the notebook.
tokens_per_user = 100
# n-gram order: with 1 the raw tokens are used as-is; values above 1 are
# expanded through ut.tokens2ngram inside tokenize().
n_grams = 1

def tokenize(posts, max_tokens=None, ngram_size=None):
    """Return the distinct tokens (or n-grams) found in one group of posts.

    Args:
        posts: Iterable of post strings (one author's posts, as grouped by
            the caller).
        max_tokens: Cap on how many distinct items are returned. Defaults to
            the notebook-level ``tokens_per_user``. A value below 1 disables
            the cap.
        ngram_size: n-gram order. Defaults to the notebook-level ``n_grams``;
            values above 1 are delegated to ``ut.tokens2ngram``.

    Returns:
        A list of unique strings. When more than ``max_tokens`` distinct
        items exist, a uniform random sample of exactly ``max_tokens`` of
        them (drawn without replacement) is returned instead. The order of
        the returned list is not specified.
    """
    if max_tokens is None:
        max_tokens = tokens_per_user
    if ngram_size is None:
        ngram_size = n_grams

    # str.split() with no argument splits on any run of whitespace, so
    # repeated, leading or trailing spaces no longer yield "" tokens.
    token_lists = [post.split() for post in posts]
    if ngram_size > 1:
        token_lists = [ut.tokens2ngram(tokens, ngram_size) for tokens in token_lists]

    unique_grams = list({gram for tokens in token_lists for gram in tokens})

    # The cap has to be enforced for max_tokens == 1 as well: the notebook
    # passes the same number to PolicyLaplace as its per-user bound, so
    # returning more items than that would break the stated bound.
    if max_tokens >= 1 and len(unique_grams) > max_tokens:
        return np.random.choice(unique_grams, size=max_tokens, replace=False).tolist()
    return unique_grams
        
# Group the clean_text values by author, reduce each group to a token list
# with tokenize(), and cache the resulting (author, tokens) pairs.
tokenized = (
    reddit.select("author", "clean_text")
    .rdd
    .groupByKey()
    .map(lambda pair: (pair[0], tokenize(pair[1])))
    .persist()
)

In [4]:
from policy_laplace import PolicyLaplace

# Parameters handed to PolicyLaplace. delta is e^-10 (about 4.54e-05).
# NOTE(review): epsilon/delta look like the differential-privacy budget and
# alpha a policy-specific knob — confirm against policy_laplace, which is
# not visible from this notebook.
epsilon, delta, alpha = 3.0, np.exp(-10), 5.0

# The fourth argument is the per-user token cap used by tokenize(); the
# constructor's printed summary reports it as Delta_0.
pl = PolicyLaplace(epsilon, delta, alpha, tokens_per_user)

Params Delta_0=100, delta=4.54e-05, l_param=0.3333333333333333, l_rho=4.6473335106659235, Gamma=6.31400017733259


In [5]:
# Collapse everything into one partition so pl.process_rows is invoked once
# over all (author, tokens) pairs, then pull the first item it produces.
ngh = (
    tokenized
    .repartition(1)
    .mapPartitions(pl.process_rows)
    .take(1)
)

In [6]:
# Keep only the entries for which pl.exceeds_threshold(val) is true.
output_vocab = {}
candidate_total = 0
for ng in ngh:
    candidate_total += len(ng)
    for key, val in ng.items():
        if pl.exceeds_threshold(val):
            output_vocab[key] = val

# Report once, after all histograms are filtered. The denominator is summed
# over every element of ngh instead of always reading ngh[0], so the message
# stays correct if ngh ever holds more than one histogram (and when it is empty).
print("Retrieved {0} words from {1}".format(len(output_vocab), candidate_total))


Retrieved 14758 words from 137921
