In [65]:
def simulate_clock(cycles, period=1, clock = 0, prev_clock = 1):
        if clock == 1 and prev_clock == 0:
            posedgeClk = True
        elif clock == 0 and prev_clock == 1:
            posedgeClk = False
        
        return posedgeClk

In [None]:
class InstructionMemory:
    def __init__(self):
        self.instructions = InstructionMemory.extractInstructions("imem.txt")
        
    @staticmethod   
    def extractInstructions(file_path):
        binary_instructions = []
        with open(file_path, "r") as file:
            for line in file:
                parts = line.split(":")
                if len(parts) > 1:
                    binary_value = parts[1].strip().rstrip(";")
                    binary_instructions.append(int(binary_value, 2))
        return binary_instructions
        

    def readInstruction(self, address, rst):
        if address < len(self.instructions) and not rst:
            return self.instructions[address]
        else:
            return 0




In [67]:
class ControlUnit:
    _RType = 0x0 
    _addi = 0x8 
    _ori_ = 0xd
    _xori_ = 0xe
    _andi_ = 0xc
    _slti_ = 0xa
    _lw = 0x23 
    _sw = 0x2b 
    _beq = 0x4
    _j_ = 0x2
    _jal_ = 0x3
    _bne_ = 0x5
    _add_ = 0x20
    _sub_ = 0x22
    _and_ = 0x24
    _or_ = 0x25
    _slt_ = 0x2a
    _xor_ = 0x26
    _nor_ = 0x27
    _sll_ = 0x0
    _srl_ = 0x2 
    _jr_ = 0x8    

    def __init__(self, opCode, funct):
        self.opCode = opCode
        self.funct = funct
        self.RegDst = 0b0
        self.Branch = 0b0
        self.MemReadEn = 0b0
        self.MemtoReg = 0b0
        self.MemWriteEn = 0b0
        self.RegWriteEn = 0b0
        self.ALUSrc = 0b0
        self.ALUOp = 0b0
        self.bne = 0b0
        self.jump = 0b0
        self.jal = 0b0
        self.jr = 0b0

    def execute(self):
        if self.opCode == ControlUnit._RType:
            if self.funct == ControlUnit._add_:
                self.ALUOp = 0b0000
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._sub_:
                self.ALUOp = 0b0001
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._and_:
                self.ALUOp = 0b0010
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._or_:
                self.ALUOp = 0b0011
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._slt_:
                self.ALUOp = 0b0100
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._xor_:
                self.ALUOp = 0b0101
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._nor_:
                self.ALUOp = 0b0110
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._sll_:
                self.ALUOp = 0b0111
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._srl_:
                self.ALUOp = 0b1000
                self.RegDst = 0b1
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b1
                self.ALUSrc = 0b0
            elif self.funct == ControlUnit._jr_:
                self.RegDst = 0b0
                self.Branch = 0b0
                self.MemReadEn = 0b0
                self.MemtoReg = 0b0
                self.ALUOp = 0b0000
                self.MemWriteEn = 0b0
                self.RegWriteEn = 0b0
                self.ALUSrc = 0b0
                self.jump = 0b1
                self.bne = 0b0
                self.jal = 0b0
                self.jr = 0b1
        elif self.opCode == ControlUnit._addi:
            self.RegDst = 0b0
            self.Branch = 0b0
            self.MemReadEn = 0b0
            self.MemtoReg = 0b0
            self.ALUOp = 0b0000
            self.MemWriteEn = 0b0
            self.RegWriteEn = 0b1
            self.ALUSrc = 0b1
        elif self.opCode == ControlUnit._lw:
            self.RegDst = 0b0
            self.Branch = 0b0
            self.MemReadEn = 0b1
            self.MemtoReg = 0b1
            self.ALUOp = 0b0000
            self.MemWriteEn = 0b0
            self.RegWriteEn = 0b1
            self.ALUSrc = 0b1
        elif self.opCode == ControlUnit._sw:
            self.RegDst = 0b0
            self.Branch = 0b0
            self.MemReadEn = 0b0
            self.MemtoReg = 0b0
            self.ALUOp = 0b0000
            self.MemWriteEn = 0b1
            self.RegWriteEn = 0b0
            self.ALUSrc = 0b1
        elif self.opCode == ControlUnit._beq:
            self.RegDst = 0b0
            self.Branch = 0b1
            self.MemReadEn = 0b0
            self.MemtoReg = 0b0
            self.ALUOp = 0b0001
            self.MemWriteEn = 0b0
            self.RegWriteEn = 0b0
            self.ALUSrc = 0b0
            self.jump = 0b0
            self.bne = 0b0
            self.jal = 0b0
            self.jr = 0b0
        elif self.opCode == ControlUnit._ori_:
            self.RegDst = 0b0
            self.Branch = 0b0
            self.MemReadEn = 0b0
            self.MemtoReg = 0b0
            self.ALUOp = 0b0011
            self.MemWriteEn = 0b0
            self.RegWriteEn = 0b1
            self.ALUSrc = 0b1
        elif self.opCode == ControlUnit._xori_:
            self.RegDst = 0b0
            self.Branch = 0b0
            self.MemReadEn = 0b0
            self.MemtoReg = 0b0
            self.ALUOp = 0b0101
            self.MemWriteEn = 0b0
            self.RegWriteEn = 0b1
            self.ALUSrc = 0b1


