<a href="https://colab.research.google.com/github/juliensu/colab/blob/main/TieredFS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [139]:
!pip install simpy



In [133]:
import simpy
import random
import time # Importing the time module
import collections # Import collection for deque

class StorageTier:
    """One level of a simulated storage hierarchy, driven by SimPy.

    A tier owns a request queue drained by :meth:`run`, tracks the blocks it
    holds, and demotes least-recently-used blocks to ``lower_tier`` whenever
    it has to make room.

    Attributes:
        env: SimPy environment providing the simulation clock.
        name: Label used in log lines (e.g. ``"Tier 1"``).
        capacity: Total space, in the same unit as block sizes (logged as MB).
        utilization: Space held by blocks whose write has completed.
        bandwidth: Read/write speed (MB/s).
        latency: Read/write latency (ms).
        lower_tier: Next slower tier, or ``None`` for the bottom tier.
        upper_tier: Next faster tier. Set automatically when another tier is
            constructed with this one as its ``lower_tier``; if ``lower_tier``
            is reassigned later, ``upper_tier`` must be updated by hand.
        data_blocks: ``data_id -> (size, last_access)``. Contains committed
            blocks *and* writes that are still waiting in the queue, so a
            block is discoverable as soon as its write has been requested.
    """

    # Blocks found below this tier are promoted into it when they are read.
    PROMOTION_TIER_NAME = "Tier 1"

    def __init__(self, env, name, capacity, bandwidth, latency, lower_tier=None):
        self.env = env
        self.name = name
        self.capacity = capacity
        self.utilization = 0  # Current used space (committed blocks only)
        self.bandwidth = bandwidth  # Read/write speed (MB/s)
        self.latency = latency  # Read/write latency (ms)
        self.lower_tier = lower_tier  # Link to lower storage tier
        self.upper_tier = None  # Back-link, filled in by the tier above
        if lower_tier is not None:
            lower_tier.upper_tier = self
        self.data_blocks = {}
        # data_id -> size actually counted in ``utilization``. Kept apart from
        # ``data_blocks`` because the latter also lists not-yet-written blocks.
        self._committed = {}
        self.queue = simpy.Store(env)  # Queue to accept I/O requests
        self.process = env.process(self.run())  # Start processing requests

    def can_store(self, data_size):
        """Return True if ``data_size`` more units fit without exceeding capacity."""
        return self.utilization + data_size <= self.capacity

    def _service_time(self, data_size):
        """Return the simulated duration of one transfer (latency is in ms)."""
        return self.latency / 1000 + data_size / self.bandwidth

    def run(self):
        """Process I/O requests from the queue, one at a time, forever."""
        while True:
            operation, data_size, data_id = yield self.queue.get()
            if operation == "read":
                yield from self._handle_read(data_size, data_id)
            elif operation == "write":
                yield from self._handle_write(data_size, data_id)

    def _handle_read(self, data_size, data_id):
        """Serve a read locally, or hand it to the tiers below on a miss."""
        if data_id in self.data_blocks:
            yield self.env.timeout(self._service_time(data_size))
            print(f"[{self.env.now}] Read #{data_id} {data_size}MB from {self.name}")
            self.update_last_access(data_id)
            return

        print(f"[{self.env.now}] Data #{data_id} not found in {self.name}, checking lower tiers...")
        if self.lower_tier is not None:
            yield self.env.process(self.lower_tier.read_from_lower_tier(data_size, data_id))
        else:
            print(f"[{self.env.now}] Data #{data_id} not found in any tier.")

    def _handle_write(self, data_size, data_id):
        """Commit a write, demoting old blocks first if the tier is full."""
        # An overwrite replaces the previous copy, so give its space back
        # before checking capacity; otherwise the id would be counted twice.
        self.utilization -= self._committed.pop(data_id, 0)

        if not self.can_store(data_size):
            print(f"[{self.env.now}] {self.name} full, triggering migration")
            yield self.env.process(self.migrate_data(data_size))

        if not self.can_store(data_size):
            # Migration could not free enough room. Withdraw the block that
            # request_io() registered so the tier does not advertise data it
            # never stored.
            self.data_blocks.pop(data_id, None)
            print(f"[{self.env.now}] Write #{data_id} {data_size}MB to {self.name} failed: not enough space")
            return

        yield self.env.timeout(self._service_time(data_size))
        self.utilization += data_size
        self._committed[data_id] = data_size
        print(f"[{self.env.now}] Wrote #{data_id} {data_size}MB to {self.name}")
        self.add_data_block(data_size, data_id)

    def read_from_lower_tier(self, data_size, data_id):
        """Serve a read that missed in a faster tier, then promote the block.

        If this tier holds ``data_id`` the read is served here and the block
        is moved into the tier named ``PROMOTION_TIER_NAME`` when such a tier
        exists above this one. Otherwise the request is passed further down.
        """
        if data_id not in self.data_blocks:
            if self.lower_tier is not None:
                yield self.env.process(self.lower_tier.read_from_lower_tier(data_size, data_id))
            else:
                print(f"[{self.env.now}] Data #{data_id} not found in any tier.")
            return

        yield self.env.timeout(self._service_time(data_size))
        print(f"[{self.env.now}] Read #{data_id} {data_size}MB from {self.name} (migrated from lower tier)")
        self.update_last_access(data_id)

        target = self._promotion_target()
        if target is None or data_id not in self._committed:
            # Nothing above to promote into, or the block is only a queued
            # write here and therefore occupies no space that could be moved.
            return

        # Promote the whole block; the read may have covered only part of it.
        block_size = self._committed[data_id]
        if not target.can_store(block_size):
            print(f"[{self.env.now}] {target.name} is full, triggering eviction to make space...")
            yield self.env.process(target.migrate_data(block_size))

        # The simulation advanced while waiting, so the block may be gone.
        if data_id not in self._committed:
            return
        block_size = self._remove_block(data_id)
        target.request_io("write", block_size, data_id)
        print(f"[{self.env.now}] Migrated #{data_id} {block_size}MB from {self.name} to {target.name}")

    def _promotion_target(self):
        """Return the promotion tier above this one, or None if there is none."""
        tier = self.upper_tier
        while tier is not None:
            if tier.name == self.PROMOTION_TIER_NAME:
                return tier
            tier = tier.upper_tier
        return None

    def request_io(self, operation, data_size, data_id):
        """Add an I/O request to the queue, including data_id.

        Writes are registered in ``data_blocks`` immediately so that lookups
        can find the block before the queued write has been processed. They
        only count towards ``utilization`` once the write completes.
        """
        self.queue.put((operation, data_size, data_id))
        if operation == "write":
            self.add_data_block(data_size, data_id)

    def add_data_block(self, data_size, data_id):
        """Record a block and stamp it with the current simulation time."""
        self.data_blocks[data_id] = (data_size, self.env.now)

    def update_last_access(self, data_id):
        """Refresh the last-access time of a block, if this tier knows it."""
        if data_id in self.data_blocks:
            self.data_blocks[data_id] = (self.data_blocks[data_id][0], self.env.now)

    def get_data_blocks(self):
        """Return a list of ``(size, last_access)`` tuples for all known blocks."""
        return list(self.data_blocks.values())

    def _remove_block(self, data_id):
        """Drop a committed block, release its space and return its size."""
        size = self._committed.pop(data_id)
        self.data_blocks.pop(data_id, None)
        self.utilization -= size
        return size

    def _pop_lru(self, min_size=0):
        """Remove the least recently used committed block of at least ``min_size``.

        Returns ``(data_id, size)`` for the removed block, or ``None`` when no
        committed block is large enough. Ties on access time go to the block
        that was committed first.
        """
        candidates = [bid for bid, size in self._committed.items() if size >= min_size]
        if not candidates:
            return None
        victim = min(
            candidates,
            key=lambda bid: self.data_blocks.get(bid, (0, float("-inf")))[1],
        )
        return victim, self._remove_block(victim)

    def evict(self, block_size):
        """Evict the LRU block whose size is at least ``block_size``.

        The block is discarded, not relocated; callers that want to keep the
        data must write it elsewhere themselves. Queued writes that have not
        been committed yet are never chosen, since they hold no space.

        Returns:
            True if a block was evicted, False if none qualified.
        """
        return self._pop_lru(block_size) is not None

    def migrate_data(self, data_size, data_id=None):
        """Demote LRU blocks to the lower tier until ``data_size`` fits here.

        Each victim is queued as a write on ``lower_tier`` under its own id
        and size. A full lower tier resolves that itself when it processes
        the write, so fullness cascades down the chain without recursion.

        Args:
            data_size: Amount of space the caller needs to be able to store.
            data_id: Accepted for backward compatibility and ignored; log
                lines report the ids of the blocks that are actually moved.
        """
        if self.lower_tier is None:
            print(f"[{self.env.now}] No lower tier available for migration")
            yield self.env.timeout(0)  # Act as a generator
            return

        if data_size > self.capacity:
            # Emptying the tier would still not help, so leave it untouched.
            print(f"[{self.env.now}] {data_size}MB exceeds the capacity of {self.name}, nothing evicted")
            return

        evicted_size = 0
        while not self.can_store(data_size):
            victim = self._pop_lru()
            if victim is None:
                break
            victim_id, victim_size = victim
            evicted_size += victim_size
            print(f"[{self.env.now}] Evicted #{victim_id} {victim_size}MB from {self.name}. Total evicted: {evicted_size}MB")
            self.lower_tier.request_io("write", victim_size, victim_id)
            print(f"[{self.env.now}] Migrated #{victim_id} {victim_size}MB from {self.name} to {self.lower_tier.name}")

        print(f"[{self.env.now}] Data blocks in {self.name}: {self.data_blocks}")


