In [None]:
import os

# Create the input and output folders used by the file pipeline below.
# exist_ok=True makes this cell safe to re-run.
for folder in ("data/incoming_files", "data/processed"):
    os.makedirs(folder, exist_ok=True)

print("‚úÖ Folders ready")



‚úÖ Folders ready


In [None]:
from dataclasses import dataclass
import time

@dataclass
class Message:
    """A single simulated message flowing through the pipeline.

    Fields:
        id: sequential message number assigned by the producer.
        created_at: creation timestamp in epoch seconds (``time.time()``).
        text: message payload.
    """

    id: int  # sequential message number
    created_at: float  # epoch seconds, as returned by time.time()
    text: str  # payload


In [None]:
# Build one sample message, stamped with the current wall-clock time, and show it.
m1 = Message(text="hello", id=1, created_at=time.time())
print(m1)


Message(id=1, created_at=1766010888.3186524, text='hello')


In [None]:
# generator
import random
import time
def message_source(duration_sec=5, min_delay=0.5, max_delay=1.5):
    """Simulate a live stream of ``Message`` objects for ``duration_sec`` seconds.

    Before each message the generator sleeps for a random interval drawn
    uniformly from ``[min_delay, max_delay]`` seconds, to mimic irregular arrivals.

    Args:
        duration_sec: length of the stream window in seconds.
        min_delay: lower bound of the random pause before each message.
        max_delay: upper bound of the random pause before each message.

    Yields:
        Message objects with ids 1, 2, 3, ... and text ``"hello <id>"``.
        No message is emitted after the window has closed.
    """
    deadline = time.time() + duration_sec
    msg_id = 0

    while time.time() < deadline:
        time.sleep(random.uniform(min_delay, max_delay))

        # Re-check after sleeping: the pause can carry us past the deadline,
        # and a message created then would fall outside the stream window.
        now = time.time()
        if now >= deadline:
            break

        msg_id += 1
        yield Message(id=msg_id, created_at=now, text=f"hello {msg_id}")

In [None]:
# Drain a 5-second stream and print each message as it arrives.
for message in message_source(duration_sec=5):
    print(message)


Message(id=1, created_at=1766010924.3952546, text='hello 1')
Message(id=2, created_at=1766010925.103388, text='hello 2')
Message(id=3, created_at=1766010926.0423326, text='hello 3')
Message(id=4, created_at=1766010927.2737362, text='hello 4')
Message(id=5, created_at=1766010928.7432399, text='hello 5')


In [None]:
# novi Queue aka working line
from queue import Queue

# A new FIFO Queue: the "working line" between producer and consumers.
q = Queue()
print("‚úÖ Queue created")


‚úÖ Queue created


In [None]:
# sakupljanje i stavljanje paketa u red čekanja (collect messages into batches and enqueue them)
import time

def minibatch_collector(message_iter, window_sec=3, out_queue=None, max_batch_size=None):
    """Group messages into time-windowed mini-batches and put them on a queue.

    A window opens when the first message of a batch arrives. After each new
    message is appended, the batch is put on ``out_queue`` and a new window
    starts if either of these holds:
    - ``window_sec`` seconds have elapsed since the window opened, or
    - the batch has reached ``max_batch_size`` (when given).
    Whatever remains when ``message_iter`` is exhausted is flushed as a final batch.

    Note: the window is only checked when a message arrives. A batch is never
    flushed during a quiet period, only on the next message or at the end of
    the stream.

    Args:
        message_iter: any iterable of messages.
        window_sec: window length in seconds.
        out_queue: queue to put batches on. A new ``Queue`` is created if None.
        max_batch_size: optional cap on batch length. ``None`` means no cap.

    Returns:
        The queue the batches were put on.
    """
    # Explicit None check. Truthiness would silently replace a caller-supplied
    # queue-like object that defines __len__ and happens to be empty.
    if out_queue is None:
        out_queue = Queue()

    batch = []
    window_start = None

    for msg in message_iter:
        if window_start is None:
            window_start = time.time()  # start the timer when the first message arrives

        batch.append(msg)  # add the new message to the current batch

        window_expired = time.time() - window_start >= window_sec
        batch_full = max_batch_size is not None and len(batch) >= max_batch_size

        # Window expired (or batch full): push the batch onto the queue.
        if window_expired or batch_full:
            out_queue.put(batch)
            print(f"üì¶ Batch created with {len(batch)} messages")

            batch = []
            window_start = None

    # When messages stop, send the remaining batch (if any).
    if batch:
        out_queue.put(batch)
        print(f"üì¶ Final batch created with {len(batch)} messages")

    return out_queue


