Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration option allowing outstation to respond to any master #316

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dnp3/src/master/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl MasterSession {
return self.handle_fragment_while_idle(io, writer, source, response).await
}
Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source),
Some(TransportResponse::Error(_)) => return Ok(()), // ignore the malformed response
Some(TransportResponse::Error(_, _)) => return Ok(()), // ignore the malformed response
None => return Ok(()),
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ impl MasterSession {
return self.handle_fragment_while_idle(io, writer, source, response).await
}
Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source),
Some(TransportResponse::Error(_)) => return Ok(()), // ignore the malformed response
Some(TransportResponse::Error(_, _)) => return Ok(()), // ignore the malformed response
None => return Ok(()),
}
}
Expand Down Expand Up @@ -436,7 +436,7 @@ impl MasterSession {
}
}
Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source),
Some(TransportResponse::Error(err)) => {
Some(TransportResponse::Error(_, err)) => {
task.on_task_error(self.associations.get_mut(destination).ok(), err.into());
return Err(err.into());
},
Expand Down Expand Up @@ -569,7 +569,7 @@ impl MasterSession {
}
}
Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source),
Some(TransportResponse::Error(err)) => return Err(err.into()),
Some(TransportResponse::Error(_, err)) => return Err(err.into()),
None => continue
}
}
Expand Down Expand Up @@ -802,7 +802,7 @@ impl MasterSession {
self.notify_link_activity(msg.source);
return Ok(());
}
Some(TransportResponse::Error(_)) => return Err(TaskError::UnexpectedResponseHeaders),
Some(TransportResponse::Error(_, _)) => return Err(TaskError::UnexpectedResponseHeaders),
None => continue,
}
}
Expand Down
7 changes: 7 additions & 0 deletions dnp3/src/outstation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ pub enum Feature {
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
#[cfg_attr(not(feature = "ffi"), non_exhaustive)]
pub struct Features {
/// if enabled, the outstation responds to the self address (default == Disabled)
pub self_address: Feature,
/// if enabled, the outstation processes valid broadcast messages (default == Enabled)
pub broadcast: Feature,
/// if enabled, the outstation will send process enable/disable unsolicited and produce unsolicited responses (default == Enabled)
pub unsolicited: Feature,
/// if enabled, the outstation will process every request as if it came from the configured master address
///
/// This feature is a hack that can make configuration of some systems easier/more flexible, but
/// should not be used when unsolicited reporting is also required.
pub respond_to_any_master: Feature,
}

impl Default for Features {
Expand All @@ -37,6 +43,7 @@ impl Default for Features {
self_address: Feature::Disabled,
broadcast: Feature::Enabled,
unsolicited: Feature::Enabled,
respond_to_any_master: Feature::Disabled,
}
}
}
Expand Down
94 changes: 65 additions & 29 deletions dnp3/src/outstation/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,22 @@ pub(crate) struct SessionConfig {
select_timeout: Timeout,
broadcast: Feature,
unsolicited: Feature,
respond_to_any_master: Feature,
max_unsolicited_retries: Option<usize>,
unsolicited_retry_delay: std::time::Duration,
keep_alive_timeout: Option<std::time::Duration>,
max_controls_per_request: Option<u16>,
}

impl SessionConfig {
fn required_master_address(&self) -> Option<EndpointAddress> {
match self.respond_to_any_master {
Feature::Enabled => None,
Feature::Disabled => Some(self.master_address),
}
}
}

