Skip to content

Commit

Permalink
ZEPPELIN-3411 Long running logic inside synchronized block in Interpr…
Browse files Browse the repository at this point in the history
…eterSettingManager

### What is this PR for?
Removing redundant synchronized code to avoid blocking other logics.

### What type of PR is it?
[Bug Fix]

### Todos
* [x] - Change synchronized block to read/write lock

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3411

### How should this be tested?
* Current tests should be passed

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jongyoul Lee <jongyoul@gmail.com>

Closes apache#2935 from jongyoul/ZEPPELIN-3411 and squashes the following commits:

3b90155 [Jongyoul Lee] Removed unused `import` statements Simplified `for` loop
4691301 [Jongyoul Lee] Removed lock/synchronized codes because interpreterSettings already is implemented by currentHashMap
24be692 [Jongyoul Lee] Removed all synchronized blocks and replace them to read/write lock
  • Loading branch information
jongyoul authored and mckartha committed Aug 9, 2018
1 parent 8716b1d commit f094b89
Showing 1 changed file with 104 additions and 128 deletions.
Expand Up @@ -64,7 +64,6 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -123,8 +122,6 @@ public class InterpreterSettingManager {
private RecoveryStorage recoveryStorage;
private ConfigStorage configStorage;



public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener
Expand Down Expand Up @@ -213,7 +210,7 @@ private void loadFromFile() throws IOException {
for (String oldId : oldSettingIdList) {
if (infoSaving.interpreterSettings.containsKey(oldId)) {
newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName());
};
}
}
newBindingMap.put(noteId, newSettingIdList);
}
Expand Down Expand Up @@ -298,13 +295,11 @@ private void loadFromFile() throws IOException {
}

public void saveToFile() throws IOException {
synchronized (interpreterSettings) {
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = interpreterSettings;
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = Maps.newHashMap(interpreterSettings);
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}

private void init() throws IOException {
Expand Down Expand Up @@ -439,7 +434,6 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) {

public List<InterpreterSetting> getInterpreterSettings(String noteId) {
List<InterpreterSetting> settings = new ArrayList<>();
synchronized (interpreterSettings) {
List<String> interpreterSettingIds = interpreterBindings.get(noteId);
if (interpreterSettingIds != null) {
for (String settingId : interpreterSettingIds) {
Expand All @@ -451,19 +445,19 @@ public List<InterpreterSetting> getInterpreterSettings(String noteId) {
}
}
}
}
return settings;
}

public InterpreterSetting getInterpreterSettingByName(String name) {
synchronized (interpreterSettings) {
try {
for (InterpreterSetting setting : interpreterSettings.values()) {
if (setting.getName().equals(name)) {
return setting;
}
}
throw new RuntimeException("No such interpreter setting: " + name);
} finally {
}
throw new RuntimeException("No such interpreter setting: " + name);
}

public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
Expand Down Expand Up @@ -617,12 +611,11 @@ public void removeResourcesBelongsToNote(String noteId) {
}

/**
* Overwrite dependency jar under local-repo/{interpreterId}
* if jar file in original path is changed
* Overwrite dependency jar under local-repo/{interpreterId} if jar file in original path is
* changed
*/
private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
synchronized (interpreterSettings) {
final Thread t = new Thread() {
public void run() {
try {
Expand Down Expand Up @@ -653,7 +646,6 @@ public void run() {
}
};
t.start();
}
}

/**
Expand Down Expand Up @@ -713,26 +705,24 @@ public void setInterpreterBinding(String user, String noteId, List<String> setti
throws IOException {
List<String> unBindedSettingIdList = new LinkedList<>();

synchronized (interpreterSettings) {
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();

for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
}
}
Expand Down Expand Up @@ -794,49 +784,45 @@ public void removeNoteInterpreterSettingBinding(String user, String noteId) thro
interpreterBindings.remove(noteId);
}

/**
* Change interpreter properties and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
/** Change interpreter properties and restart */
public void setPropertyAndRestart(
String id,
InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
}
}

// restart in note page
public void restart(String settingId, String noteId, String user) throws InterpreterException {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
synchronized (interpreterSettings) {
intpSetting = interpreterSettings.get(settingId);
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
//clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
intpSetting = interpreterSettings.get(settingId);
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
// clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
}

Expand All @@ -845,9 +831,7 @@ public void restart(String id) throws InterpreterException {
}

public InterpreterSetting get(String id) {
synchronized (interpreterSettings) {
return interpreterSettings.get(id);
}
return interpreterSettings.get(id);
}

@VisibleForTesting
Expand All @@ -866,23 +850,20 @@ public void remove(String id) throws IOException {
// 3. remove this interpreter setting from note binding
// 4. clean local repo directory
LOGGER.info("Remove interpreter setting: " + id);
synchronized (interpreterSettings) {
if (interpreterSettings.containsKey(id)) {

InterpreterSetting intp = interpreterSettings.get(id);
intp.close();
interpreterSettings.remove(id);
for (List<String> settings : interpreterBindings.values()) {
Iterator<String> it = settings.iterator();
while (it.hasNext()) {
String settingId = it.next();
if (settingId.equals(id)) {
it.remove();
}
if (interpreterSettings.containsKey(id)) {
InterpreterSetting intp = interpreterSettings.get(id);
intp.close();
interpreterSettings.remove(id);
for (List<String> settings : interpreterBindings.values()) {
Iterator<String> it = settings.iterator();
while (it.hasNext()) {
String settingId = it.next();
if (settingId.equals(id)) {
it.remove();
}
}
saveToFile();
}
saveToFile();
}

File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id);
Expand All @@ -893,36 +874,34 @@ public void remove(String id) throws IOException {
* Get interpreter settings
*/
public List<InterpreterSetting> get() {
synchronized (interpreterSettings) {
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
});
return orderedSettings;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
}
});
return orderedSettings;
}

@VisibleForTesting
Expand All @@ -940,17 +919,15 @@ public void close(String settingId) {

public void close() {
List<Thread> closeThreads = new LinkedList<>();
synchronized (interpreterSettings) {
Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
for (final InterpreterSetting intpSetting : intpSettings) {
Thread t = new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}
for (final InterpreterSetting intpSetting : interpreterSettings.values()) {
Thread t =
new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}

for (Thread t : closeThreads) {
Expand All @@ -961,5 +938,4 @@ public void run() {
}
}
}

}

0 comments on commit f094b89

Please sign in to comment.