Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

initial version of mongodb

  • Loading branch information...
commit bc1bbd00bb78ffde99cf127ce21248261467f994 1 parent a7039d6
Sam Pullara authored
58 mongodb/pom.xml
... ... @@ -0,0 +1,58 @@
  1 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3 + <parent>
  4 + <artifactId>all</artifactId>
  5 + <groupId>com.github.spullara.avrobase</groupId>
  6 + <version>0.2-SNAPSHOT</version>
  7 + </parent>
  8 + <modelVersion>4.0.0</modelVersion>
  9 +
  10 + <artifactId>mongodb</artifactId>
  11 + <packaging>jar</packaging>
  12 +
  13 + <name>mongodb</name>
  14 + <url>http://maven.apache.org</url>
  15 +
  16 + <properties>
  17 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  18 + </properties>
  19 +
  20 + <dependencies>
  21 + <dependency>
  22 + <groupId>com.github.spullara.avrobase</groupId>
  23 + <artifactId>base</artifactId>
  24 + <version>0.2-SNAPSHOT</version>
  25 + </dependency>
  26 +
  27 + <dependency>
  28 + <groupId>org.mongodb</groupId>
  29 + <artifactId>mongo-java-driver</artifactId>
  30 + <version>2.6.2</version>
  31 + </dependency>
  32 +
  33 + <dependency>
  34 + <groupId>junit</groupId>
  35 + <artifactId>junit</artifactId>
  36 + <version>4.8.1</version>
  37 + <scope>test</scope>
  38 + </dependency>
  39 + </dependencies>
  40 +
  41 + <build>
  42 + <plugins>
  43 + <plugin>
  44 + <groupId>org.apache.avro</groupId>
  45 + <artifactId>avro-maven-plugin</artifactId>
  46 + <version>${avroplugin.version}</version>
  47 + <executions>
  48 + <execution>
  49 + <phase>generate-sources</phase>
  50 + <goals>
  51 + <goal>schema</goal>
  52 + </goals>
  53 + </execution>
  54 + </executions>
  55 + </plugin>
  56 + </plugins>
  57 + </build>
  58 +</project>
