In [1004]:
# !pip install plotly ipywidgets nbformat kaleido

In [1005]:
# Teacher's implementation
from collections import defaultdict

class JulieVirtualMemoryManager(BaseVirtualMemoryManager):
    """Virtual memory manager that replaces pages in FIFO order.

    ``page_queue`` holds ``(pid, virtual_page, physical_page)`` tuples in the
    order the pages were loaded; the oldest one is evicted when no physical
    page is free.
    """

    def handle_page_fault(self, process, virtual_page_number):
        """Load ``virtual_page_number`` of ``process`` into a physical page.

        Evicts the oldest resident page first when no physical page is free.
        Returns the physical page number that now backs the virtual page.
        """
        if len(self.free_pages) == 0:
            # Nothing free: push the oldest resident page out and reuse its frame.
            self.free_pages.add(self.evict_page(process, virtual_page_number))

        frame = self.free_pages.pop()
        self.page_queue.append((process.pid, virtual_page_number, frame))

        entry = process.page_table.get_entry(virtual_page_number)
        entry.physical_page = frame
        entry.present = True
        entry.dirty = False

        # Fill the frame with the page's contents from secondary storage.
        backing_key = (process.pid, virtual_page_number)
        contents = self.secondary_storage.read_page(backing_key)
        self.physical_memory.write_page(frame, contents)

        return frame

    def evict_page(self, process, virtual_page_number):
        """Evict the oldest queued page and return the physical page it used.

        A dirty victim is written to secondary storage before its page table
        entry is marked not present. ``process`` is only used to reach the OS
        process table; ``virtual_page_number`` is unused.
        """
        victim_pid, victim_vpn, victim_frame = self.page_queue.pop(0)
        victim_process = process.os.processes[victim_pid]
        victim_entry = victim_process.page_table.get_entry(victim_vpn)

        if victim_entry.dirty:
            # Preserve modified contents so a later fault can reload them.
            self.secondary_storage.write_page(
                (victim_pid, victim_vpn),
                self.physical_memory.read_page(victim_frame),
            )

        victim_entry.dirty = False
        victim_entry.present = False
        return victim_frame

class JulieOS(BaseOS):
    """Reference OS: free-list physical allocation plus FIFO-paged virtual memory.

    With virtual memory disabled, ``malloc`` carves regions out of
    ``free_list`` and addresses are used as physical addresses directly. With
    virtual memory enabled, each pid has its own virtual address counter and
    pages are brought into physical memory on first access.

    NOTE(review): virtual addresses are handed out per pid starting at 0, so
    two processes receive the same virtual addresses. The recorded run of
    ``test_process_isolation`` fails for this reason: it expects reading the
    other process's address to raise.
    """

    def __init__(self):
        super().__init__()
        self.virtual_memory_manager = JulieVirtualMemoryManager(self.physical_memory, self.secondary_storage)
        self.free_list = [(0, PHYSICAL_MEMORY_SIZE)]  # (start, size) of free physical regions
        self.next_virtual_address = defaultdict(int)  # pid -> next unused virtual address

    def malloc(self, pid, size):
        """Allocate a memory region for process ``pid``.

        Returns ``(start_address, allocated_size)``. In virtual memory mode
        the size is rounded up to whole pages; in physical mode the first
        free-list entry that is large enough is used and ``size`` is allocated
        exactly.

        Raises:
            Exception: in physical mode, when no free region is large enough.
        """
        process = self.processes[pid]

        # Round up size to the nearest page size
        pages_needed = (size + PAGE_SIZE - 1) // PAGE_SIZE
        size_to_allocate = pages_needed * PAGE_SIZE

        # Virtual memory allocation
        if self.virtual_memory_enabled:
            start_address = self.next_virtual_address[pid]
            self.next_virtual_address[pid] += size_to_allocate

            process.memory_regions[start_address] = size_to_allocate
            return start_address, size_to_allocate

        # Physical memory allocation
        for i, (start, free_size) in enumerate(self.free_list):
            if free_size >= size:
                # Returning right after the mutation keeps the enumerate() safe.
                del self.free_list[i]
                if free_size > size:
                    self.free_list.append((start + size, free_size - size))
                process.memory_regions[start] = size
                return start, size

        # If no suitable free region, raise an exception
        raise Exception("Out of memory")

    def free(self, pid, address):
        """Release the region of ``pid`` that starts at ``address``.

        Unknown addresses are ignored. In physical mode the region goes back
        to the free list and adjacent free regions are merged. In virtual
        memory mode every resident page of the region gives its physical page
        back to the manager and is dropped from the FIFO queue.
        """
        process = self.processes[pid]
        if address not in process.memory_regions:
            return
        size = process.memory_regions.pop(address)

        if not self.virtual_memory_enabled:
            self.free_list.append((address, size))
            self.free_list.sort(key=lambda region: region[0])
            # Merge adjacent free regions
            i = 0
            while i < len(self.free_list) - 1:
                current_addr, current_size = self.free_list[i]
                next_addr, next_size = self.free_list[i + 1]
                if current_addr + current_size == next_addr:
                    self.free_list[i] = (current_addr, current_size + next_size)
                    del self.free_list[i + 1]
                else:
                    i += 1
            return

        manager = self.virtual_memory_manager
        start_page = address // PAGE_SIZE
        end_page = (address + size - 1) // PAGE_SIZE
        released_pages = set()
        for virtual_page in range(start_page, end_page + 1):
            entry = process.page_table.get_entry(virtual_page)
            if entry.present:
                manager.free_pages.add(entry.physical_page)
                released_pages.add(virtual_page)
                entry.present = False

        if released_pages:
            # Queue entries are (pid, virtual_page, physical_page) tuples, so they
            # must be matched on pid and virtual page. A stale entry left behind
            # would later be "evicted" and hand out a frame that may already back
            # another page.
            manager.page_queue[:] = [
                queued for queued in manager.page_queue
                if not (queued[0] == pid and queued[1] in released_pages)
            ]

    def read_memory(self, pid, address):
        """Return the byte at ``address`` in the address space of ``pid``.

        Raises:
            Exception: when ``address`` is not inside a region owned by ``pid``.
        """
        if not self._address_belongs_to_process(pid, address):
            raise Exception(f"Memory access violation: Process {pid} cannot access address {address}")
        physical_address = self.translate_address(pid, address)
        return self.physical_memory.read_byte(physical_address)

    def write_memory(self, pid, address, value):
        """Write ``value`` to ``address`` in the address space of ``pid``.

        In virtual memory mode the page is marked dirty so that eviction
        writes it to secondary storage.

        Raises:
            Exception: when ``address`` is not inside a region owned by ``pid``.
        """
        if not self._address_belongs_to_process(pid, address):
            raise Exception(f"Memory access violation: Process {pid} cannot access address {address}")
        physical_address = self.translate_address(pid, address)
        self.physical_memory.write_byte(physical_address, value)
        if self.virtual_memory_enabled:
            process = self.processes[pid]
            virtual_page = address // PAGE_SIZE
            page_table_entry = process.page_table.get_entry(virtual_page)
            page_table_entry.dirty = True

    def _address_belongs_to_process(self, pid, address):
        """Return True when ``address`` lies inside one of the regions of ``pid``."""
        regions = self.processes[pid].memory_regions
        return any(start <= address < start + size for start, size in regions.items())

    def handle_page_fault(self, pid, virtual_address):
        """Load the page containing ``virtual_address``; return its physical page."""
        process = self.processes[pid]
        virtual_page = virtual_address // PAGE_SIZE
        return self.virtual_memory_manager.handle_page_fault(process, virtual_page)

    def translate_address(self, pid, virtual_address):
        """Translate ``virtual_address`` of ``pid`` to a physical address.

        With virtual memory disabled the address is returned unchanged.
        Otherwise a page that is not present is faulted in first.
        """
        if not self.virtual_memory_enabled:
            return virtual_address

        process = self.processes[pid]
        virtual_page = virtual_address // PAGE_SIZE
        offset = virtual_address % PAGE_SIZE

        page_table_entry = process.page_table.get_entry(virtual_page)
        if not page_table_entry.present:
            physical_page = self.handle_page_fault(pid, virtual_address)
        else:
            physical_page = page_table_entry.physical_page

        return (physical_page * PAGE_SIZE) + offset

    def set_virtual_memory_enabled(self, enabled):
        """Switch virtual memory on or off and print the new state."""
        self.virtual_memory_enabled = enabled
        if enabled:
            self.next_virtual_address = defaultdict(int)  # Reset virtual address space when enabling VM
        print(f"Virtual Memory {'Enabled' if enabled else 'Disabled'}")


