-
Notifications
You must be signed in to change notification settings - Fork 4
/
chanio.go
128 lines (122 loc) · 2.86 KB
/
chanio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Package chanio provides Reader and Writer bindings with
// go channels for io.Reader and io.Writer interfaces.
//
// Here's an example implementation of channels communication
// over the network:
//
// Server:
//
// addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8080")
// l, _ := net.ListenTCP("tcp", addr)
// for {
// conn, err := l.Accept()
// if err != nil {
// continue
// }
// r := chanio.NewReader(conn)
// for x := range r {
// // do something with x
// }
// }
//
// Client:
//
// conn, _ := net.Dial("tcp", host)
// w := chanio.NewWriter(conn)
// w <- "Hello World!"
//
// The chanio package is using encoding/gob to encode and decode
// data exchanged over the underlying buffers. If you want to send
// a struct, map or value of a custom type, then you need to register
// it in gob first. Example:
//
// package main
//
// import (
// "encoding/gob"
// "chanio"
// "net"
// )
//
// type Cat struct {
// Name string
// IsCute bool
// }
//
// func main() {
// gob.Register(&Cat{})
// conn, _ := net.Dial("tcp", "host.com:8080")
// w := chanio.NewWriter(conn)
// w <- &Cat{Name: "Tom", IsCute: false}
// conn.Close()
// }
//
package chanio
import (
"encoding/gob"
"io"
)
// packet is a wrapper for data passed over the io interfaces.
type packet struct {
X interface{}
}
// NewReader returns a new read-only channel which provides data
// read from specified io.Reader.
//
// Example:
//
// conn := net.Dial("tcp", "host.com:8080")
// r := chanio.NewReader(conn)
// for x := range r {
// // do something with x
// }
//
func NewReader(r io.Reader) <-chan interface{} {
ch := make(chan interface{})
go read(r, ch)
return ch
}
// read handles all the data read from the underlaying io.Reader
// and passes it to the specified channel.
func read(r io.Reader, ch chan interface{}) {
defer close(ch)
dec := gob.NewDecoder(r)
for {
var e packet
if err := dec.Decode(&e); err == io.EOF {
break
} else if err != nil {
continue
}
ch <- e.X
}
}
// NewWriter returns a new write-only channel which passes data
// to specified io.Writer.
//
// Example:
//
// conn := net.Dial("tcp", "host.com:8080")
// w := chanio.NewWriter(conn)
// w <- "foo"
//
func NewWriter(w io.Writer) chan<- interface{} {
ch := make(chan interface{})
go write(w, ch)
return ch
}
// write handles all the data received from specified channel
// and writes it to the io.Writer.
func write(w io.Writer, ch chan interface{}) {
enc := gob.NewEncoder(w)
for x := range ch {
if err := enc.Encode(&packet{x}); err == io.EOF {
break
} else if err != nil {
continue
}
}
if wc, ok := w.(io.WriteCloser); ok {
wc.Close()
}
}