In [32]:
import multiprocessing as mp
import time
import os
import inspect
import threading
from IPython.display import clear_output, display, update_display
import random
import datetime
from rich.console import Console
from rich.table import Table
from rich import print as rprint
from enum import Enum
from typing import Optional, Any, Callable, Union
import psutil


class Task:
    """A unit of work shipped to a worker process through the task queue.

    Attributes:
        id: Identifier used as the key of the result dict and in status reports.
        function: Callable executed by the worker as ``function(*args, **kwargs)``.
        args: Positional arguments for ``function``.
        kwargs: Keyword arguments for ``function``.
        result: Placeholder; the visible worker code never fills it in (results
            travel back through the result queue instead).
    """

    def __init__(self, id, function, args=(), kwargs=None):
        self.id = id
        self.function = function
        self.args = args
        # Fresh dict per task: a shared mutable default ({}) would let one task's
        # kwargs leak into every other task created without explicit kwargs.
        self.kwargs = {} if kwargs is None else kwargs
        self.result = None


class TaskStatus(Enum):
    """Lifecycle states carried in ``StatusReport.status``."""

    STARTED = 0  # a worker has taken a task off the task queue
    COMPLETED = 1  # the task function returned normally
    PROCESSING = 2  # progress message sent while a task is running
    WAITING = 3  # worker is idle and blocking on the task queue
    FAILED = 4  # the task (or the worker loop itself) raised


class StatusReport:
    """Status message sent by worker processes to the backend's status queue.

    Args:
        pid: Process id of the sender.
        status: A ``TaskStatus`` member.
        task_id: Id of the task the report refers to, or None for worker-level
            reports (e.g. "waiting for a task").
        message: Free-form progress text.
        exception: Failure description. The worker sends ``str(e)`` rather than
            the exception object, so both strings and exceptions are accepted.
    """

    def __init__(
        self,
        pid: int,
        status: "TaskStatus",
        task_id: Optional[str] = None,
        message: Optional[str] = None,
        exception: Optional[Union[Exception, str]] = None,
    ) -> None:
        self.pid = pid
        self.status = status
        self.task_id = task_id
        self.message = message
        self.exception = exception
        # Wall-clock creation time; the backend uses it to time tasks.
        self.timestamp = time.time()

    def __str__(self) -> str:
        # Includes `message`, which is the main payload of PROCESSING/WAITING reports.
        return (
            f"StatusReport(pid={self.pid}, status={self.status}, "
            f"task_id={self.task_id}, message={self.message}, "
            f"exception={self.exception})"
        )


class StatusUpdater:
    """Sends PROCESSING status reports for a single task on behalf of a worker.

    Args:
        queue: Status queue that the backend's monitor thread drains.
        pid: Process id of the worker running the task.
        task_id: Id of the task the reports refer to.
    """

    def __init__(self, queue: mp.Queue, pid: int, task_id: str):
        self.__queue, self.__pid, self.__task_id = queue, pid, task_id

    def report(self, message: str):
        """Enqueue a PROCESSING ``StatusReport`` carrying ``message``."""
        update = StatusReport(
            pid=self.__pid,
            status=TaskStatus.PROCESSING,
            task_id=self.__task_id,
            message=message,
        )
        self.__queue.put(update)


def report(message: str, min_interval: float = 0.2) -> None:
    """Report progress for the task currently running in this process.

    Meant to be called from inside a task function. ``Worker.run`` installs a
    module-global ``_current_status_updater`` for the duration of each task.
    When no updater is installed (e.g. in the parent process), this is a no-op.

    Messages are throttled: a call is dropped if the previous forwarded message
    was sent no more than ``min_interval`` seconds earlier.

    Args:
        message: Free-form progress text.
        min_interval: Minimum spacing, in seconds, between forwarded messages.
    """
    # globals().get: the global only exists once a Worker has started a task, so
    # a bare name lookup raised NameError everywhere else.
    updater = globals().get("_current_status_updater")
    if updater is None:
        # Don't touch the throttle timestamp when nothing is sent.
        return
    now = time.time()
    last = report.last_update_time
    if last is None or now - last > min_interval:
        report.last_update_time = now
        updater.report(message)


# Time of the last forwarded message in this process; None = nothing sent yet.
report.last_update_time = None