# NOTE: TestOS and unittest are defined/imported in a later notebook cell
# (In [1006]); that cell has to be executed before this one.
class TestJulieOS(TestOS):
    """Run the shared TestOS cases against JulieOS."""

    def create_os(self):
        """Return the JulieOS instance that TestOS.setUp stores on the test."""
        return JulieOS()

# Run Tests
# Load every test_* method of TestJulieOS and run them with verbose output.
suite = unittest.TestLoader().loadTestsFromTestCase(TestJulieOS)
unittest.TextTestRunner(verbosity=2).run(suite)




test_free_and_reallocate (__main__.TestJulieOS.test_free_and_reallocate) ... ok
test_malloc_and_free (__main__.TestJulieOS.test_malloc_and_free) ... ok
test_multiple_processes (__main__.TestJulieOS.test_multiple_processes) ... ok
test_out_of_memory (__main__.TestJulieOS.test_out_of_memory) ... ok
test_process_isolation (__main__.TestJulieOS.test_process_isolation) ... FAIL
test_read_write_memory (__main__.TestJulieOS.test_read_write_memory) ... ok
test_virtual_memory (__main__.TestJulieOS.test_virtual_memory) ... ok
test_virtual_memory_paging (__main__.TestJulieOS.test_virtual_memory_paging) ... ok
test_virtual_memory_paging_with_secondary_storage (__main__.TestJulieOS.test_virtual_memory_paging_with_secondary_storage) ... ok

FAIL: test_process_isolation (__main__.TestJulieOS.test_process_isolation)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\jrbac\AppData\Local\Temp\ipykernel_56088\2197975574.py", line 49,

Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled


<unittest.runner.TextTestResult run=9 errors=0 failures=1>

In [1006]:
import unittest