In [None]:
# Collect an 8-second stream into 3-second batches on a fresh queue.
test_q = minibatch_collector(message_iter=message_source(duration_sec=8), window_sec=3)

print("Batches waiting in queue:", test_q.qsize())


üì¶ Batch created with 5 messages
üì¶ Final batch created with 4 messages
Batches waiting in queue: 2


In [None]:
# Take the oldest batch off the queue (FIFO) and inspect it.
first_batch = test_q.get()
print(f"First batch size: {len(first_batch)}")
print(f"First batch contents: {first_batch}")


First batch size: 6
First batch contents: [Message(id=1, created_at=1765964401.997906, text='hello 1'), Message(id=2, created_at=1765964402.6318367, text='hello 2'), Message(id=3, created_at=1765964403.1577313, text='hello 3'), Message(id=4, created_at=1765964404.2082257, text='hello 4'), Message(id=5, created_at=1765964404.9139535, text='hello 5'), Message(id=6, created_at=1765964406.1800034, text='hello 6')]


In [None]:
import time

def process_batch(batch):
    """Simulate processing one batch (e.g. a database write).

    Waits 0.2 s to imitate real-world latency, then returns how many items
    the batch holds.
    """
    time.sleep(0.2)  # stand-in for real I/O latency
    processed = len(batch)
    return processed


In [None]:
import threading

def worker(worker_id, queue):
    """Consume batches from ``queue`` forever, processing each with ``process_batch``.

    Intended to run in a daemon thread. ``queue.task_done()`` is called in a
    ``finally`` block, so ``queue.join()`` cannot hang if ``process_batch``
    raises. The exception still propagates and ends this worker's thread.
    """
    while True:
        batch = queue.get()  # take the next batch from the line
        try:
            count = process_batch(batch)  # do the work
            print(f"‚úÖ Worker {worker_id} processed {count} messages")
        finally:
            # Always mark the batch finished, even on failure. Otherwise
            # queue.join() would wait forever for this item.
            queue.task_done()


In [None]:
# Fresh queue plus a pool of 10 daemon threads (daemon threads do not block
# interpreter shutdown).
work_queue = Queue()

for worker_num in range(1, 11):
    t = threading.Thread(
        target=worker,
        args=(worker_num, work_queue),
        daemon=True,
    )
    t.start()

print("‚úÖ 10 workers started")


‚úÖ 10 workers started


In [None]:
# Produce batches straight into the shared work queue, where the running
# workers consume them concurrently.
minibatch_collector(message_iter=message_source(duration_sec=8), window_sec=3, out_queue=work_queue)

# Block until every queued batch has been marked done via task_done().
work_queue.join()
print("‚úÖ All batches processed completely!")


üì¶ Batch created with 5 messages
‚úÖ Worker 1 processed 5 messages
üì¶ Final batch created with 3 messages
‚úÖ Worker 2 processed 3 messages
‚úÖ All batches processed completely!


In [None]:
import random
import time

