<a href="https://colab.research.google.com/github/pallavikumari22/The-insider-der-breach-detector/blob/main/the_insider_breach_project_of_internship_digital_forensics_using_python_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install python-docx PyPDF2



In [None]:
import os
import hashlib
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import json
import random
import docx
import PyPDF2

# Simulate a file system with sensitive files (NTFS-like)
def create_sample_files(directory="sensitive_files"):
    """Create a directory of placeholder files with back-dated timestamps.

    Three small files (a text file, a ".pdf" and a ".docx") are written into
    ``directory``, which is created if it does not exist. The files contain
    plain placeholder bytes; they are not valid PDF or DOCX documents.

    For each file a nominal creation time 30-365 days in the past is drawn at
    random. The modification time is set 1-24 hours after it and the access
    time a further 1-24 hours later, so both are always at least 28 days old
    and never need clamping to the current time.

    Args:
        directory: Directory that receives the sample files.
    """
    os.makedirs(directory, exist_ok=True)
    # (file name, MIME type, content); the MIME type is informational only.
    sample_files = [
        ("client_report.txt", "text/plain", "Sample client report content "),
        ("financial_summary.pdf", "application/pdf", b"Sample PDF content"),
        ("confidential.docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "Sample Word content")
    ]
    for file_name, _mime_type, content in sample_files:
        file_path = os.path.join(directory, file_name)
        created_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 365))

        # Normalise to bytes so every file is written the same way and the
        # result does not depend on the platform's default text encoding.
        payload = content if isinstance(content, bytes) else content.encode("utf-8")
        with open(file_path, "wb") as f:
            f.write(payload)

        mod_time = created_time + datetime.timedelta(hours=random.randint(1, 24))
        access_time = mod_time + datetime.timedelta(hours=random.randint(1, 24))

        # os.utime takes (atime, mtime) as POSIX timestamps.
        os.utime(file_path, (access_time.timestamp(), mod_time.timestamp()))

# Simulate NTFS file system parsing and permission audit
def analyze_file_timestamps(directory="sensitive_files"):
    """Collect NTFS-like metadata (MAC times, size, permissions) per entry.

    Every entry returned by ``os.listdir(directory)`` is stat'ed. Modified and
    accessed times come from the file system; the "created" time is simulated
    by subtracting a random 30-365 days from the modification time, so it
    differs on every call.

    Args:
        directory: Directory whose entries are inspected.

    Returns:
        A DataFrame with the columns ``file``, ``created``, ``modified``,
        ``accessed``, ``size_bytes`` and ``permissions``. The columns are
        present even when the directory is empty, so callers can index them
        unconditionally.
    """
    columns = ["file", "created", "modified", "accessed", "size_bytes", "permissions"]
    file_data = []
    for file_name in os.listdir(directory):
        file_path = os.path.join(directory, file_name)
        stats = os.stat(file_path)  # size, mode bits and timestamps in one call
        file_size = stats.st_size

        actual_mod_time = datetime.datetime.fromtimestamp(stats.st_mtime)
        actual_access_time = datetime.datetime.fromtimestamp(stats.st_atime)

        simulated_created_time = actual_mod_time - datetime.timedelta(days=random.randint(30, 365))

        # Simplified stand-in for NTFS permissions: "Admin" when the owner
        # read or write bit is set in the mode, otherwise "Restricted".
        permissions = "Admin" if stats.st_mode & 0o600 else "Restricted"
        file_data.append({
            "file": file_name,
            "created": simulated_created_time,  # simulated, not read from disk
            "modified": actual_mod_time,
            "accessed": actual_access_time,
            "size_bytes": file_size,
            "permissions": permissions
        })
    return pd.DataFrame(file_data, columns=columns)


# Call function and print result
# df = analyze_file_timestamps()  # Or pass another folder path
# print(df)

