taar_similarity.py
"""
Bug 1386274 - TAAR similarity-based add-on donor list
This job clusters users into different groups based on their
active add-ons. A representative sample of users ("donors") is
selected from each cluster and saved to a model file, along with
a feature vector that will be used by the TAAR library module to
perform recommendations.
"""
import click
import json
import logging
import numpy as np
from datetime import date, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline
from pyspark.mllib.stat import KernelDensity
from pyspark.statcounter import StatCounter
from scipy.spatial import distance
from .taar_utils import store_json_to_s3
from .taar_utils import load_amo_curated_whitelist
from mozetl.utils import stop_session_safely
# Define the set of feature names to be used in the donor computations.
CATEGORICAL_FEATURES = ["city", "locale", "os"]
CONTINUOUS_FEATURES = [
"subsession_hours_sum",
"bookmark_count",
"tab_open_count",
"total_uri",
"unique_tlds",
]
logging.basicConfig(level=logging.INFO)
logging.getLogger("py4j").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
def get_samples(spark, date_from):
"""
    Get a DataFrame with a valid set of samples to base the next
    processing on. The sample is limited to submissions received since
    `date_from`, keeping only the latest row for each client.

    Reference documentation:

    Firefox Clients Daily telemetry table
    https://docs.telemetry.mozilla.org/datasets/batch_view/clients_daily/reference.html

    BUG 1485152: PR adding active_addons to the clients_daily table:
https://github.com/mozilla/telemetry-batch-view/pull/490
"""
df = (
spark.sql("SELECT * FROM clients_daily")
.where("client_id IS NOT null")
.where("active_addons IS NOT null")
.where("size(active_addons) > 2")
.where("size(active_addons) < 100")
.where("channel = 'release'")
.where("app_name = 'Firefox'")
.where("submission_date_s3 >= {}".format(date_from))
.selectExpr(
"client_id as client_id",
"active_addons as active_addons",
"city as city",
"cast(subsession_hours_sum as double)",
"locale as locale",
"os as os",
"places_bookmarks_count_mean AS bookmark_count",
"scalar_parent_browser_engagement_tab_open_event_count_sum "
"AS tab_open_count",
"scalar_parent_browser_engagement_total_uri_count_sum AS total_uri",
"scalar_parent_browser_engagement_unique_domains_count_mean AS unique_tlds",
"row_number() OVER (PARTITION BY client_id ORDER BY submission_date_s3 desc) as rn",
)
.where("rn = 1")
.drop("rn")
)
return df
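
# A minimal usage sketch for get_samples (hypothetical date value): `date_from`
# is a "YYYYMMDD" string matching the `submission_date_s3` partition, e.g. the
# output of today_minus_90_days() defined further below.
#
#   spark = SparkSession.builder.enableHiveSupport().getOrCreate()
#   users_df = get_samples(spark, "20180601")
#   users_df.select("client_id", "locale", "os", "total_uri").show(5)
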
def get_addons_per_client(users_df, addon_whitelist, minimum_addons_count):
"""Extracts a DataFrame that contains one row
for each client along with the list of active add-on GUIDs.
"""
def is_valid_addon(guid, addon):
return not (
addon.is_system
or addon.app_disabled
or addon.type != "extension"
or addon.user_disabled
or addon.foreign_install
or guid not in addon_whitelist
)
# Create an add-ons dataset un-nesting the add-on map from each
# user to a list of add-on GUIDs. Also filter undesired add-ons.
    # Note that this logic was restructured from the original
    # longitudinal query. In particular, each client's 'active_addons'
    # entry is a list of add-on metadata records, each exposing an
    # 'addon_id' field along with the flags checked in is_valid_addon.
def flatten_valid_guid_generator(p):
for data in p["active_addons"]:
addon_guid = data["addon_id"]
if not is_valid_addon(addon_guid, data):
continue
yield addon_guid
return (
users_df.rdd.map(
lambda p: (p["client_id"], list(flatten_valid_guid_generator(p)))
)
.filter(lambda p: len(p[1]) > minimum_addons_count)
.toDF(["client_id", "addon_ids"])
)
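
# Illustrative call (sketch only): clients with more than `minimum_addons_count`
# whitelisted, user-installed extensions are kept, and each resulting row is
# shaped like Row(client_id=..., addon_ids=["guid-1", "guid-2", ...]).
#
#   whitelist = load_amo_curated_whitelist()
#   addons_df = get_addons_per_client(users_df, whitelist, 2)
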
def compute_clusters(addons_df, num_clusters, random_seed):
"""Performs user clustering by using add-on ids as features."""
    # Build the stages of the pipeline. The hashing step turns the
    # variable-length add-on id lists into fixed-size term-frequency
    # vectors that the IDF and clustering stages can consume.
hashing_stage = HashingTF(inputCol="addon_ids", outputCol="hashed_features")
idf_stage = IDF(inputCol="hashed_features", outputCol="features", minDocFreq=1)
# As a future improvement, we may add a sane value for the minimum cluster size
# to BisectingKMeans (e.g. minDivisibleClusterSize). For now, just make sure
# to pass along the random seed if needed for tests.
kmeans_kwargs = {"seed": random_seed} if random_seed else {}
bkmeans_stage = BisectingKMeans(k=num_clusters, **kmeans_kwargs)
pipeline = Pipeline(stages=[hashing_stage, idf_stage, bkmeans_stage])
# Run the pipeline and compute the results.
model = pipeline.fit(addons_df)
return model.transform(addons_df).select(["client_id", "prediction"])
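
# A minimal sketch of what compute_clusters returns (values illustrative only):
# each client is assigned the id of the cluster it falls into.
#
#   clusters_df = compute_clusters(addons_df, num_clusters=20, random_seed=42)
#   clusters_df.show(2)
#   # +---------------+----------+
#   # |      client_id|prediction|
#   # +---------------+----------+
#   # |aaaa-bbbb-cc...|         4|
#   # |dddd-eeee-ff...|        11|
#   # +---------------+----------+
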
def get_donor_pools(users_df, clusters_df, num_donors, random_seed=None):
"""Samples users from each cluster."""
cluster_population = clusters_df.groupBy("prediction").count().collect()
clusters_histogram = [(x["prediction"], x["count"]) for x in cluster_population]
    # Sort the histogram in-place by cluster id, in ascending order.
clusters_histogram.sort(key=lambda x: x[0], reverse=False)
# Save the cluster ids and their respective scores separately.
clusters = [cluster_id for cluster_id, _ in clusters_histogram]
counts = [donor_count for _, donor_count in clusters_histogram]
    # Compute the proportion of users in each cluster.
total_donors_in_clusters = sum(counts)
clust_sample = [float(t) / total_donors_in_clusters for t in counts]
sampling_proportions = dict(list(zip(clusters, clust_sample)))
# Sample the users in each cluster according to the proportions
# and pass along the random seed if needed for tests.
sampling_kwargs = {"seed": random_seed} if random_seed else {}
donor_df = clusters_df.sampleBy(
"prediction", fractions=sampling_proportions, **sampling_kwargs
)
    # Down-sample the pooled donors so that approximately `num_donors`
    # clients are kept overall.
current_sample_size = donor_df.count()
donor_pool_df = donor_df.sample(
False, float(num_donors) / current_sample_size, **sampling_kwargs
)
return clusters, donor_pool_df
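
# Worked example (illustrative counts only): with three clusters of sizes
# 600, 300 and 100, the sampling fractions are 0.6, 0.3 and 0.1, so the
# sampleBy() call keeps roughly 0.6*600 + 0.3*300 + 0.1*100 = 460 clients;
# the final sample() call then thins that pool down to roughly `num_donors`
# rows.
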
def get_donors(
spark, num_clusters, num_donors, addon_whitelist, date_from, random_seed=None
):
# Get the data for the potential add-on donors.
users_sample = get_samples(spark, date_from)
# Get add-ons from selected users and make sure they are
# useful for making a recommendation.
addons_df = get_addons_per_client(users_sample, addon_whitelist, 2)
addons_df.cache()
# Perform clustering by using the add-on info.
clusters = compute_clusters(addons_df, num_clusters, random_seed)
# Sample representative ("donors") users from each cluster.
cluster_ids, donors_df = get_donor_pools(
users_sample, clusters, num_donors, random_seed
)
# Finally, get the feature vectors for users that represent
# each cluster. Since the "active_addons" in "users_sample"
# are in a |MapType| and contain system add-ons as well, just
# use the cleaned up list from "addons_df".
return (
cluster_ids,
(
users_sample.join(donors_df, "client_id")
.drop("active_addons")
.join(addons_df, "client_id", "left")
.drop("client_id")
.withColumnRenamed("addon_ids", "active_addons")
),
)
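
# What get_donors returns, in a nutshell: the list of cluster ids and a
# DataFrame of sampled donors whose columns are the CATEGORICAL_FEATURES,
# the CONTINUOUS_FEATURES, the cluster "prediction" and the cleaned-up
# "active_addons" GUID list (the raw client_id is dropped). Illustrative
# call, assuming a whitelist loaded via load_amo_curated_whitelist():
#
#   cluster_ids, donors_df = get_donors(
#       spark, num_clusters=20, num_donors=1000,
#       addon_whitelist=whitelist, date_from="20180601",
#   )
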
def format_donors_dictionary(donors_df):
cleaned_records = donors_df.drop("prediction").collect()
# Convert each row to a dictionary.
return [row.asDict() for row in cleaned_records]
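
# Illustrative result (hypothetical values): a plain list of per-donor
# dictionaries, ready to be serialized to JSON, e.g.
#
#   [{"city": "...", "locale": "en-US", "os": "Windows_NT",
#     "subsession_hours_sum": 12.5, ..., "active_addons": ["guid-1", ...]},
#    ...]
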
def similarity_function(x, y):
"""Similarity function for comparing user features.
    This should really be implemented in taar.similarity_recommender
    and imported here for consistency.
"""
def safe_get(field, row, default_value):
# Safely get a value from the Row. If the value is None, get the
# default value.
return row[field] if row[field] is not None else default_value
# Extract the values for the categorical and continuous features for both
# the x and y samples. Use an empty string as the default value for missing
# categorical fields and 0 for the continuous ones.
x_categorical_features = [safe_get(k, x, "") for k in CATEGORICAL_FEATURES]
x_continuous_features = [safe_get(k, x, 0) for k in CONTINUOUS_FEATURES]
y_categorical_features = [safe_get(k, y, "") for k in CATEGORICAL_FEATURES]
y_continuous_features = [safe_get(k, y, 0) for k in CONTINUOUS_FEATURES]
# Here a larger distance indicates a poorer match between categorical variables.
j_d = distance.hamming(x_categorical_features, y_categorical_features)
j_c = distance.canberra(x_continuous_features, y_continuous_features)
    # Take the product of the two distances to obtain a single univariate score.
    # Add a small constant so that a zero continuous distance does not wipe
    # out the contribution of the categorical features.
    # Note: since both distance functions return a NumPy type, we need to
    # call |item| to get the underlying Python type. If we don't do that,
    # this job will fail when performing KDE due to SPARK-20803 on Spark 2.2.0.
return abs((j_c + 0.001) * j_d).item()
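
# Worked example (hypothetical rows): for two clients sharing locale and os but
# living in different cities, hamming() over the 3 categorical features is 1/3,
# while canberra() sums |x_i - y_i| / (|x_i| + |y_i|) over the 5 continuous
# features; the returned score is |(j_c + 0.001) * j_d|.
#
#   from pyspark.sql import Row
#   a = Row(city="Berlin", locale="de", os="Windows_NT",
#           subsession_hours_sum=10.0, bookmark_count=5.0,
#           tab_open_count=20.0, total_uri=300.0, unique_tlds=25.0)
#   b = Row(city="Hamburg", locale="de", os="Windows_NT",
#           subsession_hours_sum=12.0, bookmark_count=5.0,
#           tab_open_count=18.0, total_uri=280.0, unique_tlds=30.0)
#   similarity_function(a, b)  # -> small positive float
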
def generate_non_cartesian_pairs(first_rdd, second_rdd):
# Add an index to all the elements in each RDD.
rdd1_with_indices = first_rdd.zipWithIndex().map(lambda p: (p[1], p[0]))
rdd2_with_indices = second_rdd.zipWithIndex().map(lambda p: (p[1], p[0]))
# Join the RDDs using the indices as keys, then strip
# them off before returning an RDD like [<v1, v2>, ...]
return rdd1_with_indices.join(rdd2_with_indices).map(lambda p: p[1])
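
# Illustrative behaviour (hypothetical RDDs): zipping each RDD with an index
# and joining on that index pairs the i-th element of one RDD with the i-th
# element of the other, so the number of emitted pairs is bounded by the
# smaller RDD rather than growing like the full Cartesian product.
#
#   rdd_a = spark.sparkContext.parallelize(["a1", "a2", "a3"])
#   rdd_b = spark.sparkContext.parallelize(["b1", "b2"])
#   generate_non_cartesian_pairs(rdd_a, rdd_b).collect()
#   # e.g. [("a1", "b1"), ("a2", "b2")]  -- 2 pairs instead of 6
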
def get_lr_curves(
spark, features_df, cluster_ids, kernel_bandwidth, num_pdf_points, random_seed=None
):
"""Compute the likelihood ratio curves for clustered clients.
    The workflow followed in this function is as follows:
    * Access the DataFrame including cluster numbers and features.
    * Use the same similarity function that will be used in the TAAR module.
    * Iterate through each cluster and compute the in-cluster similarities.
    * Iterate through each cluster and compute the out-of-cluster similarities.
    * Compute a kernel density estimate (KDE) over each set of similarity scores.
    * Evaluate both PDFs on a linearly spaced grid of `num_pdf_points` points.
:param spark: the SparkSession object.
:param features_df: the DataFrame containing the user features (e.g. the
ones coming from |get_donors|).
:param cluster_ids: the list of cluster ids (e.g. the one coming from |get_donors|).
:param kernel_bandwidth: the kernel bandwidth used to estimate the kernel densities.
:param num_pdf_points: the number of points to sample for the LR-curves.
:param random_seed: the provided random seed (fixed in tests).
:return: A list in the following format
[(idx, (lr-numerator-for-idx, lr-denominator-for-idx)), (...), ...]
"""
    # Instantiate empty RDDs to accumulate the intra- and inter-cluster scores.
same_cluster_scores_rdd = spark.sparkContext.emptyRDD()
different_clusters_scores_rdd = spark.sparkContext.emptyRDD()
random_split_kwargs = {"seed": random_seed} if random_seed else {}
for cluster_number in cluster_ids:
# Pick the features for users belonging to the current cluster.
current_cluster_df = features_df.where(col("prediction") == cluster_number)
# Pick the features for users belonging to all the other clusters.
other_clusters_df = features_df.where(col("prediction") != cluster_number)
logger.debug(
"Computing scores for cluster", extra={"cluster_id": cluster_number}
)
        # Compute similarity scores between pairs of clients in the same cluster.
cluster_half_1, cluster_half_2 = current_cluster_df.rdd.randomSplit(
[0.5, 0.5], **random_split_kwargs
)
pair_rdd = generate_non_cartesian_pairs(cluster_half_1, cluster_half_2)
intra_scores_rdd = pair_rdd.map(lambda r: similarity_function(*r))
same_cluster_scores_rdd = same_cluster_scores_rdd.union(intra_scores_rdd)
        # Compute similarity scores between pairs of clients in different clusters.
pair_rdd = generate_non_cartesian_pairs(
current_cluster_df.rdd, other_clusters_df.rdd
)
inter_scores_rdd = pair_rdd.map(lambda r: similarity_function(*r))
different_clusters_scores_rdd = different_clusters_scores_rdd.union(
inter_scores_rdd
)
    # Determine a linearly spaced range spanning the observed similarity values.
all_scores_rdd = same_cluster_scores_rdd.union(different_clusters_scores_rdd)
stats = all_scores_rdd.aggregate(
StatCounter(), StatCounter.merge, StatCounter.mergeStats
)
min_similarity = stats.minValue
max_similarity = stats.maxValue
lr_index = np.arange(
min_similarity,
max_similarity,
float(abs(min_similarity - max_similarity)) / num_pdf_points,
)
# Kernel density estimate for the inter-cluster comparison scores.
kd_dc = KernelDensity()
kd_dc.setSample(different_clusters_scores_rdd)
kd_dc.setBandwidth(kernel_bandwidth)
denominator_density = kd_dc.estimate(lr_index)
# Kernel density estimate for the intra-cluster comparison scores.
kd_sc = KernelDensity()
kd_sc.setSample(same_cluster_scores_rdd)
kd_sc.setBandwidth(kernel_bandwidth)
numerator_density = kd_sc.estimate(lr_index)
# Structure this in the correct output format.
return list(zip(lr_index, list(zip(numerator_density, denominator_density))))
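
# Shape of the result (numbers illustrative only): one entry per point of the
# linearly spaced similarity grid, pairing the intra-cluster (numerator) and
# inter-cluster (denominator) density estimates.
#
#   [(0.001, (3.4, 0.8)), (0.003, (2.9, 1.1)), ...]
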
def today_minus_90_days():
return (date.today() + timedelta(days=-90)).strftime("%Y%m%d")
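
# For example, when run on 2018-09-01 this returns "20180603".
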
@click.command()
@click.option("--date", required=True)
@click.option("--bucket", default="telemetry-parquet")
@click.option("--prefix", default="taar/similarity/")
@click.option("--num_clusters", default=20)
@click.option("--num_donors", default=1000)
@click.option("--kernel_bandwidth", default=0.35)
@click.option("--num_pdf_points", default=1000)
@click.option("--clients_sample_date_from", default=today_minus_90_days())
def main(
date,
bucket,
prefix,
num_clusters,
num_donors,
kernel_bandwidth,
num_pdf_points,
clients_sample_date_from,
):
logger.info("Sampling clients since {}".format(clients_sample_date_from))
spark = (
SparkSession.builder.appName("taar_similarity")
.enableHiveSupport()
.getOrCreate()
)
if num_donors < 100:
        logger.warning(
            "Fewer than 100 donors were requested.", extra={"donors": num_donors}
        )
num_donors = 100
logger.info("Loading the AMO whitelist...")
whitelist = load_amo_curated_whitelist()
logger.info("Computing the list of donors...")
    # Compute the donor clusters and the LR curves.
cluster_ids, donors_df = get_donors(
spark, num_clusters, num_donors, whitelist, clients_sample_date_from
)
donors_df.cache()
lr_curves = get_lr_curves(
spark, donors_df, cluster_ids, kernel_bandwidth, num_pdf_points
)
# Store them.
donors = format_donors_dictionary(donors_df)
store_json_to_s3(json.dumps(donors, indent=2), "donors", date, prefix, bucket)
store_json_to_s3(json.dumps(lr_curves, indent=2), "lr_curves", date, prefix, bucket)
stop_session_safely(spark)
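
# Example invocation (hypothetical values). The module uses relative imports,
# so it needs to be run with its package context (assumed here to be
# mozetl.taar) on a Spark installation with Hive support and access to the
# `clients_daily` table:
#
#   python -m mozetl.taar.taar_similarity \
#       --date 20180601 \
#       --num_clusters 20 \
#       --num_donors 1000 \
#       --bucket telemetry-parquet \
#       --prefix taar/similarity/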