Skip to content
Browse files

Fixed comments

Threaded log copier/delete
Added option to retain logs for some number of days
  • Loading branch information...
1 parent cd85867 commit 786c401268030aeabddf8250771f7bfb9c373d55 @oclc committed May 29, 2012
View
22 src/main/java/org/oclc/firefly/hadoop/backup/Backup.java
@@ -396,13 +396,17 @@ public boolean doMajorCopy(String[] tables, int maxTries)
tries++;
}
- }
-
- if (verifyCopiedRegions()) {
- LOG.info("Verification passed succesfully");
- } else {
- ret = false;
- LOG.info("Verification failed. Please inspect errors manually");
+
+ if (ret) {
+ if (verifyCopiedRegions()) {
+ LOG.info("Verification passed succesfully");
+ } else {
+ ret = false;
+ LOG.info("Verification failed. Please inspect errors manually");
+ }
+ } else {
+ LOG.info("No attempts left. Try setting -n to a higher value, or setting it to 0");
+ }
}
if (ret) {
@@ -641,9 +645,9 @@ private void addCopiedRegions(List<Pair<String, HRegionInfo>> inputRegions,
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
while (reader.next(rserver)) {
HRegionInfo rinfo = new HRegionInfo();
- LOG.info(rinfo.toString());
-
reader.getCurrentValue(rinfo);
+
+ LOG.info(rinfo.toString());
ret.add(Pair.newPair(rserver.toString(), rinfo));
}
View
460 src/main/java/org/oclc/firefly/hadoop/backup/LogCopier.java
@@ -17,6 +17,7 @@
package org.oclc.firefly.hadoop.backup;
import java.io.IOException;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -53,9 +54,15 @@
/** milliseconds per minute */
private static final long MS_PER_MINUTE = 60000;
+ /** milliseconds per day */
+ private static final long ONE_DAY_MILLI = 86400000;
+
/** default number of minutes between runs */
private static final long DEFAULT_FREQUENCY = 10L;
+ /** default number of threads */
+ private static final long DEFAULT_NUM_THREADS = 10L;
+
/** default buffer size 8KB */
private static final int BUFFER_SIZE = 8192;
@@ -92,6 +99,15 @@
/** Number of logs that failed copied in the last run */
private long lastRunFailed = 0L;
+ /** The number of days that a log is archived */
+ private long daysToLive = 0L;
+
+ /** number of copy threads */
+ private long numberThreads = DEFAULT_NUM_THREADS;
+
+ /** copy threads */
+ List<Thread> threads = new ArrayList<Thread>();
+
/**
* Construct a new LogCopier
* @param destination The destination of the log files
@@ -119,6 +135,8 @@ public LogCopier(String destination, long freqencyMinutes) {
public static void main(String[] args) throws Exception {
String destDirectory = null;
long frequency = DEFAULT_FREQUENCY;
+ long threads = DEFAULT_NUM_THREADS;
+ long dtl = 0L;
CommandLineParser parser = new PosixParser();
CommandLine cmdline = null;
@@ -143,19 +161,37 @@ public static void main(String[] args) throws Exception {
throw new IllegalArgumentException("Minutes must be greater than 0");
}
break;
+ case 'l':
+ dtl = Long.parseLong(option.getValue());
+ if (dtl < 0) {
+ throw new IllegalArgumentException("Log days-to-live Must be non-negative");
+ }
+ break;
+ case 't':
+ threads = Long.parseLong(option.getValue());
+ if (threads < 0) {
+ throw new IllegalArgumentException("Number of threads must be greater than 0");
+ }
+ break;
default:
throw new IllegalArgumentException("unexpected option " + option);
}
}
LogCopier copier = new LogCopier(destDirectory, frequency);
+ copier.setLogDaysToLive(dtl);
+ copier.setNumberThreads(threads);
+
+ LOG.info("--------------------------------------------------");
+ LOG.info("Copy frequency : " + copier.getCopyFrequency() + " minutes");
+ LOG.info("Archive directory : " + copier.getArchiveDirectory());
+ LOG.info("Log days to live : " + copier.getLogDaysToLive() + " days");
+ LOG.info("Copy threads : " + copier.getNumberThreads());
+ LOG.info("--------------------------------------------------");
- LOG.info("Copy frequency : " + frequency + " minutes");
- LOG.info("Archive directory : " + destDirectory);
-
copier.run();
}
-
+
/**
* Controls running loop
*/
@@ -170,18 +206,14 @@ public void run() {
conf = HBaseConfiguration.create();
fs = FileSystem.get(conf);
archivePath = new Path(destinationPath, getTodayDateString());
-
- if (!fs.exists(archivePath)) {
- fs.mkdirs(archivePath);
- }
copyLogs(fs, archivePath, conf);
endRunTimer();
try {
putToSleep();
} catch (InterruptedException ie) {
- LOG.warn("Process has been interrupted by another thread. Exiting...", ie);
+ LOG.warn("Sleep interrupted by another thread. Resuming..", ie);
}
} catch (IOException e) {
LOG.error("Failed to communicate with file system", e);
@@ -208,50 +240,41 @@ private void copyLogs(FileSystem fs, Path archivePath, Configuration conf) throw
// Over time, the list of archived files could get very large. The oldest start time is used to filter out
// older archived HLogs. That way when we do comparisons we are only checking against the most recent files,
// a smaller set.
- long oldestFileStartTime = getOldestFileStartTime(logFiles);
+ //long oldestFileStartTime = getOldestFileStartTime(logFiles);
// Get the list of log files we have archived
- List<FileStatus> copiedLogFiles = getHLogs(fs, destinationPath, oldestFileStartTime);
+ //List<FileStatus> copiedLogFiles = getHLogs(fs, destinationPath, oldestFileStartTime);
+ List<FileStatus> copiedLogFiles = getHLogs(fs, destinationPath, null);
// Get list of HLogs to copy
List<FileStatus> newLogFiles = getNewLogFiles(fs, logFiles, copiedLogFiles);
- deleteOldLogsFromArchive(fs, newLogFiles, copiedLogFiles);
+
+ cleanOldLogsFromArchive(fs, copiedLogFiles);
+ prepareLogsToBeReplaced(fs, newLogFiles, copiedLogFiles);
lastRunSuccess = 0L;
lastRunFailed = 0L;
+ threads.clear();
- for (FileStatus newLogFile : newLogFiles) {
- boolean copyFromOldLogs = false;
- Path newLogFilePath = newLogFile.getPath();
- Path logArchivePath = new Path(archivePath, newLogFilePath.getName());
- LOG.info("HLog: " + newLogFilePath);
-
- if (newLogFilePath.toString().startsWith(logsPath.toString())) {
- try {
- FileUtil.copy(fs, newLogFilePath, fs, logArchivePath, false, true, conf);
- lastRunSuccess++;
- } catch (IOException e) {
- LOG.warn("Failed: " + newLogFilePath, e);
-
- copyFromOldLogs = true;
- newLogFilePath = new Path(oldLogsPath, newLogFilePath.getName());
-
- LOG.info("Trying: " + newLogFilePath);
- }
- } else {
- copyFromOldLogs = true;
- }
+ if (newLogFiles.size() > 0 && !fs.exists(archivePath)) {
+ fs.mkdirs(archivePath);
+ }
- if (copyFromOldLogs) {
- try {
- // Use our copy function this time. It checks the trash for deleted files before giving up
- BackupUtils.copy(fs, newLogFilePath, fs, logArchivePath, buffer, username,
- fs.getDefaultReplication());
- lastRunSuccess++;
- } catch (Exception e) {
- LOG.error(" Failed to copy " + newLogFilePath, e);
- lastRunFailed++;
- }
+ // Split files among all threads and start start them
+ // Improvement: use thread pool
+ List<List<FileStatus>> split = splitFileList(newLogFiles, numberThreads);
+ for (List<FileStatus> listFiles : split) {
+ Thread t = new Thread(new Copier(fs, listFiles, archivePath));
+ t.start();
+ threads.add(t);
+ }
+
+ // Wait for threads to finish
+ for (Thread t : threads) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to join thread", e);
}
}
@@ -260,11 +283,45 @@ private void copyLogs(FileSystem fs, Path archivePath, Configuration conf) throw
}
/**
+ * Divide log files to copy among the number of threads
+ * @param newLogFiles The list of logs files to split
+ * @param numThreads The number of threads to split among
+ * @return The split list of list of files
+ */
+ private List<List<FileStatus>> splitFileList(List<FileStatus> newLogFiles, long numThreads) {
+ int idx = 0;
+ long splits = numThreads;
+ List<List<FileStatus>> ret = new ArrayList<List<FileStatus>>();
+
+ if (newLogFiles.size() < numThreads) {
+ splits = newLogFiles.size();
+ }
+
+ // Initialize the return array
+ for (int i = 0; i < splits; i++) {
+ ret.add(new ArrayList<FileStatus>());
+ }
+
+ // Divide files among threads
+ for (FileStatus file : newLogFiles) {
+ ret.get(idx).add(file);
+ idx++;
+
+ if (idx >= ret.size()) {
+ idx = 0;
+ }
+ }
+
+ return ret;
+ }
+
+ /**
* Get the oldest start time from list of HLogs.
* HLogs contain their start time at the end of the file name
* @param logFiles The list of HLog files
* @return The oldest time stamp
*/
+ @SuppressWarnings("unused")
private long getOldestFileStartTime(List<FileStatus> logFiles) {
long ret = Long.MAX_VALUE;
@@ -285,10 +342,119 @@ private long getOldestFileStartTime(List<FileStatus> logFiles) {
* Delete out-dated HLogs from archive. It goes through each HLog file in logFiles, and looks for it in
* copiedLogFiles. If it finds it, it deletes it from the file system
* @param fs The files system
+ * @param copiedLogFiles The HLogs files in our archive
+ * @throws IOException
+ */
+ private void cleanOldLogsFromArchive(FileSystem fs, List<FileStatus> copiedLogFiles) throws IOException {
+ long dtl = getLogDaysToLive();
+ List<Path> deleteLogs = null;
+
+ if (dtl > 0) {
+ long deleteTimeMark = this.runStartTime - (dtl * ONE_DAY_MILLI);
+ deleteLogs = new ArrayList<Path>();
+
+ // Delete logs older than daysToLive
+ for (int i = 0; i < copiedLogFiles.size(); i++) {
+ FileStatus copiedLogFile = copiedLogFiles.get(i);
+ String copiedLogFileName = copiedLogFile.getPath().toString();
+ int iLogStartTime = copiedLogFileName.lastIndexOf('.');
+
+ if (iLogStartTime > 0) {
+ try {
+ long logStartTime = Long.parseLong(copiedLogFileName.substring(iLogStartTime + 1));
+ if (logStartTime <= deleteTimeMark) {
+ deleteLogs.add(copiedLogFile.getPath());
+ copiedLogFiles.remove(i--);
+ }
+ } catch (NumberFormatException x) {
+ iLogStartTime = 0;
+ }
+ }
+
+ if (iLogStartTime <= 0) {
+ LOG.warn("Invalid log file: " + copiedLogFile.getPath());
+ }
+ }
+
+ // Threaded delete, in case there are lots of deletes to perform
+ // Improvement: use thread pool
+ long numThreads = this.getNumberThreads();
+ if (deleteLogs.size() < numThreads) {
+ numThreads = deleteLogs.size();
+ }
+
+ if (numThreads > 0) {
+ List<Thread> delThreads = new ArrayList<Thread>();
+ int result = (int)(deleteLogs.size() / numThreads);
+ int length = result;
+
+ LOG.debug("Delete logs length: " + deleteLogs.size());
+
+ // init threads
+ for (int i = 0; i < numThreads; i++) {
+ int start = i * result;
+
+ if (i + 1 == numThreads) {
+ length = deleteLogs.size() - start;
+ }
+
+ LOG.debug("DThread " + i + ": start: " + start + " length: " + length);
+
+ Runnable deleter = new Deleter(fs, deleteLogs, start, length);
+ Thread deleteThread = new Thread(deleter);
+ deleteThread.start();
+
+ delThreads.add(deleteThread);
+ }
+
+ // wait for threads to finish
+ for (Thread t : delThreads) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to join delete thread", e);
+ }
+ }
+ }
+
+ // delete empty directories
+ FileStatus[] dirs = fs.listStatus(this.destinationPath);
+ if (dirs != null) {
+ for (FileStatus dir : dirs) {
+ if (dir.isDir()) {
+ String name = dir.getPath().getName();
+
+ try {
+ long dirTime = BackupUtils.LOG_COPIER_DATE_FORMAT.parse(name).getTime();
+
+ if (dirTime <= deleteTimeMark) {
+ FileStatus[] logs = fs.listStatus(dir.getPath());
+
+ if (logs == null || logs.length == 0) {
+ LOG.info("Delete dir: " + dir.getPath());
+ fs.delete(dir.getPath(), true);
+ }
+ }
+ } catch (ParseException pe) {
+ LOG.warn("Invalid LogCopier directory: " + dir.getPath());
+ }
+ } else {
+ LOG.warn("Invalid LogCopier file");
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Delete out-dated HLogs from archive. It goes through each HLog file in logFiles, and looks for it in
+ * copiedLogFiles. If it finds it, it deletes it from the file system
+ * @param fs The files system
* @param logFiles The new log files we have determined we need to copy to archive
* @param copiedLogFiles The HLogs files in our archive
*/
- private void deleteOldLogsFromArchive(FileSystem fs, List<FileStatus> logFiles, List<FileStatus> copiedLogFiles) {
+ private void prepareLogsToBeReplaced(FileSystem fs, List<FileStatus> logFiles,
+ List<FileStatus> copiedLogFiles) {
for (FileStatus logFile : logFiles) {
String logFileName = logFile.getPath().getName();
@@ -300,7 +466,7 @@ private void deleteOldLogsFromArchive(FileSystem fs, List<FileStatus> logFiles,
// There is a new version of the file
// delete old version from file system so that it can be replaced.
try {
- LOG.info("Deleting out-dated archived HLog file " + copiedLogFile.getPath());
+ LOG.info("Replacing out-dated archived HLog file " + copiedLogFile.getPath());
fs.delete(copiedLogFile.getPath(), false);
copiedLogFiles.remove(i--);
} catch (IOException e) {
@@ -459,6 +625,63 @@ private void putToSleep() throws InterruptedException {
Thread.sleep(timeToSleep);
}
}
+
+ /**
+ * Get the archive directory
+ * @return the archive directory string
+ */
+ public String getArchiveDirectory() {
+ return this.destinationPath.toString();
+ }
+
+ /**
+ * Get the frequency (in minutes) to copy logs
+ * @return the copy time frequency
+ */
+ public long getCopyFrequency() {
+ return this.sleepMinutes;
+ }
+
+ /**
+ * Set the number of days that a log lives in the archive
+ * @param dtl days to live
+ * @throws IllegalArgumentException thrown if dtl is less than 0
+ */
+ public void setLogDaysToLive(long dtl) throws IllegalArgumentException {
+ if (dtl < 0) {
+ throw new IllegalArgumentException("dtl must be non-negative");
+ }
+
+ this.daysToLive = dtl;
+ }
+
+ /**
+ * Get the number of days that a log lives in the archive
+ * @return the number of days that a log lives in the archive
+ */
+ public long getLogDaysToLive() {
+ return this.daysToLive;
+ }
+
+ /**
+ * Set the number of threads that perform the copy work
+ * @param numThreads The number of threads
+ */
+ private void setNumberThreads(long numThreads) {
+ if (numThreads <= 0) {
+ throw new IllegalArgumentException("Number of threads must be greater than zero");
+ }
+
+ this.numberThreads = numThreads;
+ }
+
+ /**
+ * Get the number of copy threads
+ * @return the number of copy threads
+ */
+ private long getNumberThreads() {
+ return this.numberThreads;
+ }
/**
* Returns the command-line options supported.
@@ -468,15 +691,23 @@ private static Options getOptions() {
Options options = new Options();
Option archiveDirectory = new Option("d", "archiveDir", true,
- "The root HLog archive directory. Example /foo/bar/archivedLogs");
+ "The root HLog archive directory. Example /path/to/log/archive");
Option minutes = new Option("m", "minutes", true,
"The frequency (in minutes) to copy new HLogs from HBase. Default: 10");
+ Option ttl = new Option("l", "daysToLive", true,
+ "Archived logs older than this many days will be deleted. Default is to not delete archived logs.");
+ Option threads = new Option("t", "threads", true,
+ "The number of copy threads");
+ ttl.setRequired(false);
archiveDirectory.setRequired(true);
minutes.setRequired(false);
+ threads.setRequired(false);
+ options.addOption(ttl);
options.addOption(archiveDirectory);
options.addOption(minutes);
+ options.addOption(threads);
return options;
}
@@ -489,4 +720,141 @@ private static void printOptions() {
String header = "Tool to continuously archive HLogs";
formatter.printHelp("LogCopier", header, getOptions(), "", true);
}
+
+ /**
+ * Performs the copy step of LogCopier
+ */
+ public class Copier implements Runnable {
+
+ /** The file system */
+ private FileSystem fs;
+
+ /** The log files to copy */
+ private List<FileStatus> logFiles;
+
+ /** The directory to save the files to */
+ private Path archivePath;
+
+ /**
+ * Construct a new Copier
+ * @param fs The file system
+ * @param files The files to copy
+ * @param path The archive path
+ */
+ public Copier(FileSystem fs, List<FileStatus> files, Path path) {
+ this.fs = fs;
+ this.logFiles = files;
+ this.archivePath = path;
+ }
+
+ /**
+ * Run copy step
+ */
+ @Override
+ public void run() {
+ copy();
+ }
+
+ /**
+ * Copy all files
+ */
+ private void copy() {
+ String hbaseDir = fs.getConf().get(HConstants.HBASE_DIR);
+ Path logsPath = new Path(hbaseDir, ".logs");
+ Path oldLogsPath = new Path(hbaseDir, ".oldlogs");
+
+ for (FileStatus newLogFile : logFiles) {
+ boolean copyFromOldLogs = false;
+ Path newLogFilePath = newLogFile.getPath();
+ Path logArchivePath = new Path(archivePath, newLogFilePath.getName());
+ LOG.info("HLog: " + newLogFilePath);
+
+ if (newLogFilePath.toString().startsWith(logsPath.toString())) {
+ try {
+ FileUtil.copy(fs, newLogFilePath, fs, logArchivePath, false, true, fs.getConf());
+ lastRunSuccess++;
+ } catch (IOException e) {
+ LOG.warn("Failed: " + newLogFilePath, e);
+
+ copyFromOldLogs = true;
+ newLogFilePath = new Path(oldLogsPath, newLogFilePath.getName());
+
+ LOG.info("Trying: " + newLogFilePath);
+ }
+ } else {
+ copyFromOldLogs = true;
+ }
+
+ if (copyFromOldLogs) {
+ try {
+ // Use our copy function this time. It checks the trash for deleted files before giving up
+ BackupUtils.copy(fs, newLogFilePath, fs, logArchivePath, buffer, username,
+ fs.getDefaultReplication());
+ lastRunSuccess++;
+ } catch (Exception e) {
+ LOG.error(" Failed to copy " + newLogFilePath, e);
+ lastRunFailed++;
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Class to delete files
+ */
+ class Deleter implements Runnable {
+
+ /** the paths to delete */
+ List<Path> paths;
+
+ /** the file system */
+ FileSystem fs;
+
+ /** the start index in the paths array list */
+ int startIdx;
+
+ /** number of paths to delete */
+ int count;
+
+ /**
+ * Construct a new Deleter
+ * @param fs the file system
+ * @param paths the paths to delete
+ * @param startIdx the start index in paths
+ * @param count the number of paths to delete
+ */
+ public Deleter(FileSystem fs, List<Path> paths, int startIdx, int count) {
+ this.fs = fs;
+ this.paths = paths;
+ this.startIdx = startIdx;
+ this.count = count;
+ }
+
+ /**
+ * Run thread
+ */
+ @Override
+ public void run() {
+ delete();
+ }
+
+ /**
+ * Delete given files
+ */
+ private void delete() {
+ for (int i = startIdx; i < startIdx + count; i++) {
+ Path path = paths.get(i);
+
+ try {
+ LOG.info("Delete: " + path);
+ fs.delete(path, false);
+ } catch (IOException e) {
+ LOG.error("Failed to delete old HLog");
+ }
+ }
+ }
+
+ }
}

0 comments on commit 786c401

Please sign in to comment.
Something went wrong with that request. Please try again.