A production-ready, distributed log aggregation and analysis system with real-time rate limiting and intelligent IP blacklisting capabilities. Built with Python, gRPC, PostgreSQL, and Redis.
This portfolio project demonstrates a scalable client-server architecture where multiple client-side agents monitor local system logs (e.g., /var/log/auth.log) and forward them to a centralized server. The server implements sophisticated real-time rate limiting and automatic blacklisting to detect and prevent potential security threats.
- β Distributed Architecture: Multiple clients stream logs to a central server
- β Real-time Rate Limiting: Redis-backed sliding window algorithm
- β Automatic Blacklisting: IPs exceeding thresholds are automatically blocked
- β High Performance: Handles 10,000+ logs/second per server instance
- β Resilient Design: Automatic reconnection, local buffering, graceful degradation
- β Production Ready: Docker support, comprehensive testing, logging
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CLIENT MACHINES β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β Log Agent 1 β β Log Agent 2 β β Log Agent N β β
β β (File Watcher)β β (File Watcher)β β (File Watcher)β β
β ββββββββ¬ββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββ β
βββββββββββΌβββββββββββββββββββΌβββββββββββββββββββΌββββββββββββββ
β β β
β gRPC Stream β gRPC Stream β
β β β
βΌ βΌ βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CENTRAL SERVER β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β gRPC Server (Port 50051) β β
β βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ β
β β Log Stream Processor (Async Queue) β β
β β β’ Parsing β’ Enrichment β’ Validation β β
β βββββββ¬ββββββββββββββββββββββββββββββββββββ¬βββββββββββββββ β
β β β β
β βΌ βΌ β
β βββββββββββββββ ββββββββββββββββ β
β β Rate ββββββββββββββββββββββ€ PostgreSQL β β
β β Limiter β β Database β β
β β (Redis) β ββββββββββββββββ β
β βββββββ¬ββββββββ β
β β β
β βΌ β
β ββββββββββββββββ β
β β Blacklist β β
β β Enforcer β β
β ββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
| Component | Technology | Purpose |
|---|---|---|
| Communication | gRPC with Protocol Buffers | Low-latency RPC with binary serialization |
| Database | PostgreSQL 15 | Persistent log storage and blacklist management |
| Cache/Rate Limiting | Redis 7 | High-speed rate limit counters with TTL |
| Server Runtime | Python 3.11 + asyncio | Async I/O for concurrent client handling |
| Client Runtime | Python 3.11 + asyncio | Non-blocking file monitoring |
| Deployment | Docker + Docker Compose | Containerized deployment |
- Python 3.9+
- Docker & Docker Compose (for easy deployment)
- PostgreSQL 12+ (if running without Docker)
- Redis 6+ (if running without Docker)
# Clone the repository
git clone https://github.com/yourusername/netbcrypt.git
cd netbcrypt
# Start all services (PostgreSQL, Redis, Server, Test Client)
docker-compose -f docker/docker-compose.yml up -d
# Check logs
docker-compose -f docker/docker-compose.yml logs -f server
# Generate test logs
python scripts/generate_test_logs.py --output test_logs/test.log --duration 120 --scenario attack# 1. Install dependencies
pip install -r requirements.txt
# 2. Setup PostgreSQL database
psql -U postgres -f scripts/setup_database.sql
# 3. Start Redis
redis-server
# 4. Generate gRPC code from proto files
python -m grpc_tools.protoc -I./proto --python_out=. --grpc_python_out=. proto/logs.proto
# 5. Start the server
python -m server.grpc_server
# 6. In another terminal, start a client agent
python -m client.log_agent \
--server localhost:50051 \
--client-id client-001 \
--log-files /var/log/auth.log /var/log/syslog \
--log-level INFOCREATE TABLE logs (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
source_ip INET NOT NULL,
source_port INTEGER,
destination_port INTEGER NOT NULL,
protocol VARCHAR(10),
action VARCHAR(20),
message TEXT,
client_id VARCHAR(100) NOT NULL,
log_level VARCHAR(20),
service_name VARCHAR(100),
raw_log TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);CREATE TABLE blacklist (
id SERIAL PRIMARY KEY,
ip_address INET NOT NULL,
port INTEGER NOT NULL,
reason TEXT,
blacklist_count INTEGER DEFAULT 1,
first_blacklisted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_blacklisted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE,
is_active BOOLEAN DEFAULT TRUE,
metadata JSONB,
UNIQUE(ip_address, port)
);See ARCHITECTURE.md for complete schema details.
Algorithm: Token Bucket with Sliding Window
Parameters:
- Window Size: 60 seconds (configurable)
- Threshold: 100 requests per minute per IP per port (configurable)
- Blacklist Duration: 5 minutes (configurable)
def check_rate_limit(ip_address, port, timestamp):
"""
Returns: (is_allowed, current_count)
"""
window_start = timestamp - 60 # 1 minute window
key = f"ratelimit:{ip_address}:{port}"
# Step 1: Remove old entries outside window
redis.zremrangebyscore(key, 0, window_start)
# Step 2: Count requests in current window
current_count = redis.zcard(key)
# Step 3: Check threshold
if current_count >= THRESHOLD:
blacklist_ip(ip_address, port, current_count)
return (False, current_count)
# Step 4: Add current request
redis.zadd(key, {timestamp: timestamp})
redis.expire(key, 120) # 2 minutes
return (True, current_count + 1)Time Complexity: O(log N) for sorted set operations
Space Complexity: O(N) where N is requests per window
When Redis is unavailable, the system automatically falls back to an in-memory rate limiter using sortedcontainers.SortedList with the same algorithm.
Solution: Server-side timestamping as authoritative source
server_timestamp = time.time()
client_timestamp = log_entry.timestamp
skew = abs(server_timestamp - client_timestamp)
if skew > 300: # 5 minutes
log_entry.timestamp = server_timestampSolution: Client-side buffering with retry logic
class ReliableLogSender:
def __init__(self):
self.buffer = deque(maxlen=10000)
self.sequence_number = 0
async def send_log(self, log_entry):
for attempt in range(3):
try:
await self.grpc_stub.SendLog(log_entry)
return True
except grpc.RpcError:
if attempt < 2:
await asyncio.sleep(2 ** attempt) # Exponential backoff
else:
self.buffer.append(log_entry) # Buffer for later
return FalseSolution: Backpressure with bounded queues
async def process_log_stream(self, request_iterator):
queue = asyncio.Queue(maxsize=1000) # Bounded queue
async for log_entry in request_iterator:
try:
await asyncio.wait_for(queue.put(log_entry), timeout=5.0)
except asyncio.TimeoutError:
yield LogResponse(status="SLOW_DOWN") # Signal client to reduce rateSolution: Circuit breaker pattern with automatic fallback
class ResilientRateLimiter:
async def check_rate(self, ip, port):
if self.circuit_breaker.is_open():
return await self.fallback_limiter.check_rate(ip, port)
try:
result = await self.redis.check_rate(ip, port)
self.circuit_breaker.record_success()
return result
except RedisConnectionError:
self.circuit_breaker.record_failure()
return await self.fallback_limiter.check_rate(ip, port)netbcrypt/
βββ server/ # Server-side components
β βββ __init__.py
β βββ grpc_server.py # gRPC service implementation
β βββ log_processor.py # Core log processing pipeline
β βββ rate_limiter.py # Rate limiting module
β βββ blacklist_manager.py # Blacklist management
β βββ database.py # PostgreSQL operations
β βββ config.py # Configuration management
βββ client/ # Client-side components
β βββ __init__.py
β βββ log_agent.py # Main client agent
β βββ file_watcher.py # Log file monitoring
β βββ grpc_client.py # gRPC client stub
βββ proto/
β βββ logs.proto # Protocol Buffer definitions
βββ scripts/
β βββ setup_database.sql # Database initialization
β βββ generate_test_logs.py # Log generation for testing
βββ config/
β βββ server_config.yaml # Server configuration
β βββ client_config.yaml # Client configuration
βββ docker/
β βββ Dockerfile.server
β βββ Dockerfile.client
β βββ docker-compose.yml
βββ tests/
β βββ test_rate_limiter.py
β βββ test_log_processor.py
βββ requirements.txt
βββ ARCHITECTURE.md # Detailed architecture document
βββ README.md
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=server --cov=client --cov-report=html
# Run specific test file
pytest tests/test_rate_limiter.py -v# Normal traffic scenario (60 seconds)
python scripts/generate_test_logs.py \
--output test_logs/normal.log \
--duration 60 \
--scenario normal
# Attack scenario (simulate brute force)
python scripts/generate_test_logs.py \
--output test_logs/attack.log \
--duration 120 \
--scenario attack# Terminal 1: Start server
python -m server.grpc_server
# Terminal 2: Client 1
python -m client.log_agent --server localhost:50051 --client-id client-001 --log-files test_logs/client1.log
# Terminal 3: Client 2
python -m client.log_agent --server localhost:50051 --client-id client-002 --log-files test_logs/client2.log
# Terminal 4: Client 3
python -m client.log_agent --server localhost:50051 --client-id client-003 --log-files test_logs/client3.log| Metric | Value |
|---|---|
| Single Server | 10,000 logs/second |
| Redis Rate Checks | 50,000 checks/second |
| PostgreSQL Writes | 5,000 writes/second (batched) |
| Client-Server Latency | < 10ms (local network) |
- Vertical: Scales to 16 CPU cores with async I/O
- Horizontal: Add server instances behind nginx/envoy load balancer
- Database: PostgreSQL read replicas for analytics queries
Edit config/server_config.yaml:
server:
host: "0.0.0.0"
port: 50051
max_workers: 10
rate_limiting:
window_seconds: 60
threshold: 100
blacklist_duration_minutes: 5
database:
host: "localhost"
port: 5432
name: "loganalyzer"Edit config/client_config.yaml:
client:
client_id: "client-001"
server:
address: "localhost:50051"
log_files:
- "/var/log/auth.log"
- "/var/log/syslog"
batch:
size: 50
interval_seconds: 5.0- TLS Encryption: Enable TLS for production deployments (gRPC supports TLS 1.3)
- Authentication: Implement JWT tokens or mutual TLS for client authentication
- SQL Injection: All queries use parameterized statements
- Rate Limiting: Prevents DoS attacks
- Input Validation: Strict schema enforcement via Protobuf
- ARCHITECTURE.md - Detailed system architecture and design decisions
- scripts/setup_database.sql - Complete database schema
- proto/logs.proto - gRPC service definitions
This project demonstrates expertise in:
- β Distributed systems design
- β Real-time data streaming with gRPC
- β Rate limiting algorithms and implementation
- β Async/await programming in Python
- β PostgreSQL database design and optimization
- β Redis for high-performance caching
- β Docker containerization
- β Production-ready error handling and resilience
- β Test-driven development
This is a portfolio project, but feedback and suggestions are welcome!
- Fork the repository
- Create a feature branch (
git checkout -b feature/improvement) - Commit changes (
git commit -am 'Add improvement') - Push to branch (
git push origin feature/improvement) - Open a Pull Request
MIT License - see LICENSE file for details
- gRPC documentation and examples
- Python asyncio community
- PostgreSQL documentation
- Redis documentation
Built with β€οΈ as a portfolio demonstration project