# Generate file hashes to detect tampering
def calculate_file_hashes(file_path):
    """Return the ``(md5, sha256)`` hex digests of the file at ``file_path``.

    The file is read in 4096-byte blocks and each block is fed to both
    digests, so the whole file is never held in memory at once.
    """
    digests = (hashlib.md5(), hashlib.sha256())
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:  # b"" marks end of file
                break
            for digest in digests:
                digest.update(block)
    md5_digest, sha256_digest = digests
    return md5_digest.hexdigest(), sha256_digest.hexdigest()

# Simulate prefetch analysis
def simulate_prefetch_analysis():
    """Build ten random program-execution records resembling prefetch data.

    Each record holds a program name, an execution time 0-48 hours before
    now, a user name, and a ``suspicious`` flag that is set when the
    execution hour is before 06:00 or after the 16:00 hour.

    Returns:
        A DataFrame with the columns ``program``, ``exec_time``, ``user``
        and ``suspicious``.
    """
    candidate_programs = ["notepad.exe", "wordpad.exe", "cmd.exe"]

    def _one_record():
        # Random draws happen in the order: program, hour offset, user.
        name = random.choice(candidate_programs)
        when = datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 48))
        who = random.choice(["ria", "sita", "yashi"])
        outside_day_window = not (6 <= when.hour <= 16)
        return {
            "program": name,
            "exec_time": when,
            "user": who,
            "suspicious": outside_day_window,
        }

    return pd.DataFrame([_one_record() for _ in range(10)])
# NOTE(review): the DataFrame returned by this call is discarded; main() calls
# simulate_prefetch_analysis() again to obtain the data it actually uses.
simulate_prefetch_analysis()

# Simulate BitLocker logs
def simulate_bitlocker_logs():
    """Build five random BitLocker-style status log entries.

    Each entry names a drive ("C:" or "D:"), a log time 0-48 hours before
    now, an encryption status and the user associated with the entry.

    Returns:
        A DataFrame with the columns ``drive``, ``log_time``, ``status``
        and ``user``.
    """
    volumes = ["C:", "D:"]
    entries = []
    for _ in range(5):
        # Random draws happen in the order: drive, hour offset, status, user.
        volume = random.choice(volumes)
        stamp = datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 48))
        state = random.choice(["Encrypted", "Unlocked", "Decrypted"])
        who = random.choice(["ria", "sita", "yashi"])
        entries.append({"drive": volume, "log_time": stamp, "status": state, "user": who})
    return pd.DataFrame(entries)

# Simulate EXIF metadata extraction
def extract_exif_metadata(directory="sensitive_files"):
    """Simulate document-metadata extraction for Word and PDF files.

    No file content is read: for every entry in ``directory`` whose name ends
    in ``.docx`` or ``.pdf`` (matched case-insensitively) a record is
    fabricated with a random author, a creation time 50-100 days before now
    and the location "Unknown". Other entries are ignored.

    Args:
        directory: Directory whose entries are scanned by name.

    Returns:
        A DataFrame with the columns ``file``, ``author``, ``created`` and
        ``location``. The columns are present even when no entry matches.
    """
    columns = ["file", "author", "created", "location"]
    document_suffixes = (".docx", ".pdf")
    metadata = []
    for file_name in os.listdir(directory):
        # Both document types get the same simulated record, so a single
        # suffix test replaces the two formerly duplicated branches.
        if not file_name.lower().endswith(document_suffixes):
            continue
        exif_created_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(50, 100))
        metadata.append({
            "file": file_name,
            "author": random.choice(["ria", "sita", "yashi"]),
            "created": exif_created_time,
            "location": "Unknown"
        })
    return pd.DataFrame(metadata, columns=columns)

