Skip to content

Commit

Permalink
ProcessManager: redirect process output to a temp file, rename at the…
Browse files Browse the repository at this point in the history
… end
  • Loading branch information
marta-lokhova committed Aug 14, 2019
1 parent 105954c commit c056dbf
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 15 deletions.
5 changes: 4 additions & 1 deletion src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ ApplicationImpl::initialize(bool createNewDB)
mHistoryManager = HistoryManager::create(*this);
mInvariantManager = createInvariantManager();
mMaintainer = std::make_unique<Maintainer>(*this);
mProcessManager = ProcessManager::create(*this);
mCommandHandler = std::make_unique<CommandHandler>(*this);
mWorkScheduler = WorkScheduler::create(*this);
mBanManager = BanManager::create(*this);
Expand All @@ -153,6 +152,10 @@ ApplicationImpl::initialize(bool createNewDB)
upgradeDB();
}

// Subtle: process manager should come to existence _after_ BucketManager
// initialization and newDB run, as it relies on tmp dir created in the
// constructor
mProcessManager = ProcessManager::create(*this);
LOG(DEBUG) << "Application constructed";
}

Expand Down
76 changes: 65 additions & 11 deletions src/process/ProcessManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
#include "process/PosixSpawnFileActions.h"
#include "process/ProcessManager.h"
#include "process/ProcessManagerImpl.h"
#include "util/Fs.h"
#include "util/Logging.h"
#include "util/Timer.h"
#include "util/format.h"

#include <algorithm>
#include <functional>
Expand Down Expand Up @@ -55,8 +57,9 @@ class ProcessExitEvent::Impl
public:
std::shared_ptr<RealTimer> mOuterTimer;
std::shared_ptr<asio::error_code> mOuterEc;
std::string mCmdLine;
std::string mOutFile;
std::string const mCmdLine;
std::string const mOutFile;
std::string const mTempFile;
bool mRunning{false};
#ifdef _WIN32
asio::windows::object_handle mProcessHandle;
Expand All @@ -67,17 +70,42 @@ class ProcessExitEvent::Impl
Impl(std::shared_ptr<RealTimer> const& outerTimer,
std::shared_ptr<asio::error_code> const& outerEc,
std::string const& cmdLine, std::string const& outFile,
std::weak_ptr<ProcessManagerImpl> pm)
std::string const& tempFile, std::weak_ptr<ProcessManagerImpl> pm)
: mOuterTimer(outerTimer)
, mOuterEc(outerEc)
, mCmdLine(cmdLine)
, mOutFile(outFile)
, mTempFile(tempFile)
#ifdef _WIN32
, mProcessHandle(outerTimer->get_executor())
#endif
, mProcManagerImpl(pm)
{
}

bool
finish()
{
if (!mOutFile.empty())
{
if (fs::exists(mOutFile))
{
CLOG(WARNING, "Process")
<< "Outfile already exists: " << mOutFile;
return false;
}
else if (mRunning &&
std::rename(mTempFile.c_str(), mOutFile.c_str()))
{
CLOG(ERROR, "Process")
<< fmt::format("{} -> {} rename failed, error: {}",
mTempFile, mOutFile, errno);
return false;
}
}
return true;
}

void run();
void
cancel(asio::error_code const& ec)
Expand Down Expand Up @@ -324,7 +352,7 @@ ProcessExitEvent::Impl::run()

si.dwFlags = STARTF_USESTDHANDLES;
si.hStdOutput =
CreateFile((LPCTSTR)mOutFile.c_str(), // name of the file
CreateFile((LPCTSTR)mTempFile.c_str(), // name of the file
GENERIC_WRITE, // open for writing
FILE_SHARE_WRITE | FILE_SHARE_READ, // share r/w access
&sa, // security attributes
Expand Down Expand Up @@ -399,17 +427,24 @@ ProcessExitEvent::Impl::run()
}
ec = asio::error_code(exitCode, asio::system_category());
}
manager->handleProcessTermination(sf->mProcessId, ec.value());
ec = manager->handleProcessTermination(sf->mProcessId, ec.value());
sf->cancel(ec);
});
mRunning = true;
}

void
asio::error_code
ProcessManagerImpl::handleProcessTermination(int pid, int /*status*/)
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
auto ec = asio::error_code();
auto process = mProcesses.find(pid);
if (process != mProcesses.end() && !process->second->mImpl->finish())
{
ec = asio::error_code(asio::error::try_again, asio::system_category());
}
mProcesses.erase(pid);
return ec;
}

