diff --git a/clearest/core.py b/clearest/core.py index 614f163..de77f0f 100644 --- a/clearest/core.py +++ b/clearest/core.py @@ -14,7 +14,8 @@ from clearest.exceptions import MissingArgumentError, AlreadyRegisteredError, NotUniqueError, HttpError, \ HttpNotFound, NotRootError, HttpUnsupportedMediaType -from clearest.http import HTTP_GET, HTTP_POST, CONTENT_TYPE, MIME_TEXT_PLAIN, HTTP_OK, MIME_WWW_FORM_URLENCODED +from clearest.http import HTTP_GET, HTTP_POST, CONTENT_TYPE, MIME_TEXT_PLAIN, HTTP_OK, MIME_WWW_FORM_URLENCODED, \ + MIME_FORM_DATA, CONTENT_DISPOSITION from clearest.wsgi import REQUEST_METHOD, PATH_INFO, QUERY_STRING, WSGI_INPUT, WSGI_CONTENT_TYPE, WSGI_CONTENT_LENGTH KEY_PATTERN = re.compile("\{(.*)\}") @@ -99,19 +100,40 @@ def one_or_many(dict_, key): def application(environ, start_response): - def parse_www_form(input_file, n): + def parse_www_form(input_file, n, rest): return parse_qs(input_file.read(n)) - content_types = {MIME_WWW_FORM_URLENCODED: parse_www_form} + def parse_form_data(input_file, n, rest): + kwargs = {} + boundary = re.match(" boundary=(.*)", rest).group(1) + state = 0 # TODO: enum + name = None + for line in input_file.read(n).splitlines(): + if state == 0 and line == boundary: + state = 1 + elif state == 1 and line.startswith(CONTENT_DISPOSITION): + name = re.match(CONTENT_DISPOSITION + ": form\-data; name=\"(.*)\"", line).group(1) + state = 2 + elif state == 2 and not len(line): + state = 3 + elif state == 3: + kwargs[name] = line + state = 0 + return kwargs + + content_types = {MIME_WWW_FORM_URLENCODED: parse_www_form, + MIME_FORM_DATA: parse_form_data} try: if environ[REQUEST_METHOD] in all_registered(): path = tuple(environ[PATH_INFO][1:].split("/")) query = parse_qs(environ[QUERY_STRING]) if QUERY_STRING in environ else {} if WSGI_CONTENT_TYPE in environ: - if environ[WSGI_CONTENT_TYPE] not in content_types: + temp = environ[WSGI_CONTENT_TYPE].split(";") + assert len(temp) <= 2 + content_type, rest = temp if len(temp) == 2 else (temp[0], None) + if content_type not in content_types: raise HttpUnsupportedMediaType() - query.update(content_types[environ[WSGI_CONTENT_TYPE]](environ[WSGI_INPUT], - int(environ[WSGI_CONTENT_LENGTH]))) + query.update(content_types[content_type](environ[WSGI_INPUT], int(environ[WSGI_CONTENT_LENGTH]), rest)) for signature, (fn, args) in six.iteritems(BaseDecorator.registered[environ[REQUEST_METHOD]]): if is_matching(signature, args, path, query): result = fn(**parse_args(args, path, query)) @@ -123,8 +145,6 @@ def parse_www_form(input_file, n): status = STATUS_FMT.format(code=error.code, msg=error.msg) start_response(status, [(CONTENT_TYPE, MIME_TEXT_PLAIN)]) return [status] - else: - pass @six.add_metaclass(ABCMeta) diff --git a/clearest/http.py b/clearest/http.py index 171d649..355e205 100644 --- a/clearest/http.py +++ b/clearest/http.py @@ -17,5 +17,7 @@ MIME_TEXT_PLAIN = "text/plain" MIME_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded" +MIME_FORM_DATA = "multipart/form-data" CONTENT_TYPE = "Content-type" +CONTENT_DISPOSITION = "Content-Disposition" diff --git a/sample/demo_simple_post_var.py b/sample/demo_simple_post_var.py index 55b3608..c8d6095 100644 --- a/sample/demo_simple_post_var.py +++ b/sample/demo_simple_post_var.py @@ -8,5 +8,5 @@ def hello(what): return "hello {what}!".format(what=what) -httpd = make_server("", 8001, application) +httpd = make_server("", 8000, application) httpd.serve_forever() diff --git a/tests/test_application.py b/tests/test_application.py index 349eabb..d655d95 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1,7 +1,7 @@ from six import StringIO from clearest import POST, HTTP_NOT_FOUND, GET, unregister_all, HTTP_OK, HTTP_UNSUPPORTED_MEDIA_TYPE, \ - MIME_WWW_FORM_URLENCODED + MIME_WWW_FORM_URLENCODED, MIME_FORM_DATA from tests.util import called_with from tests.wsgi import WSGITestCase @@ -107,3 +107,22 @@ def asd(a): self.post("/asd", input_=StringIO("a=hello"), content_type=MIME_WWW_FORM_URLENCODED, content_len=7) self.assertEqual(HTTP_OK, self.status) self.assertEqual((("hello",), {}), asd.called_with) + + def test_application_simple_post_multipart_var(self): + @POST("/asd") + @called_with + def asd(a): + return {} + + boundary = "-----------------------------12345" + body = """{boundary} +Content-Disposition: form-data; name="a" + +asd +{boundary}""".format(boundary=boundary) + self.post("/asd", + input_=StringIO(body), + content_type="{name}; boundary={boundary}".format(name=MIME_FORM_DATA, boundary=boundary), + content_len=len(body)) + self.assertEqual(HTTP_OK, self.status) + self.assertEqual((("asd",), {}), asd.called_with)