In [43]:


import os
import shutil
import time
from subprocess import Popen, PIPE, STDOUT
import sys
import glob

# --- Cleanup Step: Terminate previous processes ---
# Free the Streamlit port and stop any ngrok process left over from an earlier
# run of this cell. Every command ends in `|| true` so that "nothing to kill"
# does not surface as a failed shell command.
print("--- Cleanup: Shutting down old processes... ---")
for stale_process_cmd in (
    "fuser -k -9 8501/tcp || true",  # signal 9 to whatever holds TCP port 8501
    "pkill -f ngrok || true",        # match "ngrok" against full command lines
):
    # This is the call IPython itself generates for a `!command` line.
    get_ipython().system(stale_process_cmd)
print("Cleanup complete. Starting new session.")

# --- Step 0: Setup ---
print("\n--- Setup: Installing dependencies. This will take some time. ---")
!{sys.executable} -m pip install -q streamlit==1.36.0 opencv-python-headless==4.8.0.74 numpy==1.26.4 tensorflow==2.19.0 Pillow==9.5.0 scikit-learn==1.5.0 h5py==3.11.0 av requests tqdm pyngrok

# --- Verify pyngrok installation ---
print("Verifying pyngrok installation...")
try:
    from pyngrok import ngrok
    print("pyngrok imported successfully.")
except ImportError:
    print("Error: pyngrok could not be imported. Please ensure it installed correctly.")
    exit()

# --- Step 1: Define Paths and Prepare Files ---
print("\n--- Step 1: Preparing files for app launch ---")

# Base paths: where the input artifacts are read from and where the app tree
# (models, detector files, database) is assembled.
ARTIFACTS_INPUT_PATH = '/kaggle/input/lma/pytorch/default/1/'
APP_ROOT_KAGGLE = '/kaggle/working/face_unlock_app/'
MODELS_DIR_KAGGLE = os.path.join(APP_ROOT_KAGGLE, 'models')
DNN_MODELS_DIR = os.path.join(MODELS_DIR_KAGGLE, 'dnn_face_detector_models')
DB_PATH = os.path.join(APP_ROOT_KAGGLE, 'face_db.db')

# Remove the database left behind by a previous run. Attempt the removal and
# react to the outcome instead of checking for existence first, which avoids a
# check-then-act race.
print("Cleaning up old database file...")
try:
    os.remove(DB_PATH)
except FileNotFoundError:
    print("No old database file found. Starting fresh.")
except OSError as e:
    # Best effort only: the generated app upgrades an outdated schema itself.
    print("Warning: Could not remove database file. The app will attempt to fix the schema instead.")
    print(f"Details: {e}")
else:
    print(f"Old database file '{DB_PATH}' removed successfully.")

# Create destination directories
os.makedirs(APP_ROOT_KAGGLE, exist_ok=True)
os.makedirs(MODELS_DIR_KAGGLE, exist_ok=True)
os.makedirs(DNN_MODELS_DIR, exist_ok=True)

# Define full, absolute paths for all files (source in the input dataset,
# destination inside the app tree).
LIVENESS_MODEL_SRC = os.path.join(ARTIFACTS_INPUT_PATH, 'liveness_model.h5')
LIVENESS_MODEL_DST = os.path.join(MODELS_DIR_KAGGLE, 'liveness_model.h5')

FACE_EMBEDDING_MODEL_SRC = os.path.join(ARTIFACTS_INPUT_PATH, 'face_embedding_model.h5')
FACE_EMBEDDING_MODEL_DST = os.path.join(MODELS_DIR_KAGGLE, 'face_embedding_model.h5')

FACE_DB_SRC = os.path.join(ARTIFACTS_INPUT_PATH, 'face_db.db')
FACE_DB_DST = os.path.join(APP_ROOT_KAGGLE, 'face_db.db')

CAFFE_MODEL_SRC = os.path.join(ARTIFACTS_INPUT_PATH, 'res10_300x300_ssd_iter_140000_fp16.caffemodel')
CAFFE_MODEL_DST = os.path.join(DNN_MODELS_DIR, 'res10_300x300_ssd_iter_140000_fp16.caffemodel')

