[GOBBLIN-1496] Initialize arrays and maps related to gobblin as a service with an in… (apache#3339)

* optimize some maps and arrays that can be initialized with a fixed size

* use newHashMapWithExpectedSize

* fix checkstyle

* fix bug

* undo accidental change to getTimeUnit
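
Every hunk below applies one pattern: where a collection's final element count is already known at construction time, pass that count to the constructor (or the matching Guava factory) so the backing storage is allocated once rather than grown incrementally. A minimal, self-contained sketch of the pattern, assuming Guava on the classpath; the class name and sample data are illustrative, not taken from the diff:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class PresizingSketch {
  public static void main(String[] args) {
    String[] params = {"flowGroup:g1", "flowName:f1", "flowExecutionId:123"};

    // new HashMap<>(n) treats n as raw bucket capacity, so with the default
    // load factor of 0.75 it still rehashes once 0.75 * n entries are added.
    // Guava's newHashMapWithExpectedSize(n) pads the capacity so that n
    // entries fit without any rehash.
    Map<String, String> paramsMap = Maps.newHashMapWithExpectedSize(params.length);
    for (String param : params) {
      String[] kv = param.split(":");
      paramsMap.put(kv[0], kv[1]);
    }

    // ArrayList grows by ~50% and copies its backing array on each overflow;
    // passing the final size up front avoids every intermediate copy.
    List<String> names = new ArrayList<>(params.length);
    names.addAll(paramsMap.keySet());

    // Guava equivalent used elsewhere in this commit.
    List<String> values = Lists.newArrayListWithCapacity(params.length);
    values.addAll(paramsMap.values());

    System.out.println(paramsMap + " " + names + " " + values);
  }
}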
Will-Lo authored and jack-moseley committed Aug 24, 2022
1 parent 1bfeac7 commit 478b0e1
Showing 11 changed files with 48 additions and 47 deletions.
@@ -143,7 +143,7 @@ public static Set<String> getErrorCodeWhitelist(Config config) {
    */
   public static void updateStatusType(ResponseStatus status, int statusCode, Set<String> errorCodeWhitelist) {
     if (statusCode >= 300 & statusCode < 500) {
-      List<String> whitelist = new ArrayList<>();
+      List<String> whitelist = new ArrayList<>(2);
       whitelist.add(Integer.toString(statusCode));
       if (statusCode > 400) {
         whitelist.add(HttpConstants.CODE_4XX);
======================================================================
@@ -17,11 +17,11 @@

 package org.apache.gobblin.service;

+import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -125,7 +125,7 @@ private FlowStatusId createFlowStatusId(String locationHeader) {
     matcher.find();
     String allFields = matcher.group("flowStatusIdParams");
     String[] flowStatusIdParams = allFields.split(",");
-    Map<String, String> paramsMap = new HashMap<>();
+    Map<String, String> paramsMap = Maps.newHashMapWithExpectedSize(flowStatusIdParams.length);
     for (String flowStatusIdParam : flowStatusIdParams) {
       paramsMap.put(flowStatusIdParam.split(":")[0], flowStatusIdParam.split(":")[1]);
     }
======================================================================
@@ -318,7 +318,7 @@ public Builder withTemplate(URI templateURI) {
    */
   public Builder withResourceTemplates(List<URI> templateURIs) {
     try {
-      List<JobTemplate> templates = new ArrayList<>();
+      List<JobTemplate> templates = new ArrayList<>(templateURIs.size());
       for(URI uri : templateURIs) {
         templates.add(ResourceBasedJobTemplate.forResourcePath(uri.getPath()));
       }
======================================================================
@@ -76,7 +76,7 @@ public static List<Properties> loadGenericJobConfigs(Properties sysProps, JobSpe
     Collection<Config> configs =
         loader.loadPullFilesRecursively(rootPath, sysConfig, true);

-    List<Properties> jobConfigs = Lists.newArrayList();
+    List<Properties> jobConfigs = Lists.newArrayListWithCapacity(configs.size());
     for (Config config : configs) {
       try {
         jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
@@ -107,7 +107,7 @@ public static List<Properties> loadGenericJobConfigs(Properties sysProps, Path c
     Collection<Config> configs =
         loader.loadPullFilesRecursively(commonPropsPath.getParent(), sysConfig, true);

-    List<Properties> jobConfigs = Lists.newArrayList();
+    List<Properties> jobConfigs = Lists.newArrayListWithCapacity(configs.size());
     for (Config config : configs) {
       try {
         jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
======================================================================
@@ -347,7 +347,7 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
     //Get the logical names of SpecExecutors where the FlowEdge can be executed.
     List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
     //Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog.
-    List<SpecExecutor> specExecutors = new ArrayList<>();
+    List<SpecExecutor> specExecutors = new ArrayList<>(specExecutorNames.size());
     for (String specExecutorName: specExecutorNames) {
       URI specExecutorUri = new URI(specExecutorName);
       specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
======================================================================
@@ -17,10 +17,10 @@

 package org.apache.gobblin.service.modules.flow;

+import com.google.common.collect.Maps;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -60,8 +60,8 @@
 public class FlowGraphPath {
   @Getter
   private List<List<FlowEdgeContext>> paths;
-  private FlowSpec flowSpec;
-  private Long flowExecutionId;
+  private final FlowSpec flowSpec;
+  private final Long flowExecutionId;

   public FlowGraphPath(FlowSpec flowSpec, Long flowExecutionId) {
     this.flowSpec = flowSpec;
@@ -142,26 +142,26 @@ private static boolean isNodeForkable(DagNode<JobExecutionPlan> dagNode) {
    */
   private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext, Config sysConfig)
       throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
-    FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
-    DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
-    DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
-    Config mergedConfig = flowEdgeContext.getMergedConfig();
-    SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
-
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
-    Map<String, String> templateToJobNameMap = new HashMap<>();
-
-    //Get resolved job configs from the flow template
-    List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
-    //Iterate over each resolved job config and convert the config to a JobSpec.
-    for (Config resolvedJobConfig : resolvedJobConfigs) {
-      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId, sysConfig);
-      jobExecutionPlans.add(jobExecutionPlan);
-      templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString(
-          ConfigurationKeys.JOB_NAME_KEY));
-    }
-    updateJobDependencies(jobExecutionPlans, templateToJobNameMap);
-    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+    FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
+    DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+    DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+    Config mergedConfig = flowEdgeContext.getMergedConfig();
+    SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
+
+    //Get resolved job configs from the flow template
+    List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
+
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(resolvedJobConfigs.size());
+    Map<String, String> templateToJobNameMap = Maps.newHashMapWithExpectedSize(resolvedJobConfigs.size());
+    //Iterate over each resolved job config and convert the config to a JobSpec.
+    for (Config resolvedJobConfig : resolvedJobConfigs) {
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId, sysConfig);
+      jobExecutionPlans.add(jobExecutionPlan);
+      templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString(
+          ConfigurationKeys.JOB_NAME_KEY));
+    }
+    updateJobDependencies(jobExecutionPlans, templateToJobNameMap);
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }

   /**
Expand Down Expand Up @@ -198,9 +198,10 @@ private static String getJobTemplateName(JobExecutionPlan jobExecutionPlan) {
private void updateJobDependencies(List<JobExecutionPlan> jobExecutionPlans, Map<String, String> templateToJobNameMap) {
for (JobExecutionPlan jobExecutionPlan: jobExecutionPlans) {
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
List<String> updatedDependenciesList = new ArrayList<>();
if (jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)) {
for (String dependency : ConfigUtils.getStringList(jobSpec.getConfig(), ConfigurationKeys.JOB_DEPENDENCIES)) {
List<String> jobDependencies = ConfigUtils.getStringList(jobSpec.getConfig(), ConfigurationKeys.JOB_DEPENDENCIES);
List<String> updatedDependenciesList = new ArrayList<>(jobDependencies.size());
for (String dependency : jobDependencies) {
if (!templateToJobNameMap.containsKey(dependency)) {
//We should never hit this condition. The logic here is a safety check.
throw new RuntimeException("TemplateToJobNameMap does not contain dependency " + dependency);
Expand Down
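
Note on the convertHopToDag hunk above: it is the one place where the change required more than a constructor argument. The call that produces the eventual element count, getResolvedJobConfigs, is hoisted above the allocations so both collections can be sized from resolvedJobConfigs.size(). A generic sketch of that reordering, with hypothetical stand-in types (the real ones live in gobblin-service):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Maps;

public class HoistForSizing {
  // Hypothetical stand-in for FlowTemplate#getResolvedJobConfigs.
  static List<String> resolveJobConfigs() {
    return Arrays.asList("jobA", "jobB", "jobC");
  }

  public static void main(String[] args) {
    // Make the size-producing call first...
    List<String> resolved = resolveJobConfigs();
    // ...then allocate each collection exactly once, at its final size.
    List<String> plans = new ArrayList<>(resolved.size());
    Map<String, String> templateToJobName = Maps.newHashMapWithExpectedSize(resolved.size());
    for (String config : resolved) {
      plans.add(config);
      templateToJobName.put(config + ".template", config);
    }
    System.out.println(plans + " " + templateToJobName);
  }
}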
======================================================================
@@ -82,7 +82,7 @@ public List<FlowEdgeContext> findPathUnicast(DataNode destNode) {

     //Base condition 2: Check if we are already at the target. If so, return an empty path.
     if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
-      return new ArrayList<>();
+      return new ArrayList<>(0);
     }

     LinkedList<FlowEdgeContext> edgeQueue =
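
Why the empty-path base case returns new ArrayList<>(0) rather than the no-arg constructor: in OpenJDK 8 and later both start from a shared zero-length backing array, but the default constructor jumps to capacity 10 on the first add, while an explicit zero grows only as far as actually needed; and unlike Collections.emptyList(), the result stays mutable for callers that append edges to the returned path. A small demonstration, with illustrative contents:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyListReturn {
  public static void main(String[] args) {
    // Costs one list header; the backing array is a shared static empty
    // array until something is added (OpenJDK 8+ behavior).
    List<String> path = new ArrayList<>(0);
    path.add("edge-1"); // backing array grows to length 1, not the default 10

    // Collections.emptyList() avoids even the header, but is immutable:
    List<String> frozen = Collections.emptyList();
    // frozen.add("edge-1"); // would throw UnsupportedOperationException

    System.out.println(path + " " + frozen);
  }
}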
======================================================================
@@ -17,10 +17,10 @@

 package org.apache.gobblin.service.modules.spec;

+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,8 +44,9 @@ public class JobExecutionPlanDagFactory {

   public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) {
     //Maintain a mapping between job name and the corresponding JobExecutionPlan.
-    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap = new HashMap<>();
-    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap =
+        Maps.newHashMapWithExpectedSize(jobExecutionPlans.size());
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(jobExecutionPlans.size());
     /**
      * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node
      * to a HashMap.
@@ -89,7 +90,7 @@ public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans)
    */
   private static List<String> getDependencies(Config config) {
     return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
-        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
+        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>(0);
   }

   /**
======================================================================
@@ -188,7 +188,7 @@ public List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor i
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);

-    List<Config> resolvedJobConfigs = new ArrayList<>();
+    List<Config> resolvedJobConfigs = new ArrayList<>(getJobTemplates().size());
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
     for (JobTemplate jobTemplate: getJobTemplates()) {
       ResolvedJobSpec resolvedJobSpec = this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(jobTemplate).build());
======================================================================
@@ -72,8 +72,8 @@ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, Strin
     Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
-      List<JobStatus> jobStatuses = new ArrayList<>();
       List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
+      List<JobStatus> jobStatuses = new ArrayList<>(tableNames.size());
       for (String tableName: tableNames) {
         List<State> jobStates = this.stateStore.getAll(storeName, tableName);
         if (jobStates.isEmpty()) {
======================================================================
@@ -23,7 +23,6 @@
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +96,7 @@ public void saveConfigToFile(final Config config, final Path destPath)
    * @return a {@link Properties} instance
    */
   public static Properties configToProperties(Config config) {
-    return configToProperties(config, Optional.<String>absent());
+    return configToProperties(config, Optional.absent());
   }

   /**
Expand Down Expand Up @@ -181,7 +180,7 @@ public static State configToState(Config config) {
* @return a {@link Config} instance
*/
public static Config propertiesToConfig(Properties properties) {
return propertiesToConfig(properties, Optional.<String>absent());
return propertiesToConfig(properties, Optional.absent());
}

/**
Expand Down Expand Up @@ -229,7 +228,7 @@ public static Set<String> findFullPrefixKeys(Properties properties,
* @return a {@link Config} instance
*/
public static Config propertiesToConfig(Properties properties, Optional<String> prefix) {
Set<String> blacklistedKeys = new HashSet<>();
Set<String> blacklistedKeys = new HashSet<>(0);
if (properties.containsKey(GOBBLIN_CONFIG_BLACKLIST_KEYS)) {
blacklistedKeys = new HashSet<>(Splitter.on(',').omitEmptyStrings().trimResults()
.splitToList(properties.getProperty(GOBBLIN_CONFIG_BLACKLIST_KEYS)));
Expand Down Expand Up @@ -303,7 +302,7 @@ public static Config propertiesToTypedConfig(Properties properties, Optional<Str
* values Strings. This implementation will try to recognize booleans and numbers. All keys are
* treated as strings.*/
private static Map<String, Object> guessPropertiesTypes(Map<Object, Object> srcProperties) {
Map<String, Object> res = new HashMap<>();
Map<String, Object> res = Maps.newHashMapWithExpectedSize(srcProperties.size());
for (Map.Entry<Object, Object> prop : srcProperties.entrySet()) {
Object value = prop.getValue();
if (null != value && value instanceof String && !Strings.isNullOrEmpty(value.toString())) {
Expand Down Expand Up @@ -478,7 +477,7 @@ public static List<String> getStringList(Config config, String path) {
return Collections.emptyList();
}

List<String> valueList = Lists.newArrayList();
List<String> valueList;
try {
valueList = config.getStringList(path);
} catch (ConfigException.WrongType e) {
@@ -494,7 +493,7 @@ public static List<String> getStringList(Config config, String path) {
        * b
        * 10,12
        */
-      try (CSVReader csvr = new CSVReader(new StringReader(config.getString(path)));) {
+      try (CSVReader csvr = new CSVReader(new StringReader(config.getString(path)))) {
         valueList = Lists.newArrayList(csvr.readNext());
       } catch (IOException ioe) {
         throw new RuntimeException(ioe);
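
Context for the getStringList hunks: the method accepts either a native HOCON list or a plain comma-separated string, so the list's size is unknown until one of the two parses succeeds; that is why the patch replaces the eager Lists.newArrayList() with a bare declaration assigned on both paths. A sketch of the two accepted input shapes using the typesafe-config API directly; keys and values are illustrative:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class StringListShapes {
  public static void main(String[] args) {
    // Shape 1: a native HOCON list -- getStringList(path) succeeds directly.
    Config asList = ConfigFactory.parseString("key = [a, b, c]");
    System.out.println(asList.getStringList("key")); // [a, b, c]

    // Shape 2: a comma-separated string -- getStringList(path) throws
    // ConfigException.WrongType, and ConfigUtils falls back to CSV parsing.
    Config asCsv = ConfigFactory.parseString("key = \"a,b,c\"");
    System.out.println(asCsv.getString("key")); // a,b,c
  }
}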
@@ -561,7 +560,7 @@ public static Config resolveEncrypted(Config config, Optional<String> encConfigP
     Config encryptedConfig = config.getConfig(encConfigPath.get());

     PasswordManager passwordManager = PasswordManager.getInstance(configToProperties(config));
-    Map<String, String> tmpMap = Maps.newHashMap();
+    Map<String, String> tmpMap = Maps.newHashMapWithExpectedSize(encryptedConfig.entrySet().size());
     for (Map.Entry<String, ConfigValue> entry : encryptedConfig.entrySet()) {
       String val = entry.getValue().unwrapped().toString();
       val = passwordManager.readPassword(val);