In [68]:
cu = ControlUnit(opCode=0x0, funct=0x20)
cu.execute()
print(cu.RegWriteEn)

1


In [69]:
class RF:
    def __init__(self):
        self.registers = [0]*32
        self.readData1 = 0
        self.readData2 = 0
    def writeRegister(self, rst, writeRegister, writeData, writeEnable):
        if rst:
            self.registers = [0]*32
        elif writeEnable:
            self.registers[writeRegister] = writeData
    def readRegisters(self, readRegister1, readRegister2):
        self.readData1 = self.registers[readRegister1]
        self.readData2 = self.registers[readRegister2]
    

In [70]:
class SignExtender:
    def __init__(self):
        self.data = 0
    def signExtend(self,data):
        if data & 0x8000:
            return data - (1 << 16)  
        else: return data
        

In [84]:
class Comparator:
    def __init__(self):
        self.In1 =0
        self.In2 =0
        self.bne =0
        self.rst =0
        self.branch =0
        self.hold = 0
        self.branchValid = 0
    def compare(self, In1, In2, bne, rst, branch, hold):
        if rst:
            self.branchValid = 0
        else:
            if(branch and not hold and ((bne and (In1 != In2)) or (not bne and (In1 == In2)))):
                self.branchValid = 1
            else:
                self.branchValid = 0
    

In [None]:
class ALU:

    _AND  = 0b010
    _SUB  = 0b001
    _ADD = 0b000
    _OR   = 0b011
    _SLT  = 0b100 
    _XOR = 0b101
    _NOR = 0b110
    _SLL = 0b111
    _SLR = 0b1000
    
    def __init__(self, operand1, operand2, opSel, shamt):
        self.operand1 = operand1
        self.operand2 = operand2
        self.opSel = opSel
        self.shamt = shamt
        self.result = 0
    def signed(self,data):
        if data & 0x80000:
            return data - (1 << 32)  
        else: return data
        
    def execute(self):
        self.soperand1 = self.signed(self.operand1)
        self.soperand2 = self.signed(self.operand2)
        if(self.opSel == ALU._ADD):
            self.result = self.operand1 + self.operand2
        elif(self.opSel == ALU._SUB):
            self.result = self.operand1 - self.operand2
        elif(self.opSel == ALU._AND):
            self.result = self.operand1 & self.operand2
        elif(self.opSel == ALU._OR):
            self.result = self.operand1 | self.operand2
        elif(self.opSel == ALU._SLT):
            self.result = 1 if self.soperand1 < self.soperand2 else 0
        elif(self.opSel == ALU._XOR):
            self.result = self.operand1 ^ self.operand2
        elif(self.opSel == ALU._NOR):
            self.result = ~(self.operand1 | self.operand2)
        elif(self.opSel == ALU._SLL):
            self.result = self.soperand1 << self.shamt
        elif(self.opSel == ALU._SLR):
            self.result = self.soperand1 >>self.shamt 
        
        
        
            
        
            
        

    
        

In [None]:
def test_ALU():
    # Define test cases for operand1 and operand2
    test_cases = [
        {"operand1": 5, "operand2": 3, "description": "Positive operands"},
        {"operand1": -5, "operand2": -3, "description": "Negative operands"},
        {"operand1": -5, "operand2": 3, "description": "One negative, one positive"},
    ]

    # Define operations and expected outputs (description only for reference)
    operations = [
        (ALU._ADD, "Addition"),
        (ALU._SUB, "Subtraction"),
        (ALU._AND, "Bitwise AND"),
        (ALU._OR, "Bitwise OR"),
        (ALU._SLT, "Set Less Than"),
        (ALU._XOR, "Bitwise XOR"),
        (ALU._NOR, "Bitwise NOR"),
        (ALU._SLL, "Shift Left Logical"),
        (ALU._SLR, "Shift Right Logical"),
    ]

    # Test and output results
    for case in test_cases:
        print(f"--- Test: {case['description']} ---")
        for op, op_desc in operations:
            alu = ALU(operand1=case["operand1"], operand2=case["operand2"], opSel=op, shamt=2)
            alu.execute()
            print(f"{op_desc} ({alu.operand1}, {alu.operand2}): Result = {alu.result}")
        print()

# Run the tests
test_ALU()


--- Test: Positive operands ---
Addition (5, 3): Result = 8
Subtraction (5, 3): Result = 2
Bitwise AND (5, 3): Result = 1
Bitwise OR (5, 3): Result = 7
Set Less Than (5, 3): Result = 0
Bitwise XOR (5, 3): Result = 6
Bitwise NOR (5, 3): Result = -8
Shift Left Logical (5, 3): Result = 20
Shift Right Logical (5, 3): Result = 1

--- Test: Negative operands ---
Addition (-5, -3): Result = -8
Subtraction (-5, -3): Result = -2
Bitwise AND (-5, -3): Result = -7
Bitwise OR (-5, -3): Result = -1
Set Less Than (-5, -3): Result = 1
Bitwise XOR (-5, -3): Result = 6
Bitwise NOR (-5, -3): Result = 0
Shift Left Logical (-5, -3): Result = -17179869204
Shift Right Logical (-5, -3): Result = -1073741826