166 mongodb/src/main/java/avrobase/mongodb/MongoAB.java
... ... @@ -0,0 +1,166 @@
  1 +package avrobase.mongodb;
  2 +
  3 +import avrobase.AvroBaseException;
  4 +import avrobase.AvroBaseImpl;
  5 +import avrobase.AvroFormat;
  6 +import avrobase.Row;
  7 +import com.mongodb.BasicDBObject;
  8 +import com.mongodb.DB;
  9 +import com.mongodb.DBCollection;
  10 +import com.mongodb.DBCursor;
  11 +import com.mongodb.DBObject;
  12 +import org.apache.avro.Schema;
  13 +import org.apache.avro.specific.SpecificData;
  14 +import org.apache.avro.specific.SpecificRecord;
  15 +import org.apache.avro.util.Utf8;
  16 +
  17 +import java.nio.ByteBuffer;
  18 +import java.util.Iterator;
  19 +
  20 +/**
  21 + * TODO: Edit this
  22 + * <p/>
  23 + * User: sam
  24 + * Date: 6/4/11
  25 + * Time: 6:54 PM
  26 + */
  27 +public class MongoAB<T extends SpecificRecord, K> extends AvroBaseImpl<T, K> {
  28 +
  29 + private final Schema readerSchema;
  30 + private final DBCollection rows;
  31 +
  32 + public MongoAB(DB db, String typeName, Schema readerSchema) {
  33 + super(readerSchema, AvroFormat.JSON);
  34 + this.readerSchema = readerSchema;
  35 + this.rows = db.getCollection(typeName);
  36 + rows.createIndex(b("id", 1));
  37 + }
  38 +
  39 + private BasicDBObject b() {
  40 + return new BasicDBObject();
  41 + }
  42 +
  43 + private BasicDBObject b(String name, Object value) {
  44 + return new BasicDBObject(name, value);
  45 + }
  46 +
  47 + @Override
  48 + public Row<T, K> get(K row) throws AvroBaseException {
  49 + DBCursor cursor = rows.find(r(row));
  50 + if (cursor.hasNext()) {
  51 + DBObject ro = cursor.next();
  52 + return newrow(row, ro);
  53 + }
  54 + return null;
  55 + }
  56 +
  57 + private Row<T, K> newrow(K row, DBObject ro) {
  58 + T ao = getAvroObject(ro);
  59 + return new Row<T, K>(ao, row, (Long) ro.get("version"));
  60 + }
  61 +
  62 + private T getAvroObject(DBObject ro) {
  63 + BasicDBObject vo = (BasicDBObject) ro.get("value");
  64 + Class c = SpecificData.get().getClass(readerSchema);
  65 + T ao;
  66 + try {
  67 + ao = (T) c.newInstance();
  68 + for (Schema.Field field : readerSchema.getFields()) {
  69 + String name = field.name();
  70 + Object v = vo.get(name);
  71 + if (v instanceof byte[]) {
  72 + v = ByteBuffer.wrap((byte[]) v);
  73 + }
  74 + ao.put(field.pos(), v);
  75 + }
  76 + } catch (Exception e) {
  77 + throw new AvroBaseException("Could not create object", e);
  78 + }
  79 + return ao;
  80 + }
  81 +
  82 + @Override
  83 + public K create(T value) throws AvroBaseException {
  84 + return null;
  85 + }
  86 +
  87 + @Override
  88 + public void put(K row, T value) throws AvroBaseException {
  89 + BasicDBObject ro = r(row);
  90 + ro.put("version", 1L);
  91 + ro.put("value", getDBObject(value));
  92 + rows.update(r(row), ro, true, false);
  93 + }
  94 +
  95 + private BasicDBObject getDBObject(T value) {
  96 + BasicDBObject vo = new BasicDBObject();
  97 + for (Schema.Field field : actualSchema.getFields()) {
  98 + int pos = field.pos();
  99 + Object val = value.get(pos);
  100 + if (val instanceof Utf8) {
  101 + val = val.toString();
  102 + } else if (val instanceof ByteBuffer) {
  103 + val = ((ByteBuffer)val).array();
  104 + }
  105 + vo.put(field.name(), val);
  106 + }
  107 + return vo;
  108 + }
  109 +
  110 + @Override
  111 + public boolean put(K row, T value, long version) throws AvroBaseException {
  112 + BasicDBObject ro = b("$set", r(row));
  113 + ro.put("$set", b("value", getDBObject(value)));
  114 + ro.put("$inc", b("version", 1));
  115 + BasicDBObject query = r(row);
  116 + query.put("version", version);
  117 + return rows.update(query, ro, false, false).getN() == 1;
  118 + }
  119 +
  120 + @Override
  121 + public void delete(K row) throws AvroBaseException {
  122 + rows.findAndRemove(r(row));
  123 + }
  124 +
  125 + private BasicDBObject r(K row) {
  126 + return new BasicDBObject("id", row);
  127 + }
  128 +
  129 + @Override
  130 + public Iterable<Row<T, K>> scan(K startRow, K stopRow) throws AvroBaseException {
  131 + BasicDBObject b = b();
  132 + if (startRow != null) {
  133 + if (stopRow == null) {
  134 + b.put("id", b("$gte", startRow));
  135 + } else {
  136 + BasicDBObject query = b("$gte", startRow);
  137 + query.put("$lt", stopRow);
  138 + b = b("id", query);
  139 + }
  140 + } else if (stopRow != null) b.put("id", b("$lt", stopRow));
  141 + final DBCursor dbCursor = rows.find(b);
  142 + dbCursor.sort(b("id", 1));
  143 + return new Iterable<Row<T, K>>() {
  144 + @Override
  145 + public Iterator<Row<T, K>> iterator() {
  146 + final Iterator<DBObject> iterator = dbCursor.iterator();
  147 + return new Iterator<Row<T, K>>() {
  148 + @Override
  149 + public boolean hasNext() {
  150 + return iterator.hasNext();
  151 + }
  152 +
  153 + @Override
  154 + public Row<T, K> next() {
  155 + DBObject next = iterator.next();
  156 + return newrow((K) next.get("id"), next);
  157 + }
  158 +
  159 + @Override
  160 + public void remove() {
  161 + }
  162 + };
  163 + }
  164 + };
  165 + }
  166 +}
