In [1]:
# ------------------------------------------------------
# Letter State Machine
# ------------------------------------------------------
LETTERS = "ETIANMSURWDKGOHVF L PJBXCYZQ"

CURRENT_LETTER = 0

'''
Initializes the machine
'''
def init_letters():
    global CURRENT_LETTER

    CURRENT_LETTER = 0

'''
Shifts the state machine to the next letter

Params:
- is_dash(bool): is really some bit indiciating if the incoming symbol is a dot or dash
'''
def shift_letter(is_dash: bool) -> None:
    global CURRENT_LETTER

    CURRENT_LETTER = CURRENT_LETTER * 2 + is_dash + 1

'''
Gets the letter that the current state machine is on

Returns:
The length 1 string that has the character
None, if CURRENT_LETTER is not in the range of the list of letters
'''
def get_letter() -> str: 
    global LETTERS, CURRENT_LETTER

    ret_letter = ""

    if CURRENT_LETTER > 0 and CURRENT_LETTER <= len(LETTERS):
        ret_letter = LETTERS[CURRENT_LETTER - 1]

    CURRENT_LETTER = 0

    return ret_letter

'''
Finishes the machine and grabs the last letter
'''
def finalize() -> str:
    return get_letter()

In [2]:
from enum import IntEnum

BEAT_ERROR_RANGE = 0.15

'''
A list of the possible parsings from the morse code
'''
class MEANING(IntEnum):
    # Symbol
    DOT = 0
    DASH = 1

    # Pauses
    NEXT_SYMBOL = 2
    NEXT_LETTER = 3
    NEXT_WORD = 4

    # Unknown
    UNKNOWN = -1
        
'''
Removes noise from the beat_duration

Params:
- beat_nums(float): the rough number of beats
'''
def remove_noise(beat_nums: float):
    global BEAT_ERROR_RANGE

    error_ranges = [(dur, dur * (1 - BEAT_ERROR_RANGE), dur * (1 + (BEAT_ERROR_RANGE if dur != 7 else 1))) for dur in (1,3,7)]

    for (duration, duration_min, duration_max) in error_ranges:
        if beat_nums >= duration_min and beat_nums <= duration_max:
            return duration

    return -1

In [3]:
'''
Processes some parsed information and interacts with the "letter" state machine

Params:
- meaning (MEANING): some parsed info from the bitstream input

Returns:
Either a letter, a letter + a space, or None
'''
def process(meaning: MEANING) -> str | None:
    ret_proc = None
    match meaning:
        case MEANING.DOT:
            shift_letter(MEANING.DOT)
        case MEANING.DASH:
            shift_letter(MEANING.DASH)
        case MEANING.NEXT_LETTER:
            ret_proc = get_letter()
        case MEANING.NEXT_WORD:
            ret_proc = get_letter() + " "

    return ret_proc

In [4]:
# ------------------------------------------------------
# Beat Decoder
# ------------------------------------------------------
BEAT_DURATION = 1 # How many bits is a beat?

PREV_BIT = 0 # The value of the previous bit
NUM_OF_BIT = 0 # The number of occurrences of "prev bit"

'''
Determines what the last string of bit B means

Returns:
The information as one of the discrete meanings above.
'''
def parse_prev_inputs():
    global PREV_BIT, NUM_OF_BIT

    # TODO: eventually make this into some type of range
    beats = remove_noise(NUM_OF_BIT / BEAT_DURATION)

    meaning = None
    match (PREV_BIT, beats):
        case (1, 1):
            meaning = MEANING.DOT
        case (1, 3):
            meaning = MEANING.DASH
        case (0, 1):
            meaning = MEANING.NEXT_SYMBOL
        case (0, 3):
            meaning = MEANING.NEXT_LETTER
        case (0, 7):
            meaning = MEANING.NEXT_WORD
        case _:
            meaning = MEANING.UNKNOWN
    return meaning

'''
Processes the next bit

Params:
- bit(int): either a 0 or 1 that is either an "on" or "off" signal from the sender

Returns:
A letter if we have reached the end of a letter/word, else None
'''
def process_next_bit(bit: int) -> str | None:
    global PREV_BIT, NUM_OF_BIT

    if bit == PREV_BIT:
        NUM_OF_BIT += 1
    else:
        meaning = parse_prev_inputs()
        maybe_output = process(meaning)

        PREV_BIT = bit
        NUM_OF_BIT = 1

        return maybe_output

In [5]:
import sys

In [6]:
from pynq import Overlay, DefaultIP, allocate
import numpy as np

In [7]:
class Decoder(DefaultIP):
    CONTROL_REGISTER = 0x0
    def __init__(self, description):
        super().__init__(description = description)
        
    bindto = ['xilinx.com:hls:processNextBit:1.0']
        
    def start(self):
        self.write(Decoder.CONTROL_REGISTER, 0x01);

In [8]:
overlay = Overlay("/home/xilinx/pynq/overlays/project/decoder/process_next_bit.bit")
decoder = overlay.processNextBit_0

In [9]:
send_channel = overlay.axi_dma.sendchannel
recv_channel = overlay.axi_dma.recvchannel

In [10]:
def process_next_bit_hw(in_bits, recv_chars):
    global decoder, send_channel, recv_channel
    decoder.start()
    send_channel.start()
    recv_channel.start()
    send_channel.transfer(in_bits)
    send_channel.wait()
    recv_channel.transfer(recv_chars)
    recv_channel.wait()
    
    return recv_chars

In [11]:
decoder.register_map