class TestOS(unittest.TestCase):
    """Shared test cases for OS implementations.

    Subclasses override ``create_os`` to supply the OS under test; ``setUp``
    builds a new one before every test.
    """

    def setUp(self):
        self.os = self.create_os()  # This will be overridden in subclasses

    def create_os(self):
        """Return the OS instance to test; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def test_malloc_and_free(self):
        """Allocations are contiguous from address 0 and freed space is reused."""
        process = self.os.create_process()
        addr1 = process.malloc(32)
        self.assertEqual(addr1, 0)
        addr2 = process.malloc(64)
        self.assertEqual(addr2, 32)
        process.free(addr1)
        addr3 = process.malloc(16)
        self.assertEqual(addr3, 0)
        process.free(addr2)
        process.free(addr3)

    def test_read_write_memory(self):
        """A byte written to an allocated address reads back unchanged."""
        process = self.os.create_process()
        addr = process.malloc(16)
        test_value = 42
        process.write_memory(addr, test_value)
        read_value = process.read_memory(addr)
        self.assertEqual(read_value, test_value)

    def test_virtual_memory(self):
        """With virtual memory on, twice the physical memory can be written and read back."""
        self.os.set_virtual_memory_enabled(True)
        process = self.os.create_process()
        addr = process.malloc(PHYSICAL_MEMORY_SIZE * 2)  # Allocate more than physical memory
        for i in range(PHYSICAL_MEMORY_SIZE * 2):
            process.write_memory(addr + i, i % 256)
        for i in range(PHYSICAL_MEMORY_SIZE * 2):
            self.assertEqual(process.read_memory(addr + i), i % 256)

    def test_process_isolation(self):
        """Each process sees its own data; reading the other's address must raise.

        NOTE(review): the two assertRaises checks only hold when addr1 and
        addr2 are not valid addresses in the other process. The recorded run
        against JulieOS shows this test failing.
        """
        self.os.set_virtual_memory_enabled(True)
        process1 = self.os.create_process()
        process2 = self.os.create_process()
        addr1 = process1.malloc(16)
        addr2 = process2.malloc(16)
        process1.write_memory(addr1, 42)
        process2.write_memory(addr2, 84)
        self.assertEqual(process1.read_memory(addr1), 42)
        self.assertEqual(process2.read_memory(addr2), 84)
        with self.assertRaises(Exception):
            process2.read_memory(addr1)
        with self.assertRaises(Exception):
            process1.read_memory(addr2)

    def test_out_of_memory(self):
        """Without virtual memory, asking for more than physical memory raises."""
        process = self.os.create_process()
        with self.assertRaises(Exception):
            process.malloc(PHYSICAL_MEMORY_SIZE + 1)  # Try to allocate more than available memory

    def test_free_and_reallocate(self):
        """A smaller allocation after a free lands at the freed address."""
        process = self.os.create_process()
        addr1 = process.malloc(64)
        addr2 = process.malloc(64)
        process.free(addr1)
        addr3 = process.malloc(32)
        self.assertEqual(addr3, addr1)  # Should reuse the freed space

    def test_virtual_memory_paging(self):
        """Two full pages keep distinct contents when written and read byte by byte."""
        self.os.set_virtual_memory_enabled(True)
        process = self.os.create_process()
        addr = process.malloc(PAGE_SIZE * 2)  # Allocate 2 pages
        
        # Write to the first page
        for i in range(PAGE_SIZE):
            process.write_memory(addr + i, i % 256)
        
        # Write to the second page
        for i in range(PAGE_SIZE):
            process.write_memory(addr + PAGE_SIZE + i, (i + 128) % 256)
        
        # Read and verify the first page
        for i in range(PAGE_SIZE):
            self.assertEqual(process.read_memory(addr + i), i % 256)
        
        # Read and verify the second page
        for i in range(PAGE_SIZE):
            self.assertEqual(process.read_memory(addr + PAGE_SIZE + i), (i + 128) % 256)

    def test_multiple_processes(self):
        """Five processes each fill one page and read back their own data."""
        self.os.set_virtual_memory_enabled(True)
        processes = [self.os.create_process() for _ in range(5)]
        
        # Allocate memory for each process
        addresses = [process.malloc(PAGE_SIZE) for process in processes]
        
        # Write unique data for each process
        for i, (process, addr) in enumerate(zip(processes, addresses)):
            for j in range(PAGE_SIZE):
                process.write_memory(addr + j, (i * PAGE_SIZE + j) % 256)
        
        # Verify data for each process
        for i, (process, addr) in enumerate(zip(processes, addresses)):
            for j in range(PAGE_SIZE):
                self.assertEqual(process.read_memory(addr + j), (i * PAGE_SIZE + j) % 256)

    def test_virtual_memory_paging_with_secondary_storage(self):
        """Touching more pages than physical memory holds spills pages to secondary storage."""
        self.os.set_virtual_memory_enabled(True)
        process = self.os.create_process()
        
        # Allocate memory larger than physical memory
        total_pages = PHYSICAL_MEMORY_SIZE // PAGE_SIZE
        extra_pages = 5
        total_size = (total_pages + extra_pages) * PAGE_SIZE
        addr = process.malloc(total_size)
        
        # Write unique values to each page
        for i in range(total_pages + extra_pages):
            page_addr = addr + i * PAGE_SIZE
            process.write_memory(page_addr, i)
        
        # Read back values and verify
        for i in range(total_pages + extra_pages):
            page_addr = addr + i * PAGE_SIZE
            value = process.read_memory(page_addr)
            self.assertEqual(value, i, f"Page {i} should contain value {i}")
        
        # Verify that some pages were written to secondary storage
        self.assertGreater(len(self.os.secondary_storage.storage), 0)

class TestJulieOS(TestOS):
    """Run the shared TestOS cases against JulieOS."""

    def create_os(self):
        """Return the JulieOS instance that TestOS.setUp stores on the test."""
        return JulieOS()

# Example usage:
# Load every test_* method of TestJulieOS and run them with verbose output.
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestJulieOS)
    unittest.TextTestRunner(verbosity=2).run(suite)

test_free_and_reallocate (__main__.TestJulieOS.test_free_and_reallocate) ... ok
test_malloc_and_free (__main__.TestJulieOS.test_malloc_and_free) ... ok
test_multiple_processes (__main__.TestJulieOS.test_multiple_processes) ... ok
test_out_of_memory (__main__.TestJulieOS.test_out_of_memory) ... ok
test_process_isolation (__main__.TestJulieOS.test_process_isolation) ... FAIL
test_read_write_memory (__main__.TestJulieOS.test_read_write_memory) ... ok
test_virtual_memory (__main__.TestJulieOS.test_virtual_memory) ... ok
test_virtual_memory_paging (__main__.TestJulieOS.test_virtual_memory_paging) ... ok
test_virtual_memory_paging_with_secondary_storage (__main__.TestJulieOS.test_virtual_memory_paging_with_secondary_storage) ... ok

FAIL: test_process_isolation (__main__.TestJulieOS.test_process_isolation)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\jrbac\AppData\Local\Temp\ipykernel_56088\2197975574.py", line 49,

Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled


In [1007]:
# Constants
PHYSICAL_MEMORY_SIZE = 256  # bytes
PAGE_SIZE = 16  # bytes
TOTAL_PAGES = PHYSICAL_MEMORY_SIZE // PAGE_SIZE  # number of physical pages (16 with the values above)

In [1008]:
# Python basics walkthrough: each section prints a banner, then demonstrates
# one language feature and prints the results.
print("=============")
print("==Section 1, variables ==")
print("=============")

# A string and two integers.
a = "hi"
b = 2
c = 3

print(10)
print(a)
print(b + c)

d = b * c
print(d)

print("=============")
print("==Section 2, lists ==")
print("=============")
llama = [3]
e = 4

# append() adds to the end of the list.
llama.append(2)
print(llama)

llama.append(e)
print(llama)

# pop(1) removes the element at index 1.
llama.pop(1)
print(llama)

# insert(1, 10) puts 10 at index 1 and shifts the rest right.
llama.insert(1, 10)
print(llama)

print(llama[2])

print("=============")
print("==Section 3, maps ==")
print("=============")
# Dictionaries map keys to values; here the keys are integers.
brand = {}
brand[3] = "ie"
print(brand)

brand[100] = "jen"
print(brand)

print(brand[3])

print("=============")
print("==Section 3.1, maps 2 ==")
print("=============")
# A dictionary literal with string keys.
phones = {
    "brandie": "505-123-4532",
    "jenn": "505-987-1452"
}
print(phones)
print(phones["brandie"])

phones["julie"] = "923-412-1522"
print(phones)

# pop(key) removes that key from the dictionary.
phones.pop("brandie")
print(phones)

print("=============")
print("==Section 4, conditionals ==")
print("=============")
# a and b are rebound here; they no longer hold the Section 1 values.
a = 3
b = 2

if (a > b):
    print("great!!")
else:
    print("not great...")

# "not in" checks that a value is absent from the list.
l = [1, 2, 3]
if a not in l:
    print("no no no")
else:
    print("i see")


print("=============")
print("==Section 5, loops ==")
print("=============")

names = ["brandie", "jenn", "julie"]
for n in names:
    print(n)

# enumerate() yields (index, item) pairs.
for i, n in enumerate(names):
    print(str(i) + " -> " + n) # here str() just converts i from integer to string, don't worry too much about it

m = {
    "b": "randie",
    "j": "enn"
}

# items() yields (key, value) pairs.
for k, v in m.items():
    print(k + " -> " + v)

print("=============")
print("==Section 6, classes ==")
print("=============")

class Dog:
    """A dog that remembers its name and can introduce itself."""

    def __init__(self, name):
        self.my_name = name

    def speak(self):
        """Print a one-line introduction containing this dog's name."""
        introduction = "Hello. My name is " + self.my_name
        print(introduction)


