A simple, robust, and production-ready WebSocket Pub/Sub client for Python, built on Socket.IO.
- 🔄 Automatic reconnection with exponential backoff
- 📨 Message queuing for reliable sequential processing
- 🎯 Topic-based subscription with custom handlers
- 🔌 Both publish and subscribe capabilities
- 📝 Comprehensive logging for debugging
- 🧪 Type hints for better IDE support
- ⚡ Async message processing with threading
- 🛡️ Error handling with graceful degradation
- 🔁 Idempotence support to prevent duplicate message processing
- Idempotence Guide - Learn about idempotence classes and preventing duplicate message processing
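To make the idempotence feature concrete, here is a minimal sketch of one common approach: remembering recently seen message IDs and skipping repeats. The `DedupingHandler` class, its `max_ids` parameter, and the `message_id` key inside the message dict are assumptions for illustration only. They are not the library's idempotence classes, which are described in the Idempotence Guide.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict


class DedupingHandler:
    """Hypothetical wrapper: runs `handler` at most once per message_id."""

    def __init__(self, handler: Callable[[Dict[str, Any]], None], max_ids: int = 1000) -> None:
        self._handler = handler
        self._max_ids = max_ids  # assumed bound on how many IDs are remembered
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def __call__(self, message: Dict[str, Any]) -> bool:
        """Return True if the message was processed, False if skipped as a duplicate."""
        message_id = message.get("message_id")
        if message_id is None:
            # Without an ID there is nothing to deduplicate on, so process it.
            self._handler(message)
            return True
        if message_id in self._seen:
            return False
        self._handler(message)
        # Record the ID only after the handler returned, so a failed message can be retried.
        self._seen[message_id] = None
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


processed = []
handler = DedupingHandler(processed.append)
handler({"message_id": "msg-001", "content": "Hello"})
handler({"message_id": "msg-001", "content": "Hello"})  # duplicate, skipped
handler({"message_id": "msg-002", "content": "World"})
print(len(processed))  # 2
```

Bounding the remembered IDs keeps memory use predictable, at the cost of letting a very old duplicate through once its ID has been evicted.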
pip install python-pubsub-client
git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
pip install .
git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
# Install in editable mode with development dependencies
make dev
# Or manually:
pip install -e ".[dev]"
The client can be configured using environment variables. Copy .env.example to .env and adjust the values:
cp .env.example .env
# Edit .env with your configuration
Key configuration options:
- PUBSUB_SERVER_URL: WebSocket server URL (default: http://localhost:5000)
- PUBSUB_CONSUMER_NAME: Client identifier (default: demo-client)
- PUBSUB_TOPICS: Comma-separated list of topics to subscribe to
- PUBSUB_LOG_LEVEL: Logging level (DEBUG, INFO, WARNING, ERROR)
See .env.example for all available options.
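As an illustration of how these variables could be read in application code, the sketch below loads them with `os.environ` and applies the defaults listed above. The `PubSubSettings` dataclass and `load_settings` function are hypothetical names, and the empty-list default for PUBSUB_TOPICS and the INFO default for PUBSUB_LOG_LEVEL are assumptions, since this README does not state them.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class PubSubSettings:
    """Hypothetical settings container, not part of the library."""
    server_url: str
    consumer_name: str
    topics: List[str]
    log_level: str


def load_settings(env: Mapping[str, str] = os.environ) -> PubSubSettings:
    """Read the PUBSUB_* variables, falling back to defaults when unset."""
    raw_topics = env.get("PUBSUB_TOPICS", "")  # assumed default: no topics
    # Split the comma-separated list and drop empty or whitespace-only entries.
    topics = [t.strip() for t in raw_topics.split(",") if t.strip()]
    return PubSubSettings(
        server_url=env.get("PUBSUB_SERVER_URL", "http://localhost:5000"),
        consumer_name=env.get("PUBSUB_CONSUMER_NAME", "demo-client"),
        topics=topics,
        log_level=env.get("PUBSUB_LOG_LEVEL", "INFO").upper(),  # "INFO" is an assumed default
    )


# Passing a plain dict instead of os.environ keeps the example deterministic.
settings = load_settings({"PUBSUB_TOPICS": "orders, inventory"})
print(settings.topics)      # ['orders', 'inventory']
print(settings.server_url)  # http://localhost:5000
```

The resulting values can then be passed to `PubSubClient(url=..., consumer=..., topics=...)`.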
from python_pubsub_client import PubSubClient # Simple import thanks to __init__.py
# Create client
client = PubSubClient(
    url="http://localhost:5000",
    consumer="alice",
    topics=["notifications", "updates"]
)

# Register message handlers
def handle_notification(message):
    print(f"Received notification: {message}")

client.register_handler("notifications", handle_notification)

# Start the client
client.start()

# Publish a message to a topic
# noinspection PyUnresolvedReferences
client.publish(
    topic="notifications",
    message={"type": "alert", "content": "Hello World!"},
    producer="alice",
    message_id="msg-001"
)
import logging
from python_pubsub_client import PubSubClient
# Configure logging
logging.basicConfig(level=logging.INFO)
# Create client with multiple topics
client = PubSubClient(
    url="http://localhost:5000",
    consumer="service-a",
    topics=["orders", "inventory", "shipping"]
)

# Define custom handlers for each topic
def process_order(message):
    order_id = message.get("order_id")
    print(f"Processing order: {order_id}")
    # Your order processing logic here

def update_inventory(message):
    item_id = message.get("item_id")
    quantity = message.get("quantity")
    print(f"Updating inventory for item {item_id}: {quantity}")
    # Your inventory logic here

def track_shipping(message):
    tracking_number = message.get("tracking_number")
    print(f"Tracking shipment: {tracking_number}")
    # Your shipping logic here

# Register handlers
client.register_handler("orders", process_order)
client.register_handler("inventory", update_inventory)
client.register_handler("shipping", track_shipping)

# Start client (blocking)
try:
    client.start()
except KeyboardInterrupt:
    print("Shutting down client...")
- Python 3.9 or higher
- pip and virtualenv
- Make (optional, for using Makefile commands)
# Clone the repository
git clone https://github.com/venantvr/Python.PubSub.Client.git
cd Python.PubSub.Client
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install development dependencies
make dev
# Or manually:
pip install -e ".[dev]"
# Run all tests
make test
# Run specific test file
pytest tests/test_client.py -v
Python.PubSub.Client/
├── src/
│   └── pubsub/
│       ├── __init__.py
│       ├── pubsub_client.py    # Main client implementation
│       └── pubsub_message.py   # Message model
├── tests/
│   ├── __init__.py
│   ├── test_client.py
│   └── test_message.py
├── examples/
│   └── simple_client.py        # Example usage
├── Makefile                    # Development commands
├── pyproject.toml              # Project configuration (single source of truth)
└── README.md                   # This file
The project uses pytest for testing. Tests are located in the tests/ directory.
# Run tests
pytest
# Run with verbose output
pytest -v
# Run with coverage report
pytest --cov=pubsub --cov-report=html
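For readers new to pytest, the following sketch shows the shape of a test it would collect: plain functions whose names start with `test_`, using bare `assert` statements. The `extract_order_id` helper is hypothetical and only mirrors the `message.get("order_id")` pattern from the usage example. It is not taken from the project's test suite.

```python
# Hypothetical test module, e.g. tests/test_handlers.py (the file name is an assumption).
from typing import Any, Dict, Optional


def extract_order_id(message: Dict[str, Any]) -> Optional[Any]:
    """Illustrative handler logic: pull the order ID out of a message payload."""
    return message.get("order_id")


def test_extract_order_id_present():
    # A message carrying an order_id yields that ID.
    assert extract_order_id({"order_id": "A-1"}) == "A-1"


def test_extract_order_id_missing():
    # dict.get returns None when the key is absent, so the handler does not raise.
    assert extract_order_id({"item_id": 7}) is None
```

Keeping handler logic in small functions like this lets it be tested without a running Pub/Sub server.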
PubSubClient(url: str, consumer: str, topics: List[str])
- url: WebSocket server URL
- consumer: Consumer identifier
- topics: List of topics to subscribe to
register_handler: Register a callback function for a specific topic.
publish: Publish a message to a topic via HTTP POST.
start: Start the client and begin listening for messages (blocking).
Stop the client gracefully and clean up resources.
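To illustrate how topic handlers and sequential message processing can fit together, here is a minimal sketch of a handler registry fed by a queue and drained by one worker thread. `TopicDispatcher` and its `enqueue` method are hypothetical stand-ins, not the library's `PubSubClient`, and no networking is involved. Unlike the blocking start described above, `start()` in this sketch returns immediately.

```python
import queue
import threading
from typing import Any, Callable, Dict

Handler = Callable[[Any], None]


class TopicDispatcher:
    """Hypothetical stand-in showing topic-based dispatch; not the library's client."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._queue: "queue.Queue" = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)

    def register_handler(self, topic: str, handler: Handler) -> None:
        self._handlers[topic] = handler

    def enqueue(self, topic: str, message: Any) -> None:
        """Stand-in for a message arriving from the server."""
        self._queue.put((topic, message))

    def start(self) -> None:
        self._worker.start()

    def stop(self) -> None:
        self._queue.put(None)  # sentinel: tells the worker to exit after earlier items
        self._worker.join()

    def _run(self) -> None:
        # A single worker drains the queue, so messages are handled one at a time, in order.
        while True:
            item = self._queue.get()
            if item is None:
                break
            topic, message = item
            handler = self._handlers.get(topic)
            if handler is None:
                continue  # no handler registered for this topic
            try:
                handler(message)
            except Exception as exc:
                # A failing handler is reported but does not stop the worker.
                print(f"Handler for {topic!r} failed: {exc}")


received = []
dispatcher = TopicDispatcher()
dispatcher.register_handler("orders", received.append)
dispatcher.start()
dispatcher.enqueue("orders", {"order_id": 1})
dispatcher.enqueue("unknown", {"ignored": True})
dispatcher.enqueue("orders", {"order_id": 2})
dispatcher.stop()
print(received)  # [{'order_id': 1}, {'order_id': 2}]
```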
PubSubMessage.new(topic: str, message: Any, producer: str, message_id: str)
Create a new message instance.
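A message model with this signature could look roughly like the sketch below. `MessageSketch` and its `to_dict` method are assumptions made for illustration. The actual `PubSubMessage` class may store or serialize its fields differently.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class MessageSketch:
    """Hypothetical stand-in for a message model; not the library's PubSubMessage."""
    topic: str
    message: Any
    producer: str
    message_id: str

    @classmethod
    def new(cls, topic: str, message: Any, producer: str, message_id: str) -> "MessageSketch":
        """Factory mirroring the new(topic, message, producer, message_id) signature."""
        return cls(topic=topic, message=message, producer=producer, message_id=message_id)

    def to_dict(self) -> Dict[str, Any]:
        """Assumed helper: convert the message to a plain dict, e.g. for JSON encoding."""
        return asdict(self)


msg = MessageSketch.new(
    topic="notifications",
    message={"type": "alert", "content": "Hello World!"},
    producer="alice",
    message_id="msg-001",
)
print(msg.to_dict()["message_id"])  # msg-001
```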
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Please ensure:
- All tests pass
- Coverage remains above 80%
- Documentation is updated as needed
This project is licensed under the MIT License.
- Built with python-socketio
- Inspired by various Pub/Sub patterns and implementations
- Thanks to all contributors!
- Author: venantvr
- Email: venantvr@gmail.com
- GitHub: @venantvr
Please report bugs via GitHub Issues.
- Add support for message persistence
- Implement message encryption
- Add metrics and monitoring
- Support for multiple server URLs (failover)
- GraphQL subscription support
- WebRTC data channel support
Made with ❤️ by venantvr