def process_batch(batch, failure_rate=0.2, delay_sec=0.2):
    """Simulate an unreliable batch processor.

    Sleeps ``delay_sec`` seconds, then fails with probability ``failure_rate``.

    Args:
        batch: sized collection of items to "process".
        failure_rate: probability in [0, 1] of raising (default 0.2, i.e. 20%).
        delay_sec: simulated processing latency in seconds.

    Returns:
        ``len(batch)`` on success.

    Raises:
        RuntimeError: randomly, with probability ``failure_rate``.
    """
    time.sleep(delay_sec)

    # Fail randomly `failure_rate` of the time.
    if random.random() < failure_rate:
        raise RuntimeError("Random failure happened")

    return len(batch)


In [None]:
def retry_with_backoff(work_fn, max_retries=3, base_delay=0.5, retry_on=(Exception,)):
    """Call ``work_fn`` and retry it with exponential backoff on failure.

    Args:
        work_fn: zero-argument callable that does the work (may fail).
        max_retries: number of retries after the first attempt, so ``work_fn``
            runs at most ``max_retries + 1`` times.
        base_delay: wait in seconds before the first retry. It doubles on each
            later retry (base_delay, 2*base_delay, 4*base_delay, ...).
        retry_on: exception type, or tuple of types, that triggers a retry.
            Any other exception propagates immediately without retrying.

    Returns:
        Whatever ``work_fn`` returns on its first successful call.

    Raises:
        The last exception from ``work_fn`` once retries are exhausted, or any
        exception not matched by ``retry_on``.
    """
    attempt = 0

    while True:
        try:
            return work_fn()  # try to do the work
        except retry_on as e:
            if attempt >= max_retries:
                # Out of retries: give up and re-raise the original error.
                raise

            # Exponential backoff delay.
            wait_time = base_delay * (2 ** attempt)
            print(f"   üîÅ Retry in {wait_time:.1f}s (attempt {attempt+1}) because: {e}")

            time.sleep(wait_time)
            attempt += 1


In [None]:
def worker(worker_id, queue):
    """Consume batches forever, retrying each failed batch with backoff.

    A batch that still fails after all retries is reported and skipped, so the
    worker keeps running. ``queue.task_done()`` is called once per batch
    whether the batch succeeded or failed.
    """
    while True:
        current_batch = queue.get()

        def attempt():
            # One try at processing this batch; retry_with_backoff may call it repeatedly.
            return process_batch(current_batch)

        try:
            processed = retry_with_backoff(attempt)
            print(f"‚úÖ Worker {worker_id} processed {processed} messages")
        except Exception as err:
            print(f"‚ùå Worker {worker_id} FAILED batch even after retries: {err}")

        queue.task_done()


In [None]:
# New queue so this run is independent of the previous one.
work_queue = Queue()

# Start 10 daemon workers again (fresh run).
for worker_num in range(1, 11):
    threading.Thread(target=worker, args=(worker_num, work_queue), daemon=True).start()

# Put the batches onto the work queue.
minibatch_collector(message_iter=message_source(duration_sec=8), window_sec=3, out_queue=work_queue)

work_queue.join()
print("‚úÖ Done with retries enabled!")


üì¶ Batch created with 6 messages
‚úÖ Worker 1 processed 6 messages
üì¶ Final batch created with 3 messages
‚úÖ Worker 2 processed 3 messages
‚úÖ Done with retries enabled!


In [None]:
# Shared counters that the workers update as they go.
metrics = dict(
    batches_ok=0,
    batches_failed=0,
    messages_processed=0,
)

print(metrics)


{'batches_ok': 0, 'batches_failed': 0, 'messages_processed': 0}


In [None]:
# Lock shared by all worker threads for metric updates. `metrics[key] += n`
# is a read-modify-write and is not atomic when several threads run it.
_metrics_lock = threading.Lock()


