
With Streaming proxy, handle server early error #3194

Closed
julienfr112 opened this issue Oct 17, 2022 · 2 comments

Comments

@julienfr112

I'm using Tornado to build an HTTP proxy between a web client and a storage server, streaming the data through. It works well except when the storage server hits an error and terminates early; then the request is stuck from the client's point of view. Do you have an idea how to handle that? Maybe check that the connection is still up in body_producer?

import tornado.httpclient
import tornado.queues
import tornado.web


@tornado.web.stream_request_body
class StreamingProxyHandler(CorsHandler):
    def prepare(self):
        self.chunks = tornado.queues.Queue(maxsize=1)
        self.request.connection.set_max_body_size(1_000_000_000)
        self.client = tornado.httpclient.AsyncHTTPClient()
        self.fetch = self.client.fetch(
            "http://storageserver",
            method=self.request.method,
            raise_error=False,
            headers={"Content-Length": self.request.headers["Content-Length"]},
            body_producer=self.body_producer if self.request.method == "POST" else None,
            streaming_callback=self.data_fromserver,
        )

    async def body_producer(self, write):
        # Feed chunks received from the web client to the storage server.
        while True:
            chunk = await self.chunks.get()
            if chunk is None:
                return
            await write(chunk)

    async def data_received(self, chunk):
        await self.chunks.put(chunk)

    def data_fromserver(self, chunk):
        # Stream the storage server's response back to the web client.
        self.write(chunk)

    async def post(self, *_):
        await self.chunks.put(None)  # signal end of the request body
        resp = await self.fetch
        self.set_status(resp.code)
@piraz
Contributor

piraz commented Oct 17, 2022

Are you running the client with curl?

tornado.httpclient.AsyncHTTPClient.configure('tornado.curl_httpclient.CurlAsyncHTTPClient')
self.client = tornado.httpclient.AsyncHTTPClient()

I had issues before when handling multipart responses and messy SOAP responses.

Here is a draft I was testing before:

https://github.com/candango/tcpbee/blob/develop/tcpbee/handlers.py

@bdarnell
Member

bdarnell commented Nov 4, 2022

If the server returns an error, you'll see it in await self.fetch, but you're not doing that until post, which is called after you've transmitted the entire body. You need to structure things so that you start a task in prepare() to wait on that future and can use that error to shut down the body producer and return the error to the client.

Writing a robust proxy is tricky; I'm not aware of any complete examples of doing this with Tornado. On the other side, you also need to implement on_connection_close to deal with disconnects from the client.
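
A rough sketch of that structure, based on the handler from the original post. The _watch_upstream coroutine, the upstream_failed flag, and the 502 fallback are illustrative assumptions, not a tested or complete proxy; the CORS base class and Content-Length header from the original are omitted for brevity.

import asyncio

import tornado.httpclient
import tornado.queues
import tornado.web


@tornado.web.stream_request_body
class StreamingProxyHandler(tornado.web.RequestHandler):
    def prepare(self):
        self.chunks = tornado.queues.Queue(maxsize=1)
        self.upstream_failed = False
        self.client = tornado.httpclient.AsyncHTTPClient()
        # Wrap the fetch in a Task so a watcher coroutine can await it
        # immediately, instead of only awaiting it in post() after the
        # whole request body has been transmitted.
        self.fetch = asyncio.ensure_future(
            self.client.fetch(
                "http://storageserver",
                method=self.request.method,
                raise_error=False,
                body_producer=self.body_producer if self.request.method == "POST" else None,
                streaming_callback=self.data_fromserver,
            )
        )
        asyncio.ensure_future(self._watch_upstream())

    async def _watch_upstream(self):
        # Resolves as soon as the storage server finishes or fails, even if
        # that happens while the client is still uploading.
        try:
            resp = await self.fetch
        except Exception:
            resp = None
        if resp is None or resp.code >= 400:
            self.upstream_failed = True
            # Unblock body_producer if it is waiting for another chunk.
            try:
                self.chunks.put_nowait(None)
            except tornado.queues.QueueFull:
                pass
            # Return the error to the client instead of leaving the request
            # hanging (_finished is RequestHandler's internal "already
            # finished" flag, used here to avoid finishing twice).
            if not self._finished:
                self.send_error(resp.code if resp is not None else 502)

    async def body_producer(self, write):
        while not self.upstream_failed:
            chunk = await self.chunks.get()
            if chunk is None:
                return
            await write(chunk)

    async def data_received(self, chunk):
        await self.chunks.put(chunk)

    def data_fromserver(self, chunk):
        self.write(chunk)

    def on_connection_close(self):
        # The web client disconnected: cancel the upstream fetch so the
        # storage server is not left waiting for the rest of the body.
        self.fetch.cancel()

    async def post(self, *_):
        if not self.upstream_failed:
            await self.chunks.put(None)
            resp = await self.fetch
            self.set_status(resp.code)

As bdarnell notes, this still isn't a robust proxy: the maxsize=1 queue only gives crude backpressure, and error handling around partially written responses (headers already flushed to the client) needs more care in a real implementation.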
