Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Jan 30, 2017
2 parents 9cd8c17 + 21f0b69 commit 9c2f0a5
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
ExtensionJobList jobs = client.getExtensionJobs(extensionName, doAsUser,
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT));
result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found.";
} else if (optionsList.contains(INSTANCES_OPT)) {
Expand Down
16 changes: 9 additions & 7 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.ExtensionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jettison.json.JSONException;
Expand All @@ -38,6 +38,8 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
Expand Down Expand Up @@ -95,11 +97,10 @@ private List<Entity> getEntities(ClassLoader extensionClassloader, String extens
}

public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream,
String extensionBuildLocation) throws IOException, FalconException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String extensionBuildLocation)
throws IOException, FalconException, URISyntaxException {
String stagePath = createStagePath(extensionName, jobName);
List<URL> urls = ExtensionHandler.copyExtensionPackage(extensionBuildLocation, fs, stagePath);
List<URL> urls = ExtensionHandler.copyExtensionPackage(extensionBuildLocation, stagePath);

List<Entity> entities = prepare(extensionName, jobName, configStream, urls);
ExtensionHandler.stageEntities(entities, stagePath);
Expand Down Expand Up @@ -152,9 +153,10 @@ private static String createStagePath(String extensionName, String jobName) {
return stagePath;
}

private static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
throws IOException {
private static List<URL> copyExtensionPackage(String extensionBuildUrl, String stagePath)
throws IOException, FalconException, URISyntaxException {

FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(new URI(extensionBuildUrl));
Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS);
Path buildLibsPath = new Path(libsPath, FalconExtensionConstants.BUILD);
Path localStagePath = new Path(stagePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
Expand Down Expand Up @@ -294,6 +295,12 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
*/
public abstract APIResult enumerateExtensions();

/**
* Returns all registered jobs for an extension.
* @return
*/
public abstract ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser);

/**
*
* Get list of the entities.
Expand Down
20 changes: 10 additions & 10 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,16 @@ public APIResult enumerateExtensions() {
return getResponse(APIResult.class, clientResponse);
}

@Override
public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.LIST.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(SORT_ORDER, sortOrder)
.call(ExtensionOperations.LIST);
return getResponse(ExtensionJobList.class, clientResponse);
}

public APIResult unregisterExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.UNREGISTER.path, extensionName)
Expand Down Expand Up @@ -1246,16 +1256,6 @@ public APIResult deleteExtensionJob(final String jobName, final String doAsUser)
return getResponse(APIResult.class, clientResponse);
}

public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser,
final String sortOrder) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.LIST.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(SORT_ORDER, sortOrder)
.call(ExtensionOperations.LIST);
return getResponse(ExtensionJobList.class, clientResponse);
}

public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields,
final String start, final String end, final String status,
final String orderBy, final String sortOrder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

/**
* Extension job list used for marshalling / unmarshalling with REST calls.
Expand All @@ -36,19 +36,23 @@ public class ExtensionJobList {
int numJobs;

@XmlElementWrapper(name = "jobs")
public List<String> job;
public Map<String, String> job;

public ExtensionJobList() {
numJobs = 0;
job = null;
}

public int getNumJobs() {
return numJobs;
}

public ExtensionJobList(int numJobs) {
this.numJobs = numJobs;
job = new ArrayList<>();
job = new HashMap<>();
}

public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
public ExtensionJobList(int numJobs, Map<String, String> extensionJobNames) {
this.numJobs = numJobs;
this.job = extensionJobNames;
}
Expand All @@ -57,8 +61,8 @@ public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(numJobs).append("\n");
for (String extensionJobNames : job) {
builder.append(extensionJobNames);
for (Map.Entry<String, String> extensionJobs : job.entrySet()) {
builder.append(extensionJobs.getKey() + "\t" + extensionJobs.getValue() + "\n");
}
return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<String> getJobsForAnExtension(String extensionName) {
public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
List<String> jobNames = new ArrayList<>();
try {
jobNames.addAll((List<String>) query.getResultList());
extensionJobs.addAll(query.getResultList());
return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
return jobNames;
}

public void deleteExtension(String extensionName) {
Expand Down Expand Up @@ -234,12 +234,14 @@ public ExtensionJobsBean getExtensionJobDetails(String jobName) {
}
}

List<ExtensionJobsBean> getAllExtensionJobs() {
public List<ExtensionJobsBean> getAllExtensionJobs() {
List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
try {
return q.getResultList();
extensionJobs.addAll(q.getResultList());
return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand Down Expand Up @@ -300,27 +299,23 @@ private void assertURI(String part, String value) throws ValidationException {
}

private FileSystem getHdfsFileSystem(String path) throws FalconException {
Configuration conf = new Configuration();
URI uri;
try {
uri = new URI(path);
} catch (URISyntaxException e) {
LOG.error("Exception : ", e);
throw new FalconException(e);
}
conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority());
return HadoopClientFactory.get().createFalconFileSystem(uri);
}


public String registerExtension(final String extensionName, final String path, final String description,
String extensionOwner) throws URISyntaxException, FalconException {
Configuration conf = new Configuration();
URI uri = new URI(path);
assertURI("Scheme", uri.getScheme());
assertURI("Authority", uri.getAuthority());
assertURI("Path", uri.getPath());
conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority());
FileSystem fileSystem = getHdfsFileSystem(path);
try {
fileSystem.listStatus(new Path(uri.getPath() + "/README"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testExtensionJob() {
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);

Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1);
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1");
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0).getJobName(), "job1");
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
stateStore.deleteExtensionJob("job1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.EntityType;
Expand All @@ -35,7 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

Expand All @@ -53,11 +59,16 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";
protected static final String ASCENDING_SORT_ORDER = "asc";
protected static final String DESCENDING_SORT_ORDER = "desc";

public static final String NAME = "name";
public static final String STATUS = "status";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";
private static final String ENTITY_EXISTS_STATUS = "EXISTS";
private static final String ENTITY_NOT_EXISTS_STATUS = "NOT_EXISTS";

protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
Expand Down Expand Up @@ -100,6 +111,39 @@ public APIResult getExtensions() {
}
}

public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {

Comparator<ExtensionJobsBean> compareByJobName = new Comparator<ExtensionJobsBean>() {
@Override
public int compare(ExtensionJobsBean o1, ExtensionJobsBean o2) {
return o1.getJobName().compareToIgnoreCase(o2.getJobName());
}
};

Map<String, String> jobAndExtensionNames = new HashMap<>();
List<ExtensionJobsBean> extensionJobs = null;
if (extensionName != null) {
extensionJobs = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
} else {
extensionJobs = ExtensionStore.getMetaStore().getAllExtensionJobs();
}

sortOrder = (sortOrder == null) ? ASCENDING_SORT_ORDER : sortOrder;
switch (sortOrder.toLowerCase()) {
case DESCENDING_SORT_ORDER:
Collections.sort(extensionJobs, Collections.reverseOrder(compareByJobName));
break;

default:
Collections.sort(extensionJobs, compareByJobName);
}

for (ExtensionJobsBean job : extensionJobs) {
jobAndExtensionNames.put(job.getJobName(), job.getExtensionName());
}
return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames);
}

public APIResult deleteExtensionMetadata(String extensionName) {
validateExtensionName(extensionName);
ExtensionStore metaStore = ExtensionStore.get();
Expand All @@ -114,7 +158,7 @@ public APIResult deleteExtensionMetadata(String extensionName) {

private void canDeleteExtension(String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (!extensionJobs.isEmpty()) {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
Expand All @@ -125,7 +169,7 @@ private void canDeleteExtension(String extensionName) throws FalconException {

protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException {
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>(Collections.<EntityType>reverseOrder());
entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
Expand All @@ -142,8 +186,8 @@ private JSONObject buildExtensionJobDetailResult(final String jobName) throws Fa
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName());
detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), ","));
detailsObject.put(PROCESSES, StringUtils.join(jobsBean.getProcesses(), ","));
detailsObject.put(FEEDS, getEntitiesStatus(jobsBean.getFeeds(), EntityType.FEED));
detailsObject.put(PROCESSES, getEntitiesStatus(jobsBean.getProcesses(), EntityType.PROCESS));
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
Expand Down Expand Up @@ -266,4 +310,18 @@ protected static void checkIfExtensionJobNameExists(String jobName, String exten
Response.Status.INTERNAL_SERVER_ERROR);
}
}

private JSONObject getEntitiesStatus(List<String> entities, EntityType type) throws JSONException, FalconException {
JSONObject entityObject = new JSONObject();
for (String entity : entities) {
try {
entityObject.put(NAME, entity);
EntityUtil.getEntity(type, entity);
entityObject.put(STATUS, ENTITY_EXISTS_STATUS);
} catch (EntityNotRegisteredException e) {
entityObject.put(STATUS, ENTITY_NOT_EXISTS_STATUS);
}
}
return entityObject;
}
}
Loading

0 comments on commit 9c2f0a5

Please sign in to comment.