diff --git a/dnp3/src/master/task.rs b/dnp3/src/master/task.rs index 1bb66410..abc30c33 100644 --- a/dnp3/src/master/task.rs +++ b/dnp3/src/master/task.rs @@ -163,7 +163,7 @@ impl MasterSession { return self.handle_fragment_while_idle(io, writer, source, response).await } Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source), - Some(TransportResponse::Error(_)) => return Ok(()), // ignore the malformed response + Some(TransportResponse::Error(_, _)) => return Ok(()), // ignore the malformed response None => return Ok(()), } } @@ -196,7 +196,7 @@ impl MasterSession { return self.handle_fragment_while_idle(io, writer, source, response).await } Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source), - Some(TransportResponse::Error(_)) => return Ok(()), // ignore the malformed response + Some(TransportResponse::Error(_, _)) => return Ok(()), // ignore the malformed response None => return Ok(()), } } @@ -436,7 +436,7 @@ impl MasterSession { } } Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source), - Some(TransportResponse::Error(err)) => { + Some(TransportResponse::Error(_, err)) => { task.on_task_error(self.associations.get_mut(destination).ok(), err.into()); return Err(err.into()); }, @@ -569,7 +569,7 @@ impl MasterSession { } } Some(TransportResponse::LinkLayerMessage(msg)) => self.notify_link_activity(msg.source), - Some(TransportResponse::Error(err)) => return Err(err.into()), + Some(TransportResponse::Error(_, err)) => return Err(err.into()), None => continue } } @@ -802,7 +802,7 @@ impl MasterSession { self.notify_link_activity(msg.source); return Ok(()); } - Some(TransportResponse::Error(_)) => return Err(TaskError::UnexpectedResponseHeaders), + Some(TransportResponse::Error(_, _)) => return Err(TaskError::UnexpectedResponseHeaders), None => continue, } } diff --git a/dnp3/src/outstation/config.rs b/dnp3/src/outstation/config.rs index 838b1570..79ab4a93 100644 --- a/dnp3/src/outstation/config.rs +++ b/dnp3/src/outstation/config.rs @@ -22,6 +22,7 @@ pub enum Feature { feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] +#[cfg_attr(not(feature = "ffi"), non_exhaustive)] pub struct Features { /// if enabled, the outstation responds to the self address (default == Disabled) pub self_address: Feature, @@ -29,6 +30,11 @@ pub struct Features { pub broadcast: Feature, /// if enabled, the outstation will send process enable/disable unsolicited and produce unsolicited responses (default == Enabled) pub unsolicited: Feature, + /// if enabled, the outstation will process every request as if it came from the configured master address + /// + /// This feature is a hack that can make configuration of some systems easier/more flexible, but + /// should not be used when unsolicited reporting is also required. + pub respond_to_any_master: Feature, } impl Default for Features { @@ -37,6 +43,7 @@ impl Default for Features { self_address: Feature::Disabled, broadcast: Feature::Enabled, unsolicited: Feature::Enabled, + respond_to_any_master: Feature::Disabled, } } } diff --git a/dnp3/src/outstation/session.rs b/dnp3/src/outstation/session.rs index 2f6a2dd9..846de01d 100644 --- a/dnp3/src/outstation/session.rs +++ b/dnp3/src/outstation/session.rs @@ -158,12 +158,22 @@ pub(crate) struct SessionConfig { select_timeout: Timeout, broadcast: Feature, unsolicited: Feature, + respond_to_any_master: Feature, max_unsolicited_retries: Option, unsolicited_retry_delay: std::time::Duration, keep_alive_timeout: Option, max_controls_per_request: Option, } +impl SessionConfig { + fn required_master_address(&self) -> Option { + match self.respond_to_any_master { + Feature::Enabled => None, + Feature::Disabled => Some(self.master_address), + } + } +} + pub(crate) struct SessionParameters { max_read_headers_per_request: u16, sol_tx_buffer_size: BufferSize<249, 2048>, @@ -179,6 +189,7 @@ impl From for SessionConfig { select_timeout: config.select_timeout, broadcast: config.features.broadcast, unsolicited: config.features.unsolicited, + respond_to_any_master: config.features.respond_to_any_master, max_unsolicited_retries: config.max_unsolicited_retries, unsolicited_retry_delay: config.unsolicited_retry_delay, keep_alive_timeout: config.keep_alive_timeout, @@ -256,7 +267,7 @@ pub(crate) struct OutstationSession { } enum Confirm { - Yes, + Yes(EndpointAddress), Timeout, NewRequest, } @@ -288,9 +299,9 @@ enum FragmentType<'a> { #[derive(Copy, Clone)] enum ConfirmAction { - Confirmed, + Confirmed(EndpointAddress), NewRequest, - EchoLastResponse(Option), + EchoLastResponse(EndpointAddress, Option), ContinueWait, } @@ -385,6 +396,7 @@ impl OutstationSession { &mut self, io: &mut PhysLayer, writer: &mut TransportWriter, + respond_to: EndpointAddress, mut response: Response, database: &DatabaseHandle, ) -> Result { @@ -397,7 +409,8 @@ impl OutstationSession { } } - self.repeat_solicited(io, writer, response).await?; + self.repeat_solicited(io, respond_to, writer, response) + .await?; Ok(response) } @@ -405,6 +418,7 @@ impl OutstationSession { async fn repeat_solicited( &mut self, io: &mut PhysLayer, + respond_to: EndpointAddress, writer: &mut TransportWriter, response: Response, ) -> Result<(), LinkError> { @@ -417,7 +431,7 @@ impl OutstationSession { .write( io, self.config.decode_level, - self.config.master_address.wrap(), + respond_to.wrap(), self.sol_tx_buffer.get(len).unwrap(), ) .await @@ -702,7 +716,7 @@ impl OutstationSession { return Ok(UnsolicitedWaitResult::Timeout); } - let mut guard = reader.pop_request(self.config.master_address); + let mut guard = reader.pop_request(self.config.required_master_address()); let (info, request) = match guard.get() { None => return Ok(UnsolicitedWaitResult::ReadNext), Some(TransportRequest::Request(info, request)) => { @@ -713,9 +727,10 @@ impl OutstationSession { self.on_link_activity(); return Ok(UnsolicitedWaitResult::ReadNext); } - Some(TransportRequest::Error(err)) => { + Some(TransportRequest::Error(from, err)) => { self.state.deferred_read.clear(); - self.write_error_response(io, writer, err, database).await?; + self.write_error_response(io, from, writer, err, database) + .await?; return Ok(UnsolicitedWaitResult::ReadNext); } }; @@ -755,8 +770,14 @@ impl OutstationSession { let seq = request.header.control.seq; let iin = Iin::default() | Iin2::from(err); - self.write_solicited(io, writer, Response::empty_solicited(seq, iin), database) - .await?; + self.write_solicited( + io, + writer, + info.source, + Response::empty_solicited(seq, iin), + database, + ) + .await?; Ok(UnsolicitedWaitResult::ReadNext) } FragmentType::NewNonRead(hash, objects) => { @@ -772,7 +793,7 @@ impl OutstationSession { .await; if let Some(response) = &mut response { *response = self - .write_solicited(io, writer, *response, database) + .write_solicited(io, writer, info.source, *response, database) .await?; } self.state.last_valid_request = Some(LastValidRequest::new( @@ -807,7 +828,8 @@ impl OutstationSession { } FragmentType::RepeatNonRead(_, last_response) => { if let Some(last_response) = last_response { - self.repeat_solicited(io, writer, last_response).await? + self.repeat_solicited(io, info.source, writer, last_response) + .await? } self.state.deferred_read.clear(); Ok(UnsolicitedWaitResult::ReadNext) @@ -898,7 +920,9 @@ impl OutstationSession { if let Some(x) = self.state.deferred_read.select(database) { tracing::info!("handling deferred READ request"); let (response, mut series) = self.write_read_response(database, true, x.seq, x.iin2); - let response = self.write_solicited(io, writer, response, database).await?; + let response = self + .write_solicited(io, writer, x.info.source, response, database) + .await?; self.state.last_valid_request = Some(LastValidRequest::new(x.seq, x.hash, Some(response), series)); @@ -929,7 +953,7 @@ impl OutstationSession { writer: &mut TransportWriter, database: &mut DatabaseHandle, ) -> Result<(), RunError> { - let mut guard = reader.pop_request(self.config.master_address); + let mut guard = reader.pop_request(self.config.required_master_address()); match guard.get() { Some(TransportRequest::Request(info, request)) => { self.on_link_activity(); @@ -940,7 +964,7 @@ impl OutstationSession { // optional response if let Some(response) = &mut result.response { *response = self - .write_solicited(io, writer, *response, database) + .write_solicited(io, writer, info.source, *response, database) .await?; // check if an extra confirmation was added due to broadcast @@ -968,9 +992,10 @@ impl OutstationSession { Some(TransportRequest::LinkLayerMessage(_)) => { self.on_link_activity(); } - Some(TransportRequest::Error(err)) => { + Some(TransportRequest::Error(from, err)) => { self.on_link_activity(); - self.write_error_response(io, writer, err, database).await?; + self.write_error_response(io, from, writer, err, database) + .await?; } None => (), } @@ -1045,6 +1070,7 @@ impl OutstationSession { async fn write_error_response( &mut self, io: &mut PhysLayer, + respond_to: EndpointAddress, writer: &mut TransportWriter, err: TransportRequestError, database: &DatabaseHandle, @@ -1059,8 +1085,14 @@ impl OutstationSession { if let Some(seq) = seq { let iin = Iin::default() | Iin2::NO_FUNC_CODE_SUPPORT; - self.write_solicited(io, writer, Response::empty_solicited(seq, iin), database) - .await?; + self.write_solicited( + io, + writer, + respond_to, + Response::empty_solicited(seq, iin), + database, + ) + .await?; } Ok(()) } @@ -1982,7 +2014,7 @@ impl OutstationSession { .wait_for_sol_confirm(io, reader, writer, series.ecsn) .await? { - Confirm::Yes => { + Confirm::Yes(respond_to) => { self.state.last_broadcast_type = None; database @@ -1997,7 +2029,8 @@ impl OutstationSession { series.ecsn.increment(); let (response, next) = self.write_read_response(database, false, series.ecsn, Iin2::default()); - self.write_solicited(io, writer, response, database).await?; + self.write_solicited(io, writer, respond_to, response, database) + .await?; match next { None => return Ok(()), Some(next) => { @@ -2035,15 +2068,15 @@ impl OutstationSession { } // process data TimeoutStatus::No => { - let mut guard = reader.pop_request(self.config.master_address); + let mut guard = reader.pop_request(self.config.required_master_address()); match self.expect_sol_confirm(ecsn, &mut guard) { ConfirmAction::ContinueWait => { // we ignored whatever the request was and logged it elsewhere // just go back to the loop and read another fragment } - ConfirmAction::Confirmed => { + ConfirmAction::Confirmed(respond_to) => { self.info.solicited_confirm_received(ecsn); - return Ok(Confirm::Yes); + return Ok(Confirm::Yes(respond_to)); } ConfirmAction::NewRequest => { self.info.solicited_confirm_wait_new_request(); @@ -2051,9 +2084,10 @@ impl OutstationSession { guard.retain(); return Ok(Confirm::NewRequest); } - ConfirmAction::EchoLastResponse(response) => { + ConfirmAction::EchoLastResponse(respond_to, response) => { if let Some(response) = response { - self.repeat_solicited(io, writer, response).await?; + self.repeat_solicited(io, respond_to, writer, response) + .await?; } // per the spec, we restart the confirm timer deadline = self.new_confirm_deadline(); @@ -2074,7 +2108,7 @@ impl OutstationSession { self.on_link_activity(); return ConfirmAction::ContinueWait; } - Some(TransportRequest::Error(_)) => { + Some(TransportRequest::Error(_, _)) => { self.on_link_activity(); return ConfirmAction::NewRequest; } @@ -2084,14 +2118,16 @@ impl OutstationSession { match self.classify(info, request) { FragmentType::MalformedRequest(_, _) => ConfirmAction::NewRequest, FragmentType::NewRead(_, _) => ConfirmAction::NewRequest, - FragmentType::RepeatRead(_, response, _) => ConfirmAction::EchoLastResponse(response), + FragmentType::RepeatRead(_, response, _) => { + ConfirmAction::EchoLastResponse(info.source, response) + } FragmentType::NewNonRead(_, _) => ConfirmAction::NewRequest, // this should never happen, but if it does, new request is probably best course of action FragmentType::RepeatNonRead(_, _) => ConfirmAction::NewRequest, FragmentType::Broadcast(_) => ConfirmAction::NewRequest, FragmentType::SolicitedConfirm(seq) => { if seq == ecsn { - ConfirmAction::Confirmed + ConfirmAction::Confirmed(info.source) } else { self.info .wrong_solicited_confirm_seq(ecsn, request.header.control.seq); diff --git a/dnp3/src/outstation/tests/addressing.rs b/dnp3/src/outstation/tests/addressing.rs index 458b5851..3d6efdba 100644 --- a/dnp3/src/outstation/tests/addressing.rs +++ b/dnp3/src/outstation/tests/addressing.rs @@ -1,8 +1,9 @@ use crate::link::EndpointAddress; use crate::outstation::tests::harness::*; +use crate::outstation::Feature; #[tokio::test] -async fn ignores_message_sent_from_master_different_than_configured() { +async fn ignores_message_sent_from_master_different_than_configured_by_default() { let config = get_default_config(); let different_address = EndpointAddress::try_new(config.master_address.raw_value() + 1).unwrap(); @@ -11,3 +12,16 @@ async fn ignores_message_sent_from_master_different_than_configured() { harness.send_and_process(&[0xC0, 0x01]).await; harness.expect_no_response(); } + +#[tokio::test] +async fn answers_message_sent_from_master_different_than_configured_when_enabled() { + let mut config = get_default_config(); + config.features.respond_to_any_master = Feature::Enabled; + let different_address = + EndpointAddress::try_new(config.master_address.raw_value() + 1).unwrap(); + let mut harness = new_harness_with_master_addr(config, different_address); + + harness + .test_request_response(&[0xC0, 0x01], &[0xC0, 0x81, 0x80, 0x00]) + .await; +} diff --git a/dnp3/src/transport/reader.rs b/dnp3/src/transport/reader.rs index c748c978..2418bf06 100644 --- a/dnp3/src/transport/reader.rs +++ b/dnp3/src/transport/reader.rs @@ -121,24 +121,29 @@ impl TransportReader { match data { Ok(ParsedTransportData::Fragment(info, fragment)) => match fragment.to_response() { Ok(response) => Some(TransportResponse::Response(info.source, response)), - Err(err) => Some(TransportResponse::Error(err.into())), + Err(err) => Some(TransportResponse::Error(info.source, err.into())), }, Ok(ParsedTransportData::LinkLayerMessage(msg)) => { Some(TransportResponse::LinkLayerMessage(msg)) } - Err(err) => Some(TransportResponse::Error(err.into())), + Err((err, source)) => Some(TransportResponse::Error(source, err.into())), } } - pub(crate) fn pop_request(&mut self, master_address: EndpointAddress) -> RequestGuard<'_> { + pub(crate) fn pop_request( + &mut self, + master_address: Option, + ) -> RequestGuard<'_> { if let Some(TransportRequest::Request(info, _)) = self.peek_request() { - if info.source != master_address { - tracing::warn!( - "Discarding ASDU from master address: {} (configured address == {})", - info.source.raw_value(), - master_address.raw_value() - ); - self.pop(); + if let Some(required_master_addr) = master_address { + if info.source != required_master_addr { + tracing::warn!( + "Discarding ASDU from master address: {} (configured address == {})", + info.source.raw_value(), + required_master_addr.raw_value() + ); + self.pop(); + } } } RequestGuard::new(self) @@ -149,16 +154,22 @@ impl TransportReader { match data { Ok(ParsedTransportData::Fragment(info, fragment)) => match fragment.to_request() { Ok(request) => Some(TransportRequest::Request(info, request)), - Err(err) => Some(TransportRequest::Error(err.into(fragment.control.seq))), + Err(err) => Some(TransportRequest::Error( + info.source, + err.into(fragment.control.seq), + )), }, Ok(ParsedTransportData::LinkLayerMessage(msg)) => { Some(TransportRequest::LinkLayerMessage(msg)) } - Err(err) => Some(TransportRequest::Error(err.into())), + Err((err, source)) => Some(TransportRequest::Error(source, err.into())), } } - fn parse(&mut self, peek: bool) -> Option> { + fn parse( + &mut self, + peek: bool, + ) -> Option> { let transport_data = if peek { self.inner.peek()? } else { @@ -168,7 +179,8 @@ impl TransportReader { match transport_data { TransportData::Fragment(fragment) => Some( ParsedFragment::parse(fragment.data) - .map(|parsed| ParsedTransportData::Fragment(fragment.info, parsed)), + .map(|parsed| ParsedTransportData::Fragment(fragment.info, parsed)) + .map_err(|err| (err, fragment.info.source)), ), TransportData::LinkLayerMessage(msg) => { Some(Ok(ParsedTransportData::LinkLayerMessage(msg))) diff --git a/dnp3/src/transport/types.rs b/dnp3/src/transport/types.rs index 215c55e6..d110d09d 100644 --- a/dnp3/src/transport/types.rs +++ b/dnp3/src/transport/types.rs @@ -51,7 +51,7 @@ pub(crate) enum LinkLayerMessageType { pub(crate) enum TransportResponse<'a> { Response(EndpointAddress, Response<'a>), LinkLayerMessage(LinkLayerMessage), - Error(TransportResponseError), + Error(EndpointAddress, TransportResponseError), } #[derive(Copy, Clone, Debug, PartialEq)] @@ -75,7 +75,7 @@ impl From for TransportResponseError { pub(crate) enum TransportRequest<'a> { Request(FragmentInfo, Request<'a>), LinkLayerMessage(LinkLayerMessage), - Error(TransportRequestError), + Error(EndpointAddress, TransportRequestError), } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/ffi/dnp3-ffi/src/outstation/mod.rs b/ffi/dnp3-ffi/src/outstation/mod.rs index f8752fad..438cf10a 100644 --- a/ffi/dnp3-ffi/src/outstation/mod.rs +++ b/ffi/dnp3-ffi/src/outstation/mod.rs @@ -533,6 +533,7 @@ impl From<&ffi::OutstationFeatures> for Features { self_address: to_feature(from.self_address()), broadcast: to_feature(from.broadcast()), unsolicited: to_feature(from.unsolicited()), + respond_to_any_master: to_feature(from.respond_to_any_master()), } } } diff --git a/ffi/dnp3-schema/src/outstation.rs b/ffi/dnp3-schema/src/outstation.rs index f04b9ca3..62e774d8 100644 --- a/ffi/dnp3-schema/src/outstation.rs +++ b/ffi/dnp3-schema/src/outstation.rs @@ -613,6 +613,7 @@ fn define_outstation_features(lib: &mut LibraryBuilder) -> BackTraced BackTraced BackTraced