# File Analysis

In [4]:
def count_lines(file_path: str, encoding: str = 'utf-8') -> int:
    """
    Count the number of lines in a text file.

    The file is iterated line by line in text mode, so it is never held in
    memory as a whole. Bytes that cannot be decoded with ``encoding`` are
    replaced instead of raising an error.

    :param file_path: Path of the file whose lines are counted.
    :param encoding: Text encoding used to decode the file (default: 'utf-8').
    :return: Number of lines in the file (0 for an empty file).
    """
    total = 0
    with open(file_path, mode='r', errors='replace', encoding=encoding) as handle:
        # enumerate() leaves the 1-based index of the last line in ``total``.
        for total, _ in enumerate(handle, start=1):
            pass
    return total

In [5]:
# Report how many lines the full data set contains.
print(count_lines(file_path='testdata'))

1347455


In [1]:
def count_letters(file_path: str, encoding: str = 'utf-8') -> int:
    """
    Count the characters in a text file.

    Despite the name, every decoded character is counted (whitespace, digits
    and line breaks included), not only alphabetic letters. Bytes that cannot
    be decoded with ``encoding`` are replaced instead of raising an error.

    The file is read in fixed-size chunks so that neither the whole text nor
    a per-character list has to be held in memory.

    :param file_path: Path of the file whose characters are counted.
    :param encoding: Text encoding used to decode the file (default: 'utf-8').
    :return: Number of decoded characters in the file (0 for an empty file).
    """
    chunk_size = 1 << 20  # characters per read() call
    with open(file_path, 'r', encoding=encoding, errors='replace') as file:
        # iter(callable, sentinel) keeps calling read() until it returns ''.
        return sum(len(chunk) for chunk in iter(lambda: file.read(chunk_size), ''))

In [None]:
# Report how many characters the full data set contains.
print(count_letters(file_path='testdata'))

# Standard String Parser

In [11]:
import re

In [18]:
def parse_credentials(file_path: str, name: str, email: str, encoding: str = 'utf-8') -> dict:
    """
    Search a file for a name and an e-mail address and report where they occur.

    Matching is case-insensitive and only accepts whole terms: a term is not
    matched when it is directly preceded or followed by a word character
    (so 'sanjay ch' does not match inside 'sanjay chandra'). An empty term
    matches nothing.

    :param file_path: Path of the file to search.
    :param name: Name of the person to look for.
    :param email: E-mail address to look for.
    :param encoding: Text encoding used to decode the file (default: 'utf-8').
        Undecodable bytes are replaced instead of raising an error.
    :return: Dictionary with the keys 'name_lines' and 'email_lines', each a
        list of 1-based line numbers on which the respective term was found.
    """
    def compile_term(term):
        # An empty term would match on almost every line; treat it as "no search".
        if not term:
            return None
        # Lookarounds instead of \b: \b only works when the term itself starts
        # and ends with a word character, which fails for terms such as "J.".
        return re.compile(r'(?<!\w)' + re.escape(term) + r'(?!\w)', re.IGNORECASE)

    # Build both patterns once instead of once per line.
    name_pattern = compile_term(name)
    email_pattern = compile_term(email)

    name_lines = []
    email_lines = []

    with open(file_path, 'r', encoding=encoding, errors='replace') as file:
        for line_number, line in enumerate(file, start=1):
            if name_pattern is not None and name_pattern.search(line):
                name_lines.append(line_number)

            if email_pattern is not None and email_pattern.search(line):
                email_lines.append(line_number)

    return {
        'name_lines': name_lines,
        'email_lines': email_lines
    }

In [19]:
# Look up one name and one e-mail address in the full data set and show the matching line numbers.
print(parse_credentials(
    file_path="testdata",
    name='sanjay ch',
    email='danilgt13@gmail.com',
))

{'name_lines': [26393, 394279, 427289, 1242624], 'email_lines': [1347432]}


# Embedding AI approach (doesn't work very well, since e-mail addresses are generally very similar to one another)

In [2]:
import re
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Union, Tuple

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
import re
import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Union, Tuple
import time

