# DATA PREPARATION

In [13]:
!pip install pyzbar opencv-python qrcode[pil] numpy matplotlib scikit-image pytesseract python-whois tldextract



In [12]:
!apt-get update && apt-get install -y zbar-tools

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building depe

In [14]:
# QR CONTENT AND URL EXTRACTION

import cv2
import numpy as np
from pyzbar.pyzbar import decode
import pytesseract
from PIL import Image
import re
import matplotlib.pyplot as plt


def extract_qr_content(image_path):
    """
    Robust QR code content extraction with multiple fallbacks
    Returns:
    - content (str): Extracted text/URL if successful
    - None: If extraction fails
    """
    # Method 1: Try pyzbar with various pre-processing techniques
    img = cv2.imread(image_path)

    if img is None:
        print(f"Error: Could not load image from {image_path}")
        return None

    # Try different pre-processing combinations
    processing_combinations = [
        {'gray': True, 'blur': False, 'thresh': False},
        {'gray': True, 'blur': True, 'thresh': False},
        {'gray': True, 'blur': True, 'thresh': True},
        {'gray': False, 'blur': False, 'thresh': False}
    ]

    for params in processing_combinations:
        processed = img.copy()

        # Convert to grayscale
        if params['gray'] and len(processed.shape) == 3:
            processed = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY)

        # Apply blur
        if params['blur']:
            processed = cv2.GaussianBlur(processed, (3, 3), 0)

        # Apply threshold
        if params['thresh']:
            _, processed = cv2.threshold(processed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Try decoding
        decoded = decode(processed)
        if decoded:
            try:
                content = decoded[0].data.decode('utf-8')
                if content.strip():
                    return content
            except UnicodeDecodeError:
                try:
                    content = decoded[0].data.decode('latin-1')
                    if content.strip():
                        return content
                except:
                    continue

    # Method 2: If pyzbar fails, try OpenCV's QRCodeDetector
    qr_detector = cv2.QRCodeDetector()
    retval, decoded_info, points, straight_qrcode = qr_detector.detectAndDecodeMulti(img)

    if retval and decoded_info:
        for content in decoded_info:
            if content.strip():
                return content

    # Method 3: As last resort, try OCR (pytesseract)
    try:
        pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
        content = pytesseract.image_to_string(Image.open(image_path))
        if content.strip():
            # Try to find URL patterns in OCR output
            url_match = re.search(r'(https?://\S+|www\.\S+)', content)
            if url_match:
                return url_match.group(0)
            return content
    except:
        pass

    return None


def extract_clean_url(qr_data):

    if pd.isna(qr_data) or not isinstance(qr_data, str):
        return None

    # Remove the leading number and whitespace
    url_match = re.search(r'(https?://\S+)', qr_data)

    if url_match:
        url = url_match.group(1)

        url = url.split(' ')[0]
        url = url.split('\n')[0]
        url = url.strip()

        if '...' in url:
            url = url.replace('...', '')

        return url

    return None

In [15]:
# URL DATA EXTRACTION

import re
import numpy as np
from urllib.parse import urlparse, parse_qs
import ssl
import socket
from datetime import datetime
import whois
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import tldextract
import idna

from datetime import datetime, timezone

def check_ssl_cert(url):
    try:
        parsed_url = urlparse(url)
        host = parsed_url.hostname
        port = 443
        context = ssl.create_default_context()

        with socket.create_connection((host, port), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=host) as ssock:
                cert_der = ssock.getpeercert(binary_form=True)
                cert = x509.load_der_x509_certificate(cert_der, default_backend())

                ssl_valid = 1
                ssl_self_signed = 1 if cert.issuer == cert.subject else 0

                # Fixed datetime comparison
                now = datetime.now(timezone.utc)
                validity = cert.not_valid_after_utc - datetime.now(timezone.utc)
                ssl_days_left = validity.days if validity.days > 0 else 0

                return ssl_valid, ssl_self_signed, ssl_days_left
    except Exception as e:
        return 0, 0, 0

def get_whois_features(domain):
    features = {
        'domain_age_days': -1,
        'domain_expiry_days': -1,
        'domain_registered': 0,
        'domain_country': '',
        'has_whois_info': 0,
        'registrar': '',
        'name_servers_count': 0
    }

    try:
        info = whois.whois(domain)
        features['has_whois_info'] = 1

        # Handle dates
        creation_date = info.creation_date
        expiration_date = info.expiration_date

        if isinstance(creation_date, list):
            creation_date = creation_date[0]
        if isinstance(expiration_date, list):
            expiration_date = expiration_date[0]

        if creation_date and isinstance(creation_date, datetime):
            features['domain_age_days'] = (datetime.utcnow() - creation_date).days
        if expiration_date and isinstance(expiration_date, datetime):
            features['domain_expiry_days'] = (expiration_date - datetime.utcnow()).days

        # Additional WHOIS features
        features['domain_registered'] = 1 if creation_date else 0
        features['registrar'] = info.registrar if info.registrar else ''
        features['name_servers_count'] = len(info.name_servers) if info.name_servers else 0
        features['domain_country'] = info.country if info.country else ''

    except Exception:
        pass

    return features

def get_tld_features(domain):
    extracted = tldextract.extract(domain)
    return {
        'subdomain_count': len(extracted.subdomain.split('.')),
        'tld': extracted.suffix,
        'domain_part_count': len(extracted.subdomain.split('.')) + 2  # subdomain + domain + suffix
    }

def is_shortener(url):
    shorteners = {'bit.ly', 'goo.gl', 'tinyurl.com', 'ow.ly', 't.co', 'is.gd'}
    return 1 if any(s in url for s in shorteners) else 0

def entropy(s):
    p, lns = np.unique(list(s), return_counts=True)
    return -np.sum((lns/lns.sum()) * np.log2(lns/lns.sum()))

def extract_url_features(URL):
    features = {}

    # Basic URL features
    features['URL_length'] = len(URL)
    features['num_dots'] = URL.count('.')
    features['num_hyphens'] = URL.count('-')
    features['num_slashes'] = URL.count('/')
    features['num_question_marks'] = URL.count('?')
    features['num_equals'] = URL.count('=')
    features['num_at'] = URL.count('@')
    features['has_ip'] = 1 if re.match(r"^(https?:\/\/)?(\d{1,3}\.){3}\d{1,3}", URL) else 0

    # Parsed components
    parsed = urlparse(URL)
    domain = parsed.netloc
    path = parsed.path
    query = parsed.query

    # Domain analysis
    features['domain_length'] = len(domain)
    tld_features = get_tld_features(domain)
    features.update(tld_features)
    features['is_idn'] = 1 if 'xn--' in domain else 0  # Internationalized domain name

    # Path analysis
    features['path_length'] = len(path)
    features['path_depth'] = path.count('/')
    features['file_extension'] = 1 if '.' in path.split('/')[-1] else 0

    # Query parameters analysis
    params = parse_qs(query)
    features['num_parameters'] = len(params)
    sensitive_params = {'password', 'login', 'user', 'creditcard'}
    features['sensitive_params'] = sum(1 for p in params if p.lower() in sensitive_params)

    # Security features
    features['uses_https'] = 1 if parsed.scheme == 'https' else 0
    ssl_valid, ssl_self_signed, ssl_days_left = check_ssl_cert(URL)
    features.update({
        'ssl_cert_valid': ssl_valid,
        'ssl_self_signed': ssl_self_signed,
        'ssl_days_left': ssl_days_left
    })

    # Content features
    features['entropy'] = entropy(URL)
    features['is_shortened'] = is_shortener(URL)
    suspicious_keywords = ['login', 'verify', 'secure', 'account', 'update',
                         'bank', 'paypal', 'signin', 'confirm', 'password']
    features['suspicious_keywords'] = sum(1 for word in suspicious_keywords if word in URL.lower())

    # WHOIS features
    whois_data = get_whois_features(domain)
    features.update(whois_data)

    # Additional features
    features['non_standard_port'] = 1 if parsed.port not in [None, 80, 443] else 0
    features['hex_chars'] = len(re.findall(r'%[0-9a-fA-F]{2}', URL))
    features['redirects'] = len(re.findall('//', URL)) - 1

    # IP and network features
    try:
        ip = socket.gethostbyname(domain)
        features['ip_private'] = 1 if ip.startswith(('10.', '172.', '192.168.')) else 0
    except:
        features['ip_private'] = 0

    return features

In [None]:
# URL FEATURES CSV

import os
import pandas as pd
from tqdm import tqdm
from google.colab import drive

drive.mount('/content/drive')

def process_dataset(base_path):
    all_features = []

    for version in range(1, 8):
        version_path = os.path.join(base_path, f'version_{version}')

        for label in ['benign', 'malicious']:
            folder_path = os.path.join(version_path, label)

            for img_file in tqdm(os.listdir(folder_path), desc=f'Version {version} {label}'):
                if img_file.endswith(('.png', '.jpg', '.jpeg')):
                    img_path = os.path.join(folder_path, img_file)

                    # Extract features
                    features = {'version': version, 'label': 0 if label == 'benign' else 1}


                    # If QR contains URL, extract URL features
                    qr_content = extract_qr_content(img_path)
                    cleaned_url = extract_clean_url(qr_content)
                    if cleaned_url:
                        features.update(extract_url_features(cleaned_url))

                    all_features.append(features)

    return pd.DataFrame(all_features)

# Define paths
base_path = '/content/drive/MyDrive/CyberGuard/qrCodes'
df = process_dataset(base_path)

# Save features
df.to_csv('qr_code_url_features.csv', index=False)

Mounted at /content/drive


Version 1 benign:  96%|█████████▌| 48/50 [01:04<00:02,  1.02s/it]2025-06-02 05:26:56,949 - whois.whois - ERROR - Error trying to connect to socket: closing socket - timed out
ERROR:whois.whois:Error trying to connect to socket: closing socket - timed out
Version 1 benign: 100%|██████████| 50/50 [01:16<00:00,  1.53s/it]
Version 1 malicious:  36%|███▌      | 18/50 [00:57<00:59,  1.85s/it]2025-06-02 05:27:57,048 - whois.whois - ERROR - Error trying to connect to socket: closing socket - [Errno 111] Connection refused
ERROR:whois.whois:Error trying to connect to socket: closing socket - [Errno 111] Connection refused
Version 1 malicious:  42%|████▏     | 21/50 [01:02<00:48,  1.68s/it]2025-06-02 05:28:11,467 - whois.whois - ERROR - Error trying to connect to socket: closing socket - timed out
ERROR:whois.whois:Error trying to connect to socket: closing socket - timed out
Version 1 malicious:  88%|████████▊ | 44/50 [02:03<00:08,  1.41s/it]2025-06-02 05:29:12,200 - whois.whois - ERROR - Error

In [16]:
print(len(df.columns))
print(df.columns)

38
Index(['version', 'label', 'URL_length', 'num_dots', 'num_hyphens',
       'num_slashes', 'num_question_marks', 'num_equals', 'num_at', 'has_ip',
       'domain_length', 'subdomain_count', 'tld', 'domain_part_count',
       'is_idn', 'path_length', 'path_depth', 'file_extension',
       'num_parameters', 'sensitive_params', 'uses_https', 'ssl_cert_valid',
       'ssl_self_signed', 'ssl_days_left', 'entropy', 'is_shortened',
       'suspicious_keywords', 'domain_age_days', 'domain_expiry_days',
       'domain_registered', 'domain_country', 'has_whois_info', 'registrar',
       'name_servers_count', 'non_standard_port', 'hex_chars', 'redirects',
       'ip_private'],
      dtype='object')


In [17]:
# LOADING IMAGES FOR CNN

import os
import cv2
import numpy as np

def load_qr_images(base_path, img_size=(128, 128)):
    images = []
    labels = []

    for version in range(1, 8):  # version1 to version7
        for label_name, label_value in [('benign', 0), ('malicious', 1)]:
            folder_path = os.path.join(base_path, f"version_{version}", label_name)
            for filename in os.listdir(folder_path):
                if filename.endswith(".png"):
                    img_path = os.path.join(folder_path, filename)
                    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                    if img is None:
                        continue  # skip unreadable images
                    img = cv2.resize(img, img_size)
                    img = img.astype("float32") / 255.0
                    images.append(np.expand_dims(img, axis=-1))  # shape (H, W, 1)
                    labels.append(label_value)

    X = np.array(images)
    y = np.array(labels)
    return X, y


In [18]:
from google.colab import drive

drive.mount('/content/drive')
base_path = '/content/drive/MyDrive/CyberGuard/qrCodes'

X_img, y_img = load_qr_images(base_path)

print("Images shape:", X_img.shape)
print("Labels shape:", y_img.shape)
print("Class distribution:", np.bincount(y_img))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Images shape: (700, 128, 128, 1)
Labels shape: (700,)
Class distribution: [350 350]


# DATA PRE-PROCESSING

In [19]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif

def cast_to_int(X):
    """Casts input to integer type."""
    return X.astype(int)

# 1. Define feature groups
numeric_features = [
    'URL_length', 'num_dots', 'num_hyphens', 'num_slashes',
    'num_question_marks', 'num_equals', 'num_at', 'domain_length',
    'subdomain_count', 'domain_part_count', 'path_length', 'path_depth',
    'num_parameters', 'ssl_days_left', 'entropy',
    'domain_age_days', 'domain_expiry_days', 'name_servers_count', 'redirects'
]

boolean_features = [
    'has_ip', 'is_idn', 'uses_https', 'ssl_cert_valid', 'ssl_self_signed',
    'is_shortened', 'domain_registered', 'has_whois_info', 'non_standard_port',
    'hex_chars', 'ip_private'
]

categorical_features = [
    'tld', 'file_extension', 'domain_country'
]

# 2. Build individual pipelines
numeric_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

boolean_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
    ('cast', FunctionTransformer(func=cast_to_int))
])

