-
Notifications
You must be signed in to change notification settings - Fork 7
/
csv_processor.py
343 lines (296 loc) · 10.7 KB
/
csv_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
"""
Generic class-based CSV Processor.
"""
import csv
import logging
from collections import defaultdict
from io import StringIO
from django.utils.translation import gettext as _
from .exceptions import ValidationError
from .mixins import ChecksumMixin, DeferrableMixin
log = logging.getLogger(__name__)
__all__ = ('CSVProcessor', 'ChecksumMixin', 'DeferrableMixin', 'ValidationError')
class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f".

    Rows are rendered one at a time through an in-memory queue and then
    copied to the target stream; writerow() also returns the serialized
    CSV text for the row.
    """
    def __init__(self, f, dialect=csv.excel, **kwds):
        # Redirect output to a queue so each logical row can be captured
        # and returned as a string.
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f

    def writerow(self, row):
        """
        Write the row, coercing None to '' and non-strings via str().

        Returns the serialized CSV text for the row.
        """
        # Normalize every cell to text. The previous implementation encoded
        # each cell to UTF-8 bytes and later called .decode() on the queue
        # contents — a Python 2 leftover: in Python 3, csv.writer renders
        # bytes cells as "b'...'" and str has no .decode(), so writerow()
        # raised AttributeError. Work purely in str instead.
        newrow = []
        for col in row:
            if col is None:
                col = ''
            elif not isinstance(col, str):
                col = str(col)
            newrow.append(col)
        self.writer.writerow(newrow)
        # Fetch the rendered row from the queue ...
        data = self.queue.getvalue()
        # ... write it to the target stream ...
        self.stream.write(data)
        # ... and empty the queue. truncate(0) alone leaves the stream
        # position at end-of-data, which would NUL-pad the next row, so
        # rewind first.
        self.queue.seek(0)
        self.queue.truncate(0)
        return data

    def writerows(self, rows):
        """Write every row in ``rows``."""
        for row in rows:
            self.writerow(row)
class UnicodeDictWriter(csv.DictWriter):
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """
    # pylint: disable=super-init-not-called, keyword-arg-before-vararg
    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        self.fieldnames = fieldnames  # keys expected in each row dict
        self.restval = restval  # filler value for keys missing from a short dict
        # Mirror csv.DictWriter's validation of the extras policy.
        action = extrasaction.lower()
        if action != "raise" and action != "ignore":
            raise ValueError(f'extrasaction ({extrasaction!s}) must be \'raise\' or \'ignore\'')
        self.extrasaction = extrasaction
        # Delegate the actual serialization to the unicode-safe writer.
        self.writer = UnicodeWriter(f, dialect, *args, **kwds)
