SafeStream is an AI-powered fraud detection platform built on a microservice architecture. It processes transactions in real time through Kafka, applies rule-based and AI-based fraud analysis, and streams alerts live to a dashboard over WebSockets.
- Auth Service — User registration, login, JWT auth, roles (admin / analyst)
- Producer API — Accepts transactions, stores them, publishes to Kafka
- Fraud Engine — Consumes transactions, runs rules + AI, publishes alerts
- Dashboard Service — Live fraud alerts via WebSocket
- Frontend — React + Vite + Tailwind + Framer Motion
- FastAPI
- PostgreSQL
- Confluent Kafka
- WebSockets
- Hugging Face / Vertex AI
- React + Tailwind CSS
SafeStream/
.
├── backend
│ ├── alembic
│ │ ├── env.py
│ │ ├── __pycache__
│ │ ├── README
│ │ ├── script.py.mako
│ │ └── versions
│ ├── alembic.ini
│ ├── auth_service
│ │ ├── app.py
│ │ ├── crud.py
│ │ ├── deps.py
│ │ ├── __init__.py
│ │ ├── main.py
│ │ ├── models.py
│ │ ├── __pycache__
│ │ ├── requirements.txt
│ │ ├── routes.py
│ │ ├── schemas.py
│ │ └── security.py
│ ├── dashboard
│ │ ├── consumer.py
│ │ ├── main.py
│ │ ├── __pycache__
│ │ ├── requirements.txt
│ │ ├── static
│ │ ├── templates
│ │ └── websocket_manager.py
│ ├── db
│ │ ├── base.py
│ │ ├── __init__.py
│ │ ├── models
│ │ ├── __pycache__
│ │ ├── requirements.txt
│ │ ├── session.py
│ │ └── smoke_test.py
│ ├── fraud_engine
│ │ ├── ai.py
│ │ ├── consumer.py
│ │ ├── main.py
│ │ ├── models.py
│ │ ├── producer.py
│ │ ├── __pycache__
│ │ ├── requirements.txt
│ │ └── rules.py
│ ├── infra
│ │ └── confluent.env.example
│ ├── __init__.py
│ └── producer_api
│ ├── kafka_producer.py
│ ├── main.py
│ ├── models.py
│ ├── __pycache__
│ └── requirements.txt
├── frontend
│ ├── auth
│ │ ├── login.html
│ │ └── register.html
│ ├── eslint.config.js
│ ├── FRONTEND_ARCHITECTURE.md
│ ├── index.html
│ ├── js
│ │ └── auth.js
│ ├── package.json
│ ├── package-lock.json
│ ├── postcss.config.js
│ ├── public
│ │ └── vite.svg
│ ├── README.md
│ ├── src
│ │ ├── api
│ │ ├── App.css
│ │ ├── App.tsx
│ │ ├── assets
│ │ ├── components
│ │ ├── context
│ │ ├── data
│ │ ├── index.css
│ │ ├── main.tsx
│ │ ├── pages
│ │ ├── router
│ │ ├── store
│ │ └── utils
│ ├── tailwind.config.js
│ ├── tsconfig.app.json
│ ├── tsconfig.json
│ ├── tsconfig.node.json
│ ├── vite.config.ts
│ └── WEBSOCKET_IMPLEMENTATION.md
└── README.md
Make sure you have:
- Python 3.10+
- Node.js 18+
- PostgreSQL 14+
- Git
- Confluent Cloud account (Kafka)
Each backend service uses its own .env.
Example: infra/.env
KAFKA_BOOTSTRAP=pkc-xxxx.ap-south-1.gcp.confluent.cloud:9092
KAFKA_API_KEY=your_api_key
KAFKA_API_SECRET=your_api_secret
Example: auth_service/.env
DATABASE_URL=postgresql://postgres:password@localhost:5432/safestream
SECRET_KEY=super-secret-key
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=60
REFRESH_TOKEN_EXPIRE_DAYS=7
Example: fraud_engine/.env
KAFKA_BOOTSTRAP=...
KAFKA_API_KEY=...
KAFKA_API_SECRET=...
AI_PROVIDER=mock
# AI_PROVIDER=huggingface_api
# AI_PROVIDER=vertex
HF_MODEL=Qwen/Qwen2.5-1.5B-Instruct
HF_API_KEY=optional
CREATE DATABASE safestream_auth;
CREATE DATABASE safestream_transactions;
cd backend
alembic upgrade head
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
uvicorn auth_service.main:app --reload --port 9000
Endpoints:
POST /auth/register
POST /auth/login
GET /auth/me
uvicorn producer_api.main:app --reload --port 8000
Endpoint:
POST /transaction
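Before a transaction is published to Kafka it has to be validated and serialized. The sketch below shows one way that step might look, using the fields from the sample request in this README. The `Transaction` dataclass and the `to_kafka_message` helper are hypothetical names rather than the Producer API's actual code, and keying messages by `user_id` is an assumption.

```python
import json
from dataclasses import asdict, dataclass
from typing import Tuple


@dataclass(frozen=True)
class Transaction:
    """Hypothetical model mirroring the fields of the sample payload."""
    txn_id: str
    user_id: int
    amount: float
    merchant: str
    device_id: str
    location: str
    timestamp: int

    def __post_init__(self) -> None:
        # Reject obviously invalid input before it is stored or published.
        if self.amount <= 0:
            raise ValueError("amount must be positive")


def to_kafka_message(txn: Transaction) -> Tuple[bytes, bytes]:
    """Return (key, value) as bytes.

    Keying by user_id is an assumption; it would keep one user's
    transactions on the same partition and therefore in order.
    """
    key = str(txn.user_id).encode("utf-8")
    value = json.dumps(asdict(txn)).encode("utf-8")
    return key, value
```

The resulting pair could then be handed to whatever Kafka producer client the service uses.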
python3 -m fraud_engine.main
Consumes:
transactions-stream
Produces:
fraud-alerts
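To make the rule-based half of the engine concrete, here is a minimal sketch of how a message from `transactions-stream` might be scored and turned into a payload for `fraud-alerts`. Only the topic names and the transaction fields come from this README. The thresholds, weights, device registry, and alert shape are hypothetical, and the AI step is left out entirely.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical values, chosen only for illustration.
HIGH_AMOUNT = 50_000
ALERT_THRESHOLD = 0.7
KNOWN_DEVICES = {1: {"phone-1", "laptop-1"}}  # user_id -> trusted device ids


def rule_score(txn: Dict[str, Any]) -> Tuple[float, List[str]]:
    """Return a risk score in [0, 1] and the names of the rules that fired."""
    score = 0.0
    reasons: List[str] = []
    if txn["amount"] >= HIGH_AMOUNT:
        score += 0.5
        reasons.append("high_amount")
    if txn["device_id"] not in KNOWN_DEVICES.get(txn["user_id"], set()):
        score += 0.4
        reasons.append("unknown_device")
    return min(score, 1.0), reasons


def build_alert(txn: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return an alert payload, or None when the score is below the threshold."""
    score, reasons = rule_score(txn)
    if score < ALERT_THRESHOLD:
        return None
    return {"txn_id": txn["txn_id"], "score": score, "reasons": reasons}
```

With these assumed rules, the sample transaction in this README (a large amount from a device not in the registry) would trigger both rules and produce an alert.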
uvicorn dashboard.main:app --reload --port 8001
WebSocket: ws://127.0.0.1:8001/ws?token=<ACCESS_TOKEN>
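Because the access token travels as a query parameter, it should be URL-encoded on the way out and decoded on the way in. The following is a small sketch of both directions using only the standard library. The helper names are hypothetical, and the host and port defaults simply mirror the URL above.

```python
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlsplit


def dashboard_ws_url(access_token: str, host: str = "127.0.0.1", port: int = 8001) -> str:
    """Build the dashboard WebSocket URL with the token as a query parameter."""
    query = urlencode({"token": access_token})
    return f"ws://{host}:{port}/ws?{query}"


def token_from_ws_url(url: str) -> Optional[str]:
    """Extract the token from a WebSocket URL; returns None when it is absent."""
    values = parse_qs(urlsplit(url).query).get("token")
    return values[0] if values else None
```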
cd frontend
npm install
npm run dev
Runs on:
http://localhost:5173
- User registers → /auth/register
- User logs in → /auth/login
- Receives:
  - access_token
  - refresh_token
  - role
- Token stored in frontend
- Used for:
  - Protected API calls
  - WebSocket authentication
  - Role-based UI rendering
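The flow above relies on signed tokens, so it may help to see what issuing and verifying an HS256 access token involves. This is a standard-library sketch for illustration only: the Auth Service presumably uses a JWT library instead, the `sub` and `role` claim names are assumptions, and `SECRET_KEY` is just the placeholder value from the example .env.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = "super-secret-key"  # placeholder value from the example .env
ACCESS_TOKEN_EXPIRE_MINUTES = 60


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWT encoding strips.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str) -> str:
    digest = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
    return _b64url_encode(digest)


def create_access_token(user_id: int, role: str, now: Optional[float] = None) -> str:
    """Issue a token; the claim names here are assumptions."""
    issued_at = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": str(user_id), "role": role,
              "exp": issued_at + ACCESS_TOKEN_EXPIRE_MINUTES * 60}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    return f"{signing_input}.{_sign(signing_input)}"


def verify_access_token(token: str, now: Optional[float] = None) -> dict:
    """Return the claims, or raise ValueError if the token is invalid or expired."""
    signing_input, _, signature = token.rpartition(".")
    # Constant-time comparison avoids leaking how much of the signature matched.
    if not hmac.compare_digest(_sign(signing_input).encode(), signature.encode()):
        raise ValueError("invalid signature")
    claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if claims["exp"] <= (time.time() if now is None else now):
        raise ValueError("token expired")
    return claims
```

The same verification step would apply whether the token arrives in an API request header or in the WebSocket query string.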
| Role | Access |
|---|---|
| analyst | Dashboard, alerts, insights |
| admin | User management, system control |
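The role table can be expressed as a simple permission lookup. The sketch below follows the table literally; the area identifiers and the `can_access` helper are hypothetical, and whether an admin also sees the analyst views is not stated here, so the sketch does not assume it.

```python
# Permission map transcribed from the role table; identifiers are hypothetical.
ROLE_PERMISSIONS = {
    "analyst": {"dashboard", "alerts", "insights"},
    "admin": {"user_management", "system_control"},
}


def can_access(role: str, area: str) -> bool:
    """Return True when the given role is allowed to use the given area."""
    return area in ROLE_PERMISSIONS.get(role, set())
```

Unknown roles fall through to an empty permission set, so they are denied by default.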
Supported modes:
- mock → local testing
- huggingface_api → HF Inference API
- vertex → Google Vertex AI (production)
Switch using:
export AI_PROVIDER=mock
curl -X POST http://127.0.0.1:8000/transaction \
-H "Content-Type: application/json" \
-d '{
"txn_id": "TXN_TEST_1",
"user_id": 1,
"amount": 80000,
"merchant": "Electronics",
"device_id": "new-device",
"location": "Delhi",
"timestamp": 1732171200
}'
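The same request can be sent from Python, which is handy for scripted tests. This sketch uses only the standard library and builds the request without sending it; the `build_transaction_request` helper is a hypothetical name, and the base URL assumes the Producer API is running locally on port 8000 as above.

```python
import json
import urllib.request


def build_transaction_request(payload: dict, base_url: str = "http://127.0.0.1:8000") -> urllib.request.Request:
    """Prepare a POST /transaction request with a JSON body."""
    return urllib.request.Request(
        url=f"{base_url}/transaction",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "txn_id": "TXN_TEST_1",
    "user_id": 1,
    "amount": 80000,
    "merchant": "Electronics",
    "device_id": "new-device",
    "location": "Delhi",
    "timestamp": 1732171200,
}
request = build_transaction_request(payload)

# To actually send it (requires the Producer API to be running):
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(response.status, response.read().decode("utf-8"))
```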