Permalink
Browse files

Merge branch 'master' of github.com:sriksun/Ivory

  • Loading branch information...
2 parents 4a2d55a + e418435 commit f19193c7c8498576c93b681a5983c39e724b4813 Shwetha GS committed Oct 12, 2012
View
@@ -48,6 +48,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.ivory</groupId>
+ <artifactId>ivory-metrics</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.1</version>
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ivory.cleanup;
+
+import java.io.IOException;
+
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+
+import org.apache.commons.el.ExpressionEvaluatorImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ivory.IvoryException;
+import org.apache.ivory.entity.ClusterHelper;
+import org.apache.ivory.entity.store.ConfigurationStore;
+import org.apache.ivory.entity.v0.Entity;
+import org.apache.ivory.entity.v0.Frequency;
+import org.apache.ivory.entity.v0.Frequency.TimeUnit;
+import org.apache.ivory.entity.v0.cluster.Cluster;
+import org.apache.ivory.expression.ExpressionHelper;
+import org.apache.ivory.util.RuntimeProperties;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractCleanupHandler {
+
+ protected static final Logger LOG = Logger
+ .getLogger(AbstractCleanupHandler.class);
+ protected final ConfigurationStore STORE = ConfigurationStore.get();
+ public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+ public static final ExpressionHelper resolver = ExpressionHelper.get();
+
+ protected long getRetention(Entity entity, TimeUnit timeUnit)
+ throws IvoryException {
+ String retention = getRetentionValue(timeUnit);
+ try {
+ return (Long) EVALUATOR.evaluate("${" + retention + "}",
+ Long.class, resolver, resolver);
+ } catch (ELException e) {
+ throw new IvoryException("Unable to evalue retention limit: "
+ + retention + " for entity: " + entity.getName());
+ }
+ }
+
+ private String getRetentionValue(Frequency.TimeUnit timeunit) {
+ return RuntimeProperties.get().getProperty(
+ "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
+
+ }
+
+ protected FileStatus[] getAllLogs(
+ org.apache.ivory.entity.v0.cluster.Cluster cluster, Entity entity)
+ throws IvoryException {
+ String stagingPath = ClusterHelper.getLocation(cluster, "staging");
+ Path logPath = getLogPath(entity, stagingPath);
+ FileSystem fs = getFileSystem(cluster);
+ FileStatus[] paths;
+ try {
+ paths = fs.globStatus(logPath);
+ } catch (IOException e) {
+ throw new IvoryException(e);
+ }
+ return paths;
+ }
+
+ private FileSystem getFileSystem(
+ org.apache.ivory.entity.v0.cluster.Cluster cluster)
+ throws IvoryException {
+
+ FileSystem fs;
+ try {
+ fs = new Path(ClusterHelper.getHdfsUrl(cluster))
+ .getFileSystem(new Configuration());
+ } catch (IOException e) {
+ throw new IvoryException(e);
+ }
+ return fs;
+ }
+
+ protected void delete(Cluster cluster, Entity entity, long retention)
+ throws IvoryException {
+
+ FileStatus[] logs = getAllLogs(cluster, entity);
+ long now = System.currentTimeMillis();
+
+ for (FileStatus log : logs) {
+ if (now - log.getModificationTime() > retention) {
+ try {
+ boolean isDeleted = getFileSystem(cluster).delete(
+ log.getPath(), true);
+ if (isDeleted == false) {
+ LOG.error("Unable to delete path: " + log.getPath());
+ } else {
+ LOG.info("Deleted path: " + log.getPath());
+ }
+ } catch (IOException e) {
+ throw new IvoryException(" Unable to delete log file : "
+ + log.getPath() + " for entity " + entity.getName()
+ + " for cluster: " + cluster.getName(), e);
+ }
+ } else {
+ LOG.info("Retention limit: " + retention
+ + " is less than modification"
+ + (now - log.getModificationTime()) + " for path: "
+ + log.getPath());
+ }
+ }
+
+ }
+
+ public abstract void cleanup() throws IvoryException;
+
+ protected abstract Path getLogPath(Entity entity, String stagingPath);
+}
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ivory.cleanup;
+
+import java.util.Collection;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.ivory.IvoryException;
+import org.apache.ivory.entity.v0.Entity;
+import org.apache.ivory.entity.v0.EntityType;
+import org.apache.ivory.entity.v0.cluster.Cluster;
+import org.apache.ivory.entity.v0.feed.Feed;
+
+public class FeedCleanupHandler extends AbstractCleanupHandler {
+
+ @Override
+ public void cleanup() throws IvoryException {
+ Collection<String> feeds = STORE.getEntities(EntityType.FEED);
+ for (String feedName : feeds) {
+ Feed feed;
+ feed = STORE.get(EntityType.FEED, feedName);
+ long retention = getRetention(feed, feed.getFrequency()
+ .getTimeUnit());
+ for (org.apache.ivory.entity.v0.feed.Cluster cluster : feed
+ .getClusters().getClusters()) {
+ Cluster currentCluster = STORE.get(EntityType.CLUSTER,
+ cluster.getName());
+ delete(currentCluster, feed, retention);
+ }
+
+ }
+ }
+
+ @Override
+ protected Path getLogPath(Entity entity, String stagingPath) {
+ Path logPath = new Path(stagingPath, "ivory/workflows/feed/"
+ + entity.getName() + "/logs/job-*");
+ return logPath;
+ }
+
+}
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ivory.cleanup;
+
+import java.util.Collection;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.ivory.IvoryException;
+import org.apache.ivory.entity.v0.Entity;
+import org.apache.ivory.entity.v0.EntityType;
+import org.apache.ivory.entity.v0.cluster.Cluster;
+import org.apache.ivory.entity.v0.process.Process;
+
+public class ProcessCleanupHandler extends AbstractCleanupHandler {
+
+ @Override
+ public void cleanup() throws IvoryException {
+ Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
+ for (String processName : processes) {
+ Process process;
+ process = STORE.get(EntityType.PROCESS, processName);
+ long retention = getRetention(process, process.getFrequency()
+ .getTimeUnit());
+ for (org.apache.ivory.entity.v0.process.Cluster cluster : process
+ .getClusters().getClusters()) {
+ LOG.info("Cleaning up logs for process:" + processName
+ + " in cluster: " + cluster.getName());
+ Cluster currentCluster = STORE.get(EntityType.CLUSTER,
+ cluster.getName());
+ delete(currentCluster, process, retention);
+ }
+
+ }
+ }
+
+ @Override
+ protected Path getLogPath(Entity entity, String stagingPath) {
+ Path logPath = new Path(stagingPath, "ivory/workflows/process/"
+ + entity.getName() + "/logs/job-*");
+ return logPath;
+ }
+
+}
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ivory.service;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+
+import org.apache.commons.el.ExpressionEvaluatorImpl;
+import org.apache.ivory.IvoryException;
+import org.apache.ivory.aspect.GenericAlert;
+import org.apache.ivory.cleanup.AbstractCleanupHandler;
+import org.apache.ivory.cleanup.FeedCleanupHandler;
+import org.apache.ivory.cleanup.ProcessCleanupHandler;
+import org.apache.ivory.expression.ExpressionHelper;
+import org.apache.ivory.util.StartupProperties;
+import org.apache.log4j.Logger;
+
+public class LogCleanupService implements IvoryService {
+
+ private static final Logger LOG = Logger.getLogger(LogCleanupService.class);
+ private final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+ private final ExpressionHelper resolver = ExpressionHelper.get();
+
+ @Override
+ public String getName() {
+ return "Ivory Log cleanup service";
+ }
+
+ @Override
+ public void init() throws IvoryException {
+ Timer timer = new Timer();
+ timer.schedule(new CleanupThread(), 0, getDelay());
+ LOG.info("Ivory log cleanup service initialized");
+
+ }
+
+ private class CleanupThread extends TimerTask {
+
+ private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
+ private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("Cleaning up logs at: " + new Date());
+ processCleanupHandler.cleanup();
+ feedCleanupHandler.cleanup();
+ } catch (Throwable t) {
+ LOG.error("Error in cleanup task: ", t);
+ GenericAlert.alertLogCleanupServiceFailed(
+ "Exception in log cleanup service", t);
+ }
+ }
+ }
+
+ @Override
+ public void destroy() throws IvoryException {
+ LOG.info("Ivory log cleanup service destroyed");
+ }
+
+ private long getDelay() throws IvoryException {
+ String delay = StartupProperties.get().getProperty(
+ "ivory.cleanup.service.frequency", "days(1)");
+ try {
+ return (Long) EVALUATOR.evaluate("${" + delay + "}", Long.class,
+ resolver, resolver);
+ } catch (ELException e) {
+ throw new IvoryException("Exception in EL evaluation", e);
+ }
+ }
+
+}
@@ -17,3 +17,9 @@
#
*.domain=debug
+
+*.log.cleanup.frequency.minutes.retention =hours(6)
+*.log.cleanup.frequency.hours.retention =minutes(1)
+*.log.cleanup.frequency.days.retention =days(7)
+*.log.cleanup.frequency.months.retention =months(3)
+
@@ -32,7 +32,8 @@
org.apache.ivory.service.ProcessSubscriberService,\
org.apache.ivory.rerun.service.RetryService,\
org.apache.ivory.rerun.service.LateRunService,\
- org.apache.ivory.service.SLAMonitoringService
+ org.apache.ivory.service.SLAMonitoringService,\
+ org.apache.ivory.service.LogCleanupService
*.configstore.listeners=org.apache.ivory.entity.v0.EntityGraph,\
org.apache.ivory.entity.ColoClusterRelation,\
org.apache.ivory.group.FeedGroupMap,\
@@ -50,6 +51,8 @@ debug.system.lib.location=${user.dir}/webapp/target/ivory-webapp-0.2-SNAPSHOT/WE
debug.broker.url=vm://localhost
debug.retry.recorder.path=${user.dir}/target/retry
+*.ivory.cleanup.service.frequency=days(1)
+
*.broker.url=tcp://localhost:61616
#default time-to-live for a JMS message 3 days (time in minutes)
*.broker.ttlInMins=4320
Oops, something went wrong.

0 comments on commit f19193c

Please sign in to comment.