Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
+ ability to filter out plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
simple0x47 committed Sep 28, 2023
1 parent e19857c commit 0b34f88
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 18 deletions.
9 changes: 6 additions & 3 deletions src/api/server/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ impl<InputImpl: 'static + Input + Send, LogicRequestType: 'static + Send>
} else {
for (index, plugin) in plugins_pointer.as_slice().iter().enumerate()
{
if input.filter_out_plugins().contains(&plugin.id()) {
if input
.filter_out_plugins()
.contains(&plugin.id().to_string())
{
continue;
}

Expand Down Expand Up @@ -177,7 +180,7 @@ impl Input for InputTimedImpl {
})
}

fn filter_out_plugins(&self) -> &[&str] {
fn filter_out_plugins(&self) -> &[String] {
&[]
}
}
Expand Down Expand Up @@ -248,7 +251,7 @@ impl Input for InputDummyImpl {
Ok(InputData { request, replier })
}

fn filter_out_plugins(&self) -> &[&str] {
fn filter_out_plugins(&self) -> &[String] {
&[]
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/api/server/input/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ use crate::core::error::Error;
#[async_trait::async_trait]
pub trait Input {
async fn receive(&mut self) -> Result<InputData, Error>;
fn filter_out_plugins(&self) -> &[&str];
fn filter_out_plugins(&self) -> &[String];
}
12 changes: 6 additions & 6 deletions src/impl/api/server/input/amqp_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ use crate::api::shared::request::Request;
use crate::core::error::{Error, ErrorKind};
use crate::r#impl::api::shared::amqp_queue_consumer::AmqpQueueConsumer;

pub struct AmqpInput<'a> {
pub struct AmqpInput {
channel: Arc<Channel>,
consumer: Consumer,
reject_options: BasicRejectOptions,
ack_options: BasicAckOptions,
filter_out_plugins: Vec<&'a str>,
filter_out_plugins: Vec<String>,
}

impl<'a> AmqpInput<'a> {
impl AmqpInput {
pub async fn try_new(
channel: Arc<Channel>,
queue_consumer: AmqpQueueConsumer,
filter_out_plugins: Vec<&'a str>,
filter_out_plugins: Vec<String>,
) -> Result<AmqpInput, Error> {
let _queue = match channel
.queue_declare(
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<'a> AmqpInput<'a> {
}

#[async_trait]
impl<'a> Input for AmqpInput<'a> {
impl Input for AmqpInput {
async fn receive(&mut self) -> Result<InputData, Error> {
let delivery = match self.consumer.try_next().await {
Ok(optional_delivery) => match optional_delivery {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl<'a> Input for AmqpInput<'a> {
Ok(InputData::new(request, replier))
}

fn filter_out_plugins(&self) -> &[&str] {
fn filter_out_plugins(&self) -> &[String] {
self.filter_out_plugins.as_slice()
}
}
9 changes: 9 additions & 0 deletions src/impl/api/shared/amqp_api_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};

use crate::r#impl::api::shared::amqp_queue_consumer::AmqpQueueConsumer;

#[derive(Deserialize, Serialize)]
pub struct AmqpApiEntry {
pub amqp_queue_consumer: AmqpQueueConsumer,
pub filter_out_plugins: Vec<String>,
}
1 change: 1 addition & 0 deletions src/impl/api/shared/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod amqp_api_entry;
pub mod amqp_consume;
pub mod amqp_publish;
pub mod amqp_qos;
Expand Down
23 changes: 15 additions & 8 deletions src/impl/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use multiple_connections_lapin_wrapper::{
amqp_wrapper::AmqpWrapper, config::amqp_connect_config::AmqpConnectConfig,
};

use crate::r#impl::api::shared::amqp_api_entry::AmqpApiEntry;
use crate::{
api::server::{action::Action, input::input_plugin::InputPlugin},
r#impl::api::{
Expand Down Expand Up @@ -99,10 +100,10 @@ fn get_amqp_connect_config(
Ok(amqp_connect_config)
}

fn get_amqp_api(amqp_api_file: String) -> Result<Vec<AmqpQueueConsumer>, std::io::Error> {
fn get_amqp_api<'a>(amqp_api_file: String) -> Result<Vec<AmqpApiEntry>, std::io::Error> {
let amqp_api_file_content = std::fs::read_to_string(amqp_api_file)?;

let amqp_api = match serde_json::from_str::<Vec<AmqpQueueConsumer>>(&amqp_api_file_content) {
let amqp_api = match serde_json::from_str::<Vec<AmqpApiEntry>>(&amqp_api_file_content) {
Ok(amqp_api) => amqp_api,
Err(error) => {
return Err(std::io::Error::new(
Expand All @@ -115,13 +116,13 @@ fn get_amqp_api(amqp_api_file: String) -> Result<Vec<AmqpQueueConsumer>, std::io
Ok(amqp_api)
}

async fn generate_inputs_from_api<'a>(
async fn generate_inputs_from_api(
mut amqp_wrapper: AmqpWrapper,
amqp_api: Vec<AmqpQueueConsumer>,
) -> Result<Vec<AmqpInput<'a>>, std::io::Error> {
let mut inputs: Vec<AmqpInput<'a>> = Vec::new();
amqp_api: Vec<AmqpApiEntry>,
) -> Result<Vec<AmqpInput>, std::io::Error> {
let mut inputs: Vec<AmqpInput> = Vec::new();

for amqp_queue_consumer in amqp_api {
for amqp_api_entry in amqp_api {
let channel = match amqp_wrapper.try_get_channel().await {
Ok(channel) => channel,
Err(error) => {
Expand All @@ -132,7 +133,13 @@ async fn generate_inputs_from_api<'a>(
}
};

let amqp_input = match AmqpInput::try_new(channel, amqp_queue_consumer, Vec::new()).await {
let amqp_input = match AmqpInput::try_new(
channel,
amqp_api_entry.amqp_queue_consumer,
amqp_api_entry.filter_out_plugins,
)
.await
{
Ok(amqp_input) => amqp_input,
Err(error) => {
return Err(std::io::Error::new(
Expand Down

0 comments on commit 0b34f88

Please sign in to comment.