In [138]:
class TieredFileSystem:
    """Five-tier storage stack (DRAM, Tier 0..3) with a simple placement policy.

    Writes are placed by utilization and, when the two fastest tiers are both
    busy, by how often the block was read recently. Reads are routed to
    whichever tier currently lists the block.
    """

    # A tier is preferred for new writes while it is below this fill ratio.
    FILL_THRESHOLD = 0.8
    # Minimum number of recent reads for a block to count as hot / warm.
    HOT_ACCESSES = 10
    WARM_ACCESSES = 5

    def __init__(self, env):
        self.env = env
        # Built bottom-up so each tier can be handed its lower neighbour.
        self.tier3 = StorageTier(env, "Tier 3", 5000000, 100, 50)
        self.tier2 = StorageTier(env, "Tier 2", 10000, 500, 20, self.tier3)
        self.tier1 = StorageTier(env, "Tier 1", 1000, 1000, 10, self.tier2)
        self.tier0 = StorageTier(env, "Tier 0", 300, 5000, 5, self.tier1)
        self.dram = StorageTier(env, "DRAM", 100, 10000, 1, self.tier0)

        # Ordered fastest to slowest; find_data_tier() relies on this order.
        self.tiers = {
            "DRAM": self.dram,
            "Tier 0": self.tier0,
            "Tier 1": self.tier1,
            "Tier 2": self.tier2,
            "Tier 3": self.tier3,
        }

        self.access_history = collections.deque(maxlen=100)  # Last 100 reads

    def access_data(self, tier_name, data_size, is_write=True, data_id=None):
        """Send an I/O request to the appropriate tier.

        Args:
            tier_name: Tier the caller is addressing. It must be a known tier
                name. Writes ignore it (placement is policy-driven) and reads
                are redirected if the block lives elsewhere.
            data_size: Size of the transfer.
            is_write: True for a write, False for a read.
            data_id: Identifier of the block.

        Raises:
            KeyError: If ``tier_name`` is not one of the configured tiers.
        """
        if tier_name not in self.tiers:
            raise KeyError(tier_name)

        if is_write:
            self._choose_write_tier(data_id).request_io("write", data_size, data_id)
            return

        data_tier = self.find_data_tier(data_id)
        if data_tier is None:
            print(f"[{self.env.now}] Data #{data_id} not found in any tier.")
            return

        if data_tier.name != tier_name:
            print(f"[{self.env.now}] Read request for #{data_id} on {tier_name}, but data is in {data_tier.name}. Redirecting...")

        data_tier.request_io("read", data_size, data_id)
        self.access_history.append(data_id)  # Only reads feed the temperature

    def _choose_write_tier(self, data_id):
        """Pick the tier a new write should land on."""
        if self.dram.utilization / self.dram.capacity < self.FILL_THRESHOLD:
            return self.dram
        if self.tier0.utilization / self.tier0.capacity < self.FILL_THRESHOLD:
            return self.tier0

        # Both fast tiers are busy: fall back to the block's temperature.
        temperature = self.get_data_temperature(data_id)
        if temperature == "hot":
            return self.dram
        if temperature == "warm":
            return self.tier0
        return self.tier1

    def find_data_tier(self, data_id):
        """Return the fastest tier that lists ``data_id``, or None if none does."""
        for tier in self.tiers.values():
            if data_id in tier.data_blocks:
                return tier
        return None

    def get_data_temperature(self, data_id):
        """Classify a block as "hot", "warm" or "cold" from its recent reads."""
        if data_id is None:
            return "cold"  # Default to cold for new data

        access_count = self.access_history.count(data_id)
        if access_count >= self.HOT_ACCESSES:
            return "hot"
        if access_count >= self.WARM_ACCESSES:
            return "warm"
        return "cold"

    def migrate_to_tier_1(self, target_tier, data_size, data_id=None):
        """Move data from ``target_tier`` into Tier 1 and read it there.

        This is a SimPy process (generator). When ``target_tier`` is Tier 1
        itself, only the read is issued.

        Args:
            target_tier: Tier that currently holds the data (the source).
            data_size: Size of the data to move and read.
            data_id: Identifier passed on to the Tier 1 write and read.
        """
        source = target_tier
        if source is not self.tier1:
            if not self.tier1.can_store(data_size):
                print(f"[{self.env.now}] {self.tier1.name} is full, triggering eviction to make space...")
                yield self.env.process(self.tier1.migrate_data(data_size))

            # NOTE(review): evict() chooses its victim by size and recency,
            # not by id, so the block removed is not guaranteed to be data_id.
            if not source.evict(data_size):
                print(f"[{self.env.now}] No block of at least {data_size}MB to move out of {source.name}.")
                return
            self.tier1.request_io("write", data_size, data_id)

        print(f"[{self.env.now}] Data migrated to Tier 1 for reading.")
        self.tier1.request_io("read", data_size, data_id)