bool
Expand Down Expand Up @@ -450,6 +485,8 @@ ProcessManagerImpl::ProcessManagerImpl(Application& app)
: mMaxProcesses(app.getConfig().MAX_CONCURRENT_SUBPROCESSES)
, mIOContext(app.getClock().getIOContext())
, mSigChild(mIOContext, SIGCHLD)
, mTmpDir(
std::make_unique<TmpDir>(app.getTmpDirManager().tmpDir("process")))
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
startSignalWait();
Expand Down Expand Up @@ -500,19 +537,20 @@ ProcessManagerImpl::handleSignalWait()
startSignalWait();
}

void
asio::error_code
ProcessManagerImpl::handleProcessTermination(int pid, int status)
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
auto ec = asio::error_code();

auto pair = mProcesses.find(pid);
if (pair == mProcesses.end())
{
CLOG(DEBUG, "Process") << "failed to find process with pid " << pid;
return;
return ec;
}
auto impl = pair->second->mImpl;

asio::error_code ec;
if (WIFEXITED(status))
{
if (WEXITSTATUS(status) == 0)
Expand Down Expand Up @@ -564,6 +602,11 @@ ProcessManagerImpl::handleProcessTermination(int pid, int status)
ec = asio::error_code(1, asio::system_category());
}

if (!impl->finish())
{
ec = asio::error_code(asio::error::try_again, asio::system_category());
}

--gNumProcessesActive;
mProcesses.erase(pair);

Expand All @@ -572,6 +615,7 @@ ProcessManagerImpl::handleProcessTermination(int pid, int status)
maybeRunPendingProcesses();

impl->cancel(ec);
return ec;
}

bool
Expand Down Expand Up @@ -632,7 +676,7 @@ ProcessExitEvent::Impl::run()
PosixSpawnFileActions fileActions;
if (!mOutFile.empty())
{
fileActions.addOpen(1, mOutFile, O_RDWR | O_CREAT, 0600);
fileActions.addOpen(1, mTempFile, O_RDWR | O_CREAT, 0600);
}
// Iterate through all possibly open file descriptors except stdin, stdout,
// and stderr and set FD_CLOEXEC so the subprocess doesn't inherit them
Expand Down Expand Up @@ -681,8 +725,12 @@ ProcessManagerImpl::runProcess(std::string const& cmdLine, std::string outFile)

std::weak_ptr<ProcessManagerImpl> weakSelf(
std::static_pointer_cast<ProcessManagerImpl>(shared_from_this()));

auto tempFile =
mTmpDir->getName() + "/temp-" + std::to_string(mTempFileCount++);

pe->mImpl = std::make_shared<ProcessExitEvent::Impl>(
pe->mTimer, pe->mEc, cmdLine, outFile, weakSelf);
pe->mTimer, pe->mEc, cmdLine, outFile, tempFile, weakSelf);
mPending.push_back(pe);

maybeRunPendingProcesses();
Expand All @@ -705,6 +753,12 @@ ProcessManagerImpl::maybeRunPendingProcesses()
{
CLOG(DEBUG, "Process") << "Running: " << i->mImpl->mCmdLine;

if (fs::exists(i->mImpl->mOutFile))
{
throw std::runtime_error(fmt::format(
"output file {} already exists", i->mImpl->mOutFile));
}

i->mImpl->run();
mProcesses[i->mImpl->getProcessId()] = i;
++gNumProcessesActive;
Expand Down
9 changes: 6 additions & 3 deletions src/process/ProcessManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "process/ProcessManager.h"
#include "util/TmpDir.h"
#include <atomic>
#include <deque>
#include <mutex>
Expand All @@ -27,16 +28,18 @@ class ProcessManagerImpl : public ProcessManager
bool mIsShutdown{false};
size_t mMaxProcesses;
asio::io_context& mIOContext;
// These are only used on POSIX, but they're harmless here.
asio::signal_set mSigChild;
std::unique_ptr<TmpDir> mTmpDir;
uint64_t mTempFileCount{0};

std::deque<std::shared_ptr<ProcessExitEvent>> mPending;
std::deque<std::shared_ptr<ProcessExitEvent>> mKillable;
void maybeRunPendingProcesses();

// These are only used on POSIX, but they're harmless here.
asio::signal_set mSigChild;
void startSignalWait();
void handleSignalWait();
void handleProcessTermination(int pid, int status);
asio::error_code handleProcessTermination(int pid, int status);
bool cleanShutdown(ProcessExitEvent& pe);
bool forceShutdown(ProcessExitEvent& pe);

Expand Down

5 comments on commit c056dbf

@latobarita
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from MonsieurNicolas
at marta-lokhova@c056dbf

@latobarita
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging marta-lokhova/stellar-core/avoid_gzip_race_on_publish = c056dbf into auto

@latobarita
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

marta-lokhova/stellar-core/avoid_gzip_race_on_publish = c056dbf merged ok, testing candidate = 5e26a68

@latobarita
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@latobarita
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 5e26a68

Please sign in to comment.