Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Lesson_28/project/analytics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any


class AnalyticsService:
"""Analytics Service — counts the number of orders and payments."""
def __init__(self) -> None:
self.orders_count = 0
self.payments_count = 0

def track_order(self, data: Any) -> None:
"""Increases the order counter."""
self.orders_count += 1
print(f"[ANALYTICS] Кількість замовлень: {self.orders_count}")

def track_payment(self, data: Any) -> None:
"""Increments the payment counter."""
self.payments_count += 1
print(f"[ANALYTICS] Кількість оплат: {self.payments_count}")
52 changes: 52 additions & 0 deletions Lesson_28/project/event_mp_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
Event Queue — асинхронне оброблення подій через multiprocessing.Queue.
"""

import multiprocessing
from typing import Any, Callable, Dict


class MPEventQueue:
"""Event Queue — asynchronous event processing via multiprocessing.Queue."""

def __init__(self) -> None:
self._queue: "multiprocessing.Queue[Dict[str, Any]]" = multiprocessing.Queue()
self._processes = []
self._running = multiprocessing.Value('b', True)

def start_worker(self, handler: Callable[[str, Any], None]) -> None:
"""
Runs a worker in a separate process. The worker reads events from
the queue and processes them.
"""

def worker(queue: multiprocessing.Queue,
running: multiprocessing.Value) -> None:
while running.value:
try:
event = queue.get(timeout=1)
event_name = event["event"]
data = event["data"]
try:
handler(event_name, data)
except Exception as e:
print(
f"[WORKER ERROR] Помилка при обробці {event_name}: {e}")
except Exception:
continue

p = multiprocessing.Process(target=worker,
args=(self._queue, self._running))
p.daemon = True
p.start()
self._processes.append(p)

def stop_workers(self) -> None:
"""Stops all workers."""
self._running.value = False
for p in self._processes:
p.join(timeout=1)

def put_event(self, event_name: str, data: Any) -> None:
"""Producer throws an event into the queue."""
self._queue.put({"event": event_name, "data": data})
47 changes: 47 additions & 0 deletions Lesson_28/project/event_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import queue
import threading
from typing import Any, Callable, Dict


class EventQueue:
"""Event Queue — asynchronous event processing via queue.Queue."""

def __init__(self) -> None:
self._queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()
self._workers = []
self._running = False

def start_worker(self, handler: Callable[[str, Any], None]) -> None:
"""
Starts a worker that reads events from the queue and processes them.
The worker does not crash on errors.
"""
self._running = True

def worker() -> None:
while self._running:
try:
event = self._queue.get(timeout=1)
event_name = event["event"]
data = event["data"]
try:
handler(event_name, data)
except Exception as e:
print(
f"[WORKER ERROR] Помилка при обробці {event_name}: {e}")
except queue.Empty:
continue

t = threading.Thread(target=worker, daemon=True)
t.start()
self._workers.append(t)

def stop_workers(self) -> None:
"""Stops all workers."""
self._running = False
for t in self._workers:
t.join(timeout=1)

def put_event(self, event_name: str, data: Any) -> None:
"""Producer throws an event into the queue."""
self._queue.put({"event": event_name, "data": data})
65 changes: 65 additions & 0 deletions Lesson_28/project/eventbus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import fnmatch
import json
from typing import Callable, Dict, List, Any, Set


class EventBus:
"""
A simple EventBus that supports:
- Subscribing to events (with wildcard support)
- Unsubscribing from events
- Emitting events with data
- Logging all emitted events
"""

def __init__(self) -> None:
self._subscribers: Dict[str, Set[Callable[[Any], None]]] = {}
self._wildcard_subscribers: Dict[str, Set[Callable[[Any], None]]] = {}
self._event_log: List[Dict[str, Any]] = []

def subscribe(self, event_name: str,
callback: Callable[[Any], None]) -> None:
"""Subscribe a callback to an event."""
if "*" in event_name:
self._wildcard_subscribers.setdefault(event_name, set()).add(
callback)
else:
self._subscribers.setdefault(event_name, set()).add(callback)

def unsubscribe(self, event_name: str,
callback: Callable[[Any], None]) -> None:
"""Unsubscribe a callback from an event."""
if "*" in event_name:
if event_name in self._wildcard_subscribers:
self._wildcard_subscribers[event_name].discard(callback)
else:
if event_name in self._subscribers:
self._subscribers[event_name].discard(callback)

def emit(self, event_name: str, data: Any) -> None:
"""
Emit an event with associated data.
Triggers all matching subscribers (including wildcard).
Logs the event.
"""
# Log the event
event = {"event": event_name, "data": data}
self._event_log.append(event)

# Save to file
with open("events.log", "a", encoding="utf-8") as f:
f.write(json.dumps(event) + "\n")

# Call subscribers
if event_name in self._subscribers:
for callback in self._subscribers[event_name]:
callback(data)

for pattern, callbacks in self._wildcard_subscribers.items():
if fnmatch.fnmatch(event_name, pattern):
for callback in callbacks:
callback(data)

def get_event_log(self) -> List[Dict[str, Any]]:
"""Return the list of all emitted events."""
return self._event_log
78 changes: 78 additions & 0 deletions Lesson_28/project/eventbus_replay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import fnmatch
import json
from typing import Callable, Dict, List, Any, Set


class EventBus:
"""EventBus with Event Replay support (reading events from a file)."""

def __init__(self) -> None:
self._subscribers: Dict[str, Set[Callable[[Any], None]]] = {}
self._wildcard_subscribers: Dict[str, Set[Callable[[Any], None]]] = {}
self._event_log: List[Dict[str, Any]] = []

def subscribe(self, event_name: str,
callback: Callable[[Any], None]) -> None:
"""Subscribe a callback to an event."""
if "*" in event_name:
self._wildcard_subscribers.setdefault(event_name, set()).add(
callback)
else:
self._subscribers.setdefault(event_name, set()).add(callback)

def unsubscribe(self, event_name: str,
callback: Callable[[Any], None]) -> None:
"""Unsubscribe a callback from an event."""
if "*" in event_name:
if event_name in self._wildcard_subscribers:
self._wildcard_subscribers[event_name].discard(callback)
else:
if event_name in self._subscribers:
self._subscribers[event_name].discard(callback)

def emit(self, event_name: str, data: Any) -> None:
"""
Emit an event with associated data.
Triggers all matching subscribers (including wildcard).
Logs the event.
"""
# Log the event
event = {"event": event_name, "data": data}
self._event_log.append(event)

# Save to file
with open("events.log", "a", encoding="utf-8") as f:
f.write(json.dumps(event) + "\n")

# Call subscribers
if event_name in self._subscribers:
for callback in self._subscribers[event_name]:
callback(data)

for pattern, callbacks in self._wildcard_subscribers.items():
if fnmatch.fnmatch(event_name, pattern):
for callback in callbacks:
callback(data)

def get_event_log(self) -> List[Dict[str, Any]]:
"""Return the list of all emitted events."""
return self._event_log

def replay_from_file(self, filename: str) -> None:
"""Replays all events from the file (events.log)."""
try:
with open(filename, "r", encoding="utf-8") as f:
for line in f:
event = json.loads(line.strip())
event_name = event["event"]
data = event["data"]
print(f"[REPLAY] Відтворюємо {event_name}: {data}")
if event_name in self._subscribers:
for callback in self._subscribers[event_name]:
callback(data)
for pattern, callbacks in self._wildcard_subscribers.items():
if fnmatch.fnmatch(event_name, pattern):
for callback in callbacks:
callback(data)
except FileNotFoundError:
print(f"[REPLAY ERROR] Файл {filename} не знайдено.")
4 changes: 4 additions & 0 deletions Lesson_28/project/events.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"event": "order.created", "data": {"order_id": 1, "amount": 100}}
{"event": "order.paid", "data": {"order_id": 1, "amount": 100}}
{"event": "order.created", "data": {"order_id": 2, "amount": 250}}
{"event": "order.paid", "data": {"order_id": 2, "amount": 250}}
16 changes: 16 additions & 0 deletions Lesson_28/project/listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any


def email_sender(data: Any) -> None:
"""Send an email when a user registers."""
print(f"[EMAIL] Sending welcome email to {data}")


def logger(data: Any) -> None:
"""Log event data."""
print(f"[LOGGER] Event logged: {data}")


def analytics(data: Any) -> None:
"""Track analytics for events."""
print(f"[ANALYTICS] Tracking event: {data}")
27 changes: 27 additions & 0 deletions Lesson_28/project/main_e_shop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""A mini-simulation of an online store with EventBus."""