RegisterMap {
  CTRL = Register(AP_START=0, AP_DONE=0, AP_IDLE=1, AP_READY=0, RESERVED_1=0, AUTO_RESTART=0, RESERVED_2=0, INTERRUPT=0, RESERVED_3=0),
  GIER = Register(Enable=0, RESERVED=0),
  IP_IER = Register(CHAN0_INT_EN=0, CHAN1_INT_EN=0, RESERVED_0=0),
  IP_ISR = Register(CHAN0_INT_ST=0, CHAN1_INT_ST=0, RESERVED_0=0)
}

In [12]:
bit_list = []

In [74]:
def process_hw(bit, finalize = False):
    global bit_list
    list_capacity = 100
    if finalize or len(bit_list) == list_capacity:
        print(bit_list)
        list_capacity = len(bit_list)
        in_bits = allocate(shape=(list_capacity,), dtype=np.uint32)
        recv_chars = allocate(shape=(list_capacity,), dtype=np.uint32)
        
        in_bits[:] = bit_list[:]
        print("in_bits", in_bits)
        output = process_next_bit_hw(in_bits, recv_chars)
        
        print("output", output)
        
        return_letter = ''.join(map(chr, recv_chars))
        bit_list = []
        del in_bits, recv_chars
        
        return return_letter
    else:
        bit_list.append(bit)

In [75]:
def finalize_hw():
    return process_hw(0, finalize = True)

In [61]:
# ------------------------------------------------------
# Translation
# ------------------------------------------------------
def translate(input_file, out = None, process_func = process_next_bit, finalize_func = finalize):
    if out is None:
        stdout = sys.stdout
    else:
        stdout = open(out, "w")

    with open(input_file, "r") as in_file:
        init_letters()
        for line in in_file:
            for bit in line:
                if bit in [" ", "\n"]:
                    continue

                bit = int(bit)
                
                letter = process_func(bit)

                if letter is not None:
                    print(letter, end = "", file = stdout)
                    
        print(finalize_func(), file = stdout)

    if out is not None:
        stdout.close()
        

In [15]:
from pathlib import Path
import time

In [16]:
'''
Basic Test Template to make other tests out of
'''
def translate_test(in_file, golden_file, result_file = None, time_it = False, **kwargs):
    if result_file is None:
        dot_split = in_file.split(".")
        result_file = "{}_out.{}".format("".join(dot_split[:-1]), dot_split[-1])
    
    timing = time.time()

    translate(in_file, out=result_file, **kwargs)

    if time_it:
        timing = time.time() - timing
    else:
        timing = 0

    correct_output = True

    with open(result_file, "r") as results:
        with open(golden_file, "r") as golden:
            next_result = results.readline().strip().lower()
            next_golden = golden.readline().strip().lower()

            while next_result != "" and next_golden != "":
                if next_result != next_golden:
                    print(f"'{next_result}' does not match '{next_golden}'")
                    correct_output = False
                    break
                next_result = results.readline()
                next_golden = golden.readline()

            if next_result != next_golden:
                print(f"One was empty:\n'{next_result}' does not match '{next_golden}'")
                correct_output = False
            
#     Path(result_file).unlink()

    if correct_output:
        timing_statement = "" if not time_it else f" in {timing:4.2} seconds"
        print(f"Successfully translated {in_file}{timing_statement}")

    return correct_output
                 


In [17]:
'''
Tests if the alphabet with no spaces is accurate
'''
def test1():
    translate_test("decoder_tests/test1.txt", "decoder_tests/golden_out1.txt", time_it = True)
    
test1()

Successfully translated decoder_tests/test1.txt in 0.016 seconds


In [18]:
'''
Tests if the alphabet with 1 space is accuate
'''
def test2():
    translate_test("decoder_tests/test2.txt", "decoder_tests/golden_out2_3.txt", time_it = True)

test2()

Successfully translated decoder_tests/test2.txt in 0.013 seconds


In [19]:
'''
Tests if multiple spaces are ommitted
'''
def test3():
    translate_test("decoder_tests/test3.txt", "decoder_tests/golden_out2_3.txt", time_it = True)

test3()

Successfully translated decoder_tests/test3.txt in 0.014 seconds


In [20]:
'''
Tests the "the quick brown fox jumps over the lazy dog"
'''
def test4():
    translate_test("decoder_tests/test4.txt", "decoder_tests/golden_out4.txt", time_it = True)

test4()

Successfully translated decoder_tests/test4.txt in 0.014 seconds


In [79]:
def testA_hw():
    translate_test("decoder_tests/untitled.txt", "decoder_tests/untitled1.txt", time_it = True, process_func = process_hw, finalize_func = finalize_hw)
testA_hw()

[1, 0, 1, 1, 1, 0, 0, 0]
in_bits [1 0 1 1 1 0 0 0]
output [0 0 0 0 0 0 0 0]
'        ' does not match 'a'
One was empty:
'        ' does not match 'a'


In [80]:
def test1_hw():
    translate_test("decoder_tests/test1.txt", "decoder_tests/golden_out1.txt", time_it = True, process_func = process_hw, finalize_func = finalize_hw)
    
test1_hw()

[1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1]
in_bits [1 0 1 1 1 0 0 0 1 1 1 0 1 0 1 0 1 0 0 0 1 1 1 0 1 0 1 1 1 0 1 0 0 0 1 1 1
 0 1 0 1 0 0 0 1 0 0 0 1 0 1 0 1 1 1 0 1 0 0 0 1 1 1 0 1 1 1 0 1 0 0 0 1 0
 1 0 1 0 1 0 0 0 1 0 1 0 0 0 1 0 1 1 1 0 1 1 1 0 1 1]
output [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1,