Skip to content

Commit

Permalink
simplest watch type for get/set data
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Hemberger committed Feb 24, 2019
1 parent acb08a9 commit faa64d5
Show file tree
Hide file tree
Showing 20 changed files with 612 additions and 42 deletions.
27 changes: 27 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

.idea
*.iml
target/
64 changes: 64 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.ph14</groupId>
<artifactId>fdb-zk</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>25.0-jre</version>
</dependency>
<dependency>
<groupId>org.foundationdb</groupId>
<artifactId>fdb-java</artifactId>
<version>6.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.hubspot</groupId>
<artifactId>algebra</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>4.1.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.5.2</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
4 changes: 2 additions & 2 deletions src/main/java/com/ph14/fdb/zk/FdbRequestProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void processRequest(Request request) throws RequestProcessorException {
ByteBufferInputStream.byteBuffer2Record(request.request, create2Request);
request.request.clear();

result = fdbZooKeeper.create(create2Request);
result = fdbZooKeeper.create(request, create2Request);

LOG.debug("Handling create request: {}", create2Request);
break;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void processRequest(Request request) throws RequestProcessorException {
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);

result = fdbZooKeeper.getData(getDataRequest);
result = fdbZooKeeper.getData(request, getDataRequest);
break;
case OpCode.getACL:
case OpCode.getChildren:
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/ph14/fdb/zk/FdbSchemaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,9 @@ public class FdbSchemaConstants {
public static final byte[] ACL_KEY = "a".getBytes();
public static final byte[] STAT_KEY = "s".getBytes();

public static final byte[] NODE_CREATED_WATCH_KEY = "w".getBytes();
public static final byte[] CHILD_CREATED_WATCH_KEY = "c".getBytes();
public static final byte[] NODE_DATA_UPDATED_KEY = "u".getBytes();

public static final byte[] EMPTY_BYTES = new byte[0];
}
58 changes: 49 additions & 9 deletions src/main/java/com/ph14/fdb/zk/FdbZooKeeperImpl.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package com.ph14.fdb.zk;

import java.util.Arrays;
import java.util.Set;

import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.server.WatchManager;
import org.apache.zookeeper.server.Request;

import com.apple.foundationdb.Database;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.hubspot.algebra.Result;
import com.ph14.fdb.zk.ops.FdbCreate;
Expand All @@ -18,28 +25,61 @@

public class FdbZooKeeperImpl implements FdbZooKeeperLayer {

private static final Set<Integer> FDB_SUPPORTED_OPCODES = ImmutableSet.<Integer>builder()
.addAll(
Arrays.asList(
OpCode.create,
OpCode.delete,
OpCode.setData,
OpCode.setACL,
OpCode.check,
OpCode.multi,
OpCode.sync,
OpCode.exists,
OpCode.getData,
OpCode.getACL,
OpCode.getChildren,
OpCode.getChildren2,
OpCode.setWatches)
)
.build();

private final Database fdb;
private final WatchManager watchManager;

@Inject
public FdbZooKeeperImpl(Database fdb) {
this.fdb = fdb;
this.watchManager = new WatchManager();
}

public boolean handlesRequest(Request request) {
return FDB_SUPPORTED_OPCODES.contains(request.type);
}

public Result<? extends Record, KeeperException> handle(Request request) {
Preconditions.checkArgument(handlesRequest(request), "does not handle request: " + request);

if (request.type == OpCode.create) {

}

return Result.ok(null);
}

// do we want something like ResponseWithWatch, RequestWithRawRequest as the inputs?

@Override
public Result<ExistsResponse, KeeperException> exists(ExistsRequest existsRequest) {
return fdb.run(tr -> new FdbExists(tr, existsRequest).execute());
public Result<ExistsResponse, KeeperException> exists(Request zkRequest, ExistsRequest existsRequest) {
return fdb.run(tr -> new FdbExists(zkRequest, tr, existsRequest).execute());
}

@Override
public Result<CreateResponse, KeeperException> create(CreateRequest createRequest) {
return fdb.run(tr -> new FdbCreate(tr, createRequest).execute());
public Result<CreateResponse, KeeperException> create(Request zkRequest, CreateRequest createRequest) {
return fdb.run(tr -> new FdbCreate(zkRequest, tr, createRequest).execute());
}

@Override
public Result<GetDataResponse, KeeperException> getData(GetDataRequest getDataRequest) {
return fdb.run(tr -> new FdbGetData(tr, getDataRequest).execute());
public Result<GetDataResponse, KeeperException> getData(Request zkRequest, GetDataRequest getDataRequest) {
return fdb.run(tr -> new FdbGetData(zkRequest, tr, getDataRequest).execute());
}

}
7 changes: 4 additions & 3 deletions src/main/java/com/ph14/fdb/zk/FdbZooKeeperLayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.server.Request;

import com.hubspot.algebra.Result;

public interface FdbZooKeeperLayer {

Result<ExistsResponse, KeeperException> exists(ExistsRequest existsRequest);
Result<ExistsResponse, KeeperException> exists(Request zkRequest, ExistsRequest existsRequest);

Result<CreateResponse, KeeperException> create(CreateRequest createRequest);
Result<CreateResponse, KeeperException> create(Request zkRequest, CreateRequest createRequest);

Result<GetDataResponse, KeeperException> getData(GetDataRequest getDataRequest);
Result<GetDataResponse, KeeperException> getData(Request zkRequest, GetDataRequest getDataRequest);

}
5 changes: 5 additions & 0 deletions src/main/java/com/ph14/fdb/zk/layer/FdbNodeReader.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ph14.fdb.zk.layer;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -33,6 +34,10 @@ public FdbNode deserialize(Transaction transaction) {
return deserialize(transaction.getRange(nodeSubspace.range()).asList().join());
}

public FdbNode deserialize(byte[] key, byte[] value) {
return deserialize(Collections.singletonList(new KeyValue(key, value)));
}

public FdbNode deserialize(List<KeyValue> keyValues) {
ListMultimap<byte[], KeyValue> keyValuesByPrefix = ArrayListMultimap.create();

Expand Down
25 changes: 16 additions & 9 deletions src/main/java/com/ph14/fdb/zk/layer/FdbNodeWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@ public FdbNodeWriter(DirectorySubspace nodeSubspace) {
}

public Iterable<KeyValue> serialize(FdbNode fdbNode) {
int dataLength = fdbNode.getData().length;
return Iterables.concat(
getDataKeyValues(fdbNode.getData()),
getStatKeyValues(fdbNode.getStat()),
getAclKeyValues(fdbNode.getAcls()),
getWatchKeys());
}

public List<KeyValue> getDataKeyValues(byte[] data) {
int dataLength = data.length;

if (dataLength > FdbSchemaConstants.ZK_MAX_DATA_LENGTH) {
// this is the actual ZK error. it uses jute.maxBuffer + 1024
// throw new IOException("Unreasonable length " + dataLength);
}

Preconditions.checkArgument(dataLength < FdbSchemaConstants.ZK_MAX_DATA_LENGTH, "node data too large, was: " + dataLength);

return Iterables.concat(
getDataKeyValues(fdbNode.getData()),
getStatKeyValues(fdbNode.getStat()),
getAclKeyValues(fdbNode.getAcls()));
}

private List<KeyValue> getDataKeyValues(byte[] data) {
List<byte[]> dataBlocks = ByteUtil.divideByteArray(data, FdbSchemaConstants.FDB_MAX_VALUE_SIZE);

ImmutableList.Builder<KeyValue> keyValues = ImmutableList.builder();
Expand Down Expand Up @@ -100,4 +100,11 @@ private List<KeyValue> getAclKeyValues(List<ACL> acls) {
return keyValues.build();
}

private List<KeyValue> getWatchKeys() {
return ImmutableList.of(
new KeyValue(nodeSubspace.get(FdbSchemaConstants.NODE_CREATED_WATCH_KEY).pack(), FdbSchemaConstants.EMPTY_BYTES),
new KeyValue(nodeSubspace.get(FdbSchemaConstants.CHILD_CREATED_WATCH_KEY).pack(), FdbSchemaConstants.EMPTY_BYTES)
);
}

}
45 changes: 45 additions & 0 deletions src/main/java/com/ph14/fdb/zk/layer/FdbWatchManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.ph14.fdb.zk.layer;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.directory.DirectorySubspace;
import com.ph14.fdb.zk.FdbSchemaConstants;

public class FdbWatchManager {

public void addNodeCreatedWatch(Transaction transaction, DirectorySubspace nodeSubspace, Watcher watcher) {
CompletableFuture<Void> watch = transaction.watch(nodeSubspace.get(FdbSchemaConstants.NODE_CREATED_WATCH_KEY).pack());

String path = nodeSubspace.getPath().stream().collect(Collectors.joining());

watch.whenComplete((v, e) -> {
if (e != null) {
throw new RuntimeException(e);
}

watcher.process(new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, path));
});
}

public void addNodeDataUpdatedWatch(Transaction transaction, DirectorySubspace nodeSubspace, Watcher watcher) {
CompletableFuture<Void> watch = transaction.watch(nodeSubspace.get(FdbSchemaConstants.NODE_DATA_UPDATED_KEY).pack());

String path = nodeSubspace.getPath().stream().collect(Collectors.joining("/"));

watch.whenComplete((v, e) -> {
if (e != null) {
throw new RuntimeException(e);
}

watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
});
}

}
5 changes: 4 additions & 1 deletion src/main/java/com/ph14/fdb/zk/ops/BaseFdbOp.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.ph14.fdb.zk.ops;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.Request;

import com.apple.foundationdb.Transaction;
import com.hubspot.algebra.Result;

public abstract class BaseFdbOp<REQ, RESP> {

protected final Request rawRequest;
protected final Transaction transaction;
protected final REQ request;

public BaseFdbOp(Transaction transaction, REQ request) {
public BaseFdbOp(Request rawRequest, Transaction transaction, REQ request) {
this.rawRequest = rawRequest;
this.transaction = transaction;
this.request = request;
}
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/ph14/fdb/zk/ops/FdbCreate.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.server.Request;

import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.directory.DirectoryAlreadyExistsException;
Expand All @@ -21,8 +22,8 @@ public class FdbCreate extends BaseFdbOp<CreateRequest, CreateResponse> {

// TODO: Write a Stat into the value

public FdbCreate(Transaction transaction, CreateRequest request) {
super(transaction, request);
public FdbCreate(Request rawRequest, Transaction transaction, CreateRequest request) {
super(rawRequest, transaction, request);
}

@Override
Expand Down
Loading

0 comments on commit faa64d5

Please sign in to comment.