In [None]:
import enum
import functools
import importlib.util
import io
import itertools
import re
import subprocess
import yaml

if importlib.util.find_spec("cedar") is None:
    print("running startup script ...")
    with open("startup.log", "w") as f:
        result = subprocess.call(["bash", "/home/shared/admin/startup.sh"], stdout=f)
    print("startup (one off) script complete")
if importlib.util.find_spec('import_ipynb') is None:
    print('running first time startup script ...')
    with open('startup.log', 'w') as f:
        result = subprocess.call(['bash', 'start-T3072.sh'], stdout=f)
    print('complete ... resuming normal actions')

import networkx as nx
from cedar import *
from IPython.display import *
import matplotlib.pyplot as plt

In [None]:
%%capture

# Altered Reference condition
T_REF = Q_(20, ureg.degC).to(ureg.K)
ureg.autoconvert_offset_to_baseunit = True
ureg.define("kPag = 1e3 Pa; offset: 101300")
ureg.setup_matplotlib(True)

plt

# Produces higher quality diagrams
%config InlineBackend.figure_format ='retina'

def out(*args, **kwargs):
    print(output_string(*args, **kwargs))

In [None]:
def system_review(name, in_streams, out_streams):
    parameters = [
        ["m"],
        ["Q", "HHV"],
    ]  # check for mass parameters then energy parameters
    flag = [True, True]
    for n in in_streams + out_streams:
        has_parameters = [
            all([hasattr(S[n], p) for p in params]) for params in parameters
        ]
        for i in range(len(parameters)):
            if not has_parameters[i]:
                flag[i] = False
                print(
                    f"Stream {n} is missing at least one of the attributes in {parameters[i]}"
                )

    if flag[0]:
        m_in, m_out = sum([S[n].m for n in in_streams]), sum(
            [S[n].m for n in out_streams]
        )
        print()
        stream_table(
            *[S[n] for n in sorted([*in_streams, *out_streams])],
            label=name,
            col_numbers=6,
            long_form=True,
        )
        print(f"{'mass input':40}", m_in)
        print(f"{'mass output':40}", m_out)
        print(
    ##################
    ##HIDDEN PORTION##
    ##################
        
    if flag[1]:
    ##################
    ##HIDDEN PORTION##
    ##################
        print(f"{'net energy difference per unit mass':40}", result / m_out)

In [None]:
def heat_proximate(K, T, cp_dry, T_REF=T_REF):
    """returns the heat of a proximate stream based on the proximate mass vector, temperature and the specific heat of
    the dry material"""
    ##################
    ##HIDDEN PORTION##
    ##################


def CV_proximate(K, CV_vols):
    """Calculates the calorific value of a solids with an assumption of what the volatiles calorific value is"""
    ##################
    ##HIDDEN PORTION##
    ##################


def CV_volatiles(K, CV):
    """Estimates the calorific value of the portion which is volatiles"""
    if K[Volatiles].m > 0:
    ##################
    ##HIDDEN PORTION##
    ##################
    else:
    ##################
    ##HIDDEN PORTION##
    ##################
        )


def proximate_ultimate_analysis(K, A_vols):
    """Calculates the ultimate analysis of the materials using the proximate analysis and analysis of volatiles"""
    return frac(
    ##################
    ##HIDDEN PORTION##
    ##################
    )

In [None]:
def combustion_coordinate_ultimate_analysis(B):
    """
    Returns a coordinate of a combustion reaction (without including the primary reactant) for the combustion of a material
    with the ultimate analysis as given.

    :param B: Element vector
    :type B: pint.Quantity containing a ndarry of dimension N_ELEMENTS

    :return: unbalanced combustion coordinate
    :rtype: pint.Quantity containing a ndarry of dimension N_COMPONENTS
    """
    pass

    ##################
    ##HIDDEN PORTION##
    ##################
    return Y


def heat_of_formation_for_volatiles(
    CV: pint.Quantity, B: pint.Quantity
) -> pint.Quantity:
    """
    returns the heat of formation based on the calorific value and the ultimate analysis

    should be used on dry samples only

    CV should be a positive value and not a negative HHV value.
    """
    ##################
    ##HIDDEN PORTION##
    ##################

    return (heat_of_formation(N_poc) + CV * EW) / EW


