Skip to content

Commit

Permalink
TEZ-4275: Use Google Guava Intern Facility (apache#95)
Browse files Browse the repository at this point in the history
* TEZ-4275: Use Google Guava Intern Facility

* Also add hint for running finalization

* Use Guava String Intern implementation

* Fix checkstyle, white-space issues

* Remove GC hint

Co-authored-by: David Mollitor <david.mollitor@cloudera.com>
  • Loading branch information
belugabehr and David Mollitor committed Jun 30, 2021
1 parent 26b86e7 commit 984d09c
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
* TezDAGID represents the immutable and unique identifier for
* a Tez DAG.
Expand All @@ -40,7 +42,7 @@
*/
public class TezDAGID extends TezID {

private static TezIDCache<TezDAGID> tezDAGIDCache = new TezIDCache<>();
private static Interner<TezDAGID> tezDAGIDCache = Interners.newWeakInterner();
private ApplicationId applicationId;

/**
Expand All @@ -53,12 +55,7 @@ public static TezDAGID getInstance(ApplicationId applicationId, int id) {
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
return tezDAGIDCache.getInstance(new TezDAGID(applicationId, id));
}

@InterfaceAudience.Private
public static void clearCache() {
tezDAGIDCache.clear();
return tezDAGIDCache.intern(new TezDAGID(applicationId, id));
}

/**
Expand All @@ -72,7 +69,7 @@ public static TezDAGID getInstance(String yarnRMIdentifier, int appId, int id) {
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
return tezDAGIDCache.getInstance(new TezDAGID(yarnRMIdentifier, appId, id));
return tezDAGIDCache.intern(new TezDAGID(yarnRMIdentifier, appId, id));
}

// Public for Writable serialization. Verify if this is actually required.
Expand Down
21 changes: 0 additions & 21 deletions tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand All @@ -42,25 +40,6 @@ public abstract class TezID implements WritableComparable<TezID> {
public static final char SEPARATOR = '_';
protected int id;

public static class TezIDCache<T> {
private final WeakHashMap<T, WeakReference<T>> cache = new WeakHashMap<>();

synchronized T getInstance(final T id) {
final WeakReference<T> cached = cache.get(id);
if (cached != null) {
final T value = cached.get();
if (value != null)
return value;
}
cache.put(id, new WeakReference<T>(id));
return id;
}

synchronized void clear() {
cache.clear();
}
}

/** constructs an ID object from the given int */
public TezID(int id) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
* TezTaskAttemptID represents the immutable and unique identifier for
* a task attempt. Each task attempt is one particular instance of a Tez Task
Expand All @@ -46,7 +49,7 @@ public class TezTaskAttemptID extends TezID {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;

private static TezIDCache<TezTaskAttemptID> tezTaskAttemptIDCache = new TezIDCache<>();
private static Interner<TezTaskAttemptID> tezTaskAttemptIDCache = Interners.newWeakInterner();

// Public for Writable serialization. Verify if this is actually required.
public TezTaskAttemptID() {
Expand All @@ -58,12 +61,7 @@ public TezTaskAttemptID() {
* @param id the task attempt number
*/
public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
return tezTaskAttemptIDCache.getInstance(new TezTaskAttemptID(taskID, id));
}

@InterfaceAudience.Private
public static void clearCache() {
tezTaskAttemptIDCache.clear();
return tezTaskAttemptIDCache.intern(new TezTaskAttemptID(taskID, id));
}

private TezTaskAttemptID(TezTaskID taskId, int id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
* TaskID represents the immutable and unique identifier for
* a Tez Task. Each TaskID encompasses multiple attempts made to
Expand All @@ -51,7 +54,7 @@ public FastNumberFormat initialValue() {
}
};

private static TezIDCache<TezTaskID> tezTaskIDCache = new TezIDCache<>();
private static Interner<TezTaskID> tezTaskIDCache = Interners.newWeakInterner();
private TezVertexID vertexId;

/**
Expand All @@ -61,12 +64,7 @@ public FastNumberFormat initialValue() {
*/
public static TezTaskID getInstance(TezVertexID vertexID, int id) {
Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
return tezTaskIDCache.getInstance(new TezTaskID(vertexID, id));
}

@InterfaceAudience.Private
public static void clearCache() {
tezTaskIDCache.clear();
return tezTaskIDCache.intern(new TezTaskID(vertexID, id));
}

private TezTaskID(TezVertexID vertexID, int id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
* TezVertexID represents the immutable and unique identifier for
* a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
Expand All @@ -53,7 +56,7 @@ public FastNumberFormat initialValue() {
}
};

private static TezIDCache<TezVertexID> tezVertexIDCache = new TezIDCache<>();
private static Interner<TezVertexID> tezVertexIDCache = Interners.newWeakInterner();
private TezDAGID dagId;

// Public for Writable serialization. Verify if this is actually required.
Expand All @@ -67,12 +70,7 @@ public TezVertexID() {
*/
public static TezVertexID getInstance(TezDAGID dagId, int id) {
Preconditions.checkArgument(dagId != null, "DagID cannot be null");
return tezVertexIDCache.getInstance(new TezVertexID(dagId, id));
}

@InterfaceAudience.Private
public static void clearCache() {
tezVertexIDCache.clear();
return tezVertexIDCache.intern(new TezVertexID(dagId, id));
}

private TezVertexID(TezDAGID dagId, int id) {
Expand Down
36 changes: 36 additions & 0 deletions tez-common/src/main/java/org/apache/tez/util/StringInterner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.util;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
* A class to replace the {@code String.intern()}. The {@code String.intern()}
* has some well-known performance limitations, and should generally be avoided.
* Prefer Google's interner over the JDK's implementation.
*/
public final class StringInterner {

private static final Interner<String> STRING_INTERNER =
Interners.newWeakInterner();

private StringInterner() {
}

public static String intern(final String str) {
return (str == null) ? null : STRING_INTERNER.intern(str);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -869,10 +867,6 @@ protected synchronized void handle(DAGAppMasterEvent event) {
taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
nodes.dagComplete(cleanupEvent.getDag());
containers.dagComplete(cleanupEvent.getDag());
TezTaskAttemptID.clearCache();
TezTaskID.clearCache();
TezVertexID.clearCache();
TezDAGID.clearCache();
LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
cleanupEvent.getDag().getID());
synchronized (idleStateLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
Expand Down Expand Up @@ -1399,7 +1399,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
ta.container = container;
ta.containerId = tEvent.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
}

if (event instanceof TaskAttemptEventContainerTerminatedBySystem) {
Expand All @@ -1411,7 +1411,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
ta.container = container;
ta.containerId = tEvent.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
}

if (ta.recoveryData == null ||
Expand Down Expand Up @@ -1453,8 +1453,8 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
ta.container = container;
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
ta.nodeRackName = StringInterner.intern(RackResolver.resolve(ta.containerNodeId.getHost())
.getNetworkLocation());
ta.lastNotifyProgressTimestamp = ta.clock.getTime();

Expand All @@ -1463,7 +1463,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = NetUtils
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName());
ta.httpPort = nodeHttpInetAddr.getPort();
ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
Expand Down Expand Up @@ -190,6 +189,7 @@
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.apache.tez.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -965,7 +965,7 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.weakIntern(vertexName);
this.vertexName = StringInterner.intern(vertexName);
this.vertexConf = new Configuration(dagConf);
this.vertexOnlyConf = new Configuration(dagOnlyConf);
if (vertexPlan.hasVertexConf()) {
Expand Down Expand Up @@ -4514,7 +4514,7 @@ public Resource getTaskResource() {
}

void addIO(String name) {
ioIndices.put(StringInterner.weakIntern(name), ioIndices.size());
ioIndices.put(StringInterner.intern(name), ioIndices.size());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.apache.commons.collections4.bidimap.DualHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.util.StringInterner;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
Expand Down Expand Up @@ -100,7 +100,7 @@ public class DagInfo extends BaseInfo {
Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
(Constants.TEZ_DAG_ID));

dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY));
dagId = StringInterner.intern(jsonObject.getString(Constants.ENTITY));

//Parse additional Info
JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
Expand Down Expand Up @@ -140,7 +140,7 @@ public class DagInfo extends BaseInfo {
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
name = StringInterner.weakIntern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null);
name = StringInterner.intern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null);
if (dagPlan != null) {
JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
if (vertices != null) {
Expand All @@ -152,7 +152,7 @@ public class DagInfo extends BaseInfo {
} else {
numVertices = 0;
}
status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
status = StringInterner.intern(otherInfoNode.optString(Constants.STATUS));

//parse name id mapping
JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
Expand Down
Loading

0 comments on commit 984d09c

Please sign in to comment.