--- Test: One negative, one positive ---
Addition (-5, 3): Result = -2
Subtraction (-5, 3): Result = -8
Bitwise AND (-5, 3): Result = 3
Bitwise OR (-5, 3): Result = -5
Set Less Than (-5, 3): Result = 1
Bitwise XOR (-5, 3): Result = -8
Bitwise NOR (-5, 3): Result = 4
Shift Left Logical (-5, 3): Result = -

In [83]:
class Execute:
    def __init__(self):
        # Inputs
        self.ALUSrc = 0
        self.forwardA = 0
        self.forwardB = 0
        self.ALUOp = 0
        self.shamt_EX = 0
        self.readData1 = 0
        self.readData2 = 0
        self.aluRes_MEM = 0
        self.aluRes_WB = 0
        self.extImm = 0

        # Outputs
        self.aluRes_EX = 0
        self.forwardBRes = 0

    def execute(self):
        """Perform ALU operations based on current inputs."""
        # Forwarding logic for operand A
        if self.forwardA == 0:
            self.forwardARes = self.readData1
        elif self.forwardA == 1:
            self.forwardARes = self.aluRes_WB
        elif self.forwardA == 2:
            self.forwardARes = self.aluRes_MEM
        else:
            self.forwardARes = 0

        # Forwarding logic for operand B
        if self.forwardB == 0:
            self.forwardBRes = self.readData2
        elif self.forwardB == 1:
            self.forwardBRes = self.aluRes_WB
        elif self.forwardB == 2:
            self.forwardBRes = self.aluRes_MEM
        else:
            self.forwardBRes = 0

        # Select ALU operand 2 based on ALUSrc signal
        self.ALUin2 = self.extImm if self.ALUSrc else self.forwardBRes

        # Perform the ALU operation
        alu = ALU(
            operand1=self.forwardARes,
            operand2=self.ALUin2,
            opSel=self.ALUOp,
            shamt=self.shamt_EX
        )
        alu.execute()
        self.aluRes_EX = alu.result

    def __repr__(self):
        """Returns a string representation of the Execute stage."""
        return (
            f"Execute Stage:\n"
            f"ALU Result: {self.aluRes_EX}\n"
            f"ForwardARes: {self.forwardARes}, ForwardBRes: {self.forwardBRes}\n"
            f"ALUSrc: {self.ALUSrc}, ALUOp: {self.ALUOp}\n"
            f"Inputs: ReadData1={self.readData1}, ReadData2={self.readData2}, ExtImm={self.extImm}\n"
        )


In [None]:

if __name__ == "__main__":
    test_cases = [
        # Test Case 1: Forwarding from readData1, ALUSrc = 0 (use forwardBRes)
        {
            "ALUSrc": 0, "forwardA": 0, "forwardB": 0, "ALUOp": 0, "shamt_EX": 0,
            "readData1": 10, "readData2": 20, "aluRes_MEM": 0, "aluRes_WB": 0, "extImm": 5,
            "expected_result": 30  # ADD: 10 + 20
        },
        # Test Case 2: Forwarding from aluRes_WB, ALUSrc = 1 (use extImm)
        {
            "ALUSrc": 1, "forwardA": 1, "forwardB": 0, "ALUOp": 1, "shamt_EX": 0,
            "readData1": 10, "readData2": 20, "aluRes_MEM": 0, "aluRes_WB": 15, "extImm": 7,
            "expected_result": 8  # SUB: 15 - 7
        },
        # Test Case 3: Forwarding from aluRes_MEM, ALUSrc = 0 (use forwardBRes)
        {
            "ALUSrc": 0, "forwardA": 2, "forwardB": 2, "ALUOp": 2, "shamt_EX": 0,
            "readData1": 10, "readData2": 20, "aluRes_MEM": 12, "aluRes_WB": 0, "extImm": 5,
            "expected_result": 12  # AND: 12 & 12
        },
        # Test Case 4: ALU operation with immediate value and SLL operation
        {
            "ALUSrc": 1, "forwardA": 0, "forwardB": 0, "ALUOp": 7, "shamt_EX": 2,
            "readData1": 3, "readData2": 0, "aluRes_MEM": 0, "aluRes_WB": 0, "extImm": 3,
            "expected_result": 12  # SLL: 3 << 2
        },
        # Test Case 5: No forwarding and SRL operation
        {
            "ALUSrc": 1, "forwardA": 0, "forwardB": 0, "ALUOp": 8, "shamt_EX": 1,
            "readData1": 4, "readData2": 0, "aluRes_MEM": 0, "aluRes_WB": 0, "extImm": 4,
            "expected_result": 2  # SRL: 4 >> 1
        },
    ]

    for i, test in enumerate(test_cases, 1):
        execute_unit = Execute(
            ALUSrc=test["ALUSrc"],
            forwardA=test["forwardA"],
            forwardB=test["forwardB"],
            ALUOp=test["ALUOp"],
            shamt_EX=test["shamt_EX"],
            readData1=test["readData1"],
            readData2=test["readData2"],
            aluRes_MEM=test["aluRes_MEM"],
            aluRes_WB=test["aluRes_WB"],
            extImm=test["extImm"]
        )

        execute_unit.execute()
        

        assert execute_unit.aluRes_EX == test["expected_result"], (
            f"Test case {i} failed: expected {test['expected_result']}, got {execute_unit.aluRes_EX}"
        )

        print(f"Test case {i} passed!")


Test case 1 passed!
Test case 2 passed!
Test case 3 passed!
Test case 4 passed!
Test case 5 passed!


