In [5]:
import random
import string
import math

In [None]:
TUPLES_PER_BLOCK = 8
NUM_TUPLES = 5000
B_MIN = 10000
B_MAX = 50000
MEMORY_BLOCKS = 15

### 1. Generating S(B,C) and allocating it space in the virtual disk S.

In [21]:
def random_string(length=5):
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))

# generating 5000 unique B-values here
B_values = random.sample(range(B_MIN, B_MAX + 1), NUM_TUPLES)

# creating relation S 
relation_S = [(b, random_string()) for b in B_values]

# structuring virtual disk
virtual_disk_S = [
    relation_S[i:i + TUPLES_PER_BLOCK]
    for i in range(0, NUM_TUPLES, TUPLES_PER_BLOCK)
]

print(f"Total blocks in virtual disk S: {len(virtual_disk_S)}")
print(f"First block: {virtual_disk_S[0]}")


Total blocks in virtual disk S: 625
First block: [(28388, 'BR10C'), (33862, 'VGXX2'), (29626, 'GDA1Q'), (38703, '0JFHP'), (31873, 'Y4ZO5'), (48723, 'VN8RF'), (40541, '9HELH'), (45175, '8Q0AZ')]


### 2. Virtual Disk I/O

In [None]:
main_memory = [None] * MEMORY_BLOCKS
# just placeholder for now ^

In [None]:
# disk -> memory 
def read_block_from_disk(disk, block_index, memory, memory_index):
    memory[memory_index] = disk[block_index]

# memory -> disk
def write_block_to_disk(memory, memory_index, disk, block_index):
    disk[block_index] = memory[memory_index]

# this is how we can use it (writing it for my sanity):
# read_block_from_disk(virtual_disk_S, 0, main_memory, 0)
# write_block_to_disk(main_memory, 0, virtual_disk_S, 10)

### 3. Defining Hash Function

In [22]:
NUM_BUCKETS = MEMORY_BLOCKS - 1  # 14
# we need one memory block for reading input blocks, and the rest for output buffers

def hash_function(b_value):
    return b_value % NUM_BUCKETS

### 4. Hash Partition and Join Algorithm

In [None]:
# function to partition relation into buckets on disk
def hash_partition_relation(disk_relation, is_R):
    # Create empty bucket list: 14 buckets on disk
    buckets = [[] for _ in range(NUM_BUCKETS)]  # Each will be a list of blocks

    # !!! Each bucket is a list of blocks, and each block is a list of tuples !!!

    for block_index in range(len(disk_relation)):
        read_block_from_disk(disk_relation, block_index, main_memory, 0)
        block = main_memory[0]
        for tuple_ in block:
            b_value = tuple_[1] if is_R else tuple_[0]  # (A,B) or (B,C)
            bucket_index = hash_function(b_value)
            if not buckets[bucket_index] or len(buckets[bucket_index][-1]) == TUPLES_PER_BLOCK:
                buckets[bucket_index].append([])
            buckets[bucket_index][-1].append(tuple_)
    return buckets

since we don't have enough memory to load entire S_bucket
            # best case scenario there are 1000/14 = 71 blocks in each bucket, and still the main memory is not enough

In [None]:
def two_pass_hash_join(relation_R_disk, relation_S_disk):
    io_count = 0
    result = []

    # Pass 1: Partition both R and S
    R_buckets = hash_partition_relation(relation_R_disk, is_R=True)
    S_buckets = hash_partition_relation(relation_S_disk, is_R=False)

    for i in range(NUM_BUCKETS):
        R_bucket = R_buckets[i]  # list of blocks
        S_bucket = S_buckets[i]  # list of blocks

        if not R_bucket or not S_bucket:
            continue  # Skip empty buckets

        # Load S bucket into memory (build phase)
        hash_table = {}
        
        if len(S_bucket) <= MEMORY_BLOCKS - 1:
            # One-pass join: load entire S_bucket into memory
            
            for j, block in enumerate(S_bucket):
                main_memory[j] = block
                io_count += 1

                for b, c in block:
                    hash_table[b] = hash_table.get(b, []) + [(b, c)]

            # Probe with R bucket
            for block in R_bucket:
                main_memory[-1] = block  # last block for scanning R
                io_count += 1
                for a, b in block:
                    if b in hash_table:
                        for match in hash_table[b]:
                            result.append((a, b, match[1]))  # (A, B, C)
        else:
            # Fallback to block-nested loop join 
            # since we don't have enough memory to load entire S_bucket
            # best case scenario there are 1000/14 = 71 blocks in each bucket, and still the main memory is not enough
            for R_block in R_bucket:
                io_count += 1
                for S_block in S_bucket:
                    io_count += 1
                    for r in R_block:
                        r_B = r[1]
                        for s in S_block:
                            if r_B == s[0]:
                                result.append((r[0], r[1], s[1]))  # (A, B, C)

    return result, io_count

