# Elasticity Simulation (No Comments)
Run this in Google Colab.

In [None]:

import threading, queue, time, random
from datetime import datetime, timedelta
import statistics

# --- Worker pool sizing -----------------------------------------------------
INITIAL_WORKERS = 1  # workers started by run_elasticity_demo before the monitor runs
MAX_WORKERS = 8      # upper bound the monitor will not scale beyond
MIN_WORKERS = 1      # lower bound; remove_one_worker refuses to go at or below it

# --- Scaling policy (evaluated by monitor on every tick) --------------------
SCALE_OUT_THRESHOLD = 6  # scale out when the queue size is strictly above this
SCALE_IN_THRESHOLD = 2   # a tick counts as "low" when the queue size is at or below this
SCALE_OUT_STEP = 1       # workers added per scale-out action
SCALE_IN_STEP = 1        # workers removed per scale-in action

# --- Producer workload ------------------------------------------------------
JOB_BATCH_STEPS = 5      # default number of batches the producer emits
BATCH_INTERVAL = 3       # default pause after each batch, in seconds (time.sleep)
JOBS_PER_BATCH_MIN = 3   # base lower bound of a batch size (producer adds the step number)
JOBS_PER_BATCH_MAX = 10  # base upper bound of a batch size (producer adds twice the step number)

# --- Simulated work and monitoring cadence ----------------------------------
WORKER_PROCESSING_TIME_MIN = 0.2  # shortest simulated job duration, in seconds
WORKER_PROCESSING_TIME_MAX = 0.8  # longest simulated job duration, in seconds
MONITOR_INTERVAL = 1.0            # seconds between two monitor ticks

# --- Shared runtime state ---------------------------------------------------
job_queue = queue.Queue()        # pending jobs, filled by the producer, drained by workers
stop_all = threading.Event()     # set once to ask the monitor loop to exit
workers_lock = threading.Lock()  # guards worker_threads and worker_id_seq
worker_threads = {}              # worker id -> {"thread", "stop_event", "started_at"}
worker_id_seq = 0                # last worker id handed out by add_worker

# --- Statistics (counters and timings are updated under stats_lock) ---------
stats_lock = threading.Lock()
jobs_produced = 0
jobs_processed = 0
job_processing_times = []        # per-job elapsed seconds, appended by workers
max_worker_count_seen = 0        # highest registry size observed by add_worker

def now():
    """Return the current local time formatted as ``HH:MM:SS`` for log prefixes."""
    return f"{datetime.now():%H:%M:%S}"

def worker_loop(worker_id, stop_event):
    """Consume jobs from ``job_queue`` until ``stop_event`` is set.

    Each job is simulated by sleeping for a random duration between
    ``WORKER_PROCESSING_TIME_MIN`` and ``WORKER_PROCESSING_TIME_MAX`` seconds.
    After every job the shared counters ``jobs_processed`` and
    ``job_processing_times`` are updated under ``stats_lock``.

    Args:
        worker_id: Identifier used only in the log lines.
        stop_event: ``threading.Event``-like object; the loop exits once it
            is set. It is checked between jobs, so a job that is already
            being processed is always finished first.
    """
    global jobs_processed
    print(f"[{now()}] Worker-{worker_id} started")
    while not stop_event.is_set():
        # Poll with a timeout so the stop flag is re-checked regularly even
        # when the queue stays empty. Only the "queue is empty" condition is
        # expected here; any other exception is a real error and must not be
        # silently swallowed.
        try:
            job = job_queue.get(timeout=0.8)
        except queue.Empty:
            continue
        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments.
            start = time.perf_counter()
            time.sleep(random.uniform(WORKER_PROCESSING_TIME_MIN, WORKER_PROCESSING_TIME_MAX))
            elapsed = time.perf_counter() - start
            with stats_lock:
                jobs_processed += 1
                job_processing_times.append(elapsed)
            print(f"[{now()}] Worker-{worker_id} processed {job} ({elapsed:.2f}s) Q={job_queue.qsize()}")
        finally:
            # Every successful get() must be balanced by task_done(), even if
            # processing failed, so the queue's unfinished-task count stays correct.
            job_queue.task_done()
    print(f"[{now()}] Worker-{worker_id} stopping")

