Skip to content

Commit

Permalink
[Backport 2.6][#8637][PLAT-1672]: Adding APIs to schedule External us…
Browse files Browse the repository at this point in the history
…er-defined scripts.

Summary:
  - Added APIs to schedule/stop/modify external scripts in Platform
  - Scripts are defined on the universe level and a universe can only have a single external script scheduled.
  - These scripts are stored in RunTimeConfig DB in this phase but will be shifted to a more secured DB in the next phase.
  - These APIs are highly sensitive and can be exploited to affect the Platform, Currently, only ATS will be accessing these APIs.

These APIs send data in Multipart/From data format and this API can be accessed using X-AUTH-TOKEN as well as X-AUTH-YW-API-TOKEN.

Schedule external script:

```
POST /api/v1/customers/:cUUID/universes/:uniUUID/schedule_script

{
	script: FileUpload,
	cornExpression: ${cornExpression}
	scriptParameter: {"param1": "va1l", "param2": "val2"}
        timeLimitMins: ${timeLimitMins}
}

```
example
```
{
	script: FileUpload,
	cronExperssion: 5 * * * *,
	scriptParameter: {"restore_time": "2021-08-08 12:00:00", "platform_version": "2.6.7"}
	timeLimitMins: 4
}
```

Update existing external scheduled script

```
PUT /api/v1/customers/:cUUID/universes/:uniUUID/update_scheduled_script

{
	script: FileUpload,
	cornExpression: ${cornExpression}
	scriptParameter: {"param1": "val1", "param2": "val2"}
        timeLimitMins: ${timeLimitMins}
}
```

stop scheduled external scheduled script

```

PUT /api/v1/customers/:cUUID/universes/:uniUUID/stop_scheduled_script
```

Original commit: D12324 / 717ef62

Test Plan:
Jenkins: rebase: 2.6

  - Added unit tests.
  - Tested on a local setup by running self-generated scripts and an infinitely long-running script

Reviewers: arnav, hkandala, sanketh

Reviewed By: hkandala, sanketh

Subscribers: yugaware, jenkins-bot, hsu

Differential Revision: https://phabricator.dev.yugabyte.com/D12981
  • Loading branch information
vipul-yb committed Sep 21, 2021
1 parent e836362 commit 2220e08
Show file tree
Hide file tree
Showing 15 changed files with 762 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@
import com.yugabyte.yw.models.helpers.DataConverters;
import com.yugabyte.yw.models.helpers.NodeDetails;

import play.api.Play;
import play.libs.Json;

public abstract class AbstractTaskBase implements ITask {

public static final Logger LOG = LoggerFactory.getLogger(AbstractTaskBase.class);

// Number of concurrent tasks to execute at a time.
private static final int TASK_THREADS = 10;
// Number of threads to keep in the pool, even if they are idle
private static final int CORE_POOL_SIZE = 2;

// Maximum number of threads to allow in the pool at a time.
private static final int MAX_POOL_SIZE = 10;

// The maximum time that excess idle threads will wait for new tasks before terminating.
// The unit is specified in the API (and is seconds).
Expand Down Expand Up @@ -94,8 +98,8 @@ public void createThreadpool() {
new ThreadFactoryBuilder().setNameFormat("TaskPool-" + getName() + "-%d").build();
executor =
new ThreadPoolExecutor(
TASK_THREADS,
TASK_THREADS,
CORE_POOL_SIZE,
MAX_POOL_SIZE,
THREAD_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
Expand Down Expand Up @@ -193,4 +197,13 @@ public void sendNotification() {
true,
null);
}
/**
* Creates task with appropriate dependency injection
*
* @param taskClass task class
* @return Task instance with injected dependencies
*/
public static <T> T createTask(Class<T> taskClass) {
return Play.current().injector().instanceOf(taskClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

package com.yugabyte.yw.commissioner;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yugabyte.yw.models.TaskInfo;
import com.yugabyte.yw.models.helpers.TaskType;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -179,6 +185,15 @@ public boolean waitFor() {
// Wait for each future to finish.
String errorString = null;
try {
if (taskInfo.getTaskType() == TaskType.RunExternalScript) {
try {
JsonNode jsonNode = (JsonNode) taskInfo.getTaskDetails();
long timeLimitMins = Long.parseLong(jsonNode.get("timeLimitMins").asText());
future.get(timeLimitMins, TimeUnit.MINUTES);
} catch (TimeoutException e) {
throw new Exception("External Script execution failed as it exceeds timeLimit");
}
}
if (future.get() == null) {
// Task succeeded.
numTasksCompleted.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.yugabyte.yw.commissioner.tasks;

import com.yugabyte.yw.commissioner.AbstractTaskBase;
import com.yugabyte.yw.commissioner.SubTaskGroup;
import com.yugabyte.yw.commissioner.SubTaskGroupQueue;
import com.yugabyte.yw.commissioner.tasks.subtasks.RunExternalScript;

public class ExternalScript extends AbstractTaskBase {

public RunExternalScript.Params params() {
return (RunExternalScript.Params) taskParams;
}

@Override
public void run() {
createThreadpool();
try {
SubTaskGroupQueue subTaskGroupQueue = new SubTaskGroupQueue(userTaskUUID);
RunExternalScript task = createTask(RunExternalScript.class);
task.initialize(params());
SubTaskGroup subTaskGroup = new SubTaskGroup("RunExternalScript", executor);
subTaskGroup.addTask(task);
subTaskGroupQueue.add(subTaskGroup);
subTaskGroupQueue.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.yugabyte.yw.commissioner.tasks.subtasks;

import static com.yugabyte.yw.controllers.ScheduleScriptController.PLT_EXT_SCRIPT_CONTENT;
import static com.yugabyte.yw.controllers.ScheduleScriptController.PLT_EXT_SCRIPT_PARAM;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.yugabyte.yw.commissioner.AbstractTaskBase;
import com.yugabyte.yw.common.ShellProcessHandler;
import com.yugabyte.yw.common.ShellResponse;
import com.yugabyte.yw.common.Util;
import com.yugabyte.yw.common.config.impl.RuntimeConfig;
import com.yugabyte.yw.common.config.impl.SettableRuntimeConfigFactory;
import com.yugabyte.yw.forms.AbstractTaskParams;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.Users;
import java.nio.charset.Charset;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;

public class RunExternalScript extends AbstractTaskBase {
@Inject private ShellProcessHandler shellProcessHandler;

@Inject private SettableRuntimeConfigFactory sConfigFactory;

@Inject private play.Configuration appConfig;

private static final String TEMP_SCRIPT_FILE_NAME = "tempScript_";
private static final String SCRIPT_DIR = "tmp_external_scripts/";
private static final String SCRIPT_STORE_DIR = "/tmp_external_scripts";

public static class Params extends AbstractTaskParams {
public String platformUrl;
public String timeLimitMins;
public UUID customerUUID;
public UUID universeUUID;
public UUID userUUID;
}

public Params params() {
return (Params) taskParams;
}

@Override
public void run() {
File tempScriptFile = null;
try {
Universe universe = Universe.getOrBadRequest(params().universeUUID);
RuntimeConfig<Universe> config = sConfigFactory.forUniverse(universe);

List<String> keys = Arrays.asList(PLT_EXT_SCRIPT_CONTENT, PLT_EXT_SCRIPT_PARAM);
Map<String, String> configKeysMap = null;
try {
// Extracting the set of keys in synchronized way as they are interconnected and During the
// scheduled script update the task should not extract partially updated multi keys.
configKeysMap = Util.getLockedMultiKeyConfig(config, keys);
} catch (Exception e) {
throw new RuntimeException(
"External Script Task failed as the schedule is stopped and this is a old task");
}

// Create a temporary file to store script and make it executable.
String devopsHome = appConfig.getString("yb.devops.home");
File directory = new File(devopsHome + SCRIPT_STORE_DIR);
if (!directory.exists()) {
directory.mkdir();
}
tempScriptFile =
File.createTempFile(
TEMP_SCRIPT_FILE_NAME + params().universeUUID.toString(), ".py", directory);

FileOutputStream file = new FileOutputStream(tempScriptFile.getAbsoluteFile());
OutputStreamWriter output = new OutputStreamWriter(file, Charset.forName("UTF-8"));
output.write(configKeysMap.get(PLT_EXT_SCRIPT_CONTENT));
output.close();
tempScriptFile.setExecutable(true);

// Add the commands to the script.
List<String> commandList = new ArrayList<>();
commandList.add(SCRIPT_DIR + tempScriptFile.getName());
commandList.add("--universe_name");
commandList.add(universe.name);
commandList.add("--universe_uuid");
commandList.add(params().universeUUID.toString());
commandList.add("--platform_url");
commandList.add(params().platformUrl);
commandList.add("--auth_token");
commandList.add(Users.getOrBadRequest(params().userUUID).createAuthToken());

String scriptParam = configKeysMap.get(PLT_EXT_SCRIPT_PARAM);
if (!StringUtils.isEmpty(scriptParam)) {
final ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(scriptParam);
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
String fieldValue = field.getValue().asText();
if (!StringUtils.isEmpty(fieldName) && !StringUtils.isEmpty(fieldValue)) {
commandList.add("--" + fieldName);
commandList.add(fieldValue);
}
}
}

String description = String.join(" ", commandList);

// Execute the command.
ShellResponse shellResponse =
shellProcessHandler.run(commandList, new HashMap<>(), description);
processShellResponse(shellResponse);
} catch (Exception e) {
LOG.error("Error executing task {}, error='{}'", getName(), e.getMessage(), e);
throw new RuntimeException(e);
} finally {
// Delete temporary file if exists.
if (tempScriptFile != null && tempScriptFile.exists()) {
tempScriptFile.delete();
}
}
LOG.info("Finished {} task.", getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public ShellResponse run(
File tempOutputFile = null;
File tempErrorFile = null;
long startMs = 0;
Process process = null;
try {
tempOutputFile = File.createTempFile("shell_process_out", "tmp");
tempErrorFile = File.createTempFile("shell_process_err", "tmp");
Expand All @@ -83,7 +84,8 @@ public ShellResponse run(
tempOutputFile.getAbsolutePath(),
tempErrorFile.getAbsolutePath());

Process process = pb.start();
process = pb.start();

waitForProcessExit(process, tempOutputFile, tempErrorFile);
try (FileInputStream outputInputStream = new FileInputStream(tempOutputFile);
InputStreamReader outputReader = new InputStreamReader(outputInputStream);
Expand All @@ -104,6 +106,10 @@ public ShellResponse run(
response.code = -1;
LOG.error("Exception running command", e);
response.message = e.getMessage();
// Send a kill signal to ensure process is cleaned up in case of any failure.
if (process != null && process.isAlive()) {
process.destroyForcibly();
}
} finally {
if (startMs > 0) {
response.durationMs = System.currentTimeMillis() - startMs;
Expand Down
32 changes: 32 additions & 0 deletions managed/src/main/java/com/yugabyte/yw/common/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.yugabyte.yw.common.config.impl.RuntimeConfig;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.Cluster;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.ClusterType;
Expand Down Expand Up @@ -379,4 +380,35 @@ public static int compareYbVersions(String v1, String v2) {

throw new RuntimeException("Unable to parse YB version strings");
}

// This will help us in insertion of set of keys in locked synchronized way as no
// extraction/deletion action should be performed on RunTimeConfig object during the process.
public static synchronized void setLockedMultiKeyConfig(
RuntimeConfig<Universe> config, Map<String, String> configKeysMap) {
configKeysMap.forEach(
(key, value) -> {
config.setValue(key, value);
});
}

// This will help us in extraction of set of keys in locked synchronized way as no
// insertion/deletion action should be performed on RunTimeConfig object during the process.
public static synchronized Map<String, String> getLockedMultiKeyConfig(
RuntimeConfig<Universe> config, List<String> configKeys) {
Map<String, String> configKeysMap = new HashMap<>();
configKeys.forEach((key) -> configKeysMap.put(key, config.getString(key)));
return configKeysMap;
}

// This will help us in deletion of set of keys in locked synchronized way as no
// insertion/extraction action should be performed on RunTimeConfig object during the process.
public static synchronized void deleteLockedMultiKeyConfig(
RuntimeConfig<Universe> config, List<String> configKeys) {
configKeys.forEach(
(key) -> {
if (config.hasPath(key)) {
config.deleteEntry(key);
}
});
}
}
Loading

0 comments on commit 2220e08

Please sign in to comment.