# Two separate Dog instances, each holding its own name.
j = Dog("Jedi")
h = Dog("Harper")

j.speak()
h.speak()

print("=============")
print("==Section 7, challenge ==")
print("=============")

class Human:
    """A person with a name, a list of friends and an optional best friend."""

    def __init__(self, name):
        self.name = name
        self.best_friend = None
        self.friends = []

    def add_friend(self, friend):
        """Add ``friend`` to this person's friend list."""
        self.friends.append(friend)

    def set_best_friend(self, human):
        """Remember ``human`` as this person's best friend."""
        self.best_friend = human

    def respond(self, greeter):
        """Print a reply that depends on whether ``greeter`` is one of our friends."""
        if greeter in self.friends:
            reply = ": hiii"
        else:
            reply = ": do i know you???"
        print(self.name + reply)

    def greet_friends(self):
        """Greet every friend in order and let each one respond."""
        for pal in self.friends:
            if pal == self.best_friend:
                greeting = self.name + ": bestie!!!!"
            else:
                greeting = self.name + ": hello friend, " + pal.name
            print(greeting)

            pal.respond(self)

brandie = Human("brandie")
jenn = Human("jenn")
julie = Human("julie")
m = Human("marques Mc Marco, the third (esquire)")  # rebinds m, which held a dict in Section 5

# Human objects used as dictionary keys. The value is a flag: only people
# whose flag is below 1 get added as somebody's friend in the loop below.
ppl = {
    brandie: 0,
    jenn: 0,
    julie: 0,
    m: 1
}
# Every person befriends every *other* person whose flag is below 1, so m
# befriends the others but nobody befriends m.
for p1, v1 in ppl.items():
    for p2, v2 in ppl.items():
        if p1 is not p2 and v2 < 1:
            p1.add_friend(p2)

brandie.set_best_friend(julie)

for p, _ in ppl.items():
    p.greet_friends()

==Section 1, variables ==
10
hi
5
6
==Section 2, lists ==
[3, 2]
[3, 2, 4]
[3, 4]
[3, 10, 4]
4
==Section 3, maps ==
{3: 'ie'}
{3: 'ie', 100: 'jen'}
ie
==Section 3.1, maps 2 ==
{'brandie': '505-123-4532', 'jenn': '505-987-1452'}
505-123-4532
{'brandie': '505-123-4532', 'jenn': '505-987-1452', 'julie': '923-412-1522'}
{'jenn': '505-987-1452', 'julie': '923-412-1522'}
==Section 4, conditionals ==
great!!
i see
==Section 5, loops ==
brandie
jenn
julie
0 -> brandie
1 -> jenn
2 -> julie
b -> randie
j -> enn
==Section 6, classes ==
Hello. My name is Jedi
Hello. My name is Harper
==Section 7, challenge ==
brandie: hello friend, jenn
jenn: hiii
brandie: bestie!!!!
julie: hiii
jenn: hello friend, brandie
brandie: hiii
jenn: hello friend, julie
julie: hiii
julie: hello friend, brandie
brandie: hiii
julie: hello friend, jenn
jenn: hiii
marques Mc Marco, the third (esquire): hello friend, brandie
brandie: do i know you???
marques Mc Marco, the third (esquire): hello friend, jenn
jenn: do i know you

In [1009]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Classes that don't need student implementation
class PhysicalMemory:
    """Byte-addressable memory backed by a single ``bytearray``."""

    def __init__(self, size):
        self.memory = bytearray(size)

    @staticmethod
    def _page_span(page_number):
        """Return the slice covering page ``page_number``."""
        first_byte = page_number * PAGE_SIZE
        return slice(first_byte, first_byte + PAGE_SIZE)

    def read_byte(self, address):
        """Return the byte stored at ``address``."""
        return self.memory[address]

    def write_byte(self, address, value):
        """Store ``value`` at ``address``."""
        self.memory[address] = value

    def read_page(self, page_number):
        """Return a copy of the ``PAGE_SIZE`` bytes of page ``page_number``."""
        return self.memory[self._page_span(page_number)]

    def write_page(self, page_number, data):
        """Overwrite page ``page_number`` with ``data``."""
        self.memory[self._page_span(page_number)] = data

class PageTableEntry:
    """State of one virtual page: where it lives and whether it was modified."""

    def __init__(self):
        self.physical_page = None  # physical page number; None until the page is first loaded
        self.present = False  # True while the page is resident in physical memory
        self.dirty = False  # True once the page has been written since it was loaded

class PageTable:
    """Per-process mapping from virtual page numbers to page table entries."""

    def __init__(self):
        self.entries = {}  # virtual_page_number -> PageTableEntry

    def get_entry(self, virtual_page_number):
        """Return the entry for ``virtual_page_number``, creating it on first use."""
        try:
            return self.entries[virtual_page_number]
        except KeyError:
            fresh_entry = PageTableEntry()
            self.entries[virtual_page_number] = fresh_entry
            return fresh_entry

class SecondaryStorage:
    """Dictionary-backed stand-in for a disk that stores whole pages."""

    def __init__(self):
        self.storage = {}  # page key -> page data

    def read_page(self, page_number):
        """Return the stored data for ``page_number``, or a zeroed page if none exists."""
        try:
            return self.storage[page_number]
        except KeyError:
            return bytearray(PAGE_SIZE)

    def write_page(self, page_number, data):
        """Store ``data`` under ``page_number``, replacing any earlier contents."""
        self.storage[page_number] = data