class Worker:
    """Worker-process loop: pulls Task objects, runs them, and reports back.

    Used as a ``multiprocessing.Process`` target. Constructing the object runs
    ``run()`` immediately. The constructor only returns when the loop ends,
    either on a ``None`` sentinel from the task queue or on an unexpected error
    in the loop itself.

    Args:
        task_queue: Queue of ``Task`` objects; ``None`` means "shut down".
        result_queue: Receives ``{task.id: result}`` for each successful task.
            Failed tasks put nothing here.
        status_queue: Receives ``StatusReport`` objects (WAITING, STARTED,
            COMPLETED, FAILED, and PROCESSING via the status updater).
    """

    def __init__(self, task_queue, result_queue, status_queue):
        self.__task_queue = task_queue
        self.__result_queue = result_queue
        self.__status_queue = status_queue
        self.__process_id = os.getpid()
        self.__task_count = 0

        self.run()

    def run(self):
        """Process tasks until a ``None`` sentinel arrives on the task queue."""
        # Module-global hook read by report(); installed per task, cleared after.
        global _current_status_updater
        try:
            while True:
                self.__status_queue.put(
                    StatusReport(
                        pid=self.__process_id,
                        status=TaskStatus.WAITING,
                        message="Waiting for new task",
                    )
                )

                task = self.__task_queue.get()
                if task is None:
                    break

                # Report task started
                self.__status_queue.put(
                    StatusReport(
                        pid=self.__process_id,
                        status=TaskStatus.STARTED,
                        task_id=task.id,
                    )
                )

                # Install the per-task updater used by report(), and reset report()'s
                # throttle so this task's first message isn't dropped because the
                # previous task reported moments ago.
                _current_status_updater = StatusUpdater(
                    self.__status_queue, self.__process_id, task.id
                )
                report.last_update_time = None

                try:
                    # Call directly. The former inspect.signature() probe had no effect
                    # on the call, but raised ValueError for callables without an
                    # introspectable signature (e.g. `dict`), failing those tasks.
                    result = task.function(*task.args, **task.kwargs)

                    # Report task completed
                    self.__status_queue.put(
                        StatusReport(
                            pid=self.__process_id,
                            status=TaskStatus.COMPLETED,
                            task_id=task.id,
                        )
                    )
                    self.__result_queue.put({task.id: result})
                except Exception as e:
                    # Report task failed; the exception is sent as text.
                    self.__status_queue.put(
                        StatusReport(
                            pid=self.__process_id,
                            status=TaskStatus.FAILED,
                            task_id=task.id,
                            exception=str(e),
                        )
                    )
                finally:
                    # cleanup status updater
                    _current_status_updater = None

                # Short pause between tasks.
                time.sleep(0.1)
        except Exception as e:
            # Error outside a task (e.g. queue failure): worker-level FAILED report.
            self.__status_queue.put(
                StatusReport(
                    pid=self.__process_id, status=TaskStatus.FAILED, exception=str(e)
                )
            )