In [None]:
class MEMORY:
    def __init__(self):
        # Initialize values
        self.aluRes_MEM = 0
        self.WriteDmem = 0
        self.MemReadEn_MEM = 0
        self.MemWriteEn_MEM = 0
        self.memoryReadData = 0
        self.memRows = [0]*256  # 256 memory locations

    def initializeFromFile(self, file_path):
        """Initialize memory from a file."""
        with open(file_path, "r") as file:
            for index, line in enumerate(file):
                if index < len(self.memRows):
                    binary_value = line.strip()
                    self.memRows[index] = int(binary_value, 2)
                else:
                    break

    def update(self):
        """
        Update the values for the current cycle.
        Also handles memory read or write based on control signals.
        """

        # Memory Read Operation
        if self.MemReadEn_MEM:
            self.memoryReadData = self.memRows[self.aluRes_MEM]

        # Memory Write Operation
        if self.MemWriteEn_MEM:
            self.memRows[self.aluRes_MEM] = self.WriteDmem

    def __repr__(self):
        """Returns a string representation of the MEMORY stage."""
        return (
            f"MEMORY Stage:\n"
            f"Memory Read Data: {self.memoryReadData}\n"
            f"Memory State (Partial): {self.memRows[:10]}...\n"  # Displaying only first 10 rows for brevity
            f"ALU Result: {self.aluRes_MEM}, Write Data: {self.WriteDmem}\n"
            f"Memory Read Enable: {self.MemReadEn_MEM}, Memory Write Enable: {self.MemWriteEn_MEM}\n"
        )


In [None]:
mem = MEMORY(aluRes_MEM=1, WriteDmem=0, MemReadEn_MEM=0, MemWriteEn_MEM=0)
mem.initializeFromFile("dmem.txt")
for i in range(10):
    print(bin(mem.memRows[i]))  #FYI python cannot read stuff in binary, 
                                # but it does make them integer, 
                                # hence, don't think too much about if 
                                # something is 32 bits or not

0b1
0b10
0b11
0b100
0b101
0b110
0b111
0b0
0b0
0b0


In [None]:
class PipelineRegister:
    def __init__(self, **kwargs):
        self.data = kwargs

    def update(self, hold, rst,  **kwargs):
        if not hold:
            if not rst:
                for key, value in kwargs.items():   # to add values every end of cycle(I hope) 
                    self.data[key] = value  
            else :
                for key, value in kwargs.items():   # to add values every end of cycle(I hope) 
                    self.data[key] = 0
                
                                            
                

    def read(self, key):# to output the signals in every cycle for debugging the design(I hope)
        return self.data.get(key, None)

    def clear(self): # for flushing when there is a hazrd, hold will not be necessary since we just don't update when there is hold
        self.data = {}

    

In [None]:
class HazardDetectionUnit:
    def __init__(self, rs_ID, rt_ID, dest_MEM, dest_EXE, dest_WB,
                 mem_read_EX, branch, branch_valid, write_back_MEM,
                 write_back_EX, write_back_WB, mem_to_reg_MEM,
                 reg_dest_ID, jump, jr, jal):
        # Inputs
        self.rs_ID = rs_ID
        self.rt_ID = rt_ID
        self.dest_MEM = dest_MEM
        self.dest_EXE = dest_EXE
        self.dest_WB = dest_WB
        self.mem_read_EX = mem_read_EX
        self.branch = branch
        self.branch_valid = branch_valid
        self.write_back_MEM = write_back_MEM
        self.write_back_EX = write_back_EX
        self.write_back_WB = write_back_WB
        self.mem_to_reg_MEM = mem_to_reg_MEM
        self.reg_dest_ID = reg_dest_ID
        self.jump = jump
        self.jr = jr
        self.jal = jal

        # Outputs
        self.ld_hazard = 0
        self.branch_hazard = 0
        self.branch_hold = 0
        self.hold = 0
        self.forwardA_Branch = 0
        self.forwardB_Branch = 0

    def detect(self):
        # Load Hazard Detection
        load_hazard = self.mem_read_EX and (
            (self.rs_ID == self.dest_EXE) or
            (self.reg_dest_ID and self.rt_ID == self.dest_EXE)
        )
        self.ld_hazard = bool(load_hazard)

        # Branch Hazard Detection
        branch_hazard = (
            (self.branch and self.branch_valid) or self.jump or self.jal or self.jr
        ) and not self.hold
        self.branch_hazard = bool(branch_hazard)

        # Branch Stall Detection
        branch_stall = (
            self.branch and (
                (self.write_back_EX and self.dest_EXE != 0 and (
                    self.dest_EXE == self.rs_ID or self.dest_EXE == self.rt_ID
                )) or
                (self.mem_to_reg_MEM and (
                    self.dest_MEM == self.rs_ID or self.dest_MEM == self.rt_ID
                ))
            )
        )
        self.branch_hold = bool(branch_stall)

        # Overall Hold Signal
        self.hold = self.ld_hazard or self.branch_hold
        # Branch Forwarding
        # ForwardA Branch
        if(self.branch and
            (
                self.write_back_MEM and 
                    (
                        self.dest_MEM == self.rs_ID and self.dest_MEM != 0
                )
            )
        ):
            self.forwardA_Branch = 1
        elif (self.branch and 
              (
                  self.write_back_WB  and 
                    (
                        self.dest_WB == self.rs_ID and self.dest_WB != 0
                )
            )
        ):
            self.forwardA_Branch = 2
        else: 
            self.forwardA_Branch = 0
        #ForwardB Branch
        if(self.branch and
            (
                self.write_back_MEM and 
                    (
                        self.dest_MEM == self.rt_ID and self.dest_MEM != 0
                )
            )
        ):
            self.forwardB_Branch = 1
        elif (self.branch and 
              (
                  self.write_back_WB  and 
                    (
                        self.dest_WB == self.rt_ID and self.dest_WB != 0
                )
            )
        ):
            self.forwardB_Branch = 2
        else:
            self.forwardB_Branch = 0
        