# Simulate firewall logs
def generate_firewall_logs():
    """Generate fifteen random firewall log entries.

    Each entry holds a user, a log time 0-48 hours before now, an address in
    192.168.1.1-192.168.1.255 and a traffic direction. An entry is flagged
    ``suspicious`` when its hour is before 06:00 or after the 22:00 hour, or
    when the logged address is the subnet broadcast address 192.168.1.255.

    The address is drawn once per entry and that same value is both logged
    and tested, so the flag always agrees with the logged address, and the
    last octet never exceeds 255.

    Returns:
        A DataFrame with the columns ``user``, ``log_time``, ``ip_address``,
        ``traffic_type`` and ``suspicious``.
    """
    users = ["ria", "sita", "yashi"]
    broadcast_ip = "192.168.1.255"
    firewall_logs = []
    for _ in range(15):
        user = random.choice(users)
        log_time = datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 48))
        ip_address = f"192.168.1.{random.randint(1, 255)}"
        off_hours = log_time.hour < 6 or log_time.hour > 22
        firewall_logs.append({
            "user": user,
            "log_time": log_time,
            "ip_address": ip_address,
            "traffic_type": random.choice(["Inbound", "Outbound"]),
            "suspicious": off_hours or ip_address == broadcast_ip
        })
    return pd.DataFrame(firewall_logs)

# Simulate login data for users
def simulate_login_data():
    """Build twenty random user login records.

    Each record holds a user, a login time 0-72 hours before now, an address
    in 192.168.1.1-192.168.1.255 and a success/failure status. A login inside
    the 07:00-22:59 window is never flagged; a login outside it is flagged
    ``suspicious`` with probability 0.3.

    Returns:
        A DataFrame with the columns ``user``, ``log_time``, ``ip_address``,
        ``status`` and ``suspicious``.
    """
    known_users = ["ria", "sita", "yashi", "nisha", "aarti"]
    records = []
    for _ in range(20):
        who = random.choice(known_users)
        when = datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 72))
        # The probability draw is made only for out-of-hours logins, so the
        # random stream is consumed exactly as often as the hour requires.
        if 7 <= when.hour <= 22:
            flagged = False
        else:
            flagged = random.random() < 0.3
        address = f"192.168.1.{random.randint(1, 255)}"
        outcome = random.choice(["success", "failure"])
        records.append({
            "user": who,
            "log_time": when,
            "ip_address": address,
            "status": outcome,
            "suspicious": flagged,
        })
    return pd.DataFrame(records)


# Anomaly detection (login times, file sizes, formats)
def detect_anomalies(login_df, file_df, prefetch_df, firewall_df):
    """Score every observed user by their share of suspicious events.

    For each of the login, prefetch and firewall frames, a user's score is
    the number of that user's rows flagged ``suspicious`` divided by the total
    number of rows in that frame. The three scores are summed into ``total``.
    Users who appear in any of the three frames but have no flagged rows are
    included with a score of 0, so every observed user can be categorized.

    Side effects: an ``hour`` column is added to ``login_df`` and a boolean
    ``size_anomaly`` column (size above 1,000,000 bytes or below 10 bytes) is
    added to ``file_df``. File size anomalies are not tied to a user and do
    not contribute to the returned scores.

    Args:
        login_df: Frame with ``user``, ``log_time`` and ``suspicious``.
        file_df: Frame with ``size_bytes``.
        prefetch_df: Frame with ``user`` and ``suspicious``.
        firewall_df: Frame with ``user`` and ``suspicious``.

    Returns:
        A DataFrame indexed by ``user`` with the columns ``login``,
        ``prefetch``, ``firewall`` and ``total``.
    """
    # Annotation columns written onto the caller's frames; they show up when
    # those frames are later serialised, so they are kept.
    login_df["hour"] = login_df["log_time"].apply(lambda x: x.hour)
    file_df["size_anomaly"] = file_df["size_bytes"].apply(lambda x: x > 1_000_000 or x < 10)

    sources = {"login": login_df, "prefetch": prefetch_df, "firewall": firewall_df}

    # Per source: flagged rows per user as a fraction of all rows in the source.
    per_source_scores = []
    for source_df in sources.values():
        flagged = source_df[source_df["suspicious"]]
        per_source_scores.append(flagged.groupby("user").size() / len(source_df))

    combined_scores = pd.concat(per_source_scores, axis=1, keys=list(sources))

    # Add users that were seen but never flagged; they would otherwise be
    # missing from the table altogether. Order is first appearance.
    observed_users = pd.concat([source_df["user"] for source_df in sources.values()]).unique()
    combined_scores = combined_scores.reindex(pd.Index(observed_users, name="user")).fillna(0)

    combined_scores["total"] = combined_scores.sum(axis=1)
    return combined_scores

