In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!pip install faker pandas
from faker import Faker
import pandas as pd
import random
from datetime import datetime, timedelta
fake = Faker()


Collecting faker
  Downloading faker-37.0.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.0.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.0.0


In [8]:
import pandas as pd
from faker import Faker
import random
from datetime import datetime, timedelta

# Initialize Faker
fake = Faker()

# Set consistent seed for reproducibility
random.seed(42)
Faker.seed(42)

# --- Static Data ---
static_profile = {
    'employee_id': 'E-2023',
    'name': 'Allison Hill',
    'role': 'Doctor',
    'department': 'Cardiology',
    'access_permissions': {
        'allowed_datasets': ['Patient Records'],
        'allowed_operations': ['Read', 'Update'],
        'data_categories': ['Medical History', 'Treatment Plans']
    }
}

# --- Dynamic Data Generation (10 Days) ---
def generate_normal_behavior(base_date):
    base_ip = '10.1.1.15 (Internal)'
    base_location = 'Main Campus - Cardiology Wing'
    normal_access_time = datetime.strptime("08:00", "%H:%M").time()

    records = []
    for day in range(10):
        access_date = base_date + timedelta(days=day)
        access_time = datetime.combine(access_date, normal_access_time) + timedelta(minutes=random.randint(-15, 15))

        record = {
            'date': access_date.strftime("%Y-%m-%d"),
            'timestamp': access_time.strftime("%Y-%m-%d %H:%M:%S"),
            'requested_dataset': 'Patient Records',
            'requested_operation': random.choices(['Read', 'Update'], weights=[0.85, 0.15])[0],
            'access_outcome': 'Granted',
            'mfa_failures': random.choices([0, 1], weights=[0.95, 0.05])[0],
            'ip_address': base_ip,
            'location': base_location,
            'behavior_score': calculate_behavior_score(access_time, base_ip)
        }

        if random.random() < 0.1:
            record['timestamp'] = (access_time + timedelta(minutes=random.randint(-30, 30))).strftime("%Y-%m-%d %H:%M:%S")

        records.append(record)

    return records

def calculate_behavior_score(access_time, ip_address):
    score = 0.0
    hour = access_time.hour
    if 7 <= hour <= 9:
        score += 0.0
    else:
        score += min(abs(hour - 8) * 0.1, 0.5)

    if "Internal" in ip_address:
        score += 0.0
    else:
        score += 0.3

    score += random.uniform(-0.05, 0.05)
    return max(0.0, min(round(score, 2), 0.3))

# Generate dataset
base_date = datetime(2023, 10, 1)
dynamic_data = generate_normal_behavior(base_date)
historical_df = pd.DataFrame(dynamic_data)

# --- Display Results in Requested Format ---
print("=== Static User Profile ===")
print(f"Name: {static_profile['name']}")
print(f"Role: {static_profile['role']} ({static_profile['department']})")
print("Permissions:")
print(f"- Allowed Datasets: {', '.join(static_profile['access_permissions']['allowed_datasets'])}")
print(f"- Allowed Operations: {', '.join(static_profile['access_permissions']['allowed_operations'])}")
print(f"- Access Categories: {', '.join(static_profile['access_permissions']['data_categories'])}")

print("\n=== 10-Day Activity Log ===")
print(historical_df.to_string(index=False, justify='left', columns=[
    'date', 'timestamp', 'requested_dataset',
    'requested_operation', 'access_outcome',
    'mfa_failures', 'ip_address', 'location', 'behavior_score'
]))

=== Static User Profile ===
Name: Allison Hill
Role: Doctor (Cardiology)
Permissions:
- Allowed Datasets: Patient Records
- Allowed Operations: Read, Update
- Access Categories: Medical History, Treatment Plans

=== 10-Day Activity Log ===
date       timestamp           requested_dataset requested_operation access_outcome  mfa_failures ip_address           location                       behavior_score
2023-10-01 2023-10-01 08:05:00 Patient Records     Read              Granted        0             10.1.1.15 (Internal) Main Campus - Cardiology Wing 0.00           
2023-10-02 2023-10-02 07:48:00 Patient Records     Read              Granted        0             10.1.1.15 (Internal) Main Campus - Cardiology Wing 0.00           
2023-10-03 2023-10-03 07:45:00 Patient Records     Read              Granted        0             10.1.1.15 (Internal) Main Campus - Cardiology Wing 0.01           
2023-10-04 2023-10-04 08:07:00 Patient Records     Read              Granted        0             10