class ResultDict(dict):
    """
    Per-row result dictionary.

    Unless the caller already supplied an 'error' entry, initializes the
    row as a success with an empty error message.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if 'error' in self:
            return
        self['error'] = ''
        self['status'] = _('Success')
class Echo:
    """
    Minimal file-like object implementing only ``write``.

    Handy as the target of a csv writer: instead of buffering output,
    each write hands its value straight back to the caller.
    """
    def write(self, value):
        """Return ``value`` unchanged instead of storing it in a buffer."""
        return value
def decode_utf8(input_iterator):
    """
    Generator that decodes a utf-8 encoded
    input line by line
    """
    for raw in input_iterator:
        if isinstance(raw, str):
            # Already text; pass it through untouched.
            yield raw
        else:
            # Bytes-like input: decode as UTF-8.
            yield raw.decode('utf-8')
class CSVProcessor:
    """
    Generic CSV processor.

    Create a subclass that implements process_row(row)

    To use:
    processor = MyProcessor(optional_args='foo')
    processor.process_file(open_csv_file)
    result = processor.status()

    If you want to separate validation/processing:
    processor = MyProcessor(optional_args='foo')
    processor.process_file(open_csv_file, autocommit=False)
    # save the state somewhere, or send to another process.
    processor.commit()

    If the subclass saves rows to self.rollback_rows, it's possible to
    rollback the saved items by calling processor.rollback()
    """
    # Subclass configuration: export columns, columns an imported file must
    # contain, and the largest upload (in bytes) validate_file() accepts.
    columns = []
    required_columns = []
    max_file_size = 2 * 1024 * 1024

    def __init__(self, **kwargs):
        self.filename = ''  # represents original imported file
        self.total_rows = 0  # rows seen by the last preprocess_file()
        self.processed_rows = 0  # rows that were staged successfully
        self.saved_rows = 0  # rows saved by the last commit()/rollback()
        self.stage = []  # list of (rownum, row) awaiting commit()
        self.rollback_rows = []  # list of (rownum, undo-row) for rollback()
        self.result_data = []  # per-row ResultDict snapshot of the last import
        self.error_messages = defaultdict(list)  # message -> list of row numbers
        # Any keyword arguments become attributes, so subclasses can be
        # configured at construction time without overriding __init__.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def add_error(self, message, row=0):
        """
        Add an error message. Does not store duplicates.

        Identical messages share a single entry; the row number of each
        occurrence is appended to that entry's list.
        """
        self.error_messages[message].append(row)

    def write_file(self, thefile, rows=None, columns=None):
        """
        Write the rows to the file.
        """
        for row in self.get_iterator(rows, columns):
            thefile.write(row)

    def get_iterator(self, rows=None, columns=None, error_data=False):
        """
        Generate row data for writing to an output CSV file.
        Supply rows (dict array) to override output row data.
        Supply columns (string array) to override output columns from processor.
        Set error_data to a truthy value to return error and status info per-row.
        """
        if error_data:
            if columns is None:
                columns = self.columns
            # Add the bookkeeping columns filled in by preprocess_file().
            columns = columns + ['status', 'error']
            if rows is None:
                rows = self.result_data
        else:
            if columns is None:
                columns = self.columns
            if rows is None:
                rows = self.get_rows_to_export()
        # Echo.write() returns its argument, so each writerow() call hands
        # back the serialized CSV line for this generator to yield.
        writer = csv.DictWriter(Echo(), columns, extrasaction="ignore")
        header = writer.writerow(dict(zip(writer.fieldnames, writer.fieldnames)))
        yield header
        for row in rows:
            self.preprocess_export_row(row)
            yield writer.writerow(row)

    def process_file(self, thefile, autocommit=True):
        """
        Read the file, validating and preprocessing each row.
        If autocommit=False, rows will be staged for writing. Call commit() to finalize.
        If autocommit=True, the staged rows will be committed.
        """
        reader = self.read_file(thefile)
        if reader:
            self.preprocess_file(reader)
            thefile.close()
        if autocommit and self.can_commit:
            self.commit()

    # pylint: disable=inconsistent-return-statements
    def read_file(self, thefile):
        """
        Create a CSV reader and validate the file.
        Returns the reader, or None if validation failed (the failure is
        recorded via add_error()).
        file must be open in binary mode
        """
        try:
            self.filename = getattr(thefile, 'name', '') or ''
            reader = csv.DictReader(decode_utf8(thefile))
            self.validate_file(thefile, reader)
            return reader
        except ValidationError as exc:
            self.add_error(str(exc))

    def preprocess_file(self, reader):
        """
        Preprocess the rows, saving them to the staging list.
        """
        rownum = processed_rows = 0
        snapshot = []
        # Translate the status strings once, outside the loop.
        failure = _('Failure')
        no_action = _('No Action')
        for rownum, row in enumerate(reader, 1):
            result = ResultDict(row)
            try:
                self.validate_row(row)
                row = self.preprocess_row(row)
                if row:
                    self.stage.append((rownum, row))
                    processed_rows += 1
                else:
                    # preprocess_row() returning a falsy value means "skip".
                    result['status'] = no_action
            except ValidationError as e:
                self.add_error(str(e), rownum)
                result['error'] = str(e)
                result['status'] = failure
            snapshot.append(result)
        self.result_data = snapshot
        self.total_rows = rownum
        self.processed_rows = processed_rows

    def validate_file(self, thefile, reader):
        """
        Validate the file size and the presence of required columns.
        Raises ValidationError on failure; returns None otherwise.
        """
        if hasattr(thefile, 'size') and self.max_file_size and thefile.size > self.max_file_size:
            raise ValidationError(_("The CSV file must be under {} bytes").format(self.max_file_size))
        if self.required_columns:
            for field in self.required_columns:
                if field not in reader.fieldnames:
                    raise ValidationError(_("Missing column: {}").format(field))

    # pylint: disable=unused-argument
    def validate_row(self, row):
        """
        Validate the fields in the row.
        Raise ValidationError for invalid rows.
        """
        return True

    # pylint: disable=unused-argument
    def preprocess_export_row(self, row):
        """
        Preprocess row just before writing to CSV.
        Hook for subclasses; the base implementation does nothing.
        """

    def preprocess_row(self, row):
        """
        Preprocess the row.
        Returns the same row or new row, or None (falsy skips the row).
        """
        return row

    def get_rows_to_export(self):
        """
        Subclasses should implement this to return rows to export.
        """
        return []

    @property
    def can_commit(self):
        """
        Return whether there's data to commit: staged rows exist and no
        errors have been recorded.
        """
        return bool(self.stage and not self.error_messages)

    def commit(self):
        """
        Commit the processed rows to the database.
        """
        saved = 0
        # Drain the staging list so a second commit() is a no-op.
        while self.stage:
            rownum, row = self.stage.pop(0)
            try:
                did_save, rollback_row = self.process_row(row)
                if did_save:
                    saved += 1
                    if rollback_row:
                        self.rollback_rows.append((rownum, rollback_row))
            except Exception as e:
                log.exception('Committing %r', self)
                self.add_error(str(e), row=rownum)
                # result_data is indexed by rownum - 1 (rows were enumerated
                # from 1); mirror the failure there for error exports.
                if self.result_data:
                    self.result_data[rownum - 1]['error'] = str(e)
                    self.result_data[rownum - 1]['status'] = _('Failure')
        self.saved_rows = saved
        log.info('%r committed %d rows', self, saved)

    def rollback(self):
        """
        Rollback the previously saved rows, by applying each undo row.
        """
        saved = 0
        while self.rollback_rows:
            rownum, row = self.rollback_rows.pop(0)
            try:
                did_save, __ = self.process_row(row)
                if did_save:
                    saved += 1
            except Exception as e:
                log.exception('Rolling back %r', self)
                self.add_error(str(e), row=rownum)
        # NOTE: overwrites the count set by commit(); saved_rows now reflects
        # how many undo rows were applied.
        self.saved_rows = saved

    def status(self):
        """
        Return a status dict summarizing the last import/commit.
        """
        result = {
            'total': self.total_rows,
            'processed': self.processed_rows,
            'saved': self.saved_rows,
            'error_rows': [row for row in self.result_data if row.get('error')],
            'error_messages': list(self.error_messages.keys()),
            # total_rows may be 0 before any import; avoid ZeroDivisionError.
            'percentage': format(self.saved_rows / float(self.total_rows or 1), '.1%'),
            'can_commit': self.can_commit,
        }
        return result

    def process_row(self, row):
        """
        Save the row to the database.
        Returns success, undo (dictionary of row to use for rolling back the operation) or None
        At minimum, subclasses should implement this method.
        """
        return False, None