categorical_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# 3. Combine into a ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_pipeline, numeric_features),
    ('bool', boolean_pipeline, boolean_features),
    ('cat', categorical_pipeline, categorical_features)
])

# 4. Fit and transform
df = pd.read_csv('qr_code_url_features.csv')
X = df.drop(columns=['label'])
y = df['label']
X_preprocessed = preprocessor.fit_transform(X)

# 5. PCA for dimensionality reduction
# Keep components that explain 95% variance
pca = PCA(n_components=0.95, random_state=42)
X_pca = pca.fit_transform(X_preprocessed)

# 6. SelectKBest for supervised feature selection
# Using ANOVA F-test
kbest = SelectKBest(score_func=f_classif, k=10)
X_kbest = kbest.fit_transform(X_preprocessed, y)

# 7. Retrieving feature names after ColumnTransformer + OneHotEncoder
ohe = preprocessor.named_transformers_['cat']['onehot']
cat_names = ohe.get_feature_names_out(categorical_features)
all_feature_names = numeric_features + boolean_features + list(cat_names)
selected_indices = kbest.get_support(indices=True)
selected_feature_names = [all_feature_names[i] for i in selected_indices]
print('Selected features:', selected_feature_names)


X_url = X_kbest

Selected features: ['domain_length', 'ssl_days_left', 'domain_age_days', 'name_servers_count', 'uses_https', 'ssl_cert_valid', 'domain_registered', 'has_whois_info', 'domain_country_US', 'domain_country_missing']


  f = msb / msw


