Skip to content

Commit

Permalink
Refactored for better OO design
Browse files Browse the repository at this point in the history
  • Loading branch information
stoyanr committed Nov 25, 2012
1 parent a9ed3f4 commit e22d938
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 366 deletions.
1 change: 1 addition & 0 deletions wordcounter/pom.xml
Expand Up @@ -55,6 +55,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
@@ -0,0 +1,7 @@
package com.stoyanr.wordcounter;

/**
 * Supplies the two parts of an analysis that produces results of type {@code T}: an
 * {@link Analyzer} and a {@link Merger}.
 *
 * @param <T> the result type shared by the analyzer and the merger
 */
interface AnalysisFactory<T> {
/** Returns the analyzer of this analysis. */
Analyzer<T> getAnalyzer();

/** Returns the merger of this analysis. */
Merger<T> getMerger();
}
@@ -0,0 +1,5 @@
package com.stoyanr.wordcounter;

/**
 * Produces a result of type {@code T} for a range described by two integer bounds.
 *
 * @param <T> the result type
 */
interface Analyzer<T> {
/**
 * Analyzes the range delimited by {@code lo} and {@code hi}.
 * NOTE(review): the interface itself does not say whether {@code hi} is inclusive;
 * FindTopAnalysisFactory iterates {@code lo <= i < hi} - confirm for other implementations.
 *
 * @param lo lower bound of the range
 * @param hi upper bound of the range
 * @return the analysis result for the range
 */
T analyze(int lo, int hi);
}
@@ -0,0 +1,102 @@
package com.stoyanr.wordcounter;

