Skip to content

Commit

Permalink
Merge remote branch 'checkpoint/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Bruce Robbins committed Sep 25, 2011
2 parents b2bf958 + e89e295 commit 5dbbaec
Show file tree
Hide file tree
Showing 82 changed files with 6,524 additions and 263 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -18,6 +18,10 @@ bin/
*~

# Misc hidden files
.DS_Store

# class files
*.class
.DS_Store

# create an empty .gitignore file when you want to track an empty
Expand Down
152 changes: 74 additions & 78 deletions build.gradle
Expand Up @@ -51,13 +51,15 @@ allprojects {
archivesBaseName = 's4'

repositories {
mavenCentral()
mavenLocal()
mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"


/* Add lib dir as a repo. Some jar files that are not available
in a public repo are distributed in the lib dir. */
flatDir name: 'libDir', dirs: "$rootDir/lib"
flatDir name: 'libDir', dirs: ["$projectDir/../lib", "$projectDir/lib", "$projectDir/../../lib"]

mavenLocal()
mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
mavenRepo name: "repository.jboss.org", urls: "https://repository.jboss.org/nexus/content/repositories/releases/"
mavenCentral()
}
}

Expand All @@ -68,7 +70,6 @@ lift_json: 'net.liftweb:lift-json_2.8.1:2.2',
gson: 'com.google.code.gson:gson:1.6',
zk: 'org.apache.zookeeper:zookeeper:3.3.1',
log4j: 'log4j:log4j:1.2.15',
junit: 'junit:junit:3.8.1',
flexjson: 'net.sf.flexjson:flexjson:2.1',
bcel: 'org.apache.bcel:bcel:5.2',
jakarta_regexp: 'jakarta-regexp:jakarta-regexp:1.4',
Expand All @@ -84,7 +85,9 @@ commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
spring: 'org.springframework:spring:2.5.6',
junit: 'junit:junit:4.4',
scala_compiler: 'org.scala-lang:scala-compiler:2.8.1',
scala_library: 'org.scala-lang:scala-library:2.8.1'
scala_library: 'org.scala-lang:scala-library:2.8.1',
jedis: 'redis.clients:jedis:2.0.0',
commons_pool: 'commons-pool:commons-pool:1.5.6',
]


Expand Down Expand Up @@ -158,84 +161,78 @@ binDistImage = copySpec {
}
}

//println project(':s4-core').configurations.runtime.asPath
//println project(':s4-core').configurations.archives.asPath
//println project(':s4-core').configurations.default.asPath


