Skip to content

Commit

Permalink
caching layer for avrobase
Browse files Browse the repository at this point in the history
  • Loading branch information
spullara committed May 10, 2011
1 parent 83180b7 commit 9af35ba
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 7 deletions.
64 changes: 64 additions & 0 deletions caching/pom.xml
@@ -0,0 +1,64 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>all</artifactId>
<groupId>com.github.spullara.avrobase</groupId>
<version>0.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.spullara.avrobase</groupId>
<artifactId>caching</artifactId>
<version>0.2-SNAPSHOT</version>
<packaging>jar</packaging>

<name>caching</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<!-- AvroBase -->
<dependency>
<groupId>com.github.spullara.avrobase</groupId>
<artifactId>base</artifactId>
<version>0.2-SNAPSHOT</version>
</dependency>

<!-- Testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.spullara.avrobase</groupId>
<artifactId>file</artifactId>
<version>0.2-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avroplugin.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
141 changes: 141 additions & 0 deletions caching/src/main/java/avrobase/caching/Cacher.java
@@ -0,0 +1,141 @@
package avrobase.caching;

import avrobase.AvroBase;
import avrobase.AvroBaseException;
import avrobase.Creator;
import avrobase.ForwardingAvroBase;
import avrobase.Mutator;
import avrobase.Row;
import org.apache.avro.specific.SpecificRecord;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
* Cache the results of an avrobase and also send out messages to listneres when one is updated.
* <p/>
* User: sam
* Date: 5/10/11
* Time: 1:37 PM
*/
public class Cacher<T extends SpecificRecord, K> extends ForwardingAvroBase<T, K> {
private final KeyMaker<K> keyMaker;

public static interface Listener<K> {
void invalidate(K row);
}

public static interface KeyMaker<K> {
Object make(K key);
}

private ConcurrentHashMap<Object, Row<T, K>> cache = new ConcurrentHashMap<Object, Row<T, K>>();

public Cacher(AvroBase<T, K> delegate, KeyMaker<K> keyMaker) {
super(delegate);
this.keyMaker = keyMaker;
}

private List<Listener<K>> listeners = new ArrayList<Listener<K>>();

public void addCacheListener(Listener<K> cl) {
listeners.add(cl);
}

private void invalidate(K row) {
for (Listener<K> listener : listeners) {
listener.invalidate(row);
}
}

@Override
public void delete(K key) throws AvroBaseException {
super.delete(key);
cache.remove(keyMaker.make(key));
invalidate(key);
}

@Override
public Row<T, K> get(K row) throws AvroBaseException {
Object key = keyMaker.make(row);
Row<T, K> tkRow = cache.get(key);
if (tkRow == null) {
tkRow = super.get(row);
cache.put(key, tkRow);
invalidate(row);
}
return tkRow;
}

@Override
public Row<T, K> mutate(K row, Mutator<T> tMutator) throws AvroBaseException {
Row<T, K> mutate = super.mutate(row, tMutator);
Object key = keyMaker.make(row);
if (mutate == null) {
cache.remove(key);
} else {
cache.put(key, mutate);
}
invalidate(row);
return mutate;
}

@Override
public Row<T, K> mutate(K row, Mutator<T> tMutator, Creator<T> tCreator) throws AvroBaseException {
Row<T, K> mutate = super.mutate(row, tMutator, tCreator);
Object key = keyMaker.make(row);
if (mutate == null) {
cache.remove(key);
} else {
cache.put(key, mutate);
}
invalidate(row);
return mutate;
}

@Override
public void put(K row, T value) throws AvroBaseException {
super.put(row, value);
cache.put(keyMaker.make(row), new Row<T, K>(value, row));
invalidate(row);
}

@Override
public boolean put(K row, T value, long version) throws AvroBaseException {
boolean put = super.put(row, value, version);
cache.put(keyMaker.make(row), new Row<T, K>(value, row, version));
invalidate(row);
return put;
}

@Override
public Iterable<Row<T, K>> scan(K startRow, K stopRow) throws AvroBaseException {
final Iterable<Row<T, K>> scan = super.scan(startRow, stopRow);
return new Iterable<Row<T, K>>() {
@Override
public Iterator<Row<T, K>> iterator() {
final Iterator<Row<T, K>> iterator = scan.iterator();
return new Iterator<Row<T, K>>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Row<T, K> next() {
Row<T, K> next = iterator.next();
cache.put(keyMaker.make(next.row), next);
invalidate(next.row);
return next;
}

@Override
public void remove() {
}
};
}
};
}
}
27 changes: 27 additions & 0 deletions caching/src/test/avro/Beacon.avsc
@@ -0,0 +1,27 @@
{
"type":"record",
"name":"Beacon",
"namespace":"avrobase.data",
"fields":[
{
"name":"browser",
"type":["null", "string"],
"doc":"Unique browser cookie"
},
{
"name":"login",
"type":["null", "string"],
"doc":"Unique login cookie"
},
{
"name":"useragent",
"type":["null", "string"],
"doc":"User agent id"
},
{
"name":"parameters",
"type":{"type":"map", "values":"string"},
"doc":"Query parameters on the beacon"
}
]
}
104 changes: 104 additions & 0 deletions caching/src/test/java/avrobase/caching/CacherTest.java
@@ -0,0 +1,104 @@
package avrobase.caching;

