This repository has been archived by the owner on Sep 1, 2023. It is now read-only.
/
sorter.py
315 lines (258 loc) · 8.95 KB
/
sorter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# ----------------------------------------------------------------------
# Numenta Platform for Intelligent Computing (NuPIC)
# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement
# with Numenta, Inc., for a separate license for this software code, the
# following terms and conditions apply:
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Affero Public License for more details.
#
# You should have received a copy of the GNU Affero Public License
# along with this program. If not, see http://www.gnu.org/licenses.
#
# http://numenta.org/licenses/
# ----------------------------------------------------------------------
import os
import sys
from operator import itemgetter
import psutil
from nupic.support import title
from nupic.data.file_record_stream import FileRecordStream
"""The sorter sorts PF datasets in the standard File format
- It supports sorting by multiple fields
- It allows sorting of datasets that don't fit in memory
- It allows selecting a subset of the original fields
The sorter uses merge sort ()
"""
def sort(filename, key, outputFile, fields=None, watermark=1024 * 1024 * 100):
  """Sort a potentially big file.

  filename - the input file (standard File format)
  key - a list of field names to sort by
  outputFile - the name of the output file
  fields - a list of fields that should be included (all fields if None)
  watermark - when available memory goes below the watermark create a new chunk

  sort() works by reading records from the file into memory and calling
  _sortChunk() on each chunk. In the process it gets rid of unneeded fields
  if any. Once all the chunks have been sorted and written to chunk files it
  calls _mergeFiles() to merge all the chunks into a single sorted file.

  Note, that sort() gets a key that contains field names, which it converts
  into field indices for _sortChunk() because _sortChunk() doesn't need to
  know the field names.

  sort() figures out by itself how many chunk files to use by reading records
  from the file until the low watermark value of available memory is hit and
  then it sorts the current records, generates a chunk file, clears the
  sorted records and starts on a new chunk.
  """
  if fields is not None:
    # Every key field must be one of the requested fields
    assert set(key).issubset(set([f[0] for f in fields]))

  with FileRecordStream(filename) as f:
    # Find the indices of the requested fields
    if fields:
      fieldNames = [ff[0] for ff in fields]
      indices = [f.getFieldNames().index(name) for name in fieldNames]
      assert len(indices) == len(fields)
    else:
      # BUG FIX: this used to assign to a misspelled local 'fileds', which
      # left 'fields' as None and broke chunk/output file creation when the
      # caller didn't pass an explicit field list.
      fields = f.getFields()
      fieldNames = f.getFieldNames()
      indices = None

    # Turn key field names into key indices
    key = [fieldNames.index(name) for name in key]

    chunk = 0
    records = []
    for r in f:
      # Select requested fields only. (BUG FIX: the old code reused the
      # enumerate loop variable 'i' for this projection, shadowing it.)
      if indices:
        r = [r[i] for i in indices]

      # Store processed record
      records.append(r)

      # If available memory goes below the watermark, sort and flush the
      # current chunk, then start a new one
      if psutil.avail_phymem() < watermark:
        _sortChunk(records, key, chunk, fields)
        records = []
        chunk += 1

    # Sort and write the remainder
    if len(records) > 0:
      _sortChunk(records, key, chunk, fields)
      chunk += 1

    # Merge all the chunk files into the final sorted output
    _mergeFiles(key, chunk, outputFile, fields)
def _sortChunk(records, key, chunkIndex, fields):
  """Sort an in-memory chunk of records.

  records - a list of records read from the original dataset
  key - a list of field indices to sort the records by
  chunkIndex - the index of the current chunk, or None to skip writing a file
  fields - the field descriptions used to create the chunk file

  The records contain only the fields requested by the user.

  _sortChunk() sorts 'records' in place and, when chunkIndex is not None,
  writes the sorted records to a standard File named
  "chunk_<chunk index>.csv" (chunk_0.csv, chunk_1.csv, ...).

  Returns the sorted records list.
  """
  # BUG FIX: use %s instead of %d so that the None-chunkIndex case (allowed
  # by the guard below) doesn't crash here; %s formats ints identically.
  title(additional='(key=%s, chunkIndex=%s)' % (str(key), chunkIndex))

  assert len(records) > 0

  # Sort the current records in place
  records.sort(key=itemgetter(*key))

  # Write to a chunk file
  if chunkIndex is not None:
    filename = 'chunk_%d.csv' % chunkIndex
    with FileRecordStream(filename, write=True, fields=fields) as o:
      for r in records:
        o.appendRecord(r)

    assert os.path.getsize(filename) > 0

  return records
