package org.wikimedia.analytics.refinery.job.refine
/**
* This file contains objects with apply methods suitable for passing
* to Refine to do WMF specific transformations on a DataFrame before
* inserting into a Hive table.
*
* After https://gerrit.wikimedia.org/r/#/c/analytics/refinery/source/+/521563/
* we are merging JSONSchema with Hive schema before we get to these transforms.
* This means that if there are additional columns in Hive that are not in
* the JSON Schema, they will already be part of the DataFrame (with null values)
* when we get to these transform functions.
*
* Then, if a transform method is the one that determines the value
* of this Hive-only column, the transform code needs to drop the column
* (it holds a null value as it has not been populated with schema values)
* and re-insert it with the calculated value. See geocode_ip function
* for an example.
*
* See the Refine --transform-functions CLI option documentation.
*/
import org.apache.spark.sql.DataFrame
import java.util.UUID.randomUUID
import scala.util.matching.Regex
import org.apache.spark.sql.functions.{coalesce, col, expr, input_file_name, udf, when}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.wikimedia.analytics.refinery.core.Webrequest
import org.wikimedia.analytics.refinery.job.refine.RefineHelper.TransformFunction
import org.wikimedia.analytics.refinery.spark.sql.PartitionedDataFrame
import org.wikimedia.analytics.refinery.tools.LogHelper
import scala.collection.immutable.ListMap
// These aren't directly used, but are referenced in SQL UDFs.
// Import them to get compile time errors if they aren't available.
import org.wikimedia.analytics.refinery.hive.{GetUAPropertiesUDF, IsSpiderUDF, GetGeoDataUDF, GetHostPropertiesUDF}
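// Example wiring (an illustrative sketch, not a prescribed configuration): the objects in
// this file are referenced by name through Refine's --transform-functions CLI option,
// e.g. something like
//   --transform-functions org.wikimedia.analytics.refinery.job.refine.event_transforms
// Each named object's apply(PartitionedDataFrame) is then called in the order given.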
/**
* Helper wrapper to apply a series of transform functions
* to PartitionedDataFrames containing 'event' data.
* This just exists to reduce the number of functions
* we need to provide to Refine's --transform-functions CLI options.
*/
object event_transforms {
/**
* Seq of TransformFunctions that will be applied
* in order to a PartitionedDataFrame.
*/
val eventTransformFunctions = Seq[TransformFunction](
remove_canary_events.apply,
deduplicate.apply,
geocode_ip.apply,
parse_user_agent.apply,
add_is_wmf_domain.apply,
add_normalized_host.apply
)
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
eventTransformFunctions.foldLeft(partDf)((currPartDf, fn) => fn(currPartDf))
}
}
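// Illustrative usage (a sketch; assumes `partDf` is a PartitionedDataFrame for one event table partition):
//   val transformed = event_transforms(partDf)
// which is equivalent to folding the functions above in order:
//   val transformed = add_normalized_host(add_is_wmf_domain(
//     parse_user_agent(geocode_ip(deduplicate(remove_canary_events(partDf))))))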
/**
* Drop duplicate data based on meta.id and/or uuid.
* - meta.id: Newer (Modern Event Platform) id field.
* - uuid: Legacy EventLogging Capsule id field.
*
* The first non-null of these columns will be used as the key for dropping duplicate records.
* If none of these columns exist, this is a no-op.
*
* If the values of all of these columns in a record are null, a temporary uuid
* will be generated for that record and used for deduplication instead.
* In this way, records with null id values are never removed as part of deduplication.
*/
object deduplicate extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val possibleSourceColumnNames = Seq("meta.id", "uuid")
val possibleSortingColumnNames = Seq("meta.dt", "meta.id", "uuid", "meta.request_id")
/**
* Generates a new random UUID Column value prefixed with 'fake_'
*/
val fakeUuid: UserDefinedFunction = udf(
() => { "fake_" + randomUUID().toString }
)
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val sourceColumnNames: Seq[String] = partDf.df.findColumnNames(possibleSourceColumnNames)
// No-op
if (sourceColumnNames.isEmpty) {
log.debug(
s"${partDf.partition} does not contain any id columns named " +
s"${possibleSourceColumnNames.mkString(" or ")}, cannot deduplicate. Skipping."
)
partDf
} else {
// Add fake uuid column into the list of idColumns that will be used, and
// then coalesce to use the first non-null value found.
// This guarantees that if a record has NULL for all the possible source id columns,
// NULL itself will not be considered a unique id. In that case, the fake uuid will
// be used as the deduplication key, which will be unique in each row.
val idColumn = coalesce(sourceColumnNames.map(col) :+ fakeUuid():_*)
val tempColumnName = "__temp__uuid__for_deduplicate"
var tempDf: DataFrame = partDf.df.withColumn(tempColumnName, idColumn)
val sortingColumnNames: Seq[String] = partDf.df.findColumnNames(possibleSortingColumnNames)
if (sortingColumnNames.nonEmpty) {
// We need to sort before deduplicating to ensure we always pick the same first record in each
// group of duplicates. This is important for deterministic results.
tempDf = tempDf.sort(sortingColumnNames.map(col):_*)
}
log.info(s"""Dropping duplicates
|based on ${sourceColumnNames.mkString(",")} columns
|in ${partDf.partition}
|sorted by `${sortingColumnNames.mkString(",")}`""".stripMargin)
partDf.copy(df = tempDf
// Drop duplicate records based on the tempColumnName we added to the df.
.dropDuplicates(tempColumnName)
// And drop the tempColumnName column from the result.
.drop(tempColumnName)
)
}
}
}
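// Illustrative effect of the coalesced deduplication key (a sketch; values are made up):
//   meta.id = "abc", uuid = NULL  -> key "abc"                 (duplicates of "abc" are dropped)
//   meta.id = NULL,  uuid = "xyz" -> key "xyz"
//   meta.id = NULL,  uuid = NULL  -> key "fake_<random uuid>"  (row is always kept)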
/**
* Backwards compatible function name for EventLogging refine jobs.
* This will be removed once event Refine job configurations are unified.
*/
object deduplicate_eventlogging extends LogHelper {
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
deduplicate(partDf)
}
}
/**
* Backwards compatible function name for eventbus refine jobs.
* This will be removed once event Refine job configurations are unified.
*/
object deduplicate_eventbus extends LogHelper {
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
deduplicate(partDf)
}
}
/**
* Geocodes an ip address column into a new `geocoded_data` Map[String, String] column
* using MaxmindDatabaseReaderFactory.
*
* The ip address value used will be extracted from one of the following columns:
* - http.client_ip: newer (Modern Event Platform) schemas use this
* - ip: legacy EventLogging Capsule has this
* - client_ip: used in the webrequest table; here just in case something else uses it too.
*
* In the order listed above, the first non null column value in the input DataFrame
* records will be used for geocoding.
* If none of these fields are present in the DataFrame schema, this is a no-op.
*/
object geocode_ip extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val possibleSourceColumnNames = Seq("http.client_ip", "ip", "client_ip")
val geocodedDataColumnName = "geocoded_data"
// eventlogging-processor would stick the client IP into a top level 'ip' column.
// To be backwards compatible, if this column exists and does not have data, we
// should fill it with the first non null value of one of the possibleSourceColumnNames.
val ipLegacyColumnName = "ip"
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val spark = partDf.df.sparkSession
val sourceColumnNames = partDf.df.findColumnNames(possibleSourceColumnNames)
// No-op
if (sourceColumnNames.isEmpty) {
log.debug(
s"${partDf.partition} does not contain any ip columns named " +
s"${possibleSourceColumnNames.mkString(" or ")}, cannot geocode. Skipping."
)
partDf
} else {
// If there is only one possible source column, just
// use it when geocoding. Else, we need to COALESCE and use the
// first non null value chosen from possible source columns in each record.
val sourceColumnSql = sourceColumnNames match {
case Seq(singleColumnName) => singleColumnName
case _ => s"COALESCE(${sourceColumnNames.mkString(",")})"
}
log.info(
s"Geocoding $sourceColumnSql into `$geocodedDataColumnName` in ${partDf.partition}"
)
spark.sql(
"CREATE OR REPLACE TEMPORARY FUNCTION get_geo_data AS " +
"'org.wikimedia.analytics.refinery.hive.GetGeoDataUDF'"
)
// Don't try to geocode records where sourceColumnSql is NULL.
val geocodedDataSql = s"(CASE WHEN $sourceColumnSql IS NULL THEN NULL ELSE get_geo_data($sourceColumnSql) END) AS $geocodedDataColumnName"
// Select all columns (except for any pre-existing geocodedDataColumnName), plus
// the result of geocodedDataSql as geocodedDataColumnName.
val workingDf = partDf.df
val columnExpressions = workingDf.columns.filter(
_.toLowerCase != geocodedDataColumnName.toLowerCase
).map(c => s"`$c`") :+ geocodedDataSql
val parsedDf = workingDf.selectExpr(columnExpressions:_*)
// If the original DataFrame has a legacy ip column, copy the source column value into it
// for backwards compatibility.
if (parsedDf.hasColumn(ipLegacyColumnName)) {
add_legacy_eventlogging_ip(partDf.copy(df = parsedDf), sourceColumnSql)
}
else {
partDf.copy(df = parsedDf)
}
}
}
/**
* EventLogging legacy data had an 'ip' column that was set by server side
* eventlogging-processor. This function sets it from sourceColumnSql, unless
* the legacy ip column already has a non null value.
*
* @param partDf
* @param sourceColumnSql
* @return
*/
private def add_legacy_eventlogging_ip(
partDf: PartitionedDataFrame,
sourceColumnSql: String
): PartitionedDataFrame = {
val spark = partDf.df.sparkSession
// If ipLegacyColumnName exists and is non NULL, keep it, otherwise set it
// to the value of sourceColumnSql.
// This handles the case where we have an ip field that was already parsed externally
// by eventlogging-processor, so we shouldn't touch it.
val ipSql =
s"""
|COALESCE(
| $ipLegacyColumnName,
| $sourceColumnSql
|) AS $ipLegacyColumnName
|""".stripMargin
log.info(
s"Setting `$ipLegacyColumnName` column in ${partDf.partition} " +
s"using SQL:\n$ipSql"
)
val workingDf = partDf.df
// Select all columns except for any pre-existing ipLegacyColumnName, plus
// the result of ipSql as ipLegacyColumnName.
val columnExpressions = workingDf.columns.filter(
_.toLowerCase != ipLegacyColumnName.toLowerCase
).map(c => s"`$c`") :+ ipSql
partDf.copy(df = workingDf.selectExpr(columnExpressions:_*))
}
}
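// Illustrative generated SQL (a sketch; assumes a schema with both http.client_ip and ip columns):
//   sourceColumnSql = COALESCE(http.client_ip,ip)
//   geocodedDataSql = (CASE WHEN COALESCE(http.client_ip,ip) IS NULL THEN NULL
//                      ELSE get_geo_data(COALESCE(http.client_ip,ip)) END) AS geocoded_data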
/**
* Parses user agent into the user_agent_map field using
* org.wikimedia.analytics.refinery.hive.GetUAPropertiesUDF.
* If the incoming DataFrame schema has a legacy eventlogging `useragent` struct column,
* it will also be generated using the values in the parsed user_agent_map as well
* as some extra logic to add the is_mediawiki and is_bot struct fields.
*
* The user agent string will be extracted from one of the following columns:
* - http.request_headers['user-agent']
*
* In the order listed above, the first non null column value in the input DataFrame
* records will be used for user agent parsing.
* If none of these fields are present in the DataFrame schema, this is a no-op.
*/
object parse_user_agent extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
// Only one possible source column currently, but we could add more.
val possibleSourceColumnNames = Seq("http.request_headers.`user-agent`")
val userAgentMapColumnName = "user_agent_map"
// This camelCase, case-sensitive 'userAgent' name is really a pain.
// df.hasColumn is case-insensitive, but accessing StructType schema fields
// by name is not.
val userAgentStructLegacyColumnName = "useragent"
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val spark = partDf.df.sparkSession
val sourceColumnNames = partDf.df.findColumnNames(possibleSourceColumnNames)
// No-op
if (sourceColumnNames.isEmpty) {
log.debug(
s"${partDf.partition} does not contain any columns named " +
s"${possibleSourceColumnNames.mkString(" or ")}. " +
"Cannot parse user agent. Skipping."
)
partDf
} else {
// If there is only one possible source column, just
// use it when parsing. Else, we need to COALESCE and use the
// first non null value chosen from possible source columns in each record.
val sourceColumnSql = sourceColumnNames match {
case Seq(singleColumnName) => singleColumnName
case _ => s"COALESCE(${sourceColumnNames.mkString(",")})"
}
log.info(
s"Parsing $sourceColumnSql into `$userAgentMapColumnName` in ${partDf.partition}"
)
spark.sql(
"CREATE OR REPLACE TEMPORARY FUNCTION get_ua_properties AS " +
"'org.wikimedia.analytics.refinery.hive.GetUAPropertiesUDF'"
)
val userAgentMapSql = s"(CASE WHEN $sourceColumnSql IS NULL THEN NULL ELSE get_ua_properties($sourceColumnSql) END) AS $userAgentMapColumnName"
// Select all columns except for any pre-existing userAgentMapColumnName, plus
// the result of userAgentMapSql as userAgentMapColumnName.
val workingDf = partDf.df
val columnExpressions = workingDf.columns.filter(
_.toLowerCase != userAgentMapColumnName.toLowerCase
).map(c => s"`$c`") :+ userAgentMapSql
val partDfWithUAMap = partDf.copy(df = workingDf.selectExpr(columnExpressions:_*))
// If the original DataFrame has a legacy useragent struct field, copy the map field
// entries to it for backwards compatibility.
if (should_add_legacy_eventlogging_struct(partDfWithUAMap)) {
add_legacy_eventlogging_struct(partDfWithUAMap, sourceColumnSql)
}
else {
partDfWithUAMap
}
}
}
/**
* If the incoming data has a legacy useragent struct field, OR if
* the destination Hive table has a legacy useragent struct field, then we should
* add the struct. This covers the case where the event schemas do not have
* userAgentStructLegacyColumnName (usually the case), but the Hive table
* does. This is going to be true for all 'legacy' EventLogging datasets
* that once came through eventlogging-processor.
* See: https://phabricator.wikimedia.org/T259944#6372923
*
* @param partDf
* @return
*/
private def should_add_legacy_eventlogging_struct(partDf: PartitionedDataFrame): Boolean = {
val spark = partDf.df.sparkSession
val tableName = partDf.partition.tableName
// Either the incoming df should already have a struct userAgentStructLegacyColumnName, OR
// the destination Hive table should.
// NOTE: dataframe hasColumn is case insensitive, but StructType schema field by name
// access is not, so when checking that the field is a StructType we need to
// find the field by name case-insensitively.
(
partDf.df.hasColumn(userAgentStructLegacyColumnName) &&
partDf.df.schema.find(Seq(userAgentStructLegacyColumnName), true).head.isStructType
) || (
spark.catalog.tableExists(tableName) &&
spark.table(tableName).hasColumn(userAgentStructLegacyColumnName) &&
spark.table(tableName).schema.find(Seq(userAgentStructLegacyColumnName), true).head.isStructType
)
}
/**
* eventlogging-processor previously handled user agent parsing and
* added the 'userAgent' field as a JSON object, which got inferred as a struct
* in the event schema by Refine jobs. There may be existent uses of
* the useragent struct field in Hive, so we need to keep ensuring
* that a useragent struct exists. This function generates it from
* the user_agent_map entries and also adds is_mediawiki and is_bot fields.
*
* If a record already has a non NULL useragent struct, it is left intact,
* otherwise it will be created from the values of user_agent_map.
*
* @param partDf input PartitionedDataFrame
* @param sourceColumnSql SQL string (or just column name) in partDf.df to get user agent string.
* This is passed to the IsSpiderUDF.
*/
private def add_legacy_eventlogging_struct(
partDf: PartitionedDataFrame,
sourceColumnSql: String
): PartitionedDataFrame = {
val spark = partDf.df.sparkSession
// IsSpiderUDF is used to calculate is_bot value.
spark.sql(
"CREATE OR REPLACE TEMPORARY FUNCTION is_spider AS " +
"'org.wikimedia.analytics.refinery.hive.IsSpiderUDF'"
)
// useragent needs to be a struct with these values from UAParser returned user_agent_map,
// as well as boolean values for is_mediawiki and is_bot.
// See legacy eventlogging parser code at
// https://github.com/wikimedia/eventlogging/blob/master/eventlogging/utils.py#L436-L454
// This is a map from struct column name to SQL that gets the value for that column.
val userAgentLegacyNamedStructFieldSql = ListMap(
"browser_family" -> s"$userAgentMapColumnName.`browser_family`",
"browser_major" -> s"$userAgentMapColumnName.`browser_major`",
"browser_minor" -> s"$userAgentMapColumnName.`browser_minor`",
"device_family" -> s"$userAgentMapColumnName.`device_family`",
// is_bot is true if device_family is Spider, or if device_family is Other and
// the user agent string matches the spider regex defined in refinery Webrequest.java.
"is_bot" -> s"""boolean(
| $userAgentMapColumnName.`device_family` = 'Spider' OR (
| $userAgentMapColumnName.`device_family` = 'Other' AND
| is_spider($sourceColumnSql)
| )
| )""".stripMargin,
// is_mediawiki is true if the user agent string contains 'mediawiki'
"is_mediawiki" -> s"boolean(lower($sourceColumnSql) LIKE '%mediawiki%')",
"os_family" -> s"$userAgentMapColumnName.`os_family`",
"os_major" -> s"$userAgentMapColumnName.`os_major`",
"os_minor" -> s"$userAgentMapColumnName.`os_minor`",
"wmf_app_version" -> s"$userAgentMapColumnName.`wmf_app_version`"
)
// Convert userAgentLegacyNamedStructFieldSql to a named_struct SQL string.
val namedStructSql = "named_struct(\n " + userAgentLegacyNamedStructFieldSql.map({
case (fieldName, sql) =>
s"'$fieldName', $sql"
}).mkString(",\n ") + "\n )"
// Build a SQL statement using named_struct that will generate the
// userAgentStructLegacyColumnName struct.
val userAgentMapCaseNonNullSql =
s"""CASE WHEN $userAgentMapColumnName IS NULL THEN NULL ELSE $namedStructSql END"""
val userAgentStructSql = if (partDf.df.hasColumn(userAgentStructLegacyColumnName)) {
// If userAgentStructLegacyColumnName exists and is non NULL, keep it by COALESCEing it.
// If it doesn't exist on the DF, then we can't refer to it in the SQL, and we don't
// need to COALESCE anyway.
// This handles the case where the value of sourceColumnSql might be null, but
// we have an (externally eventlogging-processor) parsed userAgent EventLogging field,
// so we don't need to and can't reparse it, since we don't have the original user agent.
s"""
|COALESCE(
| $userAgentStructLegacyColumnName,
| $userAgentMapCaseNonNullSql
|) AS $userAgentStructLegacyColumnName
|""".stripMargin
} else {
// Else if user_agent_map is NULL, then set useragent to NULL too.
// Else create useragent struct from user_agent_map
s"$userAgentMapCaseNonNullSql as $userAgentStructLegacyColumnName"
}
log.info(
s"Adding legacy `$userAgentStructLegacyColumnName` struct column in ${partDf.partition} " +
s"using SQL:\n$userAgentStructSql"
)
val workingDf = partDf.df
// Select all columns except for any pre-existing userAgentStructLegacyColumnName, plus
// the result of userAgentStructSql as userAgentStructLegacyColumnName.
val columnExpressions = workingDf.columns.filter(
_.toLowerCase != userAgentStructLegacyColumnName.toLowerCase
).map(c => s"`$c`") :+ userAgentStructSql
partDf.copy(df = workingDf.selectExpr(columnExpressions:_*))
}
}
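// Illustrative generated SQL for the legacy struct (a sketch; assumes the DataFrame already
// has a useragent column, so any existing value wins via COALESCE):
//   COALESCE(
//     useragent,
//     CASE WHEN user_agent_map IS NULL THEN NULL
//          ELSE named_struct('browser_family', user_agent_map.`browser_family`, ...,
//                            'is_mediawiki', boolean(lower(<user agent source>) LIKE '%mediawiki%'), ...)
//     END
//   ) AS useragent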
/**
* Adds an is_wmf_domain column based on the return value of Webrequest.isWMFDomain.
*/
object add_is_wmf_domain extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val possibleSourceColumnNames = Seq("meta.domain", "webHost")
val isWMFDomainColumnName = "is_wmf_domain"
val isWMFDomain: UserDefinedFunction = udf(
(hostname: String) => {
if (hostname == null) {
false
} else {
Webrequest.getInstance().isWMFHostname(hostname)
}
}
)
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val sourceColumnNames = partDf.df.findColumnNames(possibleSourceColumnNames)
val sourceColumnSql = sourceColumnNames match {
case Seq(singleColumnName) => singleColumnName
case _ => s"COALESCE(${sourceColumnNames.mkString(",")})"
}
partDf.copy(df = partDf.df.withColumn(
isWMFDomainColumnName, isWMFDomain(expr(sourceColumnSql))
))
}
}
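// Illustrative behavior of the isWMFDomain UDF (a sketch; exact results depend on
// Webrequest.isWMFHostname):
//   NULL               -> false
//   "en.wikipedia.org" -> likely true
//   "en.wikipedi0.org" -> likely false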
/**
* Filters out records whose domain is not a wiki,
* unless the domain matches an include list.
*
* Accepted values include:
* wikipedia.org, en.wiktionary.org, ro.m.wikibooks,
* zh-an.wikivoyage.org, mediawiki.org, www.wikidata.org,
* translate.google, etc.
*
* Filtered out values include:
* en-wiki.org, en.wikipedia.nom.it, en.wikipedi0.org,
* www.translatoruser-int.com, etc.
*
* Given that domain columns are optional fields, records for which
* the domain is null are accepted as valid.
*
* Possible domain columns:
* - meta.domain: newer (Modern Event Platform) events use this
* - webHost: legacy EventLogging Capsule data.
*
* In the order listed above, the first non null column value in the input DataFrame
* records will be used for filtering.
* If none of these fields exist in the input DataFrame schema, this is a no-op.
*/
object filter_allowed_domains extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val possibleSourceColumnNames = Seq("meta.domain", "webHost")
// translate.google ends up sending events we want, so add an exception to keep it.
var includeList: Regex = List("translate.google").mkString("|").r
val isAllowedDomain: UserDefinedFunction = udf(
(domain: String) => {
if (domain == null || domain.isEmpty) true
else if (includeList.findFirstMatchIn(domain.toLowerCase()).isDefined) true
else if (Webrequest.getInstance().isWMFHostname(domain)) true
else false
}
)
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
// We don't need to check case insensitively here because
// column access on a DataFrame is case insensitive already.
val sourceColumnNames = partDf.df.findColumnNames(possibleSourceColumnNames)
// No-op
if (sourceColumnNames.isEmpty) {
log.debug(
s"${partDf.partition} does not have any column " +
s"${possibleSourceColumnNames.mkString(" or ")}, not filtering for allowed domains."
)
partDf
} else {
// If there is only one possible source column, just
// use it when filtering. Else, we need to COALESCE and use the
// first non null value chosen from possible source columns in each record.
val sourceColumnSql = sourceColumnNames match {
case Seq(singleColumnName) => singleColumnName
case _ => s"COALESCE(${sourceColumnNames.mkString(",")})"
}
log.info(
s"Filtering for allowed domains in $sourceColumnSql in ${partDf.partition}."
)
partDf.copy(df = partDf.df.filter(isAllowedDomain(expr(sourceColumnSql))))
}
}
}
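// Illustrative usage (a sketch): rows are kept when the coalesced domain is NULL or empty,
// matches the includeList regex (e.g. translate.google), or is a WMF hostname:
//   val kept = filter_allowed_domains(partDf)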
/**
* Backwards compatible name of this function for legacy EventLogging jobs.
* This will be removed once event Refine job configurations are unified.
*/
object eventlogging_filter_is_allowed_hostname extends LogHelper {
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
filter_allowed_domains.apply(partDf)
}
}
/**
* Removes all records where meta.domain == 'canary'.
* Canary events are fake monitoring events injected
* into the real event streams by
* org.wikimedia.analytics.refinery.job.ProduceCanaryEvents.
*/
object remove_canary_events extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
// We only create canary events with meta.domain == 'canary'
val sourceColumnName: String = "meta.domain"
val canaryDomain: String = "canary"
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
// No-op
if (!partDf.df.hasColumn(sourceColumnName)) {
log.debug(
s"${partDf.partition} does not have column " +
s"$sourceColumnName, not removing canary events."
)
partDf
} else {
log.info(
s"Filtering for events where $sourceColumnName != '$canaryDomain' in ${partDf.partition}."
)
partDf.copy(
df = partDf.df.where(
// I am not sure why we need this IS NULL check here.
// In my manual REPL tests, this is not needed (as expected),
// but the unit test fails (and removes a NULL meta.domain) if I don't add this.
s"$sourceColumnName IS NULL OR $sourceColumnName != '$canaryDomain'"
)
)
}
}
}
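// Illustrative generated filter (grounded in the where clause above):
//   meta.domain IS NULL OR meta.domain != 'canary'
// i.e. only rows whose meta.domain is exactly 'canary' are removed.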
/**
* Use GetHostPropertiesUDF to get normalized host data from meta.domain and/or webHost
* and add it as column normalized_host.
*/
object add_normalized_host extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val possibleSourceColumnNames = Seq("meta.domain", "webHost")
val normalizedHostColumnName = "normalized_host"
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val spark = partDf.df.sparkSession
// Use GetHostPropertiesUDF to get normalized host data from meta.domain and/or webHost
spark.sql(
"CREATE OR REPLACE TEMPORARY FUNCTION get_host_properties AS " +
"'org.wikimedia.analytics.refinery.hive.GetHostPropertiesUDF'"
)
val sourceColumnNames = partDf.df.findColumnNames(possibleSourceColumnNames)
val sourceColumnSql = sourceColumnNames match {
case Seq(singleColumnName) => singleColumnName
case _ => s"COALESCE(${sourceColumnNames.mkString(",")})"
}
partDf.copy(df = partDf.df.withColumn(
normalizedHostColumnName, expr(s"get_host_properties($sourceColumnSql)")
))
}
}
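// Illustrative generated expression (a sketch; assumes both meta.domain and webHost exist):
//   get_host_properties(COALESCE(meta.domain,webHost)) AS normalized_host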
/**
* Converts top-level field names to lower-case, changes dots and dashes to underscores,
* and widens types: Integer to Long and Float to Double.
*/
object normalizeFieldNamesAndWidenTypes extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
// Normalize field names (toLower, etc.)
// and widen types that we can (e.g. Integer -> Long)
partDf.copy(df = partDf.df.normalizeAndWiden().emptyMetadata)
}
}
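// Illustrative examples of the normalization (a sketch; field names are made up):
//   top-level field "Page-Id"  -> "page_id"
//   top-level field "geo.city" -> "geo_city"
//   IntegerType -> LongType, FloatType -> DoubleType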
/**
* Converts top-level field names to lower-case, changes dots and dashes to underscores,
* and widens types: Integer to Long and Float to Double.
*/
object icebergNormalizeFieldNamesAndWidenTypes extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.IcebergExtensions._
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
partDf.copy(df = partDf.df.normalizeAndWiden())
}
}
/**
* Parses configured fields into timestamps. This will fail if the
* spark.refine.transformfunction.parsetimestampfields.timestampfields configuration key
* is empty, or if the field(s) defined for this key are not present in the table.
*/
object parseTimestampFields extends LogHelper {
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
val FieldsToParseParameterName = "spark.refine.transformfunction.parsetimestampfields.timestampfields"
def apply(partDf: PartitionedDataFrame): PartitionedDataFrame = {
val fieldsToParse = partDf.df.sparkSession.conf.getOption(FieldsToParseParameterName)
if (fieldsToParse.isEmpty || fieldsToParse.get.isEmpty) {
throw new IllegalStateException(
"""The configuration defining fields to parse as timestamp for the parseTimestampFields
|transform-function is not set.
|Use spark.refine.transformfunction.parsetimestampfields.timestampfields spark configuration
|to set them.
|""".stripMargin)
}
val timestampColumnNames: Seq[String] = fieldsToParse.get.split(",").map(_.trim)
log.info(s"Parsing timestamp columns ${timestampColumnNames.mkString(",")} in ${partDf.partition}")
val transformers = timestampColumnNames.map(columnName => columnName -> s"TO_TIMESTAMP($columnName)").toMap
partDf.copy(df = partDf.df.transformFields(transformers))
}
}
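// Illustrative configuration (a sketch; the field names dt and meta.dt are hypothetical):
//   spark.refine.transformfunction.parsetimestampfields.timestampfields=dt,meta.dt
// Each listed column is then replaced with TO_TIMESTAMP(column) via transformFields.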