
Merge branch 'master' of github.com:sriksun/Ivory

Conflicts:
	common/src/main/java/org/apache/ivory/entity/parser/FeedEntityParser.java
	feed/src/main/java/org/apache/ivory/converter/OozieFeedMapper.java
	process/src/main/java/org/apache/ivory/converter/OozieProcessMapper.java
commit de3bd51eb275b8dd2ac2eef5fec5196b662aeee2 (2 parents: 82ec929 + 0681050), committed by Shwetha GS on Oct 29, 2012
Showing 306 additions and 78 deletions across 29 files.
  1. +2 −0 client/src/main/resources/feed-0.1.xsd
  2. +11 −0 common/src/main/java/org/apache/ivory/cleanup/AbstractCleanupHandler.java
  3. +1 −1 common/src/main/java/org/apache/ivory/cleanup/FeedCleanupHandler.java
  4. +2 −2 common/src/main/java/org/apache/ivory/cleanup/ProcessCleanupHandler.java
  5. +29 −6 common/src/main/java/org/apache/ivory/entity/FeedHelper.java
  6. +20 −5 common/src/main/java/org/apache/ivory/entity/parser/ClusterEntityParser.java
  7. +27 −12 common/src/main/java/org/apache/ivory/entity/parser/FeedEntityParser.java
  8. +5 −0 common/src/main/java/org/apache/ivory/entity/parser/ProcessEntityParser.java
  9. +1 −1 common/src/main/java/org/apache/ivory/group/FeedGroup.java
  10. +44 −6 common/src/main/java/org/apache/ivory/update/UpdateHelper.java
  11. +2 −1 common/src/test/java/org/apache/ivory/cleanup/LogCleanupServiceTest.java
  12. +18 −2 common/src/test/java/org/apache/ivory/entity/parser/ClusterEntityParserTest.java
  13. +26 −1 common/src/test/java/org/apache/ivory/entity/parser/FeedEntityParserTest.java
  14. +9 −0 common/src/test/java/org/apache/ivory/entity/parser/ProcessEntityParserTest.java
  15. +5 −0 common/src/test/resources/config/feed/feed-0.1.xml
  16. +5 −4 feed/src/main/java/org/apache/ivory/converter/OozieFeedMapper.java
  17. +8 −8 feed/src/main/resources/config/workflow/replication-workflow.xml
  18. +4 −4 feed/src/main/resources/config/workflow/retention-workflow.xml
  19. +3 −3 oozie/src/main/java/org/apache/ivory/converter/AbstractOozieEntityMapper.java
  20. +2 −3 oozie/src/main/java/org/apache/ivory/logging/LogMover.java
  21. +9 −2 oozie/src/main/java/org/apache/ivory/service/SharedLibraryHostingService.java
  22. +6 −0 oozie/src/main/java/org/apache/ivory/workflow/IvoryPostProcessing.java
  23. +5 −0 oozie/src/main/java/org/apache/ivory/workflow/engine/NullBundleJob.java
  24. +5 −0 oozie/src/main/java/org/apache/ivory/workflow/engine/NullCoordJob.java
  25. +35 −5 process/src/main/java/org/apache/ivory/converter/OozieProcessMapper.java
  26. +6 −6 process/src/main/resources/config/workflow/process-parent-workflow.xml
  27. +7 −3 process/src/test/java/org/apache/ivory/converter/OozieProcessMapperTest.java
  28. +5 −0 process/src/test/resources/config/feed/feed-0.1.xml
  29. +4 −3 rerun/src/main/java/org/apache/ivory/rerun/handler/LateRerunHandler.java
@@ -114,6 +114,7 @@
<xs:sequence>
<xs:element type="validity" name="validity" />
<xs:element type="retention" name="retention" />
+ <xs:element type="locations" name="locations" minOccurs="0"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required" />
<xs:attribute type="cluster-type" name="type" use="optional" />
@@ -287,6 +288,7 @@
<xs:enumeration value="data" />
<xs:enumeration value="stats" />
<xs:enumeration value="meta" />
+ <xs:enumeration value="tmp" />
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="IDENTIFIER">
@@ -109,6 +109,7 @@ protected void delete(Cluster cluster, Entity entity, long retention)
} else {
LOG.info("Deleted path: " + log.getPath());
}
+ deleteParentIfEmpty(getFileSystem(cluster), log.getPath().getParent());
} catch (IOException e) {
throw new IvoryException(" Unable to delete log file : "
+ log.getPath() + " for entity " + entity.getName()
@@ -124,6 +125,16 @@ protected void delete(Cluster cluster, Entity entity, long retention)
}
+ private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
+ FileStatus[] files = fs.listStatus(parent);
+ if (files != null && files.length == 0) {
+ LOG.info("Parent path: " + parent + " is empty, deleting path");
+ fs.delete(parent, true);
+ deleteParentIfEmpty(fs, parent.getParent());
+ }
+ }
+
public abstract void cleanup() throws IvoryException;
protected abstract Path getLogPath(Entity entity, String stagingPath);
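Note: the new deleteParentIfEmpty walks upward from each deleted log file and removes parent directories left empty, so retention no longer leaves behind empty job-id directory shells. As committed, the recursion simply stops at the first non-empty ancestor; there is no explicit guard at the staging root. A minimal standalone sketch of the same walk, assuming a local FileSystem and a hypothetical leftover path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class EmptyParentCleanup {
        // Recursively delete 'parent' while it exists and is empty.
        static void deleteParentIfEmpty(FileSystem fs, Path parent) throws Exception {
            FileStatus[] files = fs.listStatus(parent); // null on older Hadoop when missing
            if (files != null && files.length == 0) {
                fs.delete(parent, true);                      // empty, safe to remove
                deleteParentIfEmpty(fs, parent.getParent());  // check the next level up
            }
        }

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            // hypothetical directory left behind after its last log file was deleted
            deleteParentIfEmpty(fs, new Path("/tmp/ivory-logs/job-123/run-0"));
        }
    }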
@@ -49,7 +49,7 @@ public void cleanup() throws IvoryException {
@Override
protected Path getLogPath(Entity entity, String stagingPath) {
Path logPath = new Path(stagingPath, "ivory/workflows/feed/"
- + entity.getName() + "/logs/job-*");
+ + entity.getName() + "/logs/job-*/*/*");
return logPath;
}
@@ -39,7 +39,7 @@ public void cleanup() throws IvoryException {
for (org.apache.ivory.entity.v0.process.Cluster cluster : process
.getClusters().getClusters()) {
LOG.info("Cleaning up logs for process:" + processName
- + " in cluster: " + cluster.getName());
+ + " in cluster: " + cluster.getName() + " with retention: "+retention);
Cluster currentCluster = STORE.get(EntityType.CLUSTER,
cluster.getName());
delete(currentCluster, process, retention);
@@ -51,7 +51,7 @@ public void cleanup() throws IvoryException {
@Override
protected Path getLogPath(Entity entity, String stagingPath) {
Path logPath = new Path(stagingPath, "ivory/workflows/process/"
- + entity.getName() + "/logs/job-*");
+ + entity.getName() + "/logs/job-*/*");
return logPath;
}
@@ -29,6 +29,7 @@
import org.apache.ivory.entity.v0.feed.Feed;
import org.apache.ivory.entity.v0.feed.Location;
import org.apache.ivory.entity.v0.feed.LocationType;
+import org.apache.ivory.entity.v0.feed.Locations;
import org.apache.ivory.expression.ExpressionHelper;
public class FeedHelper {
@@ -39,12 +40,34 @@ public static Cluster getCluster(Feed feed, String clusterName) {
return null;
}
- public static Location getLocation(Feed feed, LocationType type) {
- for(Location loc:feed.getLocations().getLocations())
- if(loc.getType() == type)
- return loc;
- return null;
- }
+ public static Location getLocation(Feed feed, LocationType type,
+ String clusterName) {
+ Cluster cluster = getCluster(feed, clusterName);
+ if (cluster != null && cluster.getLocations() != null
+ && cluster.getLocations().getLocations().size() != 0) {
+ return getLocation(cluster.getLocations(), type);
+ } else {
+ return getLocation(feed.getLocations(), type);
+ }
+ }
+
+ public static Location getLocation(Feed feed, LocationType type) {
+ return getLocation(feed.getLocations(), type);
+ }
+
+ public static Location getLocation(Locations locations, LocationType type) {
+ for (Location loc : locations.getLocations()) {
+ if (loc.getType() == type) {
+ return loc;
+ }
+ }
+ Location loc = new Location();
+ loc.setPath("/tmp");
+ loc.setType(type);
+ return loc;
+ }
public static String normalizePartitionExpression(String part1, String part2) {
String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
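Note: getLocation(Feed, LocationType, String) now resolves a location in three steps: the cluster-level override when the named cluster defines any locations, otherwise the feed-level location, and finally a synthesized Location with path "/tmp" when the requested type is absent. A hedged usage sketch, assuming a Feed already parsed from XML and a hypothetical cluster name:

    import org.apache.ivory.entity.FeedHelper;
    import org.apache.ivory.entity.v0.feed.Feed;
    import org.apache.ivory.entity.v0.feed.Location;
    import org.apache.ivory.entity.v0.feed.LocationType;

    public class LocationLookup {
        // 'feed' is assumed already parsed; "backupCluster" is hypothetical.
        static void printDataLocation(Feed feed) {
            // cluster-level override wins; feed-level <locations> is the fallback;
            // a default Location with path "/tmp" is returned as a last resort
            Location data = FeedHelper.getLocation(feed, LocationType.DATA, "backupCluster");
            System.out.println(data.getType() + " -> " + data.getPath());
        }
    }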
@@ -18,6 +18,8 @@
package org.apache.ivory.entity.parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.ivory.entity.ClusterHelper;
import org.apache.ivory.entity.store.StoreAccessException;
import org.apache.ivory.entity.v0.EntityType;
@@ -33,9 +35,22 @@ public ClusterEntityParser() {
}
@Override
- public void validate(Cluster cluster) throws StoreAccessException, ValidationException {
- if (!ClusterHelper.getHdfsUrl(cluster).startsWith("hdfs://")) {
- throw new ValidationException("Cannot get valid nameNode from write interface of cluster: " + cluster.getName());
- }
- }
+ public void validate(Cluster cluster) throws StoreAccessException,
+ ValidationException {
+ if (!ClusterHelper.getHdfsUrl(cluster).startsWith("hdfs://")) {
+ throw new ValidationException(
+ "Cannot get valid nameNode from write interface of cluster: "
+ + cluster.getName());
+ }
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", ClusterHelper.getHdfsUrl(cluster));
+ conf.setInt("ipc.client.connect.max.retries", 10);
+ FileSystem.get(conf);
+ } catch (Exception e) {
+ throw new ValidationException("Invalid HDFS server or port:"
+ + ClusterHelper.getHdfsUrl(cluster), e);
+ }
+ }
+
}
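Note: cluster validation now goes beyond the hdfs:// prefix check and opens a FileSystem handle against the write interface, so a cluster with an unreachable namenode is rejected at submission time rather than at scheduling time. The same probe in isolation, assuming a hypothetical endpoint:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class HdfsReachabilityCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://localhost:8020"); // hypothetical endpoint
            conf.setInt("ipc.client.connect.max.retries", 10);    // bound the wait on a dead namenode
            FileSystem.get(conf); // throws if the server or port is invalid
            System.out.println("namenode reachable");
        }
    }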
@@ -66,7 +66,6 @@ public void validate(Feed feed) throws IvoryException {
}
validateFeedPartitionExpression(feed);
- validateFeedSourceCluster(feed);
validateFeedGroups(feed);
// Seems like a good enough entity object for a new one
@@ -100,6 +99,23 @@ public void validate(Feed feed) throws IvoryException {
private void validateFeedGroups(Feed feed) throws ValidationException {
String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[] {};
+ String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
+ .getPath();
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ if (!FeedGroup.getDatePattern(
+ FeedHelper.getLocation(feed, LocationType.DATA,
+ cluster.getName()).getPath()).equals(
+ FeedGroup.getDatePattern(defaultPath))) {
+ throw new ValidationException("Feeds default path pattern: "
+ + FeedHelper.getLocation(feed, LocationType.DATA)
+ .getPath()
+ + ", does not match with cluster: "
+ + cluster.getName()
+ + " path pattern: "
+ + FeedHelper.getLocation(feed, LocationType.DATA,
+ cluster.getName()).getPath());
+ }
+ }
for (String groupName : groupNames) {
FeedGroup group = FeedGroupMap.get().getGroupsMapping().get(groupName);
if (group == null || group.canContainFeed(feed)) {
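Note: validateFeedGroups now also checks that every cluster-level DATA path carries the same date pattern as the feed's default DATA path, which is what makes cluster-level location overrides safe to schedule; FeedGroup.getDatePattern was made public static for exactly this cross-class use. A sketch of the comparison, with hypothetical paths:

    import org.apache.ivory.group.FeedGroup;

    public class DatePatternCheck {
        public static void main(String[] args) {
            // hypothetical feed-level and cluster-level paths; only the
            // date-variable sequence (e.g. YEAR/MONTH/DAY) must match
            String defaultPattern = FeedGroup.getDatePattern("/data/clicks/${YEAR}/${MONTH}/${DAY}");
            String clusterPattern = FeedGroup.getDatePattern("/backup/clicks/${YEAR}/${MONTH}/${DAY}");
            if (!defaultPattern.equals(clusterPattern)) {
                throw new IllegalStateException("cluster path pattern differs from feed default");
            }
        }
    }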
@@ -153,17 +169,6 @@ private void ensureValidityFor(Feed newFeed, Process process) throws IvoryExcept
}
}
- private void validateFeedSourceCluster(Feed feed) throws ValidationException {
- int i = 0;
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (cluster.getType() == ClusterType.SOURCE) {
- i++;
- }
- }
- if (i == 0)
- throw new ValidationException("Feed should have atleast one source cluster");
- }
-
private void validateClusterValidity(Date start, Date end, String clusterName) throws IvoryException {
try {
if (start.after(end)) {
@@ -198,14 +203,24 @@ private void validateFeedCutOffPeriod(Feed feed, Cluster cluster) throws IvoryEx
private void validateFeedPartitionExpression(Feed feed) throws IvoryException {
int numSourceClusters = 0, numTrgClusters = 0;
+ Set<String> clusters = new HashSet<String>();
for (Cluster cl : feed.getClusters().getClusters()) {
+ if (!clusters.add(cl.getName())) {
+ throw new ValidationException("Cluster: " + cl.getName()
+ + " is defined more than once for feed: "+feed.getName());
+ }
if (cl.getType() == ClusterType.SOURCE){
numSourceClusters++;
} else if(cl.getType() == ClusterType.TARGET) {
numTrgClusters++;
}
}
+ if (numTrgClusters >= 1 && numSourceClusters == 0) {
+ throw new ValidationException("Feed: " + feed.getName()
+ + " should have atleast one source cluster defined");
+ }
+
int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;
for(Cluster cluster:feed.getClusters().getClusters()) {
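Note: two validations were added here: a feed may not list the same cluster twice (the HashSet.add return value detects repeats in one pass), and a feed with target clusters must name at least one source cluster. The next hunk applies the same duplicate-cluster guard to processes. The duplicate-detection idiom in isolation, with hypothetical cluster names:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DuplicateClusterCheck {
        public static void main(String[] args) {
            // hypothetical cluster names as they might appear in a feed definition
            List<String> declared = Arrays.asList("primary", "backup", "primary");
            Set<String> seen = new HashSet<String>();
            for (String name : declared) {
                if (!seen.add(name)) { // add() returns false on a repeat
                    throw new IllegalStateException("Cluster: " + name + " is defined more than once");
                }
            }
        }
    }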
@@ -60,8 +60,13 @@ public void validate(Process process) throws IvoryException {
process.setTimezone(TimeZone.getTimeZone("UTC"));
// check if dependent entities exists
+ Set<String> clusters = new HashSet<String>();
for (org.apache.ivory.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
+ if (!clusters.add(cluster.getName())) {
+ throw new ValidationException("Cluster: " + cluster.getName()
+ + " is defined more than once for process: "+process.getName());
+ }
validateEntityExists(EntityType.CLUSTER, clusterName);
validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
validateHDFSpaths(process, clusterName);
@@ -43,7 +43,7 @@ public FeedGroup(String group, Frequency frequency, String path) {
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
}
- private String getDatePattern(String path) {
+ public static String getDatePattern(String path) {
Matcher matcher = FeedDataPath.PATTERN.matcher(path);
List<String> fields = new ArrayList<String>();
while (matcher.find()) {
@@ -52,8 +52,26 @@ public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity af
}
public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
- if (!FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.DATA).getPath()))
+ if (!FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.DATA)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.DATA).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.META)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.META).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.STATS)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.STATS).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.TMP)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.TMP).getPath()))
return true;
LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
@@ -88,10 +106,30 @@ public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedP
}
for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
- if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
- .equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart()))
- return true;
- LOG.debug(oldFeed.toShortString() + ": Feed start on cluster" + cluster.getName() + " identical. Ignoring...");
+ if (!FeedHelper
+ .getCluster(oldFeed, cluster.getName())
+ .getValidity()
+ .getStart()
+ .equals(FeedHelper.getCluster(newFeed, cluster.getName())
+ .getValidity().getStart())
+ || !FeedHelper.getLocation(oldFeed, LocationType.DATA,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.DATA,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.META,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.META,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.STATS,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.STATS,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.TMP,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.TMP,
+ cluster.getName()).getPath()))
+ return true;
+ LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
}
return false;
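Note: shouldUpdate now compares all four location types (DATA, META, STATS, and the new TMP), both at the feed level and per affected cluster, so a changed cluster-level override triggers an update of dependent processes. The long chained comparison above could equally be written as a loop; a sketch of that equivalent form, assuming the same FeedHelper API:

    import org.apache.ivory.entity.FeedHelper;
    import org.apache.ivory.entity.v0.feed.Feed;
    import org.apache.ivory.entity.v0.feed.LocationType;

    public class LocationDiff {
        // true when any location type's path differs for the given cluster
        static boolean locationsChanged(Feed oldFeed, Feed newFeed, String clusterName) {
            LocationType[] types = { LocationType.DATA, LocationType.META,
                    LocationType.STATS, LocationType.TMP };
            for (LocationType type : types) {
                String oldPath = FeedHelper.getLocation(oldFeed, type, clusterName).getPath();
                String newPath = FeedHelper.getLocation(newFeed, type, clusterName).getPath();
                if (!oldPath.equals(newPath)) {
                    return true;
                }
            }
            return false;
        }
    }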
@@ -98,10 +98,11 @@ public void testProcessLogs() throws IOException, IvoryException,
fs.mkdirs(instanceLogPath2);
fs.mkdirs(instanceLogPath3);
fs.mkdirs(instanceLogPath4);
- Thread.sleep(61000);
+
// fs.setTimes wont work on dirs
fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
+ Thread.sleep(61000);
AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
processCleanupHandler.cleanup();
@@ -27,6 +27,7 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.ivory.IvoryException;
import org.apache.ivory.entity.AbstractTestBase;
import org.apache.ivory.entity.ClusterHelper;
@@ -35,6 +36,8 @@
import org.apache.ivory.entity.v0.cluster.Interface;
import org.apache.ivory.entity.v0.cluster.Interfacetype;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class ClusterEntityParserTest extends AbstractTestBase {
@@ -46,7 +49,8 @@ public void testParse() throws IOException, IvoryException, JAXBException {
InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
- Cluster cluster = (Cluster) parser.parseAndValidate(stream);
+ Cluster cluster = (Cluster) parser.parse(stream);
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
Assert.assertNotNull(cluster);
assertEquals(cluster.getName(), "testCluster");
@@ -61,7 +65,7 @@ public void testParse() throws IOException, IvoryException, JAXBException {
assertEquals(readonly.getVersion(), "0.20.2");
Interface write = ClusterHelper.getInterface(cluster, Interfacetype.WRITE);
- assertEquals(write.getEndpoint(), "hdfs://localhost:8020");
+ //assertEquals(write.getEndpoint(), conf.get("fs.default.name"));
assertEquals(write.getVersion(), "0.20.2");
Interface workflow = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW);
@@ -74,5 +78,17 @@ public void testParse() throws IOException, IvoryException, JAXBException {
Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
marshaller.marshal(cluster, stringWriter);
System.out.println(stringWriter.toString());
+ parser.parseAndValidate(stringWriter.toString());
}
+
+ @BeforeClass
+ public void init() throws Exception {
+ conf.set("hadoop.log.dir", "/tmp");
+ this.dfsCluster = new MiniDFSCluster(conf, 1, true, null);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ this.dfsCluster.shutdown();
+ }
}