Skip to content
This repository has been archived by the owner on Apr 16, 2022. It is now read-only.

Commit

Permalink
Message delivery mode (#8)
Browse files Browse the repository at this point in the history
Added message delivery mode option (PERSISTENT/NON-PERSISTENT)
  • Loading branch information
antikantian authored and Steve Chávez committed Dec 20, 2017
1 parent 686941e commit 32466ca
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
target/
**/*.rs.bk
/.idea/
Cargo.lock
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Configuration is done through environment variables:
- **POSTGRESQL_URI**: e.g. `postgresql://username:password@domain.tld:port/database`
- **AMQP_URI**: e.g. `amqp://rabbitmq//`
- **BRIDGE_CHANNELS**: e.g. `pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange`
- **DELIVERY_MODE**: e.g. `PERSISTENT`, `NON-PERSISTENT`

**Note:** It's recommended to always use the same name for postgresql channel and exchange/queue in `BRIDGE_CHANNELS`, for example
`app_events:app_events,table_changes:tables_changes`
Expand All @@ -37,6 +38,7 @@ mv pg-amqp-bridge /usr/local/bin
POSTGRESQL_URI="postgres://postgres@localhost" \
AMQP_URI="amqp://localhost//" \
BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
DELIVERY_MODE="NON-PERSISTENT" \
pg-amqp-bridge
```

Expand All @@ -47,6 +49,7 @@ docker run --rm -it --net=host \
-e POSTGRESQL_URI="postgres://postgres@localhost" \
-e AMQP_URI="amqp://localhost//" \
-e BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
-e DELIVERY_MODE="NON-PERSISTENT" \
subzerocloud/pg-amqp-bridge
```

Expand Down Expand Up @@ -145,6 +148,7 @@ curl https://sh.rustup.rs -sSf | sh
POSTGRESQL_URI="postgres://postgres@localhost" \
AMQP_URI="amqp://localhost//" \
BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
DELIVERY_MODE="NON-PERSISTENT" \
cargo run
```

Expand Down
15 changes: 11 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Bridge {
pub fn new() -> Bridge {
Bridge { is_ready: false }
}
pub fn start(&mut self, amqp_uri: &str, pg_uri: &String, bridge_channels: &str){
pub fn start(&mut self, amqp_uri: &str, pg_uri: &String, bridge_channels: &str, delivery_mode: &str){

let mut bindings = parse_bridge_channels(bridge_channels);
let mut children = Vec::new();
Expand Down Expand Up @@ -74,7 +74,7 @@ impl Bridge {

for binding in bindings{
channel_id += 1; let channel = session.open_channel(channel_id).unwrap();
children.push(spawn_listener_publisher(channel, pg_uri.clone(), binding));
children.push(spawn_listener_publisher(channel, pg_uri.clone(), binding, delivery_mode.to_owned()));
}

for child in children{
Expand All @@ -83,7 +83,7 @@ impl Bridge {
}
}

fn spawn_listener_publisher(mut channel: Channel, pg_uri: String, binding: Binding) -> JoinHandle<()>{
fn spawn_listener_publisher(mut channel: Channel, pg_uri: String, binding: Binding, delivery_mode: String) -> JoinHandle<()>{
thread::spawn(move ||{
let pg_conn = Connection::connect(pg_uri, TlsMode::None).expect("Could not connect to PostgreSQL");

Expand All @@ -97,6 +97,12 @@ fn spawn_listener_publisher(mut channel: Channel, pg_uri: String, binding: Bindi

let amqp_entity_type = binding.amqp_entity_type.unwrap();

let persistence = match delivery_mode.to_uppercase().as_ref() {
"PERSISTENT" => 2,
_ => 1,
};


while let Ok(Some(notification)) = it.next() {
let (routing_key, message) = parse_notification(&notification.payload);
let (exchange, key) = if amqp_entity_type == Type::Exchange {
Expand All @@ -105,7 +111,7 @@ fn spawn_listener_publisher(mut channel: Channel, pg_uri: String, binding: Bindi
("", binding.amqp_entity.as_str())
};
channel.basic_publish(exchange, key, true, false,
protocol::basic::BasicProperties{ content_type: Some("text".to_string()), ..Default::default()},
protocol::basic::BasicProperties{ content_type: Some("text".to_string()), delivery_mode: Some(persistence), ..Default::default()},
message.as_bytes().to_vec()).unwrap();
info!("{:?} -> {:?} {:?} ( routing_key: {:?}, message: {:?} )",
binding.pg_channel, amqp_entity_type, binding.amqp_entity, routing_key, message);
Expand Down Expand Up @@ -210,3 +216,4 @@ mod tests {
assert!(catch_unwind(|| parse_bridge_channels("pgchannel2, pgchannel2")).is_err());
}
}

5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ struct Config {
postgresql_uri: String,
amqp_uri: String,
bridge_channels: String,
delivery_mode: String,
}

impl Config {
Expand All @@ -17,12 +18,14 @@ impl Config {
postgresql_uri: env::var("POSTGRESQL_URI").expect("POSTGRESQL_URI environment variable must be defined"),
amqp_uri: env::var("AMQP_URI").expect("AMQP_URI environment variable must be defined"),
bridge_channels: env::var("BRIDGE_CHANNELS").expect("BRIDGE_CHANNELS environment variable must be defined"),
delivery_mode: env::var("DELIVERY_MODE").expect("DELIVERY_MODE environment variable must be defined"),
}
}
}

fn main() {
env_logger::init().unwrap();
let config = Config::new();
Bridge::new().start(&config.amqp_uri, &config.postgresql_uri, &config.bridge_channels);
Bridge::new().start(&config.amqp_uri, &config.postgresql_uri, &config.bridge_channels, &config.delivery_mode);
}

5 changes: 4 additions & 1 deletion tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,17 @@ fn main() {
TEST_1_PG_CHANNEL, TEST_1_QUEUE,
TEST_2_PG_CHANNEL, TEST_2_EXCHANGE,
TEST_3_PG_CHANNEL, TEST_3_EXCHANGE);

let delivery_mode: &str = "PERSISTENT";
setup();
add_test(&mut tests, "publishing_to_queue_works".to_string(), publishing_to_queue_works);
add_test(&mut tests, "publishing_to_direct_exchange_works".to_string(), publishing_to_direct_exchange_works);
add_test(&mut tests, "publishing_to_topic_exchange_works".to_string(), publishing_to_topic_exchange_works);
thread::spawn(move ||
Bridge::new().start(&TEST_AMQP_URI.to_string(),
&TEST_PG_URI.to_string(),
&bridge_channels)
&bridge_channels,
&delivery_mode)
);
thread::sleep(Duration::from_secs(2));
test::test_main(&args, tests);
Expand Down

0 comments on commit 32466ca

Please sign in to comment.