Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
Import SimpleJSONSerializer and SimpleJSONScheme
Browse files Browse the repository at this point in the history
  • Loading branch information
samstokes committed Sep 29, 2011
1 parent 93170fc commit ed06cfa
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
20 changes: 19 additions & 1 deletion pom.xml
Expand Up @@ -3,4 +3,22 @@
<groupId>com.rapportive</groupId>
<artifactId>storm-json</artifactId>
<version>0.0.1-SNAPSHOT</version>
</project>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.5.2</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
</project>
52 changes: 52 additions & 0 deletions src/main/java/com/rapportive/storm/scheme/SimpleJSONScheme.java
@@ -0,0 +1,52 @@
package com.rapportive.storm.scheme;

import java.io.UnsupportedEncodingException;

import java.util.Collections;
import java.util.List;

import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;

import backtype.storm.spout.Scheme;

import backtype.storm.tuple.Fields;


public class SimpleJSONScheme implements Scheme {
private static final long serialVersionUID = -7734176307841199017L;

private final String encoding;


public SimpleJSONScheme(String encoding) {
this.encoding = encoding;
}
public SimpleJSONScheme() {
this("UTF-8");
}


@Override
public List<Object> deserialize(byte[] bytes) {
final String chars;
try {
chars = new String(bytes, encoding);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
final Object json;
try {
json = JSONValue.parseWithException(chars);
} catch (ParseException e) {
throw new RuntimeException(e); // TODO this is a bit impolite
}
return Collections.singletonList(json);
}


@Override
public Fields getOutputFields() {
return new Fields("object");
}
}
@@ -0,0 +1,51 @@
package com.rapportive.storm.serializer;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;

import backtype.storm.serialization.ISerialization;

public class SimpleJSONSerializer implements ISerialization<Object> {
public static final String ENCODING = "UTF-8";

@SuppressWarnings("rawtypes")
@Override
public boolean accept(Class c) {
return JSONObject.class.equals(c) ||
JSONArray.class.equals(c);
}

@Override
public void serialize(Object object, DataOutputStream stream)
throws IOException {
final Writer writer = new OutputStreamWriter(stream, ENCODING);
if (object instanceof JSONObject) {
((JSONObject) object).writeJSONString(writer);
} else if (object instanceof JSONArray) {
((JSONArray) object).writeJSONString(writer);
} else {
throw new IllegalArgumentException("Unexpected class " + object.getClass().getCanonicalName());
}
writer.flush();
}

@Override
public Object deserialize(DataInputStream stream) throws IOException {
final Reader reader = new InputStreamReader(stream, ENCODING);
try {
return JSONValue.parseWithException(reader);
} catch (ParseException e) {
throw new RuntimeException(e); // TODO this is a bit impolite
}
}
}

0 comments on commit ed06cfa

Please sign in to comment.