PROTOTXT_SRC = os.path.join(ARTIFACTS_INPUT_PATH, 'deploy.prototxt')
PROTOTXT_DST = os.path.join(DNN_MODELS_DIR, 'deploy.prototxt')

# Copy artifacts to their destinations.
# Each entry is (progress message, [(source, destination), ...]); all of these
# files are required, so a missing source aborts the cell.
required_copy_steps = [
    ("Copying liveness model...", [(LIVENESS_MODEL_SRC, LIVENESS_MODEL_DST)]),
    ("Copying face embedding model...", [(FACE_EMBEDDING_MODEL_SRC, FACE_EMBEDDING_MODEL_DST)]),
    ("Copying face detector files...", [(CAFFE_MODEL_SRC, CAFFE_MODEL_DST),
                                        (PROTOTXT_SRC, PROTOTXT_DST)]),
]
try:
    for progress_message, file_pairs in required_copy_steps:
        print(progress_message)
        for src_path, dst_path in file_pairs:
            shutil.copy(src_path, dst_path)

    # The database is optional: without it the app creates a fresh one.
    if os.path.exists(FACE_DB_SRC):
        print("Copying existing database...")
        shutil.copy(FACE_DB_SRC, FACE_DB_DST)

    print("All files copied successfully.")

except FileNotFoundError as e:
    print("Error: A required file was not found. Please check your source paths.")
    print(f"Details: {e}")
    # Raise SystemExit directly: exit() is a helper for the interactive shell
    # and is not meant to be used as program control flow.
    raise SystemExit(1) from e

# --- Verification Step ---
# Confirm the files the generated app loads from disk are in place before any
# code is generated. Every missing file is reported, not only the first one.
print("Verifying model files exist...")
required_files = [LIVENESS_MODEL_DST, CAFFE_MODEL_DST, PROTOTXT_DST]
missing_files = [path for path in required_files if not os.path.exists(path)]
if missing_files:
    for path in missing_files:
        print(f"ERROR: Required file not found: {path}")
    # Raise SystemExit directly: exit() is a helper for the interactive shell
    # and is not meant to be used as program control flow.
    raise SystemExit(1)
print("All model files verified.")

# --- Step 2: Generating Python files ---
print("\n--- Step 2: Generating Python files ---")

# We'll generate utils.py and app.py using full absolute paths
models_abs_path = os.path.abspath(MODELS_DIR_KAGGLE)
app_root_abs_path = os.path.abspath(APP_ROOT_KAGGLE)

