-
Notifications
You must be signed in to change notification settings - Fork 10
/
generate_daily_druid_banner_activity.hql
93 lines (86 loc) · 3.87 KB
/
generate_daily_druid_banner_activity.hql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
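-- Extracts one day of banner activity from webrequest data, aggregated per
-- minute, and writes it as gzip-compressed JSON files to be loaded into Druid.
--
-- Parameters:
--     source_table           -- Name of the webrequest table to read from.
--     destination_directory  -- Directory the JSON output files are written to.
--     year, month, day       -- Date partition of source_table to process.
--
-- Usage:
--     hive -f generate_daily_druid_banner_activity.hql \
--         -d source_table=wmf.webrequest \
--         -d destination_directory=/wmf/tmp/druid/daily_json_banner_activity \
--         -d year=2016 \
--         -d month=7 \
--         -d day=10
--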
-- Load the HCatalog core jar first. The staging table defined further down
-- uses the JsonSerDe class from the org.apache.hive.hcatalog package.
ADD JAR /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar;

-- Compress the files produced by the query, using gzip as the codec.
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-- Staging table for the day being processed. Any definition left over from an
-- earlier run is dropped first. The table is EXTERNAL and stores its rows as
-- JSON text files under the destination directory.
DROP TABLE IF EXISTS tmp_banner_activity_${year}_${month}_${day};

CREATE EXTERNAL TABLE IF NOT EXISTS tmp_banner_activity_${year}_${month}_${day} (
    -- One column per value emitted by the aggregation query, in the same order
    `dt`                       string,
    `campaign`                 string,
    `banner`                   string,
    `project`                  string,
    `uselang`                  string,
    `bucket`                   string,
    `anonymous`                boolean,
    `status_code`              string,
    `country`                  string,
    `country_matches_geocode`  boolean,
    `region`                   string,
    `device`                   string,
    `sample_rate`              float,
    `request_count`            bigint,
    `normalized_request_count` bigint
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '${destination_directory}';
-- uri_param_value(param_name, col)
--     param_name  name of the query-string parameter to look up
--     col         string holding the query part of a request
-- Returns what parse_url extracts for param_name. The value of col is appended
-- to a dummy scheme, host and path so that parse_url receives a complete URL.
CREATE TEMPORARY MACRO uri_param_value(param_name string, col string)
    PARSE_URL(
        CONCAT('http://bla.org/woo/', col),
        'QUERY',
        param_name
    );
-- Aggregates the /beacon/impression requests of the requested day per minute
-- and per banner dimension, and writes the result into the staging table.
--
-- The query-string parameters are extracted once in the inner query, so the
-- outer SELECT and GROUP BY always work on the same values. In particular the
-- sample rate is grouped as the float that is emitted, so textual variants of
-- one rate (for example 0.01 and 0.010) end up in a single output row instead
-- of several rows with identical dimensions.
INSERT OVERWRITE TABLE tmp_banner_activity_${year}_${month}_${day}
SELECT
    impressions.minute_dt AS dt,
    impressions.campaign,
    impressions.banner,
    impressions.project,
    impressions.uselang,
    impressions.bucket,
    impressions.anonymous,
    impressions.status_code,
    impressions.country,
    impressions.country_matches_geocode,
    impressions.region,
    impressions.device,
    impressions.sample_rate,
    COUNT(*) AS request_count,
    -- Request count divided by the sample rate, truncated to an integer
    CAST(COUNT(*) / impressions.sample_rate AS bigint) AS normalized_request_count
FROM (
    SELECT
        -- Keep the first 17 characters of the timestamp and set the seconds to
        -- zero. NOTE(review): assumes dt is formatted as YYYY-MM-DDTHH:MM:SSZ
        CONCAT(SUBSTRING(dt, 1, 17), '00Z') AS minute_dt,
        uri_param_value('campaign', uri_query) AS campaign,
        uri_param_value('banner', uri_query) AS banner,
        uri_param_value('project', uri_query) AS project,
        uri_param_value('uselang', uri_query) AS uselang,
        uri_param_value('bucket', uri_query) AS bucket,
        uri_param_value('anonymous', uri_query) = 'true' AS anonymous,
        uri_param_value('statusCode', uri_query) AS status_code,
        uri_param_value('country', uri_query) AS country,
        -- True when the country reported by the client equals the geocoded one
        geocoded_data['country_code'] = uri_param_value('country', uri_query) AS country_matches_geocode,
        geocoded_data['subdivision'] AS region,
        uri_param_value('device', uri_query) AS device,
        CAST(uri_param_value('recordImpressionSampleRate', uri_query) AS float) AS sample_rate
    FROM
        ${source_table}
    WHERE
        year = ${year}
        AND month = ${month}
        AND day = ${day}
        AND webrequest_source = 'text'
        -- drop requests with no timestamps
        AND dt != '-'
        AND uri_path = '/beacon/impression'
        AND agent_type = 'user'
        AND uri_param_value('debug', uri_query) = 'false'
        -- sample_rate can be infinity, leading to Druid indexation failing.
        -- We remove those rows from the data
        AND CAST(uri_param_value('recordImpressionSampleRate', uri_query) AS float) != 'Infinity'
        -- TODO: add once added to webrequest
        -- AND x_analytics_map['proxy'] IS NULL
) impressions
GROUP BY
    impressions.minute_dt,
    impressions.campaign,
    impressions.banner,
    impressions.project,
    impressions.uselang,
    impressions.bucket,
    impressions.anonymous,
    impressions.status_code,
    impressions.country,
    impressions.country_matches_geocode,
    impressions.region,
    impressions.device,
    impressions.sample_rate
;
-- Remove the staging table definition again. The table was created as
-- EXTERNAL, so the JSON files written under the destination directory are
-- expected to stay in place after the drop.
DROP TABLE IF EXISTS tmp_banner_activity_${year}_${month}_${day};