def add_worker():
    """Start one more daemon worker thread and register it in ``worker_threads``.

    The new worker receives the next id from ``worker_id_seq`` and its own
    stop event. ``max_worker_count_seen`` is raised when the registry grows
    past its previous peak. Everything happens while holding ``workers_lock``.
    """
    global worker_id_seq, max_worker_count_seen
    with workers_lock:
        worker_id_seq += 1
        new_id = worker_id_seq
        halt_signal = threading.Event()
        thread = threading.Thread(
            target=worker_loop,
            args=(new_id, halt_signal),
            daemon=True,
        )
        # Register before starting so the registry never misses a live worker.
        worker_threads[new_id] = {
            "thread": thread,
            "stop_event": halt_signal,
            "started_at": datetime.now(),
        }
        thread.start()
        total = len(worker_threads)
        max_worker_count_seen = max(max_worker_count_seen, total)
        print(f"[{now()}] Added Worker-{new_id} (count={total})")

def remove_one_worker():
    """Retire the longest-running worker, if the pool is above ``MIN_WORKERS``.

    The chosen worker is removed from ``worker_threads`` immediately (under
    ``workers_lock``) and its stop event is set. Unregistering right away
    means a second call made before the thread has actually exited picks a
    *different* worker, and ``len(worker_threads)`` never counts a worker
    that is already shutting down. Waiting for the thread to finish is done
    in a short-lived daemon thread so the caller is not blocked.

    Returns:
        ``True`` if a worker was signalled to stop, ``False`` if the pool is
        already at (or below) ``MIN_WORKERS``.
    """
    with workers_lock:
        if len(worker_threads) <= MIN_WORKERS:
            return False
        oldest = min(worker_threads, key=lambda wid: worker_threads[wid]["started_at"])
        entry = worker_threads.pop(oldest)
    entry["stop_event"].set()

    def cleanup(wid, thread):
        # The worker finishes its current job (or its queue poll) before it
        # notices the stop flag, so give it a bounded amount of time.
        thread.join(timeout=2)
        with workers_lock:
            remaining = len(worker_threads)
        print(f"[{now()}] Removed Worker-{wid} (count={remaining})")

    threading.Thread(target=cleanup, args=(oldest, entry["thread"]), daemon=True).start()
    return True

def producer_simulation(steps=JOB_BATCH_STEPS, interval=BATCH_INTERVAL):
    """Enqueue ``steps`` batches of jobs, pausing ``interval`` seconds after each.

    The batch size for step ``s`` is drawn uniformly from
    ``[JOBS_PER_BATCH_MIN + s, JOBS_PER_BATCH_MAX + 2 * s]``, so later batches
    tend to be larger. Jobs are strings of the form ``S<step>-J<index>`` with
    a 1-based index. ``jobs_produced`` is updated under ``stats_lock``.

    Args:
        steps: Number of batches to produce.
        interval: Seconds to sleep after every batch (including the last).
    """
    global jobs_produced
    print(f"[{now()}] Producer started")
    for step in range(1, steps + 1):
        smallest = JOBS_PER_BATCH_MIN + step
        largest = JOBS_PER_BATCH_MAX + step * 2
        batch_size = random.randint(smallest, largest)
        for index in range(1, batch_size + 1):
            job_queue.put(f"S{step}-J{index}")
        with stats_lock:
            jobs_produced += batch_size
        print(f"[{now()}] Step{step} added {batch_size} jobs Q={job_queue.qsize()}")
        time.sleep(interval)
    print(f"[{now()}] Producer finished")