def workload(env, fs, num_requests=500):
    """Simulated workload: a seed write followed by random reads and writes.

    This is a SimPy process (generator). After every request it waits a
    random 0.1-2 time units, and every tenth request it prints the
    utilization of each tier.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        fs: File system under test; must provide ``access_data()``,
            ``find_data_tier()`` and a ``tiers`` mapping.
        num_requests: Number of random requests issued after the seed write.
    """
    written_data_sizes = {}  # data_id -> size of the most recent write

    # Guarantee at least one write so that reads always have a candidate.
    data_id = random.randint(1, 1000)
    size = random.randint(1, 50)
    fs.access_data("DRAM", size, True, data_id)
    written_data_sizes[data_id] = size

    for operation_count in range(1, num_requests + 1):
        is_write = random.choice([True, False])
        print(f"operation id: {operation_count}")

        # (data_id, tier name) of the request actually sent this round, so the
        # summary line below is never printed for a read that was skipped.
        issued = None

        if is_write:
            data_id = random.randint(1, 1000)
            size = random.randint(1, 20)
            fs.access_data("DRAM", size, is_write, data_id)
            written_data_sizes[data_id] = size
            issued = (data_id, "DRAM")
        elif written_data_sizes:
            data_id = random.choice(list(written_data_sizes))
            data_tier = fs.find_data_tier(data_id)
            if data_tier is not None:
                # Read a random part of the block.
                size = random.randint(1, max(1, written_data_sizes[data_id] - 1))
                fs.access_data(data_tier.name, size, is_write, data_id)
                issued = (data_id, data_tier.name)
            else:
                print(f"[{env.now}] Data #{data_id} not found for read operation.")

        if issued is not None:
            print(f"operation id: {operation_count} WriteRead#: {is_write} data_id: {issued[0]} tier: {issued[1]}")

        if operation_count % 10 == 0:
            print("\n--- Tier Utilization ---")
            for tier_name, storage in fs.tiers.items():
                print(f"{tier_name}: {storage.utilization}/{storage.capacity} ({storage.utilization / storage.capacity * 100:.2f}%)")
            print("------------------------\n")

        yield env.timeout(random.uniform(0.1, 2))