def H_f_proximate(
    K: pint.Quantity, CV_vols: pint.Quantity, B_vols: pint.Quantity
) -> pint.Quantity:
    """
    returns the effective heat of formation based on the amount of material and nature of volatiles
    """
    ##################
    ##HIDDEN PORTION##
    ##################
    return np.sum(h_f*K).to(ureg.MJ / ureg.hr)

In [None]:
def test_h_f_proximate():
    # test a proximate stream using methane as the volatile matter
    ##################
    ##HIDDEN PORTION##
    ##################
    print(f"check {check} is determined through component means")
    print(f"concept {concept} is determined through proximate means")
    print(f"{(check-concept).m/check.m:.3E} is the ratio")
    print("This checks out")

test_h_f_proximate()

check -5972.26 kJ/kg is determined through component means
concept -5.97 MJ/kg is determined through proximate means
-4.569E-16 is the ratio
This checks!


In [None]:
def test_HHV_from_prox():
    # test a proximate stream using methane as the volatile matter
    #K = #HIDDEN
    #a = #HIDDEN

    # Evaluating as component
    ##################
    ##HIDDEN PORTION##
    ##################
    print(f"Reactants          = {reactants_h_f}")
    print(f"Products           = {products_h_f}")
    print(f"Products H2O_g     = {products_h_f_v}")
    print(f"ΔH                 = {products_h_f - reactants_h_f}")
    print(f"ΔH                 = {products_h_f_v - reactants_h_f}")

    print(f"Using LHV function = {LHV(N/n)}")
    print(f"Using HHV function = {HHV(N/n)}")

    out([N, Na, Nc])
    out([mass_vector(N), mass_vector(Na), mass_vector(Nc)])

test_HHV_from_prox()

Reactants          = -92.34 kJ/mol
Products           = -644.99 kJ/mol
Products H2O_g     = -703.43 kJ/mol
ΔH                 = -552.65 kJ/mol
ΔH                 = -611.09 kJ/mol
Using LHV function = -552.65 kJ/mol
Using HHV function = -611.09 kJ/mol