In [None]:
def run_tests():
    test_cases = [
        {
            "name": "Test Load Hazard (ld_hazard)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 3, "dest_EXE": 1, "dest_WB": 4,
                "mem_read_EX": True, "branch": False, "branch_valid": False, "write_back_MEM": False,
                "write_back_EX": False, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": True, "branch_hazard": False, "branch_hold": False, "hold": True, "forwardA_Branch": 0, "forwardB_Branch": 0}
        },
        {
            "name": "Test Branch Hazard (branch_hazard)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 3, "dest_EXE": 4, "dest_WB": 5,
                "mem_read_EX": False, "branch": True, "branch_valid": True, "write_back_MEM": False,
                "write_back_EX": False, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": True, "branch_hold": False, "hold": False, "forwardA_Branch": 0, "forwardB_Branch": 0}
        },
        {
            "name": "Test Branch Stall (branch_hold)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 3, "dest_EXE": 1, "dest_WB": 4,
                "mem_read_EX": False, "branch": True, "branch_valid": True, "write_back_MEM": False,
                "write_back_EX": True, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": True, "branch_hold": True, "hold":True, "forwardA_Branch": 0, "forwardB_Branch": 0}
        },
        {
            "name": "Test Branch Forwarding (forwardA_Branch)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 1, "dest_EXE": 4, "dest_WB": 5,
                "mem_read_EX": False, "branch": True, "branch_valid": True, "write_back_MEM": True,
                "write_back_EX": False, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": True, "branch_hold": False, "hold": False, "forwardA_Branch": 1, "forwardB_Branch": 0}
        },
        {
            "name": "Test Branch Forwarding (forwardB_Branch)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 3, "dest_EXE": 1, "dest_WB": 2,
                "mem_read_EX": False, "branch": True, "branch_valid": True, "write_back_MEM": False,
                "write_back_EX": False, "write_back_WB": True, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": True, "branch_hold": False, "hold": False, "forwardA_Branch": 0, "forwardB_Branch": 2}
        },
        {
            "name": "Test Branch Forwarding (forwardB_Branch 1)",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 2, "dest_EXE": 1, "dest_WB": 5,
                "mem_read_EX": False, "branch": True, "branch_valid": True, "write_back_MEM": True,
                "write_back_EX": False, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": True, "branch_hold": False, "hold": False, "forwardA_Branch": 0, "forwardB_Branch": 1}
        },
        {
            "name": "Test No Hazard Case",
            "input": {
                "rs_ID": 1, "rt_ID": 2, "dest_MEM": 3, "dest_EXE": 4, "dest_WB": 5,
                "mem_read_EX": False, "branch": False, "branch_valid": False, "write_back_MEM": False,
                "write_back_EX": False, "write_back_WB": False, "mem_to_reg_MEM": False, 
                "reg_dest_ID": False, "jump": False, "jr": False, "jal": False
            },
            "expected": {"ld_hazard": False, "branch_hazard": False, "branch_hold": False, "hold": False, "forwardA_Branch": 0, "forwardB_Branch": 0}
        }
    ]
    
    for case in test_cases:
        print(f"Running Test: {case['name']}")
        hdu = HazardDetectionUnit(**case['input'])
        hdu.detect()

        result = {
            "ld_hazard": hdu.ld_hazard,
            "branch_hazard": hdu.branch_hazard,
            "branch_hold": hdu.branch_hold,
            "hold": hdu.hold,
            "forwardA_Branch": hdu.forwardA_Branch,
            "forwardB_Branch": hdu.forwardB_Branch
        }

        print(f"Expected: {case['expected']}")
        print(f"Result: {result}")
        if result == case['expected']:
            print(f"Test passed for {case['name']}\n")
        else:
            print(f"Test failed for {case['name']}\n")

run_tests()


Running Test: Test Load Hazard (ld_hazard)
Expected: {'ld_hazard': True, 'branch_hazard': False, 'branch_hold': False, 'hold': True, 'forwardA_Branch': 0, 'forwardB_Branch': 0}
Result: {'ld_hazard': True, 'branch_hazard': False, 'branch_hold': False, 'hold': True, 'forwardA_Branch': 0, 'forwardB_Branch': 0}
Test passed for Test Load Hazard (ld_hazard)

Running Test: Test Branch Hazard (branch_hazard)
Expected: {'ld_hazard': False, 'branch_hazard': True, 'branch_hold': False, 'hold': False, 'forwardA_Branch': 0, 'forwardB_Branch': 0}
Result: {'ld_hazard': False, 'branch_hazard': True, 'branch_hold': False, 'hold': False, 'forwardA_Branch': 0, 'forwardB_Branch': 0}
Test passed for Test Branch Hazard (branch_hazard)

