Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal committed Jan 10, 2017
2 parents a93d71a + 44da870 commit a932633
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ Options createExtensionOptions() {
Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, "Suspend an extension job");
Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resume an extension job");
Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Delete an extension job");
Option enable = new Option(FalconCLIConstants.ENABLE_OPT, false, "Enable an extension");
Option disable = new Option(FalconCLIConstants.DISABLE_OPT, false, "Disable an extension");
Option unregister = new Option(FalconCLIConstants.UREGISTER, false, "Un-register an extension. This will make"
+ " the extension unavailable for instantiation");
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Expand All @@ -225,6 +227,8 @@ Options createExtensionOptions() {
group.addOption(unregister);
group.addOption(detail);
group.addOption(register);
group.addOption(enable);
group.addOption(disable);
extensionOptions.addOptionGroup(group);

Option url = new Option(FalconCLIConstants.URL_OPTION, true, "Falcon URL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ private FalconCLIConstants(){
public static final String UPDATE_OPT = "update";
public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
public static final String DELETE_OPT = "delete";
public static final String ENABLE_OPT = "enable";
public static final String DISABLE_OPT = "disable";
public static final String SCHEDULE_OPT = "schedule";
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ protected static enum AdminOperations {
protected static enum ExtensionOperations {

ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.TEXT_XML),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_XML),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.TEXT_XML),
LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
INSTANCES("api/extension/instances", HttpMethod.GET, MediaType.APPLICATION_JSON),
SUBMIT("api/extension/submit", HttpMethod.POST, MediaType.TEXT_XML),
Expand All @@ -354,8 +354,8 @@ protected static enum ExtensionOperations {
RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML),
DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.TEXT_XML),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.TEXT_XML),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,23 @@ private static JSONArray buildEnumerateResult() throws FalconException {
}
return results;
}

protected static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}

