# **FraudSleuth Gen AI**

## 1. Setup and Imports

In [1]:
# Standard library
import ipaddress
import json
import os
import re
from typing import Optional

# Environment
from dotenv import load_dotenv
load_dotenv()

# LLM + LangChain
from google import genai
from google.genai import types
from google.api_core import retry
# from langchain.prompts import PromptTemplate
# from langchain.chains import LLMChain
from langchain_core.runnables import Runnable
from langchain_core.prompts import PromptTemplate
#from langchain.agents import initialize_agent, Tool, AgentType
#from langchain.memory import ConversationBufferMemory
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END
from langgraph.graph import MessageGraph

# ChromaDB
import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings

# Requests for Fraud API
import requests

## 2. Config

In [2]:
# Secrets come from the process environment (load_dotenv() in the imports
# cell reads a .env file first); each is None when the variable is unset.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
FRAUD_API_KEY = os.environ.get("FRAUD_API_KEY")

# Gemini model identifiers: one for chat generation, one for embeddings.
GENAI_MODEL = "gemini-2.0-flash"
EMBEDDING_MODEL = "models/embedding-001"

# Name of the ChromaDB collection used by ChromaVectorSearch by default.
CHROMA_COLLECTION_NAME = "fraud_docs"

# Base URL of the fraud-check endpoint; FraudChecker appends "<key>/<ip>".
FRAUD_API_URL = "https://ipqualityscore.com/api/json/ip/"


In [3]:
# Authenticate Gemini
client = genai.Client(api_key=GEMINI_API_KEY)


def is_retriable(e):
    """Return True for Gemini API errors that are worth retrying.

    Only `genai.errors.APIError` instances whose `code` is 429 or 503
    qualify; every other exception propagates immediately.
    """
    return isinstance(e, genai.errors.APIError) and e.code in {429, 503}


# Patch the SDK methods with a retry policy.
# - The `__wrapped__` check keeps the patch idempotent: re-running this cell
#   must not stack another retry layer on top of an already-wrapped method.
# - `embed_content` is patched as well because it is the call this notebook
#   actually makes through `client` (see GeminiEmbeddingFunction).
for _method_name in ("generate_content", "embed_content"):
    _sdk_method = getattr(genai.models.Models, _method_name)
    if not hasattr(_sdk_method, "__wrapped__"):
        setattr(
            genai.models.Models,
            _method_name,
            retry.Retry(predicate=is_retriable)(_sdk_method),
        )

## 3. Embedding Function

In [4]:
class GeminiEmbeddingFunction(EmbeddingFunction):
    """ChromaDB embedding function backed by the Gemini embedding model.

    The `document_mode` flag picks the task type sent with each request:
    "retrieval_document" when True, "retrieval_query" when False.
    """

    def __init__(self, document_mode=True):
        # Mutable on purpose: callers may flip it between indexing and querying.
        self.document_mode = document_mode

    def __call__(self, input: Documents) -> Embeddings:
        """Embed `input` with EMBEDDING_MODEL and return one vector per embedding."""
        task_type = "retrieval_document" if self.document_mode else "retrieval_query"
        embed_config = types.EmbedContentConfig(task_type=task_type)

        result = client.models.embed_content(
            model=EMBEDDING_MODEL,
            contents=input,
            config=embed_config,
        )

        # Unpack the raw float vectors from the response objects.
        vectors = []
        for embedding in result.embeddings:
            vectors.append(embedding.values)
        return vectors

## 4. Chroma Vector Search