class BackendMultiprocessing:
    """Fork-based worker pool with live rich status tables (Jupyter or terminal).

    Tasks are queued with ``add_task`` and executed by ``Worker`` processes.
    Workers push ``StatusReport`` objects onto a status queue. A daemon monitor
    thread drains that queue and maintains the counters and timings that
    ``print_status`` / ``print_status_backend`` render.

    Args:
        n_workers: Number of worker processes to start immediately.
    """

    def __init__(self, n_workers=0):
        self.__mp_context = mp.get_context("fork")  # "fork" start method: POSIX only
        self.initialize_queues()
        self.__workers = []
        self.__task_added = 0
        self.__task_processing = 0
        self.__task_finished = 0
        self.__task_failed = 0
        self.__workers_status = {}  # pid -> latest StatusReport from that worker
        self.__stop_monitor = False  # stop flag for the print_status_backend thread
        self.__time_start = time.time()
        # (pid, task_id) -> start timestamp of tasks currently running. The pid is
        # part of the key because add_task does not enforce unique task ids.
        self.__active_task_start = {}
        self.__task_time_taken = {}  # task_id -> duration of its last completed run
        self.__task_time_taken_total = 0  # summed durations of completed tasks
        self.__task_time_failed_total = 0  # summed durations of tasks that raised

        # Background thread that folds StatusReports into the bookkeeping above.
        self.__monitor_thread = threading.Thread(
            target=self.__update_workers_status, daemon=True
        )
        self.__monitor_thread.start()

        # Optional printer thread started by print_status_backend().
        self.__status_printer_thread = None

        if n_workers > 0:
            for _ in range(n_workers):
                self.add_worker()

    def print_workers(self):
        """Debug helper: print the Process objects held by this backend."""
        print("debug: workers", self.__workers)

    def get_status_queue(self):
        """Return the status queue (also drained by the monitor thread)."""
        return self.__status_queue

    # ------------------------------------------------------------------ monitor

    def __update_workers_status(self):
        """Monitor-thread loop: drain the status queue every 0.1 s, forever."""
        from queue import Empty

        while True:
            try:
                # Non-blocking check for status updates
                while not self.__status_queue.empty():
                    self.__apply_status(self.__status_queue.get_nowait())
            except Empty:
                pass  # empty() is only advisory; nothing left to read this round
            except Exception as e:
                print(f"Status monitoring error: {e}")
            time.sleep(0.1)  # Prevent busy waiting

    def __apply_status(self, status):
        """Fold one StatusReport into the counters and timings."""
        self.__workers_status[status.pid] = status
        key = (status.pid, status.task_id)
        if status.status == TaskStatus.STARTED:
            self.__task_processing += 1
            self.__active_task_start[key] = status.timestamp
        elif status.status == TaskStatus.COMPLETED:
            self.__task_finished += 1
            started = self.__active_task_start.pop(key, None)
            if started is not None:
                self.__task_processing -= 1
                elapsed = status.timestamp - started
                self.__task_time_taken[status.task_id] = elapsed
                self.__task_time_taken_total += elapsed
        elif status.status == TaskStatus.FAILED:
            self.__task_failed += 1
            # A failed task is no longer running. Previously only COMPLETED was
            # handled, so "In Progress" and the running-time sum grew after failures.
            started = self.__active_task_start.pop(key, None)
            if started is not None:
                self.__task_processing -= 1
                self.__task_time_failed_total += status.timestamp - started

    # ---------------------------------------------------------------- rendering

    def __worker_rows(self):
        """Yield ``(pid, status_name, task_text, detail_text)`` per known worker."""
        # Snapshot: the monitor thread may add pids while we iterate.
        for pid, latest in list(self.__workers_status.items()):
            task_text = "" if latest.task_id is None else str(latest.task_id)
            # FAILED reports carry only `exception`. The old hasattr() checks were
            # always true, so exceptions were never displayed.
            detail = latest.message or latest.exception or ""
            yield pid, latest.status.name, task_text, str(detail)

    def __build_worker_table(self):
        """Worker table keyed by a running index (used by print_status)."""
        table = Table(title="Workers Status")
        table.add_column("ID", style="green")
        table.add_column("Status", style="magenta", min_width=10)
        table.add_column("Task ID", style="green")
        table.add_column("Message/Exception", style="yellow", min_width=20)
        for worker_id, (_, state, task_text, detail) in enumerate(self.__worker_rows()):
            table.add_row(str(worker_id), state, task_text, detail)
        return table

    def __build_manager_table(self):
        """Pipeline counters: added / pending / running / done / results / failed."""
        table = Table(title="Task Pipeline Status")
        table.add_column("Total Tasks", justify="right", style="cyan")
        table.add_column("Pending", justify="right", style="magenta")
        table.add_column("In Progress", justify="right", style="yellow")
        table.add_column("Completed", justify="right", style="green")
        table.add_column("Results Ready", justify="right", style="blue")
        table.add_column("Failed", justify="right", style="red")
        table.add_row(
            str(self.__task_added),
            self._safe_qsize(self.__task_queue),
            str(self.__task_processing),
            str(self.__task_finished),
            self._safe_qsize(self.__result_queue),
            str(self.__task_failed),
        )
        return table

    def __build_time_table(self):
        """Average/total task time and system CPU utilisation."""
        table = Table(title="Task Timing Statistics")
        table.add_column("Avg Task Time", style="green")
        table.add_column("Total Task Time", style="green")
        table.add_column("CPU Utilization", style="yellow")

        average = (
            self.__task_time_taken_total / self.__task_finished
            if self.__task_finished > 0
            else 0
        )
        now = time.time()
        running = sum(now - start for start in list(self.__active_task_start.values()))
        total = self.__task_time_taken_total + self.__task_time_failed_total + running

        # Update CPU usage at most once per second
        if not hasattr(self, "_last_cpu_update") or now - self._last_cpu_update >= 1:
            self._cpu_usage = psutil.cpu_percent(interval=0)
            self._last_cpu_update = time.time()

        table.add_row(
            self._format_duration(average, precision=2),
            self._format_duration(total, precision=0),
            f"{self._cpu_usage:.2f}%",
        )
        return table

    @staticmethod
    def _format_duration(seconds, precision=2):
        """Format seconds as e.g. ``"1h 2m 3.45s"``, omitting zero hour/minute parts.

        Returns ``"-"`` for non-positive durations. This replaces a
        ``str.replace("0h ", "")`` approach that also mangled values such as
        10h or 30m (e.g. 30m 5s came out as "35.00s").
        """
        if seconds <= 0:
            return "-"
        hours, remainder = divmod(seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        parts = []
        if int(hours):
            parts.append(f"{int(hours)}h")
        if int(minutes):
            parts.append(f"{int(minutes)}m")
        parts.append(f"{secs:.{precision}f}s")
        return " ".join(parts)

    @staticmethod
    def _safe_qsize(q):
        """Return ``str(q.qsize())``, or ``"?"`` where qsize() is unsupported (e.g. macOS)."""
        try:
            return str(q.qsize())
        except NotImplementedError:
            return "?"

    def stop_status_print(self):
        """Stop the printer thread started by print_status_backend(), if any."""
        self.__stop_monitor = True
        if self.__status_printer_thread is not None:
            self.__status_printer_thread.join()
            self.__status_printer_thread = None

    def print_status_backend(self, constantly_update=False, jupyter_mode=True):
        """Render the worker table, optionally refreshing it from a background thread.

        Args:
            constantly_update: If True, start a daemon thread that redraws the
                table every 0.1 s until stop_status_print() is called. Otherwise
                draw once.
            jupyter_mode: Use an IPython display handle instead of a rich Console.
        """

        def create_table():
            table = Table(title="Workers Status")
            table.add_column("PID", justify="right", style="cyan")
            table.add_column("Status", style="magenta")
            table.add_column("Task ID", style="green")
            table.add_column("Message/Exception", style="yellow")
            for pid, state, task_text, detail in self.__worker_rows():
                table.add_row(str(pid), state, task_text, detail)
            return table

        def print_status_loop(jupyter_mode, display_handle=None):
            while not self.__stop_monitor:
                if jupyter_mode:
                    display_handle.update(create_table())
                else:
                    console = Console()
                    console.clear()
                    console.print(create_table())
                time.sleep(0.1)

        if constantly_update:
            if self.__status_printer_thread is not None:
                # Stop existing printer thread if any
                self.__stop_monitor = True
                self.__status_printer_thread.join()
            # Reset unconditionally: a one-shot call or stop_status_print() leaves
            # the flag set, which made the new thread exit immediately.
            self.__stop_monitor = False

            display_handle = (
                display(create_table(), display_id=True) if jupyter_mode else None
            )

            self.__status_printer_thread = threading.Thread(
                target=print_status_loop,
                args=(jupyter_mode, display_handle),
                daemon=True,
            )
            self.__status_printer_thread.start()
        else:
            # Print once without thread
            if jupyter_mode:
                display(create_table())
            else:
                Console().print(create_table())
            # Also signals any running printer thread to stop.
            self.__stop_monitor = True

    def print_status(self, jupyter_mode=True, constantly_update=True):
        """Show pipeline, timing and worker tables in the foreground.

        Args:
            jupyter_mode: Render through IPython display handles; otherwise use a
                rich Console.
            constantly_update: Redraw every 0.1 s until interrupted (KeyboardInterrupt);
                workers keep running afterwards. If False, draw the tables once.
        """
        builders = (
            self.__build_manager_table,
            self.__build_time_table,
            self.__build_worker_table,
        )

        if not constantly_update:
            # One-shot render (previously called an undefined create_table()).
            if jupyter_mode:
                for build in builders:
                    display(build())
            else:
                console = Console()
                for build in builders:
                    console.print(build())
            return

        handles = (
            [display("initial", display_id=True) for _ in builders]
            if jupyter_mode
            else []
        )
        try:
            while True:
                if jupyter_mode:
                    for handle, build in zip(handles, builders):
                        update_display(build(), display_id=handle.display_id)
                else:
                    console = Console()
                    console.clear()
                    for build in builders:
                        console.print(build())
                time.sleep(0.1)
        except KeyboardInterrupt:
            rprint(
                "KeyboardInterrupted, monitor has stopped and tasks will keep running"
            )

    # ------------------------------------------------------------ pool control

    def initialize_queues(self):
        """Create fresh task, result and status queues from the fork context."""
        self.__task_queue = self.__mp_context.Queue()
        self.__result_queue = self.__mp_context.Queue()
        self.__status_queue = self.__mp_context.Queue()

    def add_worker(self):
        """Start one daemon worker process bound to the current queues."""
        process = self.__mp_context.Process(
            target=Worker,
            kwargs={
                "task_queue": self.__task_queue,
                "result_queue": self.__result_queue,
                "status_queue": self.__status_queue,
            },
            daemon=True,
        )
        process.start()
        self.__workers.append(process)

    def add_task(
        self,
        task_function: Callable,
        task_id: Optional[str] = None,
        args: Optional[Union[list, tuple]] = (),
        kwargs: Optional[dict] = None,
    ):
        """Queue ``task_function(*args, **kwargs)`` for execution by a worker.

        Args:
            task_function: Callable run in a worker process.
            task_id: Key used in the result dict and status reports. Defaults to
                ``"task_<n>"``, where n is the number of tasks added so far. Ids
                are not checked for uniqueness; duplicates overwrite each other
                in ``get_all_results()``.
            args: Positional arguments (None is treated as no arguments).
            kwargs: Keyword arguments (None, the default, means none).

        Returns:
            The task id actually used.
        """
        # Was f"task_{self.__task_count}", an attribute that never existed
        # (AttributeError whenever task_id was omitted).
        task_id = task_id or f"task_{self.__task_added}"
        task = Task(
            id=task_id,
            function=task_function,
            args=() if args is None else args,
            kwargs={} if kwargs is None else kwargs,
        )
        self.__task_queue.put(task)
        self.__task_added += 1
        return task_id

    def get_one_result(self):
        """Block until one ``{task_id: result}`` dict is available and return it.

        Failed tasks put nothing on the result queue, so this can block
        indefinitely if every remaining task fails.
        """
        return self.__result_queue.get()

    def get_all_results(self, dict_format=True):
        """Drain the results currently in the result queue.

        Returns a merged ``{task_id: result}`` dict, or with ``dict_format=False``
        a list of the individual one-entry dicts in arrival order.
        """
        results = {} if dict_format else []
        while not self.__result_queue.empty():
            item = self.__result_queue.get()
            if dict_format:
                results.update(item)
            else:
                results.append(item)
        return results

    def force_close_all_workers(self):
        """Terminate every worker immediately and replace all queues.

        terminate() can leave the old queues corrupted, so fresh queues are
        created. Tasks that were running are dropped from the in-progress figures.
        """
        for worker in self.__workers:
            worker.terminate()
        for worker in self.__workers:
            worker.join()  # reap the terminated processes
        self.__workers = []
        self.__active_task_start.clear()
        self.__task_processing = 0
        self.initialize_queues()

    def status_report(self, message: str):
        """Post a PROCESSING report from the calling (usually parent) process."""
        self.__status_queue.put(
            StatusReport(pid=os.getpid(), status=TaskStatus.PROCESSING, message=message)
        )

    def close_all_workers(self):
        """Drop pending tasks, send one shutdown sentinel per live worker, and join.

        Per the multiprocessing docs, joining a process that still has buffered
        queue items can hang, so drain large results first.
        """
        from queue import Empty

        # Empty the task queue first
        while True:
            try:
                self.__task_queue.get_nowait()
            except Empty:
                break

        # One poison pill per live worker; pills for dead workers would linger
        # and shut down workers added later.
        for worker in self.__workers:
            if worker.is_alive():
                self.__task_queue.put(None)

        # Wait for all workers to finish
        for worker in self.__workers:
            worker.join()
        self.__workers = []

    def __del__(self):
        # getattr: __init__ may have failed before these attributes existed.
        for worker in getattr(self, "_BackendMultiprocessing__workers", []):
            worker.terminate()
            # terminate() makes the queues liable to corruption; they are closed below.
        for attr in ("__task_queue", "__result_queue", "__status_queue"):
            q = getattr(self, "_BackendMultiprocessing" + attr, None)
            if q is not None:
                q.close()

In [37]:
# NOTE(review): `get_event_data` and `level` are not defined in any visible cell, so
# this cell depends on hidden kernel state. Define or import them above before running.
backend = BackendMultiprocessing(n_workers=3)

# Queue one `level` task per event 1..10; the task id is the event number as a string.
for i in range(1, 11):
    event_data = get_event_data(i)
    backend.add_task(level, task_id=f"{i}", args=(event_data,))
# Live status view; blocks and redraws until interrupted.
backend.print_status(constantly_update=True)

In [38]:
# Collect the results already in the result queue as a {task_id: result} dict.
results = backend.get_all_results()

In [None]:
# NOTE(review): stray expression; `b` is not defined in any visible cell. Likely a leftover; consider deleting.
b

In [26]:
# New backend with two worker processes; rebinds any earlier `backend`.
backend = BackendMultiprocessing(n_workers=2)

In [222]:
import time
import random
import math
from typing import List


def complex_computation(size: int, iterations: int, status_updater=None) -> List[float]:
    """Repeatedly apply an element-wise sin*cos*exp transform to a random vector.

    Simulates slow work (10 ms sleep per element per iteration) and reports
    progress once per iteration through ``status_updater`` when one is given.
    NOTE: Worker.run in this notebook does not inject ``status_updater``, so
    inside the backend it stays None unless passed explicitly via kwargs.

    Args:
        size: Length of the vector.
        iterations: Number of passes over the vector.
        status_updater: Optional object with a ``report(message)`` method.

    Returns:
        The transformed vector (``size`` floats).
    """
    result = [random.random() for _ in range(size)]

    # `iteration`, not `iter`: avoid shadowing the builtin.
    for iteration in range(iterations):
        for i in range(size):
            value = result[i]
            result[i] = math.sin(value) * math.cos(value) * math.exp(-abs(value))
            time.sleep(0.01)  # Small delay to simulate work

        if status_updater:
            status_updater.report(
                f"Completed iteration {iteration + 1}/{iterations} with current sum: {sum(result):.3f}"
            )

    return result


def cascading_task(depth: int, branch_factor: int, status_updater=None) -> dict:
    """Build a random tree with ``branch_factor ** depth`` leaves, sleeping to simulate work.

    Leaves are ``{"value": float}``; inner nodes are ``{"branches": [...]}``.
    Progress (percentage of leaves finished) is reported only by the top-level
    call, because recursive calls are made without ``status_updater``.
    """
    if depth <= 0:
        time.sleep(0.5)  # Base case work
        return {"value": random.random()}

    leaves_total = branch_factor**depth
    leaves_per_branch = branch_factor ** (depth - 1)
    branches = []

    for done_count in range(1, branch_factor + 1):
        time.sleep(0.2)  # Work before branching
        branches.append(cascading_task(depth - 1, branch_factor))

        if status_updater:
            pct = done_count * leaves_per_branch / leaves_total * 100
            status_updater.report(f"Completed {pct:.1f}% of branches (depth {depth})")

    return {"branches": branches}


def probabilistic_failure_task(
    steps: int, failure_rate: float, status_updater=None
) -> str:
    """Run ``steps`` simulated steps; each step fails with probability ``failure_rate``.

    Each step sleeps 0.3 s, draws for failure, then sleeps a random 0.1-0.5 s
    "workload" and reports it through ``status_updater`` when one is given.

    Raises:
        ValueError: when a step's random draw falls below ``failure_rate``.
    """
    for step_no in range(1, steps + 1):
        time.sleep(0.3)

        # Chance of failure at each step
        if failure_rate > random.random():
            raise ValueError(f"Process failed at step {step_no}/{steps}")

        # Simulate varying workloads
        workload = random.randint(1, 5)
        time.sleep(workload * 0.1)

        if status_updater:
            status_updater.report(
                f"Step {step_no}/{steps} completed (workload: {workload})"
            )

    return "Successfully completed all steps"


# Add the demo tasks to the `backend` created in an earlier cell
# (e.g. backend = BackendMultiprocessing(n_workers=3)).
#
# Each task gets a unique id. Results come back as {task_id: result}, so reusing
# "matrix_comp"/"tree_comp"/"risky_long_task" for all 4 rounds meant only the last
# result per name survived get_all_results().
for round_idx in range(4):
    backend.add_task(
        complex_computation,
        task_id=f"matrix_comp_{round_idx}",
        args=(1000, 5),  # size=1000, iterations=5
    )

    backend.add_task(
        cascading_task,
        task_id=f"tree_comp_{round_idx}",
        args=(3, 2),  # depth=3, branch_factor=2
    )

    backend.add_task(
        probabilistic_failure_task,
        task_id=f"risky_long_task_{round_idx}",
        args=(10, 0.15),  # steps=10, failure_rate=0.15
    )

# Start monitoring (blocks and redraws until interrupted; workers keep running)
backend.print_status(constantly_update=True)

In [30]:
# Re-attach the live status view; blocks until interrupted, workers keep running.
backend.print_status(constantly_update=True)

In [23]:
# PID of the current (kernel) process.
os.getpid()

3755924

In [13]:
# NOTE(review): display_id is hard-coded from an earlier run. Ids are generated per
# display() call, so in a fresh kernel this likely updates nothing. Prefer a handle
# from display(..., display_id=True).
update_display("1231231", display_id="97c7de156320a16ea6d914a9d33a2705")

In [4]:
# Stop the background printer thread started by print_status_backend(constantly_update=True).
backend.stop_status_print()

In [30]:
# Debug: list the worker Process objects held by the backend.
backend.print_workers()

debug: workers [<ForkProcess name='ForkProcess-7' pid=3753468 parent=3753368 stopped exitcode=0 daemon>, <ForkProcess name='ForkProcess-8' pid=3753470 parent=3753368 stopped exitcode=0 daemon>]


In [4]:
import time
import random


# Simple task that just returns a number after a short delay
def simple_task(x: int):
    """Toy workload: sleep one second, then return ``x * 2``."""
    time.sleep(1)
    doubled = x * 2
    return doubled


# Task that uses the status updater to report progress
def long_task_with_updates(iterations: int, status_updater=None):
    """Sleep 0.5 s per iteration, reporting each finished one via ``status_updater``.

    NOTE: Worker.run does not pass ``status_updater``, so under the backend no
    progress is reported unless it is supplied explicitly via kwargs.
    """
    for done in range(1, iterations + 1):
        time.sleep(0.5)
        if status_updater:
            status_updater.report(f"Completed iteration {done}/{iterations}")
    return f"Completed all {iterations} iterations"


# Task that might randomly fail
def unreliable_task():
    """Sleep 0.5 s, then fail with probability 0.3.

    Raises:
        ValueError: on the simulated random failure.
    """
    time.sleep(0.5)
    failed = random.random() < 0.3  # 30% chance of failure
    if failed:
        raise ValueError("Random failure occurred!")
    return "Task completed successfully"


# Queue a mix of demo tasks on the existing `backend`: two quick doublings, one
# multi-step task, and two that fail ~30% of the time.
# NOTE(review): Worker.run does not inject `status_updater`, so long_task_with_updates
# reports no progress here.
backend.add_task(simple_task, task_id="simple1", args=(5,))
backend.add_task(simple_task, task_id="simple2", args=(10,))
backend.add_task(long_task_with_updates, task_id="long_task", args=(5,))
backend.add_task(unreliable_task, task_id="risky_task1")
backend.add_task(unreliable_task, task_id="risky_task2")

In [232]:
# Scratch: preview a worker-table column layout (empty table, no rows).
table = Table(title="Workers Status")
table.add_column("workerId", style="green")
table.add_column("PID", justify="right", style="cyan")
table.add_column("Status", style="magenta")
table.add_column("Task ID", style="green")
table.add_column("Message/Exception", style="yellow")

display(table)

In [5]:
# Start the background printer thread (non-blocking) that keeps the worker table updated.
backend.print_status_backend(constantly_update=True, jupyter_mode=True)

In [27]:
# Foreground live view; blocks until interrupted (KeyboardInterrupt), workers keep running.
backend.print_status(constantly_update=True, jupyter_mode=True)

KeyboardInterrupt: 

In [9]:
# Stop the background status printer. BackendMultiprocessing has no `stop_monitor`
# method (that call raised AttributeError); stop_status_print() is the method that
# sets the stop flag and joins the printer thread.
backend.stop_status_print()

KeyboardInterrupt: 

In [29]:
# Shut down: drop pending tasks, send one None sentinel per worker, and join them.
backend.close_all_workers()

In [6]:
# Debug: blocking read from the status queue.
# NOTE(review): blocks indefinitely when the queue is empty, and competes with the
# backend's monitor thread, which drains the same queue.
backend.get_status_queue().get()

KeyboardInterrupt: 

In [12]:
# One-shot (non-refreshing) render of the status tables.
backend.print_status(constantly_update=False, jupyter_mode=True)

In [24]:
# Get all results currently in the result queue (failed tasks produce none)
results = backend.get_all_results()
print("\nResults:", results)


Results: []


In [25]:
# Clean up: drop pending tasks, send one None sentinel per worker, and join them.
backend.close_all_workers()

In [5]:
# Add this test code after the Worker class definition
def test_worker_pickling():
    """Check that the Worker class survives a pickle round-trip.

    Classes pickle by reference (module + qualified name), so this only shows
    that ``Worker`` can be looked up by name. It does not show that instances or
    task arguments are picklable. Instances can't be tested here because
    ``Worker.__init__`` immediately runs the worker loop.

    Prints a success or failure line; never raises.
    """
    import pickle

    try:
        restored = pickle.loads(pickle.dumps(Worker))
        if restored is not Worker:
            raise pickle.PicklingError("unpickled class is not the original object")
        print("Worker class can be pickled successfully")
    except Exception as e:
        print(f"Worker class pickling failed: {str(e)}")


# Run the pickling smoke test defined above
test_worker_pickling()

Worker class can be pickled successfully


In [2]:
from rich import print as rprint
from IPython.display import clear_output
import time

# Scratch: redraw by clearing the cell output each tick. Runs forever; stop with Interrupt Kernel.
count = 0
while True:
    clear_output(wait=True)
    rprint(f"a lot of text {count}")
    time.sleep(0.01)
    count += 1

KeyboardInterrupt: 

In [12]:
from IPython.display import display, update_display
import sys

# Scratch: in-place updates through a display handle instead of clear_output. Runs forever.
output = display("", display_id=True)
count = 0
while True:
    time.sleep(0.1)
    update_display(f"a lot of text {count}", display_id=output.display_id)
    count += 1

'a lot of text 29'

KeyboardInterrupt: 

In [14]:
from rich.live import Live
from rich import print
import time


# Scratch: rich Live with auto_refresh=False and no explicit refresh, so presumably the
# display is not redrawn between updates. Runs forever.
count = 0
with Live(auto_refresh=False) as live:
    while True:
        time.sleep(0.1)
        live.update(f"a lot of text {count}")
        count += 1

KeyboardInterrupt: 

In [5]:
from rich.live import Live
from rich import print
import time

# Scratch: rich Live with auto_refresh plus an explicit refresh() each tick. Runs forever.
# In Jupyter this rendered as an `Output()` widget (see output below).
count = 0
with Live(auto_refresh=True) as live:
    while True:
        time.sleep(0.1)
        live.update(f"a lot of text {count}")
        live.refresh()
        count += 1

Output()

KeyboardInterrupt: 

In [2]:
from rich.table import Table
from time import sleep
from rich import box
import random
from IPython.display import display, update_display


def generate_table():
    """Return a 5-row rounded rich Table of random values for display experiments."""
    table = Table(box=box.ROUNDED)
    for header in ("ID", "Value", "Status"):
        table.add_column(header, justify="center", width=8)

    for row_id in range(5):
        value = random.randint(0, 100)
        verdict = "correct" if random.random() > 0.5 else "incorrect"
        table.add_row(str(row_id), str(value), verdict)
    return table


# Scratch: redraw a random rich table in place every 0.1 s via a display handle. Runs forever.
output = display("", display_id=True)
while True:
    update_display(generate_table(), display_id=output.display_id)
    sleep(0.1)

KeyboardInterrupt: 

In [9]:
from rich.live import Live
from rich import print
import time

# Scratch: a single Live update/refresh (the infinite-loop variant is in an earlier cell).
count = 0
with Live(auto_refresh=True) as live:
    time.sleep(0.1)
    live.update(f"a lot of text {count}")
    live.refresh()
    count += 1

Output()

In [7]:
import pandas as pd
import random
import time
from IPython.display import display, update_display
from IPython import get_ipython
from IPython.display import clear_output


def create_table():
    data = {
        "ID": range(5),
        "Value": [random.randint(0, 100) for _ in range(5)],
        "Status": ["\u2705" if random.random() > 0.5 else "\u274C" for _ in range(5)],
    }
    df = pd.DataFrame(data)
    return df.style.set_properties(**{"text-align": "center"}).set_table_styles(
        [{"selector": "th", "props": [("text-align", "center")]}]
    )


# Scratch: clear_output-based redraw of a pandas Styler table. Runs forever.
while True:
    clear_output(wait=True)
    display(create_table())
    time.sleep(0.01)

Unnamed: 0,ID,Value,Status
0,0,69,✅
1,1,32,❌
2,2,17,❌
3,3,34,✅
4,4,55,❌


KeyboardInterrupt: 

In [53]:
# Create an output area and keep its handle so later cells can update it in place.
output = display("1231231", display_id=True)

'131444'

In [54]:
# The handle's display id, accepted by update_display(..., display_id=...).
output.display_id

'a80e2652042a6de84692042db5c87a01'

In [58]:
# Update the output created by `output = display(...)` through its handle, rather than
# a display_id string copied from an earlier run (ids change every session).
update_display("131444", display_id=output.display_id)

In [131]:
import numpy as np  # `np` was never imported in any visible cell (NameError)

# Draw one index from {0, 1, 2, 3}: P(0) = 0.4, the others 0.2 each.
np.random.choice(4, p=[0.4, 0.2, 0.2, 0.2])

0