class Process:
    """A process: a pid, a page table, and a small sub-allocator over OS regions.

    ``memory_regions`` is filled in by the OS when it hands out memory;
    ``mallocs`` tracks the blocks this process has placed inside those regions.
    """

    def __init__(self, pid, os):
        self.pid = pid
        self.os = os
        self.page_table = PageTable()
        self.memory_regions = {}  # start_address -> size. Note that this is set by the OS, we don't update it
        self.mallocs = {}  # start_address -> size for malloc'd regions within memory_regions

    def _claim(self, address, size):
        """Record a block of ``size`` bytes at ``address`` and return the address."""
        self.mallocs[address] = size
        return address

    def malloc(self, size):
        """Allocate ``size`` bytes and return the start address.

        Gaps inside regions the process already owns are tried first (start of
        the region, between blocks, end of the region); only when none fits is
        a new region requested from the OS. Whatever ``os.mmap`` raises
        propagates to the caller.
        """
        for region_start, region_size in self.memory_regions.items():
            region_end = region_start + region_size
            blocks = sorted(
                (addr, addr + block_size)
                for addr, block_size in self.mallocs.items()
                if region_start <= addr < region_end
            )

            if not blocks:
                # An empty region is usable only if the request actually fits;
                # otherwise the block would run past the end of the region.
                if region_size >= size:
                    return self._claim(region_start, size)
                continue

            # Space before the first block of the region.
            if blocks[0][0] - region_start >= size:
                return self._claim(region_start, size)

            # Space between two neighbouring blocks.
            for (_, previous_end), (next_start, _) in zip(blocks, blocks[1:]):
                if next_start - previous_end >= size:
                    return self._claim(previous_end, size)

            # Space after the last block of the region.
            last_end = blocks[-1][1]
            if region_end - last_end >= size:
                return self._claim(last_end, size)

        # If we don't have space, request a new memory region from the OS
        address, _ = self.os.mmap(self.pid, size)
        return self._claim(address, size)

    def free(self, address):
        """Release the block at ``address``.

        A block recorded in ``mallocs`` is only forgotten locally (its region
        stays with the process for reuse); any other address is passed on to
        the OS.
        """
        if address in self.mallocs:
            del self.mallocs[address]
        else:
            self.os.free(self.pid, address)

    def read_memory(self, address):
        """Read one byte at ``address`` through the OS."""
        return self.os.read_memory(self.pid, address)

    def write_memory(self, address, value):
        """Write one byte ``value`` at ``address`` through the OS."""
        self.os.write_memory(self.pid, address, value)

class BaseProgram:
    """Base class for simulated programs that keep named variables in process memory.

    Subclasses add ``step_<n>`` methods, which ``execute_step`` looks up by
    number.
    """

    def __init__(self, process):
        self.process = process
        self.variables = {}

    def _variable_info(self, name):
        """Return the bookkeeping dict for ``name`` or raise ValueError if unknown."""
        if name not in self.variables:
            raise ValueError(f"Variable '{name}' not allocated")
        return self.variables[name]

    def allocate_variable(self, name, size):
        """Allocate ``size`` bytes for a variable called ``name`` and record it."""
        address = self.process.malloc(size)
        self.variables[name] = {'address': address, 'size': size}
        print(f"Allocated variable '{name}' of size {size} at address {address}")

    def set_variable(self, name, value):
        """Write ``value`` into variable ``name``.

        An int is written as a single byte at the variable's address; a str is
        written one character code per byte. Other types write nothing.
        """
        base = self._variable_info(name)['address']
        if isinstance(value, int):
            self.process.write_memory(base, value)
        elif isinstance(value, str):
            offset = 0
            for char in value:
                self.process.write_memory(base + offset, ord(char))
                offset += 1
        print(f"Set value of variable '{name}' to {value}")

    def get_variable(self, name):
        """Read, print and return every byte of variable ``name`` as a list."""
        info = self._variable_info(name)
        base = info['address']
        values = []
        for offset in range(info['size']):
            values.append(self.process.read_memory(base + offset))
        print(f"Value of variable '{name}': {values}")
        return values

    def free_variable(self, name):
        """Free the memory of variable ``name`` and forget the variable."""
        info = self._variable_info(name)
        self.process.free(info['address'])
        del self.variables[name]
        print(f"Freed variable '{name}'")

    def dump_memory(self):
        """Print a hex/ASCII dump of every variable, 16 bytes per line."""
        print(f"Memory Dump for Program (PID: {self.process.pid}):")
        for name, info in self.variables.items():
            address = info['address']
            size = info['size']
            print(f"\nVariable: {name}")
            for i in range(0, size, 16):
                row_end = min(i + 16, size)
                chunk = [self.process.read_memory(address + j) for j in range(i, row_end)]
                hex_values = ' '.join(f'{b:02x}' for b in chunk)
                # Non-printable bytes are shown as dots.
                ascii_values = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
                print(f'{address+i:04x}: {hex_values:<48} {ascii_values}')

    def get_memory_usage(self):
        """Return ``(name, size, address)`` for every variable, in insertion order."""
        usage = []
        for name, info in self.variables.items():
            usage.append((name, info['size'], info['address']))
        return usage

    def execute_step(self, step_number):
        """Run the ``step_<step_number>`` method if it exists, else print a notice."""
        method_name = f'step_{step_number}'
        try:
            step = getattr(self, method_name)
        except AttributeError:
            print(f"Step {step_number} not found")
        else:
            step()

def visualize_memory(os):
    """Show horizontal bar charts of memory usage: one subplot for the OS, one per program."""
    program_count = len(os.programs)

    # Create subplots: one for OS, one for each program
    titles = ['OS Memory']
    titles.extend(f'Program {pid} Memory' for pid in os.programs)
    fig = make_subplots(rows=1 + program_count, cols=1, subplot_titles=titles)

    # OS Memory goes into the first row.
    for start, size, label in os.get_memory_usage():
        os_bar = go.Bar(
            x=[size],
            y=[label],
            orientation='h',
            name=label,
            text=f"Start: {start}, Size: {size}",
            textposition='auto',
        )
        fig.add_trace(os_bar, row=1, col=1)

    # Program Memory: one row per program, starting at row 2.
    row = 2
    for program in os.programs.values():
        for name, size, address in program.get_memory_usage():
            variable_bar = go.Bar(
                x=[size],
                y=[name],
                orientation='h',
                name=name,
                text=f"Address: {address}, Size: {size}",
                textposition='auto',
            )
            fig.add_trace(variable_bar, row=row, col=1)
        row += 1

    fig.update_layout(height=300 * (1 + program_count), width=800, showlegend=False)
    fig.show()

            
# Base classes for student implementation
class BaseVirtualMemoryManager:
    """Shared state for virtual memory managers; subclasses supply the paging policy."""

    def __init__(self, physical_memory, secondary_storage):
        self.physical_memory = physical_memory
        self.secondary_storage = secondary_storage
        self.free_pages = set(range(TOTAL_PAGES))  # every physical page number starts out free
        self.page_queue = []  # For FIFO page replacement

    def handle_page_fault(self, process, virtual_page_number):
        """Bring ``virtual_page_number`` of ``process`` into memory; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def evict_page(self, process, virtual_page_number):
        """Choose and evict a resident page; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