[1m            [0m [1m          [0m [1m          [0m [1m          [0m
[1mSymbol      [0m [1m     mol/h[0m [1m     mol/h[0m [1m     mol/h[0m
─────────────────────────────────────────────
N2                 0.00   34090.26   34090.26
O2                 0.00    9061.97       0.00
H2O                0.00       0.00    8590.26
CO2                0.00       0.00    5321.93
C1              1581.88       0.00       0.00
CH4             3740.04       0.00       0.00
H2O_l           1110.17       0.00       0.00
Si1               35.61       0.00      35.61
─────────────────────────────────────────────
[1mTotal       [0m [1m   6467.70[0m [1m  43152.23[0m [1m  48038.05[0m



[1m            [0m [1m          [0m [1m          [0m [1m          [0m
[1mSymbol      [0m [1m      kg/h[0m [1m      kg/h[0m [1m      kg/h[0m
─────────────────────────────────────────────
N2                 0.00     954.98     954.98
O2                 0.00     289.97       0.00
H2O                0.00       0.00     154.76
CO2                0.00       0.00     234.22
C1                19.00       0.00       0.00
CH4               60.00       0.00       0.00
H2O_l             20.00       0.00       0.00
Si1                1.00       0.00       1.00
─────────────────────────────────────────────
[1mTotal       [0m [1m    100.00[0m [1m   1244.96[0m [1m   1344.96[0m



In [None]:
def set_proximate_stream(
    stream: Stream,
    V: pint.Quantity = None,
    L_dry: pint.Quantity = None,
    x_moisture: pint.Quantity = None,
    m_dry: pint.Quantity = None,
    L: pint.Quantity = None,
    m: pint.Quantity = None,
    K: pint.Quantity = None,
    cp_dry: pint.Quantity = None,
    Q: pint.Quantity = None,
    T: pint.Quantity = None,
    CV: pint.Quantity = None,
    CV_dry: pint.Quantity = None,
    CV_vols: pint.Quantity = None,
    A: pint.Quantity = None,
    A_vols: pint.Quantity = None,
    rho: pint.Quantity = None,
    chain=False,
):
    """Adds attributes to plant for the complete definition of the stream based on inputs and state variables T and P.

    Inputs
    ------

    Nominally the N mole vector (in [substance]/[time] is expected along with the Temperature and Pressure of the stream. However, if
    there is no value of M passed, and in the following order, M will be determined then the remainder of the stream
    populated.
    - M (mass vector)
    - A combination of n, m or v (preferred order) and Y or X (preferred order)
    *Note:* v yet to be implemented

    - allow_gas is normally true. The purpose is to allow the density of the gas to be calculated.

    :returns: populated stream
    :rtype: Stream"""

    # Mass Vector
    # Combos to test = ['K', [['L', ['L_dry', 'x_moisture']], ['m_dry', 'm', ['V', 'rho']]]] OR/AND/OR/AND etc
    if K is not None:
        stream.K = K
        L = frac(K)
        m = np.sum(K)
    elif ((L is not None) or (L_dry is not None and x_moisture is not None)) and (
        (m is not None) or (m_dry is not None) or (V is not None and rho is not None)
    ):
        L = (
    ##################
    ##HIDDEN PORTION##
    ##################
        )
        m = (
    ##################
    ##HIDDEN PORTION##
    ##################
        )
        K = stream.K = (L * m).to(ureg.kg / ureg.hr)

    if stream.K is None:
        L, L_dry, x_moisture = (
            getattr(stream, k, None) for k in ["L", "L_dry", "x_moisture"]
        )
        m, m_dry, V, rho = (
            getattr(stream, k, None) for k in ["m", "m_dry", "V", "rho"]
        )
        if ((L is not None) or (L_dry is not None and x_moisture is not None)) and (
    ##################
    ##HIDDEN PORTION##
    ##################
        ):
            L = (
                L
                if L is not None
                else frac(L_dry * (1 - x_moisture) + L_MOISTURE() * x_moisture)
            )
            m = (
                m
                if m is not None
                else m_dry / np.sum(L * (1 - L_MOISTURE()))
                if m_dry is not None
                else V * rho
            )
            K = stream.K = (L * m).to(ureg.kg / ureg.hr)

    if stream.K is not None:
    ##################
    ##HIDDEN PORTION##
    ##################
    else:
        raise ValueError(
            "Insufficient information to determine mass and composition of stream provided"
        )

    # Temperature and Heat
    stream.cp_dry = (
    ##################
    ##HIDDEN PORTION##
    ##################
    )
    if (hasattr(stream, "T") and Q is None) or (T is not None):
    ##################
    ##HIDDEN PORTION##
    ##################
    elif (hasattr(stream, "Q")) or (Q is not None):
    ##################
    ##HIDDEN PORTION##
    ##################

    # Calorific value
    if (hasattr(stream, "CV") and CV_vols is None and CV_dry is None) or (
        CV is not None
    ):
        stream.CV = (CV if CV is not None else stream.CV).to(ureg.MJ / ureg.kg)
    elif (hasattr(stream, "CV_dry") and CV_vols is None) or (CV_dry is not None):
    ##################
    ##HIDDEN PORTION##
    ##################
    elif (hasattr(stream, "CV_vols")) or (CV_vols is not None):
    ##################
    ##HIDDEN PORTION##
    ##################
    if hasattr(stream, "CV"):
    ##################
    ##HIDDEN PORTION##
    ##################

    # Elemental balance
    if (
    ##################
    ##HIDDEN PORTION##
    ##################
    ):
        if (hasattr(stream, "A") and A_vols is None) or (A is not None):
    ##################
    ##HIDDEN PORTION##
    ##################
            )
        elif (hasattr(stream, "A_vols")) or (A_vols is not None):
    ##################
    ##HIDDEN PORTION##
    ##################
            )
            stream.A = proximate_ultimate_analysis(stream.K, stream.A_vols).to(ureg.pbw)
        stream.B = frac(element_moles(stream.A)).to(ureg.pbm)
        stream.B_vols = frac(element_moles(stream.A_vols)).to(ureg.pbm)

    if hasattr(stream, "B_vols") and hasattr(stream, "CV_vols"):
        stream.H_f = H_f_proximate(stream.K, stream.CV_vols, stream.B_vols)

    # Pressure
    stream.P = getattr(stream, "P", Q_(101.3, ureg.kPa))

    return stream if chain else None

In [None]:
def plant_as_dict(plant: Plant) -> dict:
    """
    Get the dictionary of the Plant object
    """
    silent_attributes = getattr(plant, "_silent_attributes", ["tag"])
    return {k: v for k, v in vars(plant) if k not in silent_attributes}


def copy_plant_object(obj: Plant):
    """
    Provides a copy of a plant object. Note that reassignment will fail and only supply the same reference.
    """
    cls = obj.__class__
    new_obj = cls(**vars(obj).copy())
    for v in vars(new_obj):
    ##################
    ##HIDDEN PORTION##
    ##################
    return new_obj


def is_empty_plant(test: Plant) -> bool:
    """
    Simple test to determine if a Plant object does or does not have any properties
    """
    return True if isinstance(test, Plant) and len(plant_as_dict) == 0 else False


def is_proximate_stream(stream: Stream) -> bool:
    """
    Returns if the stream is a properly formed proximate stream with values
    """
    # test_attributes = {'K': {"fn": isqarray, "args": [], "kwargs": {}},
    #                    'T': {"fn": isinstance, "args": [pint.Quantity], "kwargs": {}},
    #                    'rho': {"fn": isinstance, "args": [pint.Quantity], "kwargs": {}},
    #                   }
    tests = [
        TypeConstraint(
            keys="K", units=[ureg.kg, ureg.kg / ureg.hr], dimensions=(N_PROX,)
        ),
        TypeConstraint(keys="T", units=ureg.K),
        TypeConstraint(keys="rho", units=ureg.kg / ureg.m**3),
    ]
    return all([test.check(stream) for test in tests])


def is_stream(stream: Stream) -> bool:
    """
    Returns True if the stream is a properly formed stream with values
    """
    tests = [
        TypeConstraint(
            keys="M", units=[ureg.kg, ureg.kg / ureg.hr], dimensions=(N_COMPONENTS,)
        ),
        TypeConstraint(keys="T", units=ureg.K),
        TypeConstraint(keys="P", units=ureg.kPa),
    ]
    return all([test.check(stream) for test in tests])

In [11]:
def load_yaml_specification(title, path) -> Plant:
    """
    Loads the yaml specification for the MEB into a dict
    """

    qarray_pattern = re.compile(
        "^(.*)\W*\[(\W*(?:(?!-0?(?:\.0+)?(?:e|$))-?(?:0|[1-9]\d*)?(?:\.\d+)?(?<=\d)(?:e-?(?:0|[1-9]\d*))?|0x[0-9a-f]+)\W*(?:,\W*(?:(?!-0?(?:\.0+)?(?:e|$))-?(?:0|[1-9]\d*)?(?:\.\d+)?(?<=\d)(?:e-?(?:0|[1-9]\d*))?|0x[0-9a-f]+)\W*)*)\]"
    )

    def string_to_qarray(test_string):
        unit_string, array_string = qarray_pattern.match(test_string).groups()
        return qarray([float(n) for n in array_string.split(",")]) * Q_(unit_string)

    def qarray_constructor(loader: yaml.Loader, node: yaml.Node):
        value = loader.construct_scalar(node)
        return string_to_qarray(value)

    def quantity_constructor(loader: yaml.Loader, node: yaml.Node):
        value = loader.construct_scalar(node)
        try:
            q = pint.Quantity(value)
            return q
        except pint.UndefinedUnitError as e:
            return value
        except pint.DefinitionSyntaxError as e:
            raise yaml.parser.ParserError(
                "while parsing a quantity ", node.start_mark, " units were not found"
            )

    def get_yaml_loader():
        """Return a yaml loader."""
        loader = yaml.SafeLoader

        # remove any previously added constructors then add the necessary
        for k in list(loader.yaml_constructors.keys()):
            if not str(k).startswith("tag"):
                loader.yaml_constructors.pop(k)
        loader.add_constructor("!q", quantity_constructor)
        loader.add_constructor("!qarray", qarray_constructor)

        # remove any implicit resolvers then add the necessary
        if None in loader.yaml_implicit_resolvers.keys():
            del loader.yaml_implicit_resolvers[None]
        loader.add_implicit_resolver("!q", re.compile("^\d.*$"), None)
        loader.add_implicit_resolver("!qarray", qarray_pattern, None)

        return loader

    def construct_stream_list(data: dict) -> dict:
        """reads shorthand string from the YAML config file and adds Stream object"""
        return {n: load_plant_object(i, f"{n} : {i['name']}") for n, i in data.items()}

    def construct_units_list(data: dict) -> dict:
        return {n: load_plant_object(i, f"{n} : {i['name']}") for n, i in data.items()}

    def load_plant_object(data: dict, title: str) -> Plant:
        """Recursively loads dictionary objects into Plant objects"""
        re_subs = re.compile("(?<=\w)([ ]+)(?=\w)")
        re_valid = re.compile("^[A-Za-z_][A-Za-z0-9_\-.]*$")
        keys = list(map(lambda k: re_subs.sub("_", k), data.keys()))
        if not all(map(re_valid.match, keys)):
            raise KeyError("cannot form a Plant object from given dictionary")
        else:
            p = Plant(title)
            for key, value in zip(keys, data.values()):
                if isinstance(value, dict):
                    try:
                        value = load_plant_object(data=value, title=key)
                    except KeyError:
                        pass
                setattr(p, key.replace(".", "").replace("-", ""), value)
            return p

    with open(path, "r") as f:
        a = yaml.load(f, Loader=get_yaml_loader())

    spec = Plant(title)
    spec.streams = construct_stream_list(a["streams"])
    spec.units = construct_units_list(a["units"])
    spec.design = load_plant_object(a["design"], "design")

    return spec

In [12]:
def get_stream(s) -> stream:
    """returns a stream from the stream set S of number s (if s is in S) else expects to be a stream"""
    if s in S:
        return S[s]
    else:
        if isinstance(Stream, s):
            return s
        else:
            raise TypeError(
                "cannot get a stream that is either not an index in the streams dictionary or is not a Stream itself"
            )

In [13]:
@functools.cache
def get_specification_endpoints(specification) -> dict:
    """
    returns the feed streams from the specification - based on only having no inlets
    """
    all_outlets = {
        getattr(unit.outlets, outlet)
        for unit in specification.units.values()
        for outlet in [v for v in vars(unit.outlets) if v != "tag"]
    }
    all_inlets = {
        getattr(unit.inlets, inlet)
        for unit in specification.units.values()
        for inlet in [v for v in vars(unit.inlets) if v != "tag"]
    }
    all_streams = set(specification.streams.keys())
    return {
        "feed": all_streams - all_outlets,
        "product": all_streams.difference(all_inlets),
        "non-endpoint": all_inlets & all_outlets,
        "all": all_streams,
    }


@functools.cache
def get_all_nodes(specification) -> dict:
    """
    returns all of the nodes for the specificaiton including special nodes for endpoints
    """
    nodes = {}
    endpoints = get_specification_endpoints(specification)
    for n, f in enumerate(endpoints["feed"], start=1):
        p = Plant(f"Start {n}", type="feed")
        p.inlets, p.outlets = Plant(""), Plant("", F=f)
        nodes.update({str(n): p})
    for n, f in enumerate(endpoints["product"], start=len(nodes) + 1):
        p = Plant(f"End {n}")
        p.type, p.inlets, p.outlets = "product", Plant("", P=f), Plant("")
        nodes.update({str(n): p})
    nodes.update(units)
    return nodes


@functools.cache
def get_all_edges(specification) -> dict:
    """
    returns a dictionary containing stream: (from, to)
    """
    nodes = get_all_nodes(specification)
    inlets, outlets = {}, {}
    for n in nodes:
        inlets.update(
            {
                n: [v for k, v in vars(nodes[n].inlets).items() if k != "tag"]
                if hasattr(nodes[n], "inlets") and isinstance(nodes[n].inlets, Plant)
                else []
            }
        )
        outlets.update(
            {
                n: [v for k, v in vars(nodes[n].outlets).items() if k != "tag"]
                if hasattr(nodes[n], "outlets") and isinstance(nodes[n].outlets, Plant)
                else []
            }
        )
    all_streams = get_specification_endpoints(specification)["all"]

    display(inlets, outlets)

    reverse_inlets = {k: [n for n in inlets if k in inlets[k]] for k in all_streams}
    reverse_outlets = {k: [n for n in outlets if k in outlets[k]] for k in all_streams}

    # for k in all_streams:
    #     rid[k] = [n for n in id if k in id[n]]
    #     rod[k] = [n for n in id if k in od[n]]

    # display(rid, rod)
    return dict(
        zip(all_streams, list(zip(reverse_outlets.values(), reverse_inlets.values())))
    )


def print_stream_names(*args, title=""):
    print(f"{f' {title} ':=^80}")
    print("\n".join([f"    {get_stream(n).tag}" for n in args]))


def print_endpoints(specification):
    """prints out a list of names of the feed and product streams"""
    endpoints = get_specification_endpoints(specification)
    print_stream_names(title="Feed Streams", *endpoints["feed"])
    print_stream_names(title="Product Streams", *endpoints["product"])
    print_stream_names(title="Other", *endpoints["non-endpoint"])

In [14]:
def graph_analysis(specification) -> nx.DiGraph:
    """
    Performs graph analysis and work on the specification and returns a Graph object
    """
    nodes = get_all_nodes(specification)
    dg = nx.DiGraph()
    dg.add_nodes_from(nodes)
    streams = specification.streams

    dg.add_edges_from(get_all_edges(specification).values())

    # for s in streams:
    #     nf, nt = [], []
    #     for n in nodes:
    #         nf += [s] if hasattr(nodes[n], 'outlets') and (s in [v for v in vars(nodes[n].outlets) if v!='tag']) else []
    #         nt += [s] if hasattr(nodes[n], 'inlets') and (s in [v for v in vars(nodes[n].inlets) if v!='tag']) else []
    #     streams[s].node_from = nf
    #     streams[s].node_to = nt
    #     print(streams[s])

    # edges = {s:list(itertools.product(S[s].node_from, S[s].node_to)) for s in S}

    # dg.add_edges_from([(i, o) for value in edges.values() for i,o in value])

    return dg

In [15]:
specification = load_yaml_specification("Factibox balance", "design.yaml")
S = specification.streams
units = specification.units
design = specification.design

In [16]:
def get_connections(tag: str) -> list:
    return ([v for k, v in units[tag].inlets.__dict__.items() if k not in ['tag']],
            [v for k,v in units[tag].outlets.__dict__.items() if k not in ['tag']])

def review_unit(tag: str):
    system_review(units[tag].tag, *get_connections(tag))

In [17]:
def rotating_drum_flow_regime(Fr):
    """Return the flow regime given a Froude number for rotational movement"""
    if Fr < 1.0e-5:
        return "slipping"
    elif Fr < 0.3e-3:
        return "slumping"
    elif Fr < 0.5e-4:
        return "rolling/slumping"
    elif Fr < 0.2e-1:
        return "rolling"
    elif Fr < 0.4e-1:
        return "cascading/rolling"
    elif Fr < 0.8e-1:
        return "cascading"
    elif Fr < 0.9e-1:
        return "cataracting/cascading"
    elif Fr < 1.0:
        return "cataracting"
    else:
        return "centrifuging"

In [None]:
def heat_solids(K: pint.Quantity, T: pint.Quantity, cp_dry: pint.Quantity = None) -> pint.Quantity:
    """determines the heat of the solids using the analysis and a specific heat for the solids component."""
    if cp_dry is None:
        cp_dry = Q_(1.0, ureg.kJ / ureg.K / ureg.kg)
    return (
    ##################
    ##HIDDEN PORTION##
    ##################
    ).to_reduced_units()


def temp_solids(K: pint.Quantity, Q: pint.Quantity, cp_dry: pint.Quantity = None) -> pint.Quantity:
    """determines the temperature of solids based on an input heat value"""
    if cp_dry is None:
        cp_dry = Q_(1.0, ureg.kJ / ureg.K / ureg.kg)
    return (
    ##################
    ##HIDDEN PORTION##
    ##################
    ).to_reduced_units()