-
Notifications
You must be signed in to change notification settings - Fork 2
/
server.py
166 lines (140 loc) · 5.42 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
This module is the main of the project.
It's responsible to run the application and render of requests.
"""
from typing import Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from pydantic import BaseModel
from fastapi.openapi.docs import get_swagger_ui_html
import importlib
import sys
import os
import logging
import uvicorn
import sentry_sdk
SENTRY_DSN = os.environ.get("SENTRY_DSN", None)
SENTRY_RELEASE = os.environ.get("SENTRY_RELEASE", None)
if SENTRY_DSN:
params = {
"dsn": SENTRY_DSN,
}
if SENTRY_RELEASE:
params["release"] = SENTRY_RELEASE
sentry_sdk.init(**params)
class Body(BaseModel):
filepath: str
functionName: Optional[str] = ""
def import_src(path):
return importlib.machinery.SourceFileLoader("mod", path).load_module()
class FuncApp(FastAPI):
def __init__(self, loglevel=logging.DEBUG):
super(FuncApp, self).__init__()
self.userfunc = None
self.module = None
self.root = logging.getLogger()
self.ch = logging.StreamHandler(sys.stdout)
self.root.setLevel(loglevel)
self.ch.setLevel(loglevel)
self.ch.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
logger.addHandler(self.ch)
@self.post("/specialize", include_in_schema=False)
def load(request: Request):
logger.info("/specialize called")
self.module = import_src("/userfunc/user")
self.userfunc = self.module.main
self.include_router(self.module.router)
return ""
@self.post("/v2/specialize", include_in_schema=False)
def loadv2(body: Body):
filepath = body.filepath
handler = body.functionName
logger.info(
'/v2/specialize called with filepath = "{}" handler = "{}"'.format(
filepath, handler
)
)
# handler looks like `path.to.module.function`
parts = handler.rsplit(".", 1)
if len(handler) == 0:
# default to main.main if entrypoint wasn't provided
moduleName = "main"
funcName = "main"
elif len(parts) == 1:
moduleName = "main"
funcName = parts[0]
else:
moduleName = parts[0]
funcName = parts[1]
logger.debug(
'moduleName = "{}" funcName = "{}"'.format(moduleName, funcName)
)
# check whether the destination is a directory or a file
if os.path.isdir(filepath):
# add package directory path into module search path
sys.path.append(filepath)
logger.debug('__package__ = "{}"'.format(__package__))
if __package__:
self.module = importlib.import_module(moduleName, __package__)
else:
self.module = importlib.import_module(moduleName)
else:
# load source from destination python file
self.module = import_src(filepath)
# load user function from module
self.userfunc = getattr(self.module, funcName)
self.include_router(self.module.router)
return ""
@self.get("/healthz", include_in_schema=False)
def healthz():
return {}
@self.api_route(
"/",
methods=["GET", "POST", "PUT", "HEAD", "OPTIONS", "DELETE"],
include_in_schema=False,
)
async def f(
request: Request,
docs_func: Optional[str] = None,
openapi_json: Optional[str] = None,
):
if self.userfunc is None:
print("Generic container: no requests supported")
return HTTPException(status_code=500)
#
# Customizing the request context
#
# return openapi.json
if openapi_json:
return self.openapi()
# return doc page
if docs_func:
self.openapi_url = "?openapi_json=1"
root_path = request.scope.get("root_path", "").rstrip("/")
oauth2_redirect_url = self.swagger_ui_oauth2_redirect_url
if oauth2_redirect_url:
oauth2_redirect_url = root_path + oauth2_redirect_url
return get_swagger_ui_html(
openapi_url=self.openapi_url,
title="Fission Functions - Doc",
oauth2_redirect_url=oauth2_redirect_url,
init_oauth=self.swagger_ui_init_oauth,
)
return await self.userfunc(request)
@self.middleware("http")
async def sentry_exception(request: Request, call_next):
try:
response = await call_next(request)
return response
except Exception as e:
with sentry_sdk.push_scope() as scope:
scope.set_context("request", request)
user_id = "database_user_id" # when available
scope.user = {"ip_address": request.client.host, "id": user_id}
sentry_sdk.capture_exception(e)
raise e
app = FuncApp(logging.DEBUG)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8888)