# Tutorial 10: Production — Deploy, Monitor, and Report

Deploy a production-style environment, start monitoring, generate a health report, and simulate alerts. The notebook runs in either Connected Mode (metrics come from the API endpoints) or Demo Mode (realistic values are simulated when the server is unreachable).

In [None]:
# Lightweight install cell
# Installs any third-party package the notebook needs that is not already importable.
import importlib.util
import pkgutil  # no longer used in this cell; kept in case another cell expects the name
import subprocess
import sys

for p in ['numpy', 'matplotlib', 'seaborn', 'requests', 'psutil']:
    # importlib.util.find_spec replaces pkgutil.find_loader, which is deprecated
    # and no longer available on recent Python versions.
    if importlib.util.find_spec(p) is None:
        # Use the running interpreter's pip so the package lands in this kernel's environment.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', p])
print('✅ Dependencies ready')

In [None]:
# Setup
import time, json, threading, requests, psutil, numpy as np
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import matplotlib.pyplot as plt, seaborn as sns
# Plot styling shared by every chart in the notebook.
sns.set_theme(style='whitegrid')
# Base URL of the Tensorus API server.
API = 'http://127.0.0.1:7860'


def server_ok(api=None, timeout=2):
    """Return True when the server's ``/health`` endpoint answers with HTTP 200.

    Args:
        api: Base URL to probe; ``None`` (the default) means the module-level ``API``.
        timeout: Seconds to wait for the response before giving up.

    Returns:
        ``True`` on a 200 response; ``False`` on any other status or on any
        ``requests`` failure (connection refused, timeout, invalid URL, ...).
    """
    base = API if api is None else api
    try:
        return requests.get(f'{base}/health', timeout=timeout).status_code == 200
    except requests.RequestException:
        # Only network/HTTP failures mean "server not available"; a bare except
        # would also swallow KeyboardInterrupt and real programming errors.
        return False


# Probe once at start-up; the rest of the notebook reads SERVER to know the mode.
SERVER = server_ok()
print('📡 Tensorus:', '✅ Connected' if SERVER else '⚠️ Demo Mode')

class Env(Enum):
    """Deployment environments; each member's value is the environment's full name."""

    DEV = 'development'
    STAGE = 'staging'
    PROD = 'production'
class Sev(Enum):
    """Alert severities; each member's value is the label stored on an alert."""

    INFO = 'info'
    WARN = 'warning'
    ERR = 'error'
    CRIT = 'critical'

@dataclass
class Metrics:
    """One monitoring sample: host resource usage plus request-level figures."""

    ts: datetime    # when the sample was taken
    cpu: float      # CPU usage, percent
    mem: float      # memory usage, percent
    disk: float     # disk usage, percent
    net: float      # network bytes sent plus bytes received
    conns: int      # active connections
    rps: float      # request rate
    err: float      # error rate, compared against a percent threshold
    rt_ms: float    # average response time, milliseconds
    thpt: float     # throughput

@dataclass
class Security:
    """Security settings for a deployment; every switch defaults to enabled."""

    auth: bool = True
    authz: bool = True
    at_rest: bool = True
    in_transit: bool = True
    audit: bool = True
    rate_limit: bool = True
    # A fresh list per instance, so instances never share one whitelist.
    ip_whitelist: list = field(default_factory=list)
    session_timeout: int = 3600  # presumably seconds — TODO confirm

