/
transport.go
143 lines (129 loc) · 5 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package transport
import (
"crypto/tls"
"fmt"
"net"
"net/http"
runtimeDebug "runtime/debug"
"sync"
"sync/atomic"
"github.com/webnice/transport/v3/methods"
"github.com/webnice/transport/v3/request"
)
// New Function creates the transport object and return interface
func New() Interface {
var trt = new(impl)
trt.methods = methods.New()
trt.requestPoolInterface = request.New()
trt.requestChan = make(chan request.Interface, requestChanBuffer)
trt.requestPoolLock = new(sync.Mutex)
trt.requestPoolStarted = new(atomic.Value)
trt.requestPoolStarted.Store(false)
trt.requestPoolDone = new(sync.WaitGroup)
setDefaults(trt)
return trt
}
// Error Return latest error
func (trt *impl) Error() error { return trt.err }
// ErrorFunc Registering the error function on the client side
func (trt *impl) ErrorFunc(fn ErrorFunc) Interface { trt.errFunc = fn; return trt }
// DebugFunc Set debug func and enable or disable debug mode
// If fn=not nil - debug mode is enabled. If fn=nil, debug mode is disbled
func (trt *impl) DebugFunc(fn DebugFunc) Interface { trt.debugFunc = fn; return trt }
// Method Return interface of request methods
func (trt *impl) Method() methods.Interface { return trt.methods }
// RequestGet Загрузка из sync.Pool объекта request и возврат интерфейса к нему
// Полученный объект необходимо возвращать в sync.Pool методом RequestPut во избежании утечки памяти
func (trt *impl) RequestGet() request.Interface {
return trt.requestPoolInterface.RequestGet().DebugFunc(request.DebugFunc(trt.debugFunc))
}
// RequestPut Возврат в sync.Pool объекта request
func (trt *impl) RequestPut(req request.Interface) { trt.requestPoolInterface.RequestPut(req) }
// Client Returns the current http.Client
// В пределах одного экземпляра transport.impl, http.Client создаётся только один раз
// при первом вызове данной функции. Эта функция так же вызывается при первом вызове функции Do()
func (trt *impl) Client() (ret *http.Client) {
if trt.client != nil {
return trt.client
}
if trt.tlsClientConfig == nil && trt.tlsInsecureSkipVerify {
trt.tlsClientConfig = &tls.Config{InsecureSkipVerify: trt.tlsInsecureSkipVerify}
} else if trt.tlsClientConfig != nil && trt.tlsInsecureSkipVerify {
trt.tlsClientConfig.InsecureSkipVerify = trt.tlsInsecureSkipVerify
}
// Создание объекта транспорта
if trt.transport == nil {
trt.transport = &http.Transport{
Proxy: trt.proxy,
ProxyConnectHeader: trt.proxyConnectHeader,
MaxIdleConns: int(trt.maximumIdleConnections),
MaxIdleConnsPerHost: int(trt.maximumIdleConnectionsPerHost),
IdleConnTimeout: trt.idleConnectionTimeout,
TLSHandshakeTimeout: trt.tlsHandshakeTimeout,
TLSClientConfig: trt.tlsClientConfig,
DialTLS: trt.tlsDialFunc,
DialContext: trt.dialContextCustomFunc,
}
if trt.dialContextCustomFunc == nil {
trt.transport.DialContext = (&net.Dialer{
Timeout: trt.dialContextTimeout,
KeepAlive: trt.dialContextKeepAlive,
DualStack: trt.dialContextDualStack,
}).DialContext
}
}
// Создание клиента http
trt.client = &http.Client{
Transport: trt.transport,
Timeout: trt.totalTimeout,
Jar: trt.cookieJar,
}
return trt.client
}
// Do Executing the query in asynchronous mode. Non blocking function
// When you first start
// - 1. Создаётся транспорт
// - 2. Создаётся пул воркеров обработки запросов
// - 3. Выполняется запрос
// For all subsequent calls, step 1 and step 2 are skipped
func (trt *impl) Do(req request.Interface) Interface {
// Создание транспорта, клиента http
if trt.client == nil {
_ = trt.Client()
}
// Создание и запуск пула воркеров для обслуживания запросов
if !trt.requestPoolStarted.Load().(bool) {
trt.makePool()
}
// Добавление запроса в пул задач
trt.requestChan <- req
return trt
}
// Done Stopping the worker pool, closing connections
func (trt *impl) Done() {
defer func() {
if e := recover(); e != nil {
trt.err = fmt.Errorf("Catch panic: %s\nGoroutine stack is:\n%s", e.(error), string(runtimeDebug.Stack()))
if trt.errFunc != nil {
trt.errFunc(trt.err)
}
return
}
}()
trt.requestPoolLock.Lock()
defer trt.requestPoolLock.Unlock()
// Выход, если пул воркеров остановлен
if !trt.requestPoolStarted.Load().(bool) {
return
}
// Завершаем все воркеры пула
for i := range trt.requestPoolCancelFunc {
if trt.requestPoolCancelFunc[i] != nil {
trt.requestPoolCancelFunc[i]()
}
}
// Ожидание завершения воркеров
trt.requestPoolDone.Wait()
trt.requestPoolStarted.Store(false)
close(trt.requestChan)
}