# In [2]:
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import os
import threading
import time
import psutil
from datetime import datetime, timedelta
import random
import numpy as np
from sklearn.linear_model import LinearRegression

class DriveScannerGUI:
    def __init__(self, master):
        self.master = master
        self.master.title("🔍 Drive File Scanner")
        self.master.geometry("900x650")
        self.master.configure(bg="#1e1e2f")

        self.scanning = False
        self.paused = False
        self.stopped = False
        self.total_files_scanned = 0
        self.total_size_scanned = 0
        self.start_time = None

        self.create_widgets()
        self.display_drive_info()

    def create_widgets(self):
        """Create and pack every widget of the main window, top to bottom.

        Pack order: action buttons, file-type filter, drive info, forecast,
        results text box with a vertical scrollbar, progress bar, and finally
        the status, ETA and summary labels.
        """
        window = self.master
        dark_bg = "#1e1e2f"

        # --- Action buttons, packed left to right in one row ---
        btn_frame = tk.Frame(window, bg=dark_bg)
        btn_frame.pack(pady=10)
        button_specs = (
            ("start_btn", "Start Scan", self.start_scan, "#28a745", "white"),
            ("pause_btn", "Pause", self.toggle_pause, "#ffc107", "black"),
            ("stop_btn", "Stop", self.stop_scan, "#dc3545", "white"),
            ("export_btn", "Export Results", self.export_results, "#007bff", "white"),
        )
        for attr_name, caption, handler, back, fore in button_specs:
            button = tk.Button(btn_frame, text=caption, command=handler, bg=back, fg=fore, width=15)
            button.pack(side=tk.LEFT, padx=5)
            setattr(self, attr_name, button)

        # --- File type filter (read-only dropdown) ---
        filter_frame = tk.Frame(window, bg=dark_bg)
        filter_frame.pack(pady=5)
        filter_caption = tk.Label(filter_frame, text="Filter file type:", bg=dark_bg, fg="#00ffcc")
        filter_caption.pack(side=tk.LEFT)
        self.filetype_var = tk.StringVar(value="All files (*)")
        self.filetype_dropdown = ttk.Combobox(
            filter_frame,
            textvariable=self.filetype_var,
            width=20,
            state="readonly",
            values=("All files (*)", "*.txt", "*.py", "*.jpg", "*.png", "*.exe"),
        )
        self.filetype_dropdown.pack(side=tk.LEFT, padx=5)

        # --- Drive usage summary ---
        self.drive_info = tk.Label(
            window, text="", bg=dark_bg, fg='#00ffcc',
            font=("Segoe UI", 10), justify=tk.LEFT,
        )
        self.drive_info.pack(pady=5, fill=tk.X, padx=10)

        # --- Usage forecast ---
        self.forecast_info = tk.Label(
            window, text="", bg=dark_bg, fg='#ffaa00',
            font=("Segoe UI", 9, "italic"), justify=tk.LEFT,
        )
        self.forecast_info.pack(pady=3, fill=tk.X, padx=10)

        # --- Results text box with a vertical scrollbar ---
        text_frame = tk.Frame(window)
        text_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)

        self.text_box = tk.Text(text_frame, wrap="none", height=20, bg="#262638", fg="#00ff00")
        self.text_box.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        results_scroll = tk.Scrollbar(text_frame, command=self.text_box.yview)
        results_scroll.pack(side=tk.RIGHT, fill=tk.Y)
        self.text_box.config(yscrollcommand=results_scroll.set)

        # --- Progress bar ---
        self.progress = ttk.Progressbar(window, orient=tk.HORIZONTAL, length=700, mode='determinate')
        self.progress.pack(pady=5)

        # --- Status, ETA and end-of-scan summary ---
        self.status_label = tk.Label(window, text="", bg=dark_bg, fg='#ffffff')
        self.status_label.pack(pady=5)

        self.eta_label = tk.Label(window, text="", bg=dark_bg, fg='#aaaaaa')
        self.eta_label.pack()

        self.summary_label = tk.Label(
            window, text="", bg=dark_bg, fg='#00ffcc', font=("Segoe UI", 11, 'bold'),
        )
        self.summary_label.pack(pady=5)

    def display_drive_info(self):
        """Show total/used/free space per partition, then refresh the forecast.

        Partitions whose usage cannot be read are skipped.
        """
        rows = []
        for partition in psutil.disk_partitions(all=False):
            try:
                stats = psutil.disk_usage(partition.mountpoint)
                rows.append(
                    f"Drive {partition.device} - Total: {self.convert_size(stats.total)}, "
                    f"Used: {self.convert_size(stats.used)}, "
                    f"Free: {self.convert_size(stats.free)}\n"
                )
            except Exception:
                continue
        self.drive_info.config(text="".join(rows))
        self.drive_usage_forecasting()

    def convert_size(self, size_bytes):
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.2f} PB"

    def start_scan(self):
        if not self.scanning:
            path = filedialog.askdirectory(title="Select Folder to Scan")
            if path:
                self.scanning = True
                self.paused = False
                self.stopped = False
                self.total_files_scanned = 0
                self.total_size_scanned = 0
                self.start_time = time.time()
                self.text_box.delete(1.0, tk.END)
                self.summary_label.config(text="")
                self.progress['value'] = 0
                threading.Thread(target=self.scan_directory, args=(path,), daemon=True).start()

    def toggle_pause(self):
        if self.scanning:
            self.paused = not self.paused
            self.pause_btn.config(text="Resume" if self.paused else "Pause")
            self.status_label.config(text="⏸️ Paused" if self.paused else "▶️ Resumed")

    def stop_scan(self):
        if self.scanning:
            self.stopped = True
            self.scanning = False
            self.paused = False
            self.pause_btn.config(text="Pause")
            self.status_label.config(text="🛑 Scan Stopped by User")

    def scan_directory(self, path):
        # Count total files first for progress bar
        total_files = 0
        for root, dirs, files in os.walk(path):
            total_files += len(files)
        if total_files == 0:
            self.status_label.config(text="No files found in selected folder.")
            self.scanning = False
            return

        self.progress['maximum'] = total_files

        filter_type = self.filetype_var.get()
        # Convert filter to usable format
        filter_pattern = None
        if filter_type != "All files (*)":
            filter_pattern = filter_type.lstrip("*")

        for root, dirs, files in os.walk(path):
            while self.paused:
                time.sleep(0.5)
                self.status_label.config(text="⏸️ Paused")

            if self.stopped:
                break

            for file in files:
                if self.stopped:
                    break

                # File type filter check
                if filter_pattern and not file.endswith(filter_pattern):
                    continue

                try:
                    full_path = os.path.join(root, file)

                    # Access checks
                    is_readable = os.access(full_path, os.R_OK)
                    is_writable = os.access(full_path, os.W_OK)
                    is_executable = os.access(full_path, os.X_OK)

                    access_info = []
                    if is_readable:
                        access_info.append("Read")
                    if is_writable:
                        access_info.append("Write")
                    if is_executable:
                        access_info.append("Execute")

                    access_str = ", ".join(access_info) if access_info else "No Access"

                    file_size = os.path.getsize(full_path)
                    self.total_size_scanned += file_size

                    # --- File Anomaly & Risk Detection ---
                    risk_flag = self.check_risk(full_path, file, file_size)

                    display_line = f"{full_path} [Size: {self.convert_size(file_size)} | Access: {access_str}]"
                    if risk_flag:
                        display_line += " [⚠️ RISKY]"

                    self.text_box.insert(tk.END, display_line + "\n")
                    self.total_files_scanned += 1
                    self.update_status(total_files)
                    self.text_box.see(tk.END)
                except Exception as e:
                    self.text_box.insert(tk.END, f"{full_path} [Error: {e}]\n")
                    continue

        self.scanning = False
        if not self.stopped:
            self.status_label.config(text="✅ Scan Complete")
            self.eta_label.config(text="")
            elapsed_time = time.time() - self.start_time
            summary_text = (f"Total Files Scanned: {self.total_files_scanned}\n"
                            f"Total Size Scanned: {self.convert_size(self.total_size_scanned)}\n"
                            f"Elapsed Time: {elapsed_time:.2f} seconds")
            self.summary_label.config(text=summary_text)
        else:
            self.status_label.config(text="🛑 Scan Stopped")
            self.eta_label.config(text="")
            self.summary_label.config(text="")

    def update_status(self, total_files):
        self.progress['value'] = self.total_files_scanned
        elapsed = time.time() - self.start_time
        if self.total_files_scanned > 0 and elapsed > 0:
            rate = self.total_files_scanned / elapsed
            remaining = total_files - self.total_files_scanned
            eta_seconds = remaining / rate
            eta_str = str(timedelta(seconds=int(eta_seconds)))
            self.eta_label.config(text=f"Estimated time remaining: {eta_str}")
        self.status_label.config(text=f"Scanning... {self.total_files_scanned}/{total_files} files")

    def check_risk(self, full_path, file_name, file_size):
        # Simple heuristic examples for anomaly detection:

        # 1. Large executable files (e.g., > 50 MB)
        if file_name.endswith('.exe') and file_size > 50 * 1024 * 1024:
            return True

        # 2. Files with suspicious extensions or double extensions (e.g., ".txt.exe")
        if ".txt.exe" in file_name.lower():
            return True

        # 3. Hidden files (starts with .) with executable permission
        if file_name.startswith('.') and os.access(full_path, os.X_OK):
            return True

        # 4. Recently modified files that are large (e.g., > 100MB in last 1 day)
        try:
            mtime = os.path.getmtime(full_path)
            if file_size > 100 * 1024 * 1024 and (time.time() - mtime) < 86400:
                return True
        except Exception:
            pass

        return False

    def export_results(self):
        """Save the contents of the results box to a file chosen by the user.

        Shows an info dialog and returns when nothing has been scanned yet.
        A cancelled save dialog is a no-op. Any failure while writing is
        reported in an error dialog.
        """
        dialog_title = "Export Results"

        if self.total_files_scanned == 0:
            messagebox.showinfo(dialog_title, "No scan results to export.")
            return

        export_path = filedialog.asksaveasfilename(
            defaultextension=".txt",
            filetypes=[("Text files", "*.txt"), ("All files", "*.*")],
            title="Save Scan Results",
        )
        if not export_path:
            return

        try:
            with open(export_path, 'w', encoding='utf-8') as out_file:
                out_file.write(self.text_box.get(1.0, tk.END))
            messagebox.showinfo(dialog_title, f"Results exported successfully to:\n{export_path}")
        except Exception as error:
            messagebox.showerror(dialog_title, f"Failed to export results:\n{error}")

    def drive_usage_forecasting(self):
        """Display a 7-day disk usage forecast in the forecast label.

        No real history is read: the "past week" is synthesised from the
        current total used space (a baseline 7 x 500 MB lower, rising 0.5 GB
        per day with Gaussian noise), a linear regression is fitted to it, and
        the next seven points are shown. Any failure is reported in the label
        instead of being raised.

        NOTE(review): the baseline is described as usage 7 days ago (x=0),
        which would make x=7 "today", yet x=7 is labelled as tomorrow —
        confirm whether the forecast index is off by one day.
        """
        try:
            # Sum used bytes across all partitions that can be queried.
            used_bytes = 0
            for partition in psutil.disk_partitions(all=False):
                try:
                    used_bytes += psutil.disk_usage(partition.mountpoint).used
                except Exception:
                    continue

            # Synthetic history in GB for day indices 0..6.
            past_days = np.arange(7).reshape(-1, 1)
            baseline_gb = (used_bytes - 7*500*1024*1024) / (1024**3)
            history_gb = baseline_gb + 0.5 * past_days.flatten() + np.random.normal(0, 0.1, size=7)
            history_gb = np.maximum(history_gb, 0)

            # Fit the trend and extrapolate to day indices 7..13.
            regression = LinearRegression()
            regression.fit(past_days, history_gb)
            upcoming_days = np.arange(7, 14).reshape(-1, 1)
            predicted_gb = regression.predict(upcoming_days)

            # One line per forecast day, labelled with the calendar date.
            report_lines = ["📊 Disk Usage Forecast (Next 7 days):\n"]
            for offset, gigabytes in enumerate(predicted_gb, start=1):
                label = (datetime.now() + timedelta(days=offset)).strftime("%a %b %d")
                report_lines.append(f"  {label}: {gigabytes:.2f} GB used\n")

            self.forecast_info.config(text="".join(report_lines))

        except Exception as error:
            self.forecast_info.config(text=f"Forecast unavailable: {error}")


if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the scanner UI
    # to it, and hand control to the Tk event loop until the window closes.
    root = tk.Tk()
    app = DriveScannerGUI(master=root)
    app.master.mainloop()