class BaseOS:
    """Base class of the simulated operating system.

    Owns physical memory, secondary storage, the process table and the
    registered programs. The memory-management methods (``malloc``, ``free``,
    ``read_memory``, ``write_memory``, ``handle_page_fault``,
    ``translate_address``) must be provided by subclasses.
    """

    def __init__(self):
        self.physical_memory = PhysicalMemory(PHYSICAL_MEMORY_SIZE)
        self.secondary_storage = SecondaryStorage()
        self.virtual_memory_manager = None  # Will be set in subclass
        self.processes = {}  # pid -> Process
        self.next_pid = 1
        self.virtual_memory_enabled = False
        self.programs = {}  # pid -> program instance

    def create_process(self):
        """Create a process with the next free pid, register it and return it."""
        pid = self.next_pid
        self.next_pid += 1
        process = Process(pid, self)
        self.processes[pid] = process
        return process

    def terminate_process(self, pid):
        """Free every memory region of ``pid`` and drop the process; unknown pids are ignored."""
        if pid in self.processes:
            process = self.processes[pid]
            # Iterate over a snapshot of the keys: free() implementations remove
            # the region from memory_regions, and mutating a dict while
            # iterating over it raises RuntimeError.
            for start_address in list(process.memory_regions):
                self.free(pid, start_address)
            del self.processes[pid]

    def mmap(self, pid, size):
        """Alias for ``malloc``; this is the entry point ``Process.malloc`` calls."""
        return self.malloc(pid, size)

    def malloc(self, pid, size):
        """Allocate a region for ``pid``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def free(self, pid, address):
        """Release the region of ``pid`` starting at ``address``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def read_memory(self, pid, address):
        """Read one byte for ``pid``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def write_memory(self, pid, address, value):
        """Write one byte for ``pid``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def handle_page_fault(self, pid, virtual_address):
        """Resolve a page fault for ``pid``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def translate_address(self, pid, virtual_address):
        """Translate a virtual address of ``pid``; implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def set_virtual_memory_enabled(self, enabled):
        """Switch virtual memory on or off and print the new state."""
        self.virtual_memory_enabled = enabled
        print(f"Virtual Memory {'Enabled' if enabled else 'Disabled'}")

    def create_program(self, program_class):
        """Create a process, instantiate ``program_class`` on it and return the pid."""
        process = self.create_process()
        program = program_class(process)
        self.programs[process.pid] = program
        return process.pid

    def execute_program_step(self, pid, step_number):
        """Run step ``step_number`` of the program registered under ``pid``.

        Raises:
            ValueError: when no program is registered for ``pid``.
        """
        if pid not in self.programs:
            raise ValueError(f"No program found with PID {pid}")
        self.programs[pid].execute_step(step_number)

    def dump_memory(self):
        """Print a hex/ASCII dump of physical memory, 16 bytes per line."""
        print("Physical Memory Dump:")
        for i in range(0, len(self.physical_memory.memory), 16):
            chunk = self.physical_memory.memory[i:i+16]
            hex_values = ' '.join([f'{b:02x}' for b in chunk])
            ascii_values = ''.join([chr(b) if 32 <= b < 127 else '.' for b in chunk])
            print(f'{i:04x}: {hex_values:<48} {ascii_values}')

    def get_memory_usage(self):
        """Return ``(start, size, label)`` tuples for used and free ranges, sorted by start.

        Used ranges come from every process's ``memory_regions`` and are
        labelled ``"Process <pid>"``; the gaps between them, up to
        ``PHYSICAL_MEMORY_SIZE``, are labelled ``"Free"``.
        """
        memory_usage = []
        for pid, process in self.processes.items():
            for start_address, size in process.memory_regions.items():
                memory_usage.append((start_address, size, f"Process {pid}"))

        memory_usage.sort(key=lambda x: x[0])

        # Walk the sorted regions and record every gap in front of one.
        free_spaces = []
        last_end = 0
        for start, size, _ in memory_usage:
            if start > last_end:
                free_spaces.append((last_end, start - last_end, "Free"))
            last_end = start + size
        if last_end < PHYSICAL_MEMORY_SIZE:
            free_spaces.append((last_end, PHYSICAL_MEMORY_SIZE - last_end, "Free"))

        memory_usage.extend(free_spaces)
        memory_usage.sort(key=lambda x: x[0])

        return memory_usage

    def dump_hard_drive(self):
        """Print a hex/ASCII dump of every page held in secondary storage."""
        print("Secondary Storage (Hard Drive) Dump:")
        if not self.secondary_storage.storage:
            print("  The secondary storage is empty.")
            return

        for page_id, data in self.secondary_storage.storage.items():
            # Each key is unpacked as a (pid, virtual page number) pair.
            pid, vpn = page_id
            print(f"  Page (PID: {pid}, VPN: {vpn}):")
            for i in range(0, len(data), 16):
                chunk = data[i:i+16]
                hex_values = ' '.join([f'{b:02x}' for b in chunk])
                ascii_values = ''.join([chr(b) if 32 <= b < 127 else '.' for b in chunk])
                print(f"    {i:04x}: {hex_values:<48} {ascii_values}")

