Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Jan 22, 2017
2 parents fda3b28 + 3c01168 commit e39808d
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
Expand Down Expand Up @@ -200,6 +201,8 @@ Options createExtensionOptions() {
Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, "Suspend an extension job");
Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resume an extension job");
Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Delete an extension job");
Option enable = new Option(FalconCLIConstants.ENABLE_OPT, false, "Enable an extension");
Option disable = new Option(FalconCLIConstants.DISABLE_OPT, false, "Disable an extension");
Option unregister = new Option(FalconCLIConstants.UREGISTER, false, "Un-register an extension. This will make"
+ " the extension unavailable for instantiation");
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Expand All @@ -225,6 +228,8 @@ Options createExtensionOptions() {
group.addOption(unregister);
group.addOption(detail);
group.addOption(register);
group.addOption(enable);
group.addOption(disable);
extensionOptions.addOptionGroup(group);

Option url = new Option(FalconCLIConstants.URL_OPTION, true, "Falcon URL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ private FalconCLIConstants(){
public static final String UPDATE_OPT = "update";
public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
public static final String DELETE_OPT = "delete";
public static final String ENABLE_OPT = "enable";
public static final String DISABLE_OPT = "disable";
public static final String SCHEDULE_OPT = "schedule";
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
Expand Down
27 changes: 19 additions & 8 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import com.sun.jersey.multipart.FormDataMultiPart;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.LifeCycle;
Expand Down Expand Up @@ -52,6 +53,8 @@
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
Expand Down Expand Up @@ -81,6 +84,7 @@
public class FalconClient extends AbstractFalconClient {

public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
public static final Logger LOG = LoggerFactory.getLogger(FalconClient.class);

public static final String WS_HEADER_PREFIX = "header:";
public static final String USER = System.getProperty("user.name");
Expand Down Expand Up @@ -187,6 +191,7 @@ public FalconClient(String falconUrl, Properties properties) {
client.resource(UriBuilder.fromUri(baseUrl).build());
authenticationToken = getToken(baseUrl);
} catch (Exception e) {
LOG.error("Unable to initialize Falcon Client object. Cause : ", e);
throw new FalconCLIException("Unable to initialize Falcon Client object. Cause : " + e.getMessage(), e);
}
}
Expand Down Expand Up @@ -341,8 +346,8 @@ protected static enum AdminOperations {
protected static enum ExtensionOperations {

ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.TEXT_XML),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_XML),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.TEXT_XML),
LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
INSTANCES("api/extension/instances", HttpMethod.GET, MediaType.APPLICATION_JSON),
SUBMIT("api/extension/submit", HttpMethod.POST, MediaType.TEXT_XML),
Expand All @@ -354,8 +359,8 @@ protected static enum ExtensionOperations {
RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML),
DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.TEXT_XML),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.TEXT_XML),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);
Expand Down Expand Up @@ -1107,7 +1112,8 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,
try {
formDataMultiPart.close();
} catch (IOException e) {
throw new FalconCLIException("Submit failed. Failed to submit entities", e);
LOG.error("Submit failed. Failed to submit entities. Cause: ", e);
throw new FalconCLIException("Submit failed. Failed to submit entities:" + e.getMessage(), e);
}
return formDataMultiPart;
}
Expand All @@ -1132,17 +1138,20 @@ private JSONObject getExtensionDetailJson(String extensionName) {
try {
extensionDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
LOG.error("Failed to get details for the given extension. Cause: ", e);
throw new FalconCLIException("Failed to get details for the given extension:" + e.getMessage(), e);
}
return extensionDetailJson;
}