Dataset for access

In [6]:
import pandas as pd
import random
from faker import Faker
from datetime import datetime, timedelta

# Initialize Faker for generating synthetic data
fake = Faker()

# Define constants with role-department alignment
ROLE_DEPARTMENT_MAP = {
    "Doctor": ["Emergency", "ICU", "Radiology", "Surgery", "Pediatrics"],
    "Nurse": ["Emergency", "ICU", "Pediatrics", "Surgery"],
    "Receptionist": ["Emergency", "Records", "Admissions"],
    "Technician": ["Radiology", "Laboratory", "Pharmacy"],
    "Admin": ["Administration", "HR", "Records"]
}

SENSITIVE_ILLNESSES = ["HIV", "Mental Health Disorder", "Substance Abuse", "Cancer"]
REGULAR_ILLNESSES = ["Flu", "COVID-19", "Broken Bone", "Infection", "Hypertension", "Diabetes"]

# Function to generate employee dataset with access levels
def generate_employee_dataset(n):
    """
    Generate employee data with role-based access permissions
    """
    data = []
    for _ in range(n):
        role = random.choice(list(ROLE_DEPARTMENT_MAP.keys()))
        department = random.choice(ROLE_DEPARTMENT_MAP[role])

        # Define access permissions
        permissions = {
            "patient_data": {
                "access_level": "Full" if role == "Doctor" else "Limited",
                "allowed_operations": ["Read", "Update"] if role == "Doctor" else ["Read"]
            },
            "employee_data": {
                "access_level": "Admin" if role == "Admin" else "None",
                "allowed_operations": ["Read", "Write", "Update"] if role == "Admin" else []
            }
        }

        data.append([
            f"E{random.randint(1, 999):03d}",
            fake.name(),
            role,
            department,
            random.randint(50000, 200000),
            fake.address().replace("\n", ", "),
            fake.phone_number(),
            permissions
        ])

    return pd.DataFrame(data, columns=[
        "Employee_ID", "Employee_Name", "Role", "Department",
        "Salary", "Address", "Phone", "Access_Permissions"
    ])

# Function to generate patient dataset with data sensitivity
def generate_patient_dataset(n):
    """
    Generate patient data with medical sensitivity levels
    """
    data = []
    for _ in range(n):
        is_sensitive = random.random() < 0.2  # 20% sensitive cases
        illness = random.choice(SENSITIVE_ILLNESSES if is_sensitive else REGULAR_ILLNESSES)

        data.append([
            f"P{random.randint(1, 999):03d}",
            fake.name(),
            random.choice(["Male", "Female"]),
            random.randint(1, 100),
            fake.address().replace("\n", ", "),
            fake.phone_number(),
            illness,
            random.choice(["Private", "Medicare", "Medicaid", "Self-Pay"]),
            datetime.now() - timedelta(days=random.randint(0, 30)),
            random.randint(100, 500),
            "High" if is_sensitive else "Normal"
        ])

    return pd.DataFrame(data, columns=[
        "Patient_ID", "Patient_Name", "Gender", "Age",
        "Address", "Phone", "Illness", "Insurance_Type",
        "Admission_Date", "Room_Number", "Data_Sensitivity"
    ])

# Generate datasets
employee_df = generate_employee_dataset(20)
patient_df = generate_patient_dataset(20)

# Display data
print("\nEmployee Data (with Access Permissions):")
print(employee_df.head().to_string(index=False))

print("\nPatient Data (with Sensitivity Levels):")
print(patient_df.head().to_string(index=False))


Employee Data (with Access Permissions):
Employee_ID  Employee_Name         Role Department  Salary                                                   Address                Phone                                                                                                                                                           Access_Permissions
       E285    Noah Rhodes       Doctor    Surgery  168858             600 Jeffery Parkways, New Jamesside, MT 29394     394.802.6542x351                    {'patient_data': {'access_level': 'Full', 'allowed_operations': ['Read', 'Update']}, 'employee_data': {'access_level': 'None', 'allowed_operations': []}}
       E380     Ryan Munoz Receptionist  Emergency  143132   161 Calderon River Suite 931, Lake Jeremyport, CO 31013         664-375-2553                           {'patient_data': {'access_level': 'Limited', 'allowed_operations': ['Read']}, 'employee_data': {'access_level': 'None', 'allowed_operations': []}}
       E719 Carolyn Danie