# Build the simulation environment and the tiered file system on top of it.
env = simpy.Environment()
fs = TieredFileSystem(env)

# Upper bound on simulated time passed to env.run(); the run also ends earlier
# if no events remain.
SIM_DURATION = 10000

# Run the simulation
env.process(workload(env, fs))
env.run(until=SIM_DURATION)


operation id: 1
operation id: 1 WriteRead#: True data_id: 64 tier: DRAM
[0.0052] Wrote #832 42MB to DRAM
[0.006999999999999999] Wrote #64 8MB to DRAM
operation id: 2
operation id: 2 WriteRead#: False data_id: 64 tier: DRAM
[0.34912823011557664] Read #64 6MB from DRAM
operation id: 3
operation id: 3 WriteRead#: True data_id: 672 tier: DRAM
[0.68379376699151] Wrote #672 3MB to DRAM
operation id: 4
operation id: 4 WriteRead#: False data_id: 832 tier: DRAM
[2.304940491669866] Read #832 31MB from DRAM
operation id: 5
operation id: 5 WriteRead#: False data_id: 64 tier: DRAM
[4.168993257656295] Read #64 5MB from DRAM
operation id: 6
operation id: 6 WriteRead#: False data_id: 672 tier: DRAM
[5.549350389716414] Read #672 2MB from DRAM
operation id: 7
operation id: 7 WriteRead#: False data_id: 832 tier: DRAM
[6.71165658699686] Read #832 14MB from DRAM
operation id: 8
operation id: 8 WriteRead#: True data_id: 740 tier: DRAM
[7.913700426003086] Wrote #740 8MB to DRAM
operation id: 9
operation id: 