def worker(worker_id, queue, metrics, metrics_lock=None):
    """Consume batches forever, retrying failures and recording outcomes in ``metrics``.

    Args:
        worker_id: label used in log lines.
        queue: queue of batches. ``task_done()`` is called once per batch.
        metrics: dict with integer counters ``batches_ok``, ``batches_failed``
            and ``messages_processed``, shared between workers.
        metrics_lock: lock guarding ``metrics``. Defaults to a module-wide
            lock, so all workers using the default share the same one.
    """
    lock = _metrics_lock if metrics_lock is None else metrics_lock

    while True:
        batch = queue.get()

        try:
            count = retry_with_backoff(lambda: process_batch(batch))
            with lock:
                metrics["batches_ok"] += 1
                metrics["messages_processed"] += count
            print(f"‚úÖ Worker {worker_id} processed {count} messages")
        except Exception as e:
            with lock:
                metrics["batches_failed"] += 1
            print(f"‚ùå Worker {worker_id} FAILED batch even after retries: {e}")
        finally:
            # Always mark the batch done so queue.join() cannot hang.
            queue.task_done()


In [None]:
def health_check(metrics, max_failures=3):
    """Summarize pipeline health from the worker metrics.

    Args:
        metrics: dict containing a ``batches_failed`` counter.
        max_failures: number of failed batches at which the pipeline is
            reported unhealthy (default 3).

    Returns:
        ``"UNHEALTHY (too many failures)"`` once ``batches_failed`` reaches
        ``max_failures``, otherwise ``"HEALTHY"``.
    """
    if metrics["batches_failed"] >= max_failures:
        return "UNHEALTHY (too many failures)"
    return "HEALTHY"

current_health = health_check(metrics)
print("Health right now:", current_health)


Health right now: HEALTHY


In [None]:
from queue import Queue
import threading

work_queue = Queue()

# Start 10 daemon workers that all update the same metrics dict.
for worker_num in range(1, 11):
    threading.Thread(
        target=worker, args=(worker_num, work_queue, metrics), daemon=True
    ).start()

# Create batches and put them on the work queue.
minibatch_collector(message_iter=message_source(duration_sec=8), window_sec=3, out_queue=work_queue)

# Wait until all the work is finished.
work_queue.join()

print("\n‚úÖ FINAL METRICS:", metrics)
print("‚úÖ FINAL HEALTH:", health_check(metrics))


üì¶ Batch created with 4 messages
‚úÖ Worker 1 processed 4 messages
üì¶ Batch created with 5 messages
‚úÖ Worker 2 processed 5 messages

‚úÖ FINAL METRICS: {'batches_ok': 2, 'batches_failed': 0, 'messages_processed': 9}
‚úÖ FINAL HEALTH: HEALTHY


In [None]:
# Event used to tell worker threads to shut down; starts out unset.
stop_event = threading.Event()
print("‚úÖ stop_event created")


‚úÖ stop_event created


In [None]:
# Lock shared by all worker threads for metric updates. `metrics[key] += n`
# is a read-modify-write and is not atomic when several threads run it.
_metrics_lock = threading.Lock()


def worker(worker_id, queue, metrics, stop_event, metrics_lock=None):
    """Process batches until ``stop_event`` is set, then exit gracefully.

    The worker polls ``queue`` with a 0.5 s timeout, so it re-checks
    ``stop_event`` at least that often while idle.

    Args:
        worker_id: label used in log lines.
        queue: queue of batches. ``task_done()`` is called once per batch taken.
        metrics: dict with integer counters ``batches_ok``, ``batches_failed``
            and ``messages_processed``, shared between workers.
        stop_event: ``threading.Event``. The loop ends once it is set.
        metrics_lock: lock guarding ``metrics``. Defaults to a module-wide lock.
    """
    # Local import: the `queue` parameter shadows the module name in this scope.
    from queue import Empty

    lock = _metrics_lock if metrics_lock is None else metrics_lock

    while not stop_event.is_set():  # keep looping until stop_event is set
        try:
            batch = queue.get(timeout=0.5)  # wait at most 0.5 s for work
        except Empty:
            # No new batch: go back and re-check stop_event. Only the
            # timeout is swallowed; any other error surfaces.
            continue

        try:
            count = retry_with_backoff(lambda: process_batch(batch))
            with lock:
                metrics["batches_ok"] += 1
                metrics["messages_processed"] += count
            print(f"‚úÖ Worker {worker_id} processed {count} messages")
        except Exception as e:
            with lock:
                metrics["batches_failed"] += 1
            print(f"‚ùå Worker {worker_id} FAILED batch even after retries: {e}")
        finally:
            # Always mark the batch done so queue.join() cannot hang.
            queue.task_done()

    print(f"üõë Worker {worker_id} stopping gracefully")