pub(crate) struct SessionParameters {
max_read_headers_per_request: u16,
sol_tx_buffer_size: BufferSize<249, 2048>,
Expand All @@ -179,6 +189,7 @@ impl From<OutstationConfig> for SessionConfig {
select_timeout: config.select_timeout,
broadcast: config.features.broadcast,
unsolicited: config.features.unsolicited,
respond_to_any_master: config.features.respond_to_any_master,
max_unsolicited_retries: config.max_unsolicited_retries,
unsolicited_retry_delay: config.unsolicited_retry_delay,
keep_alive_timeout: config.keep_alive_timeout,
Expand Down Expand Up @@ -256,7 +267,7 @@ pub(crate) struct OutstationSession {
}

enum Confirm {
Yes,
Yes(EndpointAddress),
Timeout,
NewRequest,
}
Expand Down Expand Up @@ -288,9 +299,9 @@ enum FragmentType<'a> {

#[derive(Copy, Clone)]
enum ConfirmAction {
Confirmed,
Confirmed(EndpointAddress),
NewRequest,
EchoLastResponse(Option<Response>),
EchoLastResponse(EndpointAddress, Option<Response>),
ContinueWait,
}

Expand Down Expand Up @@ -385,6 +396,7 @@ impl OutstationSession {
&mut self,
io: &mut PhysLayer,
writer: &mut TransportWriter,
respond_to: EndpointAddress,
mut response: Response,
database: &DatabaseHandle,
) -> Result<Response, LinkError> {
Expand All @@ -397,14 +409,16 @@ impl OutstationSession {
}
}

self.repeat_solicited(io, writer, response).await?;
self.repeat_solicited(io, respond_to, writer, response)
.await?;

Ok(response)
}

async fn repeat_solicited(
&mut self,
io: &mut PhysLayer,
respond_to: EndpointAddress,
writer: &mut TransportWriter,
response: Response,
) -> Result<(), LinkError> {
Expand All @@ -417,7 +431,7 @@ impl OutstationSession {
.write(
io,
self.config.decode_level,
self.config.master_address.wrap(),
respond_to.wrap(),
self.sol_tx_buffer.get(len).unwrap(),
)
.await
Expand Down Expand Up @@ -702,7 +716,7 @@ impl OutstationSession {
return Ok(UnsolicitedWaitResult::Timeout);
}

let mut guard = reader.pop_request(self.config.master_address);
let mut guard = reader.pop_request(self.config.required_master_address());
let (info, request) = match guard.get() {
None => return Ok(UnsolicitedWaitResult::ReadNext),
Some(TransportRequest::Request(info, request)) => {
Expand All @@ -713,9 +727,10 @@ impl OutstationSession {
self.on_link_activity();
return Ok(UnsolicitedWaitResult::ReadNext);
}
Some(TransportRequest::Error(err)) => {
Some(TransportRequest::Error(from, err)) => {
self.state.deferred_read.clear();
self.write_error_response(io, writer, err, database).await?;
self.write_error_response(io, from, writer, err, database)
.await?;
return Ok(UnsolicitedWaitResult::ReadNext);
}
};
Expand Down Expand Up @@ -755,8 +770,14 @@ impl OutstationSession {

let seq = request.header.control.seq;
let iin = Iin::default() | Iin2::from(err);
self.write_solicited(io, writer, Response::empty_solicited(seq, iin), database)
.await?;
self.write_solicited(
io,
writer,
info.source,
Response::empty_solicited(seq, iin),
database,
)
.await?;
Ok(UnsolicitedWaitResult::ReadNext)
}
FragmentType::NewNonRead(hash, objects) => {
Expand All @@ -772,7 +793,7 @@ impl OutstationSession {
.await;
if let Some(response) = &mut response {
*response = self
.write_solicited(io, writer, *response, database)
.write_solicited(io, writer, info.source, *response, database)
.await?;
}
self.state.last_valid_request = Some(LastValidRequest::new(
Expand Down Expand Up @@ -807,7 +828,8 @@ impl OutstationSession {
}
FragmentType::RepeatNonRead(_, last_response) => {
if let Some(last_response) = last_response {
self.repeat_solicited(io, writer, last_response).await?
self.repeat_solicited(io, info.source, writer, last_response)
.await?
}
self.state.deferred_read.clear();
Ok(UnsolicitedWaitResult::ReadNext)
Expand Down Expand Up @@ -898,7 +920,9 @@ impl OutstationSession {
if let Some(x) = self.state.deferred_read.select(database) {
tracing::info!("handling deferred READ request");
let (response, mut series) = self.write_read_response(database, true, x.seq, x.iin2);
let response = self.write_solicited(io, writer, response, database).await?;
let response = self
.write_solicited(io, writer, x.info.source, response, database)
.await?;
self.state.last_valid_request =
Some(LastValidRequest::new(x.seq, x.hash, Some(response), series));

Expand Down Expand Up @@ -929,7 +953,7 @@ impl OutstationSession {
writer: &mut TransportWriter,
database: &mut DatabaseHandle,
) -> Result<(), RunError> {
let mut guard = reader.pop_request(self.config.master_address);
let mut guard = reader.pop_request(self.config.required_master_address());
match guard.get() {
Some(TransportRequest::Request(info, request)) => {
self.on_link_activity();
Expand All @@ -940,7 +964,7 @@ impl OutstationSession {
// optional response
if let Some(response) = &mut result.response {
*response = self
.write_solicited(io, writer, *response, database)
.write_solicited(io, writer, info.source, *response, database)
.await?;

// check if an extra confirmation was added due to broadcast
Expand Down Expand Up @@ -968,9 +992,10 @@ impl OutstationSession {
Some(TransportRequest::LinkLayerMessage(_)) => {
self.on_link_activity();
}
Some(TransportRequest::Error(err)) => {
Some(TransportRequest::Error(from, err)) => {
self.on_link_activity();
self.write_error_response(io, writer, err, database).await?;
self.write_error_response(io, from, writer, err, database)
.await?;
}
None => (),
}
Expand Down Expand Up @@ -1045,6 +1070,7 @@ impl OutstationSession {
async fn write_error_response(
&mut self,
io: &mut PhysLayer,
respond_to: EndpointAddress,
writer: &mut TransportWriter,
err: TransportRequestError,
database: &DatabaseHandle,
Expand All @@ -1059,8 +1085,14 @@ impl OutstationSession {

if let Some(seq) = seq {
let iin = Iin::default() | Iin2::NO_FUNC_CODE_SUPPORT;
self.write_solicited(io, writer, Response::empty_solicited(seq, iin), database)
.await?;
self.write_solicited(
io,
writer,
respond_to,
Response::empty_solicited(seq, iin),
database,
)
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -1982,7 +2014,7 @@ impl OutstationSession {
.wait_for_sol_confirm(io, reader, writer, series.ecsn)
.await?
{
Confirm::Yes => {
Confirm::Yes(respond_to) => {
self.state.last_broadcast_type = None;

database
Expand All @@ -1997,7 +2029,8 @@ impl OutstationSession {
series.ecsn.increment();
let (response, next) =
self.write_read_response(database, false, series.ecsn, Iin2::default());
self.write_solicited(io, writer, response, database).await?;
self.write_solicited(io, writer, respond_to, response, database)
.await?;
match next {
None => return Ok(()),
Some(next) => {
Expand Down Expand Up @@ -2035,25 +2068,26 @@ impl OutstationSession {
}
// process data
TimeoutStatus::No => {
let mut guard = reader.pop_request(self.config.master_address);
let mut guard = reader.pop_request(self.config.required_master_address());
match self.expect_sol_confirm(ecsn, &mut guard) {
ConfirmAction::ContinueWait => {
// we ignored whatever the request was and logged it elsewhere
// just go back to the loop and read another fragment
}
ConfirmAction::Confirmed => {
ConfirmAction::Confirmed(respond_to) => {
self.info.solicited_confirm_received(ecsn);
return Ok(Confirm::Yes);
return Ok(Confirm::Yes(respond_to));
}
ConfirmAction::NewRequest => {
self.info.solicited_confirm_wait_new_request();
// retain the fragment so that it can be processed from the idle state
guard.retain();
return Ok(Confirm::NewRequest);
}
ConfirmAction::EchoLastResponse(response) => {
ConfirmAction::EchoLastResponse(respond_to, response) => {
if let Some(response) = response {
self.repeat_solicited(io, writer, response).await?;
self.repeat_solicited(io, respond_to, writer, response)
.await?;
}
// per the spec, we restart the confirm timer
deadline = self.new_confirm_deadline();
Expand All @@ -2074,7 +2108,7 @@ impl OutstationSession {
self.on_link_activity();
return ConfirmAction::ContinueWait;
}
Some(TransportRequest::Error(_)) => {
Some(TransportRequest::Error(_, _)) => {
self.on_link_activity();
return ConfirmAction::NewRequest;
}
Expand All @@ -2084,14 +2118,16 @@ impl OutstationSession {
match self.classify(info, request) {
FragmentType::MalformedRequest(_, _) => ConfirmAction::NewRequest,
FragmentType::NewRead(_, _) => ConfirmAction::NewRequest,
FragmentType::RepeatRead(_, response, _) => ConfirmAction::EchoLastResponse(response),
FragmentType::RepeatRead(_, response, _) => {
ConfirmAction::EchoLastResponse(info.source, response)
}
FragmentType::NewNonRead(_, _) => ConfirmAction::NewRequest,
// this should never happen, but if it does, new request is probably best course of action
FragmentType::RepeatNonRead(_, _) => ConfirmAction::NewRequest,
FragmentType::Broadcast(_) => ConfirmAction::NewRequest,
FragmentType::SolicitedConfirm(seq) => {
if seq == ecsn {
ConfirmAction::Confirmed
ConfirmAction::Confirmed(info.source)
} else {
self.info
.wrong_solicited_confirm_seq(ecsn, request.header.control.seq);
Expand Down
16 changes: 15 additions & 1 deletion dnp3/src/outstation/tests/addressing.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::link::EndpointAddress;
use crate::outstation::tests::harness::*;
use crate::outstation::Feature;

#[tokio::test]
async fn ignores_message_sent_from_master_different_than_configured() {
async fn ignores_message_sent_from_master_different_than_configured_by_default() {
let config = get_default_config();
let different_address =
EndpointAddress::try_new(config.master_address.raw_value() + 1).unwrap();
Expand All @@ -11,3 +12,16 @@ async fn ignores_message_sent_from_master_different_than_configured() {
harness.send_and_process(&[0xC0, 0x01]).await;
harness.expect_no_response();
}

#[tokio::test]
async fn answers_message_sent_from_master_different_than_configured_when_enabled() {
let mut config = get_default_config();
config.features.respond_to_any_master = Feature::Enabled;
let different_address =
EndpointAddress::try_new(config.master_address.raw_value() + 1).unwrap();
let mut harness = new_harness_with_master_addr(config, different_address);

harness
.test_request_response(&[0xC0, 0x01], &[0xC0, 0x81, 0x80, 0x00])
.await;
}
Loading