# utils_code is an f-string TEMPLATE for utils.py:
#   * single-brace fields are filled in here, when this cell runs;
#   * doubled braces survive as literal braces in the generated file;
#   * the template must not contain triple double quotes.
# Paths are interpolated with !r so the generated file always receives a
# correctly quoted and escaped string literal.
utils_code = f"""
import os
import cv2
import numpy as np
import tensorflow as tf
from contextlib import closing
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenet_v2_preprocess
import h5py
import sqlite3
import streamlit as st
import datetime

# Define model paths using the absolute paths passed from the main script
MODELS_PATH_GLOBAL = {models_abs_path!r}

# Database file path using absolute path
DB_PATH = {os.path.abspath(os.path.join(app_root_abs_path, 'face_db.db'))!r}

# Database Manager Class
class SQLiteDatabaseManager:
    # Wrapper around the SQLite file that stores registered users and the
    # event history. Every method opens its own connection and closes it
    # again through closing(), including when a statement raises.
    def __init__(self, db_path=DB_PATH):
        self.db_path = db_path
        self._initialize_db()

    def _initialize_db(self):
        # Create both tables when missing and add the registration_date
        # column to a users table that predates it.
        with closing(self._get_connection()) as conn:
            c = conn.cursor()

            c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
            table_exists = c.fetchone()

            if table_exists:
                c.execute("PRAGMA table_info(users)")
                columns = [col[1] for col in c.fetchall()]
                if 'registration_date' not in columns:
                    print("Adding missing 'registration_date' column to 'users' table.")
                    c.execute("ALTER TABLE users ADD COLUMN registration_date TEXT")
            else:
                c.execute('''
                    CREATE TABLE users (
                        id INTEGER PRIMARY KEY,
                        username TEXT UNIQUE NOT NULL,
                        embedding BLOB NOT NULL,
                        registration_date TEXT
                    )
                ''')

            c.execute('''
                CREATE TABLE IF NOT EXISTS history (
                    id INTEGER PRIMARY KEY,
                    username TEXT,
                    event_type TEXT,
                    timestamp TEXT,
                    success INTEGER
                )
            ''')
            conn.commit()

    def _get_connection(self):
        return sqlite3.connect(self.db_path)

    def register_user(self, username, embedding):
        # Insert a new user row. embedding is the raw bytes of the embedding.
        # Returns (True, message) on success and (False, message) otherwise.
        try:
            with closing(self._get_connection()) as conn:
                c = conn.cursor()
                embedding_blob = sqlite3.Binary(embedding)
                registration_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                c.execute("INSERT INTO users (username, embedding, registration_date) VALUES (?, ?, ?)",
                          (username, embedding_blob, registration_date))
                conn.commit()
            return True, "User registered successfully."
        except sqlite3.IntegrityError:
            return False, "Username already exists."
        except Exception as e:
            return False, f"Database error: {{e}}"

    def get_user_embedding(self, username):
        # Return the stored embedding decoded as a float32 array, or None
        # when the username is unknown.
        with closing(self._get_connection()) as conn:
            c = conn.cursor()
            c.execute("SELECT embedding FROM users WHERE username = ?", (username,))
            result = c.fetchone()
        if result:
            return np.frombuffer(result[0], dtype=np.float32)
        return None

    def get_all_users(self):
        # Return a list of (username, registration_date) rows.
        with closing(self._get_connection()) as conn:
            c = conn.cursor()
            c.execute("SELECT username, registration_date FROM users")
            return c.fetchall()

    def log_event(self, username, event_type, success):
        # Append one history row stamped with the current local time.
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with closing(self._get_connection()) as conn:
            c = conn.cursor()
            c.execute("INSERT INTO history (username, event_type, timestamp, success) VALUES (?, ?, ?, ?)",
                      (username, event_type, timestamp, int(success)))
            conn.commit()

    def get_history(self, username=None):
        # Return (username, event_type, timestamp, success) rows, limited to
        # one user when a username is given.
        with closing(self._get_connection()) as conn:
            c = conn.cursor()
            if username:
                c.execute("SELECT username, event_type, timestamp, success FROM history WHERE username = ?", (username,))
            else:
                c.execute("SELECT username, event_type, timestamp, success FROM history")
            return c.fetchall()

# Model and Detector Loading Functions using absolute paths
@st.cache_resource
def load_face_embedding_model():
    # ImageNet-pretrained MobileNetV2 without its classifier head, followed by
    # global average pooling; the pooled feature vector is the embedding.
    base_model = MobileNetV2(
        input_shape=(224, 224, 3),
        include_top=False,
        weights='imagenet'
    )
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    embedding_model = Model(inputs=base_model.input, outputs=x)
    return embedding_model

@st.cache_resource
def load_liveness_model():
    # NOTE(review): nothing in this file calls this loader; check_liveness
    # below is a pure OpenCV heuristic.
    return tf.keras.models.load_model(os.path.join(MODELS_PATH_GLOBAL, 'liveness_model.h5'))

@st.cache_resource
def load_face_detector(prototxt_path, model_path):
    return cv2.dnn.readNetFromCaffe(prototxt_path, model_path)

# Core ML Logic Functions
def detect_and_align_face(frame, net):
    # Run the face detector on frame and return (face, box) for the first
    # detection with confidence above 0.5: face is the crop resized to
    # 224x224 and box is (startX, startY, endX, endY) of the cropped region.
    # Returns (None, None) when no usable face is found.
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
    net.setInput(blob)
    detections = net.forward()
    for i in range(0, detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        if confidence > 0.5:
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")

            face_width = endX - startX
            face_height = endY - startY
            # A box without positive extent can never pass the aspect-ratio
            # test; reject it up front rather than dividing by zero.
            if face_width <= 0 or face_height <= 0:
                return None, None
            aspect_ratio = face_width / face_height
            if not (0.7 < aspect_ratio < 1.3):
                return None, None

            # The box may extend past the image border. Clamp it so the slice
            # below cannot wrap around through negative indices and hand an
            # empty image to cv2.resize.
            cropX0, cropY0 = max(0, int(startX)), max(0, int(startY))
            cropX1, cropY1 = min(w, int(endX)), min(h, int(endY))
            if cropX1 <= cropX0 or cropY1 <= cropY0:
                continue

            face = frame[cropY0:cropY1, cropX0:cropX1]
            face = cv2.resize(face, (224, 224))
            return face, (cropX0, cropY0, cropX1, cropY1)
    return None, None

def get_face_embedding(face, embedding_model):
    # Convert the BGR face crop to RGB, apply the MobileNetV2 preprocessing
    # and return the embedding scaled to unit length.
    face = face.astype('float32')
    face_rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
    face_processed = mobilenet_v2_preprocess(face_rgb)
    face_expanded = np.expand_dims(face_processed, axis=0)
    embedding = embedding_model.predict(face_expanded, verbose=0)[0]
    embedding = embedding / np.linalg.norm(embedding)
    return embedding

def check_liveness(face_image):
    # Heuristic check: threshold the blurred grayscale image at 220 and fail
    # when a bright blob with area between 50 and 500 and circularity above
    # 0.6 is found. Returns (is_live, message).
    gray = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    _, thresh = cv2.threshold(blurred, 220, 255, cv2.THRESH_BINARY)

    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    for contour in contours:
        area = cv2.contourArea(contour)
        if not (50 < area < 500):
            continue
        perimeter = cv2.arcLength(contour, True)
        if perimeter <= 0:
            continue
        circularity = (4 * np.pi * area) / (perimeter ** 2)
        if circularity > 0.6:
            return False, "Suspicious reflection detected. Please remove any shiny objects."
    return True, "Liveness check passed."

def verify_face(known_embedding, new_embedding, threshold=0.5):
    # Two embeddings match when their Euclidean distance is below threshold.
    # Returns (is_match, distance).
    distance = np.linalg.norm(known_embedding - new_embedding)
    is_match = distance < threshold
    return is_match, distance
"""