In [None]:
# resetovanje pokazatelja / metrika radi novog ranovanja
metrics = {"batches_ok": 0, "batches_failed": 0, "messages_processed": 0}

work_queue = Queue()
stop_event = threading.Event()

# startovanje 10 worker-a
workers = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i+1, work_queue, metrics, stop_event), daemon=True)
    t.start()
    workers.append(t)

# proizvodnja "paketa"
minibatch_collector(message_source(duration_sec=8), window_sec=3, out_queue=work_queue)

# cekamo dok svi paketi u redu ne zavrse obradu 
work_queue.join()

# stopiranje worker-a 
stop_event.set()

# omogucavanje printanja finalnih poruka nakon zaustavljanja
time.sleep(1)

print("\n‚úÖ FINAL METRICS:", metrics)
print("‚úÖ FINAL HEALTH:", health_check(metrics))


üì¶ Batch created with 4 messages
   üîÅ Retry in 0.5s (attempt 1) because: Random failure happened
‚úÖ Worker 1 processed 4 messages
üì¶ Batch created with 5 messages
‚úÖ Worker 2 processed 5 messages
üõë Worker 1 stopping gracefully
üõë Worker 3 stopping gracefully
üõë Worker 8 stopping gracefully
üõë Worker 10 stopping gracefully
üõë Worker 9 stopping gracefully
üõë Worker 4 stopping gracefully
üõë Worker 5 stopping gracefully
üõë Worker 6 stopping gracefully
üõë Worker 7 stopping gracefully
üõë Worker 2 stopping gracefully

‚úÖ FINAL METRICS: {'batches_ok': 2, 'batches_failed': 0, 'messages_processed': 9}
‚úÖ FINAL HEALTH: HEALTHY


In [None]:
import os
import random

def create_fake_files(n=10, min_kb=50, max_kb=500, out_dir="data/incoming_files"):
    """Write ``n`` files of random bytes into ``out_dir`` for the file-pipeline demo.

    Files are named ``file_<i>.bin``. Each size is a random whole number of
    KiB between ``min_kb`` and ``max_kb`` (inclusive). ``out_dir`` is created
    if it does not exist.

    Returns:
        List of the written file paths, in creation order.
    """
    os.makedirs(out_dir, exist_ok=True)  # don't rely on an earlier cell having created it
    paths = []

    for i in range(n):
        size_kb = random.randint(min_kb, max_kb)  # random size in KiB
        path = os.path.join(out_dir, f"file_{i}.bin")

        with open(path, "wb") as f:  # open the file for writing (binary)
            f.write(os.urandom(size_kb * 1024))  # write random bytes

        paths.append(path)

    return paths

# Generate five sample files (50-500 KiB each) and display their paths.
files = create_fake_files(n=5, min_kb=50, max_kb=500)
files


['data/incoming_files/file_0.bin',
 'data/incoming_files/file_1.bin',
 'data/incoming_files/file_2.bin',
 'data/incoming_files/file_3.bin',
 'data/incoming_files/file_4.bin']

In [None]:
def validate_file(path):
    """Return True if ``path`` is an existing, non-empty regular file.

    Missing paths, zero-byte files and directories are rejected. Using
    ``os.path.isfile`` rather than ``os.path.exists`` keeps a directory
    (whose reported size is usually non-zero) from passing validation and
    then failing later in the copy step.
    """
    if not os.path.isfile(path):
        return False
    return os.path.getsize(path) > 0

