<a href="https://colab.research.google.com/github/u1y2k3t4/Security_check/blob/main/Security_pro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-ready unified Dataset Security Scoring Pipeline
# Handles: labelled datasets (with a target column), unlabelled datasets, and metadata
# Outputs: overall score (0-100), per-check scores, detailed report, and CSVs with flagged rows.
# Run this cell in Google Colab, then call upload_and_run()

# --- Imports
import io
import os
import re
import json
import hashlib
from collections import Counter
from datetime import datetime

import pandas as pd
import numpy as np
from scipy.stats import entropy as scipy_entropy
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler

# --- Optional spaCy NER (used if available). If not present, the pipeline uses regex only for PII detection.
# USE_SPACY is the flag pii_check() consults; `nlp` is only bound when the flag ends up True.
USE_SPACY = False
try:
    import spacy
    try:
        nlp = spacy.load("en_core_web_sm")
        USE_SPACY = True
    except Exception as e:
        # Loading failed: attempt to download the model (works in Colab), then retry the load once.
        try:
            print("Downloading spacy model...")
            # The exit status of os.system is not checked; a failed download
            # surfaces as the spacy.load() call below raising.
            os.system("python -m spacy download en_core_web_sm -q")
            nlp = spacy.load("en_core_web_sm")
            USE_SPACY = True
        except Exception as e2:
            print("spaCy model unavailable, continuing without NER. Reason:", e2)

except Exception as e:
    # Reached when `import spacy` itself raises (any Exception, not only ImportError).
    print("spaCy not installed; install with `pip install spacy` if you want NER support.")

# --- Helpers

