Skip to content

Commit

Permalink
refactor pipe related api
Browse files Browse the repository at this point in the history
  • Loading branch information
sekiguchi-nagisa committed Apr 17, 2024
1 parent 044d3c6 commit b5868af
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 81 deletions.
27 changes: 18 additions & 9 deletions src/redir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@

namespace arsh {

bool Pipe::open() {
#ifdef __APPLE__
return pipe(this->fds) == 0 && setCloseOnExec(this->fds[0], true) &&
setCloseOnExec(this->fds[1], true);
#else
return pipe2(this->fds, O_CLOEXEC) == 0;
#endif
}

PipelineObject::~PipelineObject() { this->syncStatusAndDispose(); }

Job PipelineObject::syncStatusAndDispose() {
Expand Down Expand Up @@ -56,10 +65,10 @@ RedirObject::~RedirObject() {
}

static int doIOHere(const StringRef &value, int newFd, bool insertNewline) {
pipe_t pipe;
initAllPipe(pipe);

dup2(pipe[READ_PIPE], newFd);
Pipe pipe;
if (!pipe.open() || dup2(pipe[READ_PIPE], newFd) < 0) {
return errno;
}

if (value.size() + (insertNewline ? 1 : 0) <= PIPE_BUF) {
int errnum = 0;
Expand All @@ -71,7 +80,7 @@ static int doIOHere(const StringRef &value, int newFd, bool insertNewline) {
errnum = errno;
}
}
closeAllPipe(pipe);
pipe.close();
return errnum;
} else {
pid_t pid = fork();
Expand All @@ -81,9 +90,9 @@ static int doIOHere(const StringRef &value, int newFd, bool insertNewline) {
if (pid == 0) { // child
pid = fork(); // double-fork (not wait IO-here process termination.)
if (pid == 0) { // child
close(pipe[READ_PIPE]);
dup2(pipe[WRITE_PIPE], STDOUT_FILENO);
if (write(STDOUT_FILENO, value.data(), value.size()) < 0) {
pipe.close(READ_PIPE);
if (dup2(pipe[WRITE_PIPE], STDOUT_FILENO) < 0 ||
write(STDOUT_FILENO, value.data(), value.size()) < 0) {
exit(1);
}
if (insertNewline) { // for here str (insert newline)
Expand All @@ -94,7 +103,7 @@ static int doIOHere(const StringRef &value, int newFd, bool insertNewline) {
}
exit(0);
}
closeAllPipe(pipe);
pipe.close();
waitpid(pid, nullptr, 0);
return 0;
}
Expand Down
103 changes: 50 additions & 53 deletions src/redir.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,71 @@

namespace arsh {

constexpr unsigned int READ_PIPE = 0;
constexpr unsigned int WRITE_PIPE = 1;
enum class PipeAccessor : unsigned char {};
constexpr auto READ_PIPE = PipeAccessor{0};
constexpr auto WRITE_PIPE = PipeAccessor{1};

inline void tryToDup(int srcFd, int targetFd) {
if (srcFd > -1) {
dup2(srcFd, targetFd);
}
}

inline void tryToClose(int fd) {
if (fd > -1) {
close(fd);
}
}

using pipe_t = int[2];
class Pipe {
private:
int fds[2]{-1, -1};

inline void tryToClose(pipe_t &pipefds) {
tryToClose(pipefds[0]);
tryToClose(pipefds[1]);
}
public:
/**
* open pipe with CLOEXEC
* @return
* if has error, return false
*/
bool open();

inline void tryToPipe(pipe_t &pipefds, bool openPipe) {
if (openPipe) {
if (pipe(pipefds) < 0) {
perror("pipe creation failed\n");
exit(1); // FIXME: throw exception
bool tryToOpen(const bool shouldOpen) {
if (shouldOpen) {
return this->open();
}
} else {
pipefds[0] = -1;
pipefds[1] = -1;
}
}

inline void initAllPipe(unsigned int size, pipe_t *pipes) {
for (unsigned int i = 0; i < size; i++) {
tryToPipe(pipes[i], true);
return true;
}
}

template <unsigned int N>
void initAllPipe(pipe_t (&pipes)[N]) {
initAllPipe(N, pipes);
}

inline void initAllPipe(pipe_t &pipe) { initAllPipe(1, &pipe); }
int &operator[](PipeAccessor i) { return this->fds[toUnderlying(i)]; }

inline void closeAllPipe(unsigned int size, pipe_t *pipefds) {
for (unsigned int i = 0; i < size; i++) {
tryToClose(pipefds[i]);
void close() {
this->close(READ_PIPE);
this->close(WRITE_PIPE);
}
}

template <unsigned int N>
void closeAllPipe(pipe_t (&pipes)[N]) {
closeAllPipe(N, pipes);
}
void close(PipeAccessor i) { tryToClose(this->fds[toUnderlying(i)]); }

inline void closeAllPipe(pipe_t &pipe) { closeAllPipe(1, &pipe); }
private:
static void tryToClose(int &fd) {
if (fd > -1) {
::close(fd);
fd = -1;
}
}
};

class PipeList : public InlinedArray<pipe_t, 6> {
class PipeList : public InlinedArray<Pipe, 6> {
public:
explicit PipeList(size_t size) : InlinedArray(size) {}

void initAll() { initAllPipe(this->size(), this->ptr()); }
bool openAll() {
for (size_t i = 0; i < this->size(); i++) {
if (!(*this)[i].open()) {
return false;
}
}
return true;
}

void closeAll() { closeAllPipe(this->size(), this->ptr()); }
void closeAll() {
for (size_t i = 0; i < this->size(); i++) {
(*this)[i].close();
}
}
};

inline void redirInToNull() {
Expand All @@ -102,10 +100,10 @@ inline void redirInToNull() {
}

struct PipeSet {
pipe_t in{-1};
pipe_t out{-1};
Pipe in;
Pipe out;

explicit PipeSet(ForkKind kind) { // FIXME: error reporting
bool openAll(ForkKind kind) {
bool useInPipe = false;
bool useOutPipe = false;

Expand All @@ -130,8 +128,7 @@ struct PipeSet {
case ForkKind::PIPE_FAIL:
break;
}
tryToPipe(this->in, useInPipe);
tryToPipe(this->out, useOutPipe);
return this->in.tryToOpen(useInPipe) && this->out.tryToOpen(useOutPipe);
}

/**
Expand All @@ -153,8 +150,8 @@ struct PipeSet {
* call in parent and child
*/
void closeAll() {
tryToClose(this->in);
tryToClose(this->out);
this->in.close();
this->out.close();
}
};

Expand Down
37 changes: 18 additions & 19 deletions src/vm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,30 +526,31 @@ bool VM::forkAndEval(ARState &state, Value &&desc) {
const unsigned short offset = read16(state.stack.ip() + 1);

// set in/out pipe
PipeSet pipeSet(forkKind);
PipeSet pipeSet;
pipeSet.openAll(forkKind); // FIXME: check errno
const pid_t pgid = resolvePGID(state.isRootShell(), forkKind);
const auto procOp = resolveProcOp(state, forkKind);
const bool jobCtrl = state.isJobControl();
auto proc = Proc::fork(state, pgid, procOp);
if (proc.pid() > 0) { // parent process
tryToClose(pipeSet.in[READ_PIPE]);
tryToClose(pipeSet.out[WRITE_PIPE]);
pipeSet.in.close(READ_PIPE);
pipeSet.out.close(WRITE_PIPE);

Value obj;

switch (forkKind) {
case ForkKind::STR:
case ForkKind::ARRAY: { // always disable job control (so not change foreground process group)
assert(!hasFlag(procOp, Proc::Op::JOB_CONTROL));
tryToClose(pipeSet.in[WRITE_PIPE]);
pipeSet.in.close(WRITE_PIPE);
const bool ret = forkKind == ForkKind::STR
? readAsStr(state, pipeSet.out[READ_PIPE], obj)
: readAsStrArray(state, pipeSet.out[READ_PIPE], obj);
if (!ret || ARState::isInterrupted()) {
/**
* if read failed, not wait termination (always attach to job table)
*/
tryToClose(pipeSet.out[READ_PIPE]); // close read pipe after wait, due to prevent EPIPE
pipeSet.out.close(READ_PIPE); // close read pipe after wait, due to prevent EPIPE
state.jobTable.attach(
JobObject::create(proc, state.emptyFDObj, state.emptyFDObj, std::move(desc)));

Expand All @@ -563,7 +564,7 @@ bool VM::forkAndEval(ARState &state, Value &&desc) {
const auto waitOp = jobCtrl ? WaitOp::BLOCK_UNTRACED : WaitOp::BLOCKING;
const int status = proc.wait(waitOp); // wait exit
const int errNum = errno;
tryToClose(pipeSet.out[READ_PIPE]); // close read pipe after wait, due to prevent EPIPE
pipeSet.out.close(READ_PIPE); // close read pipe after wait, due to prevent EPIPE
if (!proc.is(Proc::State::TERMINATED)) {
state.jobTable.attach(
JobObject::create(proc, state.emptyFDObj, state.emptyFDObj, std::move(desc)));
Expand Down Expand Up @@ -774,31 +775,28 @@ static Value toCmdDesc(const ArrayObject &argvObj) {
bool VM::forkAndExec(ARState &state, const char *filePath, const ArrayObject &argvObj,
Value &&redirConfig) {
// setup self pipe
pipe_t selfPipe;
if (pipe(selfPipe) < 0) {
Pipe selfPipe;
if (!selfPipe.open()) {
fatal_perror("pipe creation error");
}
if (!setCloseOnExec(selfPipe[WRITE_PIPE], true)) {
fatal_perror("fcntl error");
}

const pid_t pgid = resolvePGID(state.isRootShell(), ForkKind::NONE);
const auto procOp = resolveProcOp(state, ForkKind::NONE);
auto proc = Proc::fork(state, pgid, procOp);
if (proc.pid() == -1) {
closeAllPipe(selfPipe);
selfPipe.close();
raiseCmdError(state, argvObj.getValues()[0].asCStr(), EAGAIN);
return false;
} else if (proc.pid() == 0) { // child
close(selfPipe[READ_PIPE]);
selfPipe.close(READ_PIPE);
xexecve(filePath, argvObj, nullptr);

const int errNum = errno;
const ssize_t r = write(selfPipe[WRITE_PIPE], &errNum, sizeof(int));
(void)r; // FIXME:
exit(-1);
} else { // parent process
close(selfPipe[WRITE_PIPE]);
selfPipe.close(WRITE_PIPE);
redirConfig = nullptr; // restore redirConfig

ssize_t readSize;
Expand All @@ -808,7 +806,7 @@ bool VM::forkAndExec(ARState &state, const char *filePath, const ArrayObject &ar
break;
}
}
close(selfPipe[READ_PIPE]);
selfPipe.close(READ_PIPE);
if (readSize > 0 && errNum == ENOENT) { // remove cached path
state.pathCache.removePath(argvObj.getValues()[0].asCStr());
}
Expand Down Expand Up @@ -1224,9 +1222,10 @@ bool VM::callPipeline(ARState &state, Value &&desc, bool lastPipe, ForkKind fork

assert(pipeSize > 0);

PipeSet pipeSet(forkKind);
PipeSet pipeSet;
pipeSet.openAll(forkKind); // FIXME: check errno
PipeList pipes(pipeSize);
pipes.initAll();
pipes.openAll(); // FIXME: checl errno

// fork
InlinedArray<Proc, 6> children(procSize);
Expand Down Expand Up @@ -1285,8 +1284,8 @@ bool VM::callPipeline(ARState &state, Value &&desc, bool lastPipe, ForkKind fork
pipes.closeAll();
state.stack.push(Value::create<PipelineObject>(state, std::move(jobEntry)));
} else {
tryToClose(pipeSet.in[READ_PIPE]);
tryToClose(pipeSet.out[WRITE_PIPE]);
pipeSet.in.close(READ_PIPE);
pipeSet.out.close(WRITE_PIPE);
pipes.closeAll();
Value obj;
if (!attachAsyncJob(state, std::move(desc), procSize, children.ptr(), forkKind, pipeSet,
Expand Down

0 comments on commit b5868af

Please sign in to comment.