# Report the validation status of every generated file.
for path in files:
    status = "VALID" if validate_file(path) else "INVALID"
    print(path, status)


data/incoming_files/file_0.bin VALID
data/incoming_files/file_1.bin VALID
data/incoming_files/file_2.bin VALID
data/incoming_files/file_3.bin VALID
data/incoming_files/file_4.bin VALID


In [None]:
import hashlib

def sha256_file(path):
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 8 KiB chunks, so large files are hashed without
    loading them fully into memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b"" (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()

# Show a short checksum prefix for each file.
for path in files:
    short_digest = sha256_file(path)[:12]
    print(path, short_digest)


data/incoming_files/file_0.bin 12dab7b10e03
data/incoming_files/file_1.bin 7e8a30602f5a
data/incoming_files/file_2.bin 453c43763843
data/incoming_files/file_3.bin 8218325d22ef
data/incoming_files/file_4.bin c560fea4285b


In [None]:
import shutil

def copy_and_verify(path, out_dir="data/processed"):
    """Copy ``path`` into ``out_dir`` and verify the copy by SHA-256 checksum.

    Args:
        path: source file. It must pass ``validate_file``.
        out_dir: destination directory. It is created if missing.

    Returns:
        Path of the verified copy (same base name, inside ``out_dir``).

    Raises:
        ValueError: if ``path`` fails validation.
        RuntimeError: if the copy's checksum differs from the source's. The
            bad copy is deleted so no corrupted file is left behind.
    """
    if not validate_file(path):
        raise ValueError("Invalid file")

    before = sha256_file(path)

    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, os.path.basename(path))
    shutil.copyfile(path, out_path)

    after = sha256_file(out_path)

    if before != after:
        # Don't leave a corrupted copy in the processed folder.
        os.remove(out_path)
        raise RuntimeError("Checksum mismatch! File got corrupted.")

    return out_path

# Copy every file into the processed folder, verifying each copy.
for path in files:
    print("‚úÖ Copied and verified:", copy_and_verify(path))


‚úÖ Copied and verified: data/processed/file_0.bin
‚úÖ Copied and verified: data/processed/file_1.bin
‚úÖ Copied and verified: data/processed/file_2.bin
‚úÖ Copied and verified: data/processed/file_3.bin
‚úÖ Copied and verified: data/processed/file_4.bin


In [None]:
def file_size_bytes(path):
    """Return the size of the file at ``path`` in bytes."""
    return os.stat(path).st_size

# List each file with its size in bytes.
for path in files:
    print(f"{path} {file_size_bytes(path)} bytes")


data/incoming_files/file_0.bin 197632 bytes
data/incoming_files/file_1.bin 425984 bytes
data/incoming_files/file_2.bin 140288 bytes
data/incoming_files/file_3.bin 459776 bytes
data/incoming_files/file_4.bin 189440 bytes


In [None]:
# Capacity of a single bucket: 10 MiB.
TARGET_BUCKET_BYTES = 10 * 1024 ** 2
print("Target bucket size:", TARGET_BUCKET_BYTES, "bytes")


Target bucket size: 10485760 bytes


In [None]:
def first_fit_buckets(paths, target_bytes, size_fn=None):
    """Pack files into buckets using the First Fit heuristic.

    Files are taken in the given order. Each file goes into the first bucket
    whose total plus the file's size stays within ``target_bytes``. If none
    fits, a new bucket is opened. A file larger than ``target_bytes`` ends up
    alone in its own bucket.

    Args:
        paths: file paths, in the order they should be placed.
        target_bytes: bucket capacity in bytes.
        size_fn: callable returning the size of a path. Defaults to
            ``file_size_bytes``. Each path's size is looked up only once.

    Returns:
        List of buckets, each a list of paths.
    """
    if size_fn is None:
        size_fn = file_size_bytes

    buckets = []       # each bucket is a list of paths
    bucket_sizes = []  # running byte total per bucket, kept in step with `buckets`

    for path in paths:
        size = size_fn(path)
        for i, used in enumerate(bucket_sizes):
            if used + size <= target_bytes:
                buckets[i].append(path)
                bucket_sizes[i] += size
                break
        else:
            # No existing bucket had room: open a new one.
            buckets.append([path])
            bucket_sizes.append(size)

    return buckets