Running Test: Test Branch Stall (branch_hold)
Expected: {'ld_hazard': False, 'branch_hazard': True, 'branch_hold': True, 'hold': True, 'forwardA_Branch': 0, 'forwardB_Branch': 0}
Result: {'ld_hazard': False, 'branch_hazard': True, 'branch_hold': True, 'hold': True, 'forwardA_

In [None]:
class ForwardingUnit:
    def __init__(self, rs_ex, rt_ex, dest_mem,
                 dest_wb, rt_mem, rst, regwrite_mem, reg_write_wb, MemWriteEn_MEM ):
        self.rs_ex = rs_ex
        self.rt_ex = rt_ex
        self.dest_mem = dest_mem
        self.dest_wb = dest_wb
        self.rt_mem = rt_mem
        self.rst = rst
        self.regwrite_mem = regwrite_mem
        self.reg_write_wb = reg_write_wb
        self.MemWriteEn_MEM = MemWriteEn_MEM
        self.forwardA = 0
        self.forwardB = 0
        self.memFw = 0
        
    def forward(self):
        if (self.dest_wb == self.rt_mem and self.MemWriteEn_MEM):
            self.memFw = 1
        else: 
            self.memFw = 0
        if self.rst:
            self.forwardA = 0
            self.forwardB = 0
        else:
            #forwardA logic
            if(
                (
                    self.regwrite_mem and
                        (self.dest_mem != 0) and (self.dest_mem == self.rs_ex)
                )
            ):
                self.forwardA = 2
            elif(
                self.reg_write_wb and 
                    (self.dest_wb != 0) and not
                        ((self.regwrite_mem and(self.dest_mem != 0) 
                          and(self.dest_mem == self.rs_ex))) 
                        and (self.dest_wb== self.rs_ex)
                ):
                self.forwardA = 1
            else: 
                self.forwardA = 0
            #forwardB logic
            if(
                (
                    self.regwrite_mem and
                        (self.dest_mem != 0) and (self.dest_mem == self.rt_ex)
                )
            ):
                self.forwardB = 2
            elif(
                self.reg_write_wb and 
                    (self.dest_wb != 0) and not
                        ((self.regwrite_mem and(self.dest_mem != 0) 
                          and(self.dest_mem == self.rt_ex))) 
                        and (self.dest_wb== self.rt_ex)
                ):
                self.forwardB = 1
            else: 
                self.forwardB = 0
            
            

In [None]:
def run_forwarding_unit_tests():
    test_cases = [
        {
            "name": "Test ForwardA from WB",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 3, "dest_wb": 1, "rt_mem": 4,
                "rst": False, "regwrite_mem": False, "reg_write_wb": True, "MemWriteEn_MEM": False
            },
            "expected": {"forwardA": 1, "forwardB": 0, "memFw": 0}
        },
        {
            "name": "Test ForwardB from MEM",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 2, "dest_wb": 5, "rt_mem": 5,
                "rst": False, "regwrite_mem": True, "reg_write_wb": False, "MemWriteEn_MEM": True
            },
            "expected": {"forwardA": 0, "forwardB": 2, "memFw": 1}
        },
        {
            "name": "Test No Forwarding",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 3, "dest_wb": 4, "rt_mem": 5,
                "rst": False, "regwrite_mem": False, "reg_write_wb": False, "MemWriteEn_MEM": False
            },
            "expected": {"forwardA": 0, "forwardB": 0, "memFw": 0}
        },
        {
            "name": "Test Reset Case (rst=True)",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 3, "dest_wb": 4, "rt_mem": 5,
                "rst": True, "regwrite_mem": True, "reg_write_wb": True, "MemWriteEn_MEM": True
            },
            "expected": {"forwardA": 0, "forwardB": 0, "memFw": 0}
        },
        {
            "name": "Test ForwardA from MEM",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 1, "dest_wb": 4, "rt_mem": 3,
                "rst": False, "regwrite_mem": True, "reg_write_wb": False, "MemWriteEn_MEM": False
            },
            "expected": {"forwardA": 2, "forwardB": 0, "memFw": 0}
        },
        {
            "name": "Test ForwardB from WB",
            "input": {
                "rs_ex": 1, "rt_ex": 2, "dest_mem": 3, "dest_wb": 2, "rt_mem": 4,
                "rst": False, "regwrite_mem": False, "reg_write_wb": True, "MemWriteEn_MEM": False
            },
            "expected": {"forwardA": 0, "forwardB": 1, "memFw": 0}
        }
    ]
    
    for case in test_cases:
        print(f"Running Test: {case['name']}")
        fu = ForwardingUnit(**case['input'])
        fu.forward()

        result = {
            "forwardA": fu.forwardA,
            "forwardB": fu.forwardB,
            "memFw": fu.memFw
        }

        print(f"Expected: {case['expected']}")
        print(f"Result: {result}")
        if result == case['expected']:
            print(f"Test passed for {case['name']}\n")
        else:
            print(f"Test failed for {case['name']}\n")

run_forwarding_unit_tests()


Running Test: Test ForwardA from WB
Expected: {'forwardA': 1, 'forwardB': 0, 'memFw': 0}
Result: {'forwardA': 1, 'forwardB': 0, 'memFw': 0}
Test passed for Test ForwardA from WB