# app_code is an f-string TEMPLATE for the Streamlit entry point app.py:
#   * single-brace fields are filled in here, when this cell runs;
#   * doubled braces survive as literal braces in the generated file;
#   * the template must not contain triple double quotes.
# Paths are interpolated with !r so the generated file always receives a
# correctly quoted and escaped string literal.
app_code = f"""
import streamlit as st
import cv2
import numpy as np
from utils import (
    load_face_embedding_model,
    load_face_detector,
    check_liveness,
    verify_face,
    SQLiteDatabaseManager,
    detect_and_align_face,
    get_face_embedding
)
import os

# Define model paths using the absolute paths passed from the main script
MODELS_DIR = {models_abs_path!r}
DNN_MODELS_DIR = {os.path.join(models_abs_path, 'dnn_face_detector_models')!r}

face_embedding_model = load_face_embedding_model()
face_detector = load_face_detector(os.path.join(DNN_MODELS_DIR, 'deploy.prototxt'), os.path.join(DNN_MODELS_DIR, 'res10_300x300_ssd_iter_140000_fp16.caffemodel'))
db_manager = SQLiteDatabaseManager()


def decode_image(image_file):
    # Decode a captured or uploaded file into a BGR image array. Returns None
    # when the file is empty or its bytes are not a readable image.
    file_bytes = np.asarray(bytearray(image_file.read()), dtype=np.uint8)
    if file_bytes.size == 0:
        return None
    return cv2.imdecode(file_bytes, 1)


def register_from_image(username, img, source_label):
    # Detect a face in img, compute its embedding and store it for username.
    # source_label ("captured" or "uploaded") only affects the warning text.
    face, box = detect_and_align_face(img, face_detector)
    if face is None:
        st.warning(f"No face detected in the {{source_label}} image. Please try again.")
        return
    # Liveness is deliberately not checked during registration.
    embedding = get_face_embedding(face, face_embedding_model)
    # Stored as float32 bytes because the database layer reads them back as float32.
    success, message = db_manager.register_user(username, embedding.astype(np.float32).tobytes())
    if success:
        st.success(f"Registration successful for user '{{username}}'!")
    else:
        st.error(message)


def authenticate_user(auth_username, captured_photo):
    # Run one authentication attempt and show the outcome. Every attempt that
    # gets past the username check is written to the history table.
    stored_embedding = db_manager.get_user_embedding(auth_username)
    if stored_embedding is None:
        st.error(f"User '{{auth_username}}' not found.")
        db_manager.log_event(auth_username, "Authentication", False)
        return

    live_frame = decode_image(captured_photo)
    if live_frame is None:
        st.error("Could not read the captured image. Please try again.")
        db_manager.log_event(auth_username, "Authentication", False)
        return

    face, box = detect_and_align_face(live_frame, face_detector)
    if face is None:
        st.warning("No face detected in the image.")
        db_manager.log_event(auth_username, "Authentication", False)
        return

    # NOTE(review): the liveness heuristic receives the whole frame, not the
    # face crop - confirm this is intended.
    is_live, liveness_message = check_liveness(live_frame)
    if not is_live:
        st.warning(f"Liveness check failed: {{liveness_message}}")
        db_manager.log_event(auth_username, "Authentication", False)
        return

    live_embedding = get_face_embedding(face, face_embedding_model)
    is_match, distance = verify_face(stored_embedding, live_embedding)
    if is_match:
        st.success(f"Authentication Successful! Distance: {{distance:.2f}} (Lower is better)")
        db_manager.log_event(auth_username, "Authentication", True)
    else:
        st.error(f"Authentication Failed. Distance: {{distance:.2f}}")
        db_manager.log_event(auth_username, "Authentication", False)


if 'page' not in st.session_state:
    st.session_state.page = 'Home'
if 'live_frame' not in st.session_state:
    st.session_state.live_frame = None
if 'username' not in st.session_state:
    st.session_state.username = ''

st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Home", "Register", "Authenticate", "History"])

if page == "Home":
    st.title("Face Unlock System")
    st.write("Welcome to the face unlock application. Use the sidebar to navigate.")
    st.info("This application supports webcam authentication and registration, as well as a history log.")

elif page == "Register":
    st.title("User Registration")
    st.info("Enter a username and use your webcam to take a photo. A clear, well-lit photo works best.")

    username = st.text_input("Enter a username to register:")
    st.session_state.username = username

    registration_mode = st.radio("Choose registration method:", ["Webcam", "Upload Photo"])

    if registration_mode == "Webcam":
        captured_photo = st.camera_input("Take a picture to register")
        if captured_photo is not None:
            if not username:
                st.error("Please enter a username.")
            else:
                img = decode_image(captured_photo)
                if img is None:
                    st.error("Could not read the captured image. Please try again.")
                else:
                    register_from_image(username, img, "captured")

    elif registration_mode == "Upload Photo":
        uploaded_file = st.file_uploader("Upload a photo to register", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            if not username:
                st.error("Please enter a username.")
            else:
                img = decode_image(uploaded_file)
                if img is None:
                    st.error("Could not read the uploaded image. Please try again.")
                else:
                    st.image(img, channels="BGR", caption="Uploaded Photo")
                    register_from_image(username, img, "uploaded")


elif page == "Authenticate":
    st.title("User Authentication")
    st.info("Authenticate using your webcam. A liveness check will be performed.")

    auth_username = st.text_input("Enter username for authentication:")
    captured_photo = st.camera_input("Take a picture to authenticate")

    if captured_photo is not None:
        if not auth_username:
            st.error("Please enter a username.")
        else:
            authenticate_user(auth_username, captured_photo)

elif page == "History":
    st.title("Activity History")

    history = db_manager.get_history()

    if history:
        st.write("---")
        for username, event_type, timestamp, success in history:
            status = "✅ Success" if success else "❌ Failed"
            st.write(f"**User:** {{username}} | **Event:** {{event_type}} | **Timestamp:** {{timestamp}} | **Status:** {{status}}")
        st.write("---")
    else:
        st.info("No activity history found.")

    st.write("---")
    st.header("Registered Users")
    users = db_manager.get_all_users()
    if users:
        for username, reg_date in users:
            st.write(f"**User:** {{username}} | **Registration Date:** {{reg_date}}")
    else:
        st.info("No users registered yet.")
"""

