/
WikidataSpecialEntityDataMetrics.scala
176 lines (152 loc) · 6.2 KB
/
WikidataSpecialEntityDataMetrics.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package org.wikimedia.analytics.refinery.job.wikidata
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.wikimedia.analytics.refinery.core.GraphiteClient
import scopt.OptionParser
/**
* Reports metrics for the wikidata Special:EntityData page to graphite
*
* Usage with spark-submit:
* spark-submit \
*   --class org.wikimedia.analytics.refinery.job.wikidata.WikidataSpecialEntityDataMetrics \
*   /path/to/refinery-job.jar \
*   -y <year> -m <month> -d <day> \
*   [-n <namespace> -t <webrequest-table> -g <graphite-host> -p <graphite-port>]
*/
object WikidataSpecialEntityDataMetrics {

  /**
   * Config class for CLI argument parser using scopt.
   *
   * @param webrequestTable   Hive table holding webrequest data
   * @param graphiteHost      graphite host to report to
   * @param graphitePort      graphite plaintext port
   * @param graphiteNamespace metric prefix under which counts are reported
   * @param year/month/day    day to aggregate (required on the CLI, hence the 0 defaults)
   */
  case class Params(webrequestTable: String = "wmf.webrequest",
                    graphiteHost: String = "localhost",
                    graphitePort: Int = 2003,
                    graphiteNamespace: String = "daily.wikidata.entitydata",
                    year: Int = 0, month: Int = 0, day: Int = 0)

  /**
   * Define the command line options parser.
   */
  val argsParser = new OptionParser[Params]("Wikidata Special:EntityData Metrics") {
    head("Wikidata Special:EntityData Metrics", "")
    note("This job reports use of the wikidata Special:EntityData page to graphite daily")
    help("help") text ("Prints this usage text")

    opt[String]('t', "webrequest-table") optional() valueName ("<table>") action { (x, p) =>
      p.copy(webrequestTable = x)
    } text ("The webrequest table to query. Defaults to wmf.webrequest")

    opt[String]('g', "graphite-host") optional() valueName ("<host>") action { (x, p) =>
      p.copy(graphiteHost = x)
    } text ("Graphite host. Defaults to localhost")

    opt[Int]('p', "graphite-port") optional() valueName ("<port>") action { (x, p) =>
      p.copy(graphitePort = x)
    } text ("Graphite port. Defaults to 2003")

    opt[String]('n', "graphite-namespace") optional() valueName ("<graphite.namespace>") action { (x, p) =>
      p.copy(graphiteNamespace = x)
    } text ("graphite metric namespace/prefix. Defaults to daily.wikidata.entitydata")

    opt[Int]('y', "year") required() action { (x, p) =>
      p.copy(year = x)
    } text ("Year as an integer")

    opt[Int]('m', "month") required() action { (x, p) =>
      p.copy(month = x)
    } validate { x => if (x > 0 && x <= 12) success else failure("Invalid month")
    } text ("Month as an integer")

    opt[Int]('d', "day") required() action { (x, p) =>
      p.copy(day = x)
    } validate { x => if (x > 0 && x <= 31) success else failure("Invalid day")
    } text ("Day of month as an integer")
  }

  /**
   * Builds the daily Special:EntityData aggregation query: request counts
   * grouped by (agent_type, content_type) for the partition selected in params.
   *
   * @param extraCondition optional additional WHERE condition (e.g. a
   *                       user_agent filter); empty string means no extra filter
   */
  private def buildQuery(params: Params, extraCondition: String = ""): String = {
    val extraClause = if (extraCondition.isEmpty) "" else s"\n          AND $extraCondition"
    s"""
        SELECT
          COUNT(1) AS count,
          agent_type,
          content_type
        FROM ${params.webrequestTable}
        WHERE webrequest_source = 'text'
          AND year = ${params.year}
          AND month = ${params.month}
          AND day = ${params.day}
          AND http_status = 200
          AND normalized_host.project_class = 'wikidata'
          AND uri_path like '/wiki/Special:EntityData/%'$extraClause
        GROUP BY
          agent_type,
          content_type"""
  }

  /**
   * Runs the given aggregation query and folds the (count, agent_type,
   * content_type) rows into per-key totals: each row contributes its count to
   * both "format.<normalized content type>" and "agent_types.<agent_type>".
   * Zero-count rows are skipped.
   */
  private def queryMetrics(spark: SparkSession, sql: String): Map[String, Long] = {
    val rows = spark.sql(sql).collect().map(r => (r.getLong(0), r.getString(1), r.getString(2)))
    rows.foldLeft(Map.empty[String, Long]) {
      case (acc, (0L, _, _)) => acc
      case (acc, (count, agentType, contentType)) =>
        val formatKey = "format." + normalizeFormat(contentType)
        val agentTypeKey = "agent_types." + agentType
        acc +
          (formatKey -> (acc.getOrElse(formatKey, 0L) + count)) +
          (agentTypeKey -> (acc.getOrElse(agentTypeKey, 0L) + count))
    }
  }

  /**
   * Sends each metric as "<prefix>.<metricName>" with the given timestamp.
   *
   * @param timestampSeconds graphite timestamp, in epoch seconds
   */
  private def sendMetrics(graphite: GraphiteClient, prefix: String,
                          timestampSeconds: Long, metrics: Map[String, Long]): Unit = {
    metrics.foreach { case (metricName, count) =>
      graphite.sendOnce(s"$prefix.$metricName", count, timestampSeconds)
    }
  }

  def main(args: Array[String]): Unit = {
    argsParser.parse(args, Params()) match {
      case Some(params) =>
        // Initial Spark setup
        val appName = s"WikidataSpecialEntityDataMetrics-${params.year}-${params.month}-${params.day}"
        val spark = SparkSession.builder().appName(appName).enableHiveSupport().getOrCreate()

        val graphite = new GraphiteClient(params.graphiteHost, params.graphitePort)
        // All metrics for the day are reported at midnight of that day, in epoch seconds.
        val timestampSeconds = new DateTime(params.year, params.month, params.day, 0, 0).getMillis / 1000

        // Overall Special:EntityData usage.
        sendMetrics(graphite, params.graphiteNamespace, timestampSeconds,
          queryMetrics(spark, buildQuery(params)))

        // Usage attributable to the WDQS updater only, under a dedicated sub-namespace.
        sendMetrics(graphite, s"${params.graphiteNamespace}.wdqs_updater", timestampSeconds,
          queryMetrics(spark, buildQuery(params,
            "user_agent LIKE 'Wikidata Query Service Updater%'")))

      case None => sys.exit(1)
    }
  }

  // Content-type fragments mapped to short format labels used in graphite keys.
  val rdfPat = ".*(/rdf\\+xml).*".r
  val phpPat = ".*(/vnd\\.php).*".r
  val ntPat = ".*(/n\\-triples).*".r
  val n3Pat = ".*(/n3).*".r
  val jsonPat = ".*(/json).*".r
  val ttlPat = ".*(/turtle).*".r
  val htmlPat = ".*(/html).*".r

  /**
   * Maps an HTTP content type to a short format label ("rdf", "json", ...),
   * or "unknown" when no known pattern matches.
   */
  def normalizeFormat(contentType: String): String = {
    contentType match {
      case rdfPat(_) => "rdf"
      case phpPat(_) => "php"
      case ntPat(_) => "nt"
      case n3Pat(_) => "n3"
      case jsonPat(_) => "json"
      case ttlPat(_) => "ttl"
      case htmlPat(_) => "html"
      case _ => "unknown"
    }
  }
}