/
aggregate_projectcounts
executable file
·232 lines (192 loc) · 9.12 KB
/
aggregate_projectcounts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Aggregates Wikimedia's hourly projectcount files into per project CSVs
Usage: aggregate_projectcounts [--source SOURCE_DIR] [--target TARGET_DIR]
[--first-date FIRST_DATE] [--last-date LAST_DATE] [--date DATE]
[--log LOG_FILE] [--force] [--all-projects] [--push-target]
[--output-projectviews] [-v ...] [--help]
Options:
-h, --help Show this help message and exit.
--source SOURCE_DIR Read hourly projectcount files from SOURCE_DIR.
[default: \
/mnt/hdfs/wmf/data/archive/pagecounts-all-sites]
--target TARGET_DIR Write daily aggregated CSV files into TARGET_DIR.
For each wiki that has a (maybe still empty) csv
with the corresponding database name in
TARGET_DIR's 'daily_raw' subdirectory, the CSV is
updated with the wiki's projectcounts for the
specified dates.
So for example to compute data for the English
Wikipedia, put a file 'daily_raw/enwiki.csv' in
TARGET_DIR.
[default: ./data]
--first-date FIRST_DATE First day to aggregate for
[default: 2014-09-23]
--last-date LAST_DATE Last day to aggregate for
[default: yesterday]
--date DATE Day to aggregate for (overrides --first-date, and
--last-date)
--force Force recomputation of given days, even if the CSV
would already contain that data.
--all-projects Compute aggregation across projects into a file
named 'all.csv'. Note that the aggregates will be
computed for all dates already in CSVs, regardless
of --first-date, --last-date or --date parameters.
--push-target Assumes the target directory is a git repository,
and automatically hard reset it before the
aggregation, and commit and push after the
aggregation.
--output-projectviews Name the output files projectviews instead of
projectcounts.
--log LOG_FILE In addition to stdout, also log to LOG_FILE
-v, --verbose Increase verbosity
This script computes the following aggregations (in separate sub-directories):
TARGET_DIR/daily_raw -- Aggregation by day for "all available" (not
necessarily "all") hours for a project. Those
CSVs do not hold corrections of any kind.
TARGET_DIR/daily -- daily_raw without bad dates (see below).
TARGET_DIR/weekly_rescaled -- daily summed up per week, and rescaled to
7 days per week.
TARGET_DIR/monthly_rescaled -- daily summed up per month, and rescaled to
30 days per month.
TARGET_DIR/yearly_rescaled -- daily summed up per year, and rescaled to
365 days per year.
The bad dates are read from TARGET_DIR/BAD_DATES.csv. The first column
in that CSV have to be dates, the other columns are ignored, and can
for example be used to document why the date is bad.
"""
# Add the parent directory to the Python path to allow loading of modules
# without having to set PYTHONPATH on the command line
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from docopt import docopt
import logging
import subprocess
import aggregator
LOG_MESSAGE_FORMAT = '%(asctime)s %(levelname)-6s %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
GIT_FILE_ABS = '/usr/bin/git'
def setup_logging(verbosity, log_file):
    """Configure the root logger for console and optional file logging.

    Args:
        verbosity: Verbosity count from the command line. 0 selects ERROR,
            1 WARNING, 2 INFO, and 3 or more DEBUG for the console output.
        log_file: Path of a file to log to in addition to the console. If
            falsy, only console logging is configured. Missing parent
            directories of the file are created.

    When a log file is given, the file receives everything from DEBUG
    upwards, while the console is limited to the level selected through
    ``verbosity``. Both sinks use the same message and date format.
    """
    log_level_map = {
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }
    # Anything above 3 is capped to DEBUG; 0 (no -v) falls back to ERROR.
    log_level = log_level_map.get(min(verbosity, 3), logging.ERROR)

    if not log_file:
        logging.basicConfig(level=log_level,
                            format=LOG_MESSAGE_FORMAT,
                            datefmt=LOG_DATE_FORMAT)
        return

    log_file_abs = os.path.abspath(log_file)
    log_dir_abs = os.path.dirname(log_file_abs)

    # Make sure log file's directory exists. Creating it unconditionally and
    # tolerating "already there" avoids the race between an existence check
    # and the creation.
    try:
        os.makedirs(log_dir_abs)
    except OSError:
        if not os.path.isdir(log_dir_abs):
            raise

    # Root logger at DEBUG, so the log file receives everything
    logging.basicConfig(level=logging.DEBUG,
                        format=LOG_MESSAGE_FORMAT,
                        datefmt=LOG_DATE_FORMAT,
                        filename=log_file_abs)

    # Console logger at user specified verbosity. The date format is passed
    # explicitly, so console timestamps look the same with and without a
    # log file.
    console_logger = logging.StreamHandler()
    console_logger.setLevel(log_level)
    console_logger.setFormatter(logging.Formatter(LOG_MESSAGE_FORMAT,
                                                  datefmt=LOG_DATE_FORMAT))
    logging.getLogger('').addHandler(console_logger)
def run_git(args):
    """Run the git executable at GIT_FILE_ABS with the given arguments.

    Git is spawned as a subprocess instead of going through a git module,
    as such a module is not part of a default installation.

    Args:
        args: Iterable of command line arguments to pass to git.

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
    """
    command_line = [GIT_FILE_ABS] + list(args)
    logging.info("Spawning git with : %s", command_line)
    subprocess.check_call(command_line)
if __name__ == '__main__':
    # Script entry point: parse the command line from the module docstring,
    # validate every parameter (reporting all problems before giving up),
    # and run the aggregation. With --push-target, the aggregation is
    # wrapped in git commands run inside the target directory.
    options = docopt(__doc__)

    setup_logging(options['--verbose'], options['--log'])

    logging.debug("Parsed arguments: %s", options)

    params_valid = True

    # Resolve source and target directories
    source_dir_raw = options['--source']
    try:
        source_dir_abs = aggregator.existing_dir_abs(source_dir_raw)
    except ValueError:
        params_valid = False
        logging.error("Source directory '%s' does not point to an existing "
                      "directory", source_dir_raw)

    target_dir_raw = options['--target']
    try:
        target_dir_abs = aggregator.existing_dir_abs(target_dir_raw)
    except ValueError:
        params_valid = False
        logging.error("Target directory '%s' does not point to an existing "
                      "directory", target_dir_raw)

    # Resolve the date range; --date takes precedence over both ends
    single_date_raw = options['--date']
    if single_date_raw:
        first_date_raw = single_date_raw
        last_date_raw = single_date_raw
    else:
        first_date_raw = options['--first-date']
        last_date_raw = options['--last-date']

    try:
        first_date = aggregator.parse_string_to_date(first_date_raw)
    except ValueError:
        params_valid = False
        logging.error("Could not parse first date '%s' to date",
                      first_date_raw)

    try:
        last_date = aggregator.parse_string_to_date(last_date_raw)
    except ValueError:
        params_valid = False
        logging.error("Could not parse last date '%s' to date",
                      last_date_raw)

    # Only compare the dates if both of them parsed
    if params_valid and first_date > last_date:
        params_valid = False
        logging.error("first_date '%s' is not before last_date '%s'",
                      first_date, last_date)

    force_recomputation = options['--force']
    compute_all_projects = options['--all-projects']
    output_projectviews = options['--output-projectviews']
    push_target = options["--push-target"]

    if not params_valid:
        logging.error("Parameters could not get parsed")
        sys.exit(1)

    if push_target:
        # Bring the target repository to a clean, up-to-date master before
        # touching any file in it.
        os.chdir(target_dir_abs)
        for git_args in (
                ['reset', '--quiet', '--hard'],
                ['checkout', '--quiet', 'master'],
                ['pull', '--quiet'],
                ['reset', '--quiet', '--hard', 'origin/master'],
        ):
            run_git(git_args)

    # The bad dates get read only after the (optional) git reset above
    bad_dates_file_abs = os.path.join(target_dir_abs, 'BAD_DATES.csv')
    bad_date_strings = aggregator.parse_csv_to_first_column_dict(
        bad_dates_file_abs).keys()
    bad_dates = [aggregator.parse_string_to_date(bad_date_string)
                 for bad_date_string in bad_date_strings]

    aggregator.update_per_project_csvs_for_dates(
        source_dir_abs,
        target_dir_abs,
        first_date,
        last_date,
        bad_dates=bad_dates,
        additional_aggregators=[
            aggregator.update_daily_csv,
            aggregator.update_weekly_csv,
            aggregator.update_monthly_csv,
            aggregator.update_yearly_csv,
        ],
        force_recomputation=force_recomputation,
        compute_all_projects=compute_all_projects,
        output_projectviews=output_projectviews,
    )

    if push_target:
        # Commit the CSVs and push them to the remote master
        commit_message = "Automatic commit for dates %s until %s" % (
            first_date.isoformat(), last_date.isoformat())
        run_git(['commit', '--quiet', '*.csv', '-m', commit_message])
        run_git(['push', '--quiet', 'origin', 'HEAD:refs/heads/master'])