-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
app.py
94 lines (68 loc) · 2.61 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# aiohttp server
import asyncio
import datetime as dt
import traceback
import asyncpg
from ago import human
from aiohttp import web
from utils.config import DATABASE_DSN, PATH, PORT, REGEX_FILTER
from utils.db import Database
from utils.find_my import FindMyDevices, FindMyItems
# Route table filled by the @routes.get(...) decorators in this module and
# registered on the application with app.add_routes(routes).
routes = web.RouteTableDef()
@routes.get("/index")
async def index(request):
    """Answer ``GET /index`` with the file ``static/index.html``."""
    response = web.FileResponse("static/index.html")
    return response
@routes.get("/")
async def track(request):
    """Answer ``GET /`` with the file ``static/track.html``."""
    response = web.FileResponse("static/track.html")
    return response
@routes.get("/api/local/devices")
async def api_local_get_devices(request):
    """Return whatever the app's ``"devices"`` source yields, as JSON.

    The source is read from ``request.app`` instead of the module-level
    ``app`` global, so the handler does not depend on a name defined later
    in the module and works with whichever application serves the request.
    """
    devices = await request.app["devices"].get()
    return web.json_response(devices)
@routes.get("/api/local/items")
async def api_local_get_items(request):
    """Return whatever the app's ``"items"`` source yields, as JSON.

    The source is read from ``request.app`` instead of the module-level
    ``app`` global, so the handler does not depend on a name defined later
    in the module and works with whichever application serves the request.
    """
    items = await request.app["items"].get()
    return web.json_response(items)
@routes.get("/api/db/latest")
async def api_db_get_latest(request):
    """Return the rows from ``db.get_latest()`` as JSON with readable times.

    Two keys derived from ``timestamp`` are added to every row:
    ``ago`` (the output of ``ago.human`` with ``precision=2``) and
    ``timestamp_human`` (the ISO 8601 form of the same moment).
    """
    rows = await request.app["db"].get_latest()
    for row in rows:
        # Convert once and reuse for both derived fields. fromtimestamp()
        # without a tz yields a naive datetime in local time.
        # NOTE(review): assumes rows are mutable mappings and ``timestamp``
        # is a POSIX timestamp -- confirm against utils.db.
        moment = dt.datetime.fromtimestamp(row["timestamp"])
        row["ago"] = human(moment, precision=2)
        row["timestamp_human"] = moment.isoformat()
    return web.json_response(rows)
@routes.get("/api/db/trail/{did}")
async def api_db_get_trail(request):
    """Return the rows from ``db.specific(did)`` as JSON with readable times.

    ``did`` is taken from the URL path. Two keys derived from ``timestamp``
    are added to every row: ``ago`` (the output of ``ago.human`` with
    ``precision=2``) and ``timestamp_human`` (the ISO 8601 form of the same
    moment).
    """
    did = request.match_info["did"]
    rows = await request.app["db"].specific(did)
    for row in rows:
        # Convert once and reuse for both derived fields. fromtimestamp()
        # without a tz yields a naive datetime in local time.
        # NOTE(review): assumes rows are mutable mappings and ``timestamp``
        # is a POSIX timestamp -- confirm against utils.db.
        moment = dt.datetime.fromtimestamp(row["timestamp"])
        row["ago"] = human(moment, precision=2)
        row["timestamp_human"] = moment.isoformat()
    return web.json_response(rows)
async def update_database(app, interval=10):
    """Periodically copy the local item and device feeds into the database.

    Each round sleeps ``interval`` seconds, awaits ``app["items"].get()`` and
    ``app["devices"].get()``, concatenates the two results and passes every
    entry to ``app["db"].insert``. The loop runs until the task is cancelled,
    at which point a message is printed and the coroutine returns normally.

    Args:
        app: Mapping that holds the ``"items"``, ``"devices"`` and ``"db"``
            objects.
        interval: Seconds to sleep before each polling round (default 10).
    """
    try:
        while True:
            await asyncio.sleep(interval)
            try:
                feed = await app["items"].get() + await app["devices"].get()
            except Exception:
                # A single failed read must not end polling for good;
                # report it and try again on the next round.
                traceback.print_exc()
                continue
            for item in feed:
                try:
                    await app["db"].insert(item)
                    print(f"Inserted {item['id']} {item['timestamp']}")
                except asyncpg.exceptions.UniqueViolationError:
                    print("Duplicate", item["id"], item["timestamp"])
                except Exception:
                    # Deliberately not a bare ``except``: CancelledError is a
                    # BaseException (Python 3.8+) and has to reach the outer
                    # handler, otherwise a cancel during an insert is
                    # swallowed and shutdown waits on this task forever.
                    traceback.print_exc()
    except asyncio.CancelledError:
        print("Background task cancelled")
async def background_tasks(app):
    """Cleanup context that keeps ``update_database`` running with the app.

    The part before ``yield`` starts the polling task and stores it under
    ``app["update_database"]``; the part after ``yield`` cancels that task
    and waits for it to finish.
    """
    app["update_database"] = asyncio.create_task(update_database(app))
    yield
    task = app["update_database"]
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # The task normally handles its own cancellation, but if it is
        # cancelled before that handler is active (e.g. it never started)
        # the error surfaces here; it is the expected outcome of cancel().
        pass
app = web.Application()

# Register the decorated handlers first, then the static file route,
# in a single call that keeps the original registration order.
app.add_routes([*routes, web.static("/static", "static")])

# Shared objects the handlers and the background task look up by key.
app["db"] = Database(DATABASE_DSN, REGEX_FILTER)
app["items"] = FindMyItems(path=PATH)
app["devices"] = FindMyDevices(path=PATH)

# Start and stop the database poller together with the application.
app.cleanup_ctx.append(background_tasks)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=PORT)