In [None]:
class ChromaVectorSearch:
    """Wrapper around a persistent ChromaDB collection embedded with Gemini.

    On construction it opens (or creates) the collection and, unless
    disabled, loads one document per non-empty line from a knowledge-base
    text file.

    Args:
        persist_directory: Directory used by `chromadb.PersistentClient`.
        collection_name: Name of the collection to open or create.
        knowledge_base_path: Text file to load at start-up. Pass None (or an
            empty string) to skip loading.
    """

    def __init__(self, persist_directory="chroma_db", collection_name=CHROMA_COLLECTION_NAME,
                 knowledge_base_path="../data/knowledge_base.txt"):
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        self.client = chromadb.PersistentClient(path=self.persist_directory)
        self.embed_fn = GeminiEmbeddingFunction(document_mode=True)

        # Create or get collection
        self.collection = self._open_collection()

        if knowledge_base_path:
            self.load_documents_from_file(knowledge_base_path)

    def _open_collection(self):
        """Return the named collection, creating it if it does not exist yet."""
        return self.client.get_or_create_collection(name=self.collection_name,
                                                    embedding_function=self.embed_fn)

    def load_documents_from_file(self, file_path: str):
        """Load one document per non-empty line of `file_path` into the collection.

        Prints a warning and returns without raising when the file is missing.
        """
        if not os.path.exists(file_path):
            print(f"⚠️ File not found: {file_path}")
            return

        # Explicit encoding so the result does not depend on the platform default.
        with open(file_path, "r", encoding="utf-8") as f:
            docs = [line.strip() for line in f if line.strip()]

        self.add_documents(docs)
        if docs:
            # Skip the success line when add_documents() already warned about
            # an empty input.
            print(f"✅ Loaded {len(docs)} documents from {file_path} into ChromaDB.")

    def add_documents(self, documents: list[str], ids: Optional[list[str]] = None):
        """
        Adds documents to ChromaDB with embeddings.

        Args:
            documents: Texts to store. An empty list only prints a warning.
            ids: Optional ids, one per document. Defaults to the positional
                indices "0", "1", ...
        """
        if not documents:
            print("⚠️ No documents to add.")
            return
        if ids is None:
            ids = [str(i) for i in range(len(documents))]
        # The store is persistent and the default ids are fixed, so re-running
        # the notebook hits ids that already exist. upsert refreshes those
        # records instead of leaving stale content behind.
        self.collection.upsert(documents=documents, ids=ids)

    def query(self, text: str, n_results: int = 5):
        """
        Queries the collection for top matching documents.

        The shared embedding function is switched to query mode for the
        duration of the call so that the search text is embedded with the
        "retrieval_query" task type, then restored to its previous mode.
        """
        previous_mode = self.embed_fn.document_mode
        self.embed_fn.document_mode = False
        try:
            return self.collection.query(query_texts=[text], n_results=n_results)
        finally:
            self.embed_fn.document_mode = previous_mode

    def reset_collection(self):
        """Delete the collection and recreate it empty."""
        self.client.delete_collection(self.collection_name)
        self.collection = self._open_collection()

## 5. Fraud Detection API Tool

In [6]:
class FraudChecker:
    """Client for the IP-reputation endpoint configured in FRAUD_API_URL.

    Args:
        timeout: Seconds to wait for the HTTP response before giving up.
    """

    def __init__(self, timeout: float = 10.0):
        self.api_key = FRAUD_API_KEY
        self.api_url = FRAUD_API_URL
        self.timeout = timeout

    def _redact(self, text: str) -> str:
        """Mask the API key in `text`; the key is part of the request URL."""
        if self.api_key:
            return text.replace(str(self.api_key), "***")
        return text

    def check_ip(self, ip: str) -> str:
        """Look up `ip` and return a one-line, human-readable summary.

        Never raises: configuration problems, invalid input, HTTP errors and
        unexpected exceptions are all reported in the returned string.

        Args:
            ip: IPv4 or IPv6 address in textual form.

        Returns:
            A summary of the reputation flags, or a message describing why
            the lookup could not be performed.
        """
        if not self.api_key:
            return "Fraud check unavailable: FRAUD_API_KEY is not configured."

        # Reject anything that is not an IP literal before it becomes part
        # of the request URL.
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            return f"Invalid IP address: {ip!r}"

        try:
            url = f"{self.api_url}{self.api_key}/{ip}"
            # Without a timeout a stalled connection would hang the notebook.
            response = requests.get(url, timeout=self.timeout)
            if response.status_code != 200:
                return f"Error from API: {response.status_code}"

            data = response.json()
            # The service can answer HTTP 200 with success=false plus a message.
            if data.get("success") is False:
                return f"Error from API: {data.get('message', 'request was not successful')}"

            # Prefer the field names documented by the service and fall back
            # to the keys this code read originally.
            crawler = data.get("is_crawler", data.get("crawler"))
            bot = data.get("bot_status", data.get("is_bot"))
            return f"Fraud Score: {data.get('fraud_score')}, VPN: {data.get('vpn')},"\
                   f" Proxy: {data.get('proxy')}, Tor: {data.get('tor')},"\
                   f" Crawler: {crawler}, Recent Abuse: {data.get('recent_abuse')},"\
                   f" Bot: {bot}"
        except Exception as e:
            # Deliberately broad (best-effort tool), but exception text from
            # the HTTP layer can embed the URL, so the key is masked first.
            return f"Exception during fraud check: {self._redact(str(e))}"

