Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 23, 2017
2 parents 7e16263 + 3c01168 commit c5da0a2
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ private PersistenceConstants(){
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";

public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";

public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
Expand All @@ -80,5 +80,6 @@ private PersistenceConstants(){
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public List<ExtensionBean> getAllExtensions() {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
try {
return q.getResultList();
return (List<ExtensionBean>)q.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand All @@ -126,7 +126,24 @@ public ExtensionBean getDetail(String extensionName) {
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
try {
return (ExtensionBean)q.getSingleResult();
List resultList = q.getResultList();
if (!resultList.isEmpty()) {
return (ExtensionBean)resultList.get(0);
} else {
return null;
}
} finally {
commitAndCloseTransaction(entityManager);
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static ExtensionMetaStore getMetaStore() {
private static final String RESOURCES_DIR = "resources";
private static final String LIBS_DIR = "libs";

public static final String EXTENSION_STORE_URI = "extension.store.uri";
static final String EXTENSION_STORE_URI = "extension.store.uri";

private static final ExtensionStore STORE = new ExtensionStore();

Expand Down Expand Up @@ -103,10 +104,9 @@ private void initializeDbTable() {
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extension)
? ExtensionType.TRUSTED : ExtensionType.CUSTOM;
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
String extensionOwner = System.getProperty("user.name");
metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner);
metaStore.storeExtensionBean(extension, location, extensionType, description, extensionOwner);
}
} catch (FalconException e) {
LOG.error("Exception in ExtensionMetaStore:", e);
Expand Down Expand Up @@ -143,17 +143,20 @@ private FileSystem initializeFileSystem() {
}
}

public Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
private Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
Map<String, String> extensionFileMap = new HashMap<>();
Path extensionPath;
try {
RemoteIterator<LocatedFileStatus> fileStatusListIterator;
if (AbstractExtension.isExtensionTrusted(extensionName)){
if (AbstractExtension.isExtensionTrusted(extensionName)) {
extensionPath = new Path(storePath, extensionName.toLowerCase());
fileStatusListIterator = fs.listFiles(extensionPath, true);
}else{
} else {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (null == extensionBean) {
throw new StoreAccessException("Extension not found:" + extensionName);
}
extensionPath = new Path(extensionBean.getLocation());
FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
Expand All @@ -175,7 +178,6 @@ public Map<String, String> getExtensionArtifacts(final String extensionName) thr
}



public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
try {
Expand Down Expand Up @@ -243,10 +245,10 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
InputStream data;

ByteArrayOutputStream writer = new ByteArrayOutputStream();
if (resourcePath.startsWith("file")){
if (resourcePath.startsWith("file")) {
data = fs.open(resourceFile);
IOUtils.copyBytes(data, writer, fs.getConf(), true);
}else{
} else {
FileSystem fileSystem = getHdfsFileSystem(resourcePath);
data = fileSystem.open(resourceFile);
IOUtils.copyBytes(data, writer, fileSystem.getConf(), true);
Expand All @@ -257,7 +259,7 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
}
}

public List<String> getTrustedExtensions() throws StoreAccessException {
private List<String> getTrustedExtensions() throws StoreAccessException {
List<String> extensionList = new ArrayList<>();
try {
FileStatus[] fileStatuses = fs.listStatus(storePath);
Expand Down Expand Up @@ -297,17 +299,18 @@ private void assertURI(String part, String value) throws ValidationException {
throw new ValidationException(msg);
}
}

private FileSystem getHdfsFileSystem(String path) throws FalconException {
Configuration conf = new Configuration();
URI uri;
try {
uri = new URI(path);
} catch (URISyntaxException e){
} catch (URISyntaxException e) {
LOG.error("Exception : ", e);
throw new FalconException(e);
}
conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority());
return HadoopClientFactory.get().createFalconFileSystem(uri);
return HadoopClientFactory.get().createFalconFileSystem(uri);
}


Expand Down Expand Up @@ -362,7 +365,8 @@ public boolean accept(Path file) {
LOG.info("Extension :" + extensionName + " registered successfully.");
return "Extension :" + extensionName + " registered successfully.";
}
public String getResource(final String extensionName, final String resourceName) throws FalconException {

public String getResource(final String extensionName, final String resourceName) throws FalconException {
Map<String, String> resources = getExtensionArtifacts(extensionName);
if (resources.isEmpty()) {
throw new StoreAccessException("No extension resources found for " + extensionName);
Expand All @@ -379,10 +383,26 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<String> extensionJobNames = new ArrayList<>();
if (null != extensionJobs && !extensionJobs.isEmpty()) {
for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
}
return extensionJobNames;
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
if (metaStore.getDetail(extensionName).getStatus().equals(status)) {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw new FalconException("Extension not found:" + extensionName);
}
if (extensionBean.getStatus().equals(status)) {
throw new ValidationException(extensionName + " is already in " + status.toString() + " state.");
} else {
metaStore.updateExtensionStatus(extensionName, status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.falcon.resource;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
Expand Down Expand Up @@ -99,13 +100,26 @@ public APIResult getExtensions() {
}
}

public APIResult deleteExtensionMetadata(String extensionName){
public APIResult deleteExtensionMetadata(String extensionName) {
validateExtensionName(extensionName);
ExtensionStore metaStore = ExtensionStore.get();
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName,
CurrentUser.getUser()));
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
canDeleteExtension(extensionName);
return new APIResult(APIResult.Status.SUCCEEDED,
metaStore.deleteExtension(extensionName, CurrentUser.getUser()));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e);
}
}

private void canDeleteExtension(String extensionName) throws FalconException {
ExtensionStore metaStore = ExtensionStore.get();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (!extensionJobs.isEmpty()) {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances"
+ " are dependent on the extension" + ArrayUtils.toString(extensionJobs));
}
}

Expand Down Expand Up @@ -180,13 +194,17 @@ private JSONObject buildExtensionDetailResult(final String extensionName) throws
throw new ValidationException("No extension resources found for " + extensionName);
}

ExtensionBean bean = metaStore.getDetail(extensionName);
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw new FalconException("Extension not found:" + extensionName);
}
JSONObject resultObject = new JSONObject();
try {
resultObject.put(NAME, bean.getExtensionName());
resultObject.put(EXTENSION_TYPE, bean.getExtensionType());
resultObject.put(EXTENSION_DESC, bean.getDescription());
resultObject.put(EXTENSION_LOCATION, bean.getLocation());
resultObject.put(NAME, extensionBean.getExtensionName());
resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
resultObject.put(EXTENSION_DESC, extensionBean.getDescription());
resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
} catch (JSONException e) {
LOG.error("Exception in buildDetailResults:", e);
throw new FalconException(e);
Expand Down Expand Up @@ -216,7 +234,13 @@ private static JSONArray buildEnumerateResult() throws FalconException {

protected static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
if (!extensionBean.getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ public APIResult getExtensionDescription(
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName, README));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down Expand Up @@ -731,6 +733,8 @@ public APIResult getExtensionDefinition(
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName,
extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void run() {
if (status.getInstances().length > 0
&& status.getInstances()[0].status == InstancesResult.
WorkflowStatus.SUCCEEDED) {
LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing "
LOG.debug("Instance of nominal time {} of entity {} has succeeded, removing "
+ "from backlog entries", nominalTimeStr, entity.getName());
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ public APIResult registerExtension(String extensionName, String packagePath, Str

@Override
public APIResult unregisterExtension(String extensionName) {
return localExtensionManager.unRegisterExtension(extensionName);
try {
return localExtensionManager.unRegisterExtension(extensionName);
} catch (FalconException e) {
throw new FalconCLIException("Failed in unRegistering the extension"+ e.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ APIResult registerExtensionMetadata(String extensionName, String packagePath, St
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}

APIResult unRegisterExtension(String extensionName) {
APIResult unRegisterExtension(String extensionName) throws FalconException {
return super.deleteExtensionMetadata(extensionName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,29 @@ private Map<String, String> updateColoAndCluster(String colo, String cluster, Ma
return props;
}

public String registerExtension(String extensionName, String packagePath, String description)
APIResult registerExtension(String extensionName, String packagePath, String description)
throws IOException, FalconException {

return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
return falconUnitClient.registerExtension(extensionName, packagePath, description);
}

public String disableExtension(String extensionName) {
String disableExtension(String extensionName) {
return falconUnitClient.disableExtension(extensionName).getMessage();
}

public String enableExtension(String extensionName) {
String enableExtension(String extensionName) {
return falconUnitClient.enableExtension(extensionName).getMessage();
}

public String getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
APIResult getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName);
}

public String unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName).getMessage();
APIResult unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName);
}

public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}

Expand Down
Loading

0 comments on commit c5da0a2

Please sign in to comment.