target partition expression for feed replication

1 parent 942781c commit 620b2bcba6937d47f2ab8903ce578dcc8a2ab50f Shwetha GS committed Oct 8, 2012
@@ -116,7 +116,7 @@
<xs:element type="retention" name="retention" />
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required" />
- <xs:attribute type="cluster-type" name="type" use="required" />
+ <xs:attribute type="cluster-type" name="type" use="optional" />
<xs:attribute type="xs:string" name="partition" use="optional" />
</xs:complexType>
<xs:complexType name="partitions">
@@ -18,10 +18,18 @@
package org.apache.ivory.entity;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.ivory.IvoryException;
+import org.apache.ivory.entity.v0.cluster.Property;
import org.apache.ivory.entity.v0.feed.Cluster;
import org.apache.ivory.entity.v0.feed.Feed;
import org.apache.ivory.entity.v0.feed.Location;
import org.apache.ivory.entity.v0.feed.LocationType;
+import org.apache.ivory.expression.ExpressionHelper;
public class FeedHelper {
public static Cluster getCluster(Feed feed, String clusterName) {
@@ -37,4 +45,37 @@ public static Location getLocation(Feed feed, LocationType type) {
return loc;
return null;
}
-}
+
+ public static String normalizePartitionExpression(String part1, String part2) {
+ String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
+ partExp = partExp.replaceAll("//+", "/");
+ partExp = StringUtils.stripStart(partExp, "/");
+ partExp = StringUtils.stripEnd(partExp, "/");
+ return partExp;
+ }
+
+ public static String normalizePartitionExpression(String partition) {
+ return normalizePartitionExpression(partition, null);
+ }
+
+ private static Properties loadClusterProperties(org.apache.ivory.entity.v0.cluster.Cluster cluster) {
+ Properties properties = new Properties();
+ Map<String, String> clusterVars = new HashMap<String, String>();
+ clusterVars.put("colo", cluster.getColo());
+ clusterVars.put("name", cluster.getName());
+ if (cluster.getProperties() != null) {
+ for (Property property : cluster.getProperties().getProperties())
+ clusterVars.put(property.getName(), property.getValue());
+ }
+ properties.put("cluster", clusterVars);
+ return properties;
+ }
+
+ public static String evaluateClusterExp(org.apache.ivory.entity.v0.cluster.Cluster clusterEntity, String exp)
+ throws IvoryException {
+ Properties properties = loadClusterProperties(clusterEntity);
+ ExpressionHelper expHelp = ExpressionHelper.get();
+ expHelp.setPropertiesForVariable(properties);
+ return expHelp.evaluateFullExpression(exp, String.class);
+ }
+}
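
For context, the two helpers added above are meant to be used together: normalizePartitionExpression joins raw partition strings and strips stray slashes, while evaluateClusterExp resolves ${cluster.*} variables against a cluster entity. The snippet below is a small usage sketch, not part of the commit; it reuses the same illustrative cluster values as the FeedHelperTest added later in this commit.

// Usage sketch for the new helpers; values mirror FeedHelperTest below.
import org.apache.ivory.entity.FeedHelper;
import org.apache.ivory.entity.v0.cluster.Cluster;
import org.apache.ivory.entity.v0.cluster.Properties;
import org.apache.ivory.entity.v0.cluster.Property;

public class FeedHelperUsageSketch {
    public static void main(String[] args) throws Exception {
        // Joining and cleaning raw partition strings: slashes are collapsed and trimmed.
        assert FeedHelper.normalizePartitionExpression(" /a// ", " /b// ").equals("a/b");
        assert FeedHelper.normalizePartitionExpression(null, " /b// ").equals("b");
        assert FeedHelper.normalizePartitionExpression(null, null).equals("");

        // Resolving ${cluster.*} variables against a cluster entity.
        Cluster cluster = new Cluster();
        cluster.setName("name");
        cluster.setColo("colo");
        cluster.setProperties(new Properties());
        Property prop = new Property();
        prop.setName("pname");
        prop.setValue("pvalue");
        cluster.getProperties().getProperties().add(prop);

        assert FeedHelper.evaluateClusterExp(cluster, "${cluster.colo}/*/US").equals("colo/*/US");
        assert FeedHelper.evaluateClusterExp(cluster, "IN").equals("IN");
    }
}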
@@ -19,20 +19,18 @@
package org.apache.ivory.entity.parser;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
+import org.apache.commons.lang.StringUtils;
import org.apache.ivory.IvoryException;
+import org.apache.ivory.entity.EntityUtil;
import org.apache.ivory.entity.FeedHelper;
import org.apache.ivory.entity.store.ConfigurationStore;
import org.apache.ivory.entity.v0.Entity;
import org.apache.ivory.entity.v0.EntityGraph;
import org.apache.ivory.entity.v0.EntityType;
-import org.apache.ivory.entity.v0.cluster.Property;
import org.apache.ivory.entity.v0.feed.Cluster;
import org.apache.ivory.entity.v0.feed.ClusterType;
import org.apache.ivory.entity.v0.feed.Feed;
@@ -65,9 +63,9 @@ public void validate(Feed feed) throws IvoryException {
validateEntityExists(EntityType.CLUSTER, cluster.getName());
validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(), cluster.getName());
validateFeedCutOffPeriod(feed, cluster);
- validateFeedPartitionExpression(feed, cluster);
}
+ validateFeedPartitionExpression(feed);
validateFeedSourceCluster(feed);
validateFeedGroups(feed);
@@ -197,68 +195,59 @@ private void validateFeedCutOffPeriod(Feed feed, Cluster cluster) throws IvoryEx
+ " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: " + feed.getName());
}
}
-
- private static void loadClusterProperties(Properties prop, org.apache.ivory.entity.v0.cluster.Cluster cluster) {
- Map<String, String> clusterVars = new HashMap<String, String>();
- clusterVars.put("colo", cluster.getColo());
- clusterVars.put("name", cluster.getName());
- if (cluster.getProperties() != null) {
- for (Property property : cluster.getProperties().getProperties())
- clusterVars.put(property.getName(), property.getValue());
- }
- prop.put("cluster", clusterVars);
- }
-
- private void validateFeedPartitionExpression(Feed feed, Cluster cluster) throws IvoryException {
- int expressions = 0, numSourceClusters = 0, numTargetClusters = 0;
+
+ private void validateFeedPartitionExpression(Feed feed) throws IvoryException {
+ int numSourceClusters = 0, numTrgClusters = 0;
for (Cluster cl : feed.getClusters().getClusters()) {
- if (cl.getType().equals(ClusterType.SOURCE)){
+ if (cl.getType() == ClusterType.SOURCE){
numSourceClusters++;
- }
- if (cl.getType().equals(ClusterType.TARGET)){
- numTargetClusters++;
+ } else if(cl.getType() == ClusterType.TARGET) {
+ numTrgClusters++;
}
}
- if (cluster.getType().equals(ClusterType.SOURCE) && cluster.getPartition() != null && numSourceClusters != 1) {
- String[] tokens = cluster.getPartition().split("/");
- if (feed.getPartitions() == null)
- throw new ValidationException("Feed Partitions not specified for feed: " + feed.getName());
- if (tokens.length != feed.getPartitions().getPartitions().size()) {
- throw new ValidationException(
- "Number of expressions in Partition Expression are not equal to number of feed partitions");
- } else {
- org.apache.ivory.entity.v0.cluster.Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER,
- cluster.getName());
- for (String token : tokens) {
- String val = getPartitionExpValue(clusterEntity, token);
- if (!val.equals(token)) {
- expressions++;
- break;
+
+ int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;
+
+ for(Cluster cluster:feed.getClusters().getClusters()) {
+
+ if(cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
+ String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
+ if(StringUtils.split(part, '/').length == 0)
+ throw new ValidationException("Partition expression has to be specified for cluster " + cluster.getName() +
+ " as there are more than one source clusters");
+ validateClusterExpDefined(cluster);
+
+ } else if(cluster.getType() == ClusterType.TARGET) {
+
+ for(Cluster src:feed.getClusters().getClusters()) {
+ if(src.getType() == ClusterType.SOURCE) {
+ String part = FeedHelper.normalizePartitionExpression(src.getPartition(), cluster.getPartition());
+ int numParts = StringUtils.split(part, '/').length;
+ if(numParts > feedParts)
+ throw new ValidationException("Partition for " + src.getName() + " and " + cluster.getName() +
+ "clusters is more than the number of partitions defined in feed");
}
}
- if (expressions == 0)
- throw new ValidationException("Alteast one of the partition tags has to be an expression");
+
+ if(numTrgClusters > 1 && numSourceClusters >= 1) {
+ String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
+ if(StringUtils.split(part, '/').length == 0)
+ throw new ValidationException("Partition expression has to be specified for cluster " + cluster.getName() +
+ " as there are more than one target clusters");
+ validateClusterExpDefined(cluster);
+ }
+
}
- } else {
- if (cluster.getPartition() != null && cluster.getType().equals(ClusterType.TARGET))
- throw new ValidationException("Target Cluster should not have Partition Expression");
- else if (cluster.getPartition() == null
- && cluster.getType().equals(ClusterType.SOURCE)
- && numSourceClusters > 1 && numTargetClusters > 0)
- throw new ValidationException("Partition Expression is missing for the cluster: " + cluster.getName());
- else if (cluster.getPartition() != null && numSourceClusters == 1)
- throw new ValidationException("Partition Expression not expected for the cluster:" + cluster.getName());
-
}
-
}
- public static String getPartitionExpValue(org.apache.ivory.entity.v0.cluster.Cluster clusterEntity, String exp)
- throws IvoryException {
- Properties properties = new Properties();
- loadClusterProperties(properties, clusterEntity);
- ExpressionHelper expHelp = ExpressionHelper.get();
- expHelp.setPropertiesForVariable(properties);
- return expHelp.evaluateFullExpression(exp, String.class);
+ private void validateClusterExpDefined(Cluster cl) throws IvoryException {
+ if(cl.getPartition() == null)
+ return;
+
+ org.apache.ivory.entity.v0.cluster.Cluster cluster = (org.apache.ivory.entity.v0.cluster.Cluster) EntityUtil.getEntity(EntityType.CLUSTER, cl.getName());
+ String part = FeedHelper.normalizePartitionExpression(cl.getPartition());
+ if(FeedHelper.evaluateClusterExp(cluster, part).equals(part))
+ throw new ValidationException("Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
}
}
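
Stated plainly, the rewritten validation enforces three rules: when a feed has more than one source cluster (and at least one target), every source cluster must carry a partition expression that references at least one ${cluster.*} variable; the symmetric rule applies to target clusters when there is more than one target (and at least one source); and for every source/target pair, the combined partition expression must not have more parts than the feed declares. The snippet below sketches the first rule with a made-up cluster name; feed and parser are assumed to be set up as in the FeedEntityParser test further down in this commit.

// Hypothetical setup: a second source cluster is added to a feed that already
// has one source and one target cluster (the name is illustrative).
org.apache.ivory.entity.v0.feed.Cluster extraSrc = new org.apache.ivory.entity.v0.feed.Cluster();
extraSrc.setName("extraSourceCluster");      // made-up name
extraSrc.setType(ClusterType.SOURCE);
extraSrc.setPartition("${cluster.colo}");    // valid: contains a cluster expression
feed.getClusters().getClusters().add(extraSrc);

// The pre-existing source cluster still has no partition expression, so with two
// source clusters the new validation is expected to reject the feed.
try {
    parser.validate(feed);
} catch (ValidationException expected) {
    // "Partition expression has to be specified for cluster ..."
}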
@@ -19,7 +19,7 @@
public final class UpdateHelper {
private static final Logger LOG = Logger.getLogger(UpdateHelper.class);
- private static final String[] FEED_FIELDS = new String[] { "groups", "lateArrival.cutOff", "schema.location", "schema.provider",
+ private static final String[] FEED_FIELDS = new String[] { "partitions", "groups", "lateArrival.cutOff", "schema.location", "schema.provider",
"ACL.group", "ACL.owner", "ACL.permission"};
private static final String[] PROCESS_FIELDS = new String[] { "retry.policy", "retry.delay", "retry.attempts",
"lateProcess.policy", "lateProcess.delay", "lateProcess.lateInputs[\\d+].input", "lateProcess.lateInputs[\\d+].workflowPath"};
@@ -0,0 +1,32 @@
+package org.apache.ivory.entity;
+
+import org.apache.ivory.entity.v0.cluster.Cluster;
+import org.apache.ivory.entity.v0.cluster.Properties;
+import org.apache.ivory.entity.v0.cluster.Property;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FeedHelperTest {
+ @Test
+ public void testPartitionExpression() {
+ Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", " /b// "), "a/b");
+ Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, " /b// "), "b");
+ Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, null), "");
+ }
+
+ @Test
+ public void testEvaluateExpression() throws Exception {
+ Cluster cluster = new Cluster();
+ cluster.setName("name");
+ cluster.setColo("colo");
+ cluster.setProperties(new Properties());
+ Property prop = new Property();
+ prop.setName("pname");
+ prop.setValue("pvalue");
+ cluster.getProperties().getProperties().add(prop);
+
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.colo}/*/US"), "colo/*/US");
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.name}/*/${cluster.pname}"), "name/*/pvalue");
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN");
+ }
+}
@@ -154,11 +154,53 @@ public void applyValidationInvalidFeed() throws Exception {
@Test
public void testPartitionExpression() throws IvoryException {
- Feed feed = (Feed) parser
- .parseAndValidate(ProcessEntityParserTest.class
- .getResourceAsStream(FEED_XML));
- feed.setPartitions(null);
- parser.validate(feed);
+ Feed feed = (Feed) parser.parseAndValidate(ProcessEntityParserTest.class
+ .getResourceAsStream(FEED_XML));
+
+ //When there is more than 1 src cluster, there should be a partition expression
+ org.apache.ivory.entity.v0.feed.Cluster newCluster = new org.apache.ivory.entity.v0.feed.Cluster();
+ newCluster.setName("newCluster");
+ newCluster.setType(ClusterType.SOURCE);
+ newCluster.setPartition("${cluster.colo}");
+ feed.getClusters().getClusters().add(newCluster);
+ try {
+ parser.validate(feed);
+ Assert.fail("Expected ValidationException");
+ } catch(ValidationException e) { }
+
+ //When there is more than 1 src cluster, the partition expression should contain a cluster variable
+ feed.getClusters().getClusters().get(0).setPartition("*");
+ try {
+ parser.validate(feed);
+ Assert.fail("Expected ValidationException");
+ } catch(ValidationException e) { }
+
+ //When there is more than 1 target cluster, there should be a partition expression
+ newCluster.setType(ClusterType.TARGET);
+ try {
+ parser.validate(feed);
+ Assert.fail("Expected ValidationException");
+ } catch(ValidationException e) { }
+
+ //When there is more than 1 target cluster, the partition expression should contain a cluster variable
+ feed.getClusters().getClusters().get(1).setPartition("*");
+ try {
+ parser.validate(feed);
+ Assert.fail("Expected ValidationException");
+ } catch(ValidationException e) { }
+
+ //Number of parts in partition expression > number of partitions defined for feed
+ feed.getClusters().getClusters().get(1).setPartition("*/*");
+ try {
+ parser.validate(feed);
+ Assert.fail("Expected ValidationException");
+ } catch(ValidationException e) { }
+
+ feed.getClusters().getClusters().get(0).setPartition(null);
+ feed.getClusters().getClusters().get(1).setPartition(null);
+ feed.getClusters().getClusters().remove(2);
+ feed.setPartitions(null);
+ parser.validate(feed);
}
@Test
@@ -306,11 +348,11 @@ public void testInvalidGroupNames() throws IvoryException, JAXBException {
public void testClusterPartitionExp() throws IvoryException {
Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
"testCluster");
- Assert.assertEquals(FeedEntityParser.getPartitionExpValue(cluster,
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster,
"/*/${cluster.colo}"), "/*/" + cluster.getColo());
- Assert.assertEquals(FeedEntityParser.getPartitionExpValue(cluster,
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster,
"/*/${cluster.name}/Local"), "/*/" + cluster.getName()+"/Local");
- Assert.assertEquals(FeedEntityParser.getPartitionExpValue(cluster,
+ Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster,
"/*/${cluster.field1}/Local"), "/*/value1/Local");
}
@@ -32,10 +32,8 @@
import org.apache.ivory.entity.ClusterHelper;
import org.apache.ivory.entity.EntityUtil;
import org.apache.ivory.entity.FeedHelper;
-import org.apache.ivory.entity.parser.FeedEntityParser;
import org.apache.ivory.entity.store.ConfigurationStore;
import org.apache.ivory.entity.v0.EntityType;
-import org.apache.ivory.entity.v0.Frequency;
import org.apache.ivory.entity.v0.Frequency.TimeUnit;
import org.apache.ivory.entity.v0.SchemaHelper;
import org.apache.ivory.entity.v0.cluster.Cluster;
@@ -141,10 +139,12 @@ private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String w
private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath) throws IvoryException {
Feed feed = getEntity();
List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
- if (FeedHelper.getCluster(feed, targetCluster.getName()).getType().equals(ClusterType.TARGET)) {
+
+ if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
Path basePath = getCoordPath(bundlePath, coordName);
createReplicatonWorkflow(targetCluster, basePath, coordName);
+
for (org.apache.ivory.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
if (feedCluster.getType().equals(ClusterType.SOURCE)) {
COORDINATORAPP coord = createAndGetCoord(feed,
@@ -231,10 +231,14 @@ private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgClust
try {
replicationWF.setAppPath(getHDFSPath(wfPath.toString()));
Feed feed = getEntity();
+
+ String srcPart = FeedHelper.normalizePartitionExpression(FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+ String targetPart = FeedHelper.normalizePartitionExpression(FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
+ targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
StringBuilder pathsWithPartitions = new StringBuilder();
- pathsWithPartitions.append("${coord:dataIn('input')}").append(
- FeedHelper.getCluster(feed, srcCluster.getName()).getPartition() == null ? "" : "/"+FeedEntityParser.getPartitionExpValue(
- srcCluster, FeedHelper.getCluster(feed, srcCluster.getName()).getPartition()));
+ pathsWithPartitions.append("${coord:dataIn('input')}/").append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
props.put("srcClusterName", srcCluster.getName());
