Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Optimize Function Worker startup by lazy loading and di… #172

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300

# Specifies if the function worker should use classloading for validating submissions for built-in
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfBuiltinFiles: false

# Specifies if the function worker should use classloading for validating submissions for external
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfExternalFiles: false

################################
# Function package management
################################
Expand Down Expand Up @@ -390,7 +400,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions

# Should connector config be validated during submission
# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
# Default is false.
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
Expand Down
4 changes: 4 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.14.12.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
Expand Down
2 changes: 1 addition & 1 deletion docker/pulsar/scripts/gen-yml-from-env.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
conf_files = sys.argv[1:]

for conf_filename in conf_files:
conf = yaml.load(open(conf_filename))
conf = yaml.load(open(conf_filename), Loader=yaml.FullLoader)

# update the config
modified = False
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.45.1</grpc.version>
Expand Down Expand Up @@ -955,6 +957,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
Expand Down
30 changes: 30 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,24 @@
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-cassandra</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-cassandra.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-twitter.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-data-generator</artifactId>
Expand Down Expand Up @@ -501,6 +519,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.nar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand All @@ -514,7 +541,10 @@
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-batch-data-generator.nar.path>${project.build.directory}/pulsar-io-batch-data-generator.nar</pulsar-io-batch-data-generator.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- workaround issue #13750 which gets triggered if org.apache.bookkeeper.meta.MetadataDrivers class gets loaded before org.apache.pulsar.metadata.bookkeeper.BKCluster constructor is called -->
<bookkeeper.metadata.bookie.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver</bookkeeper.metadata.bookie.drivers>
<bookkeeper.metadata.client.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver</bookkeeper.metadata.client.drivers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}

public static List<File> getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
return getClassPathEntries(unpacked);
}

private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
Expand All @@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
* if any of the <tt>FlowFileProcessor</tt> implementations defined by the Java Services API cannot be
* loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

Expand Down Expand Up @@ -238,22 +238,31 @@ public List<String> getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
getClassPathEntries(root).forEach(f -> {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
log.error("Failed to add entry to classpath: {}", f, e);
}
});
}

static List<File> getClassPathEntries(File root) {
List<File> classPathEntries = new ArrayList<>();
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
addURL(dependencies.toURI().toURL());
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
for (File libJar : jarFiles) {
addURL(libJar.toURI().toURL());
}
classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
return classPathEntries;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -114,16 +115,22 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
try (JarFile jarFile = new JarFile(nar)) {
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String name = jarEntry.getName();
File f = new File(workingDirectory, name);
if (jarEntry.isDirectory()) {
Path workingDirectoryPath = workingDirectory.toPath().normalize();
try (ZipFile zipFile = new ZipFile(nar)) {
Enumeration<? extends ZipEntry> zipEntries = zipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
String name = zipEntry.getName();
Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
log.error("Invalid zip file with entry '{}'", name);
throw new IOException("Invalid zip file. Aborting unpacking.");
}
File f = targetFilePath.toFile();
if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
makeFile(jarFile.getInputStream(jarEntry), f);
makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
Expand Down
Loading
Loading