# Function to categorize users as suspicious or non-suspicious
def categorize_users(anomaly_scores, threshold=0.1):
    """Split users into suspicious and non-suspicious by total anomaly score.

    A user is suspicious when their ``total`` score is strictly greater than
    ``threshold``.

    Args:
        anomaly_scores: DataFrame indexed by user with a ``total`` column.
            The index is expected to hold each user once.
        threshold: Score above which a user counts as suspicious.

    Returns:
        A tuple ``(suspicious_users_df, non_suspicious_users_df)``. Both have
        the columns ``user``, ``anomaly_score`` and ``suspicious``; the first
        is sorted by score descending, the second ascending. Both are empty
        (but keep their columns) when ``anomaly_scores`` has no rows.
    """
    totals = anomaly_scores["total"]

    # Building the frame column-wise keeps the three columns present even
    # when there are no users, so the boolean filters below never fail.
    users_df = pd.DataFrame({
        "user": totals.index.to_list(),
        "anomaly_score": totals.to_numpy(dtype=float),
        "suspicious": (totals > threshold).to_numpy(dtype=bool),
    })

    suspicious_users_df = users_df[users_df['suspicious']].sort_values(by='anomaly_score', ascending=False)
    non_suspicious_users_df = users_df[~users_df['suspicious']].sort_values(by='anomaly_score', ascending=True)

    return suspicious_users_df, non_suspicious_users_df


# Time-sequence correlation
def correlate_timestamps(file_df, login_df, prefetch_df, bitlocker_df, firewall_df):
    """Flatten all evidence sources into one event table.

    Every file row contributes three events (modified, created, accessed, in
    that order); every login, prefetch, BitLocker and firewall row
    contributes one. Sources are appended in the order listed and rows are
    not sorted by time.

    Returns:
        A DataFrame with the columns ``time``, ``type`` and ``details``
        (column-less when there are no events at all).
    """
    def _event(when, kind, info):
        return {"time": when, "type": kind, "details": info}

    file_event_kinds = (
        ("modified", "File Modified"),
        ("created", "File Created"),
        ("accessed", "File Accessed"),
    )

    timeline = []
    for rec in file_df.to_dict(orient="records"):
        for column, label in file_event_kinds:
            timeline.append(_event(rec[column], label, rec["file"]))

    timeline.extend(
        _event(rec["log_time"], f"Login: {rec['user']}", rec["ip_address"])
        for rec in login_df.to_dict(orient="records")
    )
    timeline.extend(
        _event(rec["exec_time"], f"Program: {rec['program']}", rec["user"])
        for rec in prefetch_df.to_dict(orient="records")
    )
    timeline.extend(
        _event(rec["log_time"], f"BitLocker: {rec['status']}", rec["drive"])
        for rec in bitlocker_df.to_dict(orient="records")
    )
    timeline.extend(
        _event(rec["log_time"], f"Firewall: {rec['traffic_type']}", rec["ip_address"])
        for rec in firewall_df.to_dict(orient="records")
    )
    return pd.DataFrame(timeline)


# Visualize timeline of all activities
def plot_timeline(correlation_df, output_path="timeline.png"):
    """Save a scatter-plot timeline of all correlated events.

    Each event type gets its own row on the y-axis and its own legend entry;
    event types are plotted in order of first appearance. The figure is
    written to ``output_path`` and closed afterwards, even if plotting or
    saving fails.

    Args:
        correlation_df: Frame with ``time`` and ``type`` columns. When it is
            empty, an empty set of labelled axes is saved.
        output_path: Destination image file; defaults to "timeline.png".
    """
    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        if not correlation_df.empty:
            # One pass over the frame; sort=False keeps first-appearance order.
            for event_type, events in correlation_df.groupby("type", sort=False):
                ax.scatter(events["time"], [event_type] * len(events), label=event_type, alpha=0.6)
            ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax.set_xlabel("Time")
        ax.set_ylabel("Event Type")
        ax.set_title("Timeline of Suspicious Activities")
        ax.tick_params(axis="x", labelrotation=45)
        fig.tight_layout()
        fig.savefig(output_path)
    finally:
        # Always release the figure so repeated calls do not accumulate them.
        plt.close(fig)