class Monitor:
    """Sample host and API metrics, raise threshold alerts, and summarise health.

    Host figures (CPU, memory, disk, network bytes) are read with ``psutil``.
    Request figures come from ``{api}/api/v1/metrics`` when the server answered
    the health probe made at construction time. In Demo Mode (server not
    reachable), or when the metrics call fails, the placeholder values in
    ``_DEMO`` are used instead.
    """

    # Placeholder request metrics, keyed by the field names read from the metrics payload.
    _DEMO = {
        'active_connections': 25,
        'request_rate': 120.0,
        'error_rate': 0.5,
        'avg_response_time': 45.0,
        'throughput': 1250.0,
    }
    # Width, in seconds, of the window reported as ``alerts_last_hour``.
    _ALERT_WINDOW_S = 3600

    def __init__(self, api=None):
        """Create a monitor for the server at ``api``.

        Args:
            api: Base URL of the server; ``None`` (the default) means the
                module-level ``API``.
        """
        # Resolved at call time so the current value of ``API`` is used.
        self.api = API if api is None else api
        # Probe the URL this instance will actually query, not the global one.
        self.alive = self._probe()
        self.hist = []      # every Metrics sample, oldest first
        self.alerts = []    # dicts with 'ts', 'sev' and 'msg' keys
        self.active = False
        # Alert limits: percent for cpu/mem/disk/err, milliseconds for rt.
        self.thres = {'cpu': 80.0, 'mem': 85.0, 'disk': 90.0, 'err': 5.0, 'rt': 1000.0}

    def _probe(self):
        """Return True when ``{api}/health`` answers with HTTP 200."""
        try:
            return requests.get(f'{self.api}/health', timeout=2).status_code == 200
        except requests.RequestException:
            return False

    def _fetch_api_metrics(self):
        """Return the metrics payload as a dict, or ``{}`` when it cannot be read."""
        try:
            payload = requests.get(f'{self.api}/api/v1/metrics', timeout=2).json()
        except (requests.RequestException, ValueError):
            # Unreachable server or a body that is not JSON: use the placeholders.
            return {}
        # A JSON array or scalar has no named fields to read.
        return payload if isinstance(payload, dict) else {}

    def collect(self):
        """Take one sample, record it in ``hist``, evaluate alerts, and return it.

        Returns:
            The new ``Metrics`` instance.
        """
        cpu = psutil.cpu_percent(interval=0.2)
        mem = psutil.virtual_memory().percent
        disk = psutil.disk_usage('/').percent
        # Read the counters once so both figures come from the same snapshot.
        io = psutil.net_io_counters()
        net = io.bytes_sent + io.bytes_recv

        payload = self._fetch_api_metrics() if self.alive else {}
        # Any field missing from the payload falls back to its placeholder value.
        conns, rps, err, rt, th = (payload.get(key, default) for key, default in self._DEMO.items())

        m = Metrics(datetime.now(), cpu, mem, disk, net, conns, rps, err, rt, th)
        self.hist.append(m)
        self._alerts(m)
        return m

    def _alerts(self, m: 'Metrics'):
        """Append an alert for every figure in ``m`` that exceeds its threshold."""
        def add(sev, msg):
            self.alerts.append({'ts': datetime.now(), 'sev': sev.value, 'msg': msg})

        if m.cpu > self.thres['cpu']:
            add(Sev.WARN, f'High CPU {m.cpu:.1f}%')
        if m.mem > self.thres['mem']:
            add(Sev.WARN, f'High memory {m.mem:.1f}%')
        if m.disk > self.thres['disk']:
            add(Sev.ERR, f'High disk {m.disk:.1f}%')
        if m.err > self.thres['err']:
            add(Sev.ERR, f'High error rate {m.err:.2f}%')
        if m.rt_ms > self.thres['rt']:
            add(Sev.WARN, f'Slow response {m.rt_ms:.1f}ms')

    def start(self, interval=2.0, samples=5):
        """Collect ``samples`` samples on a daemon thread, ``interval`` seconds apart.

        Args:
            interval: Seconds to wait between consecutive samples.
            samples: Number of samples to take before the thread exits.

        Returns:
            The started ``threading.Thread``; join it to wait for completion.
        """
        self.active = True

        def loop():
            try:
                for i in range(samples):
                    if not self.active:
                        break
                    m = self.collect()
                    print(f'📈 CPU {m.cpu:.1f}% | MEM {m.mem:.1f}% | RPS {m.rps:.0f}')
                    # Nothing follows the last sample, so do not wait after it.
                    if i < samples - 1:
                        time.sleep(interval)
            finally:
                # Clear the flag even if a sample raised, so the monitor is not left "running".
                self.active = False

        t = threading.Thread(target=loop, daemon=True)
        t.start()
        return t

    def stop(self):
        """Ask the sampling thread to stop before its next sample."""
        self.active = False

    def report(self):
        """Summarise health from the ten most recent samples.

        Returns:
            ``{'status': 'no_data'}`` when nothing has been collected yet.
            Otherwise a dict with ``status`` ('healthy', 'warning' or
            'critical'), the averaged ``metrics``, and ``alerts_last_hour``,
            the number of alerts raised within the past hour.
        """
        if not self.hist:
            return {'status': 'no_data'}

        tail = self.hist[-10:]
        avg = {name: float(np.mean([getattr(x, name) for x in tail]))
               for name in ('cpu', 'mem', 'rt_ms', 'err')}

        status = 'healthy'
        if avg['cpu'] > 90 or avg['mem'] > 95 or avg['err'] > 10:
            status = 'critical'
        elif avg['cpu'] > 80 or avg['mem'] > 85 or avg['err'] > 5:
            status = 'warning'

        # Count only alerts inside the window; older ones stay in ``self.alerts``.
        now = datetime.now()
        recent = sum(1 for a in self.alerts
                     if (now - a['ts']).total_seconds() <= self._ALERT_WINDOW_S)

        return {'status': status, 'metrics': avg, 'alerts_last_hour': recent}

