Skip to content

Commit

Permalink
Update shell_package to both read from stdin and write to stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
aseering committed Apr 9, 2013
1 parent 49f1e35 commit ca48045
Show file tree
Hide file tree
Showing 6 changed files with 508 additions and 196 deletions.
15 changes: 9 additions & 6 deletions shell_package/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ THIRD_PARTY_INCLUDE = $(THIRD_PARTY)/include

# Add in your source files below
BUILD_FILES = build/Vertica.o \
$(addprefix build/, Hostname.o Shell.o IntSequence.o)
$(addprefix build/, Hostname.o Shell.o IntSequence.o ProcessLaunchingPlugin.o popen3.o)

# Define the .so name here (and update the references in ddl/install.sql and ddl/uninstall.sql)
PACKAGE_LIBNAME = lib/Shell.so

CXX=g++
CXXFLAGS=-g -D HAVE_LONG_LONG_INT_64 -c -I ../include -Wall -Wno-unused-value -fPIC -I $(VERTICA_SDK_INCLUDE) -I $(THIRD_PARTY_INCLUDE)
CXX?=g++
CXXFLAGS=-g -D HAVE_LONG_LONG_INT_64 -c -I ../include -Wall -Wno-unused-value -fPIC -I $(VERTICA_SDK_INCLUDE) -I $(THIRD_PARTY_INCLUDE) -DNO_SUDO
LDFLAGS=-shared

# add optimization if not a debug build
Expand Down Expand Up @@ -59,13 +59,16 @@ build/Vertica.o: $(VERTICA_SDK_INCLUDE)/Vertica.cpp

# Targets to install and uninstall the library and functions
install: $(PACKAGE_LIBNAME) ddl/install.sql
$(VSQL) -f ddl/install.sql
$(VSQL) -af ddl/install.sql
uninstall: ddl/uninstall.sql
$(VSQL) -f ddl/uninstall.sql
$(VSQL) -af ddl/uninstall.sql

# run examples
test:
$(VSQL) -f examples/removespace.sql
$(MAKE) install
$(VSQL) -af examples/diagnostics.sql
$(VSQL) -af examples/shell.sql
$(MAKE) uninstall

clean:
rm -rf build
Expand Down
163 changes: 163 additions & 0 deletions shell_package/src/ProcessLaunchingPlugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */

#include "ProcessLaunchingPlugin.h"
#include "Vertica.h"
#include "popen3.h"
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <errno.h>
#include <fcntl.h>

using namespace Vertica;

ProcessLaunchingPlugin::ProcessLaunchingPlugin(std::string cmd, std::vector<std::string> env) : cmd(cmd), env(env) {}

#ifndef NO_SUDO
#define ProcessLaunchingPluginArgv(...) {"/usr/bin/sudo", "-E", "-u", "nobody", "-n", "--", __VA_ARGS__, NULL}
#else
#define ProcessLaunchingPluginArgv(...) {__VA_ARGS__, NULL}
#endif

void ProcessLaunchingPlugin::setupProcess() {
// Convert std::vector<std::string> to char *const envp[]
std::vector<const char *>cStrArray(env.size()+1, NULL);
for(int i = 0; i < env.size(); i++) {
cStrArray[i] = env[i].c_str();
}

// Open child
char *const argv[] = ProcessLaunchingPluginArgv("/bin/sh", "-c", const_cast<char *const>(cmd.c_str()));
child = popen3(argv[0], argv, const_cast<char **>(&cStrArray[0]), O_NONBLOCK);

// Validate the file handle; make sure we can read from this file
if (child.pid < 0) {
vt_report_error(0, "Error running child process (cmd = %s, code = %d): %s", cmd.c_str(), errno, strerror(errno));
}
}