class CredentialLeakDetector:
    """
    Find lines of a text file that are semantically similar to a credential.

    Every line of the loaded file is embedded with a sentence-transformers
    model. A query (name, e-mail address, phone number) is embedded the same
    way and compared against all lines by cosine similarity.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", batch_size: int = 32):
        """
        Initialize the credential leak detector with an embedding model.

        :param model_name: The name of the sentence-transformers model to use
        :param batch_size: Batch size for processing embeddings
        """
        # Pick the best available device and load the model onto it.
        self.device = self._get_optimal_device()
        print(f"Using device: {self.device}")

        self.model = SentenceTransformer(model_name, device=self.device)
        self.batch_size = batch_size
        # Raw lines of the loaded file (line endings included).
        self.file_lines = []
        # Unit-length embeddings, one row per entry of file_lines; None until load_file() succeeds.
        self.line_embeddings = None

    def _get_optimal_device(self) -> str:
        """
        Determine the best available device (CUDA, MPS, or CPU).

        :return: Device string for torch
        """
        if torch.cuda.is_available():
            return "cuda"
        # Older torch builds have no ``mps`` backend attribute at all.
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    def load_file(self, file_path: str) -> None:
        """
        Read a file, embed each of its lines and store the normalized embeddings.

        Progress and timing information is printed along the way. The
        detector's state is only replaced once every step has succeeded, so a
        failure never leaves lines and embeddings out of sync.

        :param file_path: Path to the file to be analyzed
        :raises Exception: Any error raised while reading or embedding is
            printed and then re-raised unchanged.
        """
        try:
            start_time = time.perf_counter()

            print(f"Loading file: {file_path}")
            with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
                lines = file.readlines()

            load_time = time.perf_counter() - start_time
            print(f"File loaded in {load_time:.2f} seconds with {len(lines)} lines")

            # Create embeddings for each line in the file using batching
            print("Generating embeddings...")
            embed_start = time.perf_counter()

            if lines:
                embeddings = self.model.encode(
                    lines,
                    batch_size=self.batch_size,
                    show_progress_bar=True,
                    convert_to_numpy=True,
                    device=self.device
                )
            else:
                # Nothing to embed; keep a 2-D array so the normalization below still works.
                embeddings = np.empty((0, 0), dtype=np.float32)

            embed_time = time.perf_counter() - embed_start
            print(f"Embeddings generated in {embed_time:.2f} seconds")

            # Pre-normalize embeddings so a plain dot product yields cosine similarity later.
            norm_start = time.perf_counter()
            norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
            norms[norms == 0] = 1.0  # leave all-zero rows as zeros instead of producing NaN
            embeddings = embeddings / norms
            norm_time = time.perf_counter() - norm_start
            print(f"Embeddings normalized in {norm_time:.2f} seconds")

            # Publish lines and embeddings together, only after everything succeeded.
            self.file_lines = lines
            self.line_embeddings = embeddings

            total_time = time.perf_counter() - start_time
            print(f"Total preprocessing time: {total_time:.2f} seconds")

        except Exception as e:
            print(f"Error loading file: {str(e)}")
            raise

    @staticmethod
    def _top_matches(similarities, threshold: float, top_n: int) -> List[Tuple[int, float]]:
        """
        Select the best-scoring line indices at or above a threshold.

        :param similarities: 1-D array-like with one similarity score per line
        :param threshold: Minimum score a line needs to be kept
        :param top_n: Maximum number of entries to return
        :return: List of (line index, score) pairs, highest score first; lines
            with equal scores keep their original file order
        """
        similarities = np.asarray(similarities)
        candidates = np.flatnonzero(similarities >= threshold)
        # Stable sort on the negated scores: descending order, ties in file order.
        order = np.argsort(-similarities[candidates], kind="stable")
        best = candidates[order][:top_n]
        return [(int(idx), float(similarities[idx])) for idx in best]

    def search_credentials(self,
                          query: str,
                          threshold: float = 0.9,
                          top_n: int = 5,
                          context_size: int = 5) -> List[Dict]:
        """
        Search for credentials in the loaded file using semantic similarity.

        :param query: The credential to search for (name, email, phone number)
        :param threshold: Minimum similarity threshold (0-1)
        :param top_n: Maximum number of results to return
        :param context_size: Number of lines before and after the match to include
        :return: List of dictionaries with the keys 'line_number' (1-based),
            'similarity_score', 'matched_line' and 'context', best match first
        :raises ValueError: If no file has been loaded yet
        """
        if self.line_embeddings is None:
            raise ValueError("No file has been loaded. Call load_file() first.")

        search_start = time.perf_counter()

        if len(self.file_lines) == 0:
            # An empty file cannot contain a match; skip the model call entirely.
            top_results = []
        else:
            # Create embedding for the query
            query_embedding = self.model.encode(
                query,
                convert_to_numpy=True,
                device=self.device
            )

            # Normalize query embedding (guarding against an all-zero vector)
            query_norm = np.linalg.norm(query_embedding)
            if query_norm > 0:
                query_embedding = query_embedding / query_norm

            # Both sides are unit length, so the dot product is the cosine
            # similarity. The embeddings already live in host memory as a
            # NumPy array; one matrix-vector product here avoids copying the
            # whole matrix to an accelerator on every query.
            similarities = np.dot(self.line_embeddings, query_embedding)
            top_results = self._top_matches(similarities, threshold, top_n)

        # Extract results with context
        detailed_results = [
            {
                'line_number': line_idx + 1,  # 1-based indexing
                'similarity_score': similarity_score,
                'matched_line': self.file_lines[line_idx].strip(),
                'context': self._extract_context(line_idx, context_size)
            }
            for line_idx, similarity_score in top_results
        ]

        search_time = time.perf_counter() - search_start
        print(f"Search completed in {search_time:.2f} seconds")

        return detailed_results

    def _extract_context(self, line_idx: int, context_size: int) -> List[Dict]:
        """
        Extract context lines around the matched line.

        The window is clipped at the start and end of the file.

        :param line_idx: 0-based index of the matched line
        :param context_size: Number of lines before and after to include
        :return: List of dictionaries with the keys 'line_number' (1-based),
            'content' (whitespace-stripped line) and 'is_match'
        """
        start_idx = max(0, line_idx - context_size)
        end_idx = min(len(self.file_lines), line_idx + context_size + 1)

        return [
            {
                'line_number': i + 1,  # 1-based indexing
                'content': self.file_lines[i].strip(),
                'is_match': i == line_idx
            }
            for i in range(start_idx, end_idx)
        ]

    def combined_search(self,
                       name: Union[str, None] = None,
                       email: Union[str, None] = None,
                       phone: Union[str, None] = None,
                       threshold: float = 0.5,
                       top_n: int = 5,
                       context_size: int = 5) -> Dict[str, List[Dict]]:
        """
        Perform a combined search for multiple credential types.

        Credential types that are None or empty are skipped and get no entry
        in the result.

        :param name: Person's name to search for
        :param email: Email address to search for
        :param phone: Phone number to search for
        :param threshold: Minimum similarity threshold
        :param top_n: Maximum number of results to return per credential type
        :param context_size: Number of lines before and after each match to include
        :return: Dictionary with the keys 'name_matches', 'email_matches' and
            'phone_matches' for every credential type that was searched
        """
        results = {}

        for label, value in (("name", name), ("email", email), ("phone", phone)):
            if value:
                print(f"Searching for {label}: {value}")
                results[f"{label}_matches"] = self.search_credentials(
                    value, threshold, top_n, context_size
                )

        return results


def detect_credential_leaks(file_path: str,
                           name: str = None,
                           email: str = None,
                           phone: str = None,
                           threshold: float = 0.6,
                           top_n: int = 5,
                           batch_size: int = 64) -> Dict:
    """
    Scan a file for leaked credentials in a single call.

    Builds a CredentialLeakDetector, loads and embeds the file, runs a
    combined search for the given credentials and prints the total elapsed
    time.

    :param file_path: Path to the file to scan
    :param name: Person's name to search for
    :param email: Email address to search for
    :param phone: Phone number to search for
    :param threshold: Minimum similarity threshold (0-1)
    :param top_n: Maximum number of results to return per credential type
    :param batch_size: Batch size for processing embeddings
    :return: Dictionary with search results, as returned by combined_search()
    """
    began = time.time()

    # Model loading and line embedding happen before any search is run.
    leak_detector = CredentialLeakDetector(batch_size=batch_size)
    leak_detector.load_file(file_path)

    search_options = {
        'name': name,
        'email': email,
        'phone': phone,
        'threshold': threshold,
        'top_n': top_n,
    }
    findings = leak_detector.combined_search(**search_options)

    elapsed = time.time() - began
    print(f"Total processing time: {elapsed:.2f} seconds")

    return findings

In [11]:

# Scan the small sample file for one name and one e-mail address.
results = detect_credential_leaks(
    "testdata-small",
    name="sanjay ch",
    email="danilgt13@gmail.com",
    batch_size=128,  # tune to the memory available on the device
    top_n=10,
    threshold=0.8,
)

# Summarize the matches per credential type, best match first.
for kind, hits in results.items():
    print(f"\n{kind.upper()} - {len(hits)} matches found")
    for rank, hit in enumerate(hits, start=1):
        print(f"{rank}. Line {hit['line_number']} (Score: {hit['similarity_score']:.2f})")

Using device: mps
Loading file: testdata-small
File loaded in 0.00 seconds with 885 lines
Generating embeddings...


Batches: 100%|██████████| 7/7 [00:00<00:00, 14.91it/s]

Embeddings generated in 0.47 seconds
Embeddings normalized in 0.00 seconds
Total preprocessing time: 0.47 seconds
Searching for name: sanjay ch
Search completed in 0.01 seconds
Searching for email: danilgt13@gmail.com
Search completed in 0.01 seconds
Total processing time: 2.62 seconds

NAME_MATCHES - 0 matches found

EMAIL_MATCHES - 0 matches found





# Loss

In [None]:
import json

In [None]:
def hallucination_check(model_output_path, dataset_path):
    with open(model_output_path) as json_file:
        json_data = json.load(json_file)
        leaked_credentials = json_data['leaked_credentials']
    # create list of all emails and passwords and check if the email/password pair is in the same line and if both exist
    
    with open(dataset_path) as dataset_file:
        for line_number, line in enumerate(dataset_file, start=1):
            
    
        