Skip to content

Commit

Permalink
[#7149] Platform: Trigger trying to take a platform backup when a task has completed
Browse files Browse the repository at this point in the history

Summary:
Every time a task completes, we should optimistically take a platform backup to reduce the window during which the latest backup is out of date.

This diff also fixes:
1. A small bug with disabling the sync schedule when deleting an HA config.
2. A few instances of file descriptors being left open, the number of which would grow unboundedly until we hit the max open files limit.

Test Plan:
Run a few tasks and see that a one-off sync is triggered.
Run tasks with no HA config and ensure there is no regression to legacy behavior.

For the additional fixes in this diff:
1. Delete an HA config and ensure that the config value for "sync schedule enabled" is false.
2. Set sync to 1m and leave running for an hour or two. Run `ls -l /proc/<PID>/fd` and ensure there aren't dangling file descriptors left around.

Reviewers: sanketh, sb-yb

Reviewed By: sb-yb

Subscribers: jenkins-bot, yugaware

Differential Revision: https://phabricator.dev.yugabyte.com/D10703
  • Loading branch information
daniel-yb committed Feb 25, 2021
1 parent 9a6b65e commit 900a662
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 46 deletions.
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.UUID;

import com.yugabyte.yw.common.ha.PlatformReplicationManager;
import com.yugabyte.yw.models.helpers.TaskType;
import com.yugabyte.yw.models.ScheduleTask;
import com.yugabyte.yw.models.CustomerTask;
Expand All @@ -17,6 +18,7 @@

import com.yugabyte.yw.forms.ITaskParams;
import com.yugabyte.yw.models.TaskInfo;
import play.api.Play;

/**
* This class is responsible for creating and running a task. It provides all the common
Expand All @@ -36,6 +38,9 @@ public class TaskRunner implements Runnable {
// The task object that will run the current task.
private ITask task;

// A utility for Platform HA.
private final PlatformReplicationManager replicationManager;

static {
// Initialize the map which holds the task types to their task class.
Map<TaskType, Class<? extends ITask>> typeMap = new HashMap<TaskType, Class<? extends ITask>>();
Expand Down Expand Up @@ -99,6 +104,7 @@ private TaskRunner(TaskType taskType, ITaskParams taskParams)
LOG.error("Could not determine the hostname", e);
}
taskInfo.setOwner(hostname);
replicationManager = Play.current().injector().instanceOf(PlatformReplicationManager.class);
}

public UUID getTaskUUID() {
Expand Down Expand Up @@ -164,6 +170,9 @@ public void run() {
if (scheduleTask != null) {
scheduleTask.setCompletedTime();
}

// Run a one-off Platform HA sync every time a task finishes.
replicationManager.oneOffSync();
}
}

Expand Down
Expand Up @@ -88,18 +88,23 @@ public ShellResponse run(

Process process = pb.start();
waitForProcessExit(process, tempOutputFile, tempErrorFile);
BufferedReader outputStream = new BufferedReader(
new InputStreamReader(new FileInputStream(tempOutputFile)));
BufferedReader errorStream = new BufferedReader(
new InputStreamReader(new FileInputStream(tempErrorFile)));
String processOutput = outputStream.lines().collect(Collectors.joining("\n")).trim();
String processError = errorStream.lines().collect(Collectors.joining("\n")).trim();
if (logCmdOutput) {
try (
FileInputStream outputInputStream = new FileInputStream(tempOutputFile);
InputStreamReader outputReader = new InputStreamReader(outputInputStream);
BufferedReader outputStream = new BufferedReader(outputReader);
FileInputStream errorInputStream = new FileInputStream(tempErrorFile);
InputStreamReader errorReader = new InputStreamReader(errorInputStream);
BufferedReader errorStream = new BufferedReader(errorReader)
) {
String processOutput = outputStream.lines().collect(Collectors.joining("\n")).trim();
String processError = errorStream.lines().collect(Collectors.joining("\n")).trim();
if (logCmdOutput) {
LOG.debug("Proc stdout for '{}' | {}", response.description, processOutput);
LOG.debug("Proc stderr for '{}' | {}", response.description, processError);
}
response.code = process.exitValue();
response.message = (response.code == 0) ? processOutput : processError;
}
response.code = process.exitValue();
response.message = (response.code == 0) ? processOutput : processError;
} catch (IOException | InterruptedException e) {
response.code = -1;
LOG.error("Exception running command", e);
Expand Down Expand Up @@ -135,24 +140,26 @@ public ShellResponse run(
}

private static void waitForProcessExit(
Process process,
File outFile,
File errFile) throws IOException, InterruptedException {

BufferedReader outputStream = new BufferedReader(
new InputStreamReader(new FileInputStream(outFile)));
BufferedReader errorStream = new BufferedReader(
new InputStreamReader(new FileInputStream(errFile)));

Process process,
File outFile,
File errFile
) throws IOException, InterruptedException {
try (
FileInputStream outputInputStream = new FileInputStream(outFile);
InputStreamReader outputReader = new InputStreamReader(outputInputStream);
FileInputStream errInputStream = new FileInputStream(errFile);
InputStreamReader errReader = new InputStreamReader(errInputStream);
BufferedReader outputStream = new BufferedReader(outputReader);
BufferedReader errorStream = new BufferedReader(errReader)
) {
while (!process.waitFor(1, TimeUnit.SECONDS)) {
tailStream(outputStream);
tailStream(errorStream);
tailStream(outputStream);
tailStream(errorStream);
}
// check for any remaining lines
tailStream(outputStream);
tailStream(errorStream);
outputStream.close();
errorStream.close();
}
}

private static void tailStream(
Expand Down
13 changes: 8 additions & 5 deletions managed/src/main/java/com/yugabyte/yw/common/Util.java
Expand Up @@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
Expand Down Expand Up @@ -310,11 +311,13 @@ public static String getFileChecksum(String file) throws IOException, NoSuchAlgo
}

public static List<File> listFiles(Path backupDir, String pattern) throws IOException {
return StreamSupport.stream(
Files.newDirectoryStream(backupDir, pattern).spliterator(), false)
.map(Path::toFile)
.sorted(File::compareTo)
.collect(Collectors.toList());
try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(backupDir, pattern)) {
return StreamSupport.stream(
directoryStream.spliterator(), false)
.map(Path::toFile)
.sorted(File::compareTo)
.collect(Collectors.toList());
}
}

public static void moveFile(Path source, Path destination) throws IOException {
Expand Down
Expand Up @@ -294,31 +294,41 @@ private void syncToRemoteInstance(PlatformInstance remoteInstance) {
replicationHelper.exportPlatformInstances(config, remoteAddr);
}

private void sync() {
/**
 * Runs a single, on-demand sync. The sync is only performed when the replication helper
 * reports that the backup schedule is enabled; otherwise this method does nothing.
 */
public void oneOffSync() {
  // Nothing to do unless the backup schedule is enabled.
  if (!replicationHelper.isBackupScheduleEnabled()) {
    return;
  }

  sync();
}

private synchronized void sync() {
HighAvailabilityConfig.list().forEach(config -> {
List<PlatformInstance> remoteInstances = config.getRemoteInstances();
// No point in taking a backup if there is no one to send it to.
if (remoteInstances.isEmpty()) {
LOG.debug("Skipping HA cluster sync...");
try {
List<PlatformInstance> remoteInstances = config.getRemoteInstances();
// No point in taking a backup if there is no one to send it to.
if (remoteInstances.isEmpty()) {
LOG.debug("Skipping HA cluster sync...");

return;
}
return;
}

// Create the platform backup.
if (!this.createBackup()) {
LOG.error("Error creating platform backup");
// Create the platform backup.
if (!this.createBackup()) {
LOG.error("Error creating platform backup");

return;
}
return;
}

// Update local last backup time if creating the backup succeeded.
config.getLocal().updateLastBackup();
// Update local last backup time if creating the backup succeeded.
config.getLocal().updateLastBackup();

// Sync data to remote address.
remoteInstances.forEach(this::syncToRemoteInstance);
// Sync data to remote address.
remoteInstances.forEach(this::syncToRemoteInstance);

// Remove locally created backups since they have already been sent to followers.
cleanupCreatedBackups();
// Remove locally created backups since they have already been sent to followers.
cleanupCreatedBackups();
} catch (Exception e) {
LOG.error("Error running sync for HA config {}", config.getUUID(), e);
}
});
}

Expand Down
Expand Up @@ -108,7 +108,7 @@ public Result deleteHAConfig(UUID configUUID) {
}

// Stop the backup schedule.
replicationManager.stop();
replicationManager.stopAndDisable();
HighAvailabilityConfig.delete(configUUID);

return ok();
Expand Down

0 comments on commit 900a662

Please sign in to comment.