import avrobase.AvroFormat;
import avrobase.Row;
import avrobase.data.Beacon;
import avrobase.file.FAB;
import com.google.common.base.Supplier;
import com.google.common.primitives.Longs;
import org.junit.Test;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

import static junit.framework.Assert.assertEquals;

/**
* Caching test.
* <p/>
* User: sam
* Date: 5/10/11
* Time: 2:12 PM
*/
public class CacherTest {
Random r = new SecureRandom();

@Test
public void cachingtest() {
FAB<Beacon, byte[]> beaconFAB = new FAB<Beacon, byte[]>("/tmp/cachingtest/beacons", "/tmp/cachingtest/schemas", new Supplier<byte[]>() {
@Override
public byte[] get() {
return Longs.toByteArray(r.nextLong());
}
}, Beacon.SCHEMA$, AvroFormat.BINARY, null);
Cacher<Beacon, byte[]> beaconCacher = new Cacher<Beacon, byte[]>(beaconFAB, new Cacher.KeyMaker<byte[]>() {
public Object make(final byte[] key) {
return new Object() {
@Override
public boolean equals(Object o) {
if (o instanceof byte[]) {
byte[] other = (byte[]) o;
if (key.length == other.length) {
int i = 0;
for (byte b : key) {
if (b == other[i++]) return false;
}
return true;
}
}
return false; //To change body of overridden methods use File | Settings | File Templates.
}

@Override
public int hashCode() {
int hashcode = 0;
for (int i = 0; i < key.length; i++) {
hashcode += key[i] + hashcode * 43;
}
return hashcode;
}
};
}
});
int total = 0;
for (Row<Beacon, byte[]> beaconRow: beaconCacher.scan(null, null)){
beaconCacher.delete(beaconRow.row);
}
for (Row<Beacon, byte[]> beaconRow: beaconCacher.scan(null, null)){
total++;
}
assertEquals(0, total);
List<byte[]> rows = new ArrayList<byte[]>();
for (int i = 0; i < 1000; i++) {
Beacon beacon = new Beacon();
beacon.browser = "asdfasdfasdfasd" + i;
beacon.login = "adfasdfasdfasdfsfas" + i;
beacon.useragent = "adfasdfasdfasdfadsfadsf" + i;
beacon.parameters = new HashMap<CharSequence, CharSequence>();
rows.add(beaconCacher.create(beacon));
}
for (Row<Beacon, byte[]> beaconRow: beaconCacher.scan(null, null)){
total++;
}
assertEquals(1000, total);
{
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
beaconFAB.get(rows.get(r.nextInt(rows.size())));
}
long end = System.currentTimeMillis();
System.out.println(end - start);
}
{
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
beaconCacher.get(rows.get(r.nextInt(rows.size())));
}
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
}
20 changes: 14 additions & 6 deletions file/src/main/java/avrobase/file/FAB.java
Expand Up @@ -15,8 +15,6 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down Expand Up @@ -70,7 +68,17 @@ public FAB(String directory, String schemaDirectory, Supplier<K> supplier, Schem
schemaDir = new File(schemaDirectory);
schemaDir.mkdirs();
this.supplier = supplier;
this.transformer = transformer;
this.transformer = transformer == null ? new ReversableFunction<K, byte[]>() {
@Override
public byte[] apply(K k) {
return (byte[]) k;
}

@Override
public K unapply(byte[] bytes) {
return (K) bytes;
}
} : transformer;
}

private String toFile(K row) {
Expand Down Expand Up @@ -331,15 +339,15 @@ public void delete(K row) throws AvroBaseException {
Lock writeLock = writeLock(row);
try {
File file = getFile(row, false);
FileInputStream fis = new FileInputStream(file);
FileChannel channel = fis.getChannel();
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel channel = raf.getChannel();
FileLock fileLock = channel.lock();
try {
file.delete();
} finally {
fileLock.release();
channel.close();
fis.close();
raf.close();
}
} catch (FileNotFoundException e) {
// Already deleted
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -21,6 +21,7 @@
<module>file</module>
<module>handlersocket</module>
<module>s3archive</module>
<module>caching</module>
</modules>
<packaging>pom</packaging>

Expand Down

0 comments on commit 9af35ba

Please sign in to comment.