import static com.stoyanr.util.Logger.debug;
import static com.stoyanr.util.Logger.isDebug;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * Factory for worker runnables that take texts from a shared queue, pass each text to
 * {@link WordCounter#countWords} and add the returned per-text counts to a shared concurrent map.
 *
 * <p>A worker runs until its thread is interrupted while taking from the queue.
 */
class CounterRunnableFactory implements RunnableFactory {

    private final BlockingQueue<String> queue;
    private final ConcurrentMap<String, Integer> counts;
    private final WordCounter wc;

    /**
     * @param queue  queue the workers take texts from
     * @param counts shared map that accumulates the counts of all workers
     * @param wc     word counter invoked for every text taken from the queue
     */
    public CounterRunnableFactory(BlockingQueue<String> queue, ConcurrentMap<String, Integer> counts,
            WordCounter wc) {
        this.queue = queue;
        this.counts = counts;
        this.wc = wc;
    }

    /** Returns a new runnable that executes the counting loop of this factory. */
    @Override
    public Runnable getRunnable() {
        return new Runnable() {
            @Override
            public void run() {
                count();
            }
        };
    }

    /**
     * Repeatedly takes a text from the queue, counts it and merges the result into the shared
     * map. The loop ends when the thread is interrupted; the interrupt status is restored so that
     * the code owning the thread can still observe the interruption.
     */
    private void count() {
        boolean finished = false;
        while (!finished) {
            try {
                String text = consumeText();
                add(counts, wc.countWords(text));
                logCounterJobDone(text);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again before leaving.
                Thread.currentThread().interrupt();
                finished = true;
            }
        }
    }

    /**
     * Takes the next text from the queue, blocking while the queue is empty.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private String consumeText() throws InterruptedException {
        long t0 = logCounterQueueEmpty();
        String text = queue.take();
        logCounterWaitTime(t0);
        return text;
    }

    /**
     * Logs that the queue is empty when debug logging is enabled.
     *
     * @return the current {@link System#nanoTime()} if the message was logged, otherwise 0
     */
    private long logCounterQueueEmpty() {
        long t0 = 0;
        if (isDebug() && queue.isEmpty()) {
            debug("[Counter (%s)] Queue empty, waiting ...", getThreadName());
            t0 = System.nanoTime();
        }
        return t0;
    }

    /** Logs the time elapsed since {@code t0}; does nothing when {@code t0} is 0. */
    private void logCounterWaitTime(long t0) {
        if (isDebug() && t0 != 0) {
            long t1 = System.nanoTime();
            debug("[Counter (%s)] Waited for %.2f us", getThreadName(), ((double) (t1 - t0)) / 1000);
        }
    }

    /** Logs the (shortened) text that has just been processed. */
    private void logCounterJobDone(String text) {
        if (isDebug()) {
            debug("[Counter (%s)] Processed text '%s'", getThreadName(), trim(text));
        }
    }

    /**
     * Adds {@code count} to the value stored for {@code word}. The update is retried until the
     * conditional {@code replace} or {@code putIfAbsent} succeeds, so concurrent updates of the
     * same word by other threads are not lost.
     */
    private static void add(ConcurrentMap<String, Integer> m, String word, int count) {
        boolean put = false;
        do {
            Integer cc = m.get(word);
            if (cc != null) {
                put = m.replace(word, cc, cc + count);
            } else {
                put = (m.putIfAbsent(word, count) == null);
            }
        } while (!put);
    }

    /** Adds every count of {@code m2} to the corresponding entry of {@code m1}. */
    private static void add(ConcurrentMap<String, Integer> m1, Map<String, Integer> m2) {
        for (Entry<String, Integer> e : m2.entrySet()) {
            add(m1, e.getKey(), e.getValue());
        }
    }

    /** Returns at most the first 20 characters of {@code text}. */
    private static String trim(String text) {
        return text.substring(0, Math.min(text.length(), 20));
    }

    private static String getThreadName() {
        return Thread.currentThread().getName();
    }
}
@@ -0,0 +1,8 @@
package com.stoyanr.wordcounter;

import java.io.IOException;
import java.nio.file.Path;

/** Callback that processes a single file. */
interface FileProcessor {
/**
 * Processes the given file.
 *
 * @param file path of the file to process
 * @throws IOException if processing the file fails with an I/O error
 */
void process(Path file) throws IOException;
}
57 changes: 57 additions & 0 deletions wordcounter/src/main/java/com/stoyanr/wordcounter/FileUtils.java
@@ -0,0 +1,57 @@
package com.stoyanr.wordcounter;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.stoyanr.wordcounter.TextProcessor;

/**
 * Utilities for reading text files through an {@link AsynchronousFileChannel}, decoding the
 * content with the platform default charset.
 */
public class FileUtils {
    private static final int BUF_SIZE = 256 * 1024;

    /**
     * Reads the whole file into a string.
     *
     * @param file the file to read
     * @return the decoded content; if the calling thread is interrupted while reading, the
     *         content read so far
     * @throws IOException if the file cannot be opened or read
     */
    public static String readFileToString(Path file) throws IOException {
        final StringBuilder sb = new StringBuilder();
        readFileAsync(file, new TextProcessor<Void>() {
            @Override
            public Void process(String text, Void x) throws InterruptedException {
                sb.append(text);
                return x;
            }
        });
        return sb.toString();
    }

    /**
     * Reads the file in chunks of up to {@code BUF_SIZE} bytes and hands the decoded text of each
     * chunk to {@code processor}. The value returned by one {@code process} call is passed as the
     * second argument of the next one ({@code null} for the first call). After the last chunk the
     * processor is called once more with an empty text and the last returned value.
     *
     * <p>Bytes of a character that is split across two chunks are carried over to the next chunk
     * instead of being decoded on their own. Malformed or unmappable input is replaced by the
     * decoder's replacement string.
     *
     * <p>If the calling thread is interrupted, reading stops quietly and the thread's interrupt
     * status is restored; the final empty-text call is not made in that case.
     *
     * @param file      the file to read
     * @param processor receiver of the decoded text chunks
     * @throws IOException if the file cannot be opened or a read fails
     */
    public static <T> void readFileAsync(Path file, TextProcessor<T> processor) throws IOException {
        CharsetDecoder decoder = Charset.defaultCharset().newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        try (AsynchronousFileChannel ac = AsynchronousFileChannel.open(file)) {
            ByteBuffer bytes = ByteBuffer.allocate(BUF_SIZE);
            CharBuffer chars = CharBuffer.allocate(BUF_SIZE);
            T rem = null;
            long pos = 0; // long: an int position overflows for files larger than 2 GiB
            boolean eof = false;
            while (!eof) {
                int read = readBuffer(bytes, ac, pos);
                eof = (read < 0);
                if (!eof) {
                    pos += read;
                }
                bytes.flip();
                String text = decode(decoder, bytes, chars, eof);
                // Keep the bytes of an incomplete trailing character for the next round.
                bytes.compact();
                if (!text.isEmpty()) {
                    rem = processor.process(text, rem);
                }
            }
            processor.process("", rem);
        } catch (InterruptedException ex) {
            // Stop reading, but leave the interruption visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads from the channel at file position {@code pos} into the free space of {@code buffer},
     * blocking until the read completes.
     *
     * @return the number of bytes read, or -1 if {@code pos} is at or beyond the end of the file
     * @throws IOException          if the read fails
     * @throws InterruptedException if the thread is interrupted while waiting for the read
     */
    private static int readBuffer(ByteBuffer buffer, AsynchronousFileChannel ac, long pos)
            throws InterruptedException, IOException {
        Future<Integer> future = ac.read(buffer, pos);
        try {
            // get() blocks until completion; no need to spin on isDone().
            return future.get();
        } catch (ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Failed to read file at position " + pos, ex);
        }
    }

    /**
     * Decodes the readable bytes of {@code bytes}, using {@code chars} as scratch space.
     * Bytes that form an incomplete character are left in {@code bytes} unless {@code eof} is set.
     */
    private static String decode(CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
            boolean eof) {
        StringBuilder text = new StringBuilder();
        CoderResult result;
        do {
            result = decoder.decode(bytes, chars, eof);
            drain(chars, text);
        } while (result.isOverflow());
        if (eof) {
            do {
                result = decoder.flush(chars);
                drain(chars, text);
            } while (result.isOverflow());
        }
        return text.toString();
    }

    /** Appends the characters written to {@code chars} to {@code text} and empties the buffer. */
    private static void drain(CharBuffer chars, StringBuilder text) {
        chars.flip();
        text.append(chars);
        chars.clear();
    }

}
@@ -0,0 +1,114 @@
package com.stoyanr.wordcounter;

import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Analysis over a set of word/count entries that groups words by their count and keeps only the
 * groups with the highest counts ({@code top == true}) or the lowest counts
 * ({@code top == false}). A result maps a count to the set of words having that count and holds
 * at most {@code number} distinct counts.
 */
final class FindTopAnalysisFactory implements AnalysisFactory<SortedMap<Integer, Set<String>>> {

    private final Set<Entry<String, Integer>> entries;
    private final int number;
    private final boolean top;

    /**
     * @param entries word/count entries to analyze
     * @param number  maximum number of distinct counts to keep; 0 means {@code entries.size()}
     * @param top     {@code true} to keep the highest counts, {@code false} to keep the lowest
     * @throws IllegalArgumentException if {@code number} is negative or larger than
     *                                  {@code entries.size()}
     */
    FindTopAnalysisFactory(Set<Entry<String, Integer>> entries, int number, boolean top) {
        if (number < 0 || number > entries.size()) {
            throw new IllegalArgumentException(
                    "number must be between 0 and " + entries.size() + ": " + number);
        }
        this.entries = entries;
        this.number = (number != 0) ? number : entries.size();
        this.top = top;
    }

    /** Returns an analyzer that runs the selection on the entries at positions {@code [lo, hi)}. */
    @Override
    public Analyzer<SortedMap<Integer, Set<String>>> getAnalyzer() {
        return new Analyzer<SortedMap<Integer, Set<String>>>() {
            @Override
            public SortedMap<Integer, Set<String>> analyze(int lo, int hi) {
                return findTop(lo, hi);
            }
        };
    }

    /** Returns a merger that adds the second result into the first one and returns the first. */
    @Override
    public Merger<SortedMap<Integer, Set<String>>> getMerger() {
        return new Merger<SortedMap<Integer, Set<String>>>() {
            @Override
            public SortedMap<Integer, Set<String>> merge(SortedMap<Integer, Set<String>> r1,
                    SortedMap<Integer, Set<String>> r2) {
                add(r1, r2);
                return r1;
            }
        };
    }

    /**
     * Selects the groups from the entries at iteration positions {@code lo} (inclusive) to
     * {@code hi} (exclusive). The result map is ordered so that its last key is always the first
     * candidate for eviction: descending for {@code top}, ascending otherwise.
     */
    private SortedMap<Integer, Set<String>> findTop(int lo, int hi) {
        // @formatter:off
        SortedMap<Integer, Set<String>> result = (top) ?
            new TreeMap<Integer, Set<String>>(new ReverseComparator()) :
            new TreeMap<Integer, Set<String>>();
        // @formatter:on
        Iterator<Entry<String, Integer>> it = entries.iterator();
        advance(it, lo);
        for (int i = lo; i < hi; i++) {
            Entry<String, Integer> e = it.next();
            if (result.size() < number || shouldInclude(result, e)) {
                addWord(result, e.getValue(), e.getKey());
            }
        }
        return result;
    }

    /** Tells whether the entry's count is at least as good as the worst count kept in {@code m}. */
    private boolean shouldInclude(SortedMap<Integer, Set<String>> m, Entry<String, Integer> e) {
        return (top) ? (e.getValue() >= m.lastKey()) : (e.getValue() <= m.lastKey());
    }

    /** Adds all groups of {@code m2} to {@code m1}. */
    private void add(SortedMap<Integer, Set<String>> m1, SortedMap<Integer, Set<String>> m2) {
        for (Entry<Integer, Set<String>> e : m2.entrySet()) {
            addWords(m1, e.getKey(), e.getValue());
        }
    }

    /** Adds a single word to the group of {@code count}, creating the group if necessary. */
    private void addWord(SortedMap<Integer, Set<String>> m, int count, String word) {
        if (m.containsKey(count)) {
            m.get(count).add(word);
        } else {
            putWords(m, count, getSet(word));
        }
    }

    /** Adds a set of words to the group of {@code count}, creating the group if necessary. */
    private void addWords(SortedMap<Integer, Set<String>> m, int count, Set<String> words) {
        if (m.containsKey(count)) {
            m.get(count).addAll(words);
        } else {
            putWords(m, count, words);
        }
    }

    /** Stores a new group and evicts the last key if the map now exceeds {@code number} groups. */
    private void putWords(SortedMap<Integer, Set<String>> m, int count, Set<String> words) {
        m.put(count, words);
        if (m.size() > number) {
            m.remove(m.lastKey());
        }
    }

    /** Skips the first {@code lo} elements of the iterator. */
    private static void advance(Iterator<Entry<String, Integer>> it, int lo) {
        for (int i = 0; i < lo; i++) {
            it.next();
        }
    }

    /** Returns a new mutable set containing only {@code word}. */
    private static Set<String> getSet(String word) {
        Set<String> set = new HashSet<>();
        set.add(word);
        return set;
    }

    /** Orders integers from largest to smallest. */
    static final class ReverseComparator implements Comparator<Integer> {
        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare avoids the overflow that "o2 - o1" has for operands far apart.
            return Integer.compare(o2, o1);
        }
    }
}
5 changes: 5 additions & 0 deletions wordcounter/src/main/java/com/stoyanr/wordcounter/Merger.java
@@ -0,0 +1,5 @@
package com.stoyanr.wordcounter;

/**
 * Combines two results of type {@code T} into one.
 *
 * @param <T> the result type
 */
interface Merger<T> {
/**
 * Merges two results.
 * NOTE(review): the interface does not say whether an argument may be modified and returned;
 * FindTopAnalysisFactory's merger adds {@code result2} into {@code result1} and returns
 * {@code result1} - confirm what callers may rely on.
 *
 * @param result1 the first result
 * @param result2 the second result
 * @return the merged result
 */
T merge(T result1, T result2);
}

0 comments on commit e22d938

Please sign in to comment.