Navigation Menu

Skip to content

Commit

Permalink
Experimental block store
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Dec 30, 2013
1 parent a84bd08 commit 98194a8
Show file tree
Hide file tree
Showing 52 changed files with 4,433 additions and 0 deletions.
48 changes: 48 additions & 0 deletions cloudata-blocks/pom.xml
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.cloudata</groupId>
<artifactId>cloudata-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>cloudata-blocks</artifactId>


<dependencies>
<dependency>
<groupId>com.cloudata</groupId>
<artifactId>cloudata-server-shared</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-2</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
</plugin>
</plugins>
</build>
</project>
27 changes: 27 additions & 0 deletions cloudata-blocks/src/main/assembly/assembly.xml
@@ -0,0 +1,27 @@
<assembly>
<id>bundle</id>
<formats>
<format>tar.gz</format>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/bin</directory>
<outputDirectory>bin</outputDirectory>
<includes>
<include>*.sh</include>
</includes>
<lineEnding>unix</lineEnding>
<fileMode>0744</fileMode>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>true</useProjectArtifact>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
</dependencySet>
</dependencySets>
</assembly>
27 changes: 27 additions & 0 deletions cloudata-blocks/src/main/bin/keyvalue.sh
@@ -0,0 +1,27 @@
#!/bin/bash

# Find Java
if [ "$JAVA_HOME" = "" ] ; then
JAVA="java -server"
else
JAVA="$JAVA_HOME/bin/java -server"
fi

PREFIX=$( echo `dirname $0`/.. )
LIB_DIR=$PREFIX/lib

# Set Java options
if [ "$JAVA_OPTIONS" = "" ] ; then
JAVA_OPTIONS=" \
-XX:+UseConcMarkSweepGC \
-d64"
fi

export BASE_DIR=$*

# Launch the application
cd $PREFIX

export PREFIX
export CLASSPATH=$( echo $LIB_DIR/*.jar . | sed 's/ /:/g')
exec $JAVA $JAVA_OPTIONS com.cloudata.keyvalue.KeyValueServer $*
@@ -0,0 +1,158 @@
package com.cloudata.blockstore;

import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.EnumSet;
import java.util.List;

import javax.servlet.DispatcherType;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.RaftService;
import org.robotninjas.barge.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.blockstore.iscsi.IscsiEndpoint;
import com.cloudata.blockstore.iscsi.IscsiServer;
import com.cloudata.blockstore.web.WebModule;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;

public class BlockStoreServer {
private static final Logger log = LoggerFactory.getLogger(BlockStoreServer.class);

final File baseDir;
final int httpPort;
private final Replica local;
private final List<Replica> peers;
private RaftService raft;
private final SocketAddress iscsiSocketAddress;
private IscsiEndpoint redisEndpoint;
private Server jetty;

public BlockStoreServer(File baseDir, Replica local, List<Replica> peers, int httpPort,
SocketAddress iscsiSocketAddress) {
this.baseDir = baseDir;
this.local = local;
this.peers = peers;
this.httpPort = httpPort;
this.iscsiSocketAddress = iscsiSocketAddress;
}

public synchronized void start() throws Exception {
if (raft != null || jetty != null) {
throw new IllegalStateException();
}

File logDir = new File(baseDir, "logs");
File stateDir = new File(baseDir, "state");

logDir.mkdirs();
stateDir.mkdirs();

KeyValueStateMachine stateMachine = new KeyValueStateMachine();

ClusterConfig config = ClusterConfig.from(local, peers);
this.raft = RaftService.newBuilder(config).logDir(logDir).timeout(300).build(stateMachine);

stateMachine.init(raft, stateDir);

raft.startAsync().awaitRunning();

// final String baseUri = getHttpUrl();

Injector injector = Guice.createInjector(new KeyValueModule(stateMachine), new WebModule());

// ResourceConfig rc = new PackagesResourceConfig(WebModule.class.getPackage().getName());
// IoCComponentProviderFactory ioc = new GuiceComponentProviderFactory(rc, injector);

// this.selector = GrizzlyServerFactory.create(baseUri, rc, ioc);

this.jetty = new Server(httpPort);

ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");

FilterHolder filterHolder = new FilterHolder(injector.getInstance(GuiceFilter.class));
context.addFilter(filterHolder, "*", EnumSet.of(DispatcherType.REQUEST));

jetty.setHandler(context);

jetty.start();

if (iscsiSocketAddress != null) {
long storeId = 1;
IscsiServer iscsiServer = new IscsiServer(stateMachine, storeId);

this.redisEndpoint = new IscsiEndpoint(iscsiSocketAddress, iscsiServer);
this.redisEndpoint.start();
}
}

public String getHttpUrl() {
return "http://localhost:" + httpPort + "/";
}

public static void main(String... args) throws Exception {
final int port = Integer.parseInt(args[0]);

Replica local = Replica.fromString("localhost:" + (10000 + port));
List<Replica> members = Lists.newArrayList(Replica.fromString("localhost:10001"));
// Replica.fromString("localhost:10002"), Replica.fromString("localhost:10003"));
members.remove(local);

File baseDir = new File(args[0]);
int httpPort = (9990 + port);
int iscsiPort = 3260 + port;

SocketAddress iscsiSocketAddress = new InetSocketAddress(iscsiPort);
final BlockStoreServer server = new BlockStoreServer(baseDir, local, members, httpPort, iscsiSocketAddress);
server.start();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
server.stop();
} catch (Exception e) {
log.error("Error stopping server", e);
}
}
});
}

public synchronized void stop() throws Exception {
if (jetty != null) {
jetty.stop();
jetty = null;
}

if (redisEndpoint != null) {
try {
redisEndpoint.stop();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
redisEndpoint = null;
}

if (raft != null) {
raft.stopAsync().awaitTerminated();
raft = null;
}
}

public SocketAddress getRedisSocketAddress() {
return iscsiSocketAddress;
}

}
@@ -0,0 +1,50 @@
package com.cloudata.blockstore;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

public class DummyVolume implements Volume {

private static final Logger log = LoggerFactory.getLogger(DummyVolume.class);
private static final int CHUNK_SIZE = 256 * 1024;

private final long lun;

public DummyVolume(long lun) {
this.lun = lun;
}

@Override
public ListenableFuture<ByteBuf> read(long offset, long length) {
log.warn("DUMMY: read {} {}", offset, length);
ByteBuf buf = Unpooled.buffer(Ints.checkedCast(length));
return Futures.immediateFuture(buf);
}

@Override
public ListenableFuture<Void> write(long offset, long length, ByteBuf buf) {
Preconditions.checkState(length == buf.readableBytes());
log.warn("DUMMY: write {} {}", offset, length);
return Futures.immediateFuture(null);
}

@Override
public ListenableFuture<Void> sync() {
log.warn("DUMMY: sync");
return Futures.immediateFuture(null);
}

@Override
public int getChunkSize() {
return CHUNK_SIZE;
}

}

0 comments on commit 98194a8

Please sign in to comment.