# MODEL

In [20]:
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# shuffle
X_img, X_url, y = shuffle(X_img, X_url, y, random_state=42)

# train-test split
X_img_train, X_img_test, X_url_train, X_url_test, y_train, y_test = train_test_split(
    X_img, X_url, y,
    test_size=0.2,
    stratify=y,
    random_state=42
)

# scaling
scaler = StandardScaler()
X_url_train = scaler.fit_transform(X_url_train)
X_url_test  = scaler.transform(X_url_test)


# Define the CNN branch for image data
image_input = Input(shape=(128, 128, 1), name='qr_image')
x = Conv2D(32, (3, 3), activation='relu')(image_input)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
cnn_output = Dense(64, activation='relu')(x)

# Define the MLP branch for URL features
url_input = Input(shape=(X_url_train.shape[1],), name='url_features') # Use the actual number of features
y_mlp = Dense(32, activation='relu')(url_input)

# Combine the branches
combined = concatenate([cnn_output, y_mlp])

# Final dense layers
z = Dense(32, activation='relu')(combined)
output = Dense(1, activation='sigmoid')(z) # Binary classification (benign/malicious)

# Create the model
model = Model(inputs=[image_input, url_input], outputs=output)

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='binary_crossentropy',
              metrics=['accuracy'])

model.summary()


