Skip to content

Commit

Permalink
FALCON-2200 Incorporated review comments. Moved common code from prox…
Browse files Browse the repository at this point in the history
…ies to proxyutil and making 2 api calls to get location in case of update extension
  • Loading branch information
sandeepSamudrala committed Dec 30, 2016
1 parent c8d0ab7 commit 8a4d035
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 210 deletions.
11 changes: 11 additions & 0 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Expand Up @@ -54,6 +54,7 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "name";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

Expand Down Expand Up @@ -220,4 +221,14 @@ public static String getExtensionType(String extensionName, JSONObject extensio
}
return extensionType;
}

public static String getExtensionName(String jobName, JSONObject extensionJobDetailJson) {
String extensionType;
try {
extensionType = extensionJobDetailJson.get(NAME).toString();
} catch (JSONException e) {
throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e);
}
return extensionType;
}
}
18 changes: 8 additions & 10 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Expand Up @@ -340,7 +340,7 @@ protected static enum AdminOperations {
*/
protected static enum ExtensionOperations {

ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.APPLICATION_JSON),
ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.TEXT_XML),
DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON),
LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
Expand All @@ -353,10 +353,10 @@ protected static enum ExtensionOperations {
SUSPEND("api/extension/suspend", HttpMethod.POST, MediaType.TEXT_XML),
RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML),
DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_PLAIN),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_PLAIN);
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML);

private String path;
private String method;
Expand Down Expand Up @@ -1101,12 +1101,10 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,
private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
JSONObject extensionDetailJson;
if (StringUtils.isNotBlank(extensionName)) {
extensionDetailJson = getExtensionDetailJson(extensionName);
} else {
extensionDetailJson = getExtensionJobDetailJson(jobName);
if (StringUtils.isBlank(extensionName)) {
extensionName = ExtensionHandler.getExtensionName(jobName, getExtensionJobDetailJson(jobName));
}

extensionDetailJson = getExtensionDetailJson(extensionName);
String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
return getEntities(extensionName, jobName, configStream, extensionType,
Expand All @@ -1117,7 +1115,7 @@ private JSONObject getExtensionDetailJson(String extensionName) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
JSONObject extensionDetailJson;
try {
extensionDetailJson = new JSONObject(clientResponse.getEntity(APIResult.class).getMessage());
extensionDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
}
Expand All @@ -1127,7 +1125,7 @@ private JSONObject getExtensionJobDetailJson(String jobName) {
ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
JSONObject extensionJobDetailJson;
try {
extensionJobDetailJson = new JSONObject(clientResponse.getEntity(APIResult.class).getMessage());
extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
}
Expand Down
Expand Up @@ -116,7 +116,7 @@ protected static void checkColo(String colo) {
}
}

protected Set<String> getAllColos() {
public static Set<String> getAllColos() {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
Expand All @@ -141,7 +141,7 @@ protected Set<String> getColosFromExpression(String coloExpr, String type, Strin
return colos;
}

protected Set<String> getApplicableColos(String type, String name) {
public static Set<String> getApplicableColos(String type, String name) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
Expand All @@ -157,7 +157,7 @@ protected Set<String> getApplicableColos(String type, String name) {
}
}

protected Set<String> getApplicableColos(String type, Entity entity) {
public static Set<String> getApplicableColos(String type, Entity entity) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
Expand Down
@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.resource.proxy;

import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.util.DeploymentUtil;

import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.falcon.resource.AbstractEntityManager.getAllColos;
import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos;
import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG;

class EntityProxyUtil {
private final Map<String, Channel> entityManagerChannels = new HashMap<>();
private final Map<String, Channel> configSyncChannels = new HashMap<>();

EntityProxyUtil() {
try {
Set<String> colos = getAllColos();

for (String colo : colos) {
initializeFor(colo);
}

DeploymentUtil.setPrismMode();
} catch (FalconException e) {
throw new FalconRuntimException("Unable to initialize channels", e);
}
}
private void initializeFor(String colo) throws FalconException {
entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
}

Channel getConfigSyncChannel(String colo) throws FalconException {
if (!configSyncChannels.containsKey(colo)) {
initializeFor(colo);
}
return configSyncChannels.get(colo);
}

Channel getEntityManager(String colo) throws FalconException {
if (!entityManagerChannels.containsKey(colo)) {
initializeFor(colo);
}
return entityManagerChannels.get(colo);
}

Map<String, APIResult> proxySubmit(final String type, final HttpServletRequest bufferedRequest,
final Entity entity, final Set<String> colos) {
Map<String, APIResult> results = new HashMap<>();
results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
@Override
protected Set<String> getColosToApply() {
return colos;
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
}
}.execute());
return results;
}

Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
final Set<String> newColos = getApplicableColos(type, newEntity);
final Set<String> mergedColos = new HashSet<>();
mergedColos.addAll(oldColos);
mergedColos.retainAll(newColos); //Common colos where update should be called
newColos.removeAll(oldColos); //New colos where submit should be called
oldColos.removeAll(mergedColos); //Old colos where delete should be called

Map<String, APIResult> results = new HashMap<>();
if (!oldColos.isEmpty()) {
results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return oldColos;
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("delete", bufferedRequest,
type, entityName, colo);
}
}.execute());
}

if (!mergedColos.isEmpty()) {
results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return mergedColos;
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("update", bufferedRequest,
type, entityName,
colo, skipDryRun);
}
}.execute());
}

if (!newColos.isEmpty()) {
results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return newColos;
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type,
colo);
}
}.execute());
}
return results;
}
}

0 comments on commit 8a4d035

Please sign in to comment.