Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Jan 4, 2017
2 parents 7f572a1 + 7c5822c commit b20f044
Show file tree
Hide file tree
Showing 18 changed files with 486 additions and 88 deletions.
Expand Up @@ -369,7 +369,7 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws I
OUT.get().println(result);
}

private void validateColo(Set<String> optionsList) {
static void validateColo(Set<String> optionsList) {
if (optionsList.contains(FalconCLIConstants.COLO_OPT)) {
throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT);
}
Expand Down
54 changes: 38 additions & 16 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Expand Up @@ -33,36 +33,44 @@
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;

import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.falcon.cli.FalconEntityCLI.validateColo;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION;

/**
* Falcon extensions Command Line Interface - wraps the RESTful API for extensions.
*/
public class FalconExtensionCLI {
public class FalconExtensionCLI extends FalconCLI{
public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);

// Extension commands
public static final String ENUMERATE_OPT = "enumerate";
public static final String DEFINITION_OPT = "definition";
public static final String DESCRIBE_OPT = "describe";
public static final String INSTANCES_OPT = "instances";
public static final String UNREGISTER_OPT = "unregister";
public static final String DETAIL_OPT = "detail";
public static final String REGISTER_OPT = "register";
private static final String ENUMERATE_OPT = "enumerate";
private static final String DEFINITION_OPT = "definition";
private static final String DESCRIBE_OPT = "describe";
private static final String INSTANCES_OPT = "instances";
private static final String UNREGISTER_OPT = "unregister";
private static final String DETAIL_OPT = "detail";
private static final String REGISTER_OPT = "register";
private static final String ENABLE_OPT = "enable";
private static final String DISABLE_OPT = "disable";

// Input parameters
public static final String EXTENSION_NAME_OPT = "extensionName";
public static final String JOB_NAME_OPT = "jobName";
private static final String EXTENSION_NAME_OPT = "extensionName";
private static final String JOB_NAME_OPT = "jobName";
public static final String DESCRIPTION = "description";
public static final String PATH = "path";
private static final String PATH = "path";

public FalconExtensionCLI() {
FalconExtensionCLI() throws Exception {
super();
}

public void extensionCommand(CommandLine commandLine, FalconClient client) {
void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException {
Set<String> optionsList = new HashSet<>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
Expand All @@ -75,6 +83,8 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
String path = commandLine.getOptionValue(FalconCLIConstants.PATH);
String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION);
String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
colo = getColo(colo);

if (optionsList.contains(ENUMERATE_OPT)) {
result = client.enumerateExtensions().getMessage();
Expand Down Expand Up @@ -103,6 +113,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
Expand All @@ -112,6 +123,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
Expand All @@ -123,7 +135,8 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
Expand Down Expand Up @@ -153,16 +166,22 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
} else if (optionsList.contains(ENABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.enableExtension(extensionName).getMessage();
} else if (optionsList.contains(DISABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.disableExtension(extensionName).getMessage();
} else {
throw new FalconCLIException("Invalid/missing extension command. Supported commands include "
+ "enumerate, definition, describe, list, instances, submit, submitAndSchedule, "
+ "schedule, suspend, resume, delete, update, validate. "
+ "schedule, suspend, resume, delete, update, validate, enable, disable. "
+ "Please refer to Falcon CLI twiki for more details.");
}
OUT.get().println(result);
}

public Options createExtensionOptions() {
Options createExtensionOptions() {
Options extensionOptions = new Options();

Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
Expand All @@ -184,6 +203,8 @@ public Options createExtensionOptions() {
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will "
+ "make the extension available for instantiation for all users.");
Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION);
colo.setRequired(false);

OptionGroup group = new OptionGroup();
group.addOption(enumerate);
Expand Down Expand Up @@ -241,6 +262,7 @@ public Options createExtensionOptions() {
extensionOptions.addOption(filePath);
extensionOptions.addOption(path);
extensionOptions.addOption(description);
extensionOptions.addOption(colo);

return extensionOptions;
}
Expand Down
Expand Up @@ -196,6 +196,20 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
*/
public abstract APIResult unregisterExtension(String extensionName);

/**
*
* @param extensionName extensionName that needs to be enabled
* @return Result of the enableExtension operation
*/
public abstract APIResult enableExtension(String extensionName);

/**
*
* @param extensionName extensionName that needs to be disabled
* @return Result of the disableExtension operation
*/
public abstract APIResult disableExtension(String extensionName);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
Expand All @@ -207,6 +221,13 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Schedules the set of entities that are part of the extension.
* @param jobName extensionJob that needs to be scheduled.
* @return APIResult stating status of scheduling the extension.
*/
public abstract APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
Expand Down
19 changes: 17 additions & 2 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Expand Up @@ -356,7 +356,9 @@ protected static enum ExtensionOperations {
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML);
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);

private String path;
private String method;
Expand Down Expand Up @@ -1049,6 +1051,18 @@ public APIResult registerExtension(final String extensionName, final String pack
return getResponse(APIResult.class, clientResponse);
}

public APIResult enableExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.ENABLE.path, extensionName).call(ExtensionOperations.ENABLE);
return getResponse(APIResult.class, clientResponse);
}

public APIResult disableExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DISABLE.path, extensionName).call(ExtensionOperations.DISABLE);
return getResponse(APIResult.class, clientResponse);
}

public APIResult getExtensionDefinition(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DEFINITION.path, extensionName)
Expand Down Expand Up @@ -1186,9 +1200,10 @@ public APIResult validateExtensionJob(final String extensionName, final String j
}
}

