-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.py
75 lines (56 loc) · 2.19 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, UploadFile, File
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
# Redpanda/Kafka broker address. Set the REDPANDA_SERVER environment variable
# to point at your own broker; falls back to the local default otherwise.
redpanda_server = os.environ.get("REDPANDA_SERVER", "localhost:9092")
# Topic that receives the filename of each uploaded image
request_topic = "image-request"
# Topic whose messages are forwarded to connected WebSocket clients
reply_topic = "image-reply"
# Lifespan handler: starts the Kafka clients on startup, stops them on shutdown
@asynccontextmanager
async def startup(app):
    """Manage the Kafka producer and consumer for the application's lifetime.

    Before the application starts serving, an ``AIOKafkaProducer`` is created
    and started as ``app.producer`` and an ``AIOKafkaConsumer`` subscribed to
    ``reply_topic`` is created and started as ``app.consumer``. When the
    application shuts down (or if startup fails part-way), every client that
    was started is stopped again so its connections are released.

    Args:
        app: The application instance the clients are attached to.
    """
    app.producer = AIOKafkaProducer(bootstrap_servers=[redpanda_server])
    await app.producer.start()
    try:
        app.consumer = AIOKafkaConsumer(
            reply_topic,
            # Use the same broker as the producer instead of the library default
            bootstrap_servers=[redpanda_server],
            group_id="image-reply-group",
        )
        await app.consumer.start()
        try:
            yield
        finally:
            await app.consumer.stop()
    finally:
        # Runs on shutdown and also when the consumer fails to start
        await app.producer.stop()
# Create the application, passing `startup` as its lifespan handler
app = FastAPI(lifespan=startup)
# Serve the contents of the "static" directory under the /static URL prefix
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")
# Root URL: respond with the static index.html file itself (no HTTP redirect)
@app.get("/")
async def read_index():
    """Return ``static/index.html`` as the response for the root URL."""
    index_page = FileResponse('static/index.html')
    return index_page
# Endpoint to upload an image
@app.post("/upload-image/")
async def upload_image(file: UploadFile = File(...)):
    """Store an uploaded file and publish its name to the request topic.

    The upload is written to ``static/images/<name>`` next to this source
    file, and ``<name>`` (UTF-8 encoded) is sent to ``request_topic`` through
    ``app.producer``.

    Args:
        file: The uploaded file from the multipart request.

    Returns:
        A dict ``{"filename": <name>}`` with the name the file was stored under.

    Raises:
        HTTPException: 400 when the upload carries no usable filename.
    """
    # Function-scope import: HTTPException is not among the file-level imports
    from fastapi import HTTPException

    # The filename is client-controlled. Keep only its final path component
    # (treating both "/" and "\\" as separators) so it cannot escape images/.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    # Save the file
    current_dir = os.path.dirname(os.path.realpath(__file__))
    images_dir = os.path.join(current_dir, "static", "images")
    os.makedirs(images_dir, exist_ok=True)  # first upload may precede the directory
    file_location = os.path.join(images_dir, safe_name)
    print(f"Saving file to {file_location}")
    with open(file_location, "wb") as file_object:
        file_object.write(await file.read())

    # Send filename to Redpanda
    await app.producer.send(request_topic, safe_name.encode('utf-8'))
    return {"filename": safe_name}
# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Forward messages read from ``app.consumer`` to a WebSocket client.

    After accepting the connection, a background task iterates over
    ``app.consumer`` and sends each message value (decoded as UTF-8) to the
    client as a text frame. The handler then waits for the client to
    disconnect and cancels the background task before returning.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await websocket.accept()

    async def send_message_to_websocket(msg):
        await websocket.send_text(msg.value.decode('utf-8'))

    async def consume_from_topic(topic, callback):
        # `topic` is only used for the log line; messages come from app.consumer
        print(f"Consuming from {topic}")
        # NOTE(review): app.consumer is a single shared object, so concurrent
        # connections all iterate the same consumer - confirm this is intended.
        async for msg in app.consumer:
            print(f"Received message: {msg.value.decode('utf-8')}")
            await callback(msg)

    # Start consuming. Keep a reference: the event loop holds tasks only
    # weakly, and the reference is needed to cancel the task on disconnect.
    consumer_task = asyncio.create_task(
        consume_from_topic(reply_topic, send_message_to_websocket)
    )
    try:
        # Keep the connection open until the client goes away. Incoming
        # frames are read and discarded; the disconnect message ends the loop.
        while True:
            message = await websocket.receive()
            if message["type"] == "websocket.disconnect":
                break
    finally:
        consumer_task.cancel()
        # Wait for the task to finish and collect its outcome so neither the
        # cancellation nor an earlier send failure is reported as unretrieved.
        await asyncio.gather(consumer_task, return_exceptions=True)