Skip to content

Commit

Permalink
Incorporated review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 20, 2017
1 parent 7609139 commit 410f6a7
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 65 deletions.
Expand Up @@ -45,7 +45,7 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Expand Up @@ -27,6 +27,7 @@

import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -74,10 +75,7 @@ public Boolean checkIfExtensionExists(String extensionName) {
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
return resultSize > 0;
}

public Boolean checkIfExtensionJobExists(String jobName) {
Expand All @@ -91,18 +89,15 @@ public Boolean checkIfExtensionJobExists(String jobName) {
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
return resultSize > 0;
}

public List<ExtensionBean> getAllExtensions() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
try {
return (List<ExtensionBean>)q.getResultList();
return (List<ExtensionBean>) q.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand All @@ -113,7 +108,7 @@ public void deleteExtensionsOfType(ExtensionType extensionType) {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE);
q.setParameter(EXTENSION_TYPE, extensionType);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand All @@ -128,7 +123,7 @@ public ExtensionBean getDetail(String extensionName) {
try {
List resultList = q.getResultList();
if (!resultList.isEmpty()) {
return (ExtensionBean)resultList.get(0);
return (ExtensionBean) resultList.get(0);
} else {
return null;
}
Expand All @@ -137,36 +132,26 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
public List<String> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
List<String> jobNames = new ArrayList<>();
try {
return (List<ExtensionJobsBean>)query.getResultList();
jobNames.addAll((List<String>) query.getResultList());
} finally {
commitAndCloseTransaction(entityManager);
}
return jobNames;
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteExtension(String extensionName){
public void deleteExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand All @@ -177,7 +162,7 @@ public void storeExtensionJob(String jobName, String extensionName, List<String>
byte[] config) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
boolean alreadySubmitted = false;
if (metaStore.getExtensionJobDetails(jobName) != null){
if (metaStore.getExtensionJobDetails(jobName) != null) {
alreadySubmitted = true;
}
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
Expand Down Expand Up @@ -207,7 +192,7 @@ public void deleteExtensionJob(String jobName) {
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION_JOB);
query.setParameter(JOB_NAME, jobName);
try{
try {
query.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand Down Expand Up @@ -249,7 +234,7 @@ public ExtensionJobsBean getExtensionJobDetails(String jobName) {
}
}

public List<ExtensionJobsBean> getAllExtensionJobs() {
List<ExtensionJobsBean> getAllExtensionJobs() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -383,17 +382,6 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<String> extensionJobNames = new ArrayList<>();
if (null != extensionJobs && !extensionJobs.isEmpty()) {
for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
}
return extensionJobNames;
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore {
private static ExtensionMetaStore stateStore;

@BeforeClass
public void setup() throws Exception{
public void setup() throws Exception {
initExtensionStore();
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
this.conf = dfsCluster.getConf();
Expand All @@ -58,7 +58,7 @@ public void init() {
}

@Test
public void testExtension(){
public void testExtension() {
//insert
stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description",
"falconUser");
Expand Down Expand Up @@ -86,6 +86,8 @@ public void testExtensionJob() {
//storing again to check for entity manager merge to let submission go forward.
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);

Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1);
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1");
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
stateStore.deleteExtensionJob("job1");
Expand Down
Expand Up @@ -113,7 +113,7 @@ public APIResult deleteExtensionMetadata(String extensionName) {
}

private void canDeleteExtension(String extensionName) throws FalconException {
ExtensionStore metaStore = ExtensionStore.get();
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (!extensionJobs.isEmpty()) {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
Expand Down Expand Up @@ -190,7 +190,7 @@ protected String enableExtension(String extensionName, String currentUser) {
private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();

if (!metaStore.checkIfExtensionExists(extensionName)){
if (!metaStore.checkIfExtensionExists(extensionName)) {
throw new ValidationException("No extension resources found for " + extensionName);
}

Expand Down
Expand Up @@ -64,7 +64,6 @@
import javax.xml.bind.JAXBException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Set;
Expand All @@ -85,7 +84,6 @@
public class ExtensionManagerProxy extends AbstractExtensionManager {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionManagerProxy.class);

private static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name=";
private static final String ASCENDING_SORT_ORDER = "asc";
private static final String DESCENDING_SORT_ORDER = "desc";
private Extension extension = new Extension();
Expand All @@ -96,6 +94,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();

private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";

//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@GET
@Path("list/{extension-name}")
Expand All @@ -107,7 +106,7 @@ public ExtensionJobList getExtensionJobs(
checkIfExtensionServiceIsEnabled();
checkIfExtensionExists(extensionName);
try {
List<String> jobNames = ExtensionStore.get().getJobsForAnExtension(extensionName);
List<String> jobNames = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
switch (sortOrder.toLowerCase()) {
case DESCENDING_SORT_ORDER:
Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
Expand All @@ -116,7 +115,7 @@ public ExtensionJobList getExtensionJobs(
Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
}
return new ExtensionJobList(jobNames.size(), jobNames);
} catch (FalconException e) {
} catch (Throwable e) {
LOG.error("Failed to get extension job list of " + extensionName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down Expand Up @@ -758,25 +757,11 @@ private List<Entity> generateEntities(String extensionName, InputStream configSt
return entities;
}

private Map<String, List<Entity>> groupEntitiesByJob(List<Entity> entities) {
Map<String, List<Entity>> groupedEntities = new HashMap<>();
for (Entity entity : entities) {
String jobName = getJobNameFromTag(entity.getTags());
if (!groupedEntities.containsKey(jobName)) {
groupedEntities.put(jobName, new ArrayList<Entity>());
}
groupedEntities.get(jobName).add(entity);
}
return groupedEntities;
}

private static void checkIfExtensionServiceIsEnabled() {
if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
LOG.error(ExtensionService.SERVICE_NAME + " is not enabled.");
throw FalconWebException.newAPIException(
ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND);
}
}


}

0 comments on commit 410f6a7

Please sign in to comment.