def monitor(scale_in_patience=3):
    """Periodically inspect the queue and grow or shrink the worker pool.

    On every tick (every ``MONITOR_INTERVAL`` seconds, until ``stop_all`` is
    set):

    * If the queue size is above ``SCALE_OUT_THRESHOLD`` and the pool is below
      ``MAX_WORKERS``, up to ``SCALE_OUT_STEP`` workers are added.
    * If the queue size is at or below ``SCALE_IN_THRESHOLD`` and the pool is
      above ``MIN_WORKERS``, the tick counts as "low". After
      ``scale_in_patience`` consecutive low ticks, up to ``SCALE_IN_STEP``
      workers are removed.
    * Any other tick resets the low-tick counter.

    The logged ``W`` value is the pool size sampled at the start of the tick,
    i.e. before any scaling done in that tick.

    Args:
        scale_in_patience: Number of consecutive low ticks required before
            scaling in. Defaults to 3.
    """
    low = 0
    print(f"[{now()}] Monitor started")
    while not stop_all.is_set():
        q = job_queue.qsize()
        w = len(worker_threads)
        if q > SCALE_OUT_THRESHOLD and w < MAX_WORKERS:
            for _ in range(min(SCALE_OUT_STEP, MAX_WORKERS - w)):
                add_worker()
            low = 0
        elif q <= SCALE_IN_THRESHOLD and w > MIN_WORKERS:
            low += 1
            if low >= scale_in_patience:
                for _ in range(min(SCALE_IN_STEP, w - MIN_WORKERS)):
                    remove_one_worker()
                low = 0
        else:
            low = 0
        with stats_lock:
            p = jobs_processed
        print(f"[{now()}] Monitor Q={q} W={w} P={p}")
        # Event.wait doubles as an interruptible sleep: it returns as soon as
        # stop_all is set instead of always sleeping the full interval, so
        # shutdown is prompt.
        stop_all.wait(MONITOR_INTERVAL)
    print(f"[{now()}] Monitor stopping")

def run_elasticity_demo(run_duration=None):
    """Run the full elasticity demo and print a summary.

    Starts ``INITIAL_WORKERS`` workers, the monitor and the producer, waits
    for the producer to finish and the queue to drain (or for the optional
    deadline), then stops the monitor and all workers and prints statistics.

    Args:
        run_duration: Optional overall time limit in seconds. When given, the
            demo stops waiting once the limit is reached, whether it is still
            waiting for the producer or for the queue to drain. ``None`` (or
            any falsy value) means no limit.
    """
    start = datetime.now()
    print(f"[{now()}] Elasticity demo starting")
    for _ in range(INITIAL_WORKERS):
        add_worker()
    m = threading.Thread(target=monitor, daemon=True)
    m.start()
    p = threading.Thread(target=producer_simulation, daemon=True)
    p.start()
    deadline = datetime.now() + timedelta(seconds=run_duration) if run_duration else None

    # Bound the wait for the producer by the deadline; an unbounded join would
    # ignore run_duration for as long as the producer is running.
    if deadline is None:
        p.join()
    else:
        p.join(timeout=max(0.0, (deadline - datetime.now()).total_seconds()))
    if p.is_alive():
        print(f"[{now()}] Deadline reached before producer finished")
    else:
        print(f"[{now()}] Producer done, waiting queue drain...")

    while True:
        if deadline and datetime.now() >= deadline:
            break
        if job_queue.empty():
            # Re-check after a pause so a momentarily empty queue is not
            # mistaken for a fully drained one.
            time.sleep(1)
            if job_queue.empty():
                break
        time.sleep(0.5)

    # Stop the monitor first so the pool is no longer resized while it is
    # being torn down. The timeout covers one full monitor tick.
    stop_all.set()
    m.join(timeout=MONITOR_INTERVAL + 1)

    # Take the registry contents under the lock, then signal and join outside
    # it: indexing worker_threads without the lock can race with concurrent
    # removals, and joining while holding the lock blocks everyone else.
    with workers_lock:
        remaining = list(worker_threads.values())
        worker_threads.clear()
    for info in remaining:
        info["stop_event"].set()
    for info in remaining:
        info["thread"].join(timeout=2)

    d = (datetime.now() - start).total_seconds()
    with stats_lock:
        prod = jobs_produced
        proc = jobs_processed
        times = list(job_processing_times)
    print("\n=== Summary ===")
    print(f"Produced={prod}, Processed={proc}")
    if times:
        print(f"Avg={statistics.mean(times):.3f}s Std={statistics.pstdev(times):.3f}s")
    print(f"MaxWorkers={max_worker_count_seen}")
    # The elapsed time was computed but never reported before.
    print(f"Duration={d:.1f}s")
    print("================")

# Run the demo only when this code is executed directly (as a script or a
# notebook cell, where __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    run_elasticity_demo()