def _mergeFiles(key, chunkCount, outputFile, fields):
  """Merge sorted chunk files into a single sorted output file.

  key - a list of field indices to sort by
  chunkCount - the number of available chunk files
  outputFile - the name of the sorted output file
  fields - the field descriptions used to create the output file

  Performs an N-way merge: repeatedly picks the smallest head record among
  the open chunk files, appends it to the output and advances that file.
  """
  title()

  # Open all chunk files exactly once. (BUG FIX: the chunk files used to be
  # opened twice, leaking the first set of handles; exhausted files were
  # also dropped from the list before ever being closed, and the cleanup
  # loop then removed the wrong subset of chunk files.)
  files = [FileRecordStream('chunk_%d.csv' % i) for i in range(chunkCount)]

  # Open output file
  with FileRecordStream(outputFile, write=True, fields=fields) as o:
    # The head record of each chunk file (None once a file is exhausted)
    records = [f.getNextRecord() for f in files]
    keyFunc = itemgetter(*key)

    # This loop runs until all files are exhausted
    while True:
      candidates = [(i, r) for i, r in enumerate(records) if r is not None]
      if not candidates:
        break

      # Find the file holding the smallest head record
      index, r = min(candidates, key=lambda item: keyFunc(item[1]))

      # Write it to the output
      o.appendRecord(r)

      # Read a new head record from the file that produced the current one
      records[index] = files[index].getNextRecord()

  # Close and remove every chunk file
  for i, f in enumerate(files):
    f.close()
    os.remove('chunk_%d.csv' % i)
def writeTestFile(testFile, fields, big):
if big:
print 'Creating big test file (763MB)...'
payload = 'x' * 10 ** 8
else:
print 'Creating a small big test file...'
payload = 'x' * 3
with FileRecordStream(testFile, write=True, fields=fields) as o:
print '.'; o.appendRecord([1,3,6, payload])
print '.'; o.appendRecord([2,3,6, payload])
print '.'; o.appendRecord([1,4,6, payload])
print '.'; o.appendRecord([2,4,6, payload])
print '.'; o.appendRecord([1,3,5, payload])
print '.'; o.appendRecord([2,3,5, payload])
print '.'; o.appendRecord([1,4,5, payload])
print '.'; o.appendRecord([2,4,5, payload])
def test(long):
import shutil
from tempfile import gettempdir
print 'Running sorter self-test...'
# Switch to a temp dir in order to create files freely
workDir = os.path.join(gettempdir(), 'sorter_test')
if os.path.exists(workDir):
shutil.rmtree(workDir)
os.makedirs(workDir)
os.chdir(workDir)
print 'cwd:', os.getcwd()
# The fields definition used by all tests
fields = [
('f1', 'int', ''),
('f2', 'int', ''),
('f3', 'int', ''),
('payload', 'string', '')
]
# Create a test file
testFile = '1.csv'
if not os.path.isfile(testFile):
writeTestFile(testFile, fields, big=long)
# Set watermark here to 300MB bellow current available memory. That ensures
# multiple chunk files in the big testcase
mem = psutil.avail_phymem()
watermark = mem - 300 * 1024 * 1024
print 'Test sorting by f1 and f2, watermak:', watermark
results = []
sort(testFile,
key=['f1', 'f2'],
fields=fields,
outputFile='f1_f2.csv',
watermark=watermark)
with FileRecordStream('f1_f2.csv') as f:
for r in f:
results.append(r[:3])
assert results == [
[1, 3, 6],
[1, 3, 5],
[1, 4, 6],
[1, 4, 5],
[2, 3, 6],
[2, 3, 5],
[2, 4, 6],
[2, 4, 5],
]
mem = psutil.avail_phymem()
watermark = mem - 300 * 1024 * 1024
print 'Test sorting by f2 and f1, watermark:', watermark
results = []
sort(testFile,
key=['f2', 'f1'],
fields=fields,
outputFile='f2_f1.csv',
watermark=watermark)
with FileRecordStream('f2_f1.csv') as f:
for r in f:
results.append(r[:3])
assert results == [
[1, 3, 6],
[1, 3, 5],
[2, 3, 6],
[2, 3, 5],
[1, 4, 6],
[1, 4, 5],
[2, 4, 6],
[2, 4, 5],
]
mem = psutil.avail_phymem()
watermark = mem - 300 * 1024 * 1024
print 'Test sorting by f3 and f2, watermark:', watermark
results = []
sort(testFile,
key=['f3', 'f2'],
fields=fields,
outputFile='f3_f2.csv',
watermark=watermark)
with FileRecordStream('f3_f2.csv') as f:
for r in f:
results.append(r[:3])
assert results == [
[1, 3, 5],
[2, 3, 5],
[1, 4, 5],
[2, 4, 5],
[1, 3, 6],
[2, 3, 6],
[1, 4, 6],
[2, 4, 6],
]
# Cleanup the work dir
os.chdir('..')
shutil.rmtree(workDir)
print 'done'
if __name__=='__main__':
print 'Starting tests...'
test('--long' in sys.argv)
print 'All tests pass'