<a href="https://colab.research.google.com/github/ngcxy/Systems-of-ML/blob/main/Output_Stationary.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Global Variable Settings

We first initialize multiprocessing tools, generate random activations and weights, and print these values.

In [58]:
from multiprocessing import Process, Queue, Array
import random

num_activations = 8
num_weights = 3

# Create activations randomly.
activations = [random.uniform(0, 1) for _ in range(num_activations)]
# activations = [1,2,3,4,5,6,7,8]

# The kernel weights. In the output-stationary schemes below, every PE uses all of them.
weights = [random.uniform(0, 1) for _ in range(num_weights)]
# weights = [1,2,3]
print(activations)
print(weights)

[0.974968136476043, 0.9486859662853662, 0.8142852843671379, 0.4049645785260493, 0.4532858078378864, 0.833822314634862, 0.38672965890459365, 0.7937409781279717]
[0.7848930569125159, 0.9106794198455073, 0.5856651540532452]


## Convolution Results

We compute the 1-D convolution directly (no padding, no kernel flip) to obtain the ground truth for validation. The output of each dataflow below must match these values.

In [59]:
num_outputs = num_activations - num_weights + 1
ground_truth = [0.0] * num_outputs
for i in range(num_outputs):
    for k in range(num_weights):
        ground_truth[i] += weights[k] * activations[i + k]
print(ground_truth)

[2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
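
Because the activations and weights are random, the values above are hard to verify by hand. As a minimal sketch, the same sliding dot product can be wrapped in a helper and run on the integer inputs from the commented-out lines of the setup cell. The name `conv1d_valid` is hypothetical and not part of the notebook.

```python
def conv1d_valid(activations, weights):
    """Sliding dot product with no padding and no kernel flip."""
    num_outputs = len(activations) - len(weights) + 1
    return [
        sum(w * activations[i + k] for k, w in enumerate(weights))
        for i in range(num_outputs)
    ]


# Integer inputs (the commented-out values from the setup cell) keep the
# arithmetic easy to follow: output 0 is 1*1 + 2*2 + 3*3 = 14.
example = conv1d_valid([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3])
print(example)
```

With these inputs the helper prints `[14, 20, 26, 32, 38, 44]`, one value per PE in the dataflows that follow.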


## Validation Function
The `mse_error` function computes the Mean Square Error (MSE) between two sequences.

In [60]:
def mse_error(ground_truth, output):
    # Accumulate the squared error, then average it over the number of outputs.
    error = 0.0
    for gt, out in zip(ground_truth, output):
        error += (gt - out) ** 2
    error /= len(ground_truth)
    print(f"The expected results : {ground_truth}")
    print(f"The simulated results: {output[:]}")
    print(f"The Mean Square Error: {error:.4f}")
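
The printed error is rounded to four decimals, so a small discrepancy could go unnoticed. One possible complement, sketched here with hypothetical helper names (`mean_squared_error` and `all_close` are not part of the notebook), is to return the error as a value and to compare element-wise with `math.isclose`, which tolerates floating-point rounding.

```python
import math


def mean_squared_error(expected, actual):
    """Return the mean of the squared differences of two equal-length sequences."""
    expected, actual = list(expected), list(actual)
    if len(expected) != len(actual):
        raise ValueError("sequences must have the same length")
    return sum((e - a) ** 2 for e, a in zip(expected, actual)) / len(expected)


def all_close(expected, actual, rel_tol=1e-9, abs_tol=1e-12):
    """True when every pair of values agrees within a floating-point tolerance."""
    return all(
        math.isclose(e, a, rel_tol=rel_tol, abs_tol=abs_tol)
        for e, a in zip(expected, actual)
    )


# One element differs by 2, so the mean squared error is 4 / 3.
mse = mean_squared_error([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])

# 0.1 + 0.2 is not bit-identical to 0.3, but it is within tolerance.
close = all_close([0.1 + 0.2], [0.3])
print(mse, close)
```

The tolerances are illustrative defaults; a strict bit-for-bit comparison is still possible with `==` when the accumulation order is known to be identical.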

## Output stationary data flow

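We implement three dataflow variants of the output-stationary (OS) scheme. In each one, every PE accumulates a single output value in its own register while the weights and activations move.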

### OS type 1

The weights are broadcast to every PE, while the activations pass through the PEs sequentially.

Note that a PE only retrieves a weight once an activation it needs has arrived. Until then, the broadcast weights wait in that PE's `weight_queue`.

Each `OS_worker1` gets an activation from the `activate_queue` on its left. If the activation's `time_step` falls within the PE's multiplication window (`id` to `id + num_weights - 1`), the PE gets the next weight from `weight_queue`, multiplies the two, and accumulates the product into `partial_output_register[id]`. In either case, it then passes the activation on to the next PE through `output_queue`.
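
The schedule described above can be traced without any processes. The following is a minimal single-process sketch, assuming a hypothetical helper `os_type1_sequential` (not part of the notebook) and the integer inputs from the commented-out lines of the setup cell. It records which weight index each PE consumes at each time step.

```python
def os_type1_sequential(activations, weights):
    """Single-process trace of OS type 1; returns (registers, schedule)."""
    num_weights = len(weights)
    num_pes = len(activations) - num_weights + 1
    registers = [0.0] * num_pes
    # How many weights each PE has already taken from its own weight queue.
    next_weight = [0] * num_pes
    schedule = []  # entries are (time_step, pe_id, weight_index)

    for time_step, activation in enumerate(activations):
        # The activation visits the PEs from left to right.
        for pe_id in range(num_pes):
            if pe_id <= time_step <= pe_id + num_weights - 1:
                k = next_weight[pe_id]
                registers[pe_id] += weights[k] * activation
                next_weight[pe_id] += 1
                schedule.append((time_step, pe_id, k))
    return registers, schedule


registers, schedule = os_type1_sequential([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3])
print(registers)
print(schedule[:3])
```

Every recorded entry satisfies `weight_index == time_step - pe_id`, which is why buffering the broadcast weights in per-PE queues is enough to pair each activation with the right weight.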

In [61]:
def OS_worker1(id, partial_output_register, activate_queue, weight_queue, output_queue):
    ## The PE function for the Output Stationary Scheme 1

    while True:
        # Get input activation
        data = activate_queue.get()

        # Check if the activation is a termination signal
        if data is None:
            output_queue.put(data)
            break

        activation, time_step = data
        # Each PE only consumes activations whose time step lies in its window
        if not (id <= time_step <= id + num_weights - 1):
            output_queue.put(data)
            continue

        # Get kernel weights
        weight = weight_queue.get()

        # Update the RegFile in the PE
        partial_output_register[id] += weight*activation

        # Pass the input data into the output_queue
        output_queue.put(data)

    print(f"Worker ID {id}: is Done!")

In [62]:
num_pes = num_activations - num_weights + 1

activate_queues = [Queue() for _ in range(num_pes+1)]
weight_queues = [Queue() for _ in range(num_pes)]

processes = []

# Create a shared-memory array that holds one RegFile entry per PE
PE_RegFiles = Array('d', [0.0 for _ in range(num_pes)])

# Create and start a process for each PE.
for i in range(num_pes):
    p = Process(target=OS_worker1, args=(
        i, PE_RegFiles, activate_queues[i], weight_queues[i], activate_queues[i+1]))
    processes.append(p)
    p.start()

# Broadcast the weights to all PEs.
# The weights stay buffered in the queues until each PE receives a valid activation.
for i in range(num_weights):
    for w in weight_queues:
        w.put(weights[i])

print("Done broadasting Weights!")

# Pass the activation data into the leftmost PE
for i, activation in enumerate(activations):
    activate_queues[0].put((activation, i))

# Pass the termination signal at the end of the input sequence
activate_queues[0].put(None)
print("Done putting None!")

# Make sure to join the PE processes to clean up properly
for i, p in enumerate(processes):
    p.join()

OS_output = PE_RegFiles

print("All Done!")
mse_error(ground_truth, OS_output)

Worker ID 0: is Done!Worker ID 1: is Done!

Worker ID 2: is Done!Worker ID 3: is Done!

Worker ID 4: is Done!Worker ID 5: is Done!

Done broadasting Weights!
Done putting None!
All Done!
The expected results : [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The simulated results: [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The Mean Square Error: 0.0000


### OS type 2

The activations are broadcast to every PE, while the weights pass through the PEs sequentially.

Each `OS_worker2` waits for weights arriving from the `weight_queue` on its left. Until its first weight is due (`time_step < id`), it reads each broadcast activation from `activate_queue` and discards it. From then on, for every activation it reads, it gets the next weight from `weight_queue`, multiplies the two, and accumulates the product into `partial_output_register[id]`. It then passes the weight on to the next PE through `output_queue`.
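
One way to picture this dataflow is as a shift register of weights moving one PE to the right per cycle while a single activation is broadcast. The sketch below models that in a single process. The helper name `os_type2_sequential` and the cycle-by-cycle model are assumptions made for illustration, not code from the notebook.

```python
def os_type2_sequential(activations, weights):
    """Single-process model of OS type 2: weights shift, activations broadcast."""
    num_weights = len(weights)
    num_pes = len(activations) - num_weights + 1
    registers = [0.0] * num_pes
    # weight_at[p] is the weight currently held at PE p (None means no weight).
    weight_at = [None] * num_pes
    incoming = list(weights)

    for activation in activations:  # one broadcast activation per cycle
        # Shift the weight chain one PE to the right and feed PE 0.
        head = incoming.pop(0) if incoming else None
        weight_at = [head] + weight_at[:-1]
        for pe_id, weight in enumerate(weight_at):
            if weight is not None:
                registers[pe_id] += weight * activation
    return registers


registers = os_type2_sequential([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3])
print(registers)
```

In this model, weight `k` reaches PE `p` at cycle `k + p`, so PE `p` is idle for its first `p` cycles, matching the `time_step < id` check in the worker.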

In [63]:
def OS_worker2(id, partial_output_register, activate_queue, weight_queue, output_queue):
    ## The PE function for the Output Stationary Scheme 2

    while True:

        # Get the broadcast activation for this time step
        activation, time_step = activate_queue.get()

        # Skip activations broadcast before this PE's first weight is due
        if time_step < id:
            continue

        # Get the kernel weight from the PE on the left
        weight = weight_queue.get()

        # Check if the weight is a termination signal
        if weight is None:
            output_queue.put(weight)
            break

        # Update the RegFile in the PE
        partial_output_register[id] += weight * activation

        # Pass the weight on to the next PE
        output_queue.put(weight)

    print(f"Worker ID {id}: is Done!")

In [64]:
num_pes = num_activations - num_weights + 1

activate_queues = [Queue() for _ in range(num_pes)]
weight_queues = [Queue() for _ in range(num_pes+1)]

# List of processes: [PE[0]...PE[num_pes-1]]
processes = []

# Create a shared-memory array that holds one RegFile entry per PE
PE_RegFiles = Array('d', [0.0 for _ in range(num_pes)])

# Create and start a process for each PE.
for i in range(num_pes):
    p = Process(target=OS_worker2, args=(
        i, PE_RegFiles, activate_queues[i], weight_queues[i], weight_queues[i+1]))
    processes.append(p)
    p.start()

# Broadcasting the activations to all PEs.
for i,activation in enumerate(activations):
    for a in activate_queues:
        a.put((activation,i))
# Broadcast one extra dummy activation so that the last PE reads its termination signal
for a in activate_queues:
    a.put((0, num_pes))


print("Done broadasting Activations!")

# Pass the weights into the leftmost PE
for weight in weights:
    weight_queues[0].put(weight)

# Pass the termination signal at the end of the weight sequence
weight_queues[0].put(None)
print("Done passing None!")

# Make sure to join the PE processes to clean up properly
for i, p in enumerate(processes):
    p.join()

OS_output = PE_RegFiles

print("All Done!")
mse_error(ground_truth, OS_output)

Worker ID 0: is Done!
Worker ID 1: is Done!Worker ID 2: is Done!
Worker ID 3: is Done!

Worker ID 4: is Done!
Worker ID 5: is Done!
Done broadasting Activations!
Done passing None!
All Done!
The expected results : [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The simulated results: [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The Mean Square Error: 0.0000


### OS type 3

The weights are broadcast to every PE, while the activations pass through the PEs sequentially, as in OS type 1.

Unlike type 1, no PE starts retrieving the broadcast weights until a certain `time_step`: the moment the first activation arrives at the last PE. In addition, the PE IDs are assigned in reverse order, so the leftmost PE has the highest ID.

Each `OS_worker3` gets an activation from the `activate_queue` on its left and keeps passing it to the next PE through `output_queue`. Once the first activation has reached the last PE, the PEs start retrieving the broadcast weights from `weight_queue` and accumulating the products into `partial_output_register`. In the simulation, this is expressed by each PE passing through, unused, every activation with `time_step < id`.
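
The timing can be made explicit with a cycle-by-cycle model in which the activations form a shift register and a single weight is broadcast per cycle once the chain is full. This is a sketch under that reading of the description. The helper name `os_type3_sequential` and the notion of a physical position `pos` are assumptions, not part of the notebook.

```python
def os_type3_sequential(activations, weights):
    """Single-process model of OS type 3: activations shift, weights broadcast."""
    num_weights = len(weights)
    num_pes = len(activations) - num_weights + 1
    registers = [0.0] * num_pes
    # chain[pos] is the activation held at physical position pos (0 = leftmost).
    chain = [None] * num_pes
    # Cycle at which the first activation reaches the last position.
    start_cycle = num_pes - 1

    for cycle, activation in enumerate(activations):
        # Shift the activations one position to the right and feed position 0.
        chain = [activation] + chain[:-1]
        k = cycle - start_cycle  # index of the weight broadcast this cycle
        if 0 <= k < num_weights:
            for pos, held in enumerate(chain):
                pe_id = num_pes - 1 - pos  # IDs are assigned in reverse order
                registers[pe_id] += weights[k] * held
    return registers


registers = os_type3_sequential([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3])
print(registers)
```

At `start_cycle`, the PE with ID `i` holds activation `i`, so all PEs can use weight 0 in the same cycle. The reversed IDs are what line the activations up this way.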

In [65]:
def OS_worker3(id, partial_output_register, activate_queue, weight_queue, output_queue):
    ## The PE function for the Output Stationary Scheme 3

    while True:
        # Get input activation
        data = activate_queue.get()

        activation, time_step = data
        # Pass early activations straight through until this PE's window starts
        if time_step < id:
            output_queue.put(data)
            continue

        # Get kernel weights
        weight = weight_queue.get()

        # Check if the weight is a termination signal
        if weight is None:
            break

        # Update the RegFile in the PE
        partial_output_register[id] += weight * activation

        # Pass the input data into the output_queue
        output_queue.put(data)

    print(f"Worker ID {id}: is Done!")

In [66]:
num_pes = num_activations - num_weights + 1

activate_queues = [Queue() for _ in range(num_pes+1)]
weight_queues = [Queue() for _ in range(num_pes)]

processes = []

# Create a shared-memory array that holds one RegFile entry per PE
PE_RegFiles = Array('d', [0.0 for _ in range(num_pes)])

# Create and start a process for each PE.
for i in range(num_pes):
    p = Process(target=OS_worker3, args=(
        num_pes-1-i, PE_RegFiles, activate_queues[i], weight_queues[i], activate_queues[i+1]))
    processes.append(p)
    p.start()

# Broadcast the weights to all PEs.
# The weights stay buffered in the queues until each PE receives a valid activation.
for i in range(num_weights):
    for w in weight_queues:
        w.put(weights[i])

print("Done broadasting Weights!")

# Pass the activation data into the leftmost PE
for i, activation in enumerate(activations):
    activate_queues[0].put((activation, i))
# Pass one extra dummy activation so that the leftmost PE reads its termination signal
activate_queues[0].put((0, num_pes))

# Broadcast the termination signal to every PE after the weights
for i in range(num_weights):
    for w in weight_queues:
        w.put(None)
print("Done broadcasting None!")

# Make sure to join the PE processes to clean up properly
for i, p in enumerate(processes):
    p.join()

OS_output = PE_RegFiles

print("All Done!")
mse_error(ground_truth, OS_output)

Worker ID 5: is Done!Worker ID 4: is Done!

Worker ID 3: is Done!
Worker ID 2: is Done!
Worker ID 1: is Done!
Worker ID 0: is Done!
Done broadasting Weights!
Done broadcasting None!
All Done!
The expected results : [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The simulated results: [2.1060930229354824, 1.7233435207524903, 1.2733934759534087, 1.2189926168402678, 1.3416197903741698, 1.471514519097605]
The Mean Square Error: 0.0000
