# Development of new features for polymerist

## Sequence similarity comparison

In [None]:
from typing import Any, Generator, Iterable, Optional, Sequence, Type, TypeVar, TypeAlias, Union 
Shape : TypeAlias = tuple
T = TypeVar('T')
N = TypeVar('N')
M = TypeVar('M')

from dataclasses import dataclass, field, replace
from enum import Enum, auto
import numpy as np


from polymerist.genutils.bits import int_to_bits

# REPRESENTATION CLASSES
class EditOperation(Enum): # TODO: reimplement as bitwise flags
    '''For annotating distinct kinds of sequence edits and their associated index offsets'''
    NULL         = 0 # NOTE: order of fields here intentional and CANNOT be modified!
    INSERTION    = 1 # ... this is because the bits of each index are the row and column
    DELETION     = 2 # ... offsets corresponding to that edit operation in a Wagner-Fischer matrix
    SUBSTITUTION = 3

    @property
    def bits(self) -> tuple[int, int]:
        '''Convert the integer value of the Enum field into its binary bits'''
        return tuple(int_to_bits(self.value, num_bits=2, as_list=True))
    offsets = bits # alias to make dependent code more readable

@dataclass
class EditInfo:
    '''for bundling together information about a sequence edit step'''
    edit_op  : EditOperation
    indices  : tuple[int, int]
    distance : int


