Prevent http/get stream from being closed [question] #500

Closed
igrishaev opened this Issue Mar 28, 2019 · 8 comments

igrishaev commented Mar 28, 2019

What I need to do is fetch a file stream from a remote HTTP server and then upload that stream to AWS S3.

I query the URL with http/get, parse the Content-Length header to learn the file size, and pass it to the AWS PutObjectRequest as follows:

(aws/put-object
  aws
  {:bucket-name bucket
   :key key
   :input-stream url-stream                ;; the body of http/get
   :metadata {:content-length 1418264736}  ;; the parsed Content-Length
   :canned-acl :public-read})

The problem is that in the middle of the upload, AWS fails with an error like:

com.amazonaws.SdkClientException: Data read has a different length than the expected:
dataLength=193585457; expectedLength=1418264736; includeSkipped=false;
in.getClass()=class com.amazonaws.internal.ReleasableInputStream;
markedSupported=false; marked=0; resetSinceLastMarked=false;
markCount=0; resetCount=0, 

There is a special com.amazonaws.util.LengthCheckInputStream class that counts the number of bytes read from a stream. If that number differs from the provided content-length, it throws an error.

My guess is that the connection pool closes the HTTP connection before the stream has been read completely. I wonder what would be the best strategy to keep the connection alive?

Right now, I've passed a huge idle timeout:

(http/get url {:pool (http/connection-pool
                       {:connection-options
                        {:keep-alive?  true
                         :idle-timeout (* 1000 60 60)}})})

and so far it works. Is there a better way to do this?
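
A possible alternative worth sketching (an assumption, not something verified in this thread): drain the response body into a temporary file first, so the upload no longer depends on the HTTP connection staying open. Aleph's response :body is a java.io.InputStream here, which clojure.java.io/copy can consume directly:

(require '[aleph.http :as http]
         '[clojure.java.io :as io])

;; Sketch: read the stream to EOF while the connection is still hot,
;; then hand the local file to the uploader.
(defn fetch-to-temp-file [url]
  (let [{:keys [body]} @(http/get url)
        tmp (java.io.File/createTempFile "download" ".tmp")]
    (io/copy body tmp)
    tmp))

With a local file you could also pass :file instead of :input-stream and let the SDK derive the content length itself (assuming the wrapper you use exposes PutObjectRequest's file setter).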

igrishaev commented Mar 28, 2019

Small update: even with the idle timeout, I still cannot download the file in the REPL. This time, I almost finished reading the stream: the exception said dataLength=1418258896; expectedLength=1418264736, which is much closer than before, yet still unsuccessful.

igrishaev commented Mar 28, 2019

upd: for now, I've worked around it with clj-http:

(client/get url {:as :stream})

The :body will be a stream that doesn't get closed until S3 consumes it completely.
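
For completeness, the full clj-http flow would look roughly like this (a sketch: bucket, key, and aws are the same placeholders as in the first snippet, and parsing Content-Length out of the response headers is an assumption):

(require '[clj-http.client :as client])

;; clj-http returns headers as a case-insensitive map of strings;
;; Content-Length must be parsed before handing it to the SDK.
(let [{:keys [body headers]} (client/get url {:as :stream})
      len (Long/parseLong (get headers "Content-Length"))]
  (aws/put-object
    aws
    {:bucket-name bucket
     :key key
     :input-stream body
     :metadata {:content-length len}
     :canned-acl :public-read}))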

kachayev commented Mar 29, 2019

@igrishaev So, does it work, or do you need any help with this?

igrishaev commented Mar 29, 2019

@kachayev it works because I switched to clj-http, but I'd like to know how to deal with it using http/get. Could you give me some tips, please?

kachayev commented Mar 30, 2019

@igrishaev Is there any reproducible code that I can run? It's really hard to tell anything w/o looking into specific details.

igrishaev commented Mar 30, 2019

@kachayev here is a sample of the code that was failing on Friday. Today at home, it works fine, so I cannot reproduce the exception I wrote about. I'm not against closing this issue, but before I do, could you please check that it works on your machine?

Here is an example where I re-stream a file into S3. In case you don't have time to deal with AWS credentials, please use the second snippet instead.

(require '[amazonica.aws.s3 :as s3])
(require '[aleph.http :as http])
(require '[manifold.deferred :as d])

(def url
  "http://exoscale-packer-images.ppsos-ch-dk-2.exo.io/exoscale-debian-stretch-2019-03-27-6cf902.qcow2")

(def len 1418264736)

(def creds {:access-key "................"
            :secret-key "..................."
            :endpoint   "us-east-1"})

(defn example
  []

  (let [{:keys [body]} @(http/get url)
        request {:bucket-name "uploadedfiles"
                 :key "test"
                 :input-stream body
                 :metadata {:content-length len}
                 :canned-acl :public-read}]

    (s3/put-object creds request)))

(example)

This is the second example. Here, I just consume the stream in a loop. The LengthCheckInputStream class wraps the stream and counts the number of bytes read. If that number doesn't equal the passed value, it throws an exception.

;; the same imports...

;; [com.amazonaws/aws-java-sdk-core "1.11.528"]
(import 'com.amazonaws.util.LengthCheckInputStream)

(defn example
  []

  @(->

    (http/get url)

    (d/chain
     (fn [result]
       (let [{:keys [body]} result
             stream (new LengthCheckInputStream body len false)
             buffer (byte-array (* 1024 1024 4))]

         (while (not= -1 (.read ^LengthCheckInputStream stream buffer))))))))

(example)

kachayev commented Mar 31, 2019

@igrishaev

#'user/example
user=> (example)
nil

The last example works on my machine. I can't run the first one as I don't have any S3 creds available right now. The only question I have: are you sure that .read does what you want it to do? As far as I can see, LengthCheckInputStream doesn't have a 2-arity read override, so the call should go up to FilterInputStream or similar, which would not throw an exception in any case.
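
For what it's worth, calling the 3-arity read explicitly would take the dispatch question out of the picture. A sketch reusing url and len from the example above (example-3arity is a made-up name):

;; Sketch: the explicit (buffer, offset, length) arity is the one
;; LengthCheckInputStream overrides, so any length mismatch should
;; surface here once the stream hits EOF.
(defn example-3arity
  []
  (let [{:keys [body]} @(http/get url)
        stream (LengthCheckInputStream. body len false)
        buffer (byte-array (* 1024 1024 4))]
    (while (not= -1 (.read stream buffer 0 (alength buffer))))))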

igrishaev commented Apr 3, 2019

I also couldn't reproduce it again. Closing; maybe I'll investigate more later.

igrishaev closed this Apr 3, 2019