def sha256_of_file(path):
    """Return the SHA-256 hex digest of the file at *path*.

    The file is opened in binary mode and consumed in 8192-byte reads so the
    whole content never has to be held in memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:  # b"" signals end of file
                break
            digest.update(block)
    return digest.hexdigest()

def string_entropy(s: str):
    """Return the Shannon entropy, in bits, of the character distribution of *s*.

    Falsy input (empty string or None) yields 0.0.
    """
    if s:
        char_counts = np.array(list(Counter(s).values()))
        # Relative frequency of each distinct character in the string.
        frequencies = char_counts / len(s)
        return float(scipy_entropy(frequencies, base=2))
    return 0.0

def is_hash_like(s: str):
    """Return True when *s* looks like a stored hash rather than plain text.

    Two shapes are recognised after stripping surrounding whitespace:
    a bcrypt-style prefix ($2a$, $2b$ or $2y$), or a pure hexadecimal string
    of length 32, 40 or 64 (the md5 / sha1 / sha256 digest lengths).
    Non-string input is never hash-like.
    """
    if not isinstance(s, str):
        return False
    candidate = s.strip()
    if candidate.startswith(("$2a$", "$2b$", "$2y$")):
        return True
    has_digest_length = len(candidate) in (32, 40, 64)
    return has_digest_length and re.fullmatch(r'[0-9a-fA-F]+', candidate) is not None

# --- PII patterns (extendable)
# Maps a PII type name to a compiled regex that is applied with .search().
# Insertion order matters: pii_check() stops at the first pattern that matches
# a cell, so the reported "type" is the earliest matching entry below.
PII_REGEX_PATTERNS = {
    # local-part@domain.tld with a TLD of at least two letters
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    # exactly ten consecutive digits delimited by word boundaries
    "phone_10": re.compile(r"\b\d{10}\b"),
    # optional '+', a digit, at least seven digits/spaces/hyphens/parentheses, then a digit;
    # note that any plain run of nine or more digits also satisfies this
    "phone_international": re.compile(r"\+?\d[\d \-\(\)]{7,}\d"),
    # twelve digits in three groups of four, each optionally separated by one whitespace character
    "aadhaar": re.compile(r"\b\d{4}\s?\d{4}\s?\d{4}\b"),
    # ddd-dd-dddd
    "ssn_like": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    # 13 to 16 digits, each optionally followed by spaces or hyphens
    "credit_card_like": re.compile(r"\b(?:\d[ -]*?){13,16}\b"),
}

# --- Main checks

def pii_check(df, metadata=None, sample_frac=1.0, max_samples=20000):
    """
    Detect PII in cell values, column names and metadata.

    Args:
        df: DataFrame to scan.
        metadata: optional dict; every value is stringified and matched
            against PII_REGEX_PATTERNS.
        sample_frac: when < 1.0, scan this random fraction of the rows.
        max_samples: when sample_frac is 1.0 and the frame has more rows than
            this, scan a random sample of max_samples rows instead.

    Returns:
        (score, details). score is 25 when nothing at all was found, 15 when
        the number of PII cells is at most max(1, 0.1% of the scanned text
        cells) -- this includes the case of name/metadata warnings only --
        and 0 otherwise. details holds the scanned/flagged cell counts and
        up to 200 findings (plain strings for name/metadata warnings, dicts
        with "col", "type" and "value_sample" for cell hits).
    """
    # sample rows if dataset is huge
    n = len(df)
    if sample_frac < 1.0:
        df_sample = df.sample(frac=sample_frac, random_state=42)
    else:
        df_sample = df if n <= max_samples else df.sample(n=max_samples, random_state=42)

    findings = []
    total_cells = 0
    pii_cells = 0

    # Check metadata values and column names
    if metadata:
        # metadata might be dict with keys like 'columns', 'description', 'source_hash'
        for k, v in metadata.items():
            text = str(v)
            for name, pat in PII_REGEX_PATTERNS.items():
                if pat.search(text):
                    findings.append(f"PII-like pattern in metadata field '{k}': {name}")
    for col in df.columns:
        # Column labels are not guaranteed to be strings (e.g. integer headers),
        # so stringify before lower-casing.
        if re.search(r"name|email|phone|ssn|aadhaar|id_card|passport", str(col).lower()):
            findings.append(f"Suspicious column name: '{col}'")

    # Check cell values (stringified)
    text_cols = df_sample.select_dtypes(include=['object', 'string']).columns.tolist()
    for col in text_cols:
        # Fill missing values BEFORE stringifying; the other order turns NaN
        # into the literal text "nan", which would then be fed to the NER model.
        for val in df_sample[col].fillna("").astype(str):
            total_cells += 1
            # regex checks: the first matching pattern decides the reported type
            for name, pat in PII_REGEX_PATTERNS.items():
                if pat.search(val):
                    pii_cells += 1
                    findings.append({"col": col, "type": name, "value_sample": val if len(val) <= 200 else val[:200]})
                    break
            else:
                # No regex hit: NER check if spaCy available
                if USE_SPACY and val.strip():
                    try:
                        doc = nlp(val)
                        for ent in doc.ents:
                            if ent.label_ in ("PERSON", "GPE", "LOC", "ORG", "DATE"):
                                pii_cells += 1
                                findings.append({"col": col, "type": f"ner_{ent.label_}", "value_sample": val[:200]})
                                break  # count each cell at most once
                    except Exception:
                        # Best effort: a cell the NER model cannot process is skipped.
                        pass

    # scoring logic
    if pii_cells == 0 and not findings:
        score = 25
    elif pii_cells <= max(1, 0.001 * total_cells):  # very rare PII in sampled cells
        score = 15
    else:
        score = 0

    details = {
        "total_text_cells_sampled": int(total_cells),
        "pii_cells_detected": int(pii_cells),
        "findings": findings[:200]  # cap reported findings
    }
    return score, details

def encryption_check(df, metadata=None, text_sample_size=200):
    """
    Detect whether sensitive columns appear encrypted/hashed.

    For every column, up to text_sample_size non-null values are stringified
    and summarised by average character entropy, average length and the
    number of hash-like values.

    Args:
        df: DataFrame to inspect.
        metadata: accepted for signature parity with the other checks; not
            used by this check.
        text_sample_size: maximum number of non-null values examined per column.

    Returns:
        (score, details). score is 25 when at least one column looks
        encrypted and no sensitively named column looks plain, 0 when
        sensitively named plain columns exist and nothing looks encrypted,
        and 15 otherwise. details lists both groups of columns.
    """
    encrypted_like_cols = []
    suspicious_plain_cols = []
    for col in df.columns:
        # sample some values as strings
        vals = df[col].dropna().astype(str).head(text_sample_size).tolist()
        if not vals:
            continue
        avg_entropy = float(np.mean([string_entropy(v) for v in vals]))
        avg_len = float(np.mean([len(v) for v in vals]))
        hash_likes = sum(1 for v in vals if is_hash_like(v))
        # heuristics:
        # - many hash-like values (at least 40% of the sample, minimum one), or
        # - high average entropy combined with long values
        # => likely encrypted/hashed
        mostly_hashes = hash_likes >= max(1, int(0.4 * len(vals)))
        high_entropy_and_long = avg_entropy > 3.5 and avg_len > 20
        if mostly_hashes or high_entropy_and_long:
            encrypted_like_cols.append({"col": col, "avg_entropy": avg_entropy, "avg_len": avg_len, "hash_like_count": int(hash_likes)})
        # Column labels may be non-strings (e.g. integer headers): stringify
        # before matching the sensitive-name pattern.
        elif re.search(r"pass|pwd|ssn|card|aadhar|aadhaar|email|secret|token", str(col).lower()):
            # name suggests sensitive content but values do not look encrypted
            suspicious_plain_cols.append({"col": col, "avg_entropy": avg_entropy, "avg_len": avg_len})

    # scoring heuristic
    if encrypted_like_cols and not suspicious_plain_cols:
        score = 25
    elif suspicious_plain_cols and not encrypted_like_cols:
        score = 0
    else:
        # mixed evidence, or neither suspicious nor clearly encrypted: neutral
        score = 15

    details = {
        "encrypted_like_columns": encrypted_like_cols,
        "suspicious_plain_columns": suspicious_plain_cols
    }
    return score, details

def poisoning_check(df, label_column=None, contamination=0.05, numeric_only=False):
    """
    Detect poisoning/anomalies. Handles labelled and unlabelled datasets.

    Features are the standardised numeric columns plus a TF-IDF encoding of
    the concatenated text columns (text columns whose name looks like an id
    are skipped; at most the first 5000 rows contribute text, and when both
    kinds of feature exist the matrix is truncated to the shorter of the two).

    - If labelled: a NearestNeighbors label-consistency check flags samples
      for which fewer than 3 of their 5 nearest neighbours share their label.
      The label column itself is excluded from the features so it cannot
      vouch for its own consistency.
    - Always: IsolationForest on the combined features.

    Args:
        df: DataFrame to analyse.
        label_column: name of the target column, or None for unlabelled data.
        contamination: contamination passed to IsolationForest.
        numeric_only: currently unused; kept for backward compatibility.

    Returns:
        (score, details) with score in 0-25. When the frame offers no usable
        features the result is (25, {"reason": ...}).
    """
    # `is not None` rather than truthiness so that a column named 0 or "" works.
    has_labels = label_column is not None and label_column in df.columns

    # Keep the label out of the feature matrix: leaving it in lets the
    # neighbour search partly match samples on the very value being checked.
    feature_df = df.drop(columns=[label_column]) if has_labels else df

    num_df = feature_df.select_dtypes(include=[np.number])
    text_cols = feature_df.select_dtypes(include=['object', 'string']).columns.tolist()
    # Drop obvious id columns (column labels may be non-strings, hence str()).
    text_cols = [c for c in text_cols if not re.search(r'id$|^id|_id$', str(c).lower())]

    # Build numeric matrix
    X_num = None
    if not num_df.empty:
        X_num = StandardScaler().fit_transform(num_df.fillna(0).values)

    # Build text matrix via TF-IDF (concatenate text columns)
    X_text = None
    if text_cols:
        # Cap the rows first (speed), and fill missing values before
        # stringifying so that NaN does not become the token "nan".
        sample_text = (
            feature_df[text_cols]
            .head(5000)
            .fillna(" ")
            .astype(str)
            .agg(" ".join, axis=1)
        )
        tfidf = TfidfVectorizer(max_features=2000, stop_words='english')
        try:
            X_text = tfidf.fit_transform(sample_text).toarray()
        except Exception:
            # Best effort: if vectorisation fails, carry on with numeric
            # features only (or none).
            X_text = None

    # Combine features
    if X_num is not None and X_text is not None:
        # Truncate both to the common number of rows before stacking.
        rows = min(X_num.shape[0], X_text.shape[0])
        X = np.hstack([X_num[:rows], X_text[:rows]])
    elif X_num is not None:
        X = X_num
    elif X_text is not None:
        X = X_text
    else:
        # no usable features -> can't run poisoning detection; be optimistic
        return 25, {"reason": "No numeric or textual features to analyze for anomalies."}

    # If labelled, run label-consistency KNN check
    label_issues = []
    if has_labels:
        labels = df[label_column].astype(str).fillna("")
        # Features may cover fewer rows than the frame; align on the shorter.
        m = min(len(labels), X.shape[0])
        X_lab = X[:m]
        labels = labels.values[:m]
        try:
            knn = NearestNeighbors(n_neighbors=6, algorithm='auto').fit(X_lab)
            _, idxs = knn.kneighbors(X_lab, return_distance=True)
            # For each sample, count neighbours carrying the same label.
            inconsistent = 0
            for i, neigh in enumerate(idxs):
                neigh_labels = labels[neigh[1:]]  # position 0 is treated as the sample itself
                same = np.sum(neigh_labels == labels[i])
                if same < 3:  # fewer than 3 of 5 neighbours agree -> suspicious label
                    inconsistent += 1
                    label_issues.append({"index": int(i), "label": str(labels[i]), "agreeing_neighbors": int(same)})
            fraction_inconsistent = inconsistent / m
            if fraction_inconsistent == 0:
                label_score = 25
            elif fraction_inconsistent < 0.02:
                label_score = 15
            else:
                label_score = 0
        except Exception as e:
            # e.g. fewer samples than neighbours requested: stay neutral and
            # report the reason instead of failing the whole pipeline.
            label_score = 15
            label_issues.append({"knn_error": str(e)})
    else:
        label_score = None

    # Isolation Forest on combined features
    # NOTE(review): with a fixed `contamination` the forest appears to flag
    # roughly that share of the training rows by construction, so this
    # sub-score would sit at 15 for most inputs -- confirm against the
    # scikit-learn documentation.
    try:
        iso = IsolationForest(contamination=contamination, random_state=42)
        preds = iso.fit_predict(X)
        anomalies = (preds == -1).sum()
        frac_anom = anomalies / X.shape[0]
        if frac_anom == 0:
            iso_score = 25
        elif frac_anom < contamination * 1.5:
            iso_score = 15
        else:
            iso_score = 0
    except Exception:
        # Detector could not be fitted: stay neutral.
        iso_score = 15
        anomalies = None

    # Combine scores: if labelled, average both sub-scores; else use iso_score
    if label_score is not None:
        final_score = int(round((label_score + iso_score) / 2.0))
    else:
        final_score = iso_score

    details = {
        "labelled_mode": bool(has_labels),
        "label_check_score": label_score,
        "label_issues_sample": label_issues[:200],
        "isolation_forest_score": iso_score,
        "anomalies_count": int(anomalies) if anomalies is not None else None
    }
    return final_score, details

def integrity_and_metadata_check(df, metadata=None):
    """
    Check missing values, duplicate rows, extreme numeric values and, when
    metadata is supplied, agreement between the metadata and the data.

    Args:
        df: DataFrame to inspect.
        metadata: optional dict; the keys read here are "row_count",
            "columns", "file_hash" and "file_path".

    Returns:
        (score, details). score starts at 25 and is reduced by
        - up to 10, proportional to the share of missing cells,
        - up to 10, proportional to the share of duplicate rows,
        - 5 per numeric column with values beyond 5 sigma (at most 10),
        - 5 for each of: row-count mismatch, missing columns, file-hash mismatch,
        and is floored at 0. Metadata that cannot be evaluated (unparseable
        row_count, unreadable file) is reported as a warning without penalty.
    """
    issues = []
    score = 25

    # Missing values
    missing_total = int(df.isnull().sum().sum())
    if missing_total > 0:
        issues.append(f"{missing_total} missing values")
        score -= min(10, int(10 * (missing_total / max(1, df.size))))

    # Duplicates
    dup = int(df.duplicated().sum())
    if dup > 0:
        issues.append(f"{dup} duplicate rows")
        score -= min(10, int(10 * (dup / max(1, len(df)))))

    # Simple range sanity for numeric columns (detect extreme outliers beyond 5-sigma)
    extreme_vals = {}
    for col, series in df.select_dtypes(include=[np.number]).items():
        colvals = series.dropna()
        if colvals.empty:
            continue
        mu = colvals.mean()
        sigma = colvals.std()
        if sigma == 0 or np.isnan(sigma):
            # constant column or a single value: no spread to measure against
            continue
        n_outliers = int(((colvals < mu - 5 * sigma) | (colvals > mu + 5 * sigma)).sum())
        if n_outliers > 0:
            extreme_vals[col] = n_outliers
    if extreme_vals:
        issues.append(f"Extreme-value columns: {extreme_vals}")
        score -= min(10, 5 * len(extreme_vals))

    # Metadata checks (if provided)
    meta_warnings = []
    if metadata:
        if "row_count" in metadata:
            try:
                expected_rows = int(metadata["row_count"])
            except (TypeError, ValueError):
                # Malformed metadata must not crash the check.
                meta_warnings.append(f"Unparseable row_count in metadata: {metadata['row_count']!r}")
            else:
                if expected_rows != len(df):
                    meta_warnings.append(f"Row count mismatch: metadata {metadata['row_count']} vs actual {len(df)}")
                    score -= 5
        if "columns" in metadata:
            missing_cols = [c for c in metadata["columns"] if c not in df.columns]
            if missing_cols:
                meta_warnings.append(f"Missing columns from metadata: {missing_cols}")
                score -= 5
        # A hash can only be verified when metadata also names a file that
        # exists locally; otherwise the check is skipped.
        if "file_hash" in metadata and isinstance(metadata["file_hash"], str) and "file_path" in metadata:
            try:
                if os.path.exists(metadata["file_path"]):
                    actual_hash = sha256_of_file(metadata["file_path"])
                    # Hex digests are case-insensitive; ignore case and stray whitespace.
                    if actual_hash.lower() != metadata["file_hash"].strip().lower():
                        meta_warnings.append("File hash mismatch vs metadata")
                        score -= 5
            except (OSError, TypeError, ValueError) as exc:
                meta_warnings.append(f"Could not verify file hash: {exc}")
    if meta_warnings:
        issues.extend(meta_warnings)

    score = max(score, 0)

    details = {
        "missing_total": missing_total,
        "duplicates": dup,
        "extreme_value_columns": extreme_vals,
        "metadata_warnings": meta_warnings,
        "issues": issues
    }
    return score, details

# --- Orchestration function ---

def run_security_pipeline(df, metadata=None, label_column=None, contamination=0.05):
    """
    Run all four checks and produce a consolidated report.

    Args:
        df: DataFrame to score.
        metadata: optional metadata dict forwarded to the PII, encryption and
            integrity checks.
        label_column: optional target column forwarded to the poisoning check.
        contamination: forwarded to the poisoning check.

    Returns:
        dict with "timestamp" (UTC, ISO 8601 with a trailing "Z"),
        "total_score" (sum of the four sub-scores), "breakdown" (each
        sub-score) and "details" (each check's details).
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    # 1. PII check
    pii_score, pii_details = pii_check(df, metadata=metadata)

    # 2. Encryption check
    enc_score, enc_details = encryption_check(df, metadata=metadata)

    # 3. Poisoning / anomaly check
    poison_score, poison_details = poisoning_check(df, label_column=label_column, contamination=contamination)

    # 4. Integrity & metadata check
    integrity_score, integrity_details = integrity_and_metadata_check(df, metadata=metadata)

    total_score = int(pii_score + enc_score + poison_score + integrity_score)

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the string keeps the same "...Z" shape as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    report = {
        "timestamp": now_utc.isoformat() + "Z",
        "total_score": total_score,
        "breakdown": {
            "pii_score": pii_score,
            "encryption_score": enc_score,
            "poisoning_score": poison_score,
            "integrity_score": integrity_score
        },
        "details": {
            "pii": pii_details,
            "encryption": enc_details,
            "poisoning": poison_details,
            "integrity": integrity_details
        }
    }
    return report

# --- Colab-friendly upload + run ---

def upload_and_run():
    """
    Load a CSV (Colab upload widget, or a local path when the widget is not
    available), run the security pipeline on it, save the results and print
    a summary.

    A file named "<csv base name>.meta.json" uploaded alongside the CSV is
    used as metadata when it contains a JSON object.

    Returns:
        (report, paths) where paths has "report_path" and "flagged_path"
        (None when no rows were flagged), or (None, None) when the upload
        dialog returned no file.
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    # Only the Colab import/upload is guarded; errors while reading the CSV
    # are real errors and should surface instead of triggering the local-path prompt.
    try:
        from google.colab import files
        uploaded = files.upload()
    except Exception:
        uploaded = None  # not in Colab, or the upload widget is unavailable

    if uploaded is None:
        # fallback for non-Colab: try loading a local path
        fname = input("Enter local CSV path: ").strip()
        df = pd.read_csv(fname)
        metadata = None
    else:
        if not uploaded:
            print("No file uploaded.")
            return None, None
        names = list(uploaded)
        # Prefer a data file: the first uploaded key may be the metadata sidecar.
        data_names = [nm for nm in names if not nm.endswith(".meta.json")]
        fname = data_names[0] if data_names else names[0]
        print("Uploaded file:", fname)
        df = pd.read_csv(fname)
        # Optional: look for metadata file with same base name + .meta.json
        metadata = {}
        meta_name = fname.rsplit(".", 1)[0] + ".meta.json"
        if meta_name in uploaded:
            try:
                loaded = json.loads(uploaded[meta_name].decode('utf-8'))
            except ValueError as exc:  # covers invalid UTF-8 and invalid JSON
                print(f"Ignoring unreadable metadata file '{meta_name}': {exc}")
            else:
                if isinstance(loaded, dict):
                    metadata = loaded
                else:
                    # The checks iterate metadata as a dict.
                    print(f"Ignoring metadata file '{meta_name}': expected a JSON object.")

    # auto-detect label column heuristically
    label_col = next((c for c in ("label", "target", "class", "y") if c in df.columns), None)

    # run pipeline
    report = run_security_pipeline(df, metadata=metadata, label_column=label_col, contamination=0.05)

    # save report + flagged items
    out_dir = "/mnt/data/dataset_security_report"
    os.makedirs(out_dir, exist_ok=True)
    # datetime.utcnow() is deprecated; strftime output is unchanged.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    report_path = os.path.join(out_dir, f"security_report_{timestamp}.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)

    # Attempt to save flagged rows (PII) for user review
    flagged_rows = pd.DataFrame()
    try:
        matches = []
        for fnd in report["details"]["pii"]["findings"]:
            # Column-name and metadata warnings are plain strings with no
            # cell to look up; only dict findings point at data.
            if not isinstance(fnd, dict):
                continue
            col = fnd.get("col")
            sample_val = fnd.get("value_sample")
            if col is None or sample_val is None or col not in df.columns:
                continue
            # find rows where col contains the sample snippet (literal match)
            mask = df[col].astype(str).str.contains(sample_val[:50], regex=False, na=False)
            matches.append(df[mask])
        if matches:
            flagged_rows = pd.concat(matches).drop_duplicates().reset_index(drop=True)
    except Exception as exc:
        # Best effort: the report is already saved, so only note the failure.
        print("Could not extract flagged rows:", exc)

    flagged_path = None
    if not flagged_rows.empty:
        flagged_path = os.path.join(out_dir, f"flagged_rows_{timestamp}.csv")
        flagged_rows.to_csv(flagged_path, index=False)

    total = report['total_score']
    print("\n=== DATASET SECURITY REPORT SUMMARY ===")
    print(f"Overall score: {total}/100")
    # Verdict first, so that "Breakdown:" is followed by the breakdown itself.
    if total >= 80:
        print("The dataset is safe")
    elif total > 50:
        print("The dataset has some security issues")
    else:
        print("The dataset is not safe")
    print("Breakdown:")
    for k, v in report["breakdown"].items():
        print(f" - {k}: {v}/25")
    print("\nDetailed findings saved to:", report_path)
    if flagged_path:
        print("Flagged rows saved to:", flagged_path)
    else:
        print("No flagged rows extracted. Inspect the full report for details.")

    return report, {"report_path": report_path, "flagged_path": flagged_path}

# Entry point: the guard only prints a usage hint.
if __name__ == "__main__":
    print("Run `upload_and_run()` to upload a CSV and execute the security checks.")
# NOTE(review): this call sits outside the guard, so it runs whenever this code
# is executed or imported -- presumably intended as a notebook cell; confirm.
report, paths = upload_and_run()


Run `upload_and_run()` to upload a CSV and execute the security checks.


Saving unsafe_dataset.csv to unsafe_dataset.csv
Uploaded file: unsafe_dataset.csv

=== DATASET SECURITY REPORT SUMMARY ===
Overall score: 25/100
Breakdown:
The dataset is not safe
 - pii_score: 0/25
 - encryption_score: 0/25
 - poisoning_score: 0/25
 - integrity_score: 25/25

Detailed findings saved to: /mnt/data/dataset_security_report/security_report_20250905T161849Z.json
No flagged rows extracted. Inspect the full report for details.


  "timestamp": datetime.utcnow().isoformat() + "Z",
  timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