Running Test: Test ForwardB from MEM
Expected: {'forwardA': 0, 'forwardB': 2, 'memFw': 1}
Result: {'forwardA': 0, 'forwardB': 2, 'memFw': 1}
Test passed for Test ForwardB from MEM

Running Test: Test No Forwarding
Expected: {'forwardA': 0, 'forwardB': 0, 'memFw': 0}
Result: {'forwardA': 0, 'forwardB': 0, 'memFw': 0}
Test passed for Test No Forwarding

Running Test: Test Reset Case (rst=True)
Expected: {'forwardA': 0, 'forwardB': 0, 'memFw': 0}
Result: {'forwardA': 0, 'forwardB': 0, 'memFw': 0}
Test passed for Test Reset Case (rst=True)

Running Test: Test ForwardA from MEM
Expected: {'forwardA': 2, 'forwardB': 0, 'memFw': 0}
Result: {'forwardA': 2, 'forwardB': 0, 'memFw': 0}
Test passed for Test ForwardA from MEM

Running Test: Test ForwardB from WB
Expected: {'forwardA': 0, 'forwardB': 1, 'memFw': 0}
Result: 

In [None]:
class WB:
    def __init__(self):
        self.aluRes_WB = 0
        self.memoryReadData_WB = 0
        self.MemtoReg_WB =0
        self.PCPlus1_WB = 0
        self.jal_WB = 0
        self.writeData_WB = 0

In [None]:
im = InstructionMemory()
rf = RF()
se = SignExtender()
cmp = Comparator()
pipeline_registers = {
    'IF_ID': {'instruction': None, 'PC': None, 'PCPlus1':None},
    'ID_EX': {'opcode': None, 'rs': None, 'rt': None, 'destReg': None, 'imm': None,
              'shamt':None, 'MemReadEn':None, 'MemtoReg':None, 'RegWrite':None, 'ALUOp' : None, 
              'ALUsrc' : None, 'jal':None, 'readData1':None, 'readData2':None, 'extImm':None, 'PCPlus1':None},  # Control and operands to Execute
    'EX_MEM': {'alu_result': None, 'forwardBres': None, 
               'mem_read': None, 'memtoreg':None,
               'mem_write': None ,'RegWrite':None, 'jal':None, 'destReg':None, 'PCPlus1': None, 'rt':None},  # Pass ALU result and memory signals
    'MEM_WB': {'alu_result': None, 'destReg': None, 'MemReadData' : None, 
               'reg_write': None, 'PCPlus1' : None, 'jal': None,
               'Memtoreg':None, 'RegWrite':None }  # Pass result and write-back signals
}










cycles = 25
cycle =0 
pc = 0

while(cycle<25):
    instruction = im.readInstruction(pc)
    if not hdu.hold:
        pipeline_registers['IF_ID']['instruction'] = instruction
        pipeline_registers['IF_ID']['PC'] = pc
        pipeline_registers['IF_ID']['PCPlus1'] = pc+1
        
#=========================Fetch Ends Here======================
    instruction1 = pipeline_registers['IF_ID']['instruction']
    opCode = instruction1 >> 26 & 0b111111
    rs = instruction1>> 21 & 0b11111
    rt = instruction1 >> 16 & 0b11111
    rd = instruction1 >> 11 & 0b11111
    imm = instruction1 & 0xFFFF
    funct = instruction1 & 0b111111
    shamt = instruction1 >> 6 & 0b11111
    jaddress = instruction1 & 0b1111111111
    regdst = cu.RegDst
    DestReg = rt if not regdst else rd
    regAddress = pipeline_registers['MEM_WB']['destReg'] if not pipeline_registers['MEM_WB']['jal'] else 31
    cu = ControlUnit(opCode=opCode, funct=funct) 
    WBMuxOutput = pipeline_registers['MEM_WB']['alu_result'] if not pipeline_registers['MEM_WB']['Memtoreg'] else pipeline_registers['MEM_WB']['MemReadData']
    writeData_WB = WBMuxOutput if not pipeline_registers['MEM_WB']['jal'] else pipeline_registers['MEM_WB']['PCPlus1']
#========hdu starts=============
    hdu = HazardDetectionUnit(pipeline_registers['ID_EX']['rs'],
                          pipeline_registers['ID_EX']['rt'],
                          pipeline_registers['EX_MEM']['destReg'],
                          pipeline_registers['ID_EX']['destReg'],
                          pipeline_registers['MEM_WB']['destReg'],
                          pipeline_registers['ID_EX']['MemReadEn'],
                          cu.Branch, cmp.branchValid, pipeline_registers['EX_MEM']['RegWrite'],
                          pipeline_registers['ID_EX']['RegWrite'],
                          pipeline_registers['MEM_WB']['RegWrite'],
                          pipeline_registers['EX_MEM']['memtoreg'],
                          regdst, cu.jump, cu.jr, cu.jal
                          )
# Flush logic based on hazard detection or branch/jump

if hdu.branch_hazard:
    pipeline_registers['IF_ID'] = {'instruction': None, 'PC': None, 'PCPlus1': None}
if hdu.ld_hazard:
    pipeline_registers['ID_EX'] = {'opcode': None, 'rs': None, 'rt': None, 'destReg': None,
                                   'imm': None, 'shamt': None, 'MemReadEn': None, 'MemtoReg': None,
                                   'RegWrite': None, 'ALUOp': None, 'ALUsrc': None, 'jal': None,
                                   'readData1': None, 'readData2': None, 'extImm': None, 'PCPlus1': None}
    
