Skip to content

Commit

Permalink
First cut of Python Instance that just parses the arguments and start… (
Browse files Browse the repository at this point in the history
apache#135)

* First cut of Python Instance that just parses the arguments and starts a server to process incoming requests

* Add license header
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 0863c22 commit fea9805
Show file tree
Hide file tree
Showing 9 changed files with 665 additions and 49 deletions.
7 changes: 4 additions & 3 deletions pulsar-functions/conf/function_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ metricsConfig:
port: 9099
threadContainerFactory:
threadGroupName: "Thread Function Container Group"
# processContainerFactory:
# javaInstanceJarLocation: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/java-instance.jar"
# logDirectory: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/logs"
processContainerFactory:
javaInstanceJarLocation: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/java-instance.jar"
pythonInstanceLocation: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/src/main/python/python_instance.py"
logDirectory: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/logs"
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,7 @@ class ProcessFunctionContainer implements FunctionContainer {
String logDirectory,
String codeFile,
String pulsarServiceUrl) {
Map<String, String> environment = new HashMap<>();
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionConfig().getRuntime() == Function.FunctionConfig.Runtime.JAVA) {
args.add("java");
Expand All @@ -73,24 +75,26 @@ class ProcessFunctionContainer implements FunctionContainer {
args.add("--jar");
args.add(codeFile);
} else if (instanceConfig.getFunctionConfig().getRuntime() == Function.FunctionConfig.Runtime.PYTHON) {
environment.put("LOGGING_DIRECTORY", logDirectory);
environment.put("LOGGING_FILE", instanceConfig.getFunctionId());
args.add("python");
args.add(instanceFile);
args.add("--py");
args.add(codeFile);
}
args.add("--instance-id");
args.add("--instance_id");
args.add(instanceConfig.getInstanceId());
args.add("--function-id");
args.add("--function_id");
args.add(instanceConfig.getFunctionId());
args.add("--function-version");
args.add("--function_version");
args.add(instanceConfig.getFunctionVersion());
args.add("--tenant");
args.add(instanceConfig.getFunctionConfig().getTenant());
args.add("--namespace");
args.add(instanceConfig.getFunctionConfig().getNamespace());
args.add("--name");
args.add(instanceConfig.getFunctionConfig().getName());
args.add("--function-classname");
args.add("--function_classname");
args.add(instanceConfig.getFunctionConfig().getClassName());
String sourceTopicString = "";
String inputSerdeClassNameString = "";
Expand All @@ -106,27 +110,27 @@ class ProcessFunctionContainer implements FunctionContainer {
inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
}
}
args.add("--source-topics");
args.add("--source_topics");
args.add(sourceTopicString);
args.add("--input-serde-classnames");
args.add("--input_serde_classnames");
args.add(inputSerdeClassNameString);
if (instanceConfig.getFunctionConfig().getSinkTopic() != null) {
args.add("--sink-topic");
args.add("--sink_topic");
args.add(instanceConfig.getFunctionConfig().getSinkTopic());
}
if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null) {
args.add("--output-serde-classname");
args.add("--output_serde_classname");
args.add(instanceConfig.getFunctionConfig().getOutputSerdeClassName());
}
args.add("--processing-guarantees");
args.add("--processing_guarantees");
if (instanceConfig.getFunctionConfig().getProcessingGuarantees() != null) {
args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees()));
} else {
args.add("ATMOST_ONCE");
}
args.add("--pulsar-serviceurl");
args.add("--pulsar_serviceurl");
args.add(pulsarServiceUrl);
args.add("--max-buffered-tuples");
args.add("--max_buffered_tuples");
args.add(String.valueOf(maxBufferedTuples));
Map<String, String> userConfig = instanceConfig.getFunctionConfig().getUserConfigMap();
String userConfigString = "";
Expand All @@ -137,14 +141,17 @@ class ProcessFunctionContainer implements FunctionContainer {
}
userConfigString = userConfigString + entry.getKey() + ":" + entry.getValue();
}
args.add("--user-config");
args.add("--user_config");
args.add(userConfigString);
}
instancePort = findAvailablePort();
args.add("--port");
args.add(String.valueOf(instancePort));

processBuilder = new ProcessBuilder(args);
if (!environment.isEmpty()) {
processBuilder.environment().putAll(environment);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@Slf4j
public class JavaInstanceMain {
@Parameter(names = "--function-classname", description = "Function Class Name\n", required = true)
@Parameter(names = "--function_classname", description = "Function Class Name\n", required = true)
protected String className;
@Parameter(
names = "--jar",
Expand All @@ -57,39 +57,39 @@ public class JavaInstanceMain {
protected String tenant;
@Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
protected String namespace;
@Parameter(names = "--source-topics", description = "Input Topic Name\n", required = true)
@Parameter(names = "--source_topics", description = "Input Topic Name\n", required = true)
protected String sourceTopicName;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
@Parameter(names = "--sink_topic", description = "Output Topic Name\n")
protected String sinkTopicName;

@Parameter(names = "--input-serde-classnames", description = "Input SerDe\n", required = true)
@Parameter(names = "--input_serde_classnames", description = "Input SerDe\n", required = true)
protected String inputSerdeClassName;

@Parameter(names = "--output-serde-classname", description = "Output SerDe\n")
@Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
protected String outputSerdeClassName;

@Parameter(names = "--processing-guarantees", description = "Processing Guarantees\n", required = true)
@Parameter(names = "--processing_guarantees", description = "Processing Guarantees\n", required = true)
protected FunctionConfig.ProcessingGuarantees processingGuarantees;

@Parameter(names = "--instance-id", description = "Instance Id\n", required = true)
@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
protected String instanceId;

@Parameter(names = "--function-id", description = "Function Id\n", required = true)
@Parameter(names = "--function_id", description = "Function Id\n", required = true)
protected String functionId;

@Parameter(names = "--function-version", description = "Function Version\n", required = true)
@Parameter(names = "--function_version", description = "Function Version\n", required = true)
protected String functionVersion;

@Parameter(names = "--pulsar-serviceurl", description = "Pulsar Service Url\n", required = true)
@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
protected String pulsarServiceUrl;

@Parameter(names = "--port", description = "Port to listen on\n", required = true)
protected int port;

@Parameter(names = "--max-buffered-tuples", description = "Maximum number of tuples to buffer\n", required = true)
@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
protected int maxBufferedTuples;

@Parameter(names = "--user-config", description = "UserConfig\n")
@Parameter(names = "--user_config", description = "UserConfig\n")
protected String userConfig;

private Thread fnThread;
Expand Down
Loading

0 comments on commit fea9805

Please sign in to comment.