### Experiments

need to define these below functions since they are used multiples times. Also am defining R_disk separate from virtual_disk_S for ease of coding for now

In [16]:
def create_relation_on_disk(tuples):
    disk = []
    block = []
    for tup in tuples:
        block.append(tup)
        if len(block) == TUPLES_PER_BLOCK:
            disk.append(block)
            block = []
    if block:
        disk.append(block)
    return disk

def flatten_relation(relation_disk):
    return [tup for block in relation_disk for tup in block]


In [29]:
def run_experiment_5_1(S_disk):
    S_flat = flatten_relation(S_disk)
    B_values_in_S = [b for (b, _) in S_flat]

    R_tuples = [(f"A{i}", random.choice(B_values_in_S)) for i in range(1000)]
    R_disk = create_relation_on_disk(R_tuples)

    result, io_count = two_pass_hash_join(R_disk, S_disk)
    print(f"Total tuples in join result: {len(result)}")
    print("5.1: Total I/Os =", io_count)

    picked_B_values = random.sample(B_values_in_S, 20)
    filtered = [tup for tup in result if tup[1] in picked_B_values]

    print(f"5.1: Output for picked B-values (total {len(filtered)}):")
    for tup in filtered:
        print(tup)


In [33]:
def run_experiment_5_2(S_disk):
    # Generate 1,200 tuples for R with B values between 20,000–30,000
    R_tuples = [(f"A{i}", random.randint(20000, 30000)) for i in range(1200)]
    R_disk = create_relation_on_disk(R_tuples)

    result, io_count = two_pass_hash_join(R_disk, S_disk)
    print(f"Total tuples in join result: {len(result)}")
    print("5.2: Total I/Os =", io_count)

    for tup in result:
        print(tup)


#### Lets Experiment!

In [30]:
run_experiment_5_1(virtual_disk_S)



Total tuples in join result: 1000
5.1: Total I/Os = 6054
5.1: Output for picked B-values (total 4):
('A935', 26922, 'RKT3W')
('A833', 36704, 'X9Z8G')
('A31', 47598, 'FCSW9')
('A798', 40795, '07XZ7')


In [34]:
run_experiment_5_2(virtual_disk_S)

Total tuples in join result: 166
5.2: Total I/Os = 7298
('A20', 29610, 'M6YH3')
('A91', 20622, 'I1Y6F')
('A586', 24178, 'AEYTI')
('A620', 22344, 'NHXE5')
('A714', 22568, 'EXE7T')
('A770', 22988, 'PPCZL')
('A933', 24178, 'AEYTI')
('A1129', 22876, 'ZG0V0')
('A1166', 23268, 'P3BHV')
('A43', 23773, '7VH5A')
('A191', 20147, 'AS711')
('A443', 20063, 'V57CX')
('A507', 22737, 'WOXGW')
('A490', 22107, 'LVG0G')
('A569', 27931, 'N88RL')
('A621', 26503, 'MBP8I')
('A686', 26867, 'GSIR8')
('A838', 22863, 'SY9F1')
('A991', 22793, 'V8JPR')
('A1089', 20413, 'D34B7')
('A55', 25048, 'H4AYW')
('A258', 25216, '2HN7R')
('A214', 20834, 'KSOXI')
('A167', 28786, 'Z5T9Q')
('A276', 25510, '3U9XF')
('A424', 27344, '97MKO')
('A470', 26546, '0UUX6')
('A517', 20778, '3D665')
('A611', 23172, 'A56HU')
('A809', 28352, 'L110W')
('A755', 27232, 'QORH2')
('A738', 28478, 'GMOMH')
('A1043', 28254, 'B0ED6')
('A1148', 26364, 'W2PO5')
('A1197', 26756, 'TY68E')
('A29', 20961, 'QPOTA')
('A59', 26463, 'R3JNX')
('A31', 23635, '4MY