#=========hdu ends==============
    rf.readRegisters(readRegister1=rs, readRegister2=rt)
    rf.writeRegister(0, regAddress, writeData_WB, pipeline_registers['MEM_WB']['reg_write'])
    extImm = se.signExtend(imm)
    if(hdu.forwardA_Branch == 0):
        fwdA = rf.readData1
    elif(hdu.forwardA_Branch == 1):
        fwdA = pipeline_registers['EX_MEM']['alu_result']
    elif(hdu.forwardA_Branch == 2):
        fwdA = writeData_WB
    else: fwdA = 0
    if(hdu.forwardB_Branch == 0):
        fwdB = rf.readData2
    elif(hdu.forwardB_Branch == 1):
        fwdB = pipeline_registers['EX_MEM']['alu_result']
    elif(hdu.forwardB_Branch == 2):
        fwdB = writeData_WB
    else: fwdB = 0
    cmp.compare(fwdA, fwdB, cu.bne, 0, cu.Branch, hdu.hold)
    adderResult = pipeline_registers['IF_ID']['PCPlus1'] + imm
    
    pipeline_registers['ID_EX']['opcode'] = opCode
    pipeline_registers['ID_EX']['rs'] = rs
    pipeline_registers['ID_EX']['rt'] = rt
    pipeline_registers['ID_EX']['destReg'] = DestReg
    pipeline_registers['ID_EX']['imm'] = imm
    pipeline_registers['ID_EX']['shamt'] = shamt
    pipeline_registers['ID_EX']['MemReadEn'] = cu.MemReadEn
    pipeline_registers['ID_EX']['MemtoReg'] = cu.MemtoReg
    pipeline_registers['ID_EX']['RegWrite'] = cu.RegWriteEn
    pipeline_registers['ID_EX']['ALUsrc'] = cu.ALUSrc
    pipeline_registers['ID_EX']['jal'] = cu.jal
    pipeline_registers['ID_EX']['ALUOp'] = cu.ALUOp
    pipeline_registers['ID_EX']['readData1'] = rf.readData1
    pipeline_registers['ID_EX']['readData2'] = rf.readData2
    pipeline_registers['ID_EX']['extImm'] = extImm
    pipeline_registers['ID_EX']['PCPlus1'] = pipeline_registers['IF_ID']['PCPlus1']
    
    
    
    
    
    
    
#================Decode ends here===================================================
    # ALU input selection
frwd = ForwardingUnit(pipeline_registers['ID_EX']['rs'],
                      pipeline_registers['ID_EX']['rt'],
                      pipeline_registers['EX_MEM']['destReg'],
                      pipeline_registers['MEM_WB']['destReg'],
                      pipeline_registers['EX_MEM']['rt'],
                      0, pipeline_registers['EX_MEM']['RegWrite'],
                      pipeline_registers['MEM_WB']['RegWrite'],
                      pipeline_registers['EX_MEM']['mem_write'])
frwd.forward()

# Select ALU inputs based on forwarding decisions
if frwd.forwardA == 0:
    alu_input1 = pipeline_registers['ID_EX']['readData1']
elif frwd.forwardA == 1:
    alu_input1 = writeData_WB  # Forwarding from WB stage
elif frwd.forwardA == 2:
    alu_input1 = pipeline_registers['EX_MEM']['alu_result']  # Forwarding from MEM stage

if frwd.forwardB == 0:
    forwardBres = pipeline_registers['ID_EX']['extImm'] if pipeline_registers['ID_EX']['ALUsrc'] else pipeline_registers['ID_EX']['readData2']
elif frwd.forwardB == 1:
    forwardBres = writeData_WB  # Forwarding from WB stage
elif frwd.forwardB == 2:
    forwardBres = pipeline_registers['EX_MEM']['alu_result']  # Forwarding from MEM stage

alu_input2 = pipeline_registers['ID_EX']['extImm'] if pipeline_registers['ID_EX']['ALUsrc'] else forwardBres

# ALU operation (Add/Sub/And/Or based on ALUOp control signal)
alu = ALU(alu_input1, alu_input2, pipeline_registers['ID_EX']['ALUOp'], pipeline_registers['ID_EX']['shamt'])
alu_result = alu.result  # Assuming ALU returns a result property

# Passing data to the next stage (EX_MEM)
pipeline_registers['EX_MEM']['alu_result'] = alu_result
pipeline_registers['EX_MEM']['forwardBres'] = forwardBres
pipeline_registers['EX_MEM']['mem_read'] = pipeline_registers['ID_EX']['MemReadEn']
pipeline_registers['EX_MEM']['mem_write'] = pipeline_registers['ID_EX']['MemtoReg']
pipeline_registers['EX_MEM']['RegWrite'] = pipeline_registers['ID_EX']['RegWrite']
pipeline_registers['EX_MEM']['destReg'] = pipeline_registers['ID_EX']['destReg']
pipeline_registers['EX_MEM']['jal'] = pipeline_registers['ID_EX']['jal']
pipeline_registers['EX_MEM']['PCPlus1'] = pipeline_registers['ID_EX']['PCPlus1']

    
    
    
    
    
    
    
    

    