In [None]:
def ffd_buckets(paths, target_bytes):
    """Pack files with First Fit Decreasing: sort largest-first, then apply First Fit.

    ``sorted`` is stable, so equally sized files keep their input order.
    """
    largest_first = sorted(paths, key=file_size_bytes, reverse=True)
    packed = first_fit_buckets(largest_first, target_bytes)
    return packed

In [None]:
def bfd_buckets(paths, target_bytes, size_fn=None):
    """Pack files into buckets using Best Fit Decreasing.

    Files are sorted largest-first (stable for equal sizes). Each file goes
    into the bucket that would have the least free space left after adding
    it, i.e. the tightest fit. Ties go to the earliest bucket. If no bucket
    has room, a new one is opened.

    Args:
        paths: file paths to pack.
        target_bytes: bucket capacity in bytes.
        size_fn: callable returning the size of a path. Defaults to
            ``file_size_bytes``. Each path's size is looked up only once.

    Returns:
        List of buckets, each a list of paths.
    """
    if size_fn is None:
        size_fn = file_size_bytes

    # Sort on size only, so equal-sized files keep their input order.
    items = sorted(((size_fn(p), p) for p in paths), key=lambda item: item[0], reverse=True)

    buckets = []
    bucket_sizes = []  # running byte total per bucket, kept in step with `buckets`

    for size, path in items:
        best_index = None
        best_remaining = None

        for i, used in enumerate(bucket_sizes):
            remaining = target_bytes - (used + size)
            # remaining >= 0 means the file fits. Keep the smallest leftover;
            # strict < makes the earliest bucket win ties.
            if remaining >= 0 and (best_remaining is None or remaining < best_remaining):
                best_remaining = remaining
                best_index = i

        if best_index is None:
            buckets.append([path])
            bucket_sizes.append(size)
        else:
            buckets[best_index].append(path)
            bucket_sizes[best_index] += size

    return buckets

In [None]:
def print_bucket_summary(buckets, name):
    """Print a ``--- name ---`` header followed by one line per bucket.

    Each line shows the 1-based bucket number, the file count and the total
    size in bytes.
    """
    print("\n---", name, "---")
    bucket_no = 0
    for bucket in buckets:
        bucket_no += 1
        total_bytes = sum(map(file_size_bytes, bucket))
        print(f"Bucket {bucket_no}: {len(bucket)} files, total={total_bytes} bytes")

# Pack the same files with all three strategies, then compare them.
ff = first_fit_buckets(files, TARGET_BUCKET_BYTES)
ffd = ffd_buckets(files, TARGET_BUCKET_BYTES)
bfd = bfd_buckets(files, TARGET_BUCKET_BYTES)

for packing, label in (
    (ff, "First Fit"),
    (ffd, "FFD (First Fit Decreasing)"),
    (bfd, "BFD (Best Fit Decreasing)"),
):
    print_bucket_summary(packing, label)



--- First Fit ---
Bucket 1: 5 files, total=1413120 bytes

--- FFD (First Fit Decreasing) ---
Bucket 1: 5 files, total=1413120 bytes

--- BFD (Best Fit Decreasing) ---
Bucket 1: 5 files, total=1413120 bytes


In [None]:
import shutil
import os

