Skip to content

Commit

Permalink
wrx
Browse files Browse the repository at this point in the history
  • Loading branch information
zdenyhraz committed May 30, 2024
1 parent 91727cb commit 28ff1f0
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 170 deletions.
15 changes: 10 additions & 5 deletions src/Gui/Windows/NodeEditor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,21 @@ struct NodeEditor

void DrawNode(Microservice& microservice)
{
usize id = microservice.GetId();
ed::BeginNode(id);
ed::BeginNode(microservice.GetId());
ImGui::Text(microservice.GetName().c_str());

ImGuiEx_BeginColumn();
for (const auto& [name, param] : microservice.GetInputParameters())
{
ed::BeginPin(++id, ed::PinKind::Input);
ed::BeginPin(param.GetId(), ed::PinKind::Input);
ImGui::Text(fmt::format("-> {}", name).c_str());
ed::EndPin();
}

ImGuiEx_NextColumn();
for (const auto& [name, param] : microservice.GetOutputParameters())
{
ed::BeginPin(++id, ed::PinKind::Input);
ed::BeginPin(param.GetId(), ed::PinKind::Input);
ImGui::Text(fmt::format("{} ->", name).c_str());
ed::EndPin();
}
Expand All @@ -83,7 +82,10 @@ struct NodeEditor
DrawNode(*microservice);

// for (const auto& connection : m_Workflow.GetMicroserviceConnections())
// ed::Link(linkInfo.Id, linkInfo.InputId, linkInfo.OutputId);
// ed::Link(connection.id, connection.idOutput, connection.idInput);

// for (const auto& connection : m_Workflow.GetParameterConnections())
// ed::Link(connection.id, linkInfo.InputId, linkInfo.OutputId);

// Submit Links
for (auto& linkInfo : m_Links)
Expand Down Expand Up @@ -119,6 +121,9 @@ struct NodeEditor
// Since we accepted new link, lets add one to our list of links.
m_Links.push_back({ed::LinkId(m_NextLinkId++), inputPinId, outputPinId});

// TODO: fix this
// m_Workflow.Connect(*reinterpret_cast<MicroserviceOutputParameter*>(outputPinId), *reinterpret_cast<MicroserviceInputParameter*>(inputPinId));

// Draw new link.
ed::Link(m_Links.back().Id, m_Links.back().InputId, m_Links.back().OutputId);
}
Expand Down
17 changes: 17 additions & 0 deletions src/Microservice/LoadImageMicroservice.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once
#include "Microservice.hpp"

class LoadImageMicroservice : public Microservice
{
void DefineInputParameters() override {}

void DefineOutputParameters() override { DefineOutputParameter<cv::Mat>("image"); }

void Process() override
{
LOG_DEBUG("{} processing", GetName());
auto image = cv::imread(GetProjectDirectoryPath("data/ml/object_detection/datasets/cats/cats2.jpg").string());
LOG_DEBUG("Loaded image size: {}x{}x{}", image.cols, image.rows, image.channels());
SetOutputParameter("image", image);
}
};
202 changes: 71 additions & 131 deletions src/Microservice/Microservice.hpp
Original file line number Diff line number Diff line change
@@ -1,88 +1,65 @@
#pragma once

class Microservice
struct MicroserviceInputParameter
{
friend class Workflow;
std::type_index type = std::type_index(typeid(void));
std::string name;
std::any* value; // pointer ot avoid data duplication

struct MicroserviceParameter
{
std::type_index type = std::type_index(typeid(void));
std::any value;

auto operator<=>(const MicroserviceParameter&) const = default;

void Reset()
{
type = std::type_index(typeid(void));
value.reset();
}

template <typename T>
bool CheckType()
{
return std::type_index(typeid(T)) == type;
}

template <typename T>
static MicroserviceParameter Get(T&& _value)
{
return MicroserviceParameter(std::type_index(typeid(T)), std::forward<T>(_value));
}
};

struct MicroserviceConnection
{
Microservice* outputMicroservice;
Microservice* inputMicroservice;
uintptr_t GetId() const { return reinterpret_cast<uintptr_t>(this); }

auto operator<=>(const MicroserviceConnection&) const = default;
};
void Reset() { value = nullptr; }
};

struct MicroserviceParameterConnection
{
Microservice* outputMicroservice;
Microservice* inputMicroservice;
std::string outputParameterName;
std::string inputParameterName;
struct MicroserviceOutputParameter
{
std::type_index type = std::type_index(typeid(void));
std::string name;
std::any value;

auto operator<=>(const MicroserviceParameterConnection&) const = default;
};
uintptr_t GetId() const { return reinterpret_cast<uintptr_t>(this); }

std::string microserviceName;
usize microserviceId;
std::unordered_map<std::string, MicroserviceParameter> inputParameters;
std::unordered_map<std::string, MicroserviceParameter> outputParameters;
void Reset() { value.reset(); }
};

