Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Moved project from code.google.com and ported from autotools to rebar.

  • Loading branch information...
commit 958861ee251a8b57d66e85864dbe3cfbc782e365 1 parent e5ae5aa
@saleyn authored
View
6 AUTHORS
@@ -0,0 +1,6 @@
+Authors:
+ Serge Aleynikov <saleyn@gmail.com>
+
+Contributors:
+ Dmitry Kargapolov <dmitriy.kargapolov@gmail.com>
+ - Autotools support
View
30 LICENSE
@@ -0,0 +1,30 @@
+BSD LICENSE
+===========
+
+Copyright (C) 2003 Serge Aleynikov <saleyn@gmail.com>
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in
+ the documentation and/or other materials provided with the distribution.
+
+ 3. The names of the authors may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
+INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
+INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View
19 Makefile
@@ -0,0 +1,19 @@
+# See LICENSE for licensing information.
+
+PROJECT = erlexec
+
+DIALYZER = dialyzer
+REBAR = rebar
+
+all:
+ @$(REBAR) compile
+
+clean:
+ @$(REBAR) clean
+
+docs: all clean-docs
+ @$(REBAR) doc skip_deps=true
+
+clean-docs:
+ rm -f doc/*.{css,html,png} doc/edoc-info
+
View
12 README
@@ -0,0 +1,12 @@
+erlexec
+=======
+
+Execute and control OS processes from Erlang/OTP.
+
+This project implements a C++ port program and Erlang application that gives light-weight
+Erlang processes fine-grain control over execution of OS processes.
+
+It makes possible for an Erlang process to start, stop an OS process, send POSIX signals, know
+process IDs of the started OS process, set up a monitor and/or link to it. This application
+provides better control over OS processes than built-in erlang:open_port/2 command with a
+{spawn, Command} option, and performs proper OS child process cleanup when the emulator exits.
View
4 README.md
@@ -1,4 +0,0 @@
-erlexec
-=======
-
-Execute and control OS processes from Erlang/OTP
View
354 c_src/ei++.cpp
@@ -0,0 +1,354 @@
+#include <unistd.h>
+#include <fcntl.h>
+#include <sstream>
+#include <iomanip>
+#include "ei++.h"
+
+using namespace ei;
+
+//-----------------------------------------------------------------------------
+bool ei::dump ( const char* header, std::ostream& os, const ei_x_buff& buf, bool condition )
+{
+ if ( !condition ) os << (header ? header : "") << buf;
+ return condition;
+}
+
+//-----------------------------------------------------------------------------
+std::ostream& ei::dump (std::ostream& os, const unsigned char* buf, int n, bool eol)
+{
+ std::stringstream s;
+ for(int i=0; i < n; i++)
+ s << (i == 0 ? "<<" : ",") << (int) (buf[i]);
+ s << (n == 0 ? "<<>>" : ">>");
+ if (eol) s << std::endl;
+ return os << s.str();
+}
+
+//-----------------------------------------------------------------------------
+std::ostream& ei::operator<< (std::ostream& os, const ei_x_buff& buf)
+{
+ return dump(os, (const unsigned char*)buf.buff, buf.index);
+}
+
+//-----------------------------------------------------------------------------
+int ei::stringIndex(const char** cmds, const std::string& cmd, int firstIdx, int size)
+{
+ for (int i=firstIdx; cmds != NULL && i < size; i++, cmds++)
+ if (cmd == *cmds)
+ return i;
+ return firstIdx-1;
+}
+
+//-----------------------------------------------------------------------------
+std::ostream& ei::Serializer::dump (std::ostream& os, bool outWriteBuffer)
+{
+ if (outWriteBuffer) {
+ size_t len = m_wbuf.read_header();
+ if (!len) len = m_wIdx;
+ os << "--Erl-<-C--[" << std::setw(len < 10000 ? 4 : 9) << len << "]: ";
+ return ::dump(os, (const unsigned char*)&m_wbuf, len, false) << "\r\n";
+ } else {
+ size_t len = m_rbuf.read_header();
+ os << "--Erl->-C--[" << std::setw(len < 10000 ? 4 : 9) << len << "]: ";
+ return ::dump(os, (const unsigned char*)&m_rbuf, len, false) << "\r\n";
+ }
+}
+
+//-----------------------------------------------------------------------------
+int ei::Serializer::print (std::ostream& os, const std::string& header)
+{
+ char* s = NULL;
+ int idx = 0;
+ if (ei_s_print_term(&s, &m_rbuf, &idx) < 0)
+ return -1;
+ if (!header.empty())
+ os << header << s << std::endl;
+ else
+ os << s << std::endl;
+
+ if (s)
+ free(s);
+
+ return 0;
+}
+
+//-----------------------------------------------------------------------------
+TimeVal ei::operator- (const TimeVal& t1, const TimeVal& t2) {
+ TimeVal t = t1; t -= t2;
+ return t;
+}
+
+//-----------------------------------------------------------------------------
+TimeVal ei::operator+ (const TimeVal& t1, const TimeVal& t2) {
+ TimeVal t = t1; t += t2;
+ return t;
+}
+
+//-----------------------------------------------------------------------------
+TimeVal::TimeVal(TimeType tp, int _s, int _us)
+{
+ switch (tp) {
+ case NOW:
+ gettimeofday(&m_tv, NULL);
+ break;
+ case RELATIVE:
+ new (this) TimeVal();
+ }
+ if (_s != 0 || _us != 0) add(_s, _us);
+}
+
+//-----------------------------------------------------------------------------
+int Serializer::set_handles(int in, int out, bool non_blocking)
+{
+ m_fin = in;
+ m_fout = out;
+ if (non_blocking) {
+ return fcntl(m_fin, F_SETFL, fcntl(m_fin, F_GETFL) | O_NONBLOCK)
+ || fcntl(m_fout, F_SETFL, fcntl(m_fout, F_GETFL) | O_NONBLOCK);
+ } else
+ return 0;
+}
+
+//-----------------------------------------------------------------------------
+int Serializer::read()
+{
+ if (m_readPacketSz == 0) {
+ int size = m_rbuf.headerSize();
+ if (read_exact(m_fin, &m_rbuf.c_str()[-size], size, m_readOffset) < size)
+ return -1;
+
+ m_readPacketSz = m_rbuf.read_header();
+ m_readOffset = 0;
+
+ if (m_debug)
+ std::cerr << "Serializer::read() - message size: " << m_readPacketSz << std::endl;
+
+ if (!m_rbuf.resize(m_readPacketSz))
+ return -2;
+ }
+
+ int total = m_readPacketSz - m_readOffset;
+ if (read_exact(m_fin, &m_rbuf, m_readPacketSz, m_readOffset) < total)
+ return -3;
+
+ m_rIdx = 0;
+
+ if (m_debug)
+ dump(std::cerr, false);
+
+ int len = m_readPacketSz;
+ m_readOffset = m_readPacketSz = 0;
+
+ /* Ensure that we are receiving the binary term by reading and
+ * stripping the version byte */
+ int version;
+ if (ei_decode_version(&m_rbuf, &m_rIdx, &version))
+ return -4;
+
+ return len;
+}
+
+//-----------------------------------------------------------------------------
+int Serializer::write()
+{
+ if (m_writePacketSz == 0) {
+ m_wbuf.write_header(static_cast<size_t>(m_wIdx));
+ if (m_debug)
+ dump(std::cerr, true);
+
+ m_writePacketSz = m_wIdx+m_wbuf.headerSize();
+ m_writeOffset = 0;
+ }
+
+ int total = m_writePacketSz - m_writeOffset;
+ if (write_exact(m_fout, m_wbuf.header(), m_writePacketSz, m_writeOffset) < total)
+ return -1;
+
+ int len = m_writePacketSz;
+ m_writeOffset = m_writePacketSz = 0;
+
+ return len;
+}
+
+//-----------------------------------------------------------------------------
+int Serializer::read_exact(int fd, char *buf, size_t len, size_t& got)
+{
+ int i;
+
+ while (got < len) {
+ int size = len-got;
+ while ((i = ::read(fd, (void*)(buf+got), size)) < size && errno == EINTR)
+ if (i > 0)
+ got += i;
+
+ if (i <= 0)
+ return i;
+ got += i;
+ }
+
+ return len;
+}
+
+//-----------------------------------------------------------------------------
+int Serializer::write_exact(int fd, const char *buf, size_t len, size_t& wrote)
+{
+ int i;
+
+ while (wrote < len) {
+ int size = len-wrote;
+ while ((i = ::write(fd, buf+wrote, size)) < size && errno == EINTR)
+ if (i > 0)
+ wrote += i;
+
+ if (i <= 0)
+ return i;
+ wrote += i;
+ }
+
+ return wrote;
+}
+
+
+#define get8(s) ((s) += 1, ((unsigned char *)(s))[-1] & 0xff)
+#define put8(s,n) do { (s)[0] = (char)((n) & 0xff); (s) += 1; } while (0)
+
+#define put64be(s,n) do { \
+ (s)[0] = ((n) >> 56) & 0xff; \
+ (s)[1] = ((n) >> 48) & 0xff; \
+ (s)[2] = ((n) >> 40) & 0xff; \
+ (s)[3] = ((n) >> 32) & 0xff; \
+ (s)[4] = ((n) >> 24) & 0xff; \
+ (s)[5] = ((n) >> 16) & 0xff; \
+ (s)[6] = ((n) >> 8) & 0xff; \
+ (s)[7] = (n) & 0xff; \
+ (s) += 8; \
+ } while (0)
+
+#define get64be(s) \
+ ((s) += 8, \
+ (((unsigned long long)((unsigned char *)(s))[-8] << 56) | \
+ ((unsigned long long)((unsigned char *)(s))[-7] << 48) | \
+ ((unsigned long long)((unsigned char *)(s))[-6] << 40) | \
+ ((unsigned long long)((unsigned char *)(s))[-5] << 32) | \
+ ((unsigned long long)((unsigned char *)(s))[-4] << 24) | \
+ ((unsigned long long)((unsigned char *)(s))[-3] << 16) | \
+ ((unsigned long long)((unsigned char *)(s))[-2] << 8) | \
+ (unsigned long long)((unsigned char *)(s))[-1]))
+
+int Serializer::ei_decode_double(const char *buf, int *index, double *p)
+{
+ const char *s = buf + *index;
+ const char *s0 = s;
+ double f;
+
+ switch (get8(s)) {
+ case ERL_FLOAT_EXT:
+ if (sscanf(s, "%lf", &f) != 1) return -1;
+ s += 31;
+ break;
+ case NEW_FLOAT_EXT: {
+ // IEEE 754 decoder
+ const unsigned int bits = 64;
+ const unsigned int expbits = 11;
+ const unsigned int significantbits = bits - expbits - 1; // -1 for sign bit
+ unsigned long long i = get64be(s);
+ long long shift;
+ unsigned bias;
+
+ if (!p)
+ break;
+ else if (i == 0)
+ f = 0.0;
+ else {
+ // get the significant
+ f = (i & ((1LL << significantbits)-1)); // mask
+ f /= (1LL << significantbits); // convert back to float
+ f += 1.0f; // add the one back on
+
+ // get the exponent
+ bias = (1 << (expbits-1)) - 1;
+ shift = ((i >> significantbits) & ((1LL << expbits)-1)) - bias;
+ while (shift > 0) { f *= 2.0; shift--; }
+ while (shift < 0) { f /= 2.0; shift++; }
+
+ // signness
+ f *= (i >> (bits-1)) & 1 ? -1.0: 1.0;
+ }
+ break;
+ }
+ default:
+ return -1;
+ }
+
+ if (p) *p = f;
+ *index += s-s0;
+ return 0;
+}
+
+int Serializer::ei_encode_double(char *buf, int *index, double p)
+{
+ char *s = buf + *index;
+ char *s0 = s;
+
+ if (!buf)
+ s = s+9;
+ else { /* use IEEE 754 format */
+ const unsigned int bits = 64;
+ const unsigned int expbits = 11;
+ const unsigned int significantbits = bits - expbits - 1; // -1 for sign bit
+ long long sign, exp, significant;
+ long double norm;
+ int shift;
+
+ put8(s, NEW_FLOAT_EXT);
+ memset(s, 0, 8);
+
+ if (p == 0.0)
+ s += 8;
+ else {
+ // check sign and begin normalization
+ if (p < 0) { sign = 1; norm = -p; }
+ else { sign = 0; norm = p; }
+
+ // get the normalized form of p and track the exponent
+ shift = 0;
+ while(norm >= 2.0) { norm /= 2.0; shift++; }
+ while(norm < 1.0) { norm *= 2.0; shift--; }
+ norm = norm - 1.0;
+
+ // calculate the binary form (non-float) of the significant data
+ significant = (long long) ( norm * ((1LL << significantbits) + 0.5f) );
+
+ // get the biased exponent
+ exp = shift + ((1 << (expbits-1)) - 1); // shift + bias
+
+ // get the final answer
+ exp = (sign << (bits-1)) | (exp << (bits-expbits-1)) | significant;
+ put64be(s, exp);
+ }
+ }
+
+ *index += s-s0;
+ return 0;
+}
+
+int x_fix_buff(ei_x_buff* x, int szneeded)
+{
+ int sz = szneeded + 100;
+ if (sz > x->buffsz) {
+ sz += 100; /* to avoid reallocating each and every time */
+ x->buffsz = sz;
+ x->buff = (char*)realloc(x->buff, sz);
+ }
+ return x->buff != NULL;
+}
+
+int Serializer::ei_x_encode_double(ei_x_buff* x, double dbl)
+{
+ int i = x->index;
+ ei_encode_double(NULL, &i, dbl);
+ if (!x_fix_buff(x, i))
+ return -1;
+ return ei_encode_double(x->buff, &x->index, dbl);
+}
+
View
563 c_src/ei++.h
@@ -0,0 +1,563 @@
+/*
+ ei++.h
+
+ Author: Serge Aleynikov
+ Created: 2003/07/10
+
+ Description:
+ ============
+ C++ wrapper around C ei library distributed with Erlang.
+
+ LICENSE:
+ ========
+ Copyright (C) 2003 Serge Aleynikov <saleyn@gmail.com>
+
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in
+ the documentation and/or other materials provided with the distribution.
+
+ 3. The names of the authors may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
+ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
+ INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
+ INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef _EMARSHAL_H_
+#define _EMARSHAL_H_
+
+#include <ei.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <string>
+#include <algorithm>
+#include <iostream>
+#include <sys/time.h>
+#include <limits.h>
+#include <assert.h>
+
+#define NEW_FLOAT_EXT 'F'
+
+namespace ei {
+ typedef unsigned char byte;
+
+ /// Looks up a <cmd> in the <cmds> array.
+ /// @param <cmds> an array that either ends with a NULL element or has <size> number of elements.
+ /// @param <cmd> string to find.
+ /// @param <firstIdx> the mapping of the first element in the array. If != 0, then the return
+ /// value will be based on this starting index.
+ /// @param <size> optional size of the <cmds> array.
+ /// @return an offset <cmd> in the cmds array starting with <firstIdx> value. On failure
+ /// returns <firstIdx>-1.
+ int stringIndex(const char** cmds, const std::string& cmd, int firstIdx = 0, int size = INT_MAX);
+
+ /// Class for stack-based (and on-demand heap based) memory allocation
+ /// of string buffers. It's very efficient for strings not exceeding <N>
+ /// bytes as it doesn't allocate heap memory.
+ template < int N, class Allocator = std::allocator<char> >
+ class StringBuffer
+ {
+ char m_buff[N];
+ char* m_buffer;
+ size_t m_size;
+ int m_minAlloc;
+ int m_headerSize;
+ size_t m_maxMsgSize;
+ Allocator m_alloc; // allocator to use
+
+ char* base() { return m_buffer + m_headerSize; }
+ const char* base() const { return m_buffer + m_headerSize; }
+ char* write( int pos, const char* fmt, va_list vargs ) {
+ char s[512];
+ vsnprintf(s, sizeof(s), fmt, vargs);
+ return copy( s, pos );
+ }
+
+ public:
+ enum { DEF_QUANTUM = 512 };
+
+ StringBuffer(int _headerSz = 0, int _quantumSz = DEF_QUANTUM)
+ : m_buffer(m_buff), m_size(N), m_minAlloc(_quantumSz)
+ { m_buff[0] = '\0'; packetHeaderSize(_headerSz); }
+
+ StringBuffer( const char (&s)[N] )
+ : m_buffer(m_buff), m_size(N), m_minAlloc(DEF_QUANTUM), m_headerSize(0), m_maxMsgSize(0)
+ { copy(s); }
+
+ StringBuffer( const std::string& s)
+ : m_buffer(m_buff), m_size(N), m_minAlloc(DEF_QUANTUM), m_headerSize(0), m_maxMsgSize(0)
+ { copy(s.c_str()); }
+
+ ~StringBuffer() { reset(); }
+
+ /// Buffer allocation quantum
+ int quantum() const { return m_minAlloc; }
+ void quantum(int n) { m_minAlloc = n; }
+ /// Defines a prefix space in the buffer used for encoding packet size.
+ int packetHeaderSize() { return m_headerSize; }
+ void packetHeaderSize(size_t sz) {
+ assert(sz == 0 || sz == 1 || sz == 2 || sz == 4);
+ m_headerSize = sz;
+ m_maxMsgSize = (1u << (8*m_headerSize)) - 1;
+ }
+ /// Does the buffer have memory allocated on heap?
+ bool allocated() const { return m_buffer != m_buff; }
+ size_t capacity() const { return m_size - m_headerSize; }
+ size_t length() const { return strlen(base()); }
+ void clear() { m_buffer[m_headerSize] = '\0'; }
+ /// Free heap allocated memory and shrink buffer to original statically allocated size.
+ void reset() { if (allocated()) delete [] m_buffer; m_buffer = m_buff; clear(); }
+ /// Pointer to a mutable char string of size <capacity()>.
+ const char* c_str() const { return base(); }
+ char* c_str() { return base(); }
+ char* append( const char* s ) { return copy( s, length() ); }
+ char* append( const std::string& s ) { return copy( s.c_str(), length() ); }
+ char* append( const char* fmt, ... ) {
+ va_list vargs;
+ va_start (vargs, fmt);
+ char* ret = write(length(), fmt, vargs);
+ va_end (vargs);
+ return ret;
+ }
+
+ char* operator[] ( int i ) { assert( i < (m_size-m_headerSize) ); return base()[i]; }
+ char* operator() () { return base(); }
+ char* operator& () { return base(); }
+ const char* operator& () const { return base(); }
+ bool operator== ( const char* rhs ) const { return strncmp(base(), rhs, m_size) == 0; }
+ bool operator== ( const std::string& rhs ) const { return operator== ( rhs.c_str() ); }
+ bool operator!= ( const std::string& rhs ) const { return !operator== ( rhs.c_str() ); }
+ bool operator!= ( const char* rhs ) const { return !operator== ( rhs ); }
+
+ size_t headerSize() const { return m_headerSize; }
+ const char* header() { return m_buffer; }
+
+ size_t read_header() {
+ size_t sz = (byte)m_buffer[m_headerSize-1];
+ for(int i=m_headerSize-2; i >= 0; i--)
+ sz |= (byte)m_buffer[i] << (8*(m_headerSize-i-1));
+ return sz;
+ }
+
+ int write_header(size_t sz) {
+ if (sz > m_maxMsgSize)
+ return -1;
+ byte b[4] = { (sz >> 24) & 0xff, (sz >> 16) & 0xff, (sz >> 8) & 0xff, sz & 0xff };
+ memcpy(m_buffer, b + 4 - m_headerSize, m_headerSize);
+ return 0;
+ }
+
+ char* write( const char* fmt, ... ) {
+ va_list vargs;
+ va_start (vargs, fmt);
+ char* ret = write(0, fmt, vargs);
+ va_end (vargs);
+ return ret;
+ }
+
+ char* copy( const char* s, size_t pos=0 )
+ {
+ if ( resize( strlen(s) + pos + 1, pos != 0 ) == NULL )
+ return NULL;
+ assert( pos < m_size );
+ strcpy( base() + pos, s );
+ return base();
+ }
+ char* copy( const std::string& s, size_t pos=0 )
+ {
+ if ( resize( length(s) + pos + 1, pos != 0 ) == NULL )
+ return NULL;
+ assert( pos < m_size );
+ strcpy( base() + pos, s.c_str() );
+ return base();
+ }
+
+ char* copy( const char* s, size_t pos, size_t len)
+ {
+ assert( pos > 0 && len > 0 && (pos+len) < m_size );
+ if ( resize( len + pos + 1, pos != 0 ) == NULL )
+ return NULL;
+ memcpy( base() + pos, s, len );
+ return base();
+ }
+
+ char* resize( size_t size, bool reallocate = false )
+ {
+ char* old = m_buffer;
+ const size_t old_sz = m_size;
+ const size_t new_sz = size + m_headerSize;
+
+ if ( new_sz <= m_size ) {
+ return m_buffer;
+ } else
+ m_size = std::max((const size_t)m_size + m_headerSize + m_minAlloc, new_sz);
+
+ if ( (m_buffer = m_alloc.allocate(m_size)) == NULL ) {
+ m_buffer = old;
+ m_size = old_sz;
+ return (char*) NULL;
+ }
+ //fprintf(stderr, "Allocated: x1 = %p, x2=%p (m_size=%d)\r\n", m_buffer, m_buff, m_size);
+ if ( reallocate && old != m_buffer )
+ memcpy(m_buffer, old, old_sz);
+ if ( old != m_buff ) {
+ m_alloc.deallocate(old, old_sz);
+ }
+ return base();
+ }
+
+ };
+
+ template<int N> std::ostream& operator<< ( std::ostream& os, StringBuffer<N>& buf ) {
+ return os << buf.c_str();
+ }
+
+ template<int N> StringBuffer<N>& operator<< ( StringBuffer<N>& buf, const std::string& s ) {
+ size_t n = buf.length();
+ buf.resize( n + s.size() + 1 );
+ strcpy( buf.c_str() + n, s.c_str() );
+ return buf;
+ }
+
+ /// A helper class for dealing with 'struct timeval' structure. This class adds ability
+ /// to perform arithmetic with the structure leaving the same footprint.
+ class TimeVal
+ {
+ struct timeval m_tv;
+
+ void normalize() {
+ if (m_tv.tv_usec >= 1000000)
+ do { ++m_tv.tv_sec; m_tv.tv_usec -= 1000000; } while (m_tv.tv_usec >= 1000000);
+ else if (m_tv.tv_usec <= -1000000)
+ do { --m_tv.tv_sec; m_tv.tv_usec += 1000000; } while (m_tv.tv_usec <= -1000000);
+
+ if (m_tv.tv_sec >= 1 && m_tv.tv_usec < 0) { --m_tv.tv_sec; m_tv.tv_usec += 1000000; }
+ else if (m_tv.tv_sec < 0 && m_tv.tv_usec > 0) { ++m_tv.tv_sec; m_tv.tv_usec -= 1000000; }
+ }
+
+ public:
+ enum TimeType { NOW, RELATIVE };
+
+ TimeVal() { m_tv.tv_usec=0; m_tv.tv_usec=0; }
+ TimeVal(int _s, int _us) { m_tv.tv_sec=_s; m_tv.tv_usec=_us; normalize(); }
+ TimeVal(const TimeVal& tv, int _s=0, int _us=0) { set(tv, _s, _us); }
+ TimeVal(const struct timeval& tv) { m_tv.tv_sec=tv.tv_sec; m_tv.tv_usec=tv.tv_usec; normalize(); }
+ TimeVal(TimeType tp, int _s=0, int _us=0);
+
+ struct timeval& timeval() { return m_tv; }
+ uint32_t sec() const { return m_tv.tv_sec; }
+ uint32_t usec() const { return m_tv.tv_usec; }
+ uint64_t microsec() const { return (uint64_t)m_tv.tv_sec*1000000ull + (uint64_t)m_tv.tv_usec; }
+ void sec (uint32_t _sec) { m_tv.tv_sec = _sec; }
+ void usec(uint32_t _usec) { m_tv.tv_usec = _usec; normalize(); }
+ void microsec(uint32_t _m) { m_tv.tv_sec = _m / 1000000ull; m_tv.tv_usec = _m % 1000000ull; }
+
+ void set(const TimeVal& tv, int _s=0, int _us=0) {
+ m_tv.tv_sec = tv.sec() + _s; m_tv.tv_usec = tv.usec() + _us; normalize();
+ }
+
+ double diff(const TimeVal& t) {
+ TimeVal tv(this->timeval());
+ tv -= t;
+ return (double)tv.sec() + (double)tv.usec() / 1000000.0;
+ }
+
+ bool zero() { return sec() == 0 && usec() == 0; }
+ void add(int _sec, int _us) { m_tv.tv_sec += _sec; m_tv.tv_usec += _us; if (_sec || _us) normalize(); }
+ TimeVal& now(int addS=0, int addUS=0) { gettimeofday(&m_tv, NULL); add(addS, addUS); return *this; }
+
+ void operator-= (const TimeVal& tv) {
+ m_tv.tv_sec -= tv.sec(); m_tv.tv_usec -= tv.usec(); normalize();
+ }
+ void operator+= (const TimeVal& tv) {
+ m_tv.tv_sec += tv.sec(); m_tv.tv_usec += tv.usec(); normalize();
+ }
+ void operator+= (int32_t _sec) { m_tv.tv_sec += _sec; }
+ void operator+= (int64_t _microsec) {
+ m_tv.tv_sec += (_microsec / 1000000ll);
+ m_tv.tv_usec += (_microsec % 1000000ll);
+ normalize();
+ }
+ TimeVal& operator= (const TimeVal& t) { m_tv.tv_sec = t.sec(); m_tv.tv_usec = t.usec(); return *this; }
+ struct timeval* operator& () { return &m_tv; }
+ bool operator== (const TimeVal& tv) const { return sec() == tv.sec() && usec() == tv.usec(); }
+ bool operator!= (const TimeVal& tv) const { return !operator== (tv); }
+ bool operator< (const TimeVal& tv) const {
+ return sec() < tv.sec() || (sec() == tv.sec() && usec() < tv.usec());
+ }
+ bool operator<= (const TimeVal& tv) const {
+ return sec() <= tv.sec() && usec() <= tv.usec();
+ }
+ };
+
+ TimeVal operator- (const TimeVal& t1, const TimeVal& t2);
+ TimeVal operator+ (const TimeVal& t1, const TimeVal& t2);
+
+ struct atom_t: public std::string {
+ typedef std::string BaseT;
+ atom_t() : BaseT() {}
+ atom_t(const char* s) : BaseT(s) {}
+ atom_t(const atom_t& a) : BaseT(reinterpret_cast<const BaseT&>(a)) {}
+ atom_t(const std::string& s): BaseT(s) {}
+ };
+
+ enum ErlTypeT {
+ etSmallInt = ERL_SMALL_INTEGER_EXT // 'a'
+ , etInt = ERL_INTEGER_EXT // 'b'
+ , etFloatOld = ERL_FLOAT_EXT // 'c'
+ , etFloat = NEW_FLOAT_EXT // 'F'
+ , etAtom = ERL_ATOM_EXT // 'd'
+ , etRefOld = ERL_REFERENCE_EXT // 'e'
+ , etRef = ERL_NEW_REFERENCE_EXT // 'r'
+ , etPort = ERL_PORT_EXT // 'f'
+ , etPid = ERL_PID_EXT // 'g'
+ , etTuple = ERL_SMALL_TUPLE_EXT // 'h'
+ , etTupleLarge = ERL_LARGE_TUPLE_EXT // 'i'
+ , etNil = ERL_NIL_EXT // 'j'
+ , etString = ERL_STRING_EXT // 'k'
+ , etList = ERL_LIST_EXT // 'l'
+ , etBinary = ERL_BINARY_EXT // 'm'
+ , etBignum = ERL_SMALL_BIG_EXT // 'n'
+ , etBignumLarge = ERL_LARGE_BIG_EXT // 'o'
+ , etFun = ERL_NEW_FUN_EXT // 'p'
+ , etFunOld = ERL_FUN_EXT // 'u'
+ , etNewCache = ERL_NEW_CACHE // 'N' /* c nodes don't know these two */
+ , etAtomCached = ERL_CACHED_ATOM // 'C'
+ };
+
+ /// Erlang term serializer/deserializer C++ wrapper around C ei library included in
+ /// Erlang distribution.
+ class Serializer
+ {
+ StringBuffer<1024> m_wbuf; // for writing output commands
+ StringBuffer<1024> m_rbuf; // for reading input commands
+ size_t m_readOffset, m_writeOffset;
+ size_t m_readPacketSz, m_writePacketSz;
+ int m_wIdx, m_rIdx, m_rsize;
+ int m_fin, m_fout;
+ bool m_debug;
+
+ void wcheck(int n) {
+ if (m_wbuf.resize(m_wIdx + n + 16, true) == NULL)
+ throw "out of memory";
+ }
+ static int ei_decode_double(const char *buf, int *m_wIdx, double *p);
+ static int ei_encode_double(char *buf, int *m_wIdx, double p);
+ static int ei_x_encode_double(ei_x_buff* x, double d);
+ static int read_exact (int fd, char *buf, size_t len, size_t& offset);
+ static int write_exact(int fd, const char *buf, size_t len, size_t& offset);
+ public:
+
+ Serializer(int _headerSz = 2)
+ : m_wbuf(_headerSz), m_rbuf(_headerSz)
+ , m_readOffset(0), m_writeOffset(0)
+ , m_readPacketSz(0), m_writePacketSz(0)
+ , m_wIdx(0), m_rIdx(0), m_rsize(0)
+ , m_fin(0), m_fout(1), m_debug(false)
+ , tuple(*this)
+ {
+ ei_encode_version(&m_wbuf, &m_wIdx);
+ }
+
+ void reset_rbuf(bool _saveVersion=true) {
+ m_rIdx = _saveVersion ? 1 : 0;
+ m_readPacketSz = m_readOffset = 0;
+ }
+ void reset_wbuf(bool _saveVersion=true) {
+ m_wIdx = _saveVersion ? 1 : 0;
+ m_writePacketSz = m_writeOffset = 0;
+ }
+ void reset(bool _saveVersion=true) {
+ reset_rbuf(_saveVersion);
+ reset_wbuf(_saveVersion);
+ }
+ void debug(bool _enable) { m_debug = _enable; }
+
+ // This is a helper class for encoding tuples using streaming operator.
+ // Example: encode {ok, 123, "test"}
+ //
+ // Serializer ser;
+ // ser.tuple << atom_t("ok") << 123 << "test";
+ //
+ class Tuple {
+ Serializer& m_parent;
+
+ class Temp {
+ Tuple& m_tuple;
+ mutable int m_idx; // offset to the tuple's size in m_parent.m_wbuf
+ mutable int m_size;
+ mutable bool m_last;
+ public:
+ template<typename T>
+ Temp(Tuple& t, const T& v)
+ : m_tuple(t), m_idx(m_tuple.m_parent.m_wIdx+1), m_size(1), m_last(true)
+ {
+ m_tuple.m_parent.encodeTupleSize(1);
+ m_tuple.m_parent.encode(v);
+ }
+
+ Temp(const Temp& o)
+ : m_tuple(o.m_tuple), m_idx(o.m_idx), m_size(o.m_size+1), m_last(o.m_last)
+ {
+ o.m_last = false;
+ }
+
+ ~Temp() {
+ if (m_last) {
+ // This is the end of the tuple being streamed to this class. Update tuple size.
+ if (m_size > 255)
+ throw "Use of operator<< only allowed for tuples with less than 256 items!";
+ else if (m_size > 1) {
+ char* sz = &m_tuple.m_parent.m_wbuf + m_idx;
+ *sz = m_size;
+ }
+ }
+ }
+ template<typename T>
+ Temp operator<< (const T& v) {
+ Temp t(*this);
+ m_tuple.m_parent.encode(v);
+ return t;
+ }
+ };
+
+ public:
+ Tuple(Serializer& s) : m_parent(s) {}
+
+ template<typename T>
+ Temp operator<< (const T& v) {
+ Temp t(*this, v);
+ return t;
+ }
+ };
+
+ /// Helper class for encoding/decoding tuples using streaming operator.
+ Tuple tuple;
+
+ void encode(const char* s) { wcheck(strlen(s)+1); ei_encode_string(&m_wbuf, &m_wIdx, s); }
+ void encode(char v) { wcheck(2); ei_encode_char(&m_wbuf, &m_wIdx, v); }
+ void encode(int i) { wcheck(sizeof(i)); ei_encode_long(&m_wbuf, &m_wIdx, i); }
+ void encode(unsigned int i) { wcheck(8); ei_encode_ulong(&m_wbuf, &m_wIdx, i); }
+ void encode(long long i) { int n=0; ei_encode_longlong (NULL,&n,i); wcheck(n); ei_encode_longlong(&m_wbuf,&m_wIdx,i); }
+ void encode(unsigned long long i) { int n=0; ei_encode_ulonglong(NULL,&n,i); wcheck(n); ei_encode_ulonglong(&m_wbuf,&m_wIdx,i); }
+ void encode(bool b) { wcheck(8); ei_encode_boolean(&m_wbuf, &m_wIdx, b); }
+ void encode(double v) { wcheck(9); ei_encode_double(&m_wbuf, &m_wIdx, v); }
+ void encode(const std::string& s) { wcheck(s.size()+1); ei_encode_string(&m_wbuf, &m_wIdx, s.c_str()); }
+ void encode(const atom_t& a) { wcheck(a.size()+1); ei_encode_atom(&m_wbuf, &m_wIdx, a.c_str()); }
+ void encode(const erlang_pid& p) { int n=0; ei_encode_pid(NULL, &n, &p); wcheck(n); ei_encode_pid(&m_wbuf, &m_wIdx, &p); }
+ void encodeTupleSize(int sz) { wcheck(5); ei_encode_tuple_header(&m_wbuf, &m_wIdx, sz); }
+ void encodeListSize(int sz) { wcheck(5); ei_encode_list_header(&m_wbuf, &m_wIdx, sz); }
+ void encodeListEnd() { wcheck(1); ei_encode_empty_list(&m_wbuf, &m_wIdx); }
+
+ int encodeListBegin() { wcheck(5); int n=m_wIdx; ei_encode_list_header(&m_wbuf, &m_wIdx, 1); return n; }
+ /// This function for encoding the list size after all elements are encoded.
+ /// @param sz is the number of elements in the list.
+ /// @param idx is the index position of the beginning of the list.
+ // E.g.
+ // Serializer se;
+ // int n = 0;
+ // int idx = se.encodeListBegin();
+ // se.encode(1); n++;
+ // se.encode("abc"); n++;
+ // se.encodeListEnd(n, idx);
+ void encodeListEnd(int sz,int idx) { ei_encode_list_header(&m_wbuf, &idx, sz); encodeListEnd(); }
+
+ ErlTypeT decodeType(int& size) { int t; return (ErlTypeT)(ei_get_type(&m_rbuf, &m_rIdx, &t, &size) < 0 ? -1 : t); }
+ int decodeInt(int& v) { long l, ret = decodeInt(l); v = l; return ret; }
+ int decodeInt(long& v) { return (ei_decode_long(&m_rbuf, &m_rIdx, &v) < 0) ? -1 : 0; }
+ int decodeUInt(unsigned int& v) { unsigned long l, ret = decodeUInt(l); v = l; return ret; }
+ int decodeUInt(unsigned long& v) { return (ei_decode_ulong(&m_rbuf, &m_rIdx, &v) < 0) ? -1 : 0; }
+ int decodeTupleSize() { int v; return (ei_decode_tuple_header(&m_rbuf,&m_rIdx,&v) < 0) ? -1 : v; }
+ int decodeListSize() { int v; return (ei_decode_list_header(&m_rbuf,&m_rIdx,&v) < 0) ? -1 : v; }
+ int decodeListEnd() { bool b = *(m_rbuf.c_str()+m_rIdx) == ERL_NIL_EXT; if (b) { m_rIdx++; return 0; } else return -1; }
+ int decodeAtom(std::string& a) { char s[MAXATOMLEN]; if (ei_decode_atom(&m_rbuf,&m_rIdx,s) < 0) return -1; a=s; return 0; }
+ int decodeString(std::string& a) {
+ StringBuffer<256> s;
+ if (decodeString(s) < 0)
+ return -1;
+ a = s.c_str();
+ return 0;
+ }
+ template <int N>
+ int decodeString(StringBuffer<N>& s) {
+ int size;
+ if (decodeType(size) != etString || !s.resize(size) || ei_decode_string(&m_rbuf, &m_rIdx, s.c_str()))
+ return -1;
+ return size;
+ }
+
+ /// Print input buffer to stream to stream.
+ int print(std::ostream& os, const std::string& header = "");
+
+ /// Assumes the command is encoded as an atom. This function takes an
+ /// array of strings and matches the atom to it. The index of the matched
+ /// string in the <cmds> array is returned.
+ template <int M>
+ int decodeAtomIndex(const char* (&cmds)[M], std::string& cmd, int firstIdx = 0) {
+ if (decodeAtom(cmd) < 0)
+ return firstIdx-2;
+ return stringIndex(cmds, cmd, firstIdx, M);
+ }
+
+ /// Same as previous version but <cmds> array must have the last element being NULL
+ int decodeAtomIndex(const char** cmds, std::string& cmd, int firstIdx = 0) {
+ if (decodeAtom(cmd) < 0)
+ return firstIdx-2;
+ return stringIndex(cmds, cmd, firstIdx);
+ }
+
+ int set_handles(int in, int out, bool non_blocking = false);
+ void close_handles() { ::close(m_fin); ::close(m_fout); }
+
+ int read_handle() { return m_fin; }
+ int write_handle() { return m_fout; }
+
+ const char* read_buffer() const { return &m_rbuf; }
+ const char* write_buffer() const { return &m_wbuf; }
+ int* read_index() { return &m_rIdx; }
+ int* write_index() { return &m_wIdx; }
+ int read_idx() const { return m_rIdx; }
+ int write_idx() const { return m_wIdx; }
+
+ /// Read command from <m_fin> into the internal buffer
+ int read();
+ /// Write command from <m_fout> into the internal buffer
+ int write();
+
+ /// Copy the content of write buffer from another serializer
+ int wcopy( const Serializer& ser) { return m_wbuf.copy( ser.write_buffer(), 0, ser.write_idx()) != 0 ? 0 : -1; }
+ /// Copy the content of read buffer from another serializer
+ int rcopy( const Serializer& ser) { return m_rbuf.copy( ser.read_buffer(), 0, ser.read_idx() ) != 0 ? 0 : -1; }
+
+ /// dump read/write buffer's content to stream
+ std::ostream& dump(std::ostream& os, bool outWriteBuffer);
+ };
+
+ /// Dump content of internal buffer to stream.
+ std::ostream& dump(std::ostream& out, const unsigned char* a_buf = NULL, int n = 0, bool eol = true);
+ // Write ei_x_buff to stream
+ std::ostream& operator<< (std::ostream& os, const ei_x_buff& buf);
+ bool dump(const char* header, std::ostream& out, const ei_x_buff& buf, bool condition);
+
+} // namespace
+
+#endif
+
View
947 c_src/exec.cpp
@@ -0,0 +1,947 @@
+/*
+ exec.cpp
+
+ Author: Serge Aleynikov
+ Created: 2003/07/10
+
+ Description:
+ ============
+
+ Erlang port program for spawning and controlling OS tasks.
+ It listens for commands sent from Erlang and executes them until
+ the pipe connecting it to Erlang VM is closed or the program
+ receives SIGINT or SIGTERM. At that point it kills all processes
+ it forked by issuing SIGTERM followed by SIGKILL in 6 seconds.
+
+ Marshalling protocol:
+ Erlang C++
+ | ---- {TransId::integer(), Instruction::tuple()} ---> |
+ | <----------- {TransId::integer(), Reply} ----------- |
+
+ Instruction = {run, Cmd::string(), Options} |
+ {shell, Cmd::string(), Options} |
+ {list} |
+ {stop, OsPid::integer()} |
+ {kill, OsPid::integer(), Signal::integer()}
+ Options = [Option]
+ Option = {cd, Dir::string()} | {env, [string()]} | {kill, Cmd::string()} |
+ {user, User::string()} | {nice, Priority::integer()} |
+ {stdout, Device::string()} | {stderr, Device::string()}
+ Device = null | stderr | stdout | File::string() | {append, File::string()}
+
+ Reply = ok | // For kill/stop commands
+ {ok, OsPid} | // For run/shell command
+ {ok, [OsPid]} | // For list command
+ {error, Reason} |
+ {exit_status, OsPid, Status} // OsPid terminated with Status
+
+ Reason = atom() | string()
+ OsPid = integer()
+ Status = integer()
+*/
+
+#include <stdio.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+#include <signal.h>
+
+#ifdef __linux__
+#include <sys/prctl.h>
+#include <sys/capability.h>
+#endif
+
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/time.h>
+#include <setjmp.h>
+#include <limits.h>
+#include <pwd.h>
+#include <map>
+#include <list>
+#include <deque>
+#include <sstream>
+
+#include <ei.h>
+#include "ei++.h"
+
+#if defined(__CYGWIN__) || defined(__WIN32)
+# define sigtimedwait(a, b, c) 0
+#endif
+
+using namespace ei;
+
+//-------------------------------------------------------------------------
+// Defines
+//-------------------------------------------------------------------------
+
+#define BUF_SIZE 2048
+
+//-------------------------------------------------------------------------
+// Types
+//-------------------------------------------------------------------------
+
+class CmdInfo;
+
+typedef unsigned char byte;
+typedef int exit_status_t;
+typedef pid_t kill_cmd_pid_t;
+typedef std::pair<pid_t, exit_status_t> PidStatusT;
+typedef std::pair<pid_t, CmdInfo> PidInfoT;
+typedef std::map <pid_t, CmdInfo> MapChildrenT;
+typedef std::pair<kill_cmd_pid_t, pid_t> KillPidStatusT;
+typedef std::map <kill_cmd_pid_t, pid_t> MapKillPidT;
+
+class CmdOptions {
+ ei::StringBuffer<256> m_tmp;
+ std::stringstream m_err;
+ std::string m_cmd;
+ std::string m_cd;
+ std::string m_stdout;
+ std::string m_stderr;
+ std::string m_kill_cmd;
+ std::list<std::string> m_env;
+ long m_nice; // niceness level
+ size_t m_size;
+ size_t m_count;
+ int m_user; // run as
+ const char** m_cenv;
+
+public:
+
+ CmdOptions() : m_tmp(0, 256), m_nice(INT_MAX), m_size(0), m_count(0), m_user(INT_MAX), m_cenv(NULL) {}
+ ~CmdOptions() { delete [] m_cenv; m_cenv = NULL; }
+
+ const char* strerror() const { return m_err.str().c_str(); }
+ const char* cmd() const { return m_cmd.c_str(); }
+ const char* cd() const { return m_cd.c_str(); }
+ char* const* env() const { return (char* const*)m_cenv; }
+ const char* kill_cmd() const { return m_kill_cmd.c_str(); }
+ int user() const { return m_user; }
+ int nice() const { return m_nice; }
+
+ int ei_decode(ei::Serializer& ei);
+};
+
+/// Contains run-time info of a child OS process.
+/// When a user provides a custom command to kill a process this
+/// structure will contain its run-time information.
+struct CmdInfo {
+ std::string cmd; // Executed command
+ pid_t cmd_pid; // Pid of the custom kill command
+ std::string kill_cmd; // Kill command to use (if provided - otherwise use SIGTERM)
+ kill_cmd_pid_t kill_cmd_pid; // Pid of the command that <pid> is supposed to kill
+ ei::TimeVal deadline; // Time when the <cmd_pid> is supposed to be killed using SIGTERM.
+ bool sigterm; // <true> if sigterm was issued.
+ bool sigkill; // <true> if sigkill was issued.
+
+ CmdInfo() : cmd_pid(-1), kill_cmd_pid(-1), sigterm(false), sigkill(false) {}
+ CmdInfo(const CmdInfo& ci) {
+ new (this) CmdInfo(ci.cmd.c_str(), ci.kill_cmd.c_str(), ci.cmd_pid);
+ }
+ CmdInfo(const char* _cmd, const char* _kill_cmd, pid_t _cmd_pid) {
+ new (this) CmdInfo();
+ cmd = _cmd;
+ cmd_pid = _cmd_pid;
+ kill_cmd = _kill_cmd;
+ }
+};
+
+//-------------------------------------------------------------------------
+// Global variables
+//-------------------------------------------------------------------------
+
+ei::Serializer eis(/* packet header size */ 2);
+
+sigjmp_buf jbuf;
+int alarm_max_time = 12;
+static bool debug = false;
+static bool oktojump = false;
+static bool signaled = false; // indicates that SIGCHLD was signaled
+static int terminated = 0; // indicates that we got a SIGINT / SIGTERM event
+static bool superuser = false;
+static bool pipe_valid = true;
+
+MapChildrenT children; // Map containing all managed processes started by this port program.
+MapKillPidT transient_pids; // Map of pids of custom kill commands.
+
+#define SIGCHLD_MAX_SIZE 4096
+std::deque< PidStatusT > exited_children; // deque of processed SIGCHLD events
+
+//-------------------------------------------------------------------------
+// Local Functions
+//-------------------------------------------------------------------------
+
+int send_ok(int transId, pid_t pid = -1);
+int send_pid_status_term(const PidStatusT& stat);
+int send_error_str(int transId, bool asAtom, const char* fmt, ...);
+int send_pid_list(int transId, const MapChildrenT& children);
+
+pid_t start_child(const CmdOptions& op);
+int kill_child(pid_t pid, int sig, int transId, bool notify=true);
+int check_children(int& isTerminated, bool notify = true);
+void stop_child(pid_t pid, int transId, const TimeVal& now);
+int stop_child(CmdInfo& ci, int transId, const TimeVal& now, bool notify = true);
+
+int process_child_signal(pid_t pid)
+{
+ if (exited_children.size() < exited_children.max_size()) {
+ int status;
+ pid_t ret;
+ while ((ret = waitpid(pid, &status, WNOHANG)) < 0 && errno == EINTR);
+
+ if (ret < 0 && errno == ECHILD) {
+ int status = ECHILD;
+ if (kill(pid, 0) == 0) // process likely forked and is alive
+ status = 0;
+ if (status != 0)
+ exited_children.push_back(std::make_pair(pid <= 0 ? ret : pid, status));
+ } else if (pid <= 0)
+ exited_children.push_back(std::make_pair(ret, status));
+ else if (ret == pid)
+ exited_children.push_back(std::make_pair(pid, status));
+ else
+ return -1;
+ return 1;
+ } else {
+ // else - defer calling waitpid() for later
+ signaled = true;
+ return 0;
+ }
+}
+
+void gotsignal(int signal)
+{
+ if (signal == SIGTERM || signal == SIGINT || signal == SIGPIPE)
+ terminated = 1;
+ if (signal == SIGPIPE)
+ pipe_valid = false;
+ if (debug)
+ fprintf(stderr, "Got signal: %d\r\n", signal);
+ if (oktojump) siglongjmp(jbuf, 1);
+}
+
+void gotsigchild(int signal, siginfo_t* si, void* context)
+{
+ // If someone used kill() to send SIGCHLD ignore the event
+ if (si->si_code == SI_USER || signal != SIGCHLD)
+ return;
+
+ if (debug)
+ fprintf(stderr, "Process %d exited (sig=%d)\r\n", si->si_pid, signal);
+ process_child_signal(si->si_pid);
+
+ if (oktojump) siglongjmp(jbuf, 1);
+}
+
+void check_pending()
+{
+ static const struct timespec timeout = {0, 0};
+
+ sigset_t set;
+ siginfo_t info;
+ int sig;
+ sigemptyset(&set);
+ if (sigpending(&set) == 0) {
+ while ((sig = sigtimedwait(&set, &info, &timeout)) > 0 || errno == EINTR)
+ switch (sig) {
+ case SIGCHLD: gotsigchild(sig, &info, NULL); break;
+ case SIGPIPE: pipe_valid = false; /* intentionally follow through */
+ case SIGTERM:
+ case SIGINT:
+ case SIGHUP: gotsignal(sig); break;
+ default: break;
+ }
+ }
+}
+
+void usage(char* progname) {
+ fprintf(stderr,
+ "Usage:\n"
+ " %s [-n] [-alarm N] [-debug] [-user User]\n"
+ "Options:\n"
+ " -n - Use marshaling file descriptors 3&4 instead of default 0&1.\n"
+ " -alarm N - Allow up to <N> seconds to live after receiving SIGTERM/SIGINT (default %d)\n"
+ " -debug - Turn on debug mode\n"
+ " -user User - If started by root, run as User\n"
+ "Description:\n"
+ " This is a port program intended to be started by an Erlang\n"
+ " virtual machine. It can start/kill/list OS processes\n"
+ " as requested by the virtual machine.\n",
+ progname, alarm_max_time);
+ exit(1);
+}
+
+//-------------------------------------------------------------------------
+// MAIN
+//-------------------------------------------------------------------------
+
+int main(int argc, char* argv[])
+{
+ fd_set readfds;
+ struct sigaction sact, sterm;
+ int userid = 0;
+
+ // Deque of all pids that exited and have their exit status available.
+ exited_children.resize(SIGCHLD_MAX_SIZE);
+
+ sterm.sa_handler = gotsignal;
+ sigemptyset(&sterm.sa_mask);
+ sigaddset(&sterm.sa_mask, SIGCHLD);
+ sterm.sa_flags = 0;
+ sigaction(SIGINT, &sterm, NULL);
+ sigaction(SIGTERM, &sterm, NULL);
+ sigaction(SIGHUP, &sterm, NULL);
+ sigaction(SIGPIPE, &sterm, NULL);
+
+ sact.sa_handler = NULL;
+ sact.sa_sigaction = gotsigchild;
+ sigemptyset(&sact.sa_mask);
+ sact.sa_flags = SA_SIGINFO | SA_RESTART | SA_NOCLDSTOP | SA_NODEFER;
+ sigaction(SIGCHLD, &sact, NULL);
+
+ if (argc > 1) {
+ int res;
+ for(res = 1; res < argc; res++) {
+ if (strcmp(argv[res], "-h") == 0 || strcmp(argv[res], "--help") == 0) {
+ usage(argv[0]);
+ } else if (strcmp(argv[res], "-debug") == 0) {
+ debug = true;
+ eis.debug(true);
+ } else if (strcmp(argv[res], "-alarm") == 0 && res+1 < argc) {
+ if (argv[res+1][0] != '-')
+ alarm_max_time = atoi(argv[++res]);
+ else
+ usage(argv[0]);
+ } else if (strcmp(argv[res], "-n") == 0) {
+ eis.set_handles(3, 4);
+ } else if (strcmp(argv[res], "-user") == 0 && res+1 < argc && argv[res+1][0] != '-') {
+ char* run_as_user = argv[++res];
+ struct passwd *pw = NULL;
+ if ((pw = getpwnam(run_as_user)) == NULL) {
+ fprintf(stderr, "User %s not found!\n", run_as_user);
+ exit(3);
+ }
+ userid = pw->pw_uid;
+ }
+ }
+ }
+
+ // If we are root, switch to non-root user and set capabilities
+ // to be able to adjust niceness and run commands as other users.
+ if (getuid() == 0) {
+ superuser = true;
+ if (userid == 0) {
+ fprintf(stderr, "When running as root, \"-user User\" option must be provided!");
+ exit(4);
+ }
+
+ #ifdef __linux__
+ if (prctl(PR_SET_KEEPCAPS, 1) < 0) {
+ perror("Failed to call prctl to keep capabilities");
+ exit(5);
+ }
+ if (setresuid(userid, userid, userid) < 0) {
+ perror("Failed to set userid");
+ exit(6);
+ }
+
+ struct passwd* pw;
+ if (debug && (pw = getpwuid(geteuid())) != NULL)
+ fprintf(stderr, "exec: running as: %s (uid=%d)\r\n", pw->pw_name, getuid());
+
+ cap_t cur;
+ if ((cur = cap_from_text("cap_setuid=eip cap_kill=eip cap_sys_nice=eip")) == 0) {
+ perror("Failed to convert cap_setuid & cap_sys_nice from text");
+ exit(7);
+ }
+ if (cap_set_proc(cur) < 0) {
+ perror("Failed to set cap_setuid & cap_sys_nice");
+ exit(8);
+ }
+ cap_free(cur);
+
+ if (debug && (cur = cap_get_proc()) != NULL) {
+ fprintf(stderr, "exec: current capabilities: %s\r\n", cap_to_text(cur, NULL));
+ cap_free(cur);
+ }
+ #elif defined(__CYGWIN__) || defined(__WIN32)
+ fprintf(stderr, "setuid is not supported on Windows!\r\n");
+ exit(9);
+ #else
+ fprintf(stderr, "setuid feature is not implemented for this plaform!\r\n");
+ exit(9);
+ #endif
+ }
+
+ const int maxfd = eis.read_handle()+1;
+
+ while (!terminated) {
+
+ /* Detect "open" for serial pty slave */
+ FD_ZERO (&readfds);
+ FD_SET (eis.read_handle(), &readfds);
+
+ sigsetjmp(jbuf, 1); oktojump = 0;
+
+ while (!terminated && (exited_children.size() > 0 || signaled))
+ check_children(terminated);
+
+ check_pending(); // Check for pending signals arrived while we were in the signal handler
+
+ if (terminated) break;
+
+ oktojump = 1;
+ ei::TimeVal timeout(5, 0);
+ int cnt = select (maxfd, &readfds, (fd_set *)0, (fd_set *) 0, &timeout.timeval());
+ int interrupted = (cnt < 0 && errno == EINTR);
+ oktojump = 0;
+
+ if (interrupted || cnt == 0) {
+ if (check_children(terminated) < 0)
+ break;
+ } else if (cnt < 0) {
+ perror("select");
+ exit(9);
+ } else if ( FD_ISSET (eis.read_handle(), &readfds) ) {
+ /* Read from fin a command sent by Erlang */
+ int err, arity;
+ long transId;
+ std::string command;
+
+ // Note that if we were using non-blocking reads, we'd also need to check
+ // for errno EWOULDBLOCK.
+ if ((err = eis.read()) < 0) {
+ terminated = 90-err;
+ break;
+ }
+
+ /* Our marshalling spec is that we are expecting a tuple {TransId, {Cmd::atom(), Arg1, Arg2, ...}} */
+ if (eis.decodeTupleSize() != 2 ||
+ (eis.decodeInt(transId)) < 0 ||
+ (arity = eis.decodeTupleSize()) < 1)
+ {
+ terminated = 10; break;
+ }
+
+ enum CmdTypeT { EXECUTE, SHELL, STOP, KILL, LIST } cmd;
+ const char* cmds[] = {"run", "shell", "stop", "kill", "list"};
+
+ /* Determine the command */
+ if ((int)(cmd = (CmdTypeT) eis.decodeAtomIndex(cmds, command)) < 0) {
+ if (send_error_str(transId, false, "Unknown command: %s", command.c_str()) < 0) {
+ terminated = 11; break;
+ } else
+ continue;
+ }
+
+ switch (cmd) {
+ case EXECUTE:
+ case SHELL: {
+ // {shell, Cmd::string(), Options::list()}
+ CmdOptions po;
+
+ if (arity != 3 || po.ei_decode(eis) < 0) {
+ send_error_str(transId, false, po.strerror());
+ continue;
+ }
+
+ pid_t pid;
+ if ((pid = start_child(po)) < 0)
+ send_error_str(transId, false, "Couldn't start pid: %s", strerror(errno));
+ else {
+ CmdInfo ci(po.cmd(), po.kill_cmd(), pid);
+ children[pid] = ci;
+ send_ok(transId, pid);
+ }
+ break;
+ }
+ case STOP: {
+ // {stop, OsPid::integer()}
+ long pid;
+ if (arity != 2 || (eis.decodeInt(pid)) < 0) {
+ send_error_str(transId, true, "badarg");
+ continue;
+ }
+ stop_child(pid, transId, TimeVal(TimeVal::NOW));
+ break;
+ }
+ case KILL: {
+ // {kill, OsPid::integer(), Signal::integer()}
+ long pid, sig;
+ if (arity != 3 || (eis.decodeInt(pid)) < 0 || (eis.decodeInt(sig)) < 0) {
+ send_error_str(transId, true, "badarg");
+ continue;
+ } if (superuser && children.find(pid) == children.end()) {
+ send_error_str(transId, false, "Cannot kill a pid not managed by this application");
+ continue;
+ }
+ kill_child(pid, sig, transId);
+ break;
+ }
+ case LIST:
+ // {list}
+ if (arity != 1) {
+ send_error_str(transId, true, "badarg");
+ continue;
+ }
+ send_pid_list(transId, children);
+ break;
+ }
+ }
+ }
+
+ sigsetjmp(jbuf, 1); oktojump = 0;
+ alarm(alarm_max_time); // Die in <alarm_max_time> seconds if not done
+
+ int old_terminated = terminated;
+ terminated = 0;
+
+ kill(0, SIGTERM); // Kill all children in our process group
+
+ TimeVal now(TimeVal::NOW);
+ TimeVal deadline(now, 6, 0);
+
+ while (children.size() > 0) {
+ sigsetjmp(jbuf, 1);
+
+ while (exited_children.size() > 0 || signaled) {
+ int term = 0;
+ check_children(term, pipe_valid);
+ }
+
+ for(MapChildrenT::iterator it=children.begin(), end=children.end(); it != end; ++it)
+ stop_child(it->first, 0, now);
+
+ for(MapKillPidT::iterator it=transient_pids.begin(), end=transient_pids.end(); it != end; ++it) {
+ kill(it->first, SIGKILL);
+ transient_pids.erase(it);
+ }
+
+ if (children.size() == 0)
+ break;
+
+ TimeVal timeout(TimeVal::NOW);
+ if (timeout < deadline) {
+ timeout = deadline - timeout;
+
+ oktojump = 1;
+ select (0, (fd_set *)0, (fd_set *)0, (fd_set *) 0, &timeout);
+ oktojump = 0;
+ }
+ }
+
+ if (debug)
+ fprintf(stderr, "Exiting (%d)\r\n", old_terminated);
+ return old_terminated;
+}
+
+pid_t start_child(const char* cmd, const char* cd, char* const* env, int user, int nice)
+{
+ pid_t pid = fork();
+ ei::StringBuffer<128> err;
+
+ switch (pid) {
+ case -1:
+ return -1;
+ case 0: {
+ #if !defined(__CYGWIN__) && !defined(__WIN32)
+ // I am the child
+ if (user != INT_MAX && setresuid(user, user, user) < 0) {
+ err.write("Cannot set effective user to %d", user);
+ perror(err.c_str());
+ return EXIT_FAILURE;
+ }
+ #endif
+ const char* const argv[] = { getenv("SHELL"), "-c", cmd, (char*)NULL };
+ if (cd != NULL && cd[0] != '\0' && chdir(cd) < 0) {
+ err.write("Cannot chdir to '%s'", cd);
+ perror(err.c_str());
+ return EXIT_FAILURE;
+ }
+ if (execve((const char*)argv[0], (char* const*)argv, env) < 0) {
+ err.write("Cannot execute '%s'", cd);
+ perror(err.c_str());
+ return EXIT_FAILURE;
+ }
+ }
+ default:
+ // I am the parent
+ if (nice != INT_MAX && setpriority(PRIO_PROCESS, pid, nice) < 0) {
+ err.write("Cannot set priority of pid %d to %d", pid, nice);
+ perror(err.c_str());
+ }
+ return pid;
+ }
+}
+
+pid_t start_child(const CmdOptions& op)
+{
+ return start_child(op.cmd(), op.cd(), op.env(), op.user(), op.nice());
+}
+
+int stop_child(CmdInfo& ci, int transId, const TimeVal& now, bool notify)
+{
+ bool use_kill = false;
+
+ if (ci.kill_cmd_pid > 0 || ci.sigterm) {
+ double diff = ci.deadline.diff(now);
+ if (debug)
+ fprintf(stderr, "Deadline: %.3f\r\n", diff);
+ // There was already an attempt to kill it.
+ if (ci.sigterm && ci.deadline.diff(now) < 0) {
+ // More than 5 secs elapsed since the last kill attempt
+ kill(ci.cmd_pid, SIGKILL);
+ kill(ci.kill_cmd_pid, SIGKILL);
+ ci.sigkill = true;
+ }
+ if (notify) send_ok(transId);
+ return 0;
+ } else if (!ci.kill_cmd.empty()) {
+ // This is the first attempt to kill this pid and kill command is provided.
+ ci.kill_cmd_pid = start_child(ci.kill_cmd.c_str(), NULL, NULL, INT_MAX, INT_MAX);
+ if (ci.kill_cmd_pid > 0) {
+ transient_pids[ci.kill_cmd_pid] = ci.cmd_pid;
+ ci.deadline.set(now, 5);
+ if (notify) send_ok(transId);
+ return 0;
+ } else {
+ if (notify) send_error_str(transId, false, "bad kill command - using SIGTERM");
+ use_kill = true;
+ notify = false;
+ }
+ } else {
+ // This is the first attempt to kill this pid and no kill command is provided.
+ use_kill = true;
+ }
+
+ if (use_kill) {
+ // Use SIGTERM / SIGKILL to nuke the pid
+ int n;
+ if (!ci.sigterm && (n = kill_child(ci.cmd_pid, SIGTERM, transId, notify)) == 0) {
+ ci.deadline.set(now, 5);
+ } else if (!ci.sigkill && (n = kill_child(ci.cmd_pid, SIGKILL, 0, false)) == 0) {
+ ci.deadline = now;
+ ci.sigkill = true;
+ } else {
+ n = 0; // FIXME
+ // Failed to send SIGTERM & SIGKILL to the process - give up
+ ci.sigkill = true;
+ MapChildrenT::iterator it = children.find(ci.cmd_pid);
+ if (it != children.end())
+ children.erase(it);
+ }
+ ci.sigterm = true;
+ return n;
+ }
+ return 0;
+}
+
+void stop_child(pid_t pid, int transId, const TimeVal& now)
+{
+ int n = 0;
+
+ MapChildrenT::iterator it = children.find(pid);
+ if (it == children.end()) {
+ send_error_str(transId, false, "pid not alive");
+ return;
+ } else if ((n = kill(pid, 0)) < 0) {
+ send_error_str(transId, false, "pid not alive (err: %d)", n);
+ return;
+ }
+ stop_child(it->second, transId, now);
+}
+
+int kill_child(pid_t pid, int signal, int transId, bool notify)
+{
+ // We can't use -pid here to kill the whole process group, because our process is
+ // the group leader.
+ int err = kill(pid, signal);
+ switch (err) {
+ case 0:
+ if (notify) send_ok(transId);
+ break;
+ case EINVAL:
+ if (notify) send_error_str(transId, false, "Invalid signal: %d", signal);
+ break;
+ case ESRCH:
+ if (notify) send_error_str(transId, true, "esrch");
+ break;
+ case EPERM:
+ if (notify) send_error_str(transId, true, "eperm");
+ break;
+ default:
+ if (notify) send_error_str(transId, true, strerror(err));
+ break;
+ }
+ return err;
+}
+
+int check_children(int& isTerminated, bool notify)
+{
+ do {
+ // For each process info in the <exited_children> queue deliver it to the Erlang VM
+ // and removed it from the managed <children> map.
+ std::deque< PidStatusT >::iterator it;
+ while (!isTerminated && (it = exited_children.begin()) != exited_children.end()) {
+ MapChildrenT::iterator i = children.find(it->first);
+ MapKillPidT::iterator j;
+ if (i != children.end()) {
+ if (notify && send_pid_status_term(*it) < 0) {
+ isTerminated = 1;
+ return -1;
+ }
+ children.erase(i);
+ } else if ((j = transient_pids.find(it->first)) != transient_pids.end()) {
+ // the pid is one of the custom 'kill' commands started by us.
+ transient_pids.erase(j);
+ }
+
+ exited_children.erase(it);
+ }
+ // Signaled flag indicates that there are more processes signaled SIGCHLD then
+ // could be stored in the <exited_children> deque.
+ if (signaled) {
+ signaled = false;
+ process_child_signal(-1);
+ }
+ } while (signaled && !isTerminated);
+
+ TimeVal now(TimeVal::NOW);
+
+ for (MapChildrenT::iterator it=children.begin(), end=children.end(); it != end; ++it) {
+ int status = ECHILD;
+ pid_t pid = it->first;
+ int n = kill(pid, 0);
+ if (n == 0) { // process is alive
+ if (it->second.kill_cmd_pid > 0 && now.diff(it->second.deadline) > 0) {
+ kill(pid, SIGTERM);
+ if ((n = kill(it->second.kill_cmd_pid, 0)) == 0)
+ kill(it->second.kill_cmd_pid, SIGKILL);
+ it->second.deadline.set(now, 5, 0);
+ }
+
+ while ((n = waitpid(pid, &status, WNOHANG)) < 0 && errno == EINTR);
+ if (n > 0)
+ exited_children.push_back(std::make_pair(pid <= 0 ? n : pid, status));
+ continue;
+ } else if (n < 0 && errno == ESRCH) {
+ if (notify)
+ send_pid_status_term(std::make_pair(it->first, status));
+ children.erase(it);
+ }
+ }
+
+ return 0;
+}
+
+int send_pid_list(int transId, const MapChildrenT& children)
+{
+ // Reply: {TransId, [OsPid::integer()]}
+ eis.reset();
+ eis.encodeTupleSize(2);
+ eis.encode(transId);
+ eis.encodeListSize(children.size());
+ for(MapChildrenT::const_iterator it=children.begin(), end=children.end(); it != end; ++it)
+ eis.encode(it->first);
+ eis.encodeListEnd();
+ return eis.write();
+}
+
+int send_error_str(int transId, bool asAtom, const char* fmt, ...)
+{
+ char str[MAXATOMLEN];
+ va_list vargs;
+ va_start (vargs, fmt);
+ vsnprintf(str, sizeof(str), fmt, vargs);
+ va_end (vargs);
+
+ eis.reset();
+ eis.encodeTupleSize(2);
+ eis.encode(transId);
+ eis.encodeTupleSize(2);
+ eis.encode(atom_t("error"));
+ (asAtom) ? eis.encode(atom_t(str)) : eis.encode(str);
+ return eis.write();
+}
+
+int send_ok(int transId, pid_t pid)
+{
+ eis.reset();
+ eis.encodeTupleSize(2);
+ eis.encode(transId);
+ if (pid < 0)
+ eis.encode(atom_t("ok"));
+ else {
+ eis.encodeTupleSize(2);
+ eis.encode(atom_t("ok"));
+ eis.encode(pid);
+ }
+ return eis.write();
+}
+
+int send_pid_status_term(const PidStatusT& stat)
+{
+ eis.reset();
+ eis.encodeTupleSize(2);
+ eis.encode(0);
+ eis.encodeTupleSize(3);
+ eis.encode(atom_t("exit_status"));
+ eis.encode(stat.first);
+ eis.encode(stat.second);
+ return eis.write();
+}
+
+int CmdOptions::ei_decode(ei::Serializer& ei)
+{
+ // {Cmd::string(), [Option]}
+ // Option = {env, Strings} | {cd, Dir} | {kill, Cmd}
+ int sz;
+ std::string op, val;
+
+ m_err.str("");
+ delete [] m_cenv;
+ m_cenv = NULL;
+ m_env.clear();
+ m_nice = INT_MAX;
+
+ if (eis.decodeString(m_cmd) < 0) {
+ m_err << "badarg: cmd string expected or string size too large";
+ return -1;
+ } else if ((sz = eis.decodeListSize()) < 0) {
+ m_err << "option list expected";
+ return -1;
+ } else if (sz == 0) {
+ m_cd = "";
+ m_kill_cmd = "";
+ return 0;
+ }
+
+ for(int i=0; i < sz; i++) {
+ enum OptionT { CD, ENV, KILL, NICE, USER, STDOUT, STDERR } opt;
+ const char* options[] = {"cd", "env", "kill", "nice", "user", "stdout", "stderr"};
+
+ if (eis.decodeTupleSize() != 2 || (int)(opt = (OptionT)eis.decodeAtomIndex(options, op)) < 0) {
+ m_err << "badarg: cmd option must be an atom"; return -1;
+ }
+
+ switch (opt) {
+ case CD:
+ case KILL:
+ case USER:
+ // {cd, Dir::string()} | {kill, Cmd::string()}
+ if (eis.decodeString(val) < 0) {
+ m_err << op << " bad option value"; return -1;
+ }
+ if (opt == CD) m_cd = val;
+ else if (opt == KILL) m_kill_cmd = val;
+ else if (opt == USER) {
+ struct passwd *pw = getpwnam(val.c_str());
+ if (pw == NULL) {
+ m_err << "Invalid user " << val << ": " << ::strerror(errno);
+ return -1;
+ }
+ m_user = pw->pw_uid;
+ }
+ break;
+
+ case NICE:
+ if (eis.decodeInt(m_nice) < 0 || m_nice < -20 || m_nice > 20) {
+ m_err << "nice option must be an integer between -20 and 20";
+ return -1;
+ }
+ break;
+
+ case ENV: {
+ // {env, [NameEqualsValue::string()]}
+ int env_sz = eis.decodeListSize();
+ if (env_sz < 0) {
+ m_err << "env list expected"; return -1;
+ } else if ((m_cenv = (const char**) new char* [env_sz+1]) == NULL) {
+ m_err << "out of memory"; return -1;
+ }
+
+ for (int i=0; i < env_sz; i++) {
+ std::string s;
+ if (eis.decodeString(s) < 0) {
+ m_err << "invalid env argument #" << i; return -1;
+ }
+ m_env.push_back(s);
+ m_cenv[i] = m_env.back().c_str();
+ }
+ m_cenv[env_sz] = NULL;
+ break;
+ }
+
+ case STDOUT:
+ case STDERR: {
+ int type = 0, sz;
+ std::string s, fop;
+ type = eis.decodeType(sz);
+
+ if (type == ERL_ATOM_EXT)
+ eis.decodeAtom(s);
+ else if (type == ERL_STRING_EXT)
+ eis.decodeString(s);
+ else if (type == ERL_SMALL_TUPLE_EXT && sz == 2 &&
+ eis.decodeAtom(fop) == 0 && eis.decodeString(s) == 0 && fop == "append") {
+ ;
+ } else {
+ m_err << "Atom, string or {'append', Name} tuple required for option " << op;
+ return -1;
+ }
+
+ std::string& rs = (opt == STDOUT) ? m_stdout : m_stderr;
+ std::stringstream ss;
+ int fd = (opt == STDOUT) ? 1 : 2;
+
+ if (s == "null") {
+ ss << fd << ">/dev/null";
+ rs = ss.str();
+ } else if (s == "stderr" && opt == STDOUT)
+ rs = "1>&2";
+ else if (s == "stdout" && opt == STDERR)
+ rs = "2>&1";
+ else if (s != "") {
+ ss << fd << (fop == "append" ? ">" : "") << ">\"" << s << "\"";
+ rs = ss.str();
+ }
+ break;
+ }
+ default:
+ m_err << "bad option: " << op; return -1;
+ }
+ }
+
+ if (m_stdout == "1>&2" && m_stderr != "2>&1") {
+ m_err << "cirtular reference of stdout and stderr";
+ return -1;
+ } else if (!m_stdout.empty() || !m_stderr.empty()) {
+ std::stringstream ss;
+ ss << m_cmd;
+ if (!m_stdout.empty()) ss << " " << m_stdout;
+ if (!m_stderr.empty()) ss << " " << m_stderr;
+ m_cmd = ss.str();
+ }
+ return 0;
+}
+
+/*
+int CmdOptions::init(const std::list<std::string>& list)
+{
+ int i, size=0;
+ for(std::list<std::string>::iterator it=list.begin(), end=list.end(); it != end; ++it)
+ size += it->size() + 1;
+ if (m_env.resize(m_size) == NULL)
+ return -1;
+ m_count = list.size() + 1;
+ char *p = m_env.c_str();
+ for(std::list<std::string>::iterator it=list.begin(), end=list.end(); it != end; ++it) {
+ strcpy(p, it->c_str());
+ m_cenv[i++] = p;
+ p += it->size() + 1;
+ }
+ m_cenv[i] = NULL;
+ return 0;
+}
+*/
View
8 include/exec.hrl
@@ -0,0 +1,8 @@
+-define(SIGHUP, -1).
+-define(SIGINT, -2).
+-define(SIGKILL, -9).
+-define(SIGTERM, -15).
+-define(SIGUSR1, -10).
+-define(SIGUSR2, -12).
+
+-define(FMT(Fmt, Args), lists:flatten(io_lib:format(Fmt, Args))).
View
12 rebar.config
@@ -0,0 +1,12 @@
+{erl_opts, [
+ warnings_as_errors,
+ warn_export_all
+]}.
+
+{port_env, [{"CC", "g++"},
+ {"CXX", "g++"},
+ {"linux", "LDFLAGS", "$LDFLAGS -lcap"}]}.
+
+{port_specs, [{"priv/exec-port", ["c_src/*.cpp"]}]}.
+
+{pre_hooks, [{clean, "rm -fr ebin priv erl_crash.dump"}]}.
View
16 src/exec.app.src
@@ -0,0 +1,16 @@
+{application, exec,
+ [
+ {description, "OS Process Manager"},
+ {vsn, "1.0"},
+ {id, "exec"},
+ {modules, []},
+ {registered, [ exec ] },
+ %% NOTE: do not list applications which are load-only!
+ {applications, [ kernel, stdlib ] },
+ %%
+ %% mod: Specify the module name to start the application, plus args
+ %%
+ {mod, {exec_app, []}},
+ {env, []}
+ ]
+}.
View
528 src/exec.erl
@@ -0,0 +1,528 @@
+%%%------------------------------------------------------------------------
+%%% File: $Id$
+%%%------------------------------------------------------------------------
+%%% @doc OS shell command starter.
+%%% It communicates with a C++ port process `exec-port' responsible
+%%% for starting, killing, listing, terminating, and notifying of
+%%% state changes. The port program serves as a middle-man between
+%%% the OS and the virtual machine to carry out OS-specific low-level
+%%% process control. The Erlang/C++ protocol is described in the
+%%% `exec.cpp' file. The `exec-port'
+%%% When `exec-port' is started with a root suid bit set, it
+%%% requires to be provided with the `{user, User}' option so that it
+%%% will not run as root. Before changing the effective `User',
+%%% it sets the kernel capabilities so that it's able to start
+%%% processes as other users and adjust process priorities.
+%%% At exit the port program makes its best effort to perform
+%%% clean shutdown of all child OS processes.
+%%% Every started OS process is linked to a spawned light-weight
+%%% Erlang process returned by the run/2, run_link/2 command.
+%%% The application ensures that termination of spawned OsPid
+%%% leads to termination of the associated Erlang Pid, and vice
+%%% versa.
+%%%
+%%% @author Serge Aleynikov <saleyn@gmail.com>
+%%% @version $Revision$
+%%%
+%%% @type exec_options() = [Option]
+%%% Option = debug | verbose | {args, Args} | {alarm, Secs} |
+%%% {user, User} | {limit_users, Users} |
+%%% {portexe, Exe::string()}
+%%% Users = [User]
+%%% User = Acount::string().
+%%% Options passed to the exec process at startup.
+%%% <dl>
+%%% <dt>debug</dt><dd>Enable port-programs debug trace.</dd>
+%%% <dt>verbose</dt><dd>Enable verbose prints of the Erlang process.</dd>
+%%% <dt>{args, Args}</dt><dd>Append `Args' to the port command.</dd>
+%%% <dt>{alarm, Secs}</dt>
+%%% <dd>Give `Secs' deadline for the port program to clean up
+%%% child pids before exiting</dd>
+%%% <dt>{user, User}</dt>
+%%% <dd>When port program is owned by root, this option must be
+%%% specified so that the port program is not running under
+%%% root account.</dd>
+%%% <dt>{limit_users, LimitUsers}</dt>
+%%% <dd>Limit execution of external commands to these set of users.
+%%% This option is only valid when the port program is owned
+%%% by root.</dd>
+%%% <dt>{portexe, Exe}</dt>
+%%% <dd>Provide an alternative location of the port program.
+%%% This option is useful when this application is stored
+%%% on NFS and the port program needs to be copied locally
+%%% so that root suid bit can be set.</dd>
+%%% </dl>.
+%%% @type cmd_options() = [Option]
+%%% Option = {cd, WorkDir::string()} | {env, Env} |
+%%% {kill, Cmd::string()} |
+%%% {user, RunAsUser::string()} |
+%%% {nice, Priority::integer()} |
+%%% {stdout, Device} | {stderr, Device}
+%%% Env = [VarEqVal::string()]
+%%% Device = null | stdout | stderr | File | {append, File}
+%%% File = string().
+%%% Command-line options:
+%%% <dl>
+%%% <dt>{cd, WorkDir}</dt><dd>Working directory</dd>
+%%% <dt>{env, Env}</dt><dd>List of "VAR=VALUE" environment variables</dd>
+%%% <dt>{kill, Cmd}</dt>
+%%% <dd>This command will be used for killing the process. After
+%%% a 5-sec timeout if the process is still alive, it'll be
+%%% killed with SIGTERM followed by SIGKILL. By default
+%%% SIGTERM/SIGKILL combination is used for process
+%%% termination.</dd>
+%%% <dt>{user, RunAsUser}</dt>
+%%% <dd>When exec-port has a suid bit set, it's capable of running
+%%% commands with a different RunAsUser effective user.</dd>
+%%% <dt>{nice, Priority}</dt>
+%%% <dd>Set process priority between -20 and 20. Note that
+%%% negative values can be specified only when `exec-port'
+%%% is started with a root suid bit set.</dd>
+%%% <dt>{stdout, Device}</dt>
+%%% <dd>Option for redirecting process's standard output stream</dd>
+%%% <dt>{stderr, Device}</dt>
+%%% <dd>Option for redirecting process's standard error stream</dd>
+%%% </dl>
+%%% @end
+%%%------------------------------------------------------------------------
+%%% Created: 2003-06-10 by Serge Aleynikov <saleyn@gmail.com>
+%%% $Header$
+%%%------------------------------------------------------------------------
+-module(exec).
+-author('saleyn@gmail.com').
+-id ("$Id$").
+
+-ifdef(ARCH).
+-define(system_architecture, ?ARCH).
+-else.
+-define(system_architecture, erlang:system_info(system_architecture)).
+-endif.
+
+-behaviour(gen_server).
+
+%% External exports
+-export([
+ start/1, start_link/1, run/2, run_link/2,
+ which_children/0, kill/2, stop/1, ospid/1, status/1
+]).
+
+%% Internal exports
+-export([default/0, default/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
+
+-include("exec.hrl").
+
+-record(state, {
+ port,
+ last_trans = 0, % Last transaction number sent to port
+ trans = queue:new(), % Queue of outstanding transactions sent to port
+ limit_users = [], % Restricted list of users allowed to run commands
+ registry = ets:new(exec_mon, [protected,named_table]), % Pids to notify when an OsPid exits
+ debug = false
+}).
+
+%%-------------------------------------------------------------------------
+%% @spec (Options::options()) -> {ok, Pid::pid()} | {error, Reason}
+%% @doc Supervised start an external program manager.
+%% @end
+%%-------------------------------------------------------------------------
+start_link(Options) when is_list(Options) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [Options], []).
+
+%%-------------------------------------------------------------------------
+%% @equiv start_link/1
+%% @doc Start of an external program manager without supervision.
+%% @end
+%%-------------------------------------------------------------------------
+start(Options) when is_list(Options) ->
+ gen_server:start({local, ?MODULE}, ?MODULE, [Options], []).
+
+%%-------------------------------------------------------------------------
+%% @spec (Exe::string(), Options::cmd_options()) -> Result
+%% Result = {ok, Pid::pid(), OsPid::integer()} | {error, Reason}
+%% @doc Start an external program.
+%% @end
+%%-------------------------------------------------------------------------
+run(Exe, Options) when is_list(Exe), is_list(Options) ->
+ gen_server:call(?MODULE, {port, {start, {run, Exe, Options}, nolink}}, 30000).
+
+%%-------------------------------------------------------------------------
+%% @equiv run/2
+%% @doc Start an external program and link to the OsPid. If OsPid exits,
+%% the calling process will be killed or if it's trapping exits,
+%% it'll get {'EXIT', OsPid, Status} message. If the calling process
+%% dies the OsPid will be killed.
+%% @end
+%%-------------------------------------------------------------------------
+run_link(Exe, Options) when is_list(Exe), is_list(Options) ->
+ gen_server:call(?MODULE, {port, {start, {run, Exe, Options}, link}}).
+
+%%-------------------------------------------------------------------------
+%% @spec () -> [OsPid::integer()]
+%% @doc Get a list of children managed by port program.
+%% @end
+%%-------------------------------------------------------------------------
+which_children() ->
+ gen_server:call(?MODULE, {port, {list}}).
+
+%%-------------------------------------------------------------------------
+%% @spec (Pid, Signal::integer()) -> ok | {error, Reason}
+%% Pid = pid() | OsPid
+%% OsPid = integer()
+%% @doc Send a `Signal' to a child `Pid' or `OsPid'.
+%% @end
+%%-------------------------------------------------------------------------
+kill(Pid, Signal) when is_pid(Pid); is_integer(Pid) ->
+ gen_server:call(?MODULE, {port, {kill, Pid, Signal}}).
+
+%%-------------------------------------------------------------------------
+%% @spec (Pid) -> ok | {error, Reason}
+%% Pid = pid() | OsPid
+%% OsPid = integer()
+%% @doc Terminate a managed `Pid' or `OsPid' process.
+%% @end
+%%-------------------------------------------------------------------------
+stop(Pid) when is_pid(Pid); is_integer(Pid) ->
+ gen_server:call(?MODULE, {port, {stop, Pid}}, 30000).
+
+%%-------------------------------------------------------------------------
+%% @spec (Pid::pid()) -> ok | {error, Reason}
+%% Pid = pid() | OsPid
+%% OsPid = integer()
+%% @doc Terminate a managed `Pid' or `OsPid' process.
+%% @end
+%%-------------------------------------------------------------------------
+ospid(Pid) ->
+ Ref = make_ref(),
+ Pid ! {{self(), Ref}, ospid},
+ receive
+ {Ref, Reply} -> Reply;
+ Other -> Other
+ after 7000 ->
+ {error, timeout}
+ end.
+
+%%-------------------------------------------------------------------------
+%% @spec (Status::integer()) ->
+%% {status, ExitStatus::integer()} |
+%% {signal, Signal::integer(), Core::boolean()}
+%% @doc Decode the program's exit_status.
+%% @end
+%%-------------------------------------------------------------------------
+status(Status) when is_integer(Status) ->
+ case {Status band 16#FF00 bsr 8, Status band 16#7F, (Status band 16#80) =:= 16#80} of
+ {Stat, 0, _} -> {status, Stat};
+ {_, Stat, false} -> {status, Stat};
+ {_, Signal, Core} -> {signal, Signal, Core}
+ end.
+
+%%-------------------------------------------------------------------------
+%% @spec () -> Default::exec_options()
+%% @doc Provide default value of a given option.
+%% @end
+%%-------------------------------------------------------------------------
+default() ->
+ [{debug, false}, % Debug mode of the port program.
+ {verbose, false}, % Verbose print of events on the Erlang side.
+ {args, ""}, % Extra arguments that can be passed to port program
+ {alarm, 12},
+ {user, ""}, % Run port program as this user
+ {limit_users, []}, % Restricted list of users allowed to run commands
+ {portexe, default(portexe)}].
+
+default(portexe) ->
+ % Get architecture (e.g. i386-linux)
+ Dir = filename:dirname(filename:dirname(code:which(?MODULE))),
+ %filename:join([Dir, "priv", ?system_architecture, "bin", "exec-port"]);
+ filename:join([Dir, "priv", "exec-port"]);
+default(Option) ->
+ proplists:get_value(Option, default()).
+
+get_opt({Option, Value}) -> {Option, Value};
+get_opt(debug) -> {debug, true}.
+
+%%%----------------------------------------------------------------------
+%%% Callback functions from gen_server
+%%%----------------------------------------------------------------------
+
+%%-----------------------------------------------------------------------
+%% Func: init/1
+%% Returns: {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @private
+%%-----------------------------------------------------------------------
+init([Options]) ->
+ process_flag(trap_exit, true),
+ Args = lists:foldl(
+ fun({debug, true}, Acc) -> [" -debug" | Acc];
+ ({alarm, I}, Acc) -> [" -alarm "++integer_to_list(I) | Acc];
+ ({args, Arg}, Acc) -> [" "++Arg | Acc];
+ ({user, User}, Acc) when User =/= "" -> [" -user "++User | Acc];
+ (_, Acc) -> Acc
+ end, [], [get_opt(O) || O <- Options]),
+ Exe = proplists:get_value(portexe, Options, default(portexe)) ++ lists:flatten([" -n"|Args]),
+ Users = proplists:get_value(limit_users, Options, default(limit_users)),
+ Debug = proplists:get_value(verbose, Options, default(verbose)),
+ try
+ debug(Debug, "exec: port program: ~s\n", [Exe]),
+ Port = erlang:open_port({spawn, Exe}, [binary, exit_status, {packet, 2}, nouse_stdio, hide]),
+ {ok, #state{port=Port, limit_users=Users, debug=Debug}}
+ catch _:Reason ->
+ {stop, ?FMT("Error starting port '~s': ~200p", [Exe, Reason])}
+ end.
+
+%%----------------------------------------------------------------------
+%% Func: handle_call/3
+%% Returns: {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} | (terminate/2 is called)
+%% {stop, Reason, State} (terminate/2 is called)
+%% @private
+%%----------------------------------------------------------------------
+handle_call({port, Instruction}, From, #state{last_trans=Last} = State) ->
+ try is_port_command(Instruction, State) of
+ {ok, Term, Link} ->
+ Next = next_trans(Last),
+ erlang:port_command(State#state.port, term_to_binary({Next, Term})),
+ {noreply, State#state{trans = queue:in({Next, From, Link}, State#state.trans)}}
+ catch _:{error, Why} ->
+ {reply, {error, Why}, State}
+ end;
+
+handle_call(Request, _From, _State) ->
+ {stop, {not_implemented, Request}}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_cast/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%% @private
+%%----------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_info/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%% @private
+%%----------------------------------------------------------------------
+handle_info({Port, {data, Bin}}, #state{port=Port, debug=Debug} = State) ->
+ Term = binary_to_term(Bin),
+ %io:format("Msg from port: ~p\n", [Term]),
+ case Term of
+ {0, {exit_status, OsPid, Status}} ->
+ debug(Debug, "Pid ~w exited with status: {~w,~w}\n", [OsPid, (Status band 16#FF00 bsr 8), Status band 127]),
+ notify_ospid_owner(OsPid, Status),
+ {noreply, State};
+ {N, Reply} when N =/= 0 ->
+ case get_transaction(State#state.trans, N) of
+ {true, {Pid,_} = From, MonType, Q} ->
+ NewReply = maybe_add_monitor(Reply, Pid, MonType, Debug),
+ gen_server:reply(From, NewReply);
+ {false, Q} ->
+ ok
+ end,
+ {noreply, State#state{trans=Q}};
+ {0, _Ignore} ->
+ {noreply, State}
+ end;
+
+handle_info({Port, {exit_status, 0}}, #state{port=Port} = State) ->
+ {stop, normal, State};
+handle_info({Port, {exit_status, Status}}, #state{port=Port} = State) ->
+ {stop, {exit_status, Status}, State};
+handle_info({'EXIT', Port, Reason}, #state{port=Port} = State) ->
+ {stop, Reason, State};
+handle_info({'EXIT', Pid, Reason}, State) ->
+ % OsPid's Pid owner died. Kill linked OsPid.
+ do_unlink_ospid(Pid, Reason, State),
+ {noreply, State};
+handle_info(_Info, State) ->
+ error_logger:info_msg("~w - unhandled message: ~p\n", [?MODULE, _Info]),
+ {noreply, State}.
+
+%%----------------------------------------------------------------------
+%% Func: code_change/3
+%% Purpose: Convert process state when code is changed
+%% Returns: {ok, NewState}
+%% @private
+%%----------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------
+%% Func: terminate/2
+%% Purpose: Shutdown the server
+%% Returns: any (ignored by gen_server)
+%% @private
+%%----------------------------------------------------------------------
+terminate(_Reason, #state{}) ->
+ error_logger:warning_msg("~w - exec process terminated\n", [self()]),
+ ok.
+
+%%%---------------------------------------------------------------------
+%%% Internal functions
+%%%---------------------------------------------------------------------
+
+%% Add a link for Pid to OsPid if requested.
+maybe_add_monitor({ok, OsPid}, Pid, MonType, Debug) when is_integer(OsPid) ->
+ % This is a reply to a run/run_link command. The port program indicates
+ % of creating a new OsPid process.
+ % Spawn a light-weight process responsible for monitoring this OsPid
+ Self = self(),
+ LWP = spawn_link(fun() -> ospid_init(Pid, OsPid, MonType, Self, Debug) end),
+ ets:insert(exec_mon, [{OsPid, LWP}, {LWP, OsPid}]),
+ {ok, LWP, OsPid};
+maybe_add_monitor(Reply, _Pid, _MonType, _Debug) ->
+ Reply.
+
+%%-------------------------