# fit hybrid model
history = model.fit(
    {
      "qr_image":     X_img_train,
      "url_features": X_url_train
    },
    y_train,
    validation_split=0.1,   # 10% of train used for val
    epochs=30,
    batch_size=16,
    callbacks=[
      tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=5,
        restore_best_weights=True
      )
    ]
)



Epoch 1/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 422ms/step - accuracy: 0.5702 - loss: 0.8528 - val_accuracy: 0.7857 - val_loss: 0.5551
Epoch 2/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 413ms/step - accuracy: 0.8870 - loss: 0.3884 - val_accuracy: 0.7679 - val_loss: 0.5463
Epoch 3/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 411ms/step - accuracy: 0.8884 - loss: 0.3133 - val_accuracy: 0.7857 - val_loss: 0.5882
Epoch 4/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 435ms/step - accuracy: 0.9131 - loss: 0.2340 - val_accuracy: 0.7857 - val_loss: 0.6415
Epoch 5/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 393ms/step - accuracy: 0.9341 - loss: 0.1688 - val_accuracy: 0.7857 - val_loss: 0.5892
Epoch 6/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 398ms/step - accuracy: 0.9422 - loss: 0.1479 - val_accuracy: 0.7857 - val_loss: 0.5614
Epoch 7/30
[1m32/32[

In [21]:
loss, acc = model.evaluate(
    {
      "qr_image":     X_img_test,
      "url_features": X_url_test
    },
    y_test
)
print(f"Test loss: {loss:.4f}, Test accuracy: {acc:.4f}")


[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 170ms/step - accuracy: 0.8749 - loss: 0.3983
Test loss: 0.3827, Test accuracy: 0.8786


# PREDICTING ON NEW IMAGE

In [22]:
import cv2
import numpy as np
import pandas as pd

df = pd.read_csv("qr_code_url_features.csv")

def preprocess_image(img_path, img_size=(128, 128)):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        print(f"Error: Could not load image from {img_path}")
        return None
    img = cv2.resize(img, img_size)
    img = img.astype("float32") / 255.0
    img = np.expand_dims(img, axis=-1)  # (128, 128, 1)
    img = np.expand_dims(img, axis=0)   # (1, 128, 128, 1)
    return img

test_img_path = "qr-benign.png"
X_new_img = preprocess_image(test_img_path)

if X_new_img is None:
    print("Image loading failed. Exiting.")
    exit()

X_new_url_features = {}

qr_content = extract_qr_content(test_img_path)

cleaned_url = extract_clean_url(qr_content)

if cleaned_url:
    X_new_url_features.update(extract_url_features(cleaned_url))

training_columns = df.drop(columns=['label']).columns # Get columns from the training DataFrame 'df'
X_new_url_df = pd.DataFrame([X_new_url_features], columns=training_columns)


# Preprocess the URL features using the *trained* preprocessor
X_new_url_preprocessed = preprocessor.transform(X_new_url_df)

# Apply the SelectKBest transformation.
X_new_url_final = kbest.transform(X_new_url_preprocessed)


pred = model.predict({
    "qr_image": X_new_img,
    "url_features": X_new_url_final
})

print(f"Malicious probability: {pred[0][0]:.4f}")
print("Prediction:", "Malicious" if pred[0][0] >= 0.5 else "Benign")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 225ms/step
Malicious probability: 0.0186
Prediction: Benign


In [23]:
import requests
import cv2
import numpy as np
import pandas as pd
from pyzbar import pyzbar
from io import BytesIO

def preprocess_image_array(img, img_size=(128, 128)):
    """Resize, normalize, and reshape a grayscale image array for model input."""
    img = cv2.resize(img, img_size)
    img = img.astype("float32") / 255.0
    img = np.expand_dims(img, axis=-1)   # (H, W, 1)
    img = np.expand_dims(img, axis=0)    # (1, H, W, 1)
    return img

def predict_qr_from_url(image_url,
                        model,
                        preprocessor,
                        kbest,
                        training_columns):
    """
    Download an image from URL, discard non-QR images, extract features,
    preprocess and predict using the hybrid model.
    """
    # 1) Download image
    resp = requests.get(image_url)
    if resp.status_code != 200:
        raise ValueError(f"Failed to download image: HTTP {resp.status_code}")
    img_array = np.asarray(bytearray(resp.content), dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError("Downloaded content is not a valid image")

    # 2) Decode QR codes
    decoded_objs = pyzbar.decode(img)
    if not decoded_objs:
        return {"error": "No QR code detected in image"}

    # 3) Extract QR content and URL features
    qr_content = decoded_objs[0].data.decode("utf-8")
    cleaned_url = extract_clean_url(qr_content)
    if not cleaned_url:
        return {"error": "Decoded QR has no valid URL"}
    url_features = extract_url_features(cleaned_url)

    # 4) Prepare model inputs
    X_img = preprocess_image_array(img)
    X_url_df = pd.DataFrame([url_features], columns=training_columns)
    X_url_pre = preprocessor.transform(X_url_df)
    X_url_fin = kbest.transform(X_url_pre)

    # 5) Predict
    pred_prob = model.predict({
        "qr_image": X_img,
        "url_features": X_url_fin
    })[0][0]

    return {
        "malicious_probability": float(pred_prob),
        "prediction": "Malicious" if pred_prob >= 0.5 else "Benign"
    }

result = predict_qr_from_url(
    "https://letsenhance.io/static/73136da51c245e80edc6ccfe44888a99/1015f/MainBefore.jpg",
    model,
    preprocessor,
    kbest,
    training_columns
)
print(result)


{'error': 'No QR code detected in image'}


In [24]:
result = predict_qr_from_url(
    "https://docs.lightburnsoftware.com/legacy/img/QRCode/ExampleCode.png",
    model,
    preprocessor,
    kbest,
    training_columns
)
print(result)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step
{'malicious_probability': 0.11023060232400894, 'prediction': 'Benign'}


In [25]:
result = predict_qr_from_url(
    "https://www.joydeepdeb.com/images/qr-code.jpg",
    model,
    preprocessor,
    kbest,
    training_columns
)
print(result)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
{'malicious_probability': 0.26557496190071106, 'prediction': 'Benign'}


In [26]:
import pickle
import joblib
import numpy as np
from tensorflow.keras.models import model_from_json

# Extract JSON architecture and weights from your trained Keras model
model_json    = model.to_json()
model_weights = model.get_weights()

# Define the wrapper class
class QRHybridPipeline:
    def __init__(self, model_json, model_weights, preprocessor, kbest, training_columns):
        # Keras pieces (JSON + weights)
        self._model_json    = model_json
        self._model_weights = model_weights

        # Scikit-learn transformers
        self.preprocessor     = preprocessor
        self.kbest            = kbest

        # Metadata
        self.training_columns = training_columns

        # Placeholder for reconstructed model
        self.model = None

    def load_model(self):
        """Rebuilds and compiles the Keras model from JSON + weights."""
        if self.model is None:
            self.model = model_from_json(self._model_json)
            self.model.set_weights(self._model_weights)
            self.model.compile(
                optimizer="adam",
                loss="binary_crossentropy",
                metrics=["accuracy"]
            )
        return self.model

# Instantiate and pickle the pipeline wrapper
pipeline = QRHybridPipeline(
    model_json       = model_json,
    model_weights    = model_weights,
    preprocessor     = preprocessor,
    kbest            = kbest,
    training_columns = training_columns
)

with open("qr_hybrid_pipeline.pkl", "wb") as f:
    pickle.dump(pipeline, f)

print("Saved unified pipeline to 'qr_hybrid_pipeline.pkl'")


Saved unified pipeline to 'qr_hybrid_pipeline.pkl'


In [27]:
# Loading and using the pipeline later
with open("qr_hybrid_pipeline.pkl", "rb") as f:
    loaded_pipeline = pickle.load(f)

# Rebuild the Keras model
reconstructed_model = loaded_pipeline.load_model()

# Access the preprocessor, k-best selector, and feature names
reconstructed_preprocessor = loaded_pipeline.preprocessor
reconstructed_kbest        = loaded_pipeline.kbest
reconstructed_columns      = loaded_pipeline.training_columns

print("Pipeline loaded:")
print("-", reconstructed_model)
print("-", reconstructed_preprocessor)
print("-", reconstructed_kbest)
print("-", reconstructed_columns)

Pipeline loaded:
- <Functional name=functional, built=True>
- ColumnTransformer(transformers=[('num',
                                 Pipeline(steps=[('imputer',
                                                  SimpleImputer(strategy='median')),
                                                 ('scaler', StandardScaler())]),
                                 ['URL_length', 'num_dots', 'num_hyphens',
                                  'num_slashes', 'num_question_marks',
                                  'num_equals', 'num_at', 'domain_length',
                                  'subdomain_count', 'domain_part_count',
                                  'path_length', 'path_depth', 'num_parameters',
                                  'ssl_days_left', 'entropy',...
                                 ['has_ip', 'is_idn', 'uses_https',
                                  'ssl_cert_valid', 'ssl_self_signed',
                                  'is_shortened', 'domain_registered',
                    