Skip to content

Commit

Permalink
Create keyspace and column family for filter information
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardcapriolo committed Jun 12, 2014
1 parent 1a554d3 commit 1aecc9d
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 15 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/teknek/intravert/action/ActionFactory.java
Expand Up @@ -3,6 +3,7 @@
import java.util.HashMap;
import java.util.Map;

import io.teknek.intravert.action.impl.CreateFilterAction;
import io.teknek.intravert.action.impl.GetKeyspaceAction;
import io.teknek.intravert.action.impl.SaveSessionAction;
import io.teknek.intravert.action.impl.LoadSessionAction;
Expand All @@ -14,6 +15,7 @@ public class ActionFactory {
public static final String LOAD_SESSION = "loadsession";
public static final String SET_KEYSPACE = "setkeyspace";
public static final String GET_KEYSPACE = "getkeyspace";
public static final String CREATE_FILTER = "createfilter";
private Map<String,Action> actions;

public ActionFactory(){
Expand All @@ -22,6 +24,7 @@ public ActionFactory(){
actions.put(LOAD_SESSION, new LoadSessionAction());
actions.put(SET_KEYSPACE, new SetKeyspaceAction());
actions.put(GET_KEYSPACE, new GetKeyspaceAction());
actions.put(CREATE_FILTER, new CreateFilterAction());
}

public Action findAction(String operation){
Expand Down
@@ -1,7 +1,29 @@
package io.teknek.intravert.action.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.CfDef;

import groovy.lang.Closure;
import io.teknek.intravert.action.Action;
import io.teknek.intravert.action.filter.Filter;
import io.teknek.intravert.action.filter.FilterFactory;
import io.teknek.intravert.model.Constants;
import io.teknek.intravert.model.Operation;
import io.teknek.intravert.model.Response;
import io.teknek.intravert.service.ApplicationContext;
Expand All @@ -21,13 +43,40 @@ public void doAction(Operation operation, Response response, RequestContext requ
n.setTheClass((String) operation.getArguments().get("theClass"));
n.setScript((String) operation.getArguments().get("script"));
try {
Filter f = io.teknek.nit.NitFactory.construct(n);

request.getSession().putFilter(name, f);

} catch (NitException e) {
Filter f = FilterFactory.createFilter(io.teknek.nit.NitFactory.construct(n));
if ("session".equalsIgnoreCase(scope)){
request.getSession().putFilter(name, f);
} else if ("application".equalsIgnoreCase(scope)){
addFilterToCluster(f, n);
application.putFilter(name, f);
}
} catch (NitException | WriteTimeoutException | InvalidRequestException | ConfigurationException | UnavailableException | OverloadedException e) {
throw new RuntimeException(e);
}
Map m = new HashMap();
m.put(Constants.RESULT, Constants.OK);
response.getResults().put(operation.getId(), Arrays.asList(m));
}


public void maybeCreateColumnFamily() throws InvalidRequestException, ConfigurationException {
List<String> keyspaces = Schema.instance.getNonSystemTables();
if (!keyspaces.contains("intravert")){
CreateKeyspaceAction.createKeyspace("intravert", 1);
CFMetaData cfm = null;
CfDef def = new CfDef();
def.setKeyspace("intravert");
def.setName("filter");
cfm = CFMetaData.fromThrift(def);
cfm.addDefaultIndexNames();
MigrationManager.announceNewColumnFamily(cfm);
}
}

public void addFilterToCluster(Filter f, NitDesc n) throws InvalidRequestException, ConfigurationException, WriteTimeoutException, UnavailableException, OverloadedException{
maybeCreateColumnFamily();
List<IMutation> changes = new ArrayList<>();
StorageProxy.mutate(changes, ConsistencyLevel.QUORUM);

}
}
Expand Up @@ -24,11 +24,13 @@ public void doAction(Operation operation, Response response, RequestContext requ
ApplicationContext application) {
String keyspace = (String) operation.getArguments().get("name");
int replication = (Integer) operation.getArguments().get("replication");


createKeyspace(keyspace, replication);
}

public static void createKeyspace(String name, int replication){
Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(0);
KsDef def = new KsDef();
def.setName(keyspace);
def.setName(name);
def.setStrategy_class("SimpleStrategy");
Map<String, String> strat = new HashMap<String, String>();
strat.put("replication_factor", Integer.toString(replication));
Expand All @@ -42,5 +44,4 @@ public void doAction(Operation operation, Response response, RequestContext requ
throw new RuntimeException(e);
}
}

}
22 changes: 22 additions & 0 deletions src/main/java/io/teknek/intravert/service/ApplicationContext.java
@@ -1,6 +1,28 @@
package io.teknek.intravert.service;

import io.teknek.intravert.action.filter.Filter;

import java.util.HashMap;
import java.util.Map;

public class ApplicationContext {

private Map<String,Filter> filters;

public ApplicationContext(){
filters = new HashMap<>();
}

public void putFilter(String name, Filter f){
filters.put(name, f);
}

public Filter getFilter(String name){
Filter f = filters.get(name);
//todo get from column family
if (f == null){
throw new RuntimeException ("filter not found");
}
return f;
}
}
6 changes: 0 additions & 6 deletions src/main/java/io/teknek/intravert/service/RequestContext.java
@@ -1,16 +1,10 @@
package io.teknek.intravert.service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.mortbay.io.BufferCache.CachedBuffer;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.ForwardingCache;
import com.google.common.cache.LoadingCache;

public class RequestContext {
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/io/teknek/intravert/daemon/CreateFilterTest.java
@@ -0,0 +1,48 @@
package io.teknek.intravert.daemon;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.Assert;
import io.teknek.intravert.action.ActionFactory;
import io.teknek.intravert.client.Client;
import io.teknek.intravert.model.Constants;
import io.teknek.intravert.model.Operation;
import io.teknek.intravert.model.Request;
import io.teknek.intravert.model.Response;
import io.teknek.nit.NitDesc;

import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

public class CreateFilterTest extends BaseIntravertTest {

@Test
public void createApplicationFilter() throws JsonGenerationException, JsonMappingException, IllegalStateException, UnsupportedEncodingException, IOException, RuntimeException{
Request request = new Request();
Map<String,Object> filterDef = new HashMap<String,Object>();
filterDef.put("spec", NitDesc.NitSpec.GROOVY_CLOSURE.toString());
filterDef.put("name", "under21");
filterDef.put("scope", "application");
filterDef.put("script", "{ row -> if (row['value'].toInteger() > 21) return row else return null }");
request.getOperations().add(new Operation()
.withId("1").withType(ActionFactory.CREATE_FILTER).withArguments(filterDef));
Client cl = new Client();
Response response = cl.post("http://127.0.0.1:7654", request);
List<Map> results = (List<Map>) response.getResults().get("1");
Assert.assertEquals(new ImmutableMap.Builder<String, Object>().put("result", "ok").build(), results.get(0));
}
}
/*
Assert.assertEquals(
"{\"exceptionMessage\":null,\"exceptionId\":null,\"results\":{\"2\":[{\"result\":\"ok\"}],\"1\":[{\"session_id\":0}]},\"metaData\":{}}"
, cl.postAsString("http://127.0.0.1:7654", new ObjectMapper().writeValueAsString(request)));
*/

0 comments on commit 1aecc9d

Please sign in to comment.