protected static void checkIfExtensionJobNameExists(String jobName, String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
LOG.error("Extension job with name: " + extensionName + " already exists.");
throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.Extension;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.ExtensionProperties;
Expand Down Expand Up @@ -334,7 +333,7 @@ public APIResult submit(
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobExists(jobName, extensionName);
checkIfExtensionJobNameExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
Expand Down Expand Up @@ -381,12 +380,24 @@ private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName,
private ExtensionType getExtensionType(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
if (extensionDetails == null) {
// return failure if the extension job doesn't exist
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
return extensionDetails.getExtensionType();
}

private String getExtensionName(String jobName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobDetails = metaStore.getExtensionJobDetails(jobName);
if (extensionJobDetails == null) {
// return failure if the extension job doesn't exist
LOG.error("Extension job not found: " + jobName);
throw FalconWebException.newAPIException("Extension Job not found:" + jobName,
Response.Status.NOT_FOUND);
}
return extensionJobDetails.getExtensionName();
}

Expand All @@ -405,7 +416,7 @@ public APIResult submitAndSchedule(
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobExists(jobName, extensionName);
checkIfExtensionJobNameExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
SortedMap<EntityType, List<String>> entityNameMap;
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
Expand Down Expand Up @@ -592,6 +603,10 @@ public APIResult update(
String extensionName = getExtensionName(jobName);
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
if (entityMap.get(EntityType.FEED).isEmpty() && entityMap.get(EntityType.PROCESS).isEmpty()) {
// return failure if the extension job doesn't exist
return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
}
updateEntities(extensionName, jobName, entityMap, config, request);
} catch (FalconException | IOException | JAXBException e) {
LOG.error("Error while updating extension job: " + jobName, e);
Expand Down Expand Up @@ -625,7 +640,6 @@ public APIResult validate(
return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully");
}


// Extension store related REST API's
@GET
@Path("enumerate")
Expand All @@ -641,7 +655,7 @@ public APIResult getExtensions() {

@GET
@Path("describe/{extension-name}")
@Produces(MediaType.TEXT_PLAIN)
@Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
public APIResult getExtensionDescription(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
Expand All @@ -655,7 +669,7 @@ public APIResult getExtensionDescription(

@GET
@Path("detail/{extension-name}")
@Produces({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
public APIResult getDetail(@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
Expand All @@ -668,7 +682,7 @@ public APIResult getDetail(@PathParam("extension-name") String extensionName) {

@GET
@Path("extensionJobDetails/{job-name}")
@Produces({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
public APIResult getExtensionJobDetail(@PathParam("job-name") String jobName) {
checkIfExtensionServiceIsEnabled();
try {
Expand Down Expand Up @@ -710,7 +724,7 @@ public APIResult registerExtensionMetadata(

@GET
@Path("definition/{extension-name}")
@Produces({MediaType.APPLICATION_JSON})
@Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
Expand All @@ -725,7 +739,7 @@ public APIResult getExtensionDefinition(
@POST
@Path("disable/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces(MediaType.TEXT_PLAIN)
@Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
public APIResult disableExtension(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
Expand All @@ -740,7 +754,7 @@ public APIResult disableExtension(
@POST
@Path("enable/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces(MediaType.TEXT_PLAIN)
@Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
public APIResult enableExtension(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
Expand Down Expand Up @@ -785,22 +799,5 @@ private static void checkIfExtensionServiceIsEnabled() {
}
}

private static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}

private static void checkIfExtensionJobExists(String jobName, String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
LOG.error("Extension job with name: " + extensionName + " already exists.");
throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,30 @@
/**
* A proxy implementation of the extension operations in local mode.
*/
class LocalExtensionManager extends AbstractExtensionManager {
LocalExtensionManager() {}
public class LocalExtensionManager extends AbstractExtensionManager {
LocalExtensionManager() {
}

APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobNameExists(jobName, extensionName);
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
storeExtension(extensionName, jobName, configStream, entityMap);

return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}

APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
for(Entity entity : entry.getValue()){
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobNameExists(jobName, extensionName);
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
Expand Down Expand Up @@ -96,7 +99,8 @@ private void storeExtension(String extensionName, String jobName, InputStream co
}

APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser)
throws FalconException, IOException{
throws FalconException, IOException {
checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
Expand All @@ -122,8 +126,7 @@ APIResult deleteExtensionJob(String jobName) throws FalconException, IOException
}

APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
Expand Down Expand Up @@ -180,7 +183,7 @@ APIResult unRegisterExtension(String extensionName) {
return super.deleteExtensionMetadata(extensionName);
}

APIResult getExtensionJobDetails(String jobName){
APIResult getExtensionJobDetails(String jobName) {
return super.getExtensionJobDetail(jobName);
}

Expand All @@ -189,14 +192,14 @@ APIResult disableExtension(String extensionName) {
}

APIResult enableExtension(String extensionName) {
return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName, CurrentUser.getUser()));
}

APIResult getExtensionDetails(String extensionName){
APIResult getExtensionDetails(String extensionName) {
return super.getExtensionDetail(extensionName);
}

public APIResult getExtensions(){
public APIResult getExtensions() {
return super.getExtensions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public String registerExtension(String extensionName, String packagePath, String
return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
}

public String disableExtension(String extensionName) {
return falconUnitClient.disableExtension(extensionName).getMessage();
}

public String enableExtension(String extensionName) {
return falconUnitClient.enableExtension(extensionName).getMessage();
}

public String getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
}
Expand Down
11 changes: 11 additions & 0 deletions unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,19 @@ public void testExtensionJobOperations() throws Exception {
String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");

disableExtension(TEST_EXTENSION);
createDir(PROCESS_APP_PATH);
copyExtensionJar(packageBuildLib);

try {
submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
Assert.fail("Should have thrown a FalconWebException");
} catch (FalconWebException e) {
Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Extension: "
+ TEST_EXTENSION + " is in disabled state.");
}
enableExtension(TEST_EXTENSION);

APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
result = getExtensionJobDetails(TEST_JOB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,30 @@ public APIResult resume(@PathParam("job-name") String jobName,
@Produces({MediaType.APPLICATION_JSON})
public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
LOG.error("Definition is not supported on Server.Please run your operation on Prism ");
LOG.error("Definition is not supported on Server. Please run your operation on Prism ");
throw FalconWebException.newAPIException("Definition is not supported on Server. Please run your operation "
+ "on Prism.");
}

@GET
@Path("enable/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces(MediaType.TEXT_PLAIN)
public APIResult enableExtension(
@PathParam("extension-name") String extensionName) {
LOG.error("Enable extension is not supported on Server. Please run your operation on Prism ");
throw FalconWebException.newAPIException("Enable extension is not supported on Server. Please run your "
+ "operation on Prism.");
}

@GET
@Path("disable/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces(MediaType.TEXT_PLAIN)
public APIResult disableExtension(
@PathParam("extension-name") String extensionName) {
LOG.error("Disable extension is not supported on Server. Please run your operation on Prism ");
throw FalconWebException.newAPIException("Disable extension is not supported on Server. Please run your "
+ "operation on Prism.");
}
}

0 comments on commit a932633

Please sign in to comment.