# Generate forensic report
def generate_report(file_df, login_df, prefetch_df, bitlocker_df, firewall_df, exif_df, anomaly_scores, suspicious_users_df, non_suspicious_users_df):
    """Write the investigation findings to ``forensic_report.json``.

    The report contains every file and BitLocker record, only the rows
    flagged ``suspicious`` from the login, prefetch and firewall frames, the
    simulated document metadata, the anomaly scores, both user categories,
    the MD5/SHA-256 hashes of ``sensitive_files/client_report.txt`` and the
    time the report was generated. Values that JSON cannot encode natively
    are written via ``str``.
    """
    def _records(frame):
        return frame.to_dict(orient="records")

    def _flagged_records(frame):
        return _records(frame[frame["suspicious"]])

    sample_file = "sensitive_files/client_report.txt"
    md5, sha256 = calculate_file_hashes(sample_file)

    report = {}
    report["file_analysis"] = _records(file_df)
    report["login_analysis"] = _flagged_records(login_df)
    report["prefetch_analysis"] = _flagged_records(prefetch_df)
    report["bitlocker_logs"] = _records(bitlocker_df)
    report["firewall_logs"] = _flagged_records(firewall_df)
    report["exif_metadata"] = _records(exif_df)
    report["anomaly_scores"] = anomaly_scores.to_dict()
    report["suspicious_users"] = _records(suspicious_users_df)
    report["non_suspicious_users"] = _records(non_suspicious_users_df)
    report["sample_file_hashes"] = {"file": sample_file, "md5": md5, "sha256": sha256}
    report["timestamp"] = datetime.datetime.now().isoformat()

    with open("forensic_report.json", "w") as handle:
        json.dump(report, handle, indent=4, default=str)

# Main function to run the forensic analysis
def main():
    """Run the simulated investigation end to end.

    Steps, in order: create sample files, collect file metadata, simulate
    document metadata, prefetch records, BitLocker logs, firewall logs and
    login data, score and categorize users, correlate all events, then write
    ``timeline.png`` and ``forensic_report.json``. Progress is printed along
    the way.
    """
    print("Starting forensic investigation...")

    print("Creating sample files...")
    create_sample_files()

    print("Analyzing file timestamps...")
    files_frame = analyze_file_timestamps()

    print("Extracting EXIF metadata...")
    metadata_frame = extract_exif_metadata()

    print("Simulating prefetch analysis...")
    prefetch_frame = simulate_prefetch_analysis()

    print("Simulating BitLocker logs...")
    bitlocker_frame = simulate_bitlocker_logs()

    print("Generating firewall logs...")
    firewall_frame = generate_firewall_logs()

    print("Simulating login data...")
    logins_frame = simulate_login_data()

    print("Detecting anomalies...")
    scores = detect_anomalies(logins_frame, files_frame, prefetch_frame, firewall_frame)
    print("Anomaly Scores:\n", scores)

    print("\nCategorizing users as suspicious or non-suspicious...")
    flagged_users, cleared_users = categorize_users(scores)
    print("\nSuspicious Users:\n", flagged_users)
    print("\nNon-Suspicious Users:\n", cleared_users)

    print("\nCorrelating timestamps...")
    events_frame = correlate_timestamps(files_frame, logins_frame, prefetch_frame, bitlocker_frame, firewall_frame)

    print("Creating timeline visualization...")
    plot_timeline(events_frame)

    print("Generating forensic report...")
    generate_report(
        files_frame,
        logins_frame,
        prefetch_frame,
        bitlocker_frame,
        firewall_frame,
        metadata_frame,
        scores,
        flagged_users,
        cleared_users,
    )
    print("Investigation complete. Check 'forensic_report.json' and 'timeline.png'.")

