Skip to content

Commit

Permalink
Nomenclature (apache#32)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Made our function hierarchy match pulsar's

* Replace cluster with tenant
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent a548bc2 commit 4dd4089
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.List;
import lombok.Getter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.functions.fs.FunctionConfig;
Expand All @@ -45,7 +43,7 @@ abstract class FunctionsCommand extends CliCommand {
names = "--function-classpath",
description = "Function Classpath\n",
listConverter = StringConverter.class)
protected List<String> jarFiles;
protected String jarFile;
@Parameter(names = "--source-topic", description = "Input Topic Name\n")
protected String sourceTopicName;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
Expand Down Expand Up @@ -86,10 +84,8 @@ void run() throws Exception {
if (null != outputSerdeClassName) {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
}
if (null != jarFiles) {
functionConfig.setJarFiles(jarFiles);
} else {
functionConfig.setJarFiles(Lists.newArrayList());
if (null != jarFile) {
functionConfig.setCodeFile(jarFile);
}

run_functions_cmd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
@EqualsAndHashCode
@ToString
public class FunctionConfig {
// namespace that the function resides in
// tenant that the function resides in
private String tenant;
// namespace that the function belongs to
private String nameSpace;
// Username that the function belongs to
private String userName;
// function name
private String name;
// function class name
Expand All @@ -51,7 +51,9 @@ public class FunctionConfig {
// output serde class name
private String outputSerdeClassName;
// function jar name
private List<String> jarFiles;
private String codeFile;
// function code in byte
private byte[] code;
// source topic
private String sourceTopic;
// sink topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.pulsar.functions.runtime.container;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void start() throws Exception {
fnCache.registerFunctionInstance(
javaInstanceConfig.getFunctionId(),
javaInstanceConfig.getInstanceId(),
javaInstanceConfig.getFunctionConfig().getJarFiles(),
Arrays.asList(javaInstanceConfig.getFunctionConfig().getCodeFile()),
Collections.emptyList());
log.info("Initialize function class loader for function {} at function cache manager",
javaInstanceConfig.getFunctionConfig().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ public class ThreadFunctionContainerTest {
public final TestName runtime = new TestName();

private final ThreadFunctionContainerFactory factory;
private final List<String> jarFiles;
private final String jarFile;
private final List<URL> classpaths;

public ThreadFunctionContainerTest() {
URL jarUrl = getClass().getClassLoader().getResource("multifunction.jar");
this.jarFiles = Lists.newArrayList(jarUrl.getPath());
this.jarFile = jarUrl.getPath();
this.classpaths = Collections.emptyList();
this.factory = new ThreadFunctionContainerFactory(1024);
}
Expand All @@ -74,7 +74,7 @@ private Function<Integer, Integer> loadAddFunction() throws Exception {
FunctionConfig createFunctionConfig() {
FunctionConfig config = new FunctionConfig();
config.setName(runtime.getMethodName());
config.setJarFiles(jarFiles);
config.setCodeFile(jarFile);
config.setClassName("org.apache.pulsar.functions.runtime.functioncache.AddFunction");
config.setSourceTopic(runtime.getMethodName() + "-source");
config.setSinkTopic(runtime.getMethodName() + "-sink");
Expand Down

0 comments on commit 4dd4089

Please sign in to comment.