In [1010]:
class BrandieOS(BaseOS):
    """Work-in-progress OS built on BaseOS with a simple bump allocator.

    Implemented so far: ``malloc`` (bump allocation with region bookkeeping),
    ``_address_belongs_to_process`` (region-based ownership check),
    ``write_memory`` (untranslated byte write) and
    ``set_virtual_memory_enabled``. ``free``, ``read_memory``,
    ``handle_page_fault`` and ``translate_address`` are still stubs that
    raise ``NotImplementedError``.
    """

    def __init__(self):
        super().__init__()
        self.virtual_memory_manager = BrandieVirtualMemoryManager(self.physical_memory, self.secondary_storage)
        self.free_list = [(0, PHYSICAL_MEMORY_SIZE)]
        self.next_virtual_address = 0

        # Bump pointer: the address the next malloc call will hand out.
        self.next_addr = 0

    def malloc(self, pid, size):
        """
        Allocate ``size`` consecutive addresses for process ``pid``.

        Allocation is a plain bump pointer shared by all processes: each call
        returns the current pointer and advances it by ``size``. The region
        is recorded in the process's ``memory_regions`` (start -> size) so
        that ``_address_belongs_to_process`` can validate later accesses.

        Returns:
            The start address of the new region.

        Raises:
            KeyError: If ``pid`` is not a known process.
            MemoryError: If virtual memory is disabled and the region would
                extend past ``PHYSICAL_MEMORY_SIZE``.
        """
        # Look the process up first so a bad pid leaves the allocator untouched.
        process = self.processes[pid]

        addr = self.next_addr
        end = addr + size
        # With virtual memory enabled the address space is not limited by the
        # amount of physical memory, so the bound only applies in physical mode.
        if not self.virtual_memory_enabled and end > PHYSICAL_MEMORY_SIZE:
            raise MemoryError(
                f"Out of memory: cannot allocate {size} bytes for process {pid}"
            )

        self.next_addr = end
        # Assigning the same key twice is harmless should a caller record it too.
        process.memory_regions[addr] = size
        return addr

    def free(self, pid, address):
        """
        Free previously allocated memory.
        
        Steps:
        1. Check if the address belongs to the process
        2. Remove the memory region from the process
        3. If virtual memory is not enabled, add the freed memory to the free list
        4. If virtual memory is enabled, update page tables and free physical pages

        Note: Be careful to properly handle merging of adjacent free memory regions
        """
        process = self.processes[pid]
        
        # TODO: Implement memory freeing logic here
        
        raise NotImplementedError("free method not implemented")

    def read_memory(self, pid, address):
        """
        Read a byte from memory.
        
        Steps:
        1. Check if the address belongs to the process
        2. If virtual memory is enabled, translate the virtual address to physical
        3. Read the byte from physical memory
        4. Handle page faults if they occur during address translation

        Note: Ensure proper error handling for invalid memory accesses
        """
        if not self._address_belongs_to_process(pid, address):
            raise Exception(f"Memory access violation: Process {pid} cannot access address {address}")
        
        # TODO: Implement memory reading logic here
        
        raise NotImplementedError("read_memory method not implemented")

    def write_memory(self, pid, address, value):
        """
        Write a byte to memory.
        
        Steps:
        1. Check if the address belongs to the process
        2. If virtual memory is enabled, translate the virtual address to physical
        3. Write the byte to physical memory
        4. Handle page faults if they occur during address translation
        5. If virtual memory is enabled, mark the page as dirty

        Note: Ensure proper error handling for invalid memory accesses
        """
        if not self._address_belongs_to_process(pid, address):
            raise Exception(f"Memory access violation: Process {pid} cannot access address {address}")
        
        # TODO: Steps 2, 4 and 5 are not implemented yet; the address is used
        # as a physical address as-is, even when virtual memory is enabled.
        self.physical_memory.write_byte(address, value)

    def _address_belongs_to_process(self, pid, address):
        """
        Check if an address belongs to a process.
        
        Steps:
        1. Get the process object
        2. Iterate through the process's memory regions
        3. Check if the address falls within any of the regions

        Note: This method is crucial for ensuring memory protection between processes

        Returns:
            True when ``address`` lies inside ``[start, start + size)`` of any
            region recorded in the process's ``memory_regions``, else False.
        """
        process = self.processes[pid]
        return any(
            start <= address < start + size
            for start, size in process.memory_regions.items()
        )

    def handle_page_fault(self, pid, virtual_address):
        """
        Handle a page fault.
        
        Steps:
        1. Calculate the virtual page number from the virtual address
        2. Call the virtual memory manager's handle_page_fault method
        3. Update the process's page table with the new mapping
        4. Return the physical page number

        Note: This method is key to implementing demand paging
        """
        process = self.processes[pid]
        virtual_page = virtual_address // PAGE_SIZE
        
        # TODO: Implement page fault handling logic here
        
        raise NotImplementedError("handle_page_fault method not implemented")

    def translate_address(self, pid, virtual_address):
        """
        Translate a virtual address to a physical address.
        
        Steps:
        1. If virtual memory is not enabled, return the address as-is
        2. Calculate the virtual page number and offset
        3. Look up the page table entry for the virtual page
        4. If the page is not present, handle the page fault
        5. Combine the physical page number with the offset to get the physical address

        Note: This method is central to the virtual memory system
        """
        if not self.virtual_memory_enabled:
            return virtual_address
        
        process = self.processes[pid]
        virtual_page = virtual_address // PAGE_SIZE
        offset = virtual_address % PAGE_SIZE
        
        # TODO: Implement address translation logic here
        
        raise NotImplementedError("translate_address method not implemented")

    def set_virtual_memory_enabled(self, enabled):
        """
        Enable or disable virtual memory.
        
        Steps:
        1. Set the virtual_memory_enabled flag
        2. If enabling, initialize the virtual address space
        3. If disabling, ensure all pages are written back to secondary storage

        Note: This method allows switching between physical and virtual memory modes
        """
        self.virtual_memory_enabled = enabled
        if enabled:
            self.next_virtual_address = 0  # Reset virtual address space when enabling VM
        print(f"Virtual Memory {'Enabled' if enabled else 'Disabled'}")

class BrandieVirtualMemoryManager(BaseVirtualMemoryManager):
    """Demand-paging manager that evicts pages in FIFO order.

    Uses the same inherited state and collaborators as the reference
    JulieVirtualMemoryManager earlier in this file: ``free_pages`` (free
    physical page numbers), ``page_queue`` (resident pages, oldest first, as
    ``(pid, virtual page, physical page)`` tuples), ``physical_memory`` and
    ``secondary_storage``.
    """

    def handle_page_fault(self, process, virtual_page_number):
        """
        Handle a page fault by loading the required page into memory.
        
        Steps:
        1. Check if there are free physical pages
        2. If not, evict a page to make room
        3. Allocate a physical page
        4. Load the page data from secondary storage
        5. Update the page table
        6. Return the physical page number

        Note: This method is crucial for implementing demand paging
        """
        if not self.free_pages:
            # Nothing free: reclaim the oldest resident page and reuse it.
            self.free_pages.add(self.evict_page(process, virtual_page_number))

        physical_page = self.free_pages.pop()
        # Record residency at the back of the queue so eviction stays FIFO.
        self.page_queue.append((process.pid, virtual_page_number, physical_page))

        # Secondary storage is keyed by (pid, virtual page number).
        page_data = self.secondary_storage.read_page((process.pid, virtual_page_number))
        self.physical_memory.write_page(physical_page, page_data)

        entry = process.page_table.get_entry(virtual_page_number)
        entry.physical_page = physical_page
        entry.present = True
        # A freshly loaded page matches its copy in secondary storage.
        entry.dirty = False

        return physical_page

    def evict_page(self, process, virtual_page_number):
        """
        Evict a page from physical memory.
        
        Steps:
        1. Choose a page to evict (e.g., using FIFO or LRU policy)
        2. If the page is dirty, write it back to secondary storage
        3. Update the page table to mark the page as not present
        4. Return the freed physical page number

        Note: The eviction policy can significantly affect system performance

        The victim is the oldest entry in ``page_queue`` (FIFO) and may belong
        to a different process than the faulting one; its owner is looked up
        through ``process.os.processes``. ``virtual_page_number`` is accepted
        for interface compatibility and is not used by this policy.

        Raises:
            RuntimeError: If no page is resident, so nothing can be evicted.
        """
        if not self.page_queue:
            raise RuntimeError("No resident page available to evict")

        evicted_pid, evicted_virtual_page, evicted_physical_page = self.page_queue.pop(0)
        owner = process.os.processes[evicted_pid]
        evicted_entry = owner.page_table.get_entry(evicted_virtual_page)

        if evicted_entry.dirty:
            # Persist modifications before the physical page is reused.
            page_data = self.physical_memory.read_page(evicted_physical_page)
            self.secondary_storage.write_page((evicted_pid, evicted_virtual_page), page_data)

        evicted_entry.present = False
        evicted_entry.dirty = False
        return evicted_physical_page

