diff --git a/llvm/include/llvm/Analysis/InteractiveModelRunner.h b/llvm/include/llvm/Analysis/InteractiveModelRunner.h
index a9324f19aceb8e..ffcd4a37c5c73d 100644
--- a/llvm/include/llvm/Analysis/InteractiveModelRunner.h
+++ b/llvm/include/llvm/Analysis/InteractiveModelRunner.h
@@ -14,6 +14,7 @@
 #include "llvm/Analysis/TensorSpec.h"
 #include "llvm/Analysis/Utils/TrainingLogger.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/raw_ostream.h"
 #include <system_error>
 
@@ -32,6 +33,11 @@ namespace llvm {
 /// Note that the correctness of the received data is the responsibility of the
 /// host. In particular, if insufficient data were sent, the compiler will block
 /// when waiting for an advice.
+///
+/// Note that the host can either open the pipes RW, or open first the pipe to
+/// the compiler - i.e. the "Inbound" - and then the "Outbound", to avoid
+/// deadlock. This is because the compiler first tries to open the inbound
+/// (which will hang until there's a writer on the other end).
 class InteractiveModelRunner : public MLModelRunner {
 public:
   InteractiveModelRunner(LLVMContext &Ctx,
@@ -43,19 +49,21 @@ class InteractiveModelRunner : public MLModelRunner {
     return R->getKind() == MLModelRunner::Kind::Interactive;
   }
   void switchContext(StringRef Name) {
-    Log.switchContext(Name);
-    Log.flush();
+    Log->switchContext(Name);
+    Log->flush();
   }
 
+  virtual ~InteractiveModelRunner();
+
 private:
   void *evaluateUntyped() override;
   const std::vector<TensorSpec> InputSpecs;
   const TensorSpec OutputSpec;
   std::error_code OutEC;
   std::error_code InEC;
-  raw_fd_stream Inbound;
+  sys::fs::file_t Inbound;
   std::vector<char> OutputBuffer;
-  Logger Log;
+  std::unique_ptr<Logger> Log;
 };
 } // namespace llvm
 #endif // LLVM_ANALYSIS_INTERACTIVEMODELRUNNER_H
diff --git a/llvm/lib/Analysis/InteractiveModelRunner.cpp b/llvm/lib/Analysis/InteractiveModelRunner.cpp
index a347b49eb0729e..c449ab4dffdacd 100644
--- a/llvm/lib/Analysis/InteractiveModelRunner.cpp
+++ b/llvm/lib/Analysis/InteractiveModelRunner.cpp
@@ -13,6 +13,7 @@
 #include "llvm/Analysis/TensorSpec.h"
 #include "llvm/Support/CommandLine.h"
 #include "llvm/Support/ErrorHandling.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/raw_ostream.h"
 
 using namespace llvm;
@@ -34,44 +35,53 @@ InteractiveModelRunner::InteractiveModelRunner(
     LLVMContext &Ctx, const std::vector<TensorSpec> &Inputs,
     const TensorSpec &Advice, StringRef OutboundName, StringRef InboundName)
     : MLModelRunner(Ctx, MLModelRunner::Kind::Interactive, Inputs.size()),
-      InputSpecs(Inputs), OutputSpec(Advice), Inbound(InboundName, InEC),
-      OutputBuffer(OutputSpec.getTotalTensorBufferSize()),
-      Log(std::make_unique<raw_fd_ostream>(OutboundName, OutEC), InputSpecs,
-          Advice, /*IncludeReward=*/false, Advice) {
+      InputSpecs(Inputs), OutputSpec(Advice),
+      InEC(sys::fs::openFileForRead(InboundName, Inbound)),
+      OutputBuffer(OutputSpec.getTotalTensorBufferSize()) {
   if (InEC) {
     Ctx.emitError("Cannot open inbound file: " + InEC.message());
     return;
   }
-  if (OutEC) {
-    Ctx.emitError("Cannot open outbound file: " + OutEC.message());
-    return;
+  {
+    auto OutStream = std::make_unique<raw_fd_ostream>(OutboundName, OutEC);
+    if (OutEC) {
+      Ctx.emitError("Cannot open outbound file: " + OutEC.message());
+      return;
+    }
+    Log = std::make_unique<Logger>(std::move(OutStream), InputSpecs, Advice,
+                                   /*IncludeReward=*/false, Advice);
   }
   // Just like in the no inference case, this will allocate an appropriately
   // sized buffer.
   for (size_t I = 0; I < InputSpecs.size(); ++I)
     setUpBufferForTensor(I, InputSpecs[I], nullptr);
-  Log.flush();
+  Log->flush();
+}
+
+InteractiveModelRunner::~InteractiveModelRunner() {
+  sys::fs::closeFile(Inbound);
 }
 
 void *InteractiveModelRunner::evaluateUntyped() {
-  Log.startObservation();
+  Log->startObservation();
   for (size_t I = 0; I < InputSpecs.size(); ++I)
-    Log.logTensorValue(I, reinterpret_cast<const char *>(getTensorUntyped(I)));
-  Log.endObservation();
-  Log.flush();
+    Log->logTensorValue(I, reinterpret_cast<const char *>(getTensorUntyped(I)));
+  Log->endObservation();
+  Log->flush();
 
   size_t InsPoint = 0;
   char *Buff = OutputBuffer.data();
   const size_t Limit = OutputBuffer.size();
   while (InsPoint < Limit) {
-    auto Read = Inbound.read(Buff + InsPoint, OutputBuffer.size() - InsPoint);
-    if (Read < 0) {
+    auto ReadOrErr = ::sys::fs::readNativeFile(
+        Inbound, {Buff + InsPoint, OutputBuffer.size() - InsPoint});
+    if (ReadOrErr.takeError()) {
       Ctx.emitError("Failed reading from inbound file");
       break;
     }
-    InsPoint += Read;
+    InsPoint += *ReadOrErr;
   }
   if (DebugReply != TensorType::Invalid)
     dbgs() << tensorValueToString(OutputBuffer.data(), OutputSpec);
   return OutputBuffer.data();
-}
\ No newline at end of file
+}
diff --git a/llvm/unittests/Analysis/MLModelRunnerTest.cpp b/llvm/unittests/Analysis/MLModelRunnerTest.cpp
index 1f80eb78209835..f953c45cfc3187 100644
--- a/llvm/unittests/Analysis/MLModelRunnerTest.cpp
+++ b/llvm/unittests/Analysis/MLModelRunnerTest.cpp
@@ -11,9 +11,12 @@
 #include "llvm/Analysis/NoInferenceModelRunner.h"
 #include "llvm/Analysis/ReleaseModeModelRunner.h"
 #include "llvm/Support/BinaryByteStream.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/FileUtilities.h"
 #include "llvm/Support/JSON.h"
+#include "llvm/Support/Path.h"
 #include "llvm/Support/raw_ostream.h"
+#include "llvm/Testing/Support/SupportHelpers.h"
 #include "gtest/gtest.h"
 
 #include <atomic>
@@ -126,6 +129,7 @@ TEST(ReleaseModeRunner, ExtraFeaturesOutOfOrder) {
   EXPECT_EQ(*Evaluator->getTensor<int64_t>(2), -3);
 }
 
+#if defined(LLVM_ON_UNIX)
 TEST(InteractiveModelRunner, Evaluation) {
   LLVMContext Ctx;
   // Test the interaction with an external advisor by asking for advice twice.
@@ -141,68 +145,65 @@ TEST(InteractiveModelRunner, Evaluation) {
   // Create the 2 files. Ideally we'd create them as named pipes, but that's not
   // quite supported by the generic API.
   std::error_code EC;
-  SmallString<64> FromCompilerName;
-  SmallString<64> ToCompilerName;
-  int FromCompilerFD = 0;
-  int ToCompilerFD = 0;
-  ASSERT_EQ(sys::fs::createTemporaryFile("InteractiveModelRunner_Evaluation",
-                                         "temp", FromCompilerFD,
-                                         FromCompilerName),
-            std::error_code());
+  llvm::unittest::TempDir Tmp("tmpdir", /*Unique=*/true);
+  SmallString<128> FromCompilerName(Tmp.path().begin(), Tmp.path().end());
+  SmallString<128> ToCompilerName(Tmp.path().begin(), Tmp.path().end());
+  sys::path::append(FromCompilerName, "InteractiveModelRunner_Evaluation.out");
+  sys::path::append(ToCompilerName, "InteractiveModelRunner_Evaluation.in");
+  EXPECT_EQ(::mkfifo(FromCompilerName.c_str(), 0666), 0);
+  EXPECT_EQ(::mkfifo(ToCompilerName.c_str(), 0666), 0);
 
-  ASSERT_EQ(sys::fs::createTemporaryFile("InteractiveModelRunner_Evaluation",
-                                         "temp", ToCompilerFD, ToCompilerName),
-            std::error_code());
-
-  raw_fd_stream FromCompiler(FromCompilerName, EC);
-  EXPECT_FALSE(EC);
-  raw_fd_ostream ToCompiler(ToCompilerName, EC);
-  EXPECT_FALSE(EC);
   FileRemover Cleanup1(FromCompilerName);
   FileRemover Cleanup2(ToCompilerName);
 
-  InteractiveModelRunner Evaluator(Ctx, Inputs, AdviceSpec, FromCompilerName,
-                                   ToCompilerName);
-
-  Evaluator.switchContext("hi");
-
-  // Helper to read headers and other json lines.
-  SmallVector<char, 1024> Buffer;
-  auto ReadLn = [&]() {
-    Buffer.clear();
-    while (true) {
-      char Chr = 0;
-      auto Read = FromCompiler.read(&Chr, 1);
-      EXPECT_GE(Read, 0);
-      if (!Read)
-        continue;
-      if (Chr == '\n')
-        return StringRef(Buffer.data(), Buffer.size());
-      Buffer.push_back(Chr);
-    }
-  };
-  // See include/llvm/Analysis/Utils/TrainingLogger.h
-  // First comes the header
-  auto Header = json::parse(ReadLn());
-  EXPECT_FALSE(Header.takeError());
-  EXPECT_NE(Header->getAsObject()->getArray("features"), nullptr);
-  EXPECT_NE(Header->getAsObject()->getObject("advice"), nullptr);
-  // Then comes the context
-  EXPECT_FALSE(json::parse(ReadLn()).takeError());
   // Since the evaluator sends the features over and then blocks waiting for
   // an answer, we must spawn a thread playing the role of the advisor / host:
   std::atomic<int> SeenObservations = 0;
+  // Start the host first to make sure the pipes are being prepared. Otherwise
+  // the evaluator will hang.
   std::thread Advisor([&]() {
+    // Open the writer first. This is because the evaluator will try opening
+    // the "input" pipe first. An alternative that avoids ordering is for the
+    // host to open the pipes RW.
+    raw_fd_ostream ToCompiler(ToCompilerName, EC);
+    EXPECT_FALSE(EC);
+    sys::fs::file_t FromCompiler = {};
+    EXPECT_FALSE(sys::fs::openFileForRead(FromCompilerName, FromCompiler));
     EXPECT_EQ(SeenObservations, 0);
+    // Helper to read headers and other json lines.
+    SmallVector<char, 1024> Buffer;
+    auto ReadLn = [&]() {
+      Buffer.clear();
+      while (true) {
+        char Chr = 0;
+        auto ReadOrErr = sys::fs::readNativeFile(FromCompiler, {&Chr, 1});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        if (!*ReadOrErr)
+          continue;
+        if (Chr == '\n')
+          return StringRef(Buffer.data(), Buffer.size());
+        Buffer.push_back(Chr);
+      }
+    };
+    // See include/llvm/Analysis/Utils/TrainingLogger.h
+    // First comes the header
+    auto Header = json::parse(ReadLn());
+    EXPECT_FALSE(Header.takeError());
+    EXPECT_NE(Header->getAsObject()->getArray("features"), nullptr);
+    EXPECT_NE(Header->getAsObject()->getObject("advice"), nullptr);
+    // Then comes the context
+    EXPECT_FALSE(json::parse(ReadLn()).takeError());
+
     int64_t Features[3] = {0};
     auto FullyRead = [&]() {
       size_t InsPt = 0;
       const size_t ToRead = 3 * Inputs[0].getTotalTensorBufferSize();
       char *Buff = reinterpret_cast<char *>(Features);
       while (InsPt < ToRead) {
-        auto Read = FromCompiler.read(Buff + InsPt, ToRead - InsPt);
-        EXPECT_GE(Read, 0);
-        InsPt += Read;
+        auto ReadOrErr = sys::fs::readNativeFile(
+            FromCompiler, {Buff + InsPt, ToRead - InsPt});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        InsPt += *ReadOrErr;
       }
     };
     // Observation
@@ -211,8 +212,15 @@ TEST(InteractiveModelRunner, Evaluation) {
     FullyRead();
     // a "\n"
     char Chr = 0;
-    while (FromCompiler.read(&Chr, 1) == 0) {
-    }
+    auto ReadNL = [&]() {
+      do {
+        auto ReadOrErr = sys::fs::readNativeFile(FromCompiler, {&Chr, 1});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        if (*ReadOrErr == 1)
+          break;
+      } while (true);
+    };
+    ReadNL();
     EXPECT_EQ(Chr, '\n');
     EXPECT_EQ(Features[0], 42);
     EXPECT_EQ(Features[1], 43);
@@ -228,8 +236,7 @@ TEST(InteractiveModelRunner, Evaluation) {
     // Second observation, and same idea as above
     EXPECT_FALSE(json::parse(ReadLn()).takeError());
     FullyRead();
-    while (FromCompiler.read(&Chr, 1) == 0) {
-    }
+    ReadNL();
     EXPECT_EQ(Chr, '\n');
     EXPECT_EQ(Features[0], 10);
     EXPECT_EQ(Features[1], -2);
@@ -239,8 +246,14 @@ TEST(InteractiveModelRunner, Evaluation) {
     ToCompiler.write(reinterpret_cast<const char *>(&Advice),
                     AdviceSpec.getTotalTensorBufferSize());
     ToCompiler.flush();
+    sys::fs::closeFile(FromCompiler);
   });
 
+  InteractiveModelRunner Evaluator(Ctx, Inputs, AdviceSpec, FromCompilerName,
+                                   ToCompilerName);
+
+  Evaluator.switchContext("hi");
+
   EXPECT_EQ(SeenObservations, 0);
   *Evaluator.getTensor<int64_t>(0) = 42;
   *Evaluator.getTensor<int64_t>(1) = 43;
@@ -256,4 +269,5 @@ TEST(InteractiveModelRunner, Evaluation) {
   EXPECT_EQ(SeenObservations, 2);
   EXPECT_FLOAT_EQ(Ret, 50.30);
   Advisor.join();
-}
\ No newline at end of file
+}
+#endif
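
For reference only (not part of the patch): a minimal host-side sketch of the handshake described by the new header comment, using plain POSIX calls and placeholder FIFO names. The host opens the compiler's inbound pipe for writing before opening the outbound pipe for reading, which is the same ordering the test's Advisor thread follows; opening both pipes RW would avoid the ordering requirement altogether.

// Hypothetical out-of-tree host sketch; pipe names and the advice payload are
// placeholders, not values defined by this patch.
#include <fcntl.h>
#include <unistd.h>
#include <string>

int main() {
  // These must match the FIFOs the compiler was told to use.
  const char *ToCompilerName = "compiler.in";    // the compiler's "Inbound"
  const char *FromCompilerName = "compiler.out"; // the compiler's "Outbound"

  // 1) Open the compiler's inbound pipe for writing first; the compiler blocks
  //    in open() on that pipe until a writer appears on the other end.
  int OutFD = ::open(ToCompilerName, O_WRONLY);
  // 2) Only then open the outbound pipe and read the '\n'-terminated JSON
  //    header, context, and observation lines, plus the raw tensor bytes.
  int InFD = ::open(FromCompilerName, O_RDONLY);
  if (OutFD < 0 || InFD < 0)
    return 1;

  std::string Line;
  char Chr = 0;
  // Read a single line (e.g. the logger header) as an illustration.
  while (::read(InFD, &Chr, 1) == 1 && Chr != '\n')
    Line.push_back(Chr);

  // ...after consuming an observation, reply with an advice tensor.
  float Advice = 42.0f;
  ::write(OutFD, &Advice, sizeof(Advice));

  ::close(InFD);
  ::close(OutFD);
  return 0;
}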