Skip to content
Browse files

Prepare for github

  • Loading branch information...
1 parent 34aa63e commit 5948bf10fb1f5f901844312707d208a32d4236c3 @wallrat committed
View
4 README.md
@@ -17,7 +17,7 @@ keep the client up-to-date with Redis development. Also, useful documention for
- Replies can alse be deref:ed to their underlying values `@@(ping db) => "PONG"`
- Return values are processed as little as possible, eg. `@@(get db "xxx")` returns byte[].
Includes some helper fns for converting to `String` `(->str @(get r "xxx"))` and `String[]` (->strs)
-- Sane pub/sub support, including correct behaviour for UNSUBSCRIBE returning connection to normal state.
+- Sane pub/sub support, including correct behavior for UNSUBSCRIBE returning connection to normal state.
- Support for MULTI/EXEC and return values (see example below).
- labs-redis does not use global `*bindings*` for the connection ref (as in clj-redis and redis-clojure).
In my target code for this library talks to alot of different Redis instances and `(with-connection (client) (set key val))` adds alot of uneccesary boilerplate for us.
@@ -59,7 +59,7 @@ In my target code for this library talks to alot of different Redis instances an
=> "bar"
```
-Arguments to commands are converted in a do-what-I-mean style, so we can write code like
+Arguments to commands are converted and flattened in a do-what-I-mean style, so we can write code like
```clojure
;; ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
View
32 java/labs/redis/BulkReply.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+
+public class BulkReply extends Reply
+{
+ public static final char MARKER = '$';
+ public final byte[] bytes;
+
+ public BulkReply(byte[] bytes)
+ {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public byte[] getValue()
+ {
+ return bytes;
+ }
+
+ @Override
+ public String toString()
+ {
+
+ return "BulkReply{" +
+ "bytes=" + (bytes == null ? "null" : bytes.length) +
+ '}';
+ }
+}
View
136 java/labs/redis/Client.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+
+package labs.redis;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Client
+{
+ public static final Charset US_ASCII = Charset.forName("US-ASCII");
+ public static final Charset UTF8 = Charset.forName("UTF-8");
+
+ private static final byte[] EVALSHA_BYTES = "EVALSHA".getBytes(US_ASCII);
+ private static final byte[] PING_BYTES = "PING".getBytes(US_ASCII);
+
+ private final Map<String, byte[]> evalCache = new HashMap<String, byte[]>(16);
+ public final Connection protocol;
+ protected LinkedReplyFuture tail = null;
+
+ public Client(final Socket socket)
+ throws IOException
+ {
+ protocol = new Connection(socket);
+ }
+
+ public Client(String host, int port)
+ throws IOException
+ {
+ protocol = new Connection(SocketFactory.newSocket(host, port));
+ }
+
+ public Client()
+ throws IOException
+ {
+ this("localhost", 6379);
+ }
+
+
+ public synchronized LinkedReplyFuture pipeline(Object... o)
+ throws IOException
+ {
+ send(o);
+ this.tail = new LinkedReplyFuture(protocol, this.tail);
+ return this.tail;
+ }
+
+ /**
+ * Send data to Redis, should be paired with pull()
+ */
+ public synchronized void send(Object... o)
+ throws IOException
+ {
+ if (protocol.pipelined.get() > 128) tail.ensure();
+ protocol.send(o);
+ }
+
+ public synchronized LinkedReplyFuture pull()
+ {
+ this.tail = new LinkedReplyFuture(protocol, this.tail);
+ return this.tail;
+ }
+
+ public synchronized void close()
+ throws IOException
+ {
+ this.protocol.close();
+ }
+
+ // benchmark impl of PING
+ public synchronized LinkedReplyFuture ping()
+ throws IOException
+ {
+ send(new Object[]{PING_BYTES});
+ this.tail = new LinkedReplyFuture(protocol, this.tail);
+ return this.tail;
+ }
+
+
+ public synchronized LinkedReplyFuture eval(String lua, Object[] keys, Object[] args)
+ throws IOException
+ {
+ byte[] sha1 = evalCache.get(lua);
+ if (sha1 == null)
+ {
+ sha1 = (byte[]) pipeline("SCRIPT", "LOAD", lua).get().getValue();
+ evalCache.put(lua, sha1);
+ }
+
+ Object[] args2 = new Object[keys.length + args.length + 3];
+ args2[0] = EVALSHA_BYTES;
+ args2[1] = sha1;
+ args2[2] = keys.length;
+ System.arraycopy(keys, 0, args2, 3, keys.length);
+ System.arraycopy(args, 0, args2, 3 + keys.length, args.length);
+
+ send(args2);
+ this.tail = new LinkedReplyFuture(protocol, this.tail);
+ return this.tail;
+ }
+
+ /**
+ * EXEC and update tail with results
+ */
+ public synchronized MultiBulkReply execWithResults()
+ throws IOException
+ {
+ // capture tail
+ LinkedReplyFuture t = tail;
+
+ // EXEC
+ final MultiBulkReply exec = (MultiBulkReply) pipeline("EXEC").get();
+
+ // update tail
+ for (int i = exec.values.length - 1; i >= 0; i--)
+ {
+ // assertions
+ if (t == null) throw new IllegalStateException("Missing tail");
+ if (t.value != StatusReply.QUEUED)
+ throw new IllegalStateException("Currupt tail, expected QUEUED, got " + t.value.getValue());
+
+ t.value = exec.values[i];
+ t = t.tail;
+ }
+
+ // assertion
+ if (t != null) throw new IllegalStateException("Found longer tail than expected " + t.tail.value);
+
+ return exec;
+ }
+}
View
79 java/labs/redis/ClientPool.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+
+package labs.redis;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class ClientPool
+{
+ private final Queue<Client> queue = new ArrayBlockingQueue<Client>(100, true);
+ private final String host;
+ private final int port;
+ private final boolean testOnBorrow;
+
+ public ClientPool(String host, int port, boolean testOnBorrow)
+ {
+ this.host = host;
+ this.port = port;
+ this.testOnBorrow = testOnBorrow;
+ }
+
+ public ClientPool(boolean testOnBorrow)
+ {
+ this("localhost",6379, testOnBorrow);
+ }
+
+ public synchronized int size() { return queue.size(); }
+
+ public synchronized Client borrow()
+ throws IOException
+ {
+ final Client client = queue.poll();
+
+ if (client != null)
+ {
+ if (valid(client))
+ return client;
+ else
+ return borrow();
+ }
+
+ return new Client(SocketFactory.newSocket(host, port));
+ }
+
+ public synchronized void release(Client client)
+ {
+ // validate
+ if (client != null && valid(client)) queue.add(client);
+ }
+
+ private boolean valid(final Client client)
+ {
+ if (this.testOnBorrow)
+ {
+ try
+ {
+ return client.ping().get() == StatusReply.PONG;
+ }
+ catch (IOException e)
+ {
+ return false;
+ }
+ }
+
+ return client.protocol.isConnected();
+ }
+
+ public synchronized void flush()
+ throws IOException
+ {
+ Client c = null;
+ while((c=queue.poll()) != null)
+ c.close();
+ }
+}
View
405 java/labs/redis/Connection.java
@@ -0,0 +1,405 @@
+/*
+ * Inspiration (and some code) from https://github.com/spullara/redis-protocol Copyright 2012 Sam Pullara
+ * (no original copyright notice in source, originally Apache License 2.0)
+ *
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *************************************************************************************/
+package labs.redis;
+
+import java.io.*;
+import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class is not thread-safe. Sync is implemented in Client for writer-side
+ * and LinkedFuture for reader-side.
+ * <p/>
+ * No, sync should be in Connection, because readAsync touches os.flush().
+ * The bad think is we also need to synch in Client, because we use tail.
+ * Can we do smthng else? Compare and swap?
+ */
+public class Connection
+{
+ public static final Charset US_ASCII = Charset.forName("US-ASCII");
+ public static final Charset UTF8 = Charset.forName("UTF-8");
+
+ public static final byte[] ARGS_PREFIX = "*".getBytes();
+ public static final byte[] CRLF = "\r\n".getBytes();
+ public static final byte[] OK = "OK".getBytes(US_ASCII);
+ public static final byte[] PONG = "PONG".getBytes(US_ASCII);
+ public static final byte[] QUEUED = "QUEUED".getBytes(US_ASCII);
+ public static final byte[] BYTES_PREFIX = "$".getBytes();
+ public static final byte[] EMPTY_BYTES = new byte[0];
+ public static final byte[] NEG_ONE = convert(-1, false);
+ public static final byte[] NEG_ONE_WITH_CRLF = convert(-1, true);
+ public static final char LF = '\n';
+ public static final char CR = '\r';
+ private static final char ZERO = '0';
+
+ final AtomicInteger pipelined = new AtomicInteger(0);
+
+ public final Socket socket;
+ public boolean failed = false;
+
+ private final BufferedInputStream is;
+ private final OutputStream os;
+
+ public Connection(Socket socket)
+ throws IOException
+ {
+ this.socket = socket;
+ is = new BufferedInputStream(socket.getInputStream());
+ os = new BufferedOutputStream(socket.getOutputStream());
+ }
+
+ public void close()
+ throws IOException
+ {
+ if (!socket.isClosed())
+ socket.close();
+ }
+
+
+ public boolean isConnected()
+ {
+ return socket != null
+ && socket.isBound()
+ && !socket.isClosed()
+ && socket.isConnected()
+ && !socket.isInputShutdown()
+ && !socket.isOutputShutdown()
+ && !failed;
+ }
+
+
+ ////////////////////////////////////////////// read
+ public Reply receive()
+ throws IOException
+ {
+ try
+ {
+//synchronized (os)
+ {
+ os.flush();
+ }
+
+ //synchronized (is)
+ {
+ Reply r = receiveReply(is);
+ pipelined.decrementAndGet();
+ return r;
+ }
+ }
+ catch (IOException e)
+ {
+ this.failed = true;
+ throw e;
+ }
+ }
+
+
+ private Reply receiveReply(InputStream is)
+ throws IOException
+ {
+ int code = is.read();
+ switch (code)
+ {
+ case StatusReply.MARKER:
+ {
+ byte[] buf = readStatus(is);
+
+ // optimze 'OK\r\n'
+ if (buf == OK)
+ return StatusReply.OK;
+
+ // optimze 'PONG\r\n'
+ if (buf == PONG)
+ return StatusReply.PONG;
+
+ // TODO: handle QUEUED
+ if (Arrays.equals(QUEUED, buf))
+ return StatusReply.QUEUED;
+
+ return new StatusReply(buf);
+ }
+ case ErrorReply.MARKER:
+ {
+ return new ErrorReply(readStatus(is));
+ }
+ case IntegerReply.MARKER:
+ {
+ return new IntegerReply(readInteger(is));
+ }
+ case BulkReply.MARKER:
+ {
+ return new BulkReply(readBytes(is));
+ }
+ case MultiBulkReply.MARKER:
+ {
+ return new MultiBulkReply(readMultiBulk(is));
+ }
+ default:
+ {
+ throw new IOException("Unexpected character in stream: " + code);
+ }
+ }
+ }
+
+ private Reply[] readMultiBulk(InputStream is)
+ throws IOException
+ {
+ int size = readInteger(is);
+ Reply[] values = new Reply[size];
+ for (int i = 0; i < values.length; i++)
+ values[i] = receiveReply(is);
+ return values;
+ }
+
+ private static byte[] readStatus(InputStream is)
+ throws IOException
+ {
+ // DataInputStream.readLine
+ byte buf[] = new byte[128];
+ int room = buf.length;
+ int offset = 0;
+ int c;
+
+ loop:
+ while (true)
+ {
+ switch (c = is.read())
+ {
+ case -1:
+ case '\n':
+ break loop;
+
+ case '\r':
+ int c2 = is.read();
+ if ((c2 != '\n') && (c2 != -1))
+ {
+ if (!(is instanceof PushbackInputStream))
+ is = new PushbackInputStream(is);
+
+ ((PushbackInputStream) is).unread(c2);
+ }
+ break loop;
+
+ default:
+ if (--room < 0)
+ {
+ byte[] newBuffer = new byte[offset + 128];
+ room = buf.length - offset - 1;
+ System.arraycopy(newBuffer, 0, buf, 0, offset);
+ buf = newBuffer;
+ }
+
+ buf[offset++] = (byte) c;
+ break;
+ }
+ }
+
+ if ((c == -1) && (offset == 0))
+ {
+ return null;
+ }
+
+ // optimze 'OK\r\n'
+ if (offset == 2 && buf[0] == 'O' && buf[1] == 'K')
+ return OK;
+
+ // optimze 'PONG\r\n'
+ if (offset == 4 && buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'N' && buf[3] == 'G')
+ return PONG;
+
+ //TODO: optimize 'QUEUED\r\n'
+
+ byte[] r = new byte[offset];
+ System.arraycopy(buf, 0, r, 0, offset);
+ return r;
+ }
+
+ private static int readInteger(InputStream is)
+ throws IOException
+ {
+ int size = 0;
+ int sign = 1;
+ int read = is.read();
+ if (read == '-')
+ {
+ read = is.read();
+ sign = -1;
+ }
+
+ do
+ {
+ if (read == CR)
+ {
+ if (is.read() == LF)
+ {
+ break;
+ }
+ }
+
+ int value = read - ZERO;
+ if (value >= 0 && value < 10)
+ {
+ size *= 10;
+ size += value;
+ }
+ else
+ {
+ throw new IOException("Invalid character in integer");
+ }
+
+ read = is.read();
+ }
+ while (true);
+
+ return size * sign;
+ }
+
+ private static byte[] readBytes(InputStream is)
+ throws IOException
+ {
+ int size = readInteger(is);
+ int read;
+ if (size == -1)
+ return null;
+
+ byte[] bytes = new byte[size];
+ int total = 0;
+ while (total < bytes.length && (read = is.read(bytes, total, bytes.length - total)) != -1)
+ total += read;
+
+ if (total < bytes.length)
+ throw new IOException("Failed to read enough bytes: " + total);
+
+ int cr = is.read();
+ int lf = is.read();
+ if (cr != CR || lf != LF)
+ throw new IOException("Improper line ending: " + cr + ", " + lf);
+
+ return bytes;
+ }
+
+
+ ////////////////////////////////////////////// write
+
+ public void send(Object[] objects)
+ throws IOException
+ {
+ try
+ {
+//synchronized (os)
+ {
+ write(os, objects);
+ pipelined.incrementAndGet();
+ }
+ }
+ catch (IOException e)
+ {
+ failed = true;
+ throw e;
+ }
+ }
+
+ private static void write(OutputStream os, Object... objects)
+ throws IOException
+ {
+ os.write(ARGS_PREFIX);
+ os.write(numToBytes(objects.length, true));
+
+ for (Object object : objects)
+ {
+ os.write(BYTES_PREFIX);
+
+ byte[] b;
+ if (object == null)
+ b = EMPTY_BYTES;
+ else if (object instanceof byte[])
+ b = (byte[]) object;
+ else if (object instanceof Number)
+ b = numToBytes(((Number) object).longValue(), false);
+ else
+ b = object.toString().getBytes(UTF8);
+
+ os.write(numToBytes(b.length, true));
+ os.write(b);
+ os.write(CRLF);
+ }
+ }
+
+
+ // itoa impl from https://github.com/spullara/redis-protocol Copyright 2012 Sam Pullara
+ private static final int NUM_MAP_LENGTH = 256;
+ private static final byte[][] numMap = new byte[NUM_MAP_LENGTH][];
+ private static final byte[][] numMapWithCRLF = new byte[NUM_MAP_LENGTH][];
+
+ static
+ {
+ for (int i = 0; i < NUM_MAP_LENGTH; i++)
+ {
+ numMap[i] = convert(i, false);
+ numMapWithCRLF[i] = convert(i, true);
+ }
+ }
+
+
+ // Optimized for the direct to ASCII bytes case
+ // Could be even more optimized but it is already
+ // about twice as fast as using Long.toString().getBytes()
+ private static byte[] numToBytes(long value, boolean withCRLF)
+ {
+ if (value >= 0 && value < NUM_MAP_LENGTH)
+ {
+ int index = (int) value;
+ return withCRLF ? numMapWithCRLF[index] : numMap[index];
+ }
+ else if (value == -1)
+ {
+ return withCRLF ? NEG_ONE_WITH_CRLF : NEG_ONE;
+ }
+ return convert(value, withCRLF);
+ }
+
+ private static byte[] convert(long value, boolean withCRLF)
+ {
+ boolean negative = value < 0;
+ int index = negative ? 2 : 1;
+ long current = negative ? -value : value;
+ while ((current /= 10) > 0)
+ {
+ index++;
+ }
+ byte[] bytes = new byte[withCRLF ? index + 2 : index];
+ if (withCRLF)
+ {
+ bytes[index + 1] = LF;
+ bytes[index] = CR;
+ }
+ if (negative)
+ {
+ bytes[0] = '-';
+ }
+ current = negative ? -value : value;
+ long tmp = current;
+ while ((tmp /= 10) > 0)
+ {
+ bytes[--index] = (byte) ('0' + (current % 10));
+ current = tmp;
+ }
+ bytes[--index] = (byte) ('0' + current);
+ return bytes;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Connection{" +
+ "pipelined=" + pipelined +
+ ", socket=" + socket +
+ '}';
+ }
+}
View
30 java/labs/redis/ErrorReply.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+public class ErrorReply extends Reply
+{
+ public static final char MARKER = '-';
+ private final byte[] error;
+
+ public ErrorReply(byte[] error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getValue()
+ {
+ return new String(error);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ErrorReply{" +
+ "error='" + getValue() + '\'' +
+ '}';
+ }
+
+}
View
31 java/labs/redis/IntegerReply.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+public class IntegerReply extends Reply
+{
+ public static final char MARKER = ':';
+ public final long integer;
+
+ public IntegerReply(long integer)
+ {
+ this.integer = integer;
+ }
+
+ @Override
+ public Long getValue()
+ {
+ return integer;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "IntegerReply{" +
+ "integer=" + integer +
+ '}';
+ }
+
+}
View
61 java/labs/redis/LinkedReplyFuture.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+
+package labs.redis;
+
+import java.io.IOException;
+import clojure.lang.IDeref;
+
+public class LinkedReplyFuture implements IDeref
+{
+ private final Connection connection;
+ LinkedReplyFuture tail;
+ protected Reply value;
+
+ public LinkedReplyFuture(Connection connection, LinkedReplyFuture tail)
+ {
+ this.connection = connection;
+ this.tail = tail;
+ }
+
+ // QUEUED values will result in Futures keeping it's tail
+ // so we can complete them at a later time (EXEC)
+ public boolean realizeValue()
+ throws IOException
+ {
+ if (value != null) return (value != StatusReply.QUEUED); // done
+
+ value = this.connection.receive();
+ return (value != StatusReply.QUEUED);
+ }
+
+ protected synchronized boolean ensure()
+ throws IOException
+ {
+ if (tail != null && tail.ensure())
+ tail = null;
+
+ return realizeValue();
+ }
+
+ public synchronized Reply get()
+ throws IOException
+ {
+ ensure();
+ return value;
+ }
+
+ public Object deref()
+ {
+ try
+ {
+ return get();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
View
30 java/labs/redis/MultiBulkReply.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+public class MultiBulkReply extends Reply
+{
+ public static final char MARKER = '*';
+ public final Reply[] values;
+
+ public MultiBulkReply(Reply[] values)
+ {
+ this.values = values;
+ }
+
+ @Override
+ public Reply[] getValue()
+ {
+ return values;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MultiBulkReply{" +
+ "byteArrays=" + values.length +
+ '}';
+ }
+}
View
16 java/labs/redis/Reply.java
@@ -0,0 +1,16 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+import clojure.lang.IDeref;
+
+public abstract class Reply implements IDeref
+{
+ public abstract Object getValue();
+
+ public Object deref()
+ {
+ return getValue();
+ }
+}
View
30 java/labs/redis/SocketFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class SocketFactory
+{
+ public static final int timeout = 10000;
+
+ public static Socket newSocket(String host, int port)
+ throws IOException
+ {
+ final Socket socket = new Socket();
+
+ socket.setReuseAddress(true);
+ socket.setKeepAlive(true); //Will monitor the TCP connection is valid
+ socket.setTcpNoDelay(true); //Socket buffer Whetherclosed, to ensure timely delivery of data
+ socket.setSoLinger(true, 0); //Control calls close () method, the underlying socket is closed immediately
+
+ socket.connect(new InetSocketAddress(host, port), timeout);
+ //socket.setSoTimeout(timeout);
+
+ return socket;
+ }
+}
View
45 java/labs/redis/StatusReply.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2012 Preemptive Labs / Andreas Bielk (http://www.preemptive.se)
+ *
+ *************************************************************************************/
+package labs.redis;
+
+public class StatusReply extends Reply
+{
+ public static final char MARKER = '+';
+ public static final StatusReply OK = new StatusReply("OK");
+ public static final StatusReply PONG = new StatusReply("PONG");
+ public static final StatusReply QUEUED = new StatusReply("QUEUED");
+
+ private String status;
+ private byte[] statusBytes;
+
+
+ public StatusReply(String status)
+ {
+ this.status = status;
+ }
+
+ public StatusReply(byte[] statusBytes)
+ {
+ this.statusBytes = statusBytes;
+ }
+
+ @Override
+ public String getValue()
+ {
+ if (status == null && statusBytes != null)
+ status = new String(statusBytes);
+
+ return status;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StatusReply{" +
+ "status='" + getValue() + '\'' +
+ '}';
+ }
+
+}
View
10 project.clj
@@ -0,0 +1,10 @@
+(defproject labs.redis "0.1.0-SNAPSHOT"
+ :description "FIXME: write description"
+ :dependencies [[org.clojure/clojure "1.3.0"]
+ [org.clojure/data.json "0.1.1"]
+ [redis.clients/jedis "2.0.0"]
+ [criterium "0.2.0"]
+ ]
+ :dev-dependencies [[clojure-source "1.3.0"]]
+ :java-source-path "java"
+ :main labs.redis.core)
View
1,779 src/commands.json
1,779 additions, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
View
326 src/labs/redis/core.clj
@@ -0,0 +1,326 @@
+(ns ^{:doc "A Redis client library for Clojure."
+ :author "Andreas Bielk (@wallrat)"}
+ labs.redis.core
+ (:refer-clojure :exclude [get set keys type eval sort sync])
+ (:use [clojure.repl]
+ [clojure.pprint :only (pprint)]
+ [clojure.java.io :only (resource)])
+ (:require [clojure.data.json :as json])
+ (:import [labs.redis Client ClientPool Reply BulkReply MultiBulkReply]))
+
+(set! *warn-on-reflection* true)
+
+(defn parse-url [url]
+ (let [u (java.net.URI. url)
+ ui (.getUserInfo u)
+ [user password] (when ui (.split ui ":"))]
+ {:host (.getHost u)
+ :port (.getPort u)
+ :user user
+ :password password}))
+
+(defn client
+ "Creates and returns an Redis client"
+ ([] (Client.))
+ ([{:keys [host port timeout test-on-borrow] :as opts}]
+ (Client. ^String host ^int port)))
+
+(defn pool
+ "Creates and returns a pool of Redis clients"
+ ([] (pool (parse-url "redis://localhost:6379")))
+ ([{:keys [host port timeout test-on-borrow] :as opts}]
+ (ClientPool. host port test-on-borrow)))
+
+;; (defmacro with-pool [name pool & body]
+;; `(let [~name (.borrow ~pool)]
+;; (try
+;; (let [result# (do ~@body)]
+;; (.release ~pool ~name)
+;; result#)
+;; (catch Exception e#
+;; (.release ~pool ~name)
+;; (throw e#)))))
+
+(defmacro with-pool [name pool & body]
+ `(let [~name (.borrow ~pool)]
+ (try
+ (let [result# (do ~@body)]
+ result#)
+ (finally
+ (.release ~pool ~name)))))
+
+;; helper fns for futures and values
+
+(defn value [^Reply reply]
+ (.getValue reply))
+
+(let [byte-array-class (Class/forName "[B")]
+ (defn ->str [reply]
+ "Coerces reply into a String."
+ (condp instance? reply
+ byte-array-class (String. ^bytes reply)
+ BulkReply (String. (.bytes ^BulkReply reply))
+ java.lang.Object (.toString ^java.lang.Object reply))))
+
+(defn ->strs [reply]
+ (map ->str (value reply)))
+
+(defn ->>str [r]
+ (condp instance? r
+ MultiBulkReply (map ->>str (value r))
+ Reply (->>str (value r))
+ java.lang.Object (->str r)))
+
+;; low level redis protocol fns
+
+(defn- cmd-arg-convert
+ "Converts arguments to cmd* to acceptable java interop values"
+ [v]
+ (cond
+ (string? v) v
+ (instance? java.lang.Number v) v
+ (keyword? v) (name v)
+ (map? v) (map cmd-arg-convert v)
+ (vector? v) (map cmd-arg-convert v)
+ ;; string
+ ;; byte[]
+ :default (.toString ^Object v)))
+
+
+(defn cmd*
+ "Low-level fn for sending commands to redis. Returns a LinkedReplyFuture
+ Example (cmd* db \"SET\" [\"mykey\" \"myval\"])"
+ ([^Client R cmd ks1 ks2] (cmd* R cmd (concat ks1 ks2)))
+ ([^Client R cmd ks]
+ (let [cv (flatten (map cmd-arg-convert ks))
+ args (into-array java.lang.Object (cons cmd cv))]
+ (.pipeline R args))))
+
+(defn cmd**
+ ([^Client R cmd ks1 ks2] (cmd* R cmd (concat ks1 ks2)))
+ ([^Client R cmd ks]
+ (let [cv (flatten (map cmd-arg-convert ks))
+ args (into-array java.lang.Object (cons cmd cv))]
+ (.send R args))))
+
+;; high level redis commands
+
+(def REDIS-COMMANDS
+ (dissoc (json/read-json (slurp (resource "commands.json")))
+ :SUBSCRIBE :UNSUBSCRIBE :PSUBSCRIBE :PUNSUBSCRIBE :MONITOR))
+
+(defn- redis-doc-str
+ "Creates a doc string matching http://redis.io"
+ ;; ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
+ [n {args :arguments}]
+ (let [fmt (fn [a]
+ (let [s (cond
+ (:command a) (str (:command a) " "
+ (if (= (:type a) "enum")
+ (apply str (interpose "|" (:enum a)))
+ (if (string? (:name a))
+ (:name a)
+ (apply str (interpose " " (:name a))))))
+ (= (:type a) "enum") (apply str (interpose "|" (:enum a)))
+ :default (:name a))
+ s (if (:multiple a) (str s " [" s " ..]") s)
+ s (if (:optional a) (str "[" s "]") s)]
+ s))
+ ]
+ (str (.toUpperCase (name n)) " "
+ (apply str (interpose " " (map fmt args))))))
+
+(defn- fn-docs [n m]
+ (str (redis-doc-str n m) "\n"
+ " " (:summary m) "\n"
+ " " "Since Redis version " (:since m)))
+
+(defn- fn-args [m]
+ (if (:arguments m)
+ ['db '& 'args]
+ ['db]))
+
+(defn- create-cmd* [^String n m]
+ (let [cmd-parts (seq (.split (.toUpperCase n) " ")) ;; handle 'DEBUG OBJECT'
+ cmd-name (first cmd-parts)
+ static-args (apply vector (rest cmd-parts))
+ fn-name (.replaceAll (.toLowerCase n) " " "-")
+ args (fn-args m)
+ params (if (second args) ['args])
+ dox (fn-docs n m)
+ ]
+ `(let [name# (.getBytes ~cmd-name)]
+ (defn ~(symbol fn-name) ~dox ~args
+ (cmd* ~'db name# ~static-args ~@params)))
+ ))
+
+(defn- create-cmd
+ "Create a new fn from a redis.io documentation map"
+ [n m]
+ (clojure.core/eval (create-cmd* n m)))
+
+(defn create-cmds []
+ (doseq [[cmd-name cmd-def] REDIS-COMMANDS]
+ ;; (println "adding " cmd-name)
+ (create-cmd (name cmd-name) cmd-def)))
+
+(defn spit-cmds [f]
+ (spit f (pr-str
+ (for [[cmd-name cmd-def] REDIS-COMMANDS]
+ (create-cmd* (name cmd-name) cmd-def)))))
+
+(create-cmds)
+
+
+;; EVAL
+;; eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
+(defmacro defeval-naive
+ "Creates a custom EVAL command"
+ [name keys args lua]
+ `(defn ~name [~'db ~@keys ~@args]
+ (cmd* ~'db "EVAL" [~lua ~(count keys)] [~@keys ~@args])))
+
+(defmacro defeval
+ "Creates a custom EVAL command. Scripts are sent once (SCRIPT LOAD) per connection, then ran with EVALSHA."
+ ([name keys args lua]
+ `(defn ~name [~'db ~@keys ~@args]
+ (.eval ~'db ~lua
+ (into-array java.lang.Object ~keys)
+ (into-array java.lang.Object ~args))))
+ ([name doc-string keys args lua]
+ `(defn ~name ~doc-string [~'db ~@keys ~@args]
+ (.eval ~'db ~lua
+ (into-array java.lang.Object ~keys)
+ (into-array java.lang.Object ~args)))))
+
+
+;; PUB/SUB
+(let [MESSAGE (.getBytes "message")
+ SUBSCRIBE (.getBytes "subscribe")
+ UNSUBSCRIBE (.getBytes "unsubscribe")
+ acmp (fn [^bytes a ^bytes b] (java.util.Arrays/equals a b))]
+ (defn subscribe-with
+ "Listen for messages published to given channels.
+ Calls handler with (handler db channel message) on messages. Handler can return false to unsubscribe all channels and make the connection available again. Runs in current thread, returns when all channels are unsubscribed (connection no longer in pub/sub special state).
+ Since Redis version 1.3.8"
+ [^Client db channels handler]
+ (cmd** db SUBSCRIBE channels)
+
+ (loop [subscribed-channels (count channels)]
+ ;; (print subscribed-channels " SW ")
+ (when (> subscribed-channels 0)
+ (let [r @(.pull db)
+ ^objects v (value r)
+ cmd (.getValue ^Reply (aget v 0))
+ channel (->str (aget v 1))
+ message (aget v 2)]
+ ;; (println (->str cmd) channel (->str message))
+ (cond
+ (acmp cmd SUBSCRIBE) (recur (int (value (aget v 2))))
+ (acmp cmd UNSUBSCRIBE) (recur (int (value (aget v 2))))
+ (acmp cmd MESSAGE) (do
+ (when-not (handler db channel message)
+ (cmd** db UNSUBSCRIBE channels))
+ (recur subscribed-channels))))))))
+
+(let [MESSAGE (.getBytes "pmessage")
+ SUBSCRIBE (.getBytes "psubscribe")
+ UNSUBSCRIBE (.getBytes "punsubscribe")
+ acmp (fn [^bytes a ^bytes b] (java.util.Arrays/equals a b))]
+ (defn psubscribe-with
+ "Listen for messages published to given channels matching given patterns.
+ Calls handler with (handler db channel message) on messages. Handler can return false to unsubscribe all channels and make the connection available again. Runs in current thread, returns when all channels are unsubscribed (connection no longer in pub/sub special state).
+ Since Redis version 1.3.8"
+ [^Client db channels handler]
+ (cmd** db SUBSCRIBE channels)
+
+ (loop [subscribed-channels (count channels)]
+ ;; (print subscribed-channels " SW ")
+ (when (> subscribed-channels 0)
+ (let [r @(.pull db)
+ ^objects v (value r)
+ cmd (.getValue ^Reply (aget v 0))]
+ ;; (println (->str cmd) (alength v) (->str (aget v 1)))
+ (cond
+ (acmp cmd SUBSCRIBE) (recur (int (value (aget v 2))))
+ (acmp cmd UNSUBSCRIBE) (recur (int (value (aget v 2))))
+ (acmp cmd MESSAGE) (do
+ (when-not (handler db (->str (aget v 1)) (->str (aget v 2)) (aget v 3))
+ (cmd** db UNSUBSCRIBE channels))
+ (recur subscribed-channels))))))))
+
+(defn example-subscribe-with [db]
+ (subscribe-with
+ db
+ ["msgs" "msgs2" "msgs3"]
+ (fn [db channel message]
+ (let [message (->str message)]
+ (println "R " channel message)
+ (when (= "u" message)
+ (cmd** db "UNSUBSCRIBE" [channel]))
+ (not (= "quit" message))))))
+
+(defn example-psubscribe-with [db]
+ (psubscribe-with
+ db
+ ["msgs.*"]
+ (fn [db pattern channel message]
+ (let [message (->str message)]
+ (println "R " pattern channel message)
+ (not (= "quit" message))))))
+
+;; Transactions
+;; Like atomically in redis-clojure
+(defmacro atomically
+ "Execute all redis commands in body in a MULTI/EXEC. If an exception is thrown the
+ the transaction will be closed by an DISCARD, and the exception will be rethrown.
+Any exceptions thrown by DISCARD will be ignored."
+ [db & body]
+ `(do
+ (multi ~db)
+ (try
+ (do
+ ~@body
+ (exec ~db))
+ (catch Throwable e#
+ ;; on DISCARD we .ensure to flush the pipeline
+ (try @(discard ~db)
+ (finally (throw e#)))))))
+
+(defn exec!
+ "EXEC
+Execute all commands issued after MULTI.
+Completes QUEUED futures with results.
+Since Redis version 1.1.95"
+ [^Client db]
+ (.execWithResults db))
+
+(comment
+ (try
+ (multi r)
+ (let [_ (mset r "k1" "v1" "k2" "v2")
+ a (mget r "k1" "k2")
+ b (set r "k1" "xx")]
+ (println @a @b) ;; QUEUED QUEUED
+ (exec! r)
+ (println @a @b)) ;; xx v2
+ (catch Throwable t
+ (try @(discard r)
+ (finally (throw t)))))
+ )
+
+;; info helper
+(defn info!
+ "Blocking version of INFO that parses value into a map.
+ For more info, see (doc info)"
+ [db]
+ (let [s ^String (->str @(cmd* db "INFO" []))
+ i (map #(seq (.split ^String % ":")) (seq (.split s "\r\n")))
+ f (filter second i)]
+ (zipmap (map first f) (map second f))))
+
+
+(defn -main []
+ (pprint
+ (let [r (client)]
+ (->>str @(ping r)))))

0 comments on commit 5948bf1

Please sign in to comment.
Something went wrong with that request. Please try again.