Skip to content

Commit

Permalink
FALCON-2200 Update API support for extension job (user extension)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 28, 2016
1 parent 456d4ee commit cc7c9e9
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
result = client.registerExtension(extensionName, path, description).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and updates them.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
35 changes: 27 additions & 8 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1028,9 +1028,12 @@ public APIResult getExtensionDetail(final String extensionName) {
}

public APIResult getExtensionJobDetails(final String jobName) {
ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
return getResponse(APIResult.class, getExtensionJobDetailsResponse(jobName));
}

private ClientResponse getExtensionJobDetailsResponse(final String jobName) {
return new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
.call(ExtensionOperations.JOB_DETAILS);
return getResponse(APIResult.class, clientResponse);
}

private ClientResponse getExtensionDetailResponse(final String extensionName) {
Expand Down Expand Up @@ -1097,7 +1100,13 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,

private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
JSONObject extensionDetailJson;
if (StringUtils.isNotBlank(extensionName)) {
extensionDetailJson = getExtensionDetailJson(extensionName);
} else {
extensionDetailJson = getExtensionJobDetailJson(jobName);
}

String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
return getEntities(extensionName, jobName, configStream, extensionType,
Expand All @@ -1108,12 +1117,22 @@ private JSONObject getExtensionDetailJson(String extensionName) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
JSONObject extensionDetailJson;
try {
extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class));
extensionDetailJson = new JSONObject(clientResponse.getEntity(APIResult.class).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
}
return extensionDetailJson;
}
private JSONObject getExtensionJobDetailJson(String jobName) {
ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
JSONObject extensionJobDetailJson;
try {
extensionJobDetailJson = new JSONObject(clientResponse.getEntity(APIResult.class).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
}
return extensionJobDetailJson;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
String extensionType, String extensionBuildLocation) {
Expand Down Expand Up @@ -1143,12 +1162,12 @@ public APIResult submitAndScheduleExtensionJob(final String extensionName, final
return getResponse(APIResult.class, clientResponse);
}

public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
InputStream entityStream = getServletInputStream(filePath);
public APIResult updateExtensionJob(final String jobName, final String configPath, final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(null, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.UPDATE.path, extensionName)
.path(ExtensionOperations.UPDATE.path, jobName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.UPDATE, entityStream);
.call(ExtensionOperations.UPDATE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ private void createDB(String sqlFile, boolean run) throws Exception {
"create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";

private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";

PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
writer.println();
writer.println(CREATE_FALCON_DB_PROPS);
writer.println(insertDbVerion);
writer.println(insertDbVersion);
writer.close();
System.out.println("Create FALCON_DB_PROPS table");
if (run) {
Expand All @@ -287,7 +287,7 @@ private void createFalconPropsTable(String sqlFile, boolean run, String version)
conn.setAutoCommit(true);
st = conn.createStatement();
st.executeUpdate(CREATE_FALCON_DB_PROPS);
st.executeUpdate(insertDbVerion);
st.executeUpdate(insertDbVersion);
st.close();
} catch (Exception ex) {
closeStatement(st);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ public void deleteExtensionJob(String jobName) {
}
}

public void updateExtensionJob(String jobName, String extensionName, List<String> feedNames,
List<String> processNames, byte[] configBytes) {
EntityManager entityManager = getEntityManager();
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
extensionJobsBean.setJobName(jobName);
extensionJobsBean.setExtensionName(extensionName);
extensionJobsBean.setFeeds(feedNames);
extensionJobsBean.setProcesses(processNames);
extensionJobsBean.setConfig(configBytes);
try {
beginTransaction(entityManager);
entityManager.merge(extensionJobsBean);
} finally {
commitAndCloseTransaction(entityManager);
}
}

public ExtensionJobsBean getExtensionJobDetails(String jobName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Expand Down
20 changes: 17 additions & 3 deletions extensions/src/test/java/org/apache/falcon/ExtensionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,44 @@
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Schema;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.ExtensionBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* Extension Example for testing extension loading and preparing entities.
*/
public class ExtensionExample implements ExtensionBuilder{

public static final Logger LOG = LoggerFactory.getLogger(ExtensionExample.class);
public static final String PROCESS_XML = "/extension-example.xml";

@Override
public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException {
Entity process;
Process process;
try {
process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
getClass().getResourceAsStream(PROCESS_XML));
} catch (JAXBException e) {
throw new FalconException("Failed in un-marshalling the entity");
}
if (extensionConfigStream != null) {
Properties properties = new Properties();
try {
properties.load(extensionConfigStream);
} catch (IOException e) {
LOG.warn("Not able to load the configStream");
}
process.setPipelines(properties.getProperty("pipelines.name"));
}
List<Entity> entities = new ArrayList<>();
entities.add(process);
return entities;
Expand All @@ -52,7 +67,6 @@ public List<Entity> getEntities(String extensionName, InputStream extensionConfi
@Override
public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream)
throws FalconException {

}

@Override
Expand Down
1 change: 0 additions & 1 deletion extensions/src/test/resources/extension-example.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>
<sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
<!-- how -->
<properties>
<property name="name1" value="value1"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void deleteEntityInstance(String entityName){
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,27 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class);

private static final String JOB_NAME = "jobName";
public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
protected static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
private static final String EXTENSION_NAME = "extensionName";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

private static final String NAME = "name";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";
public static final String NAME = "name";
protected static final String EXTENSION_TYPE = "type";
protected static final String EXTENSION_DESC = "description";
protected static final String EXTENSION_LOCATION = "location";

public static void validateExtensionName(final String extensionName) {
protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
Response.Status.BAD_REQUEST);
}
}

public APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
protected APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().registerExtension(extensionName, path,
Expand All @@ -80,7 +80,7 @@ public APIResult getExtensionJobDetail(String jobName) {
}
}

public APIResult getExtensionDetail(String extensionName) {
protected APIResult getExtensionDetail(String extensionName) {
try {
return new APIResult(APIResult.Status.SUCCEEDED, buildExtensionDetailResult(extensionName).toString());
} catch (FalconException e) {
Expand Down Expand Up @@ -112,6 +112,7 @@ private JSONObject buildExtensionJobDetailResult(final String jobName) throws Fa
if (jobsBean == null) {
throw new ValidationException("Job name not found:" + jobName);
}
ExtensionBean extensionBean = metaStore.getDetail(jobsBean.getExtensionName());
JSONObject detailsObject = new JSONObject();
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
Expand All @@ -121,6 +122,8 @@ private JSONObject buildExtensionJobDetailResult(final String jobName) throws Fa
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
detailsObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
detailsObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
} catch (JSONException e) {
LOG.error("Exception while building extension jon details for job {}", jobName, e);
}
Expand Down

0 comments on commit cc7c9e9

Please sign in to comment.