Permalink
Browse files

minor refactoring

  • Loading branch information...
1 parent 728da4f commit 0457efbe10674df8036ee2673e5bc5b8989a8860 Maysam Yabandeh committed May 25, 2012
Showing with 87 additions and 87 deletions.
  1. +87 −87 src/main/java/com/yahoo/omid/client/regionserver/Compacter.java
@@ -44,62 +44,62 @@
private Channel channel;
@Override
- public void start(CoprocessorEnvironment e) throws IOException {
- System.out.println("Starting compacter");
- Configuration conf = e.getConfiguration();
- factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, 3);
- bootstrap = new ClientBootstrap(factory);
-
- bootstrap.getPipeline().addLast("decoder", new ObjectDecoder());
- bootstrap.getPipeline().addLast("handler", new Handler());
- bootstrap.setOption("tcpNoDelay", false);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
-
- String host = conf.get("tso.host");
- int port = conf.getInt("tso.port", 1234) + 1;
-
- if (host == null) {
- throw new IOException("tso.host missing from configuration");
- }
+ public void start(CoprocessorEnvironment e) throws IOException {
+ System.out.println("Starting compacter");
+ Configuration conf = e.getConfiguration();
+ factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, 3);
+ bootstrap = new ClientBootstrap(factory);
+
+ bootstrap.getPipeline().addLast("decoder", new ObjectDecoder());
+ bootstrap.getPipeline().addLast("handler", new Handler());
+ bootstrap.setOption("tcpNoDelay", false);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", 100);
+
+ String host = conf.get("tso.host");
+ int port = conf.getInt("tso.port", 1234) + 1;
+
+ if (host == null) {
+ throw new IOException("tso.host missing from configuration");
+ }
- bootstrap.connect(new InetSocketAddress(host, port)).addListener(new ChannelFutureListener() {
+ bootstrap.connect(new InetSocketAddress(host, port)).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- System.out.println("Compacter connected!");
- channel = future.getChannel();
- } else {
- System.out.println("Connection failed");
- }
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ System.out.println("Compacter connected!");
+ channel = future.getChannel();
+ } else {
+ System.out.println("Connection failed");
}
- });
- }
+ }
+ });
+ }
@Override
- public void stop(CoprocessorEnvironment e) throws IOException {
- System.out.println("Stoping compacter");
- if (channel != null) {
- System.out.println("Calling close");
- channel.close();
- }
- System.out.println("Compacter stopped");
+ public void stop(CoprocessorEnvironment e) throws IOException {
+ System.out.println("Stoping compacter");
+ if (channel != null) {
+ System.out.println("Calling close");
+ channel.close();
}
+ System.out.println("Compacter stopped");
+ }
@Override
- public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- InternalScanner scanner) {
- DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- Calendar cal = Calendar.getInstance();
- System.out.println("preCompact: " + dateFormat.format(cal.getTime()));
- if (e.getEnvironment().getRegion().getRegionInfo().isMetaTable()) {
- return scanner;
- } else {
- return new CompacterScanner(scanner, minTimestamp);
- }
+ public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+ InternalScanner scanner) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ Calendar cal = Calendar.getInstance();
+ System.out.println("preCompact: " + dateFormat.format(cal.getTime()));
+ if (e.getEnvironment().getRegion().getRegionInfo().isMetaTable()) {
+ return scanner;
+ } else {
+ return new CompacterScanner(scanner, minTimestamp);
}
+ }
private static class CompacterScanner implements InternalScanner {
private InternalScanner internalScanner;
@@ -114,61 +114,61 @@ public CompacterScanner(InternalScanner internalScanner, long minTimestamp) {
}
@Override
- public boolean next(List<KeyValue> results) throws IOException {
- return next(results, -1);
- }
+ public boolean next(List<KeyValue> results) throws IOException {
+ return next(results, -1);
+ }
@Override
- public boolean next(List<KeyValue> result, int limit) throws IOException {
- boolean moreRows = false;
- List<KeyValue> raw = new ArrayList<KeyValue>(limit);
- while (limit == -1 || result.size() < limit) {
- int toReceive = limit == -1 ? -1 : limit - result.size();
- moreRows = internalScanner.next(raw, toReceive);
- if (raw.size() > 0) {
- ByteArray currentRowId = new ByteArray(raw.get(0).getRow());
- if (!currentRowId.equals(lastRowId)) {
- columnsSeen.clear();
- lastRowId = currentRowId;
- }
- }
- for (KeyValue kv : raw) {
- ByteArray column = new ByteArray(kv.getFamily(), kv.getQualifier());
- if (!columnsSeen.contains(column)) {
- result.add(kv);
- if (kv.getTimestamp() < minTimestamp)
- columnsSeen.add(column);
- }
- }
- if (raw.size() < toReceive || toReceive == -1) {
+ public boolean next(List<KeyValue> result, int limit) throws IOException {
+ boolean moreRows = false;
+ List<KeyValue> raw = new ArrayList<KeyValue>(limit);
+ while (limit == -1 || result.size() < limit) {
+ int toReceive = limit == -1 ? -1 : limit - result.size();
+ moreRows = internalScanner.next(raw, toReceive);
+ if (raw.size() > 0) {
+ ByteArray currentRowId = new ByteArray(raw.get(0).getRow());
+ if (!currentRowId.equals(lastRowId)) {
columnsSeen.clear();
- break;
+ lastRowId = currentRowId;
}
- raw.clear();
}
- if (!moreRows) {
+ for (KeyValue kv : raw) {
+ ByteArray column = new ByteArray(kv.getFamily(), kv.getQualifier());
+ if (!columnsSeen.contains(column)) {
+ result.add(kv);
+ if (kv.getTimestamp() < minTimestamp)
+ columnsSeen.add(column);
+ }
+ }
+ if (raw.size() < toReceive || toReceive == -1) {
columnsSeen.clear();
+ break;
}
- return moreRows;
+ raw.clear();
+ }
+ if (!moreRows) {
+ columnsSeen.clear();
}
+ return moreRows;
+ }
@Override
- public void close() throws IOException {
- internalScanner.close();
- }
+ public void close() throws IOException {
+ internalScanner.close();
+ }
}
private class Handler extends SimpleChannelUpstreamHandler {
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Object message = e.getMessage();
- // System.out.println("Received " + message);
- if (message instanceof MinimumTimestamp) {
- Compacter.this.minTimestamp = ((MinimumTimestamp) message).getTimestamp();
- } else {
- //System.out.println("Wtf " + message);
- }
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object message = e.getMessage();
+ // System.out.println("Received " + message);
+ if (message instanceof MinimumTimestamp) {
+ Compacter.this.minTimestamp = ((MinimumTimestamp) message).getTimestamp();
+ } else {
+ //System.out.println("Wtf " + message);
}
+ }
}
}

0 comments on commit 0457efb

Please sign in to comment.