key pattern of historical dataset of one user

In [23]:
from sentence_transformers import SentenceTransformer

# --- Hugging Face Embedding Model ---
def embed_log_entries_hf(df):
    """Convert log entries to vector embeddings using Hugging Face"""
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

    # Create text representations
    df['log_text'] = df.apply(
        lambda x: f"On {x['date']} at {x['timestamp'].time()}, " +
                  f"{static_profile['name']} performed {x['requested_operation']} " +
                  f"from {x['location']} (IP: {x['ip_address']}), " +
                  f"MFA failures: {x['mfa_failures']}, " +
                  f"Risk score: {x['behavior_score']}",
        axis=1
    )

    # Generate embeddings
    df['embedding'] = df['log_text'].apply(lambda x: model.encode(x))
    return df

# Generate embeddings
embedded_df = embed_log_entries_hf(historical_df.copy())

print("\n=== Embedded Log Entries ===")
print(embedded_df[['log_text', 'embedding']].head())


=== Embedded Log Entries ===
                                            log_text  \
0  On 2023-10-01 at 08:05:00, Allison Hill perfor...   
1  On 2023-10-02 at 07:48:00, Allison Hill perfor...   
2  On 2023-10-03 at 07:45:00, Allison Hill perfor...   
3  On 2023-10-04 at 08:07:00, Allison Hill perfor...   
4  On 2023-10-05 at 08:10:00, Allison Hill perfor...   

                                           embedding  