private JSONObject getExtensionJobDetailJson(String jobName) {
ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
JSONObject extensionJobDetailJson;
try {
extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
LOG.error("Failed to get details for the given extension. Cause: ", e);
throw new FalconCLIException("Failed to get details for the given extension:" + e.getMessage(), e);
}
return extensionJobDetailJson;
}
Expand All @@ -1155,7 +1164,9 @@ private List<Entity> getEntities(String extensionName, String jobName, InputStre
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
throw new FalconCLIException("Error in building the extension", e);
LOG.error("Error in building the extension. Cause: ", e);
OUT.get().println("Error in building the extension:" + ExceptionUtils.getFullStackTrace(e));
throw new FalconCLIException("Error in building the extension:" + e.getMessage(), e);
}
if (entities == null || entities.isEmpty()) {
throw new FalconCLIException("No entities got built for the given extension");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ private PersistenceConstants(){
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";

public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";

public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
Expand All @@ -80,5 +80,6 @@ private PersistenceConstants(){
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public List<ExtensionBean> getAllExtensions() {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
try {
return q.getResultList();
return (List<ExtensionBean>)q.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand All @@ -126,7 +126,24 @@ public ExtensionBean getDetail(String extensionName) {
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
try {
return (ExtensionBean)q.getSingleResult();
List resultList = q.getResultList();
if (!resultList.isEmpty()) {
return (ExtensionBean)resultList.get(0);
} else {
return null;
}
} finally {
commitAndCloseTransaction(entityManager);
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static ExtensionMetaStore getMetaStore() {
private static final String RESOURCES_DIR = "resources";
private static final String LIBS_DIR = "libs";

public static final String EXTENSION_STORE_URI = "extension.store.uri";
static final String EXTENSION_STORE_URI = "extension.store.uri";

private static final ExtensionStore STORE = new ExtensionStore();

Expand Down Expand Up @@ -103,10 +104,9 @@ private void initializeDbTable() {
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extension)
? ExtensionType.TRUSTED : ExtensionType.CUSTOM;
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
String extensionOwner = System.getProperty("user.name");
metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner);
metaStore.storeExtensionBean(extension, location, extensionType, description, extensionOwner);
}
} catch (FalconException e) {
LOG.error("Exception in ExtensionMetaStore:", e);
Expand Down Expand Up @@ -143,17 +143,20 @@ private FileSystem initializeFileSystem() {
}
}

public Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
private Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
Map<String, String> extensionFileMap = new HashMap<>();
Path extensionPath;
try {
RemoteIterator<LocatedFileStatus> fileStatusListIterator;
if (AbstractExtension.isExtensionTrusted(extensionName)){
if (AbstractExtension.isExtensionTrusted(extensionName)) {
extensionPath = new Path(storePath, extensionName.toLowerCase());
fileStatusListIterator = fs.listFiles(extensionPath, true);
}else{
} else {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (null == extensionBean) {
throw new StoreAccessException("Extension not found:" + extensionName);
}
extensionPath = new Path(extensionBean.getLocation());
FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
Expand All @@ -175,7 +178,6 @@ public Map<String, String> getExtensionArtifacts(final String extensionName) thr
}



public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
try {
Expand Down Expand Up @@ -243,10 +245,10 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
InputStream data;

ByteArrayOutputStream writer = new ByteArrayOutputStream();
if (resourcePath.startsWith("file")){
if (resourcePath.startsWith("file")) {
data = fs.open(resourceFile);
IOUtils.copyBytes(data, writer, fs.getConf(), true);
}else{
} else {
FileSystem fileSystem = getHdfsFileSystem(resourcePath);
data = fileSystem.open(resourceFile);
IOUtils.copyBytes(data, writer, fileSystem.getConf(), true);
Expand All @@ -257,7 +259,7 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
}
}

public List<String> getTrustedExtensions() throws StoreAccessException {
private List<String> getTrustedExtensions() throws StoreAccessException {
List<String> extensionList = new ArrayList<>();
try {
FileStatus[] fileStatuses = fs.listStatus(storePath);
Expand Down Expand Up @@ -297,17 +299,18 @@ private void assertURI(String part, String value) throws ValidationException {
throw new ValidationException(msg);
}
}

private FileSystem getHdfsFileSystem(String path) throws FalconException {
Configuration conf = new Configuration();
URI uri;
try {
uri = new URI(path);
} catch (URISyntaxException e){
} catch (URISyntaxException e) {
LOG.error("Exception : ", e);
throw new FalconException(e);
}
conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority());
return HadoopClientFactory.get().createFalconFileSystem(uri);
return HadoopClientFactory.get().createFalconFileSystem(uri);
}


Expand Down Expand Up @@ -362,7 +365,8 @@ public boolean accept(Path file) {
LOG.info("Extension :" + extensionName + " registered successfully.");
return "Extension :" + extensionName + " registered successfully.";
}
public String getResource(final String extensionName, final String resourceName) throws FalconException {

public String getResource(final String extensionName, final String resourceName) throws FalconException {
Map<String, String> resources = getExtensionArtifacts(extensionName);
if (resources.isEmpty()) {
throw new StoreAccessException("No extension resources found for " + extensionName);
Expand All @@ -379,10 +383,26 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<String> extensionJobNames = new ArrayList<>();
if (null != extensionJobs && !extensionJobs.isEmpty()) {
for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
}
return extensionJobNames;
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
if (metaStore.getDetail(extensionName).getStatus().equals(status)) {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw new FalconException("Extension not found:" + extensionName);
}
if (extensionBean.getStatus().equals(status)) {
throw new ValidationException(extensionName + " is already in " + status.toString() + " state.");
} else {
metaStore.updateExtensionStatus(extensionName, status);
Expand Down
Loading

0 comments on commit e39808d

Please sign in to comment.