from analytics_service import AnalyticsService
from eventbus import EventBus
from notification_service import NotificationService
from order_service import OrderService

bus = EventBus()

order_service = OrderService(bus)
notification_service = NotificationService()
analytics_service = AnalyticsService()

bus.subscribe("order.created", notification_service.send_email)
bus.subscribe("order.created", analytics_service.track_order)
bus.subscribe("order.paid", notification_service.send_sms)
bus.subscribe("order.paid", analytics_service.track_payment)

order_service.create_order({"order_id": 1, "amount": 100})
order_service.pay_order({"order_id": 1, "amount": 100})

order_service.create_order({"order_id": 2, "amount": 250})
order_service.pay_order({"order_id": 2, "amount": 250})

print("\nЛог подій:")
for entry in bus.get_event_log():
print(entry)
45 changes: 45 additions & 0 deletions Lesson_28/project/main_event_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""A mini-simulation of an online store with EventBus and event queue."""

import time

from analytics_service import AnalyticsService
from event_queue import EventQueue
from eventbus import EventBus
from notification_service import NotificationService
from order_service import OrderService

bus = EventBus()
event_queue = EventQueue()

order_service = OrderService(bus)
notification_service = NotificationService()
analytics_service = AnalyticsService()

bus.subscribe("order.created", notification_service.send_email)
bus.subscribe("order.created", analytics_service.track_order)
bus.subscribe("order.paid", notification_service.send_sms)
bus.subscribe("order.paid", analytics_service.track_payment)


def event_handler(event_name: str, data: dict) -> None:
bus.emit(event_name, data)


event_queue.start_worker(event_handler)

event_queue.put_event("order.created",
{"order_id": 1, "amount": 100})
event_queue.put_event("order.paid",
{"order_id": 1, "amount": 100})
event_queue.put_event("order.created",
{"order_id": 2, "amount": 250})
event_queue.put_event("order.paid",
{"order_id": 2, "amount": 250})

time.sleep(2)

event_queue.stop_workers()

print("\nЛог подій:")
for entry in bus.get_event_log():
print(entry)
18 changes: 18 additions & 0 deletions Lesson_28/project/main_eventbus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Example usage of EventBus with events and listeners."""

from eventbus import EventBus
from listeners import email_sender, logger, analytics

bus = EventBus()

bus.subscribe("user.registered", email_sender)
bus.subscribe("user.*", logger)
bus.subscribe("order.*", analytics)

bus.emit("user.registered", {"username": "maksym"})
bus.emit("user.deleted", {"username": "john_doe"})
bus.emit("order.created", {"order_id": 123, "amount": 250})

print("\nEvent Log:")
for entry in bus.get_event_log():
print(entry)
Loading