A generic unidirectional function coupling module based on loose coupling.
This module provides an interface for the implementation side to send information.
- Submitting information from the implementation side for white-box testing
- Creating entry points for simple add-ons
- Python 3.10 or later
- No external dependencies
This module is provided under the MIT License. See the LICENSE file for details.
PyPI
pip install fportGitHub
pip install git+https://github.com/minoru_jp/fport.git- Provides a communication channel to the implementation side with minimal setup.
- Designed so that the sending interface has no side effects on the implementation side (only computation cost on the receiving side).
- The sending interface does not propagate errors from the receiver or framework to the implementation side.
- The sending interface always returns
None. - The scope of information transfer can be flexibly defined by where and how the interface is defined and shared.
- You can configure the sending interface to reject connections.
- Even if the connection is rejected, the implementation side always gets a valid interface.
The communication mechanism adopted by this module is loosely coupled and does not explicitly specify the destination from the implementation side. Information transmitted from the implementation must be carefully considered. Careless transmission may lead to leaks of authentication data, personal information, or other critical data. The same applies to information that can be reconstructed into such sensitive data.
The sending interface is thread-unsafe. This design avoids unintended serialization on the implementation side. Maintaining overall consistency, including the use of interfaces in parallel processing, is the responsibility of the implementation side.
from fport import create_session_policy
policy = create_session_policy()
port = policy.create_port()
def add(a, b):
port.send("add", a, b)
return a + b
def listener(tag, *args, **kwargs):
print("Received:", tag, args, kwargs)
with policy.session(listener, port) as state:
result = add(2, 3)
print("Result:", result)
# Output:
# Received: add (2, 3) {}
# Result: 5create_session_policy(*, block_port: bool = False, message_validator: SendFunction | None = None) -> SessionPolicy
Factory function to generate a SessionPolicy.
-
Parameters
block_port: boolIfTrue, allPorts created by this policy reject connections.message_validator: SendFunction | NoneOptional validation function for sending. Called beforePort.send(). If an exception is raised, the send is rejected. The exception does not propagate to the sender; instead, it is treated as a session termination.
-
Returns
SessionPolicy
Interface for managing Port creation and session establishment.
-
Methods
-
create_port() -> PortCreates a connectablePort. -
create_noop_port() -> PortCreates a no-opPortthat rejects connections. -
session(listener: ListenFunction, target: Port) -> ContextManager[SessionState]Returns a context manager to start a session by connectinglistenerto the specifiedPort.-
Parameters
listener: ListenFunctionA callback function that receives messages sent viaPort.send(). Takes arguments(tag: str, *args, **kwargs).target: PortThe targetPortinstance.
-
Returns
ContextManager[SessionState]Used in awithblock. ProvidesSessionStatefor monitoring withokanderror. -
Exceptions
TypeError: Iftargetis not aPortinstanceOccupiedError: If the specifiedPortis already used by another sessionDeniedError: If thePortorSessionPolicyis set to reject connectionsRuntimeError: Unexpected internal inconsistencies
-
-
Interface for the implementation (sending side) to transmit information.
-
Methods
-
send(tag: str, *args, **kwargs) -> NoneSends arbitrary information to registered listeners.- Does nothing if no listener is registered
- Exceptions are not propagated to the sender (fail-silent)
- Thread-unsafe: designed to avoid unintended serialization
-
Read-only interface for monitoring session status.
-
Properties
ok: boolWhether the session is still activeerror: Exception | NoneThe first error that caused the session to end, orNone
-
class DeniedError(Exception)Raised when a policy orPortrejects a connection. -
class OccupiedError(Exception)Raised when aPortis already occupied by another session.
-
class SendFunction(Protocol)def __call__(tag: str, *args, **kwargs) -> None
Callable object used by the sender to send messages.
-
class ListenFunction(Protocol)def __call__(tag: str, *args, **kwargs) -> None
Callable object used by the receiver to process messages.
This library includes an observer implementation as a listener.
from fport import create_session_policy
from fport.observer import ProcessObserver
def create_weather_sensor(port):
"""Weather sensor
Specification:
temp < 0 -> "Freezing" + send("freezing")
0 <= temp <= 30 -> "Normal" + send("normal")
temp > 30 -> "Hot" + send("hot")
"""
def check_weather(temp: int) -> str:
# If there is a bug here, it will be detected by the test
if temp <= 0: # ← Common place to inject a bug
port.send("freezing", temp)
return "Freezing"
elif temp <= 30:
port.send("normal", temp)
return "Normal"
else:
port.send("hot", temp)
return "Hot"
return check_weather
policy = create_session_policy()
port = policy.create_port()
# Define expected conditions according to the specification
conditions = {
"freezing": lambda t: t < 0,
"normal": lambda t: 0 <= t <= 30,
"hot": lambda t: t > 30,
}
observer = ProcessObserver(conditions)
check_weather = create_weather_sensor(port)
with policy.session(observer.listen, port) as state:
# Test coverage for all three branches
for i in (-5, 0, 31):
check_weather(i)
if not state.ok:
raise AssertionError(f"observation failed on '{i}'")
# Verify that the Observer did not detect any specification violations
if observer.violation:
details = []
for tag, obs in observer.get_violated().items():
details.append(
f"[{tag}] reason={obs.fail_reason}, "
f"count={obs.count}, first_violation_at={obs.first_violation_at}"
)
raise AssertionError("Observer detected violations:\n" + "\n".join(details))
print("All checks passed!")Monitors process state, handling condition violations and exceptions.
ProcessObserver(conditions: dict[str, Callable[..., bool]])Initializes with the given set of conditions to monitor.
-
reset_observations() -> NoneReset all observation results. -
listen(tag: str, *args, **kwargs) -> NoneEvaluate the condition for the given tag. Calls handlers on violation or exception. -
get_all() -> dict[str, Observation]Returns all observation results. -
get_violated() -> dict[str, Observation]Returns observations where violations occurred. -
get_compliant() -> dict[str, Observation]Returns observations with no violations. -
get_unevaluated() -> dict[str, Observation]Returns unevaluated observations. -
set_violation_handler(tag: str, fn: Callable[[Observation], None]) -> NoneSets a violation handler for the specified tag. -
set_exception_handler(fn: Callable[[str, ExceptionKind, Observation | None, Exception], None]) -> NoneSets an exception handler. -
get_stat(tag: str) -> ConditionStatReturns statistical information for the specified tag.
-
violation: boolWhether any violation exists. -
global_violation: boolWhether any global violation exists. -
local_violation: boolWhether any local violation exists. -
global_fail_reason: strReturns the reason for the global violation. -
global_exception: Exception | NoneReturns the global exception, if any.
Holds detailed observation results per condition.
count: int– Number of evaluationsviolation: bool– Whether a violation occurredfirst_violation_at: int– Trial number of the first violationexc: Exception | None– Exception that occurredfail_condition: Callable[..., bool] | None– Condition function that failedfail_reason: str– Reason for the violation
Simplified statistical representation of condition results.
ConditionStat(count: int, violation: bool, first_violation_at: int)count: int– Number of evaluationsviolation: bool– Whether a violation occurredfirst_violation_at: int– Trial number of the first violation
Indicates where an exception occurred.
ON_CONDITION– Exception during condition evaluationON_VIOLATION– Exception during violation handler executionON_INTERNAL– Exception during internal processing
This module uses pytest for testing.
Tests are located in the tests/ directory.
The legacy/ directory contains disabled tests and should be skipped.