## 6. LangChain Prompt Template and Model Setup

In [7]:
# Prompt template to synthesize context and external API result.
# It has three placeholders, which generate_response() fills from the graph
# state: the user's query, the retrieved documents, and the fraud-check text.
prompt_template = PromptTemplate(
    input_variables=["query", "retrieved_docs", "fraud_api_result"],
    template="""
        You are a fraud detection assistant.

        User Query:
        {query}

        Retrieved Knowledge Context:
        {retrieved_docs}

        Fraud Check Result (if applicable):
        {fraud_api_result}

        Based on all of the above, provide a clear and actionable response to the user.
        """
        )

# Set up LangChain Gemini Chat model
llm = ChatGoogleGenerativeAI(model = GENAI_MODEL, temperature = 0.2, google_api_key = GEMINI_API_KEY)
# Pipe the rendered prompt into the chat model. generate_response() invokes
# this chain with a dict holding the three input variables above.
llm_chain: Runnable = prompt_template | llm

In [8]:
# Define state
from typing import TypedDict

class FraudDetectionState(TypedDict):
    """State dictionary handed from node to node in the fraud-detection graph."""
    # The user's question; the only key supplied in the test cell's initial state.
    query: str
    # Newline-joined documents written by retrieve_docs().
    retrieved_docs: str
    # Text written by check_fraud_api().
    fraud_api_result: str
    # Written by generate_response() as the raw return value of llm_chain.invoke().
    # NOTE(review): the test cell reads `.content` from it, so `str` looks
    # inaccurate here — confirm and adjust the annotation.
    response: str

## 7. Tool Definition and Agent

In [9]:
# Initialize tools
# Both instances are module-level singletons used by the graph node functions.
# FraudChecker only copies FRAUD_API_KEY / FRAUD_API_URL from the config cell.
fraud_checker = FraudChecker()
# Constructing ChromaVectorSearch opens the persistent collection and tries to
# load the knowledge-base file; it prints a warning if that file is missing.
vector_search = ChromaVectorSearch()

⚠️ File not found: data/knowledge_base.txt


In [10]:
def extract_ip_or_email(query: str):
    """Extract the first IPv4 address or, failing that, the first email address.

    Args:
        query: Free-form user text.

    Returns:
        The matched IPv4 address if one is present, otherwise the matched
        email address, otherwise None. An IP always wins over an email.
    """
    # Each octet is limited to 0-255 so strings such as "999.1.1.1" are not
    # reported as IP addresses.
    octet = r'(?:25[0-5]|2[0-4]\d|[01]?\d?\d)'
    ip_match = re.search(rf'\b(?:{octet}\.){{3}}{octet}\b', query)
    if ip_match:
        return ip_match.group(0)

    # The TLD has no upper length limit, so addresses under long TLDs
    # (".online", ".museum", ...) are matched too.
    email_match = re.search(r'\b[\w.-]+@[\w.-]+\.[A-Za-z]{2,}\b', query)
    if email_match:
        return email_match.group(0)

    return None

In [11]:
# Vector Search
def retrieve_docs(state: FraudDetectionState) -> FraudDetectionState:
    """Graph node: look up the query in the vector store.

    Returns a copy of `state` whose "retrieved_docs" entry holds the matched
    documents for the query, joined with newlines.
    """
    print("🔧 Tool: Vector Search called...")
    hits = vector_search.query(state["query"])
    # Only one query text is sent, so its matches are the first inner list.
    matched_documents = hits["documents"][0]

    updated_state = dict(state)
    updated_state["retrieved_docs"] = "\n".join(matched_documents)
    return updated_state

# Fraud API Check
def check_fraud_api(state: FraudDetectionState) -> FraudDetectionState:
    """Graph node: run the fraud lookup for an IP address found in the query.

    Returns a copy of `state` whose "fraud_api_result" entry holds either the
    lookup summary or a message explaining why no lookup was made.
    """
    print("🔧 Tool: Check Fraud API called...")
    suspicious_input = extract_ip_or_email(state["query"])
    if not suspicious_input:
        result = "No IP or email detected in query."
    elif "@" in suspicious_input:
        # The extractor may return an email address, but the configured
        # endpoint is IP-only, so an email must not be sent to it.
        result = (f"Email address detected ({suspicious_input}), "
                  "but only IP addresses can be checked by the fraud API.")
    else:
        result = fraud_checker.check_ip(suspicious_input)
    return {**state, "fraud_api_result": result}

