Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
added initial plumbing, proxy client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Mar 16, 2012
1 parent db74f37 commit 51706b1
Show file tree
Hide file tree
Showing 8 changed files with 1,029 additions and 0 deletions.
152 changes: 152 additions & 0 deletions lcp/pom.xml
@@ -0,0 +1,152 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>me.prettyprint</groupId>
<artifactId>hector</artifactId>
<version>1.0-4-SNAPSHOT</version>
</parent>
<artifactId>hector-lcp</artifactId>
<packaging>jar</packaging>
<name>hector-lcp</name>
<properties>
<!-- OSGi bundle properties -->
<bundle.symbolicName>me.prettyprint.hector-lcp</bundle.symbolicName>
<bundle.namespace>me.prettyprint</bundle.namespace>
</properties>

<build>
<plugins>

<!--
run examples thusly: mvn exec:java
-Dexec.mainClass="me.prettyprint.cassandra.examples.ExampleDao"
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>always</forkMode>
<argLine>-Xmx512M -Xms512M -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8</argLine>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-thrift</artifactId>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version> <!-- needs to be >= 0.7 for bytebuffer, later for SSL+Async-->
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.stephenc.eaio-uuid</groupId>
<artifactId>uuid</artifactId>
</dependency>
<dependency>
<groupId>com.ecyrd.speed4j</groupId>
<artifactId>speed4j</artifactId>
</dependency>
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
<version>${pom.version}</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-test</artifactId>
<version>${pom.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stephenc</groupId>
<artifactId>jamm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${org.springframework.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${org.springframework.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${org.springframework.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
122 changes: 122 additions & 0 deletions lcp/src/main/java/me/prettyprint/lcp/CassandraProxyClient.java
@@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package me.prettyprint.lcp;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.thrift.*;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.*;


/**
* This wraps the underlying Cassandra thrift client and attempts to handle
* disconnect, unavailable, timeout errors gracefully.
* <p/>
* On disconnect, if it cannot reconnect to the same host then it will use a
* different host from the ring. After a successful connecting, the ring will be
* refreshed.
* <p/>
* This incorporates the CircuitBreaker pattern so not to overwhelm the network
* with reconnect attempts.
*/
public class CassandraProxyClient implements java.lang.reflect.InvocationHandler {

private static final Logger logger = Logger.getLogger(CassandraProxyClient.class);

public enum ConnectionStrategy {
RANDOM, ROUND_ROBIN, STICKY
}

private String host;
private int port;

private Cassandra.Client client;

/**
* Build the Cassandra.Iface proxy
*
* @param host cassandra host
* @param port cassandra port
* @return a Casandra.Client Interface
* @throws IOException
*/
public static Cassandra.Iface newProxyConnection(String host, int port)
throws IOException {
return (Cassandra.Iface) java.lang.reflect.Proxy.newProxyInstance(Cassandra.Client.class.getClassLoader(),
Cassandra.Client.class.getInterfaces(), new CassandraProxyClient(host, port));
}

/**
* Create connection to a given host.
*
* @return cassandra thrift client
* @throws IOException error
*/
public Cassandra.Client createConnection() throws IOException {
TSocket socket = new TSocket(host, port, 10000);
TTransport trans;
try {
socket.getSocket().setKeepAlive(true);
socket.getSocket().setSoLinger(false,0);
socket.getSocket().setTcpNoDelay(true);

trans = new TFastFramedTransport(socket);
trans.open();

return new Cassandra.Client(new TBinaryProtocol(trans));

} catch (Exception e) {
throw new IOException("unable to connect to server", e);
}

}

private CassandraProxyClient(String host, int port) throws IOException {
this.host = host;
this.port = port;
}



public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
Object result = null;
try {
this.client = createConnection();
result = m.invoke(client, args);

return result;

} catch (Exception e) {
logger.error("Error invoking a method via proxy: ", e);
throw new RuntimeException(e);
}

}


}

90 changes: 90 additions & 0 deletions lcp/src/main/java/me/prettyprint/lcp/EmbeddedServerLauncher.java
@@ -0,0 +1,90 @@
package me.prettyprint.lcp;

import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author zznate
*/
public class EmbeddedServerLauncher {
private static Logger log = LoggerFactory.getLogger(EmbeddedServerLauncher.class);

private static final String CASS_ROOT = "cassandra";

private final String yamlFile;
static LcpCassandraDaemon cassandraDaemon;

public EmbeddedServerLauncher() {
this("/cassandra.yaml");
}

public EmbeddedServerLauncher(String yamlFile) {
this.yamlFile = yamlFile;
}

static ExecutorService executor = Executors.newSingleThreadExecutor();

/**
* Set embedded cassandra up and spawn it in a new thread.
*
* @throws org.apache.thrift.transport.TTransportException
* @throws java.io.IOException
* @throws InterruptedException
*/
public void setup() throws TTransportException, IOException,
InterruptedException, ConfigurationException {
if ( cassandraDaemon != null && cassandraDaemon.isRPCServerRunning() ) {
return;
}
System.setProperty("cassandra-foreground","true");
DatabaseDescriptor.createAllDirectories();

log.info("Starting executor");

executor.execute(new CassandraRunner());
log.info("Started executor");
try
{
TimeUnit.SECONDS.sleep(3);
log.info("Done sleeping");
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
}

public static void teardown() {
try {
CommitLog.instance.shutdownBlocking();
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
executor.shutdownNow();
log.info("Teardown complete");
}


class CassandraRunner implements Runnable {

public void run() {

cassandraDaemon = new LcpCassandraDaemon();

cassandraDaemon.activate();

}

}
}

0 comments on commit 51706b1

Please sign in to comment.