-
Notifications
You must be signed in to change notification settings - Fork 7
/
SortingWriter.java
154 lines (139 loc) · 5.68 KB
/
SortingWriter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package com.github.jillesvangurp.mergesort;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.jillesvangurp.common.ResourceUtil;
import com.github.jillesvangurp.metrics.LoggingCounter;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.TreeMultimap;
import com.jillesvangurp.iterables.LineIterable;
/**
* Takes key value parameters and produces a file with lines of key;value sorted by key. Implements merge sort and uses
* a temp directory to store in between files so it can sort more data than fits into memory.
*/
public class SortingWriter implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(SortingWriter.class);

    private final String output;
    private final int bucketSize;
    private final String tempDir;
    private final LoggingCounter loggingCounter;

    // Only touched under bucketLock's write lock.
    private int currentBucket = 0;
    // Appended to in flushBucket OUTSIDE bucketLock; multiple threads can flush different
    // buckets concurrently and ArrayList is not thread safe, so all access is guarded by
    // synchronizing on the list itself.
    private final List<String> bucketFiles = new ArrayList<>();
    // Synchronized view so concurrent put() calls under the read lock are safe; TreeMultimap
    // keeps each in-memory bucket sorted by key (and value).
    Multimap<String, String> bucket = Multimaps.synchronizedMultimap(TreeMultimap.<String, String>create());
    // Read lock for normal puts (may proceed concurrently); write lock for the atomic bucket
    // swap in flushBucket.
    ReadWriteLock bucketLock = new ReentrantReadWriteLock();

    /**
     * @param tempDir
     *            this directory is used for bucket files. Note: this class will blindly overwrite
     *            any pre-existing bucket files.
     * @param output
     *            the file with the sorted output.
     * @param bucketSize
     *            the number of entries in the bucket. Each bucket is sorted in memory before being
     *            written to disk. Ensure you have enough memory available for doing this for a
     *            given bucket size.
     * @throws IOException when creating the directory fails
     */
    public SortingWriter(String tempDir, String output, int bucketSize) throws IOException {
        this.tempDir = tempDir;
        this.output = output;
        this.bucketSize = bucketSize;
        if (StringUtils.isNotEmpty(tempDir)) {
            FileUtils.forceMkdir(new File(tempDir));
        }
        loggingCounter = LoggingCounter.counter(LOG, "sort buckets " + output, "lines", 100000);
    }

    /**
     * Add an element to the sorted file.
     *
     * @param key the key to sort on
     * @param value the value
     */
    public void put(String key, String value) {
        if (bucket.size() >= bucketSize) {
            flushBucket(false);
        }
        bucketLock.readLock().lock();
        try {
            // TreeMultimap rejects duplicate key/value pairs (put returns false); only count
            // entries that were actually added.
            if (bucket.put(key, value)) {
                loggingCounter.inc();
            }
        } finally {
            bucketLock.readLock().unlock();
        }
    }

    /**
     * Atomically swap in a fresh bucket under the write lock, then write the full one to a
     * gzipped temp file outside the lock so writers are not blocked by disk I/O.
     *
     * @param skipSizeCheck when true (used from close()), flush regardless of how full the bucket is
     */
    private void flushBucket(boolean skipSizeCheck) {
        Multimap<String, String> oldBucket = null;
        int bucketNr = -1;
        bucketLock.writeLock().lock();
        try {
            // >= matches the trigger condition in put(); the original used > here, which made
            // the flush a no-op until one extra entry past bucketSize had been added.
            if (bucket.size() >= bucketSize || skipSizeCheck) {
                // atomically switch over the bucket and then write the old one
                oldBucket = bucket;
                bucketNr = currentBucket;
                currentBucket++;
                bucket = Multimaps.synchronizedMultimap(TreeMultimap.<String, String>create());
            }
        } finally {
            bucketLock.writeLock().unlock();
        }
        if (oldBucket != null) {
            File file = new File(tempDir, "bucket-" + bucketNr + ".gz");
            try (BufferedWriter bw = ResourceUtil.gzipFileWriter(file.getAbsolutePath())) {
                // entries() of a TreeMultimap iterates in key order, so each bucket file is sorted
                for (Entry<String, String> e : oldBucket.entries()) {
                    bw.write(e.getKey() + ";" + e.getValue() + "\n");
                }
                // guard the shared list: concurrent flushes of different buckets may land here
                // at the same time
                synchronized (bucketFiles) {
                    bucketFiles.add(file.getAbsolutePath());
                }
            } catch (IOException e) {
                throw new IllegalStateException("failed to write bucket file " + file.getAbsolutePath(), e);
            }
        }
    }

    /**
     * Flush any remaining entries, merge all sorted bucket files into the gzipped output file,
     * and delete the temp directory. Call exactly once when done writing.
     *
     * @throws IOException if merging or cleaning up fails
     */
    @Override
    public void close() throws IOException {
        if (bucket.size() > 0) {
            // flush any remaining entries
            flushBucket(true);
        }
        loggingCounter.close();
        LoggingCounter mergeCounter = LoggingCounter.counter(LOG, "merge buckets into " + output, " lines", 100000);
        List<LineIterable> lineIterables = new ArrayList<>();
        try {
            // important, ensure you have enough filehandles available for the number of buckets.
            // In Linux, you may need to configure this. See README.
            for (String bucketFile : bucketFiles) {
                lineIterables.add(LineIterable.openGzipFile(bucketFile));
            }
            LOG.info("merging " + lineIterables.size() + " buckets");
            // merge sort across the individually sorted bucket files
            MergingEntryIterable merged = new MergingEntryIterable(lineIterables);
            try (BufferedWriter bw = ResourceUtil.gzipFileWriter(output)) {
                for (Entry<String, String> entry : merged) {
                    // NOTE(review): relies on the entry's toString() producing the documented
                    // key;value line format — confirm against MergingEntryIterable's Entry type
                    bw.write(entry.toString() + '\n');
                    mergeCounter.inc();
                }
            }
        } finally {
            for (LineIterable li : lineIterables) {
                try {
                    li.close();
                } catch (Exception e) {
                    // best effort: keep closing the remaining iterables
                    LOG.error("cannot close file", e);
                }
            }
            mergeCounter.close();
            FileUtils.deleteDirectory(new File(tempDir));
        }
    }
}