Skip to content

Commit

Permalink
mainly redis stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Sep 30, 2011
1 parent 945411d commit e7039db
Show file tree
Hide file tree
Showing 13 changed files with 931 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<target name="java-tests" depends="compile-tests">
<testng classpathref="test-classpath">
<!-- <testng classpathref="test-classpath" methods="org.nodex.tests.core.net.NetTest.*"> -->
<classfileset dir="${tests-target}" includes="**/*.class" excludes="**/RedisTest.class"/>
<classfileset dir="${tests-target}" includes="**/*.class" excludes="**/redis/*.class"/>
</testng>
</target>

Expand Down
1 change: 1 addition & 0 deletions mgmt/Planning.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TODO
otherwise can't do proper pipelining since can't guarantee same connection.
22) Per thread connection pools. If we can have multiple connection pools, one for each core thread then they don't
need any synchronization.
23) Netclient, Redis and HTTP connection reconnection. Most of the code is there, need to implement timer for reconnect.



Expand Down
25 changes: 22 additions & 3 deletions src/main/java/org/nodex/java/addons/redis/InternalConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.nodex.java.core.ConnectionPool;
import org.nodex.java.core.DeferredAction;
import org.nodex.java.core.Handler;
import org.nodex.java.core.SimpleHandler;
import org.nodex.java.core.buffer.Buffer;
import org.nodex.java.core.internal.NodexInternal;
import org.nodex.java.core.net.NetSocket;
Expand All @@ -41,11 +42,23 @@ public class InternalConnection implements Handler<RedisReply>{
private boolean subscriber;
private ReplyHandler currentReplyHandler;
Handler<Buffer> subscriberHandler;
private boolean closed;

InternalConnection(ConnectionPool<InternalConnection> pool, NetSocket socket) {
InternalConnection(final ConnectionPool<InternalConnection> pool, final NetSocket socket) {
this.pool = pool;
this.socket = socket;
socket.dataHandler(new ReplyParser(this));
socket.closedHandler(new SimpleHandler() {
public void handle() {
System.err.println("Channel closed");
socket.close();
pool.connectionClosed();
closed = true;
if (closedHandler != null) {
closedHandler.handle(null);
}
}
});
}

void close(DeferredAction<Void> deferred) {
Expand All @@ -59,11 +72,17 @@ void close(DeferredAction<Void> deferred) {
}
}

void sendRequest(final RedisDeferred<?> deferred, Buffer buffer, long contextID) {
sendRequest(deferred, buffer, false, contextID);
private Handler<Void> closedHandler;

void closedHandler(Handler<Void> handler) {
this.closedHandler = handler;
}

void sendRequest(final RedisDeferred<?> deferred, final Buffer buffer, boolean subscribe, long contextID) {
if (closed) {
System.err.println("Socket is closed");
return;
}
if (subscriber && !subscribe) {
deferred.setException(new RedisException("It is not legal to send commands other than SUBSCRIBE and UNSUBSCRIBE when in subscribe mode"));
} else {
Expand Down
49 changes: 33 additions & 16 deletions src/main/java/org/nodex/java/addons/redis/RedisConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
*
* <pre>
* final RedisConnection conn = pool.connection();
* client.set(Buffer.create("key1"), Buffer.create("value1").handler(new CompletionHandler() {
* conn.set(Buffer.create("key1"), Buffer.create("value1").handler(new CompletionHandler() {
* public void handle(Future&lt;Void&gt; f) {
* System.out.println("The value has been successfully set");
* conn.close();
Expand All @@ -67,9 +67,13 @@
* comp.parallel(conn.set(Buffer.create("key2"), Buffer.create("value2")));
* Future&lt;Buffer&gt; result1 = comp.series(conn.get(Buffer.create("key1")));
* Future&lt;Buffer&gt; result2 = comp.parallel(conn.get(Buffer.create("key2")));
* comp.series(new SimpleAction() {
* protected void act() {
* conn.set(Buffer.create("key3"), Buffer.create(result1.result + result2.result));
* comp.series(new DeferredAction&lt;Void&gt;() {
* protected void run() {
* conn.set(Buffer.create("key3"), Buffer.create(result1.result + result2.result)).handler(new SimpleHandler() {
* public void handle() {
* DeferredAction.this.setResult(null);
* }
* }
* }
* }
* comp.parallel(conn.closeDeferred());
Expand Down Expand Up @@ -235,8 +239,8 @@ public class RedisConnection {
}

/**
* Close the file asynchronously.<p>
* This method must be called using the same event loop the file was opened from.
* Close the connection asynchronously.<p>
* This method must be called using the same event loop the connection was opened from.
* @return a Future representing the future result of closing the file.
*/
public Future<Void> close() {
Expand Down Expand Up @@ -336,7 +340,6 @@ public Deferred<Integer> del(Buffer... keys) {
return createIntegerDeferred(DEL_COMMAND, keys);
}


public Deferred<Void> discard() {
RedisDeferred<Void> deferred = createVoidDeferred(DISCARD_COMMAND);
deferred.commandType = RedisDeferred.TxCommandType.DISCARD;
Expand Down Expand Up @@ -903,7 +906,7 @@ private Deferred<Void> doSubscribe(final byte[] command, final Buffer... channe
return new RedisDeferred<Void>(RedisDeferred.DeferredType.VOID, this) {
public void run() {
final Buffer buff = createCommand(command, channels);
rc.conn.sendRequest(this, buff, true, contextID);
sendRequest(this, buff, true, contextID);
}
public void handleReply(RedisReply reply) {
rc.conn.subscribe(contextID);
Expand All @@ -916,7 +919,7 @@ private Deferred<Void> doUnsubscribe(byte[] command, Buffer... channels) {
final Buffer buff = createCommand(command, channels);
return new RedisDeferred<Void>(RedisDeferred.DeferredType.VOID, this) {
public void run() {
rc.conn.sendRequest(this, buff, true, contextID);
sendRequest(this, buff, true, contextID);
}
public void handleReply(RedisReply reply) {
int num = reply.intResult;
Expand Down Expand Up @@ -953,7 +956,7 @@ private RedisDeferred<Double> createDoubleDeferred(final byte[] command, final B
return new RedisDeferred<Double>(RedisDeferred.DeferredType.DOUBLE, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -962,7 +965,7 @@ private RedisDeferred<Integer> createIntegerDeferred(final byte[] command, final
return new RedisDeferred<Integer>(RedisDeferred.DeferredType.INTEGER, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -971,7 +974,7 @@ private RedisDeferred<Void> createVoidDeferred(final byte[] command, final Buffe
return new RedisDeferred<Void>(RedisDeferred.DeferredType.VOID, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -980,7 +983,7 @@ private RedisDeferred<String> createStringDeferred(final byte[] command, final B
return new RedisDeferred<String>(RedisDeferred.DeferredType.STRING, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -989,7 +992,7 @@ private RedisDeferred<Boolean> createBooleanDeferred(final byte[] command, final
return new RedisDeferred<Boolean>(RedisDeferred.DeferredType.BOOLEAN, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -998,7 +1001,7 @@ private RedisDeferred<Buffer> createBulkDeferred(final byte[] command, final Buf
return new RedisDeferred<Buffer>(RedisDeferred.DeferredType.BULK, this) {
public void run() {
Buffer buff = createCommand(command, args);
rc.conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}
Expand All @@ -1007,11 +1010,19 @@ private RedisDeferred<Buffer[]> createMultiBulkDeferred(final byte[] command, fi
return new RedisDeferred<Buffer[]>(RedisDeferred.DeferredType.MULTI_BULK, this) {
public void run() {
Buffer buff = createCommand(command, args);
conn.sendRequest(this, buff, contextID);
sendRequest(this, buff, contextID);
}
};
}

private void sendRequest(final RedisDeferred<?> deferred, final Buffer buffer, long contextID) {
sendRequest(deferred, buffer, false, contextID);
}

private void sendRequest(final RedisDeferred<?> deferred, final Buffer buffer, boolean subscribe, long contextID) {
conn.sendRequest(deferred, buffer, subscribe, contextID);
}

private Buffer[] toBufferArray(Buffer[] buffers, Buffer... others) {
Buffer[] args = new Buffer[buffers.length + others.length];
System.arraycopy(buffers, 0, args, 0, buffers.length);
Expand Down Expand Up @@ -1075,6 +1086,12 @@ public void handle(InternalConnection conn) {

private void setConnection(InternalConnection conn) {
this.conn = conn;
conn.closedHandler(new SimpleHandler() {
public void handle() {
RedisConnection.this.conn = null;
connectionRequested = false;
}
});
if (password != null) {
auth(Buffer.create(password)).execute();
}
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/org/nodex/java/addons/redis/RedisPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* {@link RedisConnection} instances via the {@link #connection} method. Once a RedisConnection has been done
* with, the {@link RedisConnection#close} method should be called to return it's underlying TCP connection to
* the pool.</p>
* <p>If Redis authentication is enabled on the server, a password should be set using the {@link #setPassword}
* method.</p>
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
Expand All @@ -42,7 +44,18 @@ protected void connect(Handler<InternalConnection> connectHandler, long contextI
private String password;

/**
* Set the port that the client will attempt to connect to on the server to {@code port}. The default value is {@code 80}<p>
* Create a new RedisPool
*/
public RedisPool() {
client.exceptionHandler(new Handler<Exception>() {
public void handle(Exception e) {
System.err.println("Failed to connect");
}
});
}

/**
* Set the port that the client will attempt to connect to on the server to {@code port}. The default value is {@code 6379}<p>
* @return A reference to this, so multiple invocations can be chained together.
*/
public RedisPool setPort(int port) {
Expand All @@ -60,8 +73,8 @@ public RedisPool setHost(String host) {
}

/**
* Set the maximum pool size to the value specified by {@code maxConnections}<p>
* The client will maintain up to {@code maxConnections} HTTP connections in an internal pool<p>
* Set the maximum pool size <p>
* The pool will maintain up to this number of Redis connections in an internal pool<p>
* @return A reference to this, so multiple invocations can be chained together.
*/
public RedisPool setMaxPoolSize(int maxConnections) {
Expand Down
30 changes: 18 additions & 12 deletions src/main/java/org/nodex/java/core/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public int getMaxPoolSize() {
return maxPoolSize;
}

public synchronized void report() {
System.out.println("available: " + available.size() + " connection count: " + connectionCount + " waiters: " + waiters.size());
}

/**
* Get a connection from the pool. The connection is returned in the handler, some time in the future if a
* connection becomes available.
Expand All @@ -56,26 +60,27 @@ public int getMaxPoolSize() {
*/
public void getConnection(Handler<T> handler, long contextID) {
boolean connect = false;
T conn;
outer: synchronized (this) {
T conn = available.poll();
conn = available.poll();
if (conn != null) {
handler.handle(conn);
break outer;
} else {
if (connectionCount < maxPoolSize) {
if (++connectionCount <= maxPoolSize) {
//Create new connection
connect = true;
break outer;
} else {
connectionCount--;
}
//Create new connection
connect = true;
connectionCount++;
break outer;
}
// Add to waiters
waiters.add(new Waiter(handler, contextID));
}
}
// We do the actual connect outside the sync block to minimise the critical section
if (connect) {
// We do this outside the sync block to minimise the critical section
if (conn != null) {
handler.handle(conn);
}
else if (connect) {
connect(handler, contextID);
}
}
Expand All @@ -86,7 +91,8 @@ public void getConnection(Handler<T> handler, long contextID) {
public void connectionClosed() {
Waiter waiter;
synchronized (this) {
if (--connectionCount < maxPoolSize) {
connectionCount--;
if (connectionCount < maxPoolSize) {
//Now the connection count has come down, maybe there is another waiter that can
//create a new connection
waiter = waiters.poll();
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/nodex/java/core/net/NetClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public void run() {
}
});
} else {

// TODO - set a timer for reconnection attempts

Throwable t = channelFuture.getCause();
if (t instanceof Exception && exceptionHandler != null) {
exceptionHandler.handle((Exception) t);
Expand Down Expand Up @@ -261,6 +264,9 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
final NioSocketChannel ch = (NioSocketChannel) e.getChannel();
final NetSocket sock = socketMap.get(ch);

System.out.println("Channel closed netty");

socketMap.remove(ch);
if (sock != null) {
runOnCorrectThread(ch, new Runnable() {
Expand Down Expand Up @@ -296,6 +302,8 @@ public void run() {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {

System.out.println("Exception caught at netty");
final NioSocketChannel ch = (NioSocketChannel) e.getChannel();
final NetSocket sock = socketMap.get(ch);
final Throwable t = e.getCause();
Expand Down
Loading

0 comments on commit e7039db

Please sign in to comment.