In [None]:
import json
import networkx as nx
import matplotlib.pyplot as plt


In [None]:
def read_json(filename):
    """Load and return the parsed contents of a JSON file.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded document, exactly as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's default
    # encoding, which differs between systems.
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

In [None]:
# Load the node-type specification (read from 'nodes.json' relative to the working directory).
spec = read_json('nodes.json')

In [None]:
# Display the loaded specification for inspection.
spec

In [None]:
def make_constant(x):
    """Build a CONSTANT node description whose ``signal`` parameter is ``x``."""
    params = {"signal": x}
    return {"type": "CONSTANT", "params": params}
def make_sine(freq=440.0, amp=0.5):
    """Build a SINE_OSC node description.

    Args:
        freq: Value stored in the node's ``freq`` parameter (default 440.0).
        amp: Value stored in the node's ``amp`` parameter (default 0.5).

    Returns:
        A new dict with "type" and "params" keys. Calling with no arguments
        yields the same description as before the parameters were added.
    """
    return {
        "type": "SINE_OSC",
        "params": {
            "freq": freq,
            "amp": amp
        }
    }

def make_midi():
    """Build a MIDI node description with an empty parameter dict."""
    node = {"type": "MIDI"}
    node["params"] = {}
    return node

def make_channel():
    """Build a CHANNEL_UNPACK node description with an empty parameter dict."""
    return dict(type="CHANNEL_UNPACK", params={})
    
def make_node(node_type):
    """Build a parameterless node description of the given ``node_type``."""
    description = dict(type=node_type)
    description["params"] = {}
    return description
    
def make_slider(value, v_min=0.0, v_max=1.0):
    """Build a SLIDER node description.

    Args:
        value: Value stored in the node's ``signal`` parameter.
        v_min: Value stored in ``v_min`` (default 0.0).
        v_max: Value stored in ``v_max`` (default 1.0).

    Returns:
        A new dict with "type" and "params" keys. With the default bounds the
        result is identical to the previous single-argument behaviour.
    """
    return {
        "type": "SLIDER",
        "params": {
            "v_min": v_min,
            "v_max": v_max,
            "signal": value
        }
    }
    
def make_adsr(param):
    """Build an ADSR node description.

    ``param`` is indexed at positions 0..3, which become the ``value`` of the
    attack, decay, sustain and release parameters respectively.
    """
    stage_names = ("attack", "decay", "sustain", "release")
    stages = {
        stage: {"value": param[position]}
        for position, stage in enumerate(stage_names)
    }
    return {"type": "ADSR", "params": stages}

In [None]:
def process_spec(node_spec):
    """Index a node-type description's ports by name.

    Returns a dict holding the original description under "spec" plus
    "inputs" and "outputs" mappings from each port's "name" to the port entry.
    """
    def index_by_name(ports):
        return {port["name"]: port for port in ports}

    return {
        "spec": node_spec,
        "inputs": index_by_name(node_spec["inputs"]),
        "outputs": index_by_name(node_spec["outputs"]),
    }
        
class GraphBuilder:
    """Incrementally assembles a node graph and exports it as a plain dict.

    Nodes are registered under unique names and receive sequential integer
    ids. Links are stored as ``[src_id, out_index, dst_id, in_index]`` lists,
    with the port indices looked up in the node-type specification supplied at
    construction time.
    """

    def __init__(self, spec):
        """Create an empty builder.

        Args:
            spec: Iterable of node-type descriptions. Each entry needs a
                "name" plus "inputs" and "outputs" lists (indexed by
                ``process_spec``).
        """
        self._id = 0
        self._nodes = {}
        self._id_to_name = {}
        self._links = []
        self._spec = {
            x["name"]: process_spec(x)
            for x in spec
        }

    def add_node(self, name, node):
        """Register ``node`` under the unique ``name`` and return ``name``.

        The node dict is modified in place: it receives the next sequential
        "id" and an "attributes" dict with a default position and the display
        name.

        Raises:
            AssertionError: If ``name`` is already registered.
        """
        assert name not in self._nodes, f"duplicate node name: {name!r}"
        node["id"] = self._id
        node["attributes"] = {
            "x": self._id * 200,
            "y": 100,
            "display_name": name
        }

        self._id_to_name[self._id] = name
        self._nodes[name] = node
        self._id += 1
        return name

    def get_id(self, name):
        """Return the integer id of the node registered as ``name``."""
        return self._nodes[name]["id"]

    def get_name(self, node_id):
        """Return the name of the node that was assigned ``node_id``."""
        return self._id_to_name[node_id]

    def get_type(self, name):
        """Return the "type" of the node registered as ``name``."""
        return self._nodes[name]["type"]

    def get_input_by_name(self, node_type, input_name):
        """Return the spec entry of input ``input_name`` on ``node_type``."""
        return self._spec[node_type]["inputs"][input_name]

    def get_output_by_name(self, node_type, output_name):
        """Return the spec entry of output ``output_name`` on ``node_type``."""
        return self._spec[node_type]["outputs"][output_name]

    def connect(self, src_name, out_name, dst_name, in_name):
        """Link output ``out_name`` of ``src_name`` to input ``in_name`` of ``dst_name``.

        Raises:
            KeyError: If a node name, node type or port name is unknown.
        """
        self._links.append([
            self.get_id(src_name),
            self.get_output_by_name(self.get_type(src_name), out_name)["index"],
            self.get_id(dst_name),
            self.get_input_by_name(self.get_type(dst_name), in_name)["index"],
        ])

    def assign_positions(self, scale=2000, prog='neato', args=''):
        """Lay the graph out with Graphviz, store the positions, and draw a preview.

        Every node's "x"/"y" attributes are overwritten with the layout
        coordinates multiplied by ``scale``.

        Args:
            scale: Factor applied to the layout coordinates before storing.
            prog: Layout program passed to ``graphviz_layout``.
            args: Extra arguments passed to ``graphviz_layout``.

        Returns:
            The unscaled positions returned by ``graphviz_layout``, keyed by
            node name.
        """
        plt.figure(figsize=(20, 20))
        G = nx.DiGraph(directed=True)
        # Read this instance's own nodes and links. The method previously
        # reached for a module-level `builder`, so it only worked when that
        # global happened to be the same object.
        for node in self._nodes.keys():
            G.add_node(node)

        for lnk in self._links:
            src_name = self.get_name(lnk[0])
            dst_name = self.get_name(lnk[2])
            G.add_edge(src_name, dst_name)
        positions = nx.nx_agraph.graphviz_layout(G, prog=prog, args=args)
        for name, pos in positions.items():
            self._nodes[name]["attributes"].update({
                "x": pos[0] * scale,
                "y": pos[1] * scale,
            })

        nx.draw_networkx_nodes(G, positions, node_size = 100)
        nx.draw_networkx_labels(G, positions)
        nx.draw_networkx_edges(G, positions, arrows=True)

        plt.show()
        return positions

    def make_graph(self):
        """Return the graph as ``{"nodes": [...], "links": [...]}``.

        The node dicts and the link list are the builder's own objects, not
        copies.
        """
        return {
            "nodes": list(self._nodes.values()),
            "links": self._links
        }

In [None]:
# Create a builder from the loaded spec (later cells construct fresh builders of their own).
builder = GraphBuilder(spec)

In [None]:
def add_sum_n(builder, outputs, name):
    """Sum several node outputs with a chain of two-input ADD nodes.

    For ``n`` outputs, ``n - 1`` ADD nodes named ``add_{name}_{i}`` are
    created. Adder ``i`` takes ``outputs[i]`` on input "b" and, on input "a",
    either the next adder's "signal" or (for the last adder) the final output.

    Args:
        builder: Object providing ``add_node`` and ``connect``.
        outputs: Sequence of ``(node_name, output_name)`` pairs to sum.
        name: Label embedded in the generated adder names.

    Returns:
        Name of the first adder, whose "signal" output carries the total.

    Raises:
        ValueError: If fewer than two outputs are given. No adder would be
            created in that case, so the returned name would not exist.
    """
    count = len(outputs)
    if count < 2:
        raise ValueError(
            f"add_sum_n needs at least two outputs to sum, got {count}"
        )

    adders = [f"add_{name}_{i}" for i in range(count - 1)]
    # Create every adder first so each one can link to its successor below.
    for adder in adders:
        builder.add_node(adder, make_node("ADD"))

    for i, adder in enumerate(adders):
        builder.connect(outputs[i][0], outputs[i][1], adder, "b")
        if i < count - 2:
            builder.connect(adders[i + 1], "signal", adder, "a")
        else:
            # The last adder takes the final output directly.
            builder.connect(outputs[-1][0], outputs[-1][1], adder, "a")

    return adders[0]

In [None]:
def gen_io(builder):
    """Add the shared I/O nodes to ``builder`` and return it.

    Registers "midi" and "channel" nodes, links the "ch" output of "midi" to
    the "ch" input of "channel", then registers the "output" node.
    """
    io_sources = (
        ("midi", make_midi()),
        ("channel", make_channel()),
    )
    for node_name, description in io_sources:
        builder.add_node(node_name, description)
    builder.connect("midi", "ch", "channel", "ch")

    builder.add_node("output", make_node("OUTPUT"))
    return builder

In [None]:
def gen_hammond(builder, drawbars, adsr_param):
    """Add a nine-harmonic additive organ patch to ``builder`` and return it.

    For each harmonic ratio a sine oscillator is created whose frequency is
    the channel "freq" times the ratio and whose amplitude is the product of a
    slider (0.1 * drawbar level), the channel "vel", and the shared ADSR
    envelope. All oscillator outputs are summed and linked to "output".

    The builder must already contain nodes named "midi", "channel" and
    "output" (as created by ``gen_io``).

    Args:
        builder: Graph builder to extend.
        drawbars: Sequence of drawbar levels, one per harmonic (at least 9).
        adsr_param: Envelope parameters forwarded to ``make_adsr``.

    Raises:
        IndexError: If ``drawbars`` has fewer entries than there are harmonics.
    """
    hammond_harmonics = [
        0.5, 1.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0
    ]
    # Check up front: a short list used to fail part-way through the loop,
    # leaving the builder half-populated.
    if len(drawbars) < len(hammond_harmonics):
        raise IndexError(
            f"drawbars must provide at least {len(hammond_harmonics)} values, "
            f"got {len(drawbars)}"
        )

    builder.add_node("adsr", make_adsr(adsr_param))
    builder.connect("midi", "ch", "adsr", "ch")

    for i, harmonic in enumerate(hammond_harmonics):
        # Oscillator frequency = channel frequency * harmonic ratio.
        builder.add_node(f"harmonic_scale_{i}", make_constant(harmonic))
        builder.add_node(f"freq_mul{i}", make_node("MULTIPLY"))
        builder.connect("channel", "freq", f"freq_mul{i}", "a")
        builder.connect(f"harmonic_scale_{i}", "signal", f"freq_mul{i}", "b")

        builder.add_node(f"osc_{i}", make_sine())
        builder.connect(f"freq_mul{i}", "signal", f"osc_{i}", "freq")

        # Oscillator amplitude = slider level * channel velocity * envelope.
        builder.add_node(f"amp_{i}", make_slider(0.1 * drawbars[i]))
        builder.add_node(f"ampmul_{i}", make_node("MULTIPLY"))
        builder.connect(f"amp_{i}", "signal", f"ampmul_{i}", "a")
        builder.connect("channel", "vel", f"ampmul_{i}", "b")
        builder.add_node(f"adsr_mul_{i}", make_node("MULTIPLY"))
        builder.connect("adsr", "vel", f"adsr_mul_{i}", "a")
        builder.connect(f"ampmul_{i}", "signal", f"adsr_mul_{i}", "b")
        builder.connect(f"adsr_mul_{i}", "signal", f"osc_{i}", "amp")

    osc_outputs = [
        (f"osc_{i}", "signal")
        for i in range(len(hammond_harmonics))
    ]
    sum_name = add_sum_n(builder, osc_outputs, "harmonic")
    builder.connect(sum_name, "signal", "output", "signal")

    return builder

In [None]:
# Drawbar levels for the nine harmonics; only the first three are non-zero.
drawbars = [0.0] * 9
drawbars[:3] = [1.0, 1.0, 1.0]
# Read by make_adsr in the order attack, decay, sustain, release.
adsr_param = [0.01, 0.15, 0.3, 0.2]

# Build the patch on a fresh builder, lay it out with Graphviz `dot`,
# then export the graph as a plain dict.
builder = GraphBuilder(spec)
builder = gen_io(builder)
builder = gen_hammond(builder, drawbars=drawbars, adsr_param=adsr_param)
pos = builder.assign_positions(scale=5, prog='dot')
graph = builder.make_graph()

In [None]:
# Write the exported graph to hammond.json in the working directory.
with open("hammond.json", "wt") as f:
    json.dump(graph, f, indent=4)

# DX7 algorithms

In [None]:
"""
The DX7 has six sine-wave operators.
Operators are more than just oscillators:
each can be viewed as a package containing an oscillator, an amplifier, and an envelope generator.
An operator can act as either a carrier or a modulator.