# WAGNER-FISCHER MATRIX OPERATION
def compute_wf_matrix(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> np.ndarray[int]:
    '''Compute (N+1)x(M+1) matrix of Levenshtein distances between all partial prefices of a pair of sequences
    where N and M are the lengths of the first and second sequence, respectively. Implements the Wagner-Fischer algorithm'''
    n, m = len(seq1), len(seq2)
    n_aug, m_aug = n + 1, m + 1

    # initialize matrix with all zeros apart from first row and column,
    wf_matrix = np.zeros((n_aug, m_aug), dtype=int_type)
    wf_matrix[:, 0] = np.arange(n_aug, dtype=int_type) # index along first column is same as number of deletions to get to empty sequence - NOTE: element [0, 0] overlaps here
    wf_matrix[0, :] = np.arange(m_aug, dtype=int_type) # index along first row is same as number of insertions to get from empty sequence - NOTE: element [0, 0] overlaps here

    # populate matrix by iterating on distances between sub-sequence problems
    for i, elem1 in enumerate(seq1, start=1):
        for j, elem2 in enumerate(seq2, start=1):
            wf_matrix[i, j] = 1 + np.min([
                wf_matrix[i-1, j],  # deletion at end of second sequence (after augmentation to end of first sequence)
                wf_matrix[i, j-1],  # insertion at end of second sequence
                wf_matrix[i-1, j-1] - int(elem1 == elem2) # substition of last elements between sequences (if elements are equal, then substitution costs nothing)
            ])
     # TODO: implement support for transposition weighting (i.e. Damerau-Levenshtein distance)

    return wf_matrix

def traverse_wf_matrix(wf_matrix : np.ndarray[Shape[N, M], int], begin_idxs : tuple[int, int]=(0, 0), end_idxs : tuple[int, int]=(-1, -1)) -> Generator[list[EditInfo], None, None]:
    '''Takes a Wagner-Fischer Levenshtein distance matrix and returns the indices of the minimal path through the matrix
    from the origin (i.e. empty sequences) to the '''
    assert(wf_matrix.ndim == 2)
    n_aug, m_aug = wf_matrix.shape

    if end_idxs == begin_idxs:
        yield [ # need to wrap in list for consistent typing in recursive calls
            EditInfo( # base case
                edit_op=EditOperation.NULL,
                indices=begin_idxs,
                distance=0
            )
        ]
    else:
        i, j = end_idxs
        i %= n_aug # ensure values are positive
        j %= m_aug # ensure values are positive
        curr_dist = wf_matrix[i, j]

        prev_edits = []
        for edit_op in EditOperation:
            if edit_op == EditOperation.NULL:
                continue # need to skip over to avoid RecursionError - TOSELF: would be nice to reimplement EditOperations as bitwise flags to streamline this

            di, dj = edit_op.offsets # unpack index offsets
            i_prev, j_prev = i - di, j - dj
            prev_edits.append(
                EditInfo(
                    edit_op=edit_op,
                    indices=(i_prev, j_prev),
                    distance=wf_matrix[i_prev, j_prev]
                )
            )

        min_prev_dist = min(ei.distance for ei in prev_edits)
        for edit_info in prev_edits:
            ret_edit_info = replace(edit_info, indices=(i, j)) # create a copy of the current edit info which reports the current 
            if (edit_info.edit_op == EditOperation.SUBSTITUTION) and (curr_dist == min_prev_dist):
                ret_edit_info.edit_op = EditOperation.NULL
                
            if edit_info.distance == min_prev_dist:
                for edit_steps in traverse_wf_matrix(wf_matrix, begin_idxs=begin_idxs, end_idxs=edit_info.indices): # recursive tail call through all possible predecessors
                    yield edit_steps + [ret_edit_info]

def describe_edits(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int, indicator : str=' -> ', delimiter : str='\n') -> Generator[str, None, None]:
    '''Describes step-by-step the insertion, deletion, or substitution operations needed to transform one sequence into another'''
    seqs = (seq1, seq2) # need to bundle for zipping later
    wf_matrix = compute_wf_matrix(seq1, seq2, int_type=int_type)

    for edits in traverse_wf_matrix(wf_matrix):
        edit_descs : list[str] = []
        for edit_info in edits:
            edit_op = edit_info.edit_op
            if edit_op == EditOperation.NULL:
                continue

            elem_edit_str = indicator.join(
                str(seq[idx - 1] if to_show else None) # need to subtract 1 to get index of step previous to current edit
                    for idx, to_show, seq in zip(edit_info.indices, edit_op.bits, seqs)
            )

            edit_descs.append(f'{edit_info.edit_op.name}: {elem_edit_str}')
        yield delimiter.join(edit_descs)

In [13]:
a = 'Sunday'

i = 0
a[:i] + a[i+1:]

'unday'

In [14]:
for i, c in enumerate(a):
    print(a[i:])

Sunday
unday
nday
day
ay
y


In [None]:
# SEQUENCE METRICS
def hamming_distance(seq1 : Sequence[T], seq2 : Sequence[T]) -> int:
    '''Compute the Hamming distance between a pair of sequences with elements of compatible type (sequences must have the same length)
    Denotes the number of elements at the same positions in each sequence which are different'''
    if len(seq1) != len(seq2):
        raise ValueError('Cannot compute Hamming distance between sequences of different lengths')
    
    return sum(
        int(elem1 != elem2) # NOTE: type conversion not strictly necessary here, but done for self-documentation
            for elem1, elem2 in zip(seq1, seq2, strict=True)
    )

def jaccard_distance(seq1 : Sequence[T], seq2 : Sequence[T]) -> float:
    '''Compute the Jaccard distance between a pair of sequences with elements of compatible type
    Denotes the complement of the ratio of shared elements (intersection) to total elements (union)'''
    set1, set2 = set(seq1), set(seq2)
    size_intersect = len(set.intersection(set1, set2))
    size_union     = len(set.union(set1, set2))
    jaccard_coeff = size_intersect / size_union

    return 1 - jaccard_coeff
tanimoto_distance = jaccard_distance # TOSELF: debatable whether this alias is really accurate (literature suggests it may be context/field-dependent)

def levenshtein_distance(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> int:
    '''Compute the Levenshtein (edit) distance between a pair of sequences with elements of compatible type
    Denotes the minimal number of insertion, deletion, or substitution operations needed to transform either sequence into the other'''
    return compute_wf_matrix(seq1, seq2, int_type=int_type)[-1, -1]
levenshtein_dist = edit_distance = edit_dist = levenshtein_distance # aliases for convenience

In [None]:
a = 'Sunday'
b = 'Saturday'
levenshtein_dist(a, b)

In [None]:
wf_matrix = compute_wf_matrix(a, b)
wf_matrix

In [None]:
for edits in traverse_wf_matrix(wf_matrix):
    
    mask = np.full_like(wf_matrix, fill_value=99)
    for edit in edits:
        print(edit)
        mask[*edit.indices] = wf_matrix[*edit.indices]
    print(mask)


In [None]:
for desc in describe_edits(a, b):
    print(desc)
    print('\n')

# Reimplementing bin choice enumeration with dynamic programming

In [21]:
from collections import defaultdict, Counter
from itertools import product as cartesian_product

def bin_ids_forming_sequence(
        sequence : Sequence[T], choice_bins : Sequence[Iterable[T]], draw_without_repeats : bool=True, unique_bins : bool=False,
        _symbol_inventory : Optional[defaultdict[Counter]]=None
    ) -> Generator[tuple[int, ...], None, None]:
    '''
    Takes an ordered sequence of N objects of a given type and an ordered of any number of bins, each containing an arbitary amount of unordered objects of the same type
    Generates all possible N-tuples of bin indices which could produce the target sequence when drawing from those bins in the 
    
    if draw_without_repeats=True, will respect the multiplicity of elements in each bin when drawing
    (i.e. will never have a bin position appear for a given object more times that that object appears in the corresponding bin)

    if unique_bins=True, will only allow each bin to be sampled from once, EVEN if that bin contains elements which may occur later in the sequence
    '''
    if _symbol_inventory is None:
        symbol_inventory = defaultdict(Counter) # keys are objects of type T ("symbols"), values give multiplicities of symbols keyed by bin position
        for i, choice_bin in enumerate(choice_bins):
            for sym in choice_bin:
                symbol_inventory[sym][i] += 1 # NOTE : implementation here requires that T be a hashable type
    else:
        symbol_inventory = _symbol_inventory

    print(symbol_inventory)


In [17]:
choice_bins = ['bbc', 'aced', 'bd', 'daea', 'fccce', 'g']
bin_ids_forming_sequence('abc', choice_bins)

defaultdict(<class 'collections.Counter'>, {'b': Counter({0: 2, 2: 1}), 'c': Counter({4: 3, 0: 1, 1: 1}), 'a': Counter({3: 2, 1: 1}), 'e': Counter({1: 1, 3: 1, 4: 1}), 'd': Counter({1: 1, 2: 1, 3: 1}), 'f': Counter({4: 1}), 'g': Counter({5: 1})})


## General development of OpenMM utils and interfaces

In [None]:
from openff.toolkit import Molecule, Topology, ForceField

smi = 'Oc1ccc(cc1)C(c2ccc(O)cc2)(C)C'
offmol = Molecule.from_smiles(smi)
offmol.generate_conformers(n_conformers=1)
offmol.assign_partial_charges('am1bccelf10')

forcefield = ForceField('openff-2.0.0.offxml')
inc = forcefield.create_interchange(offmol.to_topology(), charge_from_molecules=[offmol])

In [None]:
from polymerist.mdtools.openmmtools.parameters import ThermoParameters, IntegratorParameters
from polymerist.mdtools.openmmtools.thermo import EnsembleFactory
from openmm.unit import femtosecond

thermo_params = ThermoParameters()
ensfac = EnsembleFactory.subclass_registry['NVT'](thermo_params)
integrator = ensfac.integrator(time_step=2*femtosecond)

ommsim = inc.to_openmm_simulation(integrator=integrator, combine_nonbonded_forces=False)

In [None]:
from openmm.unit import kilocalorie_per_mole, joule, kilojoule_per_mole

pot, kin = eval_openmm_energies_separated(ommsim.context)
pot

In [None]:
ommsys = ommsim.context.getSystem()

## Developing monomer graphs

In [None]:
import networkx as nx
from rdkit import Chem

import mbuild
from mbuild.compound import Compound
from mbuild.conversion import load, load_smiles, from_rdkit, to_smiles, to_pybel
from mbuild.lib.recipes.polymer import Polymer

comp = mbuild.Compound()

### String/graph translation (SMILES-like)

In [None]:
from polymerist.polymers.monographs import MonomerGraph

In [None]:
from openff.interchange.drivers import gromacs
from openff.interchange import Interchange

In [None]:
# test = f'<tests[-2]>.<tests[-2]>'
tests = [
    '[a]<-1>[A](<2-3>[Bee]<2=5>[C]<5=2>[Bee]<3-6>[Bee])(<2-3>[Bee](<3-6>[Bee])<3->[a])<->[A]<2-2>[Bee]<3-6>[Bee]',
    '[A]<1-2>[B]<6=5>[C]<#>[D]',
]
seq = [0]
test = '.'.join(tests[i] for i in seq)
print(test)

In [None]:
tail = '[tail]<2-0>' * 10 + '[tail_end]'
lipid = f'[A3](<4-3>[A2]{tail})(<4-3>[A2]{tail})<2-0>{tail}'

In [None]:
# G = MonomerGraph.from_SMIDGE(test)
G = MonomerGraph.from_SMIDGE(lipid)
G.visualize(label_monomers=True)

In [None]:
rep = G.to_SMIDGE(start_node_idxs=6)
H = MonomerGraph.from_SMIDGE(rep)
H.draw()

### "Alphabet" of monomer fragment chemical information 

In [None]:
from string import ascii_uppercase 
from polymerist.polymers.monomers import MonomerGroup
from polymerist.rdutils.bonding.portlib import get_ports
from polymerist.rdutils.labeling.molwise import clear_atom_map_nums


parent_monomers = {
    'ethane-1,2-diol' : 'OCCO',
    'furan-2,5-dicarboxylic acid' : 'O=C(O)c1ccc(C(=O)O)o1',
}
monomer_aliases = {
    mononame : lett*3
        for mononame, lett in zip(parent_monomers.keys(), ascii_uppercase)
}

monogrp = MonomerGroup.from_file('poly(ethane-1,2-diol-co-furan-2,5-dicarboxylic acid).json')
moldict, monosmiles = {}, {}
for mononame, rdmol in monogrp.iter_rdmols():
    for i, port in enumerate(get_ports(rdmol)):
        rdmol.GetAtomWithIdx(port.linker.GetIdx()).SetIsotope(i)

    print(mononame)
    display(rdmol)
    moldict[   mononame] = rdmol
    monosmiles[mononame] = Chem.MolToSmiles(clear_atom_map_nums(rdmol))

### Defining monomer information 

In [None]:
from typing import Optional, ClassVar
from rdkit import Chem
from rdkit.Chem.Draw import IPythonConsole
from dataclasses import dataclass, field


from polymerist.genutils.fileutils.jsonio.jsonify import make_jsonifiable, dataclass_serializer_factory
from polymerist.genutils.fileutils.jsonio.serialize import JSONSerializable, TypeSerializer
from polymerist.rdutils.bonding.portlib import get_num_linkers, get_num_ports
from polymerist.polymers.monomers.specification import expanded_SMILES, compliant_mol_SMARTS


@make_jsonifiable
@dataclass
class MonomerFragmentInfo:
    '''Naming and in-line chemical encodings for a monomer unit within a polymer chain'''
    name   : str
    smiles : str
    exp_smiles : Optional[str] = field(default=None, init=False, repr=False)
    smarts     : Optional[str] = field(default=None)
    category   : Optional[str] = field(default=None)

    n_atoms       : int = field(init=False)
    functionality : int = field(init=False)
    contribution  : int = field(init=False)

    FOO : ClassVar[str] = 'extra bits'

    def __post_init__(self) -> None:
        self.exp_smiles = expanded_SMILES(self.smiles, assign_map_nums=True)
        if self.smarts is None:
            self.smarts = compliant_mol_SMARTS(self.exp_smiles)

        tempmol = self.rdmol
        self.n_atoms = tempmol.GetNumAtoms()
        self.functionality = get_num_ports(tempmol) # get_num_linkers(tempmol) 
        self.contribution = self.n_atoms - self.functionality

    @property
    def rdmol(self) -> Chem.Mol:
        return Chem.MolFromSmiles(self.smiles, sanitize=False)

In [None]:
from polymerist.polymers.monomers import specification

mono_infos = {}
for mononame, smiles in monosmiles.items():
    parent_mononame = mononame.split('_')[0]
    parent_smiles = parent_monomers[parent_mononame]
    parent_alias  = monomer_aliases[parent_mononame]

    mono_info = MonomerFragmentInfo(
        name=mononame,
        smiles=smiles,
        # smarts=specification.compliant_mol_SMARTS(smiles),
        category=parent_smiles,
    )
    alias = parent_alias.lower() if (mono_info.functionality == 1) else parent_alias.upper()
    mono_infos[alias] = mono_info
mono_infos

### Defining polymer composition class

In [None]:
from enum import Enum, StrEnum, auto

from polymerist.genutils.fileutils.jsonio.serialize import JSONSerializable, TypeSerializer, MultiTypeSerializer
from polymerist.genutils.fileutils.jsonio.jsonify import make_jsonifiable, JSONifiable
from polymerist.polymers.monographs import MonomerGraph, MonomerGraphSerializer
from polymerist.rdutils.bonding.portlib import get_num_linkers, get_ports


MONOMER_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
class MonomerNeighborMismatch(Enum):
    '''For annotating the various ways in which a piece of monomer information in a monomer alphabet does not match a monomer graph'''
    NONE = 0
    COUNT = auto()
    BONDTYPE = auto()
    NO_FLAVOR = auto()
    DIFF_FLAVOR = auto()
    NOT_NEIGHBORS = auto()


@make_jsonifiable(type_serializer=MultiTypeSerializer(MonomerGraphSerializer, MonomerFragmentInfo.serializer))
@dataclass
class PolymerStructure:
    '''Encodes a multi-scale structural representation of a polymer topology'''
    mono_alphabet : dict[str, MonomerFragmentInfo] 
    monograph : MonomerGraph

    single_char_mononames : dict[str, str] = field(default_factory=dict, init=False) 

    def __post_init__(self) -> None:
        '''Post-process init attributes'''
        self.single_char_mononames = { # remapping from the assigned monomers names to single characters for mbuild compatibility
            mononame : remap_char
                for (mononame, remap_char) in zip(self.mono_alphabet.keys(), MONOMER_CHARS)
        }

        self.validate_monoinfo_is_compatible() # will raise targetted exceptions if incompatible
        self.assign_monoinfo_to_monograph()

    @property
    def node_info_map(self) -> dict[int, MonomerFragmentInfo]:
        '''Map from node indices to relevant monomer information'''
        return {
            node_id : self.mono_alphabet[alias]
                for node_id, alias in nx.get_node_attributes(self.monograph, self.monograph.MONOMER_NAME_ATTR).items()
        }
    
    @property
    def pdb_substructures(self) -> dict[str, list[str]]:
        '''Substructure dict formatted for the OpenFF Topology.from_pdb RDKit wrapper hook'''
        return {
            monoinfo.name : [mono_info.smarts]
                for monoinfo in self.mono_alphabet.values()
        }
    
    @property
    def num_atoms(self) -> int:
        '''Total number of atoms in the topology specified'''
        return sum(nx.get_node_attributes(self.monograph, 'contribution').values())

    # validation
    def monoalpha_surjective_to_monograph(self) -> bool:
        '''Check whether the monomer alphabet covers all monomer types defined in the Graph'''
        return self.monograph.unique_monomer_names.issubset(set(self.mono_alphabet.keys()))

    def monoalpha_neighbors_are_valid(self) -> tuple[bool, int, MonomerNeighborMismatch]:
        '''Determine whether and why adjacent monomers in the monomer graph do (or don't) have compatible chemical info'''
        for node_idx, neighbor_dict in self.monograph.adj.items():
            # 1) check that the number of neighbors in the graph matches the number of intermonomer bonding sites given chemically
            neighbor_dict = dict(neighbor_dict) # convert from networkx object to vanilla dict
            degree = len(neighbor_dict) # self.monograph.degree[node_idx]
            if (degree != self.node_info_map[node_idx].functionality):
                return False, node_idx, MonomerNeighborMismatch.COUNT
            
            # 2) check that all reported neighbor nodes are actually adjacent in the graph
            found_ports = set()
            for i, flavor in self.monograph.get_flavor_dict(node_idx).items():
                if i not in neighbor_dict:
                    return False, node_idx, MonomerNeighborMismatch.NOT_NEIGHBORS
                
                nb_bond_info = neighbor_dict.pop(i)
                found_ports.add( (flavor, nb_bond_info[self.monograph.BONDTYPE_ATTR]) )

            # 3) check that every neighbors has been provided a flavor
            if neighbor_dict:
                return False, node_idx, MonomerNeighborMismatch.NO_FLAVOR # at least one of the neighbors must nnot have had a flavor provided

            # 4) check that the provided flavors match those chemically specified
            monoinfo = self.mono_alphabet[self.monograph.get_monomer_name(node_idx)]
            portinfo = set(
                (port.flavor, port.bond.GetBondType())
                    for port in get_ports(monoinfo.rdmol)
            )
            if (portinfo != found_ports):
                return False, node_idx, MonomerNeighborMismatch.DIFF_FLAVOR
        else:
            return True, -1, MonomerNeighborMismatch.NONE
        
    def validate_monoinfo_is_compatible(self) -> None:
        if not self.monoalpha_surjective_to_monograph():
            raise ValueError('Provided monomer alphabet does not cover all monomers in the corresponding monomer graph')
        
        nb_match, mismatch_idx, reason = self.monoalpha_neighbors_are_valid()
        if not nb_match:
            raise ValueError(f'Graph node {mismatch_idx} (designation "{self.monograph.get_monomer_name(mismatch_idx)}") mismatched (reason : {reason.name})')
    
    def assign_monoinfo_to_monograph(self) -> None:
        '''Map the chemical info for each unique monomer onto corresponding monomer nodes in the monomer graph'''
        node_info_map = {
            node_idx : self.mono_alphabet[self.monograph.monomer_name(node_idx)].__dict__
                for node_idx in self.monograph.nodes
        }
        nx.set_node_attributes(self.monograph, node_info_map)

In [None]:
import networkx as nx
from polymerist.genutils.textual import delimiters
from polymerist.polymers.monographs import MonomerGraph

smidge = 'aBABABABa'
# smidge = '{2-3}'.join(smidge)
# smidge = '{0-1}'.join(smidge)
smidge = '<0-1>'.join(smidge[:-1]) + '<0-0>' + smidge[-1]
smidge = delimiters.square_brackets_around_letters(smidge)
smidge = ''.join(3*c if c.isalpha() else c for c in smidge)
print(smidge)

monograph = MonomerGraph.from_smidge(smidge)
monograph.draw()

poly = PolymerStructure(
    mono_alphabet=mono_infos,
    monograph=monograph
)

In [None]:
# testing JSON I/O
poly.to_file('test.json')
poly2 = PolymerStructure.from_file('test.json')
poly2.monograph.draw()

In [None]:
from mbuild.lib.recipes import Polymer

### Developing mbuild coordinate generator hook for linear polymer graphs  

In [None]:
poly.monograph.is_linear

In [None]:
for i in poly.monograph.termini:
    print(poly.monograph.nodes[i])

In [None]:
from polymerist.genutils.textual.strsearch import shortest_repeating_substring


terms = list(monograph.termini)
assert(len(terms) == 2)
head_node = terms[0]

seq = ''
for i in nx.dfs_preorder_nodes(monograph, source=head_node):
    if i not in terms:
        mononame = monograph.nodes[i][monograph.MONOMER_NAME_ATTR]
        seq += poly.single_char_mononames[mononame]
min_seq = shortest_repeating_substring(seq)
seq, min_seq, seq.count(min_seq)

In [None]:
from polymerist.rdutils.bonding.portlib import Port
from polymerist.rdutils.bonding.substitution import saturate_ports

Port.bondable_flavors.reset()
Port.bondable_flavors.insert((0,2))
Port.bondable_flavors.insert((1,2))

rm = mono_info.rdmol
newmol = saturate_ports(rm, cap=Chem.MolFromSmiles('*-[2H]', sanitize=False), flavor_to_saturate=0)
display(newmol)
for atom in newmol.GetAtoms():
    print(atom.GetIdx(), atom.GetSymbol(), atom.GetIsotope())

In [None]:
from polymerist.rdutils.bonding.substitution import hydrogenate_rdmol_ports
from mbuild.conversion import from_rdkit
from polymerist.polymers.building import mbmol_to_openmm_pdb


prot_mol = hydrogenate_rdmol_ports(mono_info.rdmol)
Chem.SanitizeMol(prot_mol, sanitizeOps=specification.SANITIZE_AS_KEKULE)
mbmol = from_rdkit(prot_mol)
mbmol_to_openmm_pdb('test.pdb', mbmol)

# Playing with rich progress

In [None]:
from rich.progress import track, Progress
from time import sleep

with Progress() as progress:

    task1 = progress.add_task("[red]Downloading...", total=1000)
    task2 = progress.add_task("[green]Processing...", total=1000)
    task3 = progress.add_task("[cyan]Cooking...", total=1000)

    while not progress.finished:
        progress.update(task1, advance=0.5)
        progress.update(task2, advance=0.3)
        progress.update(task3, advance=0.9)
        sleep(0.02)

In [None]:
import time
import random
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)

def process(chunks):
    for chunk in chunks:
        time.sleep(0.1)
        yield chunk

chunks = [random.randint(1,20) for _ in range(100)]

progress_columns = (
    SpinnerColumn(),
    "[progress.description]{task.description}",
    BarColumn(),
    TaskProgressColumn(),
    "Elapsed:",
    TimeElapsedColumn(),
    "Remaining:",
    TimeRemainingColumn(),
)

with Progress(*progress_columns) as progress_bar:
    task = progress_bar.add_task("[blue]Downloading...", total=sum(chunks))
    for chunk in process(chunks):
        progress_bar.update(task, advance=chunk)

In [None]:
import random
import time

from rich.live import Live
from rich.table import Table


def generate_table() -> Table:
    """Make a new table."""
    table = Table()
    table.add_column("ID")
    table.add_column("Value")
    table.add_column("Status")

    for row in range(random.randint(2, 6)):
        value = random.random() * 100
        table.add_row(
            f"{row}", f"{value:3.2f}", "[red]ERROR" if value < 50 else "[green]SUCCESS"
        )
    return table


with Live(generate_table(), refresh_per_second=4) as live:
    for _ in range(40):
        time.sleep(0.4)
        live.update(generate_table())

In [None]:
from dataclasses import dataclass
from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Table

@dataclass
class Student:
    id: int
    name: str
    age: int
    def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
        yield f"[b]Student:[/b] #{self.id}"
        my_table = Table("Attribute", "Value")
        my_table.add_row("name", self.name)
        my_table.add_row("age", str(self.age))
        yield my_table

## Another thing