// Pumps data
// - from input to child process's stdin, and
// - from child process's stdout to output.
StreamState ProcessLaunchingPlugin::pump(DataBuffer &input, InputState input_state, DataBuffer &output) {
if (output.size == output.offset && input.size == input.offset && input_state != END_OF_FILE) {
vt_report_error(0, "Can't read nor write, why poll?");
}

bool stdin_can_accept_data, stdout_has_data, stderr_has_data;
if (ppoll3(child, &stdin_can_accept_data, &stdout_has_data, &stderr_has_data, 10)) {
int err = errno;
vt_report_error(0, "Error while doing poll() (%d): %s)", err, strerror(err));
}

// Buffer states
bool input_buffer_empty = input.offset == input.size;
bool output_buffer_full = output.offset == output.size;

// If upstream is done, close our stdin handle
// so that our wrapped process knows to stop.
// Don't do this if we're currently processing data;
// it's unnecessary and the received-data event
// masks the EOF on some platforms with this implementation.
if (input_state == END_OF_FILE && input_buffer_empty && child.stdin >= 0) {
close(child.stdin);
child.stdin = -1;
}

// Check stderr first since we treat it as fatal
if (stderr_has_data) {
// Use stderr for actual errors
char errbuf[2048];
ssize_t bytes_read = read(child.stderr, errbuf, 2047);
if (bytes_read < 0) {
int err = errno;
if (err == EAGAIN || err == EWOULDBLOCK) {
// Nothing to read, ignore
} else {
vt_report_error(0, "Error while reading stderr of external process (%d): %s", err, strerror(err));
}
} else if (bytes_read > 0) {
errbuf[bytes_read] = '\0';
vt_report_error(0, "External process '%s' reported error: %s", cmd.c_str(), errbuf);
} else if (bytes_read == 0) {
// Child process closed it's stderr
close(child.stderr);
child.stderr = -1;
}
}

if (stdin_can_accept_data && !input_buffer_empty) {
ssize_t bytes_written = write(child.stdin, input.buf + input.offset, input.size - input.offset);
if (bytes_written < 0) {
int err = errno;
vt_report_error(0, "Error while writing to stdin of external process (%d): %s", err, strerror(err));
} else if (bytes_written >= 0) {
input.offset += bytes_written;
}
}

if (stdout_has_data && !output_buffer_full) {
ssize_t bytes_read = read(child.stdout, output.buf + output.offset, output.size - output.offset);
if (bytes_read < 0) {
int err = errno;
if (err == EAGAIN || err == EWOULDBLOCK) {
// Nothing to read, ignore
} else {
vt_report_error(0, "Error while reading stdout of external process (%d): %s", err, strerror(err));
}
} else if (bytes_read > 0) {
output.offset += bytes_read;
} else if (bytes_read == 0) {
// Child process closed it's stdout
close(child.stdout);
child.stdout = -1;
}
}

// Buffer states after reading and writing
input_buffer_empty = input.offset == input.size;
output_buffer_full = output.offset == output.size;

// Return value
if (child.stdin == -1 && child.stdout == -1 && child.stderr == -1) {
checkProcessStatus();
return DONE;
} else {
if (input_state != END_OF_FILE && input_buffer_empty && child.stdin >= 0) {
return INPUT_NEEDED;
} else if (output_buffer_full && child.stdout >= 0) {
return OUTPUT_NEEDED;
} else {
return KEEP_GOING;
}
}
}

void ProcessLaunchingPlugin::destroyProcess() {
pclose3(child);
waitpid(child.pid, NULL, 0);
}

void ProcessLaunchingPlugin::checkProcessStatus() {
int status;
pid_t waitpid_ret = waitpid(child.pid, &status, 0);
if (waitpid_ret == -1) {
int err = errno;
vt_report_error(0, "Error retrieving the termination status of child with PID %d: %s (%d)", child.pid, strerror(err), err);
} else if (waitpid_ret == child.pid) {
// Child terminated, check termination status
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) == 0) {
// Success!
} else {
vt_report_error(0, "Process exited with status %d", WEXITSTATUS(status));
}
} else if (WIFSIGNALED(status)) {
vt_report_error(0, "Process killed by signal %d%s", WTERMSIG(status), WCOREDUMP(status) ? " (core dumped)" : "");
} else {
vt_report_error(0, "Process terminated with unexpected status - 0x%x\n", status);
}
} else {
vt_report_error(0, "Internal error: waitpid returned %d (child pid is %d)", (int)waitpid_ret, child.pid);
}
}
29 changes: 29 additions & 0 deletions shell_package/src/ProcessLaunchingPlugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
#ifndef PROCESSLAUNCHINGPLUGIN_H
#define PROCESSLAUNCHINGPLUGIN_H

#include "Vertica.h"
#include "popen3.h"

using namespace Vertica;

//
// Helper class for UD* libraries which launch an
// external process.
//
class ProcessLaunchingPlugin {
public:
ProcessLaunchingPlugin(std::string cmd, std::vector<std::string> env);
void setupProcess();
StreamState pump(DataBuffer &input, InputState input_state, DataBuffer &output);
void destroyProcess();

private:
std::string cmd;
std::vector<std::string> env;
Popen3Proc child;
void checkProcessStatus();

};

#endif
Loading

0 comments on commit ca48045

Please sign in to comment.