From f094b89ef2c7b4edd2c1d790abe441c9626e6c63 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Mon, 23 Apr 2018 15:43:45 +0900 Subject: [PATCH] ZEPPELIN-3411 Long running logic inside synchronized block in InterpreterSettingManager ### What is this PR for? Removing redundant synchronized code to avoid blocking other logics. ### What type of PR is it? [Bug Fix] ### Todos * [x] - Change synchronized block to read/write lock ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3411 ### How should this be tested? * Current tests should be passed ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jongyoul Lee Closes #2935 from jongyoul/ZEPPELIN-3411 and squashes the following commits: 3b90155b9 [Jongyoul Lee] Removed unused `import` statements Simplified `for` loop 4691301af [Jongyoul Lee] Removed lock/synchronized codes because interpreterSettings already is implemented by currentHashMap 24be69298 [Jongyoul Lee] Removed all synchronized blocks and replace them to read/write lock --- .../InterpreterSettingManager.java | 232 ++++++++---------- 1 file changed, 104 insertions(+), 128 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index f3384c14f74..b61ec555c30 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -64,7 +64,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -123,8 +122,6 @@ public class InterpreterSettingManager { private RecoveryStorage recoveryStorage; private ConfigStorage configStorage; - - public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, AngularObjectRegistryListener angularObjectRegistryListener, RemoteInterpreterProcessListener @@ -213,7 +210,7 @@ private void loadFromFile() throws IOException { for (String oldId : oldSettingIdList) { if (infoSaving.interpreterSettings.containsKey(oldId)) { newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName()); - }; + } } newBindingMap.put(noteId, newSettingIdList); } @@ -298,13 +295,11 @@ private void loadFromFile() throws IOException { } public void saveToFile() throws IOException { - synchronized (interpreterSettings) { - InterpreterInfoSaving info = new InterpreterInfoSaving(); - info.interpreterBindings = interpreterBindings; - info.interpreterSettings = interpreterSettings; - info.interpreterRepositories = interpreterRepositories; - configStorage.save(info); - } + InterpreterInfoSaving info = new InterpreterInfoSaving(); + info.interpreterBindings = interpreterBindings; + info.interpreterSettings = Maps.newHashMap(interpreterSettings); + info.interpreterRepositories = interpreterRepositories; + configStorage.save(info); } private void init() throws IOException { @@ -439,7 +434,6 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) { public List getInterpreterSettings(String noteId) { List settings = new ArrayList<>(); - synchronized (interpreterSettings) { List interpreterSettingIds = interpreterBindings.get(noteId); if (interpreterSettingIds != null) { for (String settingId : interpreterSettingIds) { @@ -451,19 +445,19 @@ public List getInterpreterSettings(String noteId) { } } } - } return settings; } public InterpreterSetting getInterpreterSettingByName(String name) { - synchronized (interpreterSettings) { + try { for (InterpreterSetting setting : interpreterSettings.values()) { if (setting.getName().equals(name)) { return setting; } } + throw new RuntimeException("No such interpreter setting: " + name); + } finally { } - throw new RuntimeException("No such interpreter setting: " + name); } public ManagedInterpreterGroup getInterpreterGroupById(String groupId) { @@ -617,12 +611,11 @@ public void removeResourcesBelongsToNote(String noteId) { } /** - * Overwrite dependency jar under local-repo/{interpreterId} - * if jar file in original path is changed + * Overwrite dependency jar under local-repo/{interpreterId} if jar file in original path is + * changed */ private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - synchronized (interpreterSettings) { final Thread t = new Thread() { public void run() { try { @@ -653,7 +646,6 @@ public void run() { } }; t.start(); - } } /** @@ -713,26 +705,24 @@ public void setInterpreterBinding(String user, String noteId, List setti throws IOException { List unBindedSettingIdList = new LinkedList<>(); - synchronized (interpreterSettings) { - List oldSettingIdList = interpreterBindings.get(noteId); - if (oldSettingIdList != null) { - for (String oldSettingId : oldSettingIdList) { - if (!settingIdList.contains(oldSettingId)) { - unBindedSettingIdList.add(oldSettingId); - } + List oldSettingIdList = interpreterBindings.get(noteId); + if (oldSettingIdList != null) { + for (String oldSettingId : oldSettingIdList) { + if (!settingIdList.contains(oldSettingId)) { + unBindedSettingIdList.add(oldSettingId); } } - interpreterBindings.put(noteId, settingIdList); - saveToFile(); + } + interpreterBindings.put(noteId, settingIdList); + saveToFile(); - for (String settingId : unBindedSettingIdList) { - InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); - //TODO(zjffdu) Add test for this scenario - //only close Interpreters when it is note scoped - if (interpreterSetting.getOption().perNoteIsolated() || - interpreterSetting.getOption().perNoteScoped()) { - interpreterSetting.closeInterpreters(user, noteId); - } + for (String settingId : unBindedSettingIdList) { + InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); + //TODO(zjffdu) Add test for this scenario + //only close Interpreters when it is note scoped + if (interpreterSetting.getOption().perNoteIsolated() || + interpreterSetting.getOption().perNoteScoped()) { + interpreterSetting.closeInterpreters(user, noteId); } } } @@ -794,30 +784,28 @@ public void removeNoteInterpreterSettingBinding(String user, String noteId) thro interpreterBindings.remove(noteId); } - /** - * Change interpreter properties and restart - */ - public void setPropertyAndRestart(String id, InterpreterOption option, - Map properties, - List dependencies) + /** Change interpreter properties and restart */ + public void setPropertyAndRestart( + String id, + InterpreterOption option, + Map properties, + List dependencies) throws InterpreterException, IOException { - synchronized (interpreterSettings) { - InterpreterSetting intpSetting = interpreterSettings.get(id); - if (intpSetting != null) { - try { - intpSetting.close(); - intpSetting.setOption(option); - intpSetting.setProperties(properties); - intpSetting.setDependencies(dependencies); - intpSetting.postProcessing(); - saveToFile(); - } catch (Exception e) { - loadFromFile(); - throw new IOException(e); - } - } else { - throw new InterpreterException("Interpreter setting id " + id + " not found"); + InterpreterSetting intpSetting = interpreterSettings.get(id); + if (intpSetting != null) { + try { + intpSetting.close(); + intpSetting.setOption(option); + intpSetting.setProperties(properties); + intpSetting.setDependencies(dependencies); + intpSetting.postProcessing(); + saveToFile(); + } catch (Exception e) { + loadFromFile(); + throw new IOException(e); } + } else { + throw new InterpreterException("Interpreter setting id " + id + " not found"); } } @@ -825,18 +813,16 @@ public void setPropertyAndRestart(String id, InterpreterOption option, public void restart(String settingId, String noteId, String user) throws InterpreterException { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); - synchronized (interpreterSettings) { - intpSetting = interpreterSettings.get(settingId); - // Check if dependency in specified path is changed - // If it did, overwrite old dependency jar with new one - if (intpSetting != null) { - //clean up metaInfos - intpSetting.setInfos(null); - copyDependenciesFromLocalPath(intpSetting); - intpSetting.closeInterpreters(user, noteId); - } else { - throw new InterpreterException("Interpreter setting id " + settingId + " not found"); - } + intpSetting = interpreterSettings.get(settingId); + // Check if dependency in specified path is changed + // If it did, overwrite old dependency jar with new one + if (intpSetting != null) { + // clean up metaInfos + intpSetting.setInfos(null); + copyDependenciesFromLocalPath(intpSetting); + intpSetting.closeInterpreters(user, noteId); + } else { + throw new InterpreterException("Interpreter setting id " + settingId + " not found"); } } @@ -845,9 +831,7 @@ public void restart(String id) throws InterpreterException { } public InterpreterSetting get(String id) { - synchronized (interpreterSettings) { - return interpreterSettings.get(id); - } + return interpreterSettings.get(id); } @VisibleForTesting @@ -866,23 +850,20 @@ public void remove(String id) throws IOException { // 3. remove this interpreter setting from note binding // 4. clean local repo directory LOGGER.info("Remove interpreter setting: " + id); - synchronized (interpreterSettings) { - if (interpreterSettings.containsKey(id)) { - - InterpreterSetting intp = interpreterSettings.get(id); - intp.close(); - interpreterSettings.remove(id); - for (List settings : interpreterBindings.values()) { - Iterator it = settings.iterator(); - while (it.hasNext()) { - String settingId = it.next(); - if (settingId.equals(id)) { - it.remove(); - } + if (interpreterSettings.containsKey(id)) { + InterpreterSetting intp = interpreterSettings.get(id); + intp.close(); + interpreterSettings.remove(id); + for (List settings : interpreterBindings.values()) { + Iterator it = settings.iterator(); + while (it.hasNext()) { + String settingId = it.next(); + if (settingId.equals(id)) { + it.remove(); } } - saveToFile(); } + saveToFile(); } File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id); @@ -893,36 +874,34 @@ public void remove(String id) throws IOException { * Get interpreter settings */ public List get() { - synchronized (interpreterSettings) { - List orderedSettings = new ArrayList<>(interpreterSettings.values()); - Collections.sort(orderedSettings, new Comparator() { - @Override - public int compare(InterpreterSetting o1, InterpreterSetting o2) { - int i = interpreterGroupOrderList.indexOf(o1.getGroup()); - int j = interpreterGroupOrderList.indexOf(o2.getGroup()); - if (i < 0) { - LOGGER.warn("InterpreterGroup " + o1.getGroup() - + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); - // move the unknown interpreter to last - i = Integer.MAX_VALUE; - } - if (j < 0) { - LOGGER.warn("InterpreterGroup " + o2.getGroup() - + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); - // move the unknown interpreter to last - j = Integer.MAX_VALUE; - } - if (i < j) { - return -1; - } else if (i > j) { - return 1; - } else { - return 0; - } + List orderedSettings = new ArrayList<>(interpreterSettings.values()); + Collections.sort(orderedSettings, new Comparator() { + @Override + public int compare(InterpreterSetting o1, InterpreterSetting o2) { + int i = interpreterGroupOrderList.indexOf(o1.getGroup()); + int j = interpreterGroupOrderList.indexOf(o2.getGroup()); + if (i < 0) { + LOGGER.warn("InterpreterGroup " + o1.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + i = Integer.MAX_VALUE; } - }); - return orderedSettings; - } + if (j < 0) { + LOGGER.warn("InterpreterGroup " + o2.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + j = Integer.MAX_VALUE; + } + if (i < j) { + return -1; + } else if (i > j) { + return 1; + } else { + return 0; + } + } + }); + return orderedSettings; } @VisibleForTesting @@ -940,17 +919,15 @@ public void close(String settingId) { public void close() { List closeThreads = new LinkedList<>(); - synchronized (interpreterSettings) { - Collection intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { - Thread t = new Thread() { - public void run() { - intpSetting.close(); - } - }; - t.start(); - closeThreads.add(t); - } + for (final InterpreterSetting intpSetting : interpreterSettings.values()) { + Thread t = + new Thread() { + public void run() { + intpSetting.close(); + } + }; + t.start(); + closeThreads.add(t); } for (Thread t : closeThreads) { @@ -961,5 +938,4 @@ public void run() { } } } - }