# Final LLM synthesis
def generate_response(state: FraudDetectionState) -> FraudDetectionState:
    """Graph node: ask the LLM chain for the final answer.

    Feeds the query, the retrieved documents and the fraud-check text into
    `llm_chain`, then returns a copy of `state` whose "response" entry holds
    whatever the chain returned.
    """
    print("🔧 Tool: Generate Response called...")
    prompt_inputs = {
        key: state[key]
        for key in ("query", "retrieved_docs", "fraud_api_result")
    }
    llm_output = llm_chain.invoke(prompt_inputs)

    updated_state = dict(state)
    updated_state["response"] = llm_output
    return updated_state

In [12]:
# Build the workflow as a linear pipeline over FraudDetectionState.
graph = StateGraph(FraudDetectionState)

# Register the nodes in the order they will run.
for _node_name, _node_fn in (
    ("vector_search", retrieve_docs),
    ("fraud_check", check_fraud_api),
    ("generate_response", generate_response),
):
    graph.add_node(_node_name, _node_fn)

# Flow: vector_search -> fraud_check -> generate_response -> END
graph.set_entry_point("vector_search")
for _src, _dst in (
    ("vector_search", "fraud_check"),
    ("fraud_check", "generate_response"),
    ("generate_response", END),
):
    graph.add_edge(_src, _dst)

# Compile the graph into the runnable app used by the test cell.
fraud_graph = graph.compile()

In [None]:
# Install the mermaid Jupyter extension if not already installed
# !pip install jupyter_contrib_nbextensions && jupyter contrib nbextension install --user
# !jupyter nbextension enable --py mermaid

Collecting jupyter_contrib_nbextensions
  Downloading jupyter_contrib_nbextensions-0.7.0.tar.gz (23.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.5/23.5 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting ipython_genutils (from jupyter_contrib_nbextensions)
  Downloading ipython_genutils-0.2.0-py2.py3-none-any.whl.metadata (755 bytes)
Collecting jupyter_contrib_core>=0.3.3 (from jupyter_contrib_nbextensions)
  Downloading jupyter_contrib_core-0.4.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting jupyter_highlight_selected_word>=0.1.1 (from jupyter_contrib_nbextensions)
  Downloading jupyter_highlight_selected_word-0.2.0-py2.py3-none-any.whl.metadata (730 bytes)
Collecting jupyter_nbextensions_configurator>=0.4.0 (from jupyter_contrib_nbextensions)
  Downloading jupyter_nbextensions_configurator-0.6.4-py2.py3-none-any.whl.metadata (1.8 kB)
Collecting nbco

In [24]:
from IPython.display import display, Markdown

# Export the compiled graph as Mermaid source and show it as a fenced
# ```mermaid block inside a Markdown output.
mermaid_code = fraud_graph.get_graph().draw_mermaid()
mermaid_markdown = f"```mermaid\n{mermaid_code}\n```"

display(Markdown(mermaid_markdown))

```mermaid
---
config:
  flowchart:
    curve: linear
---
graph TD;
	__start__([<p>__start__</p>]):::first
	vector_search(vector_search)
	fraud_check(fraud_check)
	generate_response(generate_response)
	__end__([<p>__end__</p>]):::last
	__start__ --> vector_search;
	fraud_check --> generate_response;
	generate_response --> __end__;
	vector_search --> fraud_check;
	classDef default fill:#f2f0ff,line-height:1.2
	classDef first fill-opacity:0
	classDef last fill:#bfb6fc

```

## 8. Test Query

In [13]:
# Run the whole graph once; only "query" is supplied up front, the nodes
# fill in the remaining state keys.
initial_state = dict(query="What can you tell me about this IP: 198.51.100.23?")
final_state = fraud_graph.invoke(initial_state)

# "response" holds the chain's return value; its text is in `.content`.
answer_text = final_state["response"].content
print("✅ Final Response:", answer_text)

🔧 Tool: Vector Search called...
🔧 Tool: Check Fraud API called...
🔧 Tool: Generate Response called...
✅ Final Response: Based on the information I have, the IP address 198.51.100.23 has a fraud score of 0 and doesn't appear to be associated with any known malicious activity. It's not flagged as a VPN, proxy, Tor exit node, crawler, bot, or exhibiting recent abuse.

In short, this IP address currently appears to be safe.
