Skip to content

Commit

Permalink
FALCON-2259 Unregister an extension only if no extension jobs are dep…
Browse files Browse the repository at this point in the history
…endant on the extension
  • Loading branch information
sandeepSamudrala committed Jan 17, 2017
1 parent 7e16263 commit a70f5a9
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 40 deletions.
Expand Up @@ -44,7 +44,8 @@
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Expand Up @@ -60,9 +60,9 @@ private PersistenceConstants(){
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";

public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";

public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
Expand All @@ -80,5 +80,6 @@ private PersistenceConstants(){
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
Expand Up @@ -132,6 +132,18 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteExtension(String extensionName){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -379,6 +380,19 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (null != extensionJobs && !extensionJobs.isEmpty()){
List<String> extensionJobNames = new ArrayList<>();
for (ExtensionJobsBean extensionJobsBean: extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
return extensionJobNames;
} else {
return null;
}
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.falcon.resource;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
Expand Down Expand Up @@ -99,13 +100,21 @@ public APIResult getExtensions() {
}
}

public APIResult deleteExtensionMetadata(String extensionName){
public APIResult deleteExtensionMetadata(String extensionName) throws FalconException {
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName,
CurrentUser.getUser()));
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
return new APIResult(APIResult.Status.SUCCEEDED, deleteExtension(extensionName));
}

private String deleteExtension(String extensionName) throws FalconException {
ExtensionStore metaStore = ExtensionStore.get();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (null == extensionJobs || extensionJobs.isEmpty()) {
return metaStore.deleteExtension(extensionName, CurrentUser.getUser());
} else {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances"
+ " are dependent on the extension" + ArrayUtils.toString(extensionJobs));
}
}

Expand Down
Expand Up @@ -701,6 +701,8 @@ public APIResult deleteExtensionMetadata(
checkIfExtensionServiceIsEnabled();
try {
return super.deleteExtensionMetadata(extensionName);
} catch (FalconException e){
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down
Expand Up @@ -277,7 +277,11 @@ public APIResult registerExtension(String extensionName, String packagePath, Str

@Override
public APIResult unregisterExtension(String extensionName) {
return localExtensionManager.unRegisterExtension(extensionName);
try {
return localExtensionManager.unRegisterExtension(extensionName);
} catch (FalconException e) {
throw new FalconCLIException("Failed in unRegistering the exnteison"+ e.getMessage());
}
}

@Override
Expand Down
Expand Up @@ -179,7 +179,7 @@ APIResult registerExtensionMetadata(String extensionName, String packagePath, St
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}

APIResult unRegisterExtension(String extensionName) {
APIResult unRegisterExtension(String extensionName) throws FalconException {
return super.deleteExtensionMetadata(extensionName);
}

Expand Down
Expand Up @@ -218,29 +218,29 @@ private Map<String, String> updateColoAndCluster(String colo, String cluster, Ma
return props;
}

public String registerExtension(String extensionName, String packagePath, String description)
APIResult registerExtension(String extensionName, String packagePath, String description)
throws IOException, FalconException {

return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
return falconUnitClient.registerExtension(extensionName, packagePath, description);
}

public String disableExtension(String extensionName) {
String disableExtension(String extensionName) {
return falconUnitClient.disableExtension(extensionName).getMessage();
}

public String enableExtension(String extensionName) {
String enableExtension(String extensionName) {
return falconUnitClient.enableExtension(extensionName).getMessage();
}

public String getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
APIResult getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName);
}

public String unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName).getMessage();
APIResult unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName);
}

public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}

Expand Down
40 changes: 21 additions & 19 deletions unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
Expand Up @@ -19,6 +19,7 @@

import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
Expand Down Expand Up @@ -245,7 +246,7 @@ private void submitClusterAndFeeds() throws IOException {
assertStatus(result);
}

public void setDummyProperty(Process process) {
private void setDummyProperty(Process process) {
Property property = new Property();
property.setName("dummy");
property.setValue("dummy");
Expand Down Expand Up @@ -424,13 +425,12 @@ public void testRegisterAndUnregisterExtension() throws Exception {
clearDB();
submitCluster();
createExtensionPackage();

String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
, "testExtension");
Assert.assertEquals(result, "Extension :testExtension registered successfully.");

result = unregisterExtension("testExtension");
Assert.assertEquals(result, "Deleted extension:testExtension");
assertStatus(apiResult);
apiResult = unregisterExtension("testExtension");
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension");
}

@Test
Expand All @@ -441,8 +441,8 @@ public void testExtensionJobOperations() throws Exception {
createDir(PROCESS_APP_PATH);
fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");
APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
assertStatus(apiResult);

disableExtension(TEST_EXTENSION);
createDir(PROCESS_APP_PATH);
Expand All @@ -457,10 +457,10 @@ public void testExtensionJobOperations() throws Exception {
}
enableExtension(TEST_EXTENSION);

APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
result = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(result);
apiResult = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(apiResult.getMessage());
Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testPipeline");
Expand All @@ -482,7 +482,7 @@ public void testExtensionJobOperations() throws Exception {
apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null);
assertStatus(apiResult);

String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString();
String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString();
Assert.assertEquals(processes, "sample");
process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testSample");
Expand All @@ -491,6 +491,12 @@ public void testExtensionJobOperations() throws Exception {
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");

try {
unregisterExtension(TEST_EXTENSION);
Assert.fail("Should have thrown a FalconCLIException");
} catch (FalconCLIException e) {
//Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted.
}
apiResult = deleteExtensionJob(TEST_JOB, null);
assertStatus(apiResult);
try {
Expand All @@ -506,14 +512,10 @@ public void testExtensionJobOperations() throws Exception {
Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob");
//Do nothing. Exception Expected.
}
apiResult = unregisterExtension(TEST_EXTENSION);
assertStatus(apiResult);
}

@Test
public void testExtensionJobSuspendAndResume() throws Exception {

}


private void copyExtensionJar(String destDirPath) throws IOException {
File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
for (File file : dir.listFiles()) {
Expand Down

0 comments on commit a70f5a9

Please sign in to comment.