# Write the generated code to files.
# The encoding is explicit because app_code contains non-ASCII characters (the
# status emoji on the History page) and Python reads source files as UTF-8;
# relying on the platform default encoding could fail or corrupt them.
with open(os.path.join(APP_ROOT_KAGGLE, 'utils.py'), "w", encoding="utf-8") as f:
    f.write(utils_code)
with open(os.path.join(APP_ROOT_KAGGLE, 'app.py'), "w", encoding="utf-8") as f:
    f.write(app_code)
print("utils.py and app.py generated successfully.")

# --- Phase 3: Running Streamlit App with ngrok ---
print("\n--- Phase 3: Running Streamlit App ---")
print("Attempting to set ngrok auth token...")

# The auth token is a credential and must not live in the notebook source.
# It is read from the NGROK_AUTHTOKEN environment variable instead; set that
# variable (for example from the platform's secret store) before running.
# NOTE(review): the token that used to be hard-coded here has been exposed and
# should be revoked in the ngrok dashboard.
ngrok_auth_token = os.environ.get("NGROK_AUTHTOKEN", "").strip()
if not ngrok_auth_token:
    print("Error setting ngrok auth token: the NGROK_AUTHTOKEN environment variable is not set.")
    print("Please ensure your ngrok token is correct.")
    raise SystemExit(1)

