Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
141 lines (101 sloc) 3.18 KB
import unittest
from abc import abstractmethod, ABC
from enum import Enum
from typing import List, Optional
class TaskType(Enum):
    """Closed set of operations a task can request for a customer."""

    # Handled by calling ``provision`` on the backend.
    PROVISION = "provision"
    # Handled by calling ``deprovision`` on the backend.
    DEPROVISION = "deprovision"
class Task:
    """A unit of work: a task type paired with the customer it targets."""

    type: TaskType
    customer_id: int

    def __init__(self, type: TaskType, customer_id: int):
        # ``type`` shadows the builtin, but it is both a public parameter
        # name and a public attribute, so it is kept unchanged.
        self.type, self.customer_id = type, customer_id

    def get_type(self) -> TaskType:
        """Return the kind of operation this task requests."""
        return self.type

    def get_customer_id(self) -> int:
        """Return the id of the customer this task applies to."""
        return self.customer_id
class Queue(ABC):
    """Abstract source of tasks that also records each task's outcome."""

    @abstractmethod
    def fetch(self) -> Optional[Task]:
        """Return a task to process, or ``None`` when nothing is available."""

    @abstractmethod
    def complete(self, task: Task) -> None:
        """Record ``task`` as completed."""

    @abstractmethod
    def fail(self, task: Task) -> None:
        """Record ``task`` as failed."""
class Backend(ABC):
    """Abstract target system on which customers are (de)provisioned."""

    @abstractmethod
    def provision(self, customer_id: int):
        """Provision the customer identified by ``customer_id``."""

    @abstractmethod
    def deprovision(self, customer_id: int):
        """Deprovision the customer identified by ``customer_id``."""
class BackendException(Exception):
    """Signals a backend failure.

    ``DataProcessor.process_one`` catches this type, marks the task as
    failed on the queue, and does not re-raise it.
    """
class DataProcessor:
    """Pull tasks from a queue and apply them to a backend.

    Each task is dispatched to the backend operation matching its type and
    is then reported back to the queue as completed or failed.
    """

    queue: Queue
    backend: Backend

    def __init__(self, queue: Queue, backend: Backend):
        """Store the queue to read tasks from and the backend to act on."""
        self.queue = queue
        self.backend = backend

    def process_one(self):
        """Process at most one task from the queue.

        Returns immediately when the queue has no task. Otherwise the task
        is applied to the backend and marked completed. A
        ``BackendException`` marks the task failed and is swallowed; any
        other exception marks the task failed and is re-raised.

        Raises:
            ValueError: If the task's type is neither ``PROVISION`` nor
                ``DEPROVISION`` (the task is marked failed first).
            Exception: Any non-``BackendException`` error raised while
                handling the task, after the task has been marked failed.
        """
        task = self.queue.fetch()
        if task is None:
            return

        try:
            if task.type == TaskType.PROVISION:
                self.backend.provision(task.customer_id)
            elif task.type == TaskType.DEPROVISION:
                self.backend.deprovision(task.customer_id)
            else:
                # Never fall through to the destructive operation for a
                # task type this processor does not recognise.
                raise ValueError(f"unsupported task type: {task.type!r}")
            self.queue.complete(task)
        except BackendException:
            self.queue.fail(task)
        except Exception:
            self.queue.fail(task)
            # Bare ``raise`` keeps the original traceback intact.
            raise
class InMemoryQueue(Queue):
    """List-backed FIFO ``Queue`` that keeps all state in process memory.

    Pending, completed and failed tasks are held in three separate lists
    owned by the instance.
    """

    new_tasks: List[Task]
    completed_tasks: List[Task]
    failed_tasks: List[Task]

    def __init__(self) -> None:
        """Create an empty queue with its own task lists."""
        super().__init__()
        # Lists are created per instance; class-level ``[]`` defaults would
        # be shared by every queue and leak tasks between instances.
        self.new_tasks = []
        self.completed_tasks = []
        self.failed_tasks = []

    def enqueue(self, task: Task) -> None:
        """Append ``task`` to the end of the pending list."""
        self.new_tasks.append(task)

    def fetch(self) -> Optional[Task]:
        """Remove and return the oldest pending task, or ``None`` if empty."""
        if not self.new_tasks:
            return None
        # Pop the head so the task returned is the one removed; slicing off
        # the tail would re-serve the head and silently drop the last task.
        return self.new_tasks.pop(0)

    def fail(self, task: Task) -> None:
        """Record ``task`` in the failed list."""
        self.failed_tasks.append(task)

    def complete(self, task: Task) -> None:
        """Record ``task`` in the completed list."""
        self.completed_tasks.append(task)

    def get_completed_tasks(self) -> List[Task]:
        """Return the list of completed tasks (the live list, not a copy)."""
        return self.completed_tasks

    def get_failed_tasks(self) -> List[Task]:
        """Return the list of failed tasks (the live list, not a copy)."""
        return self.failed_tasks
class InMemoryBackend(Backend):
    """``Backend`` that tracks provisioned customer ids in an in-memory list.

    Ids are kept in the order they were first provisioned.
    """

    customer_ids: List[int]

    def __init__(self) -> None:
        """Create a backend with no provisioned customers."""
        super().__init__()
        # Per-instance list; a class-level ``[]`` default would be shared by
        # every backend and leak state between instances (and tests).
        self.customer_ids = []

    def provision(self, customer_id: int):
        """Add ``customer_id`` unless it is already provisioned."""
        if customer_id not in self.customer_ids:
            self.customer_ids.append(customer_id)

    def deprovision(self, customer_id: int):
        """Remove ``customer_id`` if it is provisioned; otherwise do nothing."""
        if customer_id in self.customer_ids:
            self.customer_ids.remove(customer_id)

    def get_customer_ids(self):
        """Return the list of provisioned ids (the live list, not a copy)."""
        return self.customer_ids
class TestDataProcessor(unittest.TestCase):
    """Tests for ``DataProcessor`` wired to the in-memory queue and backend."""

    def test_success(self):
        # Setup: one PROVISION task for customer 3 waiting in the queue.
        backend = InMemoryBackend()
        queue = InMemoryQueue()
        queue.enqueue(Task(TaskType.PROVISION, 3))
        processor = DataProcessor(queue, backend)

        # Execute
        processor.process_one()

        # Assert: the backend now knows exactly that customer.
        self.assertEqual([3], backend.get_customer_ids())
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
You can’t perform that action at this time.