In [20]:
import random
import time

from rich.live import Live
from rich.table import Table


def generate_table() -> Table:
    """Make a new table."""
    table = Table()
    table.add_column("ID")
    table.add_column("Value")
    table.add_column("Status")

    for row in range(random.randint(2, 6)):
        value = random.random() * 100
        table.add_row(
            f"{row}", f"{value:3.2f}", "[red]ERROR" if value < 50 else "[green]SUCCESS"
        )
    return table


with Live(generate_table(), refresh_per_second=4) as live:
    for _ in range(40):
        time.sleep(0.4)
        live.update(generate_table())

Output()

KeyboardInterrupt: 

In [1]:
from rich.console import Console
from rich.layout import Layout

console = Console()
layout = Layout()

# 将命令行分为上下两个部分，比例为2:1
layout.split(
    Layout(name="top", ratio=2),
    Layout(name="bottom", ratio=1)
)

# 将上面的layout分为两部分，比例为1:2
layout["top"].split_row(
    Layout(name="Job", ratio=1),
    Layout(name="Cluster", ratio=2)
)

# 给每个layout添加一些内容
layout["Job"].update("这里是Job的内容")
layout["Cluster"].update("这里是Cluster的内容")
layout["bottom"].update("这里是log的内容")

# 在控制台上显示layout
console.print(layout)


In [2]:
from rich.console import Console
from rich.table import Table
import Job
class JobTable:
    def __init__(self):
        # Create a table object with a title
        self.table = Table(title="Job Table")
        # Add columns with headers and styles
        self.table.add_column("ID", style="cyan", justify="right") # Right align this column
        self.table.add_column("Res Vec", style="magenta", justify="right") # Right align this column
        self.table.add_column("Len", style="green", justify="right") # Right align this column
        self.table.add_column("Priority", style="yellow", justify="right") # Right align this column
        self.table.add_column("Budget", style="red", justify="right") # Right align this column
        self.table.add_column("Restrict Machines", style="blue", justify="right") # Right align this column
        # Create a console object to print the table
        self.console = Console()

    def add_job(self, data):
        # Add a row with the data dict values
        self.table.add_row(
            data["id"],
            data["res_vec"],
            data["len"],
            data["priority"],
            data["budget"],
            data["restrict_machines"]
        )

    def show_table(self):
        # Print the table to the console
        self.console.print(self.table)
jt = JobTable()
jt.add_job(Job.Job().static_info())
jt.show_table()

Output()

In [1]:
from pettingzoo.test import api_test
import Environment
env = Environment.VehicleJobSchedulingEnvACE()
# env = Environment.VehicleJobSchedulingEnvACE()
env.reset()
api_test(env, num_cycles=100)
para = env.parameters

Starting API test
Passed API test




In [2]:
para.cluster.machines[0].info()

{'reward': 0,
 'earning': 0,
 'slot': {'Resource 0': '██████████', 'Resource 1': '██████████'},
 'action': 0.6888888888888889,
 'bid': 25.33333333333333,
 'finished_job_num': 1,
 'running_job': []}

In [1]:
"""Lite simulation of the top linux command."""
import datetime
import random
import sys
import time
from dataclasses import dataclass

from rich import box
from rich.console import Console
from rich.live import Live
from rich.table import Table

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal


@dataclass
class Process:
    pid: int
    command: str
    cpu_percent: float
    memory: int
    start_time: datetime.datetime
    thread_count: int
    state: Literal["running", "sleeping"]

    @property
    def memory_str(self) -> str:
        if self.memory > 1e6:
            return f"{int(self.memory/1e6)}M"
        if self.memory > 1e3:
            return f"{int(self.memory/1e3)}K"
        return str(self.memory)

    @property
    def time_str(self) -> str:
        return str(datetime.datetime.now() - self.start_time)


def generate_process(pid: int) -> Process:
    return Process(
        pid=pid,
        command=f"Process {pid}",
        cpu_percent=random.random() * 20,
        memory=random.randint(10, 200) ** 3,
        start_time=datetime.datetime.now()
        - datetime.timedelta(seconds=random.randint(0, 500) ** 2),
        thread_count=random.randint(1, 32),
        state="running" if random.randint(0, 10) < 8 else "sleeping",
    )


def create_process_table(height: int) -> Table:

    processes = sorted(
        [generate_process(pid) for pid in range(height)],
        key=lambda p: p.cpu_percent,
        reverse=True,
    )
    table = Table(
        "PID", "Command", "CPU %", "Memory", "Time", "Thread #", "State", box=box.SIMPLE
    )

    for process in processes:
        table.add_row(
            str(process.pid),
            process.command,
            f"{process.cpu_percent:.1f}",
            process.memory_str,
            process.time_str,
            str(process.thread_count),
            process.state,
        )

    return table


console = Console()

with Live(console=console, screen=True, auto_refresh=False) as live:
    while True:
        live.update(create_process_table(console.size.height - 4), refresh=True)
        time.sleep(1)

Output()

KeyboardInterrupt: 

In [2]:
27/3*2-8


10.0

In [6]:
from rich import print
from rich.panel import Panel

print(Panel("你好，[red]世界！", title="欢迎", subtitle="谢谢"))