-
Notifications
You must be signed in to change notification settings - Fork 36
/
Clients.scala
107 lines (92 loc) · 3.99 KB
/
Clients.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.io
import java.net.URI
import cats.effect.{ConcurrentEffect, Resource, Timer}
import fs2.Stream
import org.http4s.{Headers, Request, Uri}
import org.http4s.client.defaults
import org.http4s.client.{Client => Http4sClient}
import org.http4s.client.blaze.BlazeClientBuilder
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import Clients._
import org.http4s.blaze.pipeline.Command
import org.http4s.client.middleware.{Retry, RetryPolicy}
import org.http4s.syntax.string._
import org.http4s.util.CaseInsensitiveString
case class Clients[F[_]: ConcurrentEffect](clients: List[Client[F]]) {
/** Download a URI as a stream of bytes, using the appropriate client */
def download(uri: URI): Stream[F, Byte] =
clients.find(_.canDownload(uri)) match {
case Some(client) =>
client.download(uri)
case None =>
Stream.raiseError[F](new IllegalStateException(s"No client initialized to download $uri"))
}
}
object Clients {
def init[F[_]: ConcurrentEffect](httpClient: Http4sClient[F], others: List[Client[F]]): Clients[F] =
Clients(wrapHttpClient(httpClient) :: others)
def wrapHttpClient[F[_]: ConcurrentEffect](client: Http4sClient[F]): Client[F] =
new Client[F] {
def canDownload(uri: URI): Boolean = {
// Since Azure Blob Storage urls' scheme are https as well and we want to fetch them with
// their own client, we added second condition to not pick up those urls
(uri.getScheme == "http" || uri.getScheme == "https") && !uri.toString.contains("core.windows.net")
}
def download(uri: URI): Stream[F, Byte] = {
val request = Request[F](uri = Uri.unsafeFromString(uri.toString))
for {
response <- client.stream(request)
body <- if (response.status.isSuccess) response.body
else Stream.raiseError[F](HttpDownloadFailure(uri))
} yield body
}
}
def mkHttp[F[_]: ConcurrentEffect: Timer](
connectionTimeout: FiniteDuration = defaults.ConnectTimeout,
readTimeout: FiniteDuration = defaults.RequestTimeout,
maxConnections: Int = 10, // http4s uses 10 by default
ec: ExecutionContext
): Resource[F, Http4sClient[F]] =
BlazeClientBuilder[F](ec)
.withConnectTimeout(connectionTimeout)
.withRequestTimeout(readTimeout)
.withMaxTotalConnections(maxConnections)
.resource
.map(Retry[F](retryPolicy, redactHeadersWhen))
private def retryPolicy[F[_]] =
RetryPolicy[F](
backoff,
retriable = {
//EOF error has to be retried explicitly for blaze client, see https://github.com/snowplow/enrich/issues/692
case (_, Left(Command.EOF)) => true
case _ => false
}
)
//retry once after 100 mills
private def backoff(attemptNumber: Int): Option[FiniteDuration] =
if (attemptNumber > 1) None
else Some(100.millis)
private def redactHeadersWhen(header: CaseInsensitiveString) =
(Headers.SensitiveHeaders + "apikey".ci).contains(header)
trait RetryableFailure extends Throwable
case class HttpDownloadFailure(uri: URI) extends RetryableFailure {
override def getMessage: String = s"Cannot download $uri"
}
trait Client[F[_]] {
def canDownload(uri: URI): Boolean
def download(uri: URI): Stream[F, Byte]
}
}