This repository has been archived by the owner on Jan 5, 2023. It is now read-only.
/
activity.go
177 lines (153 loc) · 5.53 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
* Copyright (c) 2020, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package common
import (
"net"
"sync/atomic"
"time"
"github.com/ooni/psiphon/oopsi/github.com/Psiphon-Labs/goarista/monotime"
"github.com/ooni/psiphon/oopsi/github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
)
// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with events
// triggered by I/O activity.
//
// ActivityMonitoredConn uses lock-free concurrency synchronization, avoiding
// an additional mutex resource, making it suitable for wrapping many
// net.Conns (e.g., each Psiphon port forward).
//
// When an inactivity timeout is specified, the network I/O will timeout after
// the specified period of read inactivity. Optionally, for the purpose of
// inactivity only, ActivityMonitoredConn will also consider the connection
// active when data is written to it.
//
// When a LRUConnsEntry is specified, then the LRU entry is promoted on each
// read that returns data and, when activeOnWrite is set, on each write that
// transfers data.
//
// When an ActivityUpdater is set, then its UpdateProgress method is called
// with the number of bytes transferred on each read that returns data and,
// when activeOnWrite is set, on each write that transfers data. The
// durationNanoseconds, which is the time since the last read, is reported
// only on reads; writes report 0.
type ActivityMonitoredConn struct {
// Note: 64-bit ints used with atomic operations are placed
// at the start of struct to ensure 64-bit alignment.
// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
// monotonicStartTime is the monotime value captured in
// NewActivityMonitoredConn; it is the baseline for GetActiveDuration.
monotonicStartTime int64
// lastReadActivityTime is the monotime value of the most recent Read that
// returned data (initially equal to monotonicStartTime). It is accessed
// with atomic loads and stores.
lastReadActivityTime int64
// realStartTime is the wall-clock time captured in
// NewActivityMonitoredConn; it is reported, as UTC, by GetStartTime.
realStartTime time.Time
// Conn is the wrapped, underlying conn. It is embedded, so net.Conn methods
// not overridden by ActivityMonitoredConn are promoted from it.
net.Conn
// inactivityTimeout, when greater than zero, is the period by which the
// conn deadline is extended on each qualifying I/O operation.
inactivityTimeout time.Duration
// activeOnWrite selects whether writes that transfer data are also treated
// as activity.
activeOnWrite bool
// activityUpdater, when not nil, receives UpdateProgress calls.
activityUpdater ActivityUpdater
// lruEntry, when not nil, has Touch called on activity.
lruEntry *LRUConnsEntry
}
// ActivityUpdater defines an interface for receiving updates for
// ActivityMonitoredConn activity.
//
// ActivityMonitoredConn calls UpdateProgress once per I/O operation that
// transferred data. For a read, bytesRead is the number of bytes read,
// bytesWritten is 0, and durationNanoseconds is the monotonic time elapsed
// since the previous read that returned data (or, for the first read, since
// the ActivityMonitoredConn was created). For a write, bytesWritten is the
// number of bytes written and both bytesRead and durationNanoseconds are 0.
type ActivityUpdater interface {
UpdateProgress(bytesRead, bytesWritten int64, durationNanoseconds int64)
}
// NewActivityMonitoredConn wraps conn in a new ActivityMonitoredConn.
//
// When inactivityTimeout is greater than zero, the initial deadline of
// now+inactivityTimeout is set on conn before it is wrapped; if setting the
// deadline fails, the traced error is returned and no ActivityMonitoredConn
// is created. activityUpdater and lruEntry are optional and may be nil.
func NewActivityMonitoredConn(
	conn net.Conn,
	inactivityTimeout time.Duration,
	activeOnWrite bool,
	activityUpdater ActivityUpdater,
	lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {

	if inactivityTimeout > 0 {
		initialDeadline := time.Now().Add(inactivityTimeout)
		if err := conn.SetDeadline(initialDeadline); err != nil {
			return nil, errors.Trace(err)
		}
	}

	// The "monotime" package is used for the monotonic timestamps because its
	// time value is an int64, which is compatible with atomic operations.
	monotonicNow := int64(monotime.Now())

	// Both the start time and the last read activity time begin at the same
	// instant, so the active duration is zero until the first read.
	monitoredConn := &ActivityMonitoredConn{
		monotonicStartTime:   monotonicNow,
		lastReadActivityTime: monotonicNow,
		realStartTime:        time.Now(),
		Conn:                 conn,
		inactivityTimeout:    inactivityTimeout,
		activeOnWrite:        activeOnWrite,
		activityUpdater:      activityUpdater,
		lruEntry:             lruEntry,
	}

	return monitoredConn, nil
}
// GetStartTime returns the wall-clock time recorded when the
// ActivityMonitoredConn was created, converted to UTC.
func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
	startTime := conn.realStartTime
	return startTime.UTC()
}
// GetActiveDuration reports how much time passed between the creation of the
// ActivityMonitoredConn and its most recent Read that returned data. Writes
// are deliberately not counted, since a write may succeed locally due to
// buffering.
func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
	// The last read time is updated by Read with an atomic store, so it is
	// loaded atomically here; the start time is read directly.
	lastRead := atomic.LoadInt64(&conn.lastReadActivityTime)
	elapsedNanoseconds := lastRead - conn.monotonicStartTime
	return time.Duration(elapsedNanoseconds)
}
// Read reads from the underlying conn. When at least one byte is read, the
// read is recorded as activity: the deadline is extended by the inactivity
// timeout (when one is configured), the last read activity time is updated,
// the ActivityUpdater (if any) is called with the number of bytes read and
// the time elapsed since the previous read activity, and the LRU entry (if
// any) is touched.
//
// The byte count and error from the underlying Read are returned unmodified,
// except when extending the deadline fails; in that case the traced
// SetDeadline error is returned along with the byte count, and the remaining
// activity bookkeeping is skipped.
func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
	n, err := conn.Conn.Read(buffer)
	if n > 0 {

		if conn.inactivityTimeout > 0 {
			// The SetDeadline result is held in its own variable: the
			// underlying Read may return n > 0 together with a non-nil error,
			// and a successful SetDeadline must not replace that error with
			// nil.
			deadlineErr := conn.Conn.SetDeadline(
				time.Now().Add(conn.inactivityTimeout))
			if deadlineErr != nil {
				return n, errors.Trace(deadlineErr)
			}
		}

		lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
		readActivityTime := int64(monotime.Now())

		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)

		if conn.activityUpdater != nil {
			conn.activityUpdater.UpdateProgress(
				int64(n), 0, readActivityTime-lastReadActivityTime)
		}

		if conn.lruEntry != nil {
			conn.lruEntry.Touch()
		}
	}
	// Note: no context error to preserve error type
	return n, err
}
// Write writes to the underlying conn. When activeOnWrite is set and at least
// one byte is written, the write is recorded as activity: the deadline is
// extended by the inactivity timeout (when one is configured), the
// ActivityUpdater (if any) is called with the number of bytes written and a
// duration of 0, and the LRU entry (if any) is touched.
//
// The byte count and error from the underlying Write are returned unmodified,
// except when extending the deadline fails; in that case the traced
// SetDeadline error is returned along with the byte count, and the remaining
// activity bookkeeping is skipped.
func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
	n, err := conn.Conn.Write(buffer)
	if n > 0 && conn.activeOnWrite {

		if conn.inactivityTimeout > 0 {
			// The SetDeadline result is held in its own variable: a short
			// write returns n > 0 together with a non-nil error, and a
			// successful SetDeadline must not replace that error with nil,
			// which would hide the partial write from the caller.
			deadlineErr := conn.Conn.SetDeadline(
				time.Now().Add(conn.inactivityTimeout))
			if deadlineErr != nil {
				return n, errors.Trace(deadlineErr)
			}
		}

		if conn.activityUpdater != nil {
			conn.activityUpdater.UpdateProgress(0, int64(n), 0)
		}

		if conn.lruEntry != nil {
			conn.lruEntry.Touch()
		}
	}
	// Note: no context error to preserve error type
	return n, err
}
// IsClosed implements the Closer interface. It reports whether the
// underlying conn has been closed, as indicated by the underlying conn's own
// IsClosed. When the underlying conn does not implement Closer, IsClosed
// returns false.
func (conn *ActivityMonitoredConn) IsClosed() bool {
	if underlying, isCloser := conn.Conn.(Closer); isCloser {
		return underlying.IsClosed()
	}
	return false
}