class Deployment:
    """A simulated deployment: resource profile, security settings, and a monitor."""

    def __init__(self, env=Env.PROD):
        """Select the resource profile for ``env`` and create the monitor.

        Args:
            env: Target environment; one of the ``Env`` members.

        Raises:
            KeyError: If ``env`` has no resource profile.
        """
        profiles = {
            Env.DEV: {'replicas': 1, 'cpu': '1000m', 'mem': '2Gi'},
            Env.STAGE: {'replicas': 2, 'cpu': '2000m', 'mem': '4Gi'},
            Env.PROD: {'replicas': 5, 'cpu': '4000m', 'mem': '8Gi', 'autoscale': True},
        }
        self.env = env
        # Look the profile up first so an unknown environment fails before the
        # monitor is constructed.
        self.cfg = profiles[env]
        self.monitor = Monitor()
        self.sec = Security()

    def deploy(self, step_delay=0.3):
        """Print each rollout step in order and return a deployment summary.

        Args:
            step_delay: Seconds to pause after each step; 0 disables the pause.

        Returns:
            A dict with ``status``, ``env``, an ISO-8601 ``time``, and a copy
            of the resource ``cfg``.
        """
        steps = ['Validate config', 'Setup security', 'Provision infra', 'Deploy containers',
                 'Configure LB', 'Setup monitoring', 'Health checks', 'Enable traffic']
        total = len(steps)
        for i, s in enumerate(steps, 1):
            print(f'{i}/{total} {s}...')
            if step_delay > 0:
                time.sleep(step_delay)
        print('✅ Deployment completed')
        # Hand out a copy so editing the summary cannot change the deployment.
        return {'status': 'deployed', 'env': self.env.value,
                'time': datetime.now().isoformat(), 'cfg': dict(self.cfg)}

    def status(self):
        """Return a snapshot of environment, security, resource config, and health."""
        from dataclasses import asdict

        # asdict() copies the settings (including the whitelist), so changing
        # the snapshot cannot change the live Security object.
        return {'env': self.env.value, 'security': asdict(self.sec),
                'cfg': dict(self.cfg), 'health': self.monitor.report()}


## Step 1 — Deploy

In [None]:
# Build the production deployment, run its rollout steps, and display the summary.
deploy = Deployment(Env.PROD)
summary = deploy.deploy()
summary

## Step 2 — Start Monitoring (short run)

In [None]:
# Take six samples, one second apart, on the monitor's background thread.
t = deploy.monitor.start(interval=1.0, samples=6)
# Wait for the sampling thread to finish so the report covers every sample.
t.join()
report = deploy.monitor.report()
report

## Step 3 — Visualize CPU/Memory

In [None]:
# Chart the CPU and memory percentage of every recorded sample, in collection order.
cpus = [m.cpu for m in deploy.monitor.hist]
mems = [m.mem for m in deploy.monitor.hist]

plt.figure(figsize=(8, 3))
plt.plot(cpus, '-o', label='CPU%')
plt.plot(mems, '-o', label='MEM%')
plt.legend()
plt.title('Production Metrics')
plt.ylabel('%')
plt.show()

## Step 4 — Simulate Alerts (lower thresholds temporarily)

In [None]:
# Lower the CPU and memory thresholds so the next sample trips alerts.
old = deploy.monitor.thres.copy()
deploy.monitor.thres.update({'cpu': 10.0, 'mem': 10.0})
try:
    _ = deploy.monitor.collect()
finally:
    # Restore even if sampling raises, so later cells never run with the lowered thresholds.
    deploy.monitor.thres = old
deploy.monitor.alerts[-3:]  # show most recent alerts

## Step 5 — Status Snapshot

In [None]:
# Display the environment, security settings, resource config, and monitor health report.
deploy.status()