std::vector<MicroserviceParameterConnection> inputParameterConnections;
std::vector<MicroserviceParameterConnection> outputParameterConnections;
class Microservice
{
friend class Workflow;
static constexpr std::string startParameterName = "start";
static constexpr std::string finishParameterName = "finish";

std::vector<std::shared_ptr<Microservice>> outputMicroservices;
std::string microserviceName;
std::unordered_map<std::string, MicroserviceInputParameter> inputParameters;
std::unordered_map<std::string, MicroserviceOutputParameter> outputParameters;

protected:
virtual void DefineInputParameters() = 0;
virtual void DefineOutputParameters() = 0;
virtual void Process() = 0;

template <typename T>
void DefineInputParameter(const std::string& paramName)
{
inputParameters[paramName] = MicroserviceParameter(std::type_index(typeid(T)));
inputParameters[paramName] = MicroserviceInputParameter(std::type_index(typeid(T)));
}

template <typename T>
void DefineOutputParameter(const std::string& paramName)
{
outputParameters[paramName] = MicroserviceParameter(std::type_index(typeid(T)));
outputParameters[paramName] = MicroserviceOutputParameter(std::type_index(typeid(T)));
}

void SetInputParameter(const std::string& paramName, const MicroserviceParameter& value)
template <typename T>
void SetInputParameter(const std::string& paramName, T& value)
{
if (not inputParameters.contains(paramName))
throw std::runtime_error(fmt::format("Input parameter '{}' not found", paramName));

auto& param = inputParameters.at(paramName);

param = value;
if (param.type != std::type_index(typeid(T)))
throw std::runtime_error(fmt::format("Input parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.type.name()));

param.value = &value;
}

template <typename T>
Expand All @@ -93,13 +70,16 @@ class Microservice

auto& param = inputParameters.at(paramName);

if (not param.value.has_value())
if (not param.value)
throw std::runtime_error(fmt::format("Input parameter '{}' not set (parameter has no connections)", paramName));

if (not param.value->has_value())
throw std::runtime_error(fmt::format("Input parameter '{}' not set", paramName));

if (not param.CheckType<T>())
throw std::runtime_error(fmt::format("Input parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.value.type().name()));
if (param.type != std::type_index(typeid(T)))
throw std::runtime_error(fmt::format("Input parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.type.name()));

return std::any_cast<T&>(param.value);
return std::any_cast<T&>(*param.value);
}

template <typename T>
Expand All @@ -110,41 +90,27 @@ class Microservice

auto& param = outputParameters.at(paramName);

if (not param.CheckType<T>())
throw std::runtime_error(fmt::format("Output parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.value.type().name()));
if (param.type != std::type_index(typeid(T)))
throw std::runtime_error(fmt::format("Output parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.type.name()));

param.value = value;
}

MicroserviceParameter& GetOutputParameter(const std::string& paramName)
template <typename T>
T& GetOutputParameter(const std::string& paramName)
{
if (not outputParameters.contains(paramName))
throw std::runtime_error(fmt::format("Output parameter '{}' not found", paramName));

auto& param = outputParameters.at(paramName);

if (param.type != std::type_index(typeid(T)))
throw std::runtime_error(fmt::format("Output parameter '{}' type mismatch: {} != {}", paramName, typeid(T).name(), param.type.name()));

if (not param.value.has_value())
throw std::runtime_error(fmt::format("Output parameter '{}' not set", paramName));

return param;
}

void PropagateOutputs()
{
for (const auto& connection : outputParameterConnections)
{
LOG_DEBUG("{}:{} -> {}:{}", GetName(), connection.outputParameterName, connection.inputMicroservice->GetName(), connection.inputParameterName);
connection.inputMicroservice->SetInputParameter(connection.inputParameterName, GetOutputParameter(connection.outputParameterName));
}
}

void Notify()
{
for (auto& microservice : outputMicroservices)
{
LOG_DEBUG("{} -> {}", GetName(), microservice->GetName());
microservice->Execute();
}
return std::any_cast<T&>(param.value);
}

void GenerateMicroserviceName()
Expand All @@ -158,25 +124,44 @@ class Microservice
if (posMs != std::string::npos)
typeName.erase(posMs, std::string("Microservice").length());

static usize idx = 0;
microserviceId = idx += 100; // for unique input/output pin ids
microserviceName = fmt::format("{}{}", typeName, microserviceId);
microserviceName = fmt::format("{}.{}", typeName, GetId());
}

public:
virtual ~Microservice() {}

virtual void Process() = 0;

const std::string& GetName() { return microserviceName; }

usize GetId() { return microserviceId; }
uintptr_t GetId() const { return reinterpret_cast<uintptr_t>(this); }

const std::unordered_map<std::string, MicroserviceInputParameter>& GetInputParameters() { return inputParameters; }

const std::unordered_map<std::string, MicroserviceOutputParameter>& GetOutputParameters() { return outputParameters; }

MicroserviceInputParameter& GetInputParameter(const std::string& paramName)
{
if (not inputParameters.contains(paramName))
throw std::runtime_error(fmt::format("Input parameter '{}' not found", paramName));

const std::unordered_map<std::string, MicroserviceParameter>& GetInputParameters() { return inputParameters; }
return inputParameters.at(paramName);
}

const std::unordered_map<std::string, MicroserviceParameter>& GetOutputParameters() { return outputParameters; }
MicroserviceOutputParameter& GetOutputParameter(const std::string& paramName)
{
if (not outputParameters.contains(paramName))
throw std::runtime_error(fmt::format("Output parameter '{}' not found", paramName));

return outputParameters.at(paramName);
}

void Initialize()
{
GenerateMicroserviceName();
// TODO: add input / output flow connections
DefineInputParameter<void>(startParameterName);
DefineOutputParameter<void>(finishParameterName);
DefineInputParameters();
DefineOutputParameters();
}
Expand All @@ -189,49 +174,4 @@ class Microservice
for (auto& [name, param] : outputParameters)
param.Reset();
}

static void Connect(std::shared_ptr<Microservice> outputMicroservice, std::shared_ptr<Microservice> inputMicroservice, const std::string& outputParameterName,
const std::string& inputParameterName) // connect output / input parameters
{
if (not outputMicroservice->outputParameters.contains(outputParameterName))
throw std::runtime_error(fmt::format("Cannot connect microservice '{}' parameter '{}' to microservice '{}' parameter '{}' - output parameter not found",
outputMicroservice->GetName(), outputParameterName, inputMicroservice->GetName(), inputParameterName));
if (not inputMicroservice->inputParameters.contains(inputParameterName))
throw std::runtime_error(fmt::format("Cannot connect microservice '{}' parameter '{}' to microservice '{}' parameter '{}' - input parameter not found",
outputMicroservice->GetName(), outputParameterName, inputMicroservice->GetName(), inputParameterName));

MicroserviceParameterConnection connection(outputMicroservice.get(), inputMicroservice.get(), outputParameterName, inputParameterName);
if (not std::ranges::any_of(outputMicroservice->outputParameterConnections, [&connection](const auto& conn) { return conn == connection; }))
outputMicroservice->outputParameterConnections.push_back(connection);
else
LOG_WARNING(
"Ignoring duplicate output parameter connection: {}:{} -> {}:{}", outputMicroservice->GetName(), outputParameterName, inputMicroservice->GetName(), inputParameterName);
if (not std::ranges::any_of(inputMicroservice->inputParameterConnections, [&connection](const auto& conn) { return conn == connection; }))
inputMicroservice->inputParameterConnections.push_back(connection);
else
LOG_WARNING(
"Ignoring duplicate input parameter connection: {}:{} -> {}:{}", outputMicroservice->GetName(), outputParameterName, inputMicroservice->GetName(), inputParameterName);
}

static void Connect(std::shared_ptr<Microservice> outputMicroservice, std::shared_ptr<Microservice> inputMicroservice) // main execution flow: notify when done processing
{
if (not std::ranges::any_of(outputMicroservice->outputMicroservices, [&inputMicroservice](const auto& ms) { return ms.get() == inputMicroservice.get(); }))
outputMicroservice->outputMicroservices.push_back(inputMicroservice);
else
LOG_WARNING("Ignoring duplicate microservice connection: {} -> {}", outputMicroservice->GetName(), inputMicroservice->GetName());
}

bool HasInputConnection() const { return inputParameterConnections.size() > 0; }

void Execute()
try
{
Process(); // do the processing
PropagateOutputs(); // propagate outputs to connected inputs
Notify(); // notify connected microservices to start processing
}
catch (const std::exception& e)
{
LOG_ERROR("Microservice '{}' error: {}", microserviceName, e.what());
}
};
9 changes: 9 additions & 0 deletions src/Microservice/StartMicroservice.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once
#include "Microservice.hpp"

class StartMicroservice : public Microservice
{
void DefineInputParameters() override {}
void DefineOutputParameters() override { DefineOutputParameter<bool>("start"); }
void Process() override { SetOutputParameter("start", true); }
};
4 changes: 4 additions & 0 deletions src/Microservice/TODO.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
- [x] use std::any, std::any_cast for input / output parameters / connections
- [x] named inputs / outputs
- [ ] dont duplicate data storage during ms output -> ms input
- [ ] add dedicated start node
- [ ] add parameter id for pins
- [ ] use void* as node/link/pin ids

Loading

0 comments on commit 28ff1f0

Please sign in to comment.