0  [-0.029415758, 0.009264707, -0.033427626, 0.02...  
1  [-0.031590264, 0.008542097, -0.032836016, 0.02...  
2  [-0.029741656, 0.007885972, -0.0342469, 0.0236...  
3  [-0.029362421, 0.004619626, -0.030891113, 0.02...  
4  [-0.0514411, -0.0012354427, -0.00929477, 0.001...  


In [24]:
import faiss
import numpy as np
from langchain.vectorstores import FAISS
from langchain.docstore.in_memory import InMemoryDocstore
from langchain.schema import Document

# --- Vector Database with Hugging Face Embeddings ---
def create_vector_db_hf(embedded_df):
    """Store Hugging Face embeddings in a searchable vector database"""
    texts = embedded_df['log_text'].tolist()
    embeddings = np.array(embedded_df['embedding'].tolist())

    # Create FAISS index
    dimension = embeddings.shape[1]  # Get embedding dimension
    index = faiss.IndexFlatL2(dimension)  # Create FAISS index
    index.add(embeddings.astype('float32'))  # Add embeddings

    # Create LangChain FAISS wrapper
    vector_db = FAISS(
        embedding_function=None,  # Not needed since we have precomputed embeddings
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={i: i for i in range(len(texts))}
    )

    # Add text metadata
    for i, text in enumerate(texts):
        vector_db.docstore.add({i: Document(page_content=text)})

    return vector_db

# Create and persist vector DB
vector_db = create_vector_db_hf(embedded_df)

print("\n=== Vector Database Info ===")
print(f"Index size: {vector_db.index.ntotal}")




=== Vector Database Info ===
Index size: 10


In [34]:
def analyze_access_request(request, vector_db):
    """Compare real-time request with historical data"""
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

    # Create query text
    query_text = (
        f"New access request: {request['operation']} operation "
        f"on {request['dataset']} at {request['time']} "
        f"from {request['location']} (IP: {request['ip']}), "
        f"MFA failures: {request['mfa_failures']}"
    )

    # Generate query embedding
    query_embedding = model.encode(query_text)

    # Search FAISS index
    distances, indices = vector_db.index.search(
        np.array([query_embedding]).astype('float32'),
        k=3
    )

    # Retrieve matching documents
    similar_logs = []
    for idx in indices[0]:
        doc = vector_db.docstore.get(str(idx))
        if isinstance(doc, Document):  # Ensure doc is a Document object
            similar_logs.append({
                'log': doc.page_content,
                'score': float(1 - distances[0][idx])  # Convert distance to similarity
            })
        else:
            print(f"Warning: Retrieved object is not a Document: {doc}")

    return similar_logs

In [35]:
# --- Simulate Real-Time Request ---
def simulate_real_time_request(user_profile, is_suspicious=False):
    if is_suspicious:
        # Suspicious behavior
        request = {
            'dataset': 'Patient Records',
            'operation': 'Update',
            'time': '02:15:00',  # Unusual time
            'location': 'External Clinic',  # Unusual location
            'ip': '203.0.113.5',  # External IP
            'mfa_failures': 2  # Multiple failures
        }
    else:
        # Normal behavior
        request = {
            'dataset': 'Patient Records',
            'operation': 'Read',
            'time': '08:10:00',  # Typical time
            'location': 'Main Campus - Cardiology Wing',  # Typical location
            'ip': '10.1.1.15',  # Internal IP
            'mfa_failures': 0  # No failures
        }

    return request

# Simulate a normal request
normal_request = simulate_real_time_request(static_profile, is_suspicious=False)

# Simulate a suspicious request
suspicious_request = simulate_real_time_request(static_profile, is_suspicious=True)

# Analyze normal request
normal_similar_entries = analyze_access_request(normal_request, vector_db)

# Analyze suspicious request
suspicious_similar_entries = analyze_access_request(suspicious_request, vector_db)

# --- Display Results ---
print("\n=== Similar Historical Entries (Normal Request) ===")
for idx, entry in enumerate(normal_similar_entries, 1):
    print(f"\nMatch #{idx} (Similarity: {entry['score']:.2f}):")
    print(entry['log'])

print("\n=== Similar Historical Entries (Suspicious Request) ===")
for idx, entry in enumerate(suspicious_similar_entries, 1):
    print(f"\nMatch #{idx} (Similarity: {entry['score']:.2f}):")
    print(entry['log'])


AttributeError: 'InMemoryDocstore' object has no attribute 'get'

Simulating User Requests

In [37]:
def simulate_real_time_request(user_profile, is_suspicious=False):
    """Simulate a real-time access request"""
    if is_suspicious:
        # Suspicious behavior
        request = {
            'dataset': 'Patient Records',
            'operation': 'Update',
            'time': '02:15:00',  # Unusual time
            'location': 'External Clinic',  # Unusual location
            'ip': '203.0.113.5',  # External IP
            'mfa_failures': 2  # Multiple failures
        }
    else:
        # Normal behavior
        request = {
            'dataset': 'Patient Records',
            'operation': 'Read',
            'time': '08:10:00',  # Typical time
            'location': 'Main Campus - Cardiology Wing',  # Typical location
            'ip': '10.1.1.15',  # Internal IP
            'mfa_failures': 0  # No failures
        }

    return request

# Simulate a normal request
normal_request = simulate_real_time_request(static_profile, is_suspicious=False)

# Simulate a suspicious request
suspicious_request = simulate_real_time_request(static_profile, is_suspicious=True)

print("=== Normal Request ===")
print(normal_request)

print("\n=== Suspicious Request ===")
print(suspicious_request)

=== Normal Request ===
{'dataset': 'Patient Records', 'operation': 'Read', 'time': '08:10:00', 'location': 'Main Campus - Cardiology Wing', 'ip': '10.1.1.15', 'mfa_failures': 0}

=== Suspicious Request ===
{'dataset': 'Patient Records', 'operation': 'Update', 'time': '02:15:00', 'location': 'External Clinic', 'ip': '203.0.113.5', 'mfa_failures': 2}


In [36]:
def analyze_access_request(request, vector_db):
    """Compare real-time request with historical data"""
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

    # Create query text
    query_text = (
        f"New access request: {request['operation']} operation "
        f"on {request['dataset']} at {request['time']} "
        f"from {request['location']} (IP: {request['ip']}), "
        f"MFA failures: {request['mfa_failures']}"
    )

    # Generate query embedding
    query_embedding = model.encode(query_text)

    # Search FAISS index
    distances, indices = vector_db.index.search(
        np.array([query_embedding]).astype('float32'),
        k=3
    )

    # Retrieve matching documents
    similar_logs = []
    for idx in indices[0]:
        # Access the document directly using dictionary-like syntax or search
        # doc = vector_db.docstore[idx]  # if IDs are integers and sequential
        doc = vector_db.docstore.search(str(idx)) # Use search for more general cases

        if isinstance(doc, Document):  # Ensure doc is a Document object
            similar_logs.append({
                'log': doc.page_content,
                'score': float(1 - distances[0][idx])  # Convert distance to similarity
            })
        else:
            print(f"Warning: Retrieved object is not a Document: {doc}")

    return similar_logs