"""
Suggested Order of Implementation:

1. Implement physical memory allocation (malloc) without virtual memory
2. Implement physical memory deallocation (free) without virtual memory
3. Implement read_memory and write_memory for physical memory
4. Implement _address_belongs_to_process method
5. Implement set_virtual_memory_enabled method
6. Implement virtual memory allocation in malloc
7. Implement page table and page table entry structures
8. Implement translate_address method for virtual memory
9. Implement handle_page_fault method
10. Update read_memory and write_memory to use virtual memory when enabled
11. Implement evict_page method in VirtualMemoryManager
12. Update free method to handle virtual memory deallocation
13. Implement remaining helper methods and optimizations
"""

class TestBrandieOS(TestOS):
    """Runs the TestOS test cases against BrandieOS."""

    def create_os(self):
        """Return the OS under test (presumably consumed by TestOS's setup)."""
        return BrandieOS()

    def test_basic_malloc(self):
        """Two back-to-back allocations are contiguous and recorded per process."""
        proc = self.os.create_process()

        # The very first block must sit at the bottom of the address space.
        first = proc.malloc(32)
        self.assertEqual(first, 0, "First allocation should start at address 0")

        # The next block follows immediately after the first one.
        second = proc.malloc(64)
        self.assertEqual(second, 32, "Second allocation should start after the first")

        # Both regions must appear in the process's start -> size bookkeeping.
        regions = proc.memory_regions
        self.assertIn(0, regions, "First allocation should be recorded")
        self.assertEqual(regions[0], 32, "First allocation size should be correct")
        self.assertIn(32, regions, "Second allocation should be recorded")
        self.assertEqual(regions[32], 64, "Second allocation size should be correct")


# Collect every test method reachable from TestBrandieOS and run them with
# one result line per test.
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(TestBrandieOS)
unittest.TextTestRunner(verbosity=2).run(suite)

test_basic_malloc (__main__.TestBrandieOS.test_basic_malloc) ... ERROR
test_free_and_reallocate (__main__.TestBrandieOS.test_free_and_reallocate) ... ERROR
test_malloc_and_free (__main__.TestBrandieOS.test_malloc_and_free) ... ERROR
test_multiple_processes (__main__.TestBrandieOS.test_multiple_processes) ... ERROR
test_out_of_memory (__main__.TestBrandieOS.test_out_of_memory) ... ok
test_process_isolation (__main__.TestBrandieOS.test_process_isolation) ... ERROR
test_read_write_memory (__main__.TestBrandieOS.test_read_write_memory) ... ERROR
test_virtual_memory (__main__.TestBrandieOS.test_virtual_memory) ... ERROR
test_virtual_memory_paging (__main__.TestBrandieOS.test_virtual_memory_paging) ... ERROR
test_virtual_memory_paging_with_secondary_storage (__main__.TestBrandieOS.test_virtual_memory_paging_with_secondary_storage) ... ERROR

ERROR: test_basic_malloc (__main__.TestBrandieOS.test_basic_malloc)
----------------------------------------------------------------------
Traceback (mo

Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled
Virtual Memory Enabled


<unittest.runner.TextTestResult run=10 errors=9 failures=0>

In [1011]:
# Banner marking the start of the sandbox cell's output.
for _banner_line in (
    "=================================",
    "============ SANDBOX ============",
    "=================================",
):
    print(_banner_line)

def convert_bytes_to_str(bytes):
    """Turn an iterable of integer character codes into a string, dropping zeros.

    NOTE: the parameter name shadows the built-in ``bytes``; it is kept so
    existing callers are unaffected.
    """
    non_zero_codes = filter(lambda code: code != 0, bytes)
    return ''.join(map(chr, non_zero_codes))

# Sample Programs
class Program1(BaseProgram):
    """Three-step sample program: allocate two variables, use them, free them."""

    def step_1(self):
        """Allocate var1 and var2 (size 4 each), then give both a value."""
        print("Program 1, Step 1: Allocate and set variables")
        for name in ("var1", "var2"):
            self.allocate_variable(name, 4)
        for name, value in (("var1", 42), ("var2", "Hello")):
            self.set_variable(name, value)

    def step_2(self):
        """Read both variables, overwrite var1, and read var1 back."""
        print("Program 1, Step 2: Read and modify variables")
        for name in ("var1", "var2"):
            self.get_variable(name)
        self.set_variable("var1", 100)
        self.get_variable("var1")

    def step_3(self):
        """Release both variables."""
        print("Program 1, Step 3: Free variables")
        for name in ("var1", "var2"):
            self.free_variable(name)

class CoolProgram(BaseProgram):
    """Sample program: two small text variables, one large one, then a read."""

    def step_1(self):
        """Allocate and fill var1 (size 8) and var2 (size 4), one after the other."""
        for name, size, text in (("var1", 8, "ABC abc"), ("var2", 4, "mac ")):
            self.allocate_variable(name, size)
            self.set_variable(name, text)

    def step_2(self):
        """Allocate var3 with size 256 and fill it completely with 'a'."""
        length = 256
        self.allocate_variable("var3", length)
        self.set_variable("var3", "a" * length)

    def step_3(self):
        """Read var2 back after the large allocation made in step 2."""
        self.get_variable("var2")


# Sandbox driver: run one CoolProgram through all three steps on the reference
# JulieOS with virtual memory enabled, dumping physical memory, secondary
# storage and the program's own view of memory after every step.
# NOTE(review): the global name `os` would hide the standard-library module of
# the same name if this notebook imports it; kept as-is for other cells.
os = JulieOS()
os.set_virtual_memory_enabled(True)

pid1 = os.create_program(CoolProgram)
# A second instance is created but never stepped in this run.
pid2 = os.create_program(CoolProgram)

for step in (1, 2, 3):
    os.execute_program_step(pid1, step)
    os.dump_memory()
    os.dump_hard_drive()
    os.programs[pid1].dump_memory()

# Render the final memory layout once all steps have run.
visualize_memory(os)

Virtual Memory Enabled
Allocated variable 'var1' of size 8 at address 0
Set value of variable 'var1' to ABC abc
Allocated variable 'var2' of size 4 at address 8
Set value of variable 'var2' to mac 
Physical Memory Dump:
0000: 41 42 43 20 61 62 63 00 6d 61 63 20 00 00 00 00  ABC abc.mac ....
0010: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0020: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0030: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0040: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0050: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0060: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0070: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0080: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0090: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
00a0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  .....