try:
    ngrok.set_auth_token(ngrok_auth_token)
    print("ngrok auth token set successfully.")
except Exception as e:
    print(f"Error setting ngrok auth token: {e}")
    print("Please ensure your ngrok token is correct.")
    # Raise SystemExit directly: exit() is a helper for the interactive shell
    # and is not meant to be used as program control flow.
    raise SystemExit(1) from e

print("Starting Streamlit app in the background...")
# Launch Streamlit through the interpreter running this notebook - the same
# one the dependencies were installed with - instead of whichever `streamlit`
# executable happens to be first on PATH.
streamlit_command = [
    sys.executable, "-m", "streamlit", "run", os.path.join(APP_ROOT_KAGGLE, 'app.py'),
    "--server.port", "8501",
    "--server.enableCORS", "false",
    "--browser.gatherUsageStats", "false"
]

# stdout and stderr of the app both go to streamlit_logs.txt. The handle can
# be closed here once the child is started; the child keeps its own copy.
with open("streamlit_logs.txt", "w") as log_file:
    streamlit_process = Popen(streamlit_command, stdout=log_file, stderr=STDOUT, text=True)
print("Streamlit app process started. Waiting 5 seconds for it to initialize...")

time.sleep(5)

# poll() returns None while the process is still running; anything else means
# the app already exited, so say so instead of failing silently.
if streamlit_process.poll() is not None:
    print(f"Warning: Streamlit exited early with code {streamlit_process.returncode}. "
          "Check streamlit_logs.txt for details.")

print("Attempting to create ngrok tunnel...")
try:
    public_url = ngrok.connect(8501)
    print("ngrok tunnel created successfully.")
    print(f"Your ngrok URL is: {public_url}")
except Exception as e:
    print(f"Error starting ngrok: {e}")

print("\n--- Streamlit Logs (if app doesn't load) ---")
!cat streamlit_logs.txt

--- Cleanup: Shutting down old processes... ---
8501/tcp:             5960
Cleanup complete. Starting new session.

--- Setup: Installing dependencies. This will take some time. ---
Verifying pyngrok installation...
pyngrok imported successfully.

--- Step 1: Preparing files for app launch ---
Cleaning up old database file...
Old database file '/kaggle/working/face_unlock_app/face_db.db' removed successfully.
Copying liveness model...
Copying face embedding model...
Copying face detector files...
Copying existing database...
All files copied successfully.
Verifying model files exist...
All model files verified.

--- Step 2: Generating Python files ---
utils.py and app.py generated successfully.

--- Phase 3: Running Streamlit App ---
Attempting to set ngrok auth token...
ngrok auth token set successfully.
Starting Streamlit app in the background...
Streamlit app process started. Waiting 5 seconds for it to initialize...
Attempting to create ngrok tunnel...
ngrok tunnel created successf