Skip to content

Commit

Permalink
A combined checkin that includes:
Browse files Browse the repository at this point in the history
1. Integration of RiceCoder from CC private src.
2. Some Memory Mapped IO helper code (MMapUtils)
3. Better shared / copy on write semantics for TextBytes and FlexBuffer
4. Changes to various classes to reflect changes in TextBytes and FlexBuffer
   APIs.
5. RPC Compiler / Code Generator modifications to accommodate new TextBytes
   /FlexBuffer API.
6. TFile related helper utilities.
7. Added Type Parameter to RPCStruct base class.
  • Loading branch information
Ahad Rana authored and Ahad Rana committed Nov 14, 2011
1 parent 94d0011 commit badda24
Show file tree
Hide file tree
Showing 17 changed files with 1,076 additions and 104 deletions.
19 changes: 9 additions & 10 deletions src/org/commoncrawl/rpc/BinaryProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,17 @@ public boolean readBool(DataInput in) throws IOException {
}

// @Override
public FlexBuffer readFlexBuffer(DataInput in) throws IOException {
FlexBuffer buffer = null;

public void readFlexBuffer(DataInput in,FlexBuffer buffer) throws IOException {
int length = in.readInt();
if (length != 0) {
byte[] data = new byte[length];
in.readFully(data);
buffer = new FlexBuffer(data, 0, data.length);
} else {
buffer = new FlexBuffer();
// set buffer's count to zero first ...
buffer.setCount(0);
if (length != 0) {
// now reinit count to proper length ...
// this prevents the buffer from copying old data to newly
// allocated buffer if capacity < count
buffer.setCount(length);
in.readFully(buffer.get(),buffer.getOffset(),buffer.getCount());
}
return buffer;
}

// @Override
Expand Down
4 changes: 2 additions & 2 deletions src/org/commoncrawl/rpc/RPCStruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.io.DataOutput;
import java.io.IOException;

public abstract class RPCStruct implements Cloneable {
public abstract class RPCStruct<ConcreteType> implements Cloneable {

static protected final String emptyString = new String();

Expand All @@ -38,7 +38,7 @@ public abstract class RPCStruct implements Cloneable {
*
* @throws CloneNotSupportedException
*/
public abstract void merge(Object peer) throws CloneNotSupportedException;
public abstract void merge(ConcreteType peer) throws CloneNotSupportedException;

/**
* Serialize to the given output stream using the given protocol encoder
Expand Down
2 changes: 1 addition & 1 deletion src/org/commoncrawl/rpc/RPCStructWithId.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.commoncrawl.rpc;

public abstract class RPCStructWithId extends RPCStruct {
public abstract class RPCStructWithId<ConcreteType> extends RPCStruct<ConcreteType> {

private long _recordId;

Expand Down
11 changes: 9 additions & 2 deletions src/org/commoncrawl/rpc/compiler/JBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,23 @@ void genGetSet(CodeBuffer cb, String fname, boolean trackDirtyFields) {
cb.append("this." + fname + "=" + fname + ";\n");
cb.append("}\n");
cb.append("public void set" + toCamelCase(fname) + "( Buffer " + fname
+ ") {\n");
+ ",boolean shared) {\n");
if (trackDirtyFields) {
cb.append("__validFields.set(Field_" + fname.toUpperCase() + ");\n");
}
cb.append("this." + fname + "= new FlexBuffer(" + fname + ".get(),0,"
+ fname + ".getCount());\n");
+ fname + ".getCount(),shared);\n");
cb.append("}\n");

}

/**
 * Appends the generated-code statement(s) that read this buffer field.
 *
 * The emitted read call passes the target variable to the decoder as an
 * argument rather than assigning the decoder's return value.
 *
 * @param cb    code buffer receiving the generated source text
 * @param fname name of the variable/field being read into
 * @param tag   not referenced by this implementation
 * @param decl  when true, a local declaration initialised with a fresh
 *              FlexBuffer is emitted ahead of the read call
 */
@Override
void genReadMethod(CodeBuffer cb, String fname, String tag, boolean decl) {
  if (decl) {
    // declare the target first so the read call below has something to fill
    String declaration = getType() + " " + fname + " = new FlexBuffer();\n";
    cb.append(declaration);
  }
  String readCall = "decoder.read" + getMethodSuffix() + "(input," + fname + ");\n";
  cb.append(readCall);
}
}

/** Creates a new instance of JBuffer */
Expand Down
113 changes: 71 additions & 42 deletions src/org/commoncrawl/rpc/compiler/JRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public static class Modifiers {
public static int HAS_RECORDID = 1 << 0;
public static int HAS_NO_DIRTY_TRACKING = 1 << 1;
public static int IS_ANONYMOUS_RECORD = 1 << 2;
public static int IS_RAW = 1 << 3;
}

private String fullyQualifiedName;
Expand Down Expand Up @@ -64,9 +65,13 @@ boolean isComparable() {
}

private boolean trackDirtyFields() {
return (modifiers & Modifiers.HAS_NO_DIRTY_TRACKING) == 0;
return (modifiers & (Modifiers.HAS_NO_DIRTY_TRACKING | Modifiers.IS_RAW)) == 0;
}

/**
 * Reports whether this record carries the IS_RAW modifier.
 *
 * @return true when the IS_RAW bit is set in {@code modifiers}
 */
private boolean serializeRaw() {
  if ((modifiers & Modifiers.IS_RAW) == 0) {
    return false;
  }
  return true;
}

public String getFullName() {
return fullName;
}
Expand Down Expand Up @@ -135,10 +140,11 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
cb.append("// Generated File: " + name + "\n");
cb.append("public class " + name);
if ((modifiers & Modifiers.HAS_RECORDID) != 0) {
cb.append(" extends org.commoncrawl.rpc.RPCStructWithId ");
cb.append(" extends org.commoncrawl.rpc.RPCStructWithId");
} else {
cb.append(" extends org.commoncrawl.rpc.RPCStruct ");
cb.append(" extends org.commoncrawl.rpc.RPCStruct");
}
cb.append("<" + name +"> ");

// if the field has no key then implement Writable only
if (this.getKeyCount() == 0) {
Expand Down Expand Up @@ -266,8 +272,7 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {

cb.append("// Field Declarations\n");
if (trackDirtyFields()) {
cb
.append("private BitSet __validFields = new BitSet(FieldID_MAX+1);\n\n");
cb.append("private BitSet __validFields = new BitSet(FieldID_MAX+1);\n\n");
}
// TODO: FIX FIELD DECLARATIONS AS NECESSARY
for (Iterator<JField<JavaType>> i = fields.iterator(); i.hasNext();) {
Expand Down Expand Up @@ -343,8 +348,9 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
cb.append("public final void serialize("
+ "DataOutput output,BinaryProtocol encoder)\n"
+ "throws java.io.IOException {\n");

cb.append("encoder.beginFields(output);\n");
if (!serializeRaw()) {
cb.append("encoder.beginFields(output);\n");
}
for (Iterator<JField<JavaType>> i = fields.iterator(); i.hasNext();) {
JField<JavaType> jf = i.next();
if ((jf.getModifiers() & JField.Modifiers.TRANSIENT) == 0) {
Expand All @@ -368,8 +374,9 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
// modify source object's dirty flags
cb.append("__validFields.set(Field_" + name.toUpperCase() + ");\n");
}
cb.append("encoder.beginField(output,\"" + name + "\",Field_"
+ name.toUpperCase() + ");\n");
if (!serializeRaw()) {
cb.append("encoder.beginField(output,\"" + name + "\",Field_"+ name.toUpperCase() + ");\n");
}
type.genWriteMethod(cb, name, name);
// cb.append("encoder.endField(output,\""+name+"\",Field_"+name.toUpperCase()+");\n");
cb.append("}\n");
Expand All @@ -384,8 +391,9 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
}
}
}

cb.append("encoder.endFields(output);\n");
if (!serializeRaw()) {
cb.append("encoder.endFields(output);\n");
}

cb.append("}\n");

Expand All @@ -394,37 +402,56 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
cb.append("public final void deserialize("
+ "DataInput input, BinaryProtocol decoder)\n"
+ "throws java.io.IOException {\n");
cb.append("// clear existing data first \n");
cb.append("clear();\n\n");
cb
.append("// reset protocol object to unknown field id enconding mode (for compatibility)\n");
cb
.append("decoder.pushFieldIdEncodingMode(BinaryProtocol.FIELD_ID_ENCODING_MODE_UNKNOWN);\n");
cb.append("// keep reading fields until terminator (-1) is located \n");
cb.append("int fieldId;\n");
cb.append("while ((fieldId = decoder.readFieldId(input)) != -1) { \n");

cb.append("switch (fieldId) { \n");
for (Iterator<JField<JavaType>> i = fields.iterator(); i.hasNext();) {
JField<JavaType> jf = i.next();
if ((jf.getModifiers() & JField.Modifiers.TRANSIENT) == 0) {
String name = jf.getName();
JavaType type = jf.getType();
cb.append("case Field_" + name.toUpperCase() + ":{\n");
if (trackDirtyFields()) {
cb.append("__validFields.set(Field_" + name.toUpperCase() + ");\n");
// if raw serialization ... skip clear and data driven deserialize
if (serializeRaw()) {
// iterate fields ...
for (Iterator<JField<JavaType>> i = fields.iterator(); i.hasNext();) {
JField<JavaType> jf = i.next();
// if the field is NOT TRANSIENT ...
if ((jf.getModifiers() & JField.Modifiers.TRANSIENT) == 0) {

// get name and type ...
String name = jf.getName();
JavaType type = jf.getType();
cb.append("{\n");
type.genReadMethod(cb, name, name, false);
cb.append("}\n");
}
type.genReadMethod(cb, name, name, false);
cb.append("}\n");
cb.append("break;\n");
}
}

cb.append("}\n");
cb.append("}\n");

cb.append("// pop extra encoding mode off of stack \n");
cb.append("decoder.popFieldIdEncodingMode();\n");
else {
cb.append("// clear existing data first \n");
cb.append("clear();\n\n");
cb
.append("// reset protocol object to unknown field id enconding mode (for compatibility)\n");
cb
.append("decoder.pushFieldIdEncodingMode(BinaryProtocol.FIELD_ID_ENCODING_MODE_UNKNOWN);\n");
cb.append("// keep reading fields until terminator (-1) is located \n");
cb.append("int fieldId;\n");
cb.append("while ((fieldId = decoder.readFieldId(input)) != -1) { \n");

cb.append("switch (fieldId) { \n");
for (Iterator<JField<JavaType>> i = fields.iterator(); i.hasNext();) {
JField<JavaType> jf = i.next();
if ((jf.getModifiers() & JField.Modifiers.TRANSIENT) == 0) {
String name = jf.getName();
JavaType type = jf.getType();
cb.append("case Field_" + name.toUpperCase() + ":{\n");
if (trackDirtyFields()) {
cb.append("__validFields.set(Field_" + name.toUpperCase() + ");\n");
}
type.genReadMethod(cb, name, name, false);
cb.append("}\n");
cb.append("break;\n");
}
}

cb.append("}\n");
cb.append("}\n");

cb.append("// pop extra encoding mode off of stack \n");
cb.append("decoder.popFieldIdEncodingMode();\n");
}

cb.append("}\n");

Expand Down Expand Up @@ -515,8 +542,7 @@ void genCode(String destDir, ArrayList<String> options) throws IOException {
cb.append("// merge implementation \n");
cb.append("@SuppressWarnings(\"unchecked\")\n");
cb
.append("public final void merge(Object peer_) throws CloneNotSupportedException {\n");
cb.append(name + " peer = (" + name + ") peer_;\n");
.append("public final void merge(" + name +" peer) throws CloneNotSupportedException {\n");
if (trackDirtyFields()) {
cb.append("__validFields.or(peer.__validFields);\n");
}
Expand Down Expand Up @@ -630,7 +656,10 @@ private void parseModifiers(Set<String> modifierSet) {
modifiers |= Modifiers.HAS_NO_DIRTY_TRACKING;
} else if (modifier.equals("anonymous")) {
modifiers |= Modifiers.IS_ANONYMOUS_RECORD;
} else {
} else if (modifier.equals("raw")) {
modifiers |= Modifiers.IS_RAW;
}
else {
throw new Error("Invalid Record Modifier token:" + modifier
+ " encountered while parsing Record:" + fullyQualifiedName);
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/commoncrawl/rpc/compiler/JString.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class JavaString extends JavaCompType {
@Override
void genClone(CodeBuffer cb, String type, String targetField,
String sourceField) {
cb.append(targetField + "= new TextBytes(" + sourceField + ");\n");
cb.append(targetField + "= (TextBytes)" + sourceField + ".clone();\n");
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/org/commoncrawl/util/shared/ArcFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ public final void finish() throws IOException {
.error("ArcFileBuilder Encountered Item with Zero Length Content. URI:"
+ _item.getUri());
} else {
_item.setContent(_buffer);
_item.setContent(_buffer,false);
_buffer = new Buffer();
}
_item = null;
Expand Down
42 changes: 31 additions & 11 deletions src/org/commoncrawl/util/shared/BitUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,54 @@ public class BitUtils {
public static class BitStream {

public int nbits; // number of bits in the control stream
public byte[] bits; // the actual bits of the control stream
public FlexBuffer bits = new FlexBuffer(); // the actual bits of the control stream

public BitStream() {
nbits = 0; // number of bits in the control stream
bits = new byte[4]; // the actual bits of the control stream
bits.setCapacity(4); // the actual bits of the control stream
}

public BitStream(byte[] bits, int nbits) {
this.nbits = nbits;
this.bits = bits;
this.bits.set(bits,false);
}

// add a bit to the encoding
public void addbit(int b) {
/**
 * Creates a bit stream over a region of an existing byte array.
 *
 * @param bits   array holding the bit data
 * @param offset start of the region within {@code bits}
 * @param length length of the region in bytes
 * @param nbits  number of bits recorded as present in the stream
 */
public BitStream(byte[] bits,int offset,int length, int nbits) {
  // NOTE(review): the trailing 'false' is forwarded to FlexBuffer.set as-is;
  // presumably it is the "shared" flag — confirm against FlexBuffer.
  this.bits.set(bits, offset, length, false);
  this.nbits = nbits;
}

/**
 * Creates a bit stream that uses the supplied buffer object directly as its
 * bit storage (the reference is stored; nothing is copied here).
 *
 * @param buffer buffer to adopt as the bit storage
 * @param nbits  number of bits recorded as present in the stream
 */
public BitStream(FlexBuffer buffer,int nbits){
  this.bits = buffer;
  this.nbits = nbits;
}

/**
 * Re-points this stream at a different buffer and bit count.
 *
 * @param buffer buffer to adopt as the bit storage (reference is stored)
 * @param nbits  number of bits recorded as present in the stream
 */
public void reset(FlexBuffer buffer,int nbits) {
  this.bits = buffer;
  this.nbits = nbits;
}

int len = bits.length;
// add a bit to the encoding
public BitStream addbit(int b) {
int len = bits.getCapacity();
// need to grow the bit list
if (nbits == len * 8) {
int newlen = (int) (len * 1.5) + 1;
byte tmp[] = new byte[newlen];
System.arraycopy(bits, 0, tmp, 0, bits.length);
bits = tmp;
System.arraycopy(bits.get(),bits.getOffset(), tmp, 0, bits.getCapacity());
bits.set(tmp,false);
}
if (b == 1)
bits[(nbits >> 3)] |= (1 << (nbits & 0x7));
bits.get()[bits.getOffset() + (nbits >> 3)] |= (1 << (nbits & 0x7));
nbits++;

return this;
}

/**
 * Reads a single bit from the stream by absolute position.
 *
 * Bits are addressed eight per byte, with the lowest-order bit of each byte
 * first. No range check is performed against {@code nbits} here.
 *
 * @param index zero-based position of the bit to read
 * @return 0 or 1
 */
public int getBit(int index) {
  final byte[] backing = bits.get();
  // index / 8 selects the byte, relative to the buffer's offset
  final int bytePos = bits.getOffset() + (index >> 3);
  // index % 8 selects the bit within that byte
  final int shift = index & 0x7;
  return (backing[bytePos] >> shift) & 0x1;
}

}

/**
Expand All @@ -68,10 +89,9 @@ public boolean hasNext() {

// get the value of the next bit in the stream
public int getbit() {
int bit = (_bitStream.bits[(_currPos >> 3)] >> (_currPos & 0x7)) & 0x1;
int bit = (_bitStream.bits.get()[_bitStream.bits.getOffset() + (_currPos >> 3)] >> (_currPos & 0x7)) & 0x1;
_currPos++;
return bit;
}
}

}
Loading

0 comments on commit badda24

Please sign in to comment.