public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) {
public APIResult scheduleExtensionJob(String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SCHEDULE.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SCHEDULE);
return getResponse(APIResult.class, clientResponse);
Expand Down
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.extensions;

/**
* Enum to store ExtensionStatus.
*/
public enum ExtensionStatus {
ENABLED("enabled state"),
DISABLED("disabled state");

private final String text;

ExtensionStatus(final String text) {
this.text = text;
}

@Override
public String toString(){
return text;
}
}
Expand Up @@ -18,6 +18,7 @@

package org.apache.falcon.persistence;

import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;

import javax.persistence.Basic;
Expand All @@ -44,7 +45,8 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName"),
@NamedQuery(name = PersistenceConstants.CHANGE_EXTENSION_STATUS, query = "update ExtensionBean a set a.status = :extensionStatus where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionBean {
Expand Down Expand Up @@ -79,6 +81,12 @@ public class ExtensionBean {
@Column(name = "extension_owner")
private String extensionOwner;

@Basic
@NotNull
@Column(name = "status")
@Enumerated(EnumType.STRING)
private ExtensionStatus status;

public ExtensionType getExtensionType() {
return extensionType;
}
Expand Down Expand Up @@ -127,4 +135,12 @@ public void setExtensionOwner(String extensionOwner) {
this.extensionOwner = extensionOwner;
}

public ExtensionStatus getStatus() {
return status;
}

public void setStatus(ExtensionStatus status) {
this.status = status;
}

}
Expand Up @@ -75,6 +75,7 @@ private PersistenceConstants(){
public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE";
public static final String DELETE_EXTENSION = "DELETE_EXTENSION";
public static final String GET_EXTENSION = "GET_EXTENSION";
public static final String CHANGE_EXTENSION_STATUS = "CHANGE_EXTENSION_STATUS";

public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
Expand Down
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.falcon.extensions.jdbc;

import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.persistence.PersistenceConstants;
Expand All @@ -36,6 +38,7 @@ public class ExtensionMetaStore {
private static final String EXTENSION_NAME = "extensionName";
private static final String JOB_NAME = "jobName";
private static final String EXTENSION_TYPE = "extensionType";
private static final String EXTENSION_STATUS = "extensionStatus";

private EntityManager getEntityManager() {
return FalconJPAService.get().getEntityManager();
Expand All @@ -50,6 +53,7 @@ public void storeExtensionBean(String extensionName, String location, ExtensionT
extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
extensionBean.setDescription(description);
extensionBean.setExtensionOwner(extensionOwner);
extensionBean.setStatus(ExtensionStatus.ENABLED);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
Expand Down Expand Up @@ -142,6 +146,11 @@ public void deleteExtension(String extensionName){

public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes,
byte[] config) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
boolean alreadySubmitted = false;
if (metaStore.getExtensionJobDetails(jobName) != null){
alreadySubmitted = true;
}
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
Date currentTime = new Date(System.currentTimeMillis());
extensionJobsBean.setJobName(jobName);
Expand All @@ -154,7 +163,11 @@ public void storeExtensionJob(String jobName, String extensionName, List<String>
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
entityManager.persist(extensionJobsBean);
if (alreadySubmitted) {
entityManager.merge(extensionJobsBean);
} else {
entityManager.persist(extensionJobsBean);
}
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down Expand Up @@ -229,4 +242,16 @@ private void commitAndCloseTransaction(EntityManager entityManager) {
entityManager.close();
}
}

public void updateExtensionStatus(String extensionName, ExtensionStatus status) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.CHANGE_EXTENSION_STATUS);
q.setParameter(EXTENSION_NAME, extensionName).setParameter(EXTENSION_STATUS, status);
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}
}

0 comments on commit b20f044

Please sign in to comment.