allDistImage = copySpec {
with binDistImage
subprojects.findAll {proj ->
into('src') {
subprojects.findAll {proj ->
into('src') {
from proj.sourceSets.main.allJava
}
switch(proj.name) {
case "s4-core":
break
case "s4-driver":
into ("${proj.name}/lib/python") {
from ("${proj.name}/python/")
}
into ("${proj.name}/lib/perl") {
from ("${proj.name}/perl/")
}
break
case "s4-example-driver":
into("s4-driver") {
from proj.sourceSets.main.resources
}
into ("s4-driver/lib") {
from proj.configurations.runtime
from proj.configurations.archives.allArtifactFiles
}
break
case "s4-tools-loadgenerator":
into("s4-tools-loadgenerator") {
from proj.sourceSets.main.resources
}
into ("s4-tools-loadgenerator/lib") {
from proj.configurations.runtime
from proj.configurations.archives.allArtifactFiles
}
break
case "s4-example-testinput":
print "Got here"
into("s4-example-testinput") {
from proj.sourceSets.main.resources
}
break
case ~/(s4\-example).*/:
into ("s4-example-apps/" + proj.name + "/lib") {
from(
proj.configurations.runtime
- project("s4-core").configurations.runtime
)
from proj.configurations.archives.allArtifactFiles
}
into ("s4-example-apps/" + proj.name) {
from proj.sourceSets.main.resources
}
break
default :
println("default")
}
//if(project.name != 's4-core') {
// into(project.name) {
// from project.sourceSets.main.resources
// }
//}
}
switch(proj.name) {
case "s4-core":
break
case "s4-driver":
into ("${proj.name}/lib/python") {
from ("${proj.name}/python/")
}
into ("${proj.name}/lib/perl") {
from ("${proj.name}/perl/")
}
break
case "s4-example-driver":
into("s4-driver") {
from proj.sourceSets.main.resources
}
into ("s4-driver/lib") {
from proj.configurations.runtime
from proj.configurations.archives.allArtifactFiles
}
break
case "s4-tools-loadgenerator":
into("s4-tools-loadgenerator") {
from proj.sourceSets.main.resources
}
into ("s4-tools-loadgenerator/lib") {
from proj.configurations.runtime
from proj.configurations.archives.allArtifactFiles
}
break
case "s4-example-testinput":
print "Got here"
into("s4-example-testinput") {
from proj.sourceSets.main.resources
}
break
case ~/(s4\-example).*/:
into ("s4-example-apps/" + proj.name + "/lib") {
from(
proj.configurations.runtime
- project("s4-core").configurations.runtime
)
from proj.configurations.archives.allArtifactFiles
}
into ("s4-example-apps/" + proj.name) {
from proj.sourceSets.main.resources
}
break
default :
println("default")
}
//if(project.name != 's4-core') {
// into(project.name) {
// from project.sourceSets.main.resources
// }
//}
}
into('javadoc') {
from "$buildDir/javadoc"
}
}

task binImage(type: Copy) {
description = "Create binary image"
destinationDir = file("$buildDir/s4-image")
Expand All @@ -248,7 +245,6 @@ task allImage(type: Copy, dependsOn: s4Javadoc) {
with allDistImage
}


allImage << {
// we need this because copyTask's fileMode property is not honored
ant.chmod(dir: "$buildDir/s4-image/scripts", perm: "u+x", includes: "*")
Expand Down
42 changes: 21 additions & 21 deletions s4-comm/build.gradle
@@ -1,21 +1,21 @@
/*
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
dependencies {
compile( libraries.json )
compile( libraries.zk )
}


/*
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
dependencies {
compile( libraries.json )
compile( libraries.zk )
}


5 changes: 5 additions & 0 deletions s4-core/.gitignore
Expand Up @@ -17,3 +17,8 @@

# Backup files
*~
*.pid
*.log
*.mon
/tmp
/bin
9 changes: 9 additions & 0 deletions s4-core/build.gradle
Expand Up @@ -13,6 +13,7 @@
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
apply plugin: 'eclipse'
dependencies {
compile( libraries.gson )
compile( libraries.flexjson )
Expand All @@ -24,5 +25,13 @@ dependencies {
compile( libraries.asm )
compile( libraries.commons_cli )
compile( libraries.commons_jexl )
compile( libraries.commons_codec )
compile( libraries.jedis )
compile project(':s4-comm')
}

test {
jvmArgs '-Dlog4j.configuration=/Users/matthieu/dev/s4/s4-core/src/test/resources/log4j.xml'
forkEvery=1
}

9 changes: 8 additions & 1 deletion s4-core/src/main/java/io/s4/MainApp.java
Expand Up @@ -15,6 +15,7 @@
*/
package io.s4;

import io.s4.ft.SafeKeeper;
import io.s4.processor.AbstractPE;
import io.s4.processor.PEContainer;
import io.s4.util.S4Util;
Expand All @@ -35,6 +36,8 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
Expand Down Expand Up @@ -240,7 +243,11 @@ public static void main(String args[]) throws Exception {
for (String processingElementBeanName : processingElementBeanNames) {
AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
bean.setClock(clock);

try {
bean.setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
} catch (NoSuchBeanDefinitionException ignored) {
// no safe keeper = no checkpointing / recovery
}
// if the application did not specify an id, use the Spring bean name
if (bean.getId() == null) {
bean.setId(processingElementBeanName);
Expand Down
4 changes: 4 additions & 0 deletions s4-core/src/main/java/io/s4/dispatcher/Dispatcher.java
Expand Up @@ -52,6 +52,10 @@ public void setEventEmitter(EventEmitter eventEmitter) {
this.eventEmitter = eventEmitter;
}

public EventEmitter getEventEmitter() {
return this.eventEmitter;
}

public void setConfigFilename(String configFilename) {
this.configFilename = configFilename;
}
Expand Down
@@ -0,0 +1,54 @@
package io.s4.dispatcher.partitioner;

import io.s4.emitter.CommLayerEmitter;
import io.s4.emitter.EventEmitter;
import io.s4.listener.EventListener;
import io.s4.processor.PEContainer;

import java.util.ArrayList;
import java.util.List;

/**
* A partitioner that assigns events to the current partition, as given by the comm layer.
*
*/
public class LoopbackPartitioner implements Partitioner, VariableKeyPartitioner {

CommLayerEmitter emitter;

@Override
public List<CompoundKeyInfo> partition(String streamName,
List<List<String>> compoundKeyNames, Object event,
int partitionCount) {
List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
StringBuilder compoundKeyBuilder = new StringBuilder();
// This partitioning ignores the values of the keyed attributes;
// it partitions to the current partition id of the pe container
partitionInfo.setPartitionId(emitter.getListener().getId());
for (List<String> keyNames : compoundKeyNames) {
for (String keyName : keyNames) {
compoundKeyBuilder.append(keyName);
}
}
partitionInfo.setCompoundKey(compoundKeyBuilder.toString());
partitionInfoList.add(partitionInfo);
return partitionInfoList;
}

@Override
public List<CompoundKeyInfo> partition(String streamName, Object event,
int partitionCount) {
return partition(streamName, new ArrayList<List<String>>(0), event,
partitionCount);
}

/**
* A reference on the emitter allows getting the current partition id from the comm layer
* @param emitter comm layer emitter
*/
public void setEventEmitter(CommLayerEmitter emitter) {
this.emitter = emitter;
}

}
4 changes: 4 additions & 0 deletions s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
Expand Up @@ -81,6 +81,10 @@ public void setListenerAppName(String listenerAppName) {
public void setListener(CommLayerListener listener) {
this.listener = listener;
}

public CommLayerListener getListener() {
return this.listener;
}

public void init() {

Expand Down

0 comments on commit 5dbbaec

Please sign in to comment.