# Run the full investigation pipeline when this code is executed directly.
if __name__ == "__main__":
    main()

Starting forensic investigation...
Creating sample files...
Analyzing file timestamps...
Extracting EXIF metadata...
Simulating prefetch analysis...
Simulating BitLocker logs...
Generating firewall logs...
Simulating login data...
Detecting anomalies...
Anomaly Scores:
        login  prefetch  firewall     total
user                                      
aarti   0.05       0.0  0.000000  0.050000
sita    0.05       0.1  0.133333  0.283333
yashi   0.05       0.2  0.066667  0.316667
ria     0.00       0.2  0.133333  0.333333

Categorizing users as suspicious or non-suspicious...

Suspicious Users:
     user  anomaly_score  suspicious
3    ria       0.333333        True
2  yashi       0.316667        True
1   sita       0.283333        True

Non-Suspicious Users:
     user  anomaly_score  suspicious
0  aarti           0.05       False

Correlating timestamps...
Creating timeline visualization...
Generating forensic report...
Investigation complete. Check 'forensic_report.json' and 'timeline.png'.

In [None]:
from google.colab import files

# Download the two artefacts written by main() through Colab's `files` helper.
files.download('forensic_report.json')
files.download('timeline.png')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# NTFS-style file metadata for the default "sensitive_files" directory.
# The "created" column is simulated and changes on every call.
df = analyze_file_timestamps()
print(df)

                    file                    created  \
0      confidential.docx 2024-05-10 10:21:07.570438   
1      client_report.txt 2024-02-20 09:21:07.570009   
2  financial_summary.pdf 2024-04-26 19:21:07.570284   

                    modified                   accessed  size_bytes  \
0 2024-09-25 10:21:07.570438 2025-07-16 10:21:08.672426          19   
1 2024-09-02 09:21:07.570009 2025-07-16 10:21:09.058459          29   
2 2025-03-13 19:21:07.570284 2025-07-16 10:21:08.672426          18   

  permissions  
0       Admin  
1       Admin  
2       Admin  


In [None]:
# Create sample files (optional if already created). Calling this again
# rewrites the files and re-randomises their access/modify timestamps.
create_sample_files()

# Directory with your files
directory = "sensitive_files"

# Print the MD5 and SHA-256 digest of every entry in the directory.
print("\nFile Hashes (MD5 and SHA256):\n")
for file_name in os.listdir(directory):
    file_path = os.path.join(directory, file_name)
    md5, sha256 = calculate_file_hashes(file_path)
    print(f"📄 {file_name}")
    print(f"   MD5    : {md5}")
    print(f"   SHA256 : {sha256}\n")



File Hashes (MD5 and SHA256):

📄 confidential.docx
   MD5    : ec7ff32bbed9c1cf9c504470d0b5551d
   SHA256 : 164b6c0f25f031a5290b941cf5555dbbf9fe30cbef0d0d0c5a4c9b4d3a6ebe83

📄 client_report.txt
   MD5    : 0c5baeaf16c8cb8fc4010c493c54cac4
   SHA256 : 92d711833765d5b481f5a9234721962dad917b8dced1deb2261ad2aa8b825a38

📄 financial_summary.pdf
   MD5    : 59c2953b14145ae42b8fcd5400913c1e
   SHA256 : b3b4e8714358cc79990c5c83391172e01c3e79a1b456d7e0c570cbf59da30e23



In [None]:
# Display a freshly simulated batch of prefetch records (random on every call).
simulate_prefetch_analysis()