61 mongodb/src/test/avro/User.avsc
... ... @@ -0,0 +1,61 @@
  1 +{
  2 + "type":"record",
  3 + "name":"User",
  4 + "namespace":"bagcheck",
  5 + "fields": [
  6 + {
  7 + "name":"firstName",
  8 + "type":"string",
  9 + "default": ""
  10 + },
  11 + {
  12 + "name":"lastName",
  13 + "type":"string",
  14 + "default": ""
  15 + },
  16 + {
  17 + "name":"email",
  18 + "type":"string",
  19 + "default": ""
  20 + },
  21 + {
  22 + "name":"birthday",
  23 + "type":["string", "null"],
  24 + "default": ""
  25 + },
  26 + {
  27 + "name":"gender",
  28 + "type":[{"type":"enum", "name":"GenderType", "symbols":["FEMALE", "MALE"]}, "null"]
  29 + },
  30 + {
  31 + "name":"image",
  32 + "type":"string",
  33 + "default": ""
  34 + },
  35 + {
  36 + "name":"title",
  37 + "type":["string", "null"],
  38 + "default": ""
  39 + },
  40 + {
  41 + "name":"description",
  42 + "type":["string", "null"],
  43 + "default": ""
  44 + },
  45 + {
  46 + "name":"location",
  47 + "type":["string", "null"],
  48 + "default": ""
  49 + },
  50 + {
  51 + "name":"password",
  52 + "type":"bytes",
  53 + "default": ""
  54 + },
  55 + {
  56 + "name":"mobile",
  57 + "type":["string", "null"],
  58 + "default": "null"
  59 + }
  60 + ]
  61 +}