https://djjondent.blogspot.com/2019/10/yamaha-dx7-algorithms.html
"""



In [None]:
def make_nx_graph(alg):
    """Build a directed graph from an algorithm description.

    Every entry of ``alg["operators"]`` contributes a node keyed by its
    "index"; every entry of ``alg["edges"]`` contributes an edge from its
    first element to its second.
    """
    graph = nx.DiGraph()

    operator_indices = [operator["index"] for operator in alg["operators"]]
    for index in operator_indices:
        graph.add_node(index)

    for edge in alg["edges"]:
        source, target = edge[0], edge[1]
        graph.add_edge(source, target)

    return graph

def make_ancestors_subgraph(g: nx.DiGraph, source):
    """Return ``nx.subgraph`` of ``g`` restricted to ``source`` and its ancestors."""
    kept_nodes = [*nx.ancestors(g, source), source]
    return nx.subgraph(g, kept_nodes)

def add_op(builder, op_desc, prefix):
    """Add the nodes of one operator to ``builder`` and wire them together.

    Six nodes named ``{prefix}_alg_{index}_{suffix}`` are created: a sine
    oscillator, an ADSR envelope, constants for "vel" and "freq_ratio", and
    two multipliers. The oscillator frequency is channel "freq" times the
    ratio, its amplitude is the "vel" constant, and the operator output is
    the oscillator signal multiplied by the envelope.

    Returns:
        Dict with the node names of the oscillator ("osc"), the envelope
        ("adsr") and the final multiplier ("output").
    """
    stem = f"{prefix}_alg_{op_desc['index']}"

    def register(suffix, description):
        return builder.add_node(f"{stem}_{suffix}", description)

    oscillator = register("osc", make_sine())
    envelope = register("env", make_adsr(op_desc["env"]))
    level = register("vel", make_constant(op_desc["vel"]))
    ratio = register("ratio", make_constant(op_desc["freq_ratio"]))
    pitch = register("mul1", make_node("MULTIPLY"))
    gated = register("mul2", make_node("MULTIPLY"))

    # (source node, source output, destination node, destination input)
    wiring = (
        ("channel", "freq", pitch, "a"),
        (ratio, "signal", pitch, "b"),
        (pitch, "signal", oscillator, "freq"),
        (level, "signal", oscillator, "amp"),
        ("midi", "ch", envelope, "ch"),
        (oscillator, "signal", gated, "a"),
        (envelope, "vel", gated, "b"),
    )
    for link in wiring:
        builder.connect(*link)

    return {"osc": oscillator, "adsr": envelope, "output": gated}
    
def add_dx7_algorithm(builder, alg, prefix):
    """Add all operators of an algorithm to ``builder`` and wire the modulation.

    Each operator is created with ``add_op``. For every edge ``src -> dst``
    the output of ``src`` feeds the "phase" input of ``dst``'s oscillator;
    when an operator has several incoming edges their outputs are summed
    first. Operators without outgoing edges are treated as the audible
    outputs and are summed into a single signal.

    Args:
        builder: Graph builder to extend.
        alg: Algorithm description with "operators" and "edges".
        prefix: String prepended to every generated node name.

    Returns:
        ``(final_sum, ops)``: the name of the node whose "signal" output
        carries the mix, and the dict of ``add_op`` results keyed by operator
        index.

    Raises:
        RuntimeError: If no operator is free of outgoing edges.
    """
    G = make_nx_graph(alg)

    ops = {}
    for op_desc in alg["operators"]:
        ops[op_desc["index"]] = add_op(builder, op_desc, prefix)

    for node in G.nodes:
        degree = G.in_degree[node]
        in_edges = list(G.in_edges(node))

        if degree == 1:
            edge = in_edges[0]
            builder.connect(ops[edge[0]]["output"], "signal", ops[node]["osc"], "phase")
        elif degree > 1:
            # Look the modulators up in `ops`. `node` is an operator index,
            # so indexing it (as the code did before) raised TypeError.
            outputs = [
                (ops[edge[0]]["output"], "signal") for edge in in_edges
            ]
            out_sum = add_sum_n(builder, outputs, f"{prefix}_sum_phase_op_{node}_")
            builder.connect(out_sum, "signal", ops[node]["osc"], "phase")

    # Operators with no outgoing edge are the outputs of the algorithm.
    final_nodes = [node for node, degree in G.out_degree if degree == 0]
    final_ops = [ops[i] for i in final_nodes]

    if len(final_ops) == 1:
        final_sum = final_ops[0]["output"]
    elif len(final_ops) > 1:
        final_outputs = [
            (op["output"], "signal") for op in final_ops
        ]
        final_sum = add_sum_n(builder, final_outputs, f"{prefix}_final_out")
    else:
        raise RuntimeError("Algorithm has no final nodes")

    return final_sum, ops

In [None]:
def make_feedback_subgraph(alg_desc, feedback_edge):
    """Describe the part of an algorithm that feeds a feedback edge's source.

    Args:
        alg_desc: Algorithm description with "operators" and "edges".
        feedback_edge: Pair whose first element is the operator index the
            feedback is taken from.

    Returns:
        A new description containing only that operator and its ancestors,
        the edges between them, and an empty "feedback" list.
    """
    feedback_src = feedback_edge[0]
    # Build the graph from the argument. This previously read the
    # module-level `alg`, ignoring the description that was passed in.
    G = make_nx_graph(alg_desc)
    subgraph = make_ancestors_subgraph(G, feedback_src)
    sub_desc = {
        "edges": list(subgraph.edges),
        "operators": [
            op for op in alg_desc["operators"]
            if op["index"] in subgraph.nodes
        ],
        "feedback": []
    }

    return sub_desc

def gen_dx7(builder, alg):
    """Add the algorithm ``alg`` to ``builder`` under the "main" prefix.

    The resulting mix is linked to the "signal" input of the "output" node,
    and the builder is returned.
    """
    # Feedback edges (alg["feedback"]) are not wired in. The disabled attempt
    # is kept below for reference.
#     for i, feedback_edge in enumerate(alg["feedback"]):
#         sub_alg = make_feedback_subgraph(alg, feedback_edge)
#         feedback, _ = add_dx7_algorithm(builder, sub_alg, f"feedback_{i}")
#         builder.connect(feedback, "signal", ops[feedback_edge[1]]["osc"], "phase")
    mix_node, _operators = add_dx7_algorithm(builder, alg, "main")

    builder.connect(mix_node, "signal", "output", "signal")
    return builder

In [None]:
# Envelope parameters shared by every operator below; make_adsr reads them
# in the order attack, decay, sustain, release.
default_adsr = [0.1, 0.1, 0.5, 0.1]

# Algorithm description consumed by gen_dx7 / make_nx_graph.
alg = {
    # Directed [src, dst] pairs of operator indices: the output of src feeds
    # the oscillator "phase" input of dst (see add_dx7_algorithm).
    # Operator 1 appears in no edge.
    "edges": [
        [2, 6], [6, 3], [6, 4], [6, 5]
    ],
    # Feedback edges; gen_dx7 currently ignores them (its feedback wiring is commented out).
    "feedback": [[6, 6]],
    # One entry per operator (see add_op): "freq_ratio" multiplies the channel
    # frequency, "vel" drives the oscillator "amp" input, "env" is passed to make_adsr.
    "operators": [
        {
            "index": 1,
            "freq_ratio": 2.0,
            "vel": 0.2,
            "env": default_adsr
        },
        {
            "index": 2,
            "freq_ratio": 2.0,
            "vel": 0.5,
            "env": default_adsr
        },        
        {
            "index": 3,
            "freq_ratio": 4.0,
            "vel": 0.2,
            "env": default_adsr
        },       
        {
            "index": 4,
            "freq_ratio": 1.0,
            "vel": 0.2,
            "env": default_adsr
        },
        {
            "index": 5,
            "freq_ratio": 1.0,
            "vel": 0.2,
            "env": default_adsr
        },
        {
            "index": 6,
            "freq_ratio": 3.0,
            "vel": 1.0,
            "env": default_adsr
        },
    ]
}

In [None]:
# Build the DX7 patch on a fresh builder, lay it out with Graphviz `dot`,
# then export the graph as a plain dict.
builder = GraphBuilder(spec)
builder = gen_io(builder)
builder = gen_dx7(builder, alg)
pos = builder.assign_positions(scale=5, prog='dot')
graph = builder.make_graph()

In [None]:
# Write the exported graph to dx7.json in the working directory.
with open("dx7.json", "wt") as f:
    json.dump(graph, f, indent=4)

In [None]:
# Rebuild the algorithm graph and add the feedback edges to it, for visualisation only.
G = make_nx_graph(alg)
for fb in alg["feedback"]:
    G.add_edge(fb[0], fb[1])

In [None]:
# Draw the algorithm graph G using a Graphviz `dot` layout.
plt.figure(figsize=(10,10))
positions = nx.nx_agraph.graphviz_layout(G, prog='dot')
# NOTE(review): the disabled block below refers to `self` and `scale`, which do
# not exist in this cell; it looks like a leftover from GraphBuilder.assign_positions.
# for name, pos in positions.items():
#     self._nodes[name]["attributes"].update({
#         "x": pos[0] * scale,
#         "y": pos[1] * scale,
#     })

# Colour each node by its number of incoming edges.
node_color = [
    G.in_degree[i] for i in G.nodes
]

nx.draw_networkx_nodes(G, positions, node_size = 200, node_shape='o', node_color=node_color)
nx.draw_networkx_labels(G, positions)
nx.draw_networkx_edges(G, positions, arrows=True)

plt.show()

In [None]:
# TODO: unfinished cell. The bare `def` left here was a SyntaxError and stopped
# "Restart & Run All"; write the intended function or delete this cell.

In [None]:
# Scratch check: ancestors of operator 6 in G (G here includes the feedback edges added above).
nx.algorithms.ancestors(G, 6)

In [None]:
# Scratch cell: constructs an empty DiGraph and displays it; the result is not stored.
nx.DiGraph()