def process_one_bucket(bucket):
    """Copy and checksum-verify every valid file in ``bucket``.

    Each valid file goes through ``copy_and_verify`` (which, by default,
    copies into data/processed/). Invalid files are reported and skipped.
    Errors raised by ``copy_and_verify`` propagate to the caller.

    Args:
        bucket: list of file paths to process.

    Returns:
        Number of files successfully copied and verified.
    """
    done = 0
    for path in bucket:
        if validate_file(path):
            copy_and_verify(path)
            done += 1
        else:
            print("‚ùå Invalid file skipped:", path)
    return done


In [None]:
from queue import Queue

bucket_queue = Queue()

# Pick a packing strategy: ff / ffd / bfd.
buckets_to_process = bfd  # use the BFD buckets created earlier

for pending in buckets_to_process:
    bucket_queue.put(pending)

print("‚úÖ Buckets pushed into queue:", bucket_queue.qsize())


‚úÖ Buckets pushed into queue: 1


In [None]:
import threading

# Lock shared by all bucket-worker threads. `metrics[key] += n` is a
# read-modify-write and is not atomic when several threads run it.
_bucket_metrics_lock = threading.Lock()


def bucket_worker(worker_id, queue, metrics, stop_event, metrics_lock=None):
    """Process file buckets from ``queue`` until ``stop_event`` is set.

    The worker polls ``queue`` with a 0.5 s timeout, so it re-checks
    ``stop_event`` at least that often while idle.

    Args:
        worker_id: label used in log lines.
        queue: queue of buckets (lists of file paths). ``task_done()`` is
            called once per bucket taken.
        metrics: dict with integer counters ``buckets_ok``, ``buckets_failed``
            and ``files_processed``, shared between workers.
        stop_event: ``threading.Event``. The loop ends once it is set.
        metrics_lock: lock guarding ``metrics``. Defaults to a module-wide lock.
    """
    # Local import: the `queue` parameter shadows the module name in this scope.
    from queue import Empty

    lock = _bucket_metrics_lock if metrics_lock is None else metrics_lock

    while not stop_event.is_set():
        try:
            bucket = queue.get(timeout=0.5)
        except Empty:
            continue  # no bucket yet: go back and re-check stop_event

        try:
            count = process_one_bucket(bucket)
            with lock:
                metrics["buckets_ok"] += 1
                metrics["files_processed"] += count
            print(f"‚úÖ BucketWorker {worker_id} processed bucket with {count} files")
        except Exception as e:
            with lock:
                metrics["buckets_failed"] += 1
            print(f"‚ùå BucketWorker {worker_id} failed bucket: {e}")
        finally:
            # Always mark the bucket done so queue.join() cannot hang.
            queue.task_done()

    print(f"üõë BucketWorker {worker_id} stopping gracefully")


In [None]:
# file metrics resetovanje
file_metrics = {
    "buckets_ok": 0,
    "buckets_failed": 0,
    "files_processed": 0
}

stop_event_files = threading.Event()

for i in range(10):
    t = threading.Thread(
        target=bucket_worker,
        args=(i+1, bucket_queue, file_metrics, stop_event_files),
        daemon=True
    )
    t.start()

bucket_queue.join()

stop_event_files.set()
time.sleep(1)

print("\n‚úÖ FILE PIPELINE METRICS:", file_metrics)


‚úÖ BucketWorker 1 processed bucket with 5 files
üõë BucketWorker 2 stopping gracefully
üõë BucketWorker 3 stopping gracefully
üõë BucketWorker 4 stopping gracefully
üõë BucketWorker 5 stopping gracefully
üõë BucketWorker 6 stopping gracefully
üõë BucketWorker 7 stopping gracefully
üõë BucketWorker 9 stopping gracefully
üõë BucketWorker 10 stopping gracefully
üõë BucketWorker 8 stopping gracefully
üõë BucketWorker 1 stopping gracefully

‚úÖ FILE PIPELINE METRICS: {'buckets_ok': 1, 'buckets_failed': 0, 'files_processed': 5}