133 mongodb/src/test/java/avrobase/mongodb/MongoABTest.java
... ... @@ -0,0 +1,133 @@
  1 +package avrobase.mongodb;
  2 +
  3 +import avrobase.Mutator;
  4 +import avrobase.Row;
  5 +import bagcheck.User;
  6 +import com.google.common.base.Charsets;
  7 +import com.google.common.primitives.Ints;
  8 +import com.mongodb.DB;
  9 +import com.mongodb.Mongo;
  10 +import org.apache.avro.util.Utf8;
  11 +import org.junit.BeforeClass;
  12 +import org.junit.Test;
  13 +
  14 +import java.net.UnknownHostException;
  15 +import java.nio.ByteBuffer;
  16 +
  17 +import static junit.framework.Assert.assertEquals;
  18 +
  19 +public class MongoABTest {
  20 +
  21 + private static DB avrobasetest;
  22 + private byte[] row;
  23 +
  24 + @BeforeClass
  25 + public static void setup() throws UnknownHostException {
  26 + Mongo mongo = new Mongo("localhost");
  27 + avrobasetest = mongo.getDB("avrobasetest");
  28 + }
  29 +
  30 + @Test
  31 + public void putGet() throws InterruptedException {
  32 + MongoAB<User, byte[]> userRAB = getAB();
  33 + User user = getUser();
  34 + userRAB.put("test".getBytes(Charsets.UTF_8), user);
  35 + Thread.sleep(1000);
  36 + Row<User, byte[]> test = userRAB.get("test".getBytes(Charsets.UTF_8));
  37 + assertEquals(user, test.value);
  38 + }
  39 +
  40 + @Test
  41 + public void putGet2() {
  42 + MongoAB<User, byte[]> userRAB = getAB();
  43 + User user = getUser();
  44 + row = "test".getBytes(Charsets.UTF_8);
  45 + userRAB.put(row, user);
  46 + userRAB.mutate(row, new Mutator<User>() {
  47 + @Override
  48 + public User mutate(User value) {
  49 + value.firstName = $("John");
  50 + return value;
  51 + }
  52 + });
  53 + Row<User, byte[]> test = userRAB.get("test".getBytes(Charsets.UTF_8));
  54 + user = getUser();
  55 + user.firstName = $("John");
  56 + assertEquals(user, test.value);
  57 + }
  58 +
  59 + @Test
  60 + public void rdelete() {
  61 + MongoAB<User, byte[]> userRAB = getAB();
  62 + row = "test".getBytes(Charsets.UTF_8);
  63 + userRAB.delete(row);
  64 + Row<User, byte[]> test = userRAB.get("test".getBytes(Charsets.UTF_8));
  65 + assertEquals(null, test);
  66 + }
  67 +
  68 + @Test
  69 + public void testScan() {
  70 + MongoAB<User, byte[]> userRAB = getAB();
  71 + User user = getUser();
  72 + long start;
  73 + start = System.currentTimeMillis();
  74 + for (int i = 0; i < 100000; i++) {
  75 + userRAB.put(Ints.toByteArray(i), user);
  76 + }
  77 + System.out.println(System.currentTimeMillis() - start);
  78 + start = System.currentTimeMillis();
  79 + int total;
  80 + start = System.currentTimeMillis();
  81 + total = 0;
  82 + for (Row<User, byte[]> userRow : userRAB.scan(Ints.toByteArray(50000), null)) {
  83 + total++;
  84 + }
  85 + assertEquals(50000, total);
  86 + System.out.println(System.currentTimeMillis() - start);
  87 + start = System.currentTimeMillis();
  88 + total = 0;
  89 + for (Row<User, byte[]> userRow : userRAB.scan(null, Ints.toByteArray(50000))) {
  90 + total++;
  91 + }
  92 + assertEquals(50000, total);
  93 + System.out.println(System.currentTimeMillis() - start);
  94 + start = System.currentTimeMillis();
  95 + total = 0;
  96 + for (Row<User, byte[]> userRow : userRAB.scan(Ints.toByteArray(25000), Ints.toByteArray(75000))) {
  97 + total++;
  98 + }
  99 + assertEquals(50000, total);
  100 + System.out.println(System.currentTimeMillis() - start);
  101 + total = 0;
  102 + for (Row<User, byte[]> userRow : userRAB.scan(null, null)) {
  103 + total++;
  104 + userRAB.delete(userRow.row);
  105 + }
  106 + assertEquals(100000, total);
  107 + System.out.println(System.currentTimeMillis() - start);
  108 + total = 0;
  109 + for (Row<User, byte[]> userRow : userRAB.scan(null, null)) {
  110 + total++;
  111 + }
  112 + assertEquals(0, total);
  113 + }
  114 +
  115 + private User getUser() {
  116 + User user = new User();
  117 + user.email = $("spullara@yahoo.com");
  118 + user.firstName = $("Sam");
  119 + user.lastName = $("Pullara");
  120 + user.image = $("");
  121 + user.password = ByteBuffer.allocate(0);
  122 + return user;
  123 + }
  124 +
  125 + Utf8 $(String s) {
  126 + return new Utf8(s);
  127 + }
  128 +
  129 + private MongoAB<User, byte[]> getAB() {
  130 + return new MongoAB<User, byte[]>(avrobasetest, "users", User.SCHEMA$);
  131 + }
  132 +
  133 +}
1  pom.xml
@@ -21,6 +21,7 @@
21 21 <module>handlersocket</module>
22 22 <module>s3archive</module>
23 23 <module>caching</module>
  24 + <module>mongodb</module>
24 25 </modules>
25 26 <packaging>pom</packaging>
26 27

0 comments on commit bc1bbd0

Please sign in to comment.
Something went wrong with that request. Please try again.