Permalink
Browse files

Proper redis server config, and a couple of different loading modes.

  • Loading branch information...
mattb committed Feb 19, 2011
1 parent c4c6716 commit 5337ed32a040090538196d36f72bacaac99d1bee
Showing with 57 additions and 9 deletions.
  1. +1 −0 .gitignore
  2. +56 −9 src/com/hackdiary/pig/RedisStorer.java
View
@@ -2,3 +2,4 @@ build
dist
.*.sw*
lib
+pig*.log
@@ -6,6 +6,7 @@
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.fs.*;
import org.apache.pig.*;
+import org.apache.pig.data.*;
import org.apache.pig.impl.util.*;
import org.apache.pig.data.Tuple;
import redis.clients.jedis.*;
@@ -14,25 +15,71 @@
import org.apache.commons.lang.StringUtils;
public class RedisStorer extends StoreFunc {
- protected Jedis _jedis = new Jedis("localhost");
- protected RecordWriter _writer = null;
+ protected Jedis _jedis;
+ protected RecordWriter _writer;
+ protected String _mode;
+ protected String _host;
+ protected int _port;
public RedisStorer() {
+ this("kv","localhost",6379);
}
+ public RedisStorer(String mode) {
+ this(mode,"localhost",6379);
+ }
+ public RedisStorer(String mode, String host) {
+ this(mode,host,6379);
+ }
+ public RedisStorer(String mode, String host, int port) {
+ _host = host;
+ _port = port;
+ _mode = mode;
+ }
+
@Override
public OutputFormat getOutputFormat() {
return new NullOutputFormat();
}
@Override
public void putNext(Tuple f) throws IOException {
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(ResourceSchema.class);
+ if(f.get(0) == null) {
+ return;
+ }
String key = f.get(0).toString();
List<Object> values = f.getAll();
- String fieldNames = property.getProperty("redis.field.names");
- if(false && fieldNames != null) {
+ if(_mode.equals("kv")) {
+ if(values.get(1) != null) {
+ _jedis.set(key,values.get(1).toString());
+ }
+ }
+ if(_mode.equals("set")) {
+ int idx = 0;
+ Pipeline p = _jedis.pipelined();
+ for(Object o : values) {
+ if(idx != 0 && o != null) {
+ switch (DataType.findType(o)) {
+ case DataType.TUPLE:
+ case DataType.BAG:
+ for(Object o2 : (Iterable)o) {
+ p.sadd(key, o2.toString());
+ }
+ break;
+ default:
+ p.sadd(key, o.toString());
+ break;
+ }
+ }
+ idx++;
+ }
+ p.execute();
+ }
+ if(_mode.equals("hash")) {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(ResourceSchema.class);
+ String fieldNames = property.getProperty("redis.field.names");
+
String[] fields = fieldNames.split(",");
int idx = 0;
Pipeline p = _jedis.pipelined();
@@ -43,13 +90,13 @@ public void putNext(Tuple f) throws IOException {
idx++;
}
p.execute();
- } else {
- _jedis.set(key,values.get(1).toString());
}
}
+
@Override
public void prepareToWrite(RecordWriter writer) {
- this._writer = writer;
+ _writer = writer;
+ _jedis = new Jedis(_host,_port);
}
@Override

0 comments on commit 5337ed3

Please sign in to comment.