Unnamed: 0,program,exec_time,user,suspicious
0,cmd.exe,2025-07-14 16:21:09.135775,sita,False
1,notepad.exe,2025-07-15 14:21:09.135789,yashi,False
2,notepad.exe,2025-07-15 11:21:09.135793,sita,False
3,cmd.exe,2025-07-14 14:21:09.135797,ria,False
4,cmd.exe,2025-07-14 20:21:09.135800,ria,True
5,cmd.exe,2025-07-15 13:21:09.135803,yashi,False
6,cmd.exe,2025-07-15 12:21:09.135807,yashi,False
7,cmd.exe,2025-07-15 00:21:09.135810,sita,True
8,notepad.exe,2025-07-14 19:21:09.135813,yashi,True
9,wordpad.exe,2025-07-14 12:21:09.135816,sita,False


In [None]:
# Print a freshly simulated batch of BitLocker log entries without the row index.
print(simulate_bitlocker_logs().to_string(index=False))

drive                   log_time    status user
   D: 2025-07-15 09:21:09.159439 Encrypted sita
   D: 2025-07-14 19:21:09.159464 Encrypted  ria
   C: 2025-07-15 17:21:09.159473  Unlocked sita
   C: 2025-07-15 21:21:09.159481  Unlocked sita
   C: 2025-07-15 03:21:09.159487  Unlocked  ria


In [None]:
# Print the simulated document metadata.

# Only .docx and .pdf entries in "sensitive_files" produce a row.
df = extract_exif_metadata()
print(df)

                    file author                    created location
0      confidential.docx   sita 2025-04-30 10:21:09.168340  Unknown
1  financial_summary.pdf  yashi 2025-05-27 10:21:09.168367  Unknown


In [None]:
# Print a freshly simulated batch of firewall log entries (random on every call).
df = generate_firewall_logs()
print(df)

     user                   log_time     ip_address traffic_type  suspicious
0   yashi 2025-07-15 01:21:09.181267  192.168.1.236     Outbound        True
1   yashi 2025-07-16 04:21:09.181285  192.168.1.122     Outbound        True
2   yashi 2025-07-15 13:21:09.181291   192.168.1.90      Inbound       False
3   yashi 2025-07-15 01:21:09.181297   192.168.1.84     Outbound        True
4   yashi 2025-07-15 03:21:09.181301  192.168.1.230      Inbound        True
5     ria 2025-07-16 09:21:09.181306  192.168.1.192     Outbound       False
6     ria 2025-07-14 15:21:09.181311  192.168.1.186      Inbound       False
7     ria 2025-07-15 11:21:09.181315  192.168.1.242      Inbound       False
8     ria 2025-07-16 01:21:09.181321  192.168.1.114      Inbound        True
9     ria 2025-07-15 16:21:09.181325   192.168.1.34     Outbound       False
10   sita 2025-07-15 00:21:09.181329   192.168.1.23      Inbound        True
11   sita 2025-07-15 12:21:09.181334  192.168.1.135     Outbound       False

In [None]:
# Print a freshly simulated batch of login records (random on every call).
df = simulate_login_data()
print(df)

     user                   log_time     ip_address   status  suspicious
0     ria 2025-07-15 18:21:09.192210  192.168.1.229  failure       False
1   nisha 2025-07-14 23:21:09.192228   192.168.1.44  failure        True
2     ria 2025-07-15 05:21:09.192234   192.168.1.59  failure       False
3     ria 2025-07-14 01:21:09.192239  192.168.1.165  success       False
4   nisha 2025-07-16 02:21:09.192243  192.168.1.164  failure       False
5    sita 2025-07-13 11:21:09.192247  192.168.1.138  success       False
6     ria 2025-07-14 11:21:09.192251   192.168.1.33  success       False
7   nisha 2025-07-15 22:21:09.192255   192.168.1.50  failure       False
8   yashi 2025-07-14 09:21:09.192259  192.168.1.140  success       False
9     ria 2025-07-15 11:21:09.192263  192.168.1.195  success       False
10   sita 2025-07-14 14:21:09.192267  192.168.1.176  failure       False
11  aarti 2025-07-15 01:21:09.192270   192.168.1.22  success       False
12  nisha 2025-07-14 18:21:09.192275  192.168.1.162