Skip to content

Commit

Permalink
First SQL execution
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Dec 18, 2013
1 parent 4a42b88 commit c27c747
Show file tree
Hide file tree
Showing 39 changed files with 1,590 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,16 @@ public KeyValueRecordset query(long storeId) throws IOException {
}

static class KeyValueRecordset extends StreamingRecordsetBase<KeyValueEntry> {
private final DataInputStream dis;

public KeyValueRecordset(ClientResponse response) {
super(response);
InputStream is = response.getEntityInputStream();
this.dis = new DataInputStream(is);
}

@Override
protected KeyValueEntry read(DataInputStream dis) throws IOException {
protected KeyValueEntry read() throws IOException {
int keyLength = dis.readInt();
if (keyLength == -1) {
return null;
Expand Down
14 changes: 14 additions & 0 deletions cloudata-server-shared/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,24 @@
<version>1.0-SNAPSHOT</version>
</parent>

<properties>
<jetty.version>9.1.0.v20131115</jetty.version>
</properties>

<artifactId>cloudata-server-shared</artifactId>

<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>com.cloudata</groupId>
<artifactId>cloudata-shared</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions cloudata-shared/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.17.1</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.cloudata.clients;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

Expand All @@ -29,9 +27,6 @@ public Iterator<V> iterator() {
throw new IllegalStateException();
}
read = true;
InputStream is = response.getEntityInputStream();

final DataInputStream dis = new DataInputStream(is);

return new Iterator<V>() {
V next;
Expand Down Expand Up @@ -66,7 +61,7 @@ private void ensureHaveNext() {
if (next == null) {
if (!done) {
try {
next = read(dis);
next = read();
} catch (IOException e) {
throw Throwables.propagate(e);
}
Expand All @@ -81,5 +76,5 @@ private void ensureHaveNext() {

}

protected abstract V read(DataInputStream dis) throws IOException;
protected abstract V read() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;

import javax.ws.rs.core.MediaType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.clients.StreamingRecordsetBase;
import com.cloudata.structured.sql.value.ValueHolder;
import com.cloudata.util.ByteStringMessageBodyWriter;
import com.cloudata.util.GsonObjectMessageBodyHandler;
import com.cloudata.util.Hex;
Expand All @@ -20,6 +22,7 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
Expand Down Expand Up @@ -141,13 +144,43 @@ public KeyValueJsonRecordset queryJson(long storeId) throws IOException {
}
}

public RowRecordset queryJson(long storeId, String sql) throws IOException {
ClientResponse response = CLIENT.resource(url).path(toUrlPath(storeId) + "/sql").queryParam("sql", sql)
.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

try {
int status = response.getStatus();

switch (status) {
case 200:
break;

default:
throw new IllegalStateException("Unexpected status: " + status);
}

RowRecordset records = new RowRecordset(response);
response = null;
return records;
} finally {
if (response != null) {
response.close();
}
}
}

public static class KeyValueJsonRecordset extends StreamingRecordsetBase<KeyValueJsonEntry> {
private final DataInputStream dis;

public KeyValueJsonRecordset(ClientResponse response) {
super(response);

InputStream is = response.getEntityInputStream();
this.dis = new DataInputStream(is);
}

@Override
protected KeyValueJsonEntry read(DataInputStream dis) throws IOException {
protected KeyValueJsonEntry read() throws IOException {
int keyLength = dis.readInt();
if (keyLength == -1) {
return null;
Expand All @@ -165,6 +198,58 @@ protected KeyValueJsonEntry read(DataInputStream dis) throws IOException {
}
}

public static class RowRecordset extends StreamingRecordsetBase<RowEntry> {
private final CodedInputStream cis;

public RowRecordset(ClientResponse response) {
super(response);

InputStream is = response.getEntityInputStream();
this.cis = CodedInputStream.newInstance(is);
}

@Override
protected RowEntry read() throws IOException {
int rowCount = cis.readRawVarint32();
if (rowCount == -1) {
return null;
}

ValueHolder[] values = new ValueHolder[rowCount];
for (int i = 0; i < rowCount; i++) {
values[i] = new ValueHolder();
}

for (int i = 0; i < rowCount; i++) {
values[i].deserialize(cis);
}

return new RowEntry(values);
}
}

public static class RowEntry {
final ValueHolder[] values;

public RowEntry(ValueHolder[] values) {
this.values = values;
}

public int size() {
return values.length;
}

public ValueHolder get(int i) {
return values[i];
}

@Override
public String toString() {
return "RowEntry [values=" + Arrays.toString(values) + "]";
}

}

private String toUrlPath(long storeId, ByteString key) {
return Long.toString(storeId) + "/" + Hex.toHex(key);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
package com.cloudata.structured;

import java.io.File;
import java.util.EnumSet;
import java.util.List;

import javax.servlet.DispatcherType;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.RaftService;
import org.robotninjas.barge.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.structured.web.WebModule;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.sun.grizzly.http.SelectorThread;
import com.sun.jersey.api.container.grizzly.GrizzlyServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.jersey.api.core.ResourceConfig;
import com.sun.jersey.core.spi.component.ioc.IoCComponentProviderFactory;
import com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory;
import com.google.inject.servlet.GuiceFilter;

public class StructuredServer {

private static final Logger log = LoggerFactory.getLogger(StructuredServer.class);

final File baseDir;
final int httpPort;
private final Replica local;
private final List<Replica> peers;
private RaftService raft;
private SelectorThread selector;
private Server jetty;

public StructuredServer(File baseDir, Replica local, List<Replica> peers, int httpPort) {
this.baseDir = baseDir;
Expand All @@ -35,7 +40,7 @@ public StructuredServer(File baseDir, Replica local, List<Replica> peers, int ht
}

public synchronized void start() throws Exception {
if (raft != null || selector != null) {
if (raft != null || jetty != null) {
throw new IllegalStateException();
}

Expand All @@ -58,10 +63,23 @@ public synchronized void start() throws Exception {

Injector injector = Guice.createInjector(new StructuredStoreModule(stateMachine), new WebModule());

ResourceConfig rc = new PackagesResourceConfig(WebModule.class.getPackage().getName());
IoCComponentProviderFactory ioc = new GuiceComponentProviderFactory(rc, injector);
// ResourceConfig rc = new PackagesResourceConfig(WebModule.class.getPackage().getName());
// IoCComponentProviderFactory ioc = new GuiceComponentProviderFactory(rc, injector);
//
// this.selector = GrizzlyServerFactory.create(baseUri, rc, ioc);
//

this.jetty = new Server(httpPort);

ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");

FilterHolder filterHolder = new FilterHolder(injector.getInstance(GuiceFilter.class));
context.addFilter(filterHolder, "*", EnumSet.of(DispatcherType.REQUEST));

jetty.setHandler(context);

this.selector = GrizzlyServerFactory.create(baseUri, rc, ioc);
jetty.start();
}

public String getHttpUrl() {
Expand All @@ -85,15 +103,19 @@ public static void main(String... args) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.stop();
try {
server.stop();
} catch (Exception e) {
log.warn("Error shutting down HTTP server", e);
}
}
});
}

public synchronized void stop() {
if (selector != null) {
selector.stopEndpoint();
selector = null;
public synchronized void stop() throws Exception {
if (jetty != null) {
jetty.stop();
jetty = null;
}

if (raft != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public class StructuredStateMachine implements StateMachine {

private RaftService raft;
private File baseDir;
final LoadingCache<Long, StructuredStore> keyValueStoreCache;
final LoadingCache<Long, StructuredStore> storeCache;

public StructuredStateMachine() {
KeyValueStoreCacheLoader loader = new KeyValueStoreCacheLoader();
this.keyValueStoreCache = CacheBuilder.newBuilder().recordStats().build(loader);
StoreCacheLoader loader = new StoreCacheLoader();
this.storeCache = CacheBuilder.newBuilder().recordStats().build(loader);
}

public void init(RaftService raft, File stateDir) {
Expand Down Expand Up @@ -84,7 +84,7 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
ByteString key = entry.getKey();
ByteString value = entry.getValue();

StructuredStore keyValueStore = getKeyValueStore(storeId);
StructuredStore keyValueStore = getStructuredStore(storeId);

StructuredOperation<?> operation;

Expand Down Expand Up @@ -115,16 +115,16 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
}
}

private StructuredStore getKeyValueStore(long id) {
public StructuredStore getStructuredStore(long id) {
try {
return keyValueStoreCache.get(id);
return storeCache.get(id);
} catch (ExecutionException e) {
throw Throwables.propagate(e);
}
}

@Immutable
final class KeyValueStoreCacheLoader extends CacheLoader<Long, StructuredStore> {
final class StoreCacheLoader extends CacheLoader<Long, StructuredStore> {

@Override
public StructuredStore load(@Nonnull Long id) throws Exception {
Expand All @@ -145,12 +145,12 @@ public StructuredStore load(@Nonnull Long id) throws Exception {
}

public Value get(long storeId, Keyspace keyspace, ByteString key) {
StructuredStore keyValueStore = getKeyValueStore(storeId);
StructuredStore keyValueStore = getStructuredStore(storeId);
return keyValueStore.get(keyspace.mapToKey(key).asReadOnlyByteBuffer());
}

public BtreeQuery scan(long storeId, Keyspace keyspace) {
StructuredStore keyValueStore = getKeyValueStore(storeId);
StructuredStore keyValueStore = getStructuredStore(storeId);
return keyValueStore.buildQuery(keyspace);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,8 @@ private static byte[] encode(long id) {
return data;
}

public Btree getBtree() {
return btree;
}

}
Loading

0 comments on commit c27c747

Please sign in to comment.