From a70f5a9d38a6e5032c1a3d23f214fad394fbb328 Mon Sep 17 00:00:00 2001 From: sandeep Date: Tue, 17 Jan 2017 19:06:12 +0530 Subject: [PATCH] FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension --- .../falcon/persistence/ExtensionJobsBean.java | 3 +- .../persistence/PersistenceConstants.java | 7 ++-- .../extensions/jdbc/ExtensionMetaStore.java | 12 ++++++ .../extensions/store/ExtensionStore.java | 14 +++++++ .../resource/AbstractExtensionManager.java | 21 +++++++--- .../resource/proxy/ExtensionManagerProxy.java | 2 + .../apache/falcon/unit/FalconUnitClient.java | 6 ++- .../falcon/unit/LocalExtensionManager.java | 2 +- .../falcon/unit/FalconUnitTestBase.java | 18 ++++----- .../apache/falcon/unit/TestFalconUnit.java | 40 ++++++++++--------- 10 files changed, 85 insertions(+), 40 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java index 15a4dac26..b6ac79d5f 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -44,7 +44,8 @@ @NamedQueries({ @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "), @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "), - @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName") + @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"), + @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName") }) //RESUME CHECKSTYLE CHECK LineLengthCheck public class ExtensionJobsBean { diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index e80f7b763..1e6a04b2d 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -60,9 +60,9 @@ private PersistenceConstants(){ public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; - public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS"; - public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; - public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; + static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS"; + static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; + static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; @@ -80,5 +80,6 @@ private PersistenceConstants(){ public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS"; public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB"; public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB"; + public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION"; public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES"; } diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 9126b6730..8fcc35e6c 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -132,6 +132,18 @@ public ExtensionBean getDetail(String extensionName) { } } + public List getJobsForAnExtension(String extensionName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION); + query.setParameter(EXTENSION_NAME, extensionName); + try { + return (List)query.getResultList(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + public void deleteExtension(String extensionName){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 32b0cfd1b..436266f53 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -28,6 +28,7 @@ import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -379,6 +380,19 @@ public boolean isExtensionStoreInitialized() { return (storePath != null); } + public List getJobsForAnExtension(final String extensionName) throws FalconException { + List extensionJobs = metaStore.getJobsForAnExtension(extensionName); + if (null != extensionJobs && !extensionJobs.isEmpty()){ + List extensionJobNames = new ArrayList<>(); + for (ExtensionJobsBean extensionJobsBean: extensionJobs) { + extensionJobNames.add(extensionJobsBean.getJobName()); + } + return extensionJobNames; + } else { + return null; + } + } + public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws FalconException { validateStatusChange(extensionName, currentUser); diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index f7bf8e8d2..b14081362 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.resource; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; @@ -99,13 +100,21 @@ public APIResult getExtensions() { } } - public APIResult deleteExtensionMetadata(String extensionName){ + public APIResult deleteExtensionMetadata(String extensionName) throws FalconException { validateExtensionName(extensionName); - try { - return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName, - CurrentUser.getUser())); - } catch (Throwable e) { - throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + return new APIResult(APIResult.Status.SUCCEEDED, deleteExtension(extensionName)); + } + + private String deleteExtension(String extensionName) throws FalconException { + ExtensionStore metaStore = ExtensionStore.get(); + List extensionJobs = metaStore.getJobsForAnExtension(extensionName); + if (null == extensionJobs || extensionJobs.isEmpty()) { + return metaStore.deleteExtension(extensionName, CurrentUser.getUser()); + } else { + LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName, + ArrayUtils.toString(extensionJobs)); + throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances" + + " are dependent on the extension" + ArrayUtils.toString(extensionJobs)); } } diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 6c9087e01..5d3b39b1c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -701,6 +701,8 @@ public APIResult deleteExtensionMetadata( checkIfExtensionServiceIsEnabled(); try { return super.deleteExtensionMetadata(extensionName); + } catch (FalconException e){ + throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index d76dbcacd..1248d8b32 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -277,7 +277,11 @@ public APIResult registerExtension(String extensionName, String packagePath, Str @Override public APIResult unregisterExtension(String extensionName) { - return localExtensionManager.unRegisterExtension(extensionName); + try { + return localExtensionManager.unRegisterExtension(extensionName); + } catch (FalconException e) { + throw new FalconCLIException("Failed in unRegistering the exnteison"+ e.getMessage()); + } } @Override diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index a7303b171..ca39ddb07 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -179,7 +179,7 @@ APIResult registerExtensionMetadata(String extensionName, String packagePath, St return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); } - APIResult unRegisterExtension(String extensionName) { + APIResult unRegisterExtension(String extensionName) throws FalconException { return super.deleteExtensionMetadata(extensionName); } diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 0dd09c190..e9367d52f 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -218,29 +218,29 @@ private Map updateColoAndCluster(String colo, String cluster, Ma return props; } - public String registerExtension(String extensionName, String packagePath, String description) + APIResult registerExtension(String extensionName, String packagePath, String description) throws IOException, FalconException { - return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage(); + return falconUnitClient.registerExtension(extensionName, packagePath, description); } - public String disableExtension(String extensionName) { + String disableExtension(String extensionName) { return falconUnitClient.disableExtension(extensionName).getMessage(); } - public String enableExtension(String extensionName) { + String enableExtension(String extensionName) { return falconUnitClient.enableExtension(extensionName).getMessage(); } - public String getExtensionJobDetails(String jobName) { - return falconUnitClient.getExtensionJobDetails(jobName).getMessage(); + APIResult getExtensionJobDetails(String jobName) { + return falconUnitClient.getExtensionJobDetails(jobName); } - public String unregisterExtension(String extensionName) { - return falconUnitClient.unregisterExtension(extensionName).getMessage(); + APIResult unregisterExtension(String extensionName) { + return falconUnitClient.unregisterExtension(extensionName); } - public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { + APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser); } diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 29cfed411..8e57723b3 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -19,6 +19,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; @@ -245,7 +246,7 @@ private void submitClusterAndFeeds() throws IOException { assertStatus(result); } - public void setDummyProperty(Process process) { + private void setDummyProperty(Process process) { Property property = new Property(); property.setName("dummy"); property.setValue("dummy"); @@ -424,13 +425,12 @@ public void testRegisterAndUnregisterExtension() throws Exception { clearDB(); submitCluster(); createExtensionPackage(); - - String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString() + APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString() , "testExtension"); - Assert.assertEquals(result, "Extension :testExtension registered successfully."); - - result = unregisterExtension("testExtension"); - Assert.assertEquals(result, "Deleted extension:testExtension"); + assertStatus(apiResult); + apiResult = unregisterExtension("testExtension"); + assertStatus(apiResult); + Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension"); } @Test @@ -441,8 +441,8 @@ public void testExtensionJobOperations() throws Exception { createDir(PROCESS_APP_PATH); fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml")); String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); - String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); - Assert.assertEquals(result, "Extension :testExtension registered successfully."); + APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); + assertStatus(apiResult); disableExtension(TEST_EXTENSION); createDir(PROCESS_APP_PATH); @@ -457,10 +457,10 @@ public void testExtensionJobOperations() throws Exception { } enableExtension(TEST_EXTENSION); - APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); + apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); - result = getExtensionJobDetails(TEST_JOB); - JSONObject resultJson = new JSONObject(result); + apiResult = getExtensionJobDetails(TEST_JOB); + JSONObject resultJson = new JSONObject(apiResult.getMessage()); Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION); Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testPipeline"); @@ -482,7 +482,7 @@ public void testExtensionJobOperations() throws Exception { apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null); assertStatus(apiResult); - String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString(); + String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString(); Assert.assertEquals(processes, "sample"); process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testSample"); @@ -491,6 +491,12 @@ public void testExtensionJobOperations() throws Exception { assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); + try { + unregisterExtension(TEST_EXTENSION); + Assert.fail("Should have thrown a FalconCLIException"); + } catch (FalconCLIException e) { + //Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted. + } apiResult = deleteExtensionJob(TEST_JOB, null); assertStatus(apiResult); try { @@ -506,14 +512,10 @@ public void testExtensionJobOperations() throws Exception { Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob"); //Do nothing. Exception Expected. } + apiResult = unregisterExtension(TEST_EXTENSION); + assertStatus(apiResult); } - @Test - public void testExtensionJobSuspendAndResume() throws Exception { - - } - - private void copyExtensionJar(String destDirPath) throws IOException { File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath()); for (File file : dir.listFiles()) {