forked from wcong/ants-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
155 lines (142 loc) · 3.85 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package rpc
import (
"github.com/wcong/ants-go/ants/action"
"github.com/wcong/ants-go/ants/crawler"
"github.com/wcong/ants-go/ants/http"
"github.com/wcong/ants-go/ants/node"
"log"
"net/rpc"
"net/rpc/jsonrpc"
"strconv"
"time"
)
// RpcClient is the outbound rpc side of a node: it dials other nodes and
// keeps the resulting connections so later calls can reuse them.
// NOTE(review): connMap is read and written by the methods below and by the
// goroutine launched in Start; no lock is visible in this file — confirm
// whether callers serialize access.
type RpcClient struct {
// node is the local node; its NodeInfo is sent along with outgoing requests.
node *node.Node
// connMap holds the open rpc connections, keyed by the remote node's name.
connMap map[string]*rpc.Client
}
// NewRpcClient builds an RpcClient bound to the given local node, starting
// with an empty (but non-nil) connection table.
func NewRpcClient(node *node.Node) *RpcClient {
	return &RpcClient{
		node:    node,
		connMap: map[string]*rpc.Client{},
	}
}
// Dial opens a JSON-RPC connection to ip:port over the RPC_TYPE network.
// A dial failure is logged and also handed back to the caller. The connection
// is only returned here; it is not recorded in connMap.
func (this *RpcClient) Dial(ip string, port int) (*rpc.Client, error) {
	address := ip + ":" + strconv.Itoa(port)
	conn, dialErr := jsonrpc.Dial(RPC_TYPE, address)
	if dialErr != nil {
		log.Println(dialErr)
	}
	return conn, dialErr
}
/*
 * this is rpc method of cluster
 **/

// LetMeIn asks the node at ip:port to admit this node into its cluster.
//
// It dials the target and sends this node's NodeInfo via "RpcServer.LetMeIn",
// then acts on the reply:
//   - response.Result is true: the connection is stored in connMap under the
//     responder's name, the responder is added to the local cluster view and
//     marked as master through Cluster.MakeMasterNode.
//   - response.Result is false: the connection is closed and the request is
//     retried against the Ip/Port carried in response.NodeInfo (presumably the
//     address of the actual master — confirm against the server side).
//
// It returns the dial error, the rpc call error, or the result of the retried
// call; nil means this node has joined.
func (this *RpcClient) LetMeIn(ip string, port int) error {
	client, err := this.Dial(ip, port)
	if err != nil {
		return err
	}
	request := new(action.LeftMeInRequest)
	request.NodeInfo = this.node.NodeInfo
	response := new(action.LeftMeInResponse)
	if err = client.Call("RpcServer.LetMeIn", request, response); err != nil {
		// The reply cannot be trusted after a failed call, so do not read a
		// redirect address out of it; release the connection and report.
		// The Close error is ignored: the call error is the one that matters.
		client.Close()
		return err
	}
	if response.Result {
		this.connMap[response.NodeInfo.Name] = client
		this.node.AddNodeToCluster(response.NodeInfo)
		this.node.Cluster.MakeMasterNode(response.NodeInfo.Name)
		return nil
	}
	// Refused: drop this connection and follow the redirect, propagating
	// whatever the retry returns instead of silently reporting success.
	client.Close()
	return this.LetMeIn(response.NodeInfo.Ip, response.NodeInfo.Port)
}
// Start launches a background goroutine that runs Detect, sleeps two seconds,
// and repeats forever. It returns immediately; nothing in this method stops
// the loop once it has been started.
func (this *RpcClient) Start() {
	const pause = 2 * time.Second
	detectLoop := func() {
		for {
			this.Detect()
			time.Sleep(pause)
		}
	}
	go detectLoop()
}
// Detect probes every stored connection with "RpcServer.IsAlive" and treats
// any call error as the node being down: the error is logged, the connection
// is closed and removed from connMap, and — when the local node is the master —
// the node is also removed through node.DeleteDeadNode.
//
// Deleting from connMap while ranging over it is permitted by the Go spec.
func (this *RpcClient) Detect() {
	request := new(action.RpcBase)
	response := new(action.RpcBase)
	for key, conn := range this.connMap {
		err := conn.Call("RpcServer.IsAlive", request, response)
		if err == nil {
			continue
		}
		log.Println(err)
		log.Println("node:", key, "is dead ,so remove it")
		// Release the client before forgetting it; otherwise a connection
		// that failed for a reason other than shutdown would leak. Close
		// returns rpc.ErrShutdown when already closed, which is safe to ignore.
		conn.Close()
		delete(this.connMap, key)
		if this.node.IsMasterNode() {
			this.node.DeleteDeadNode(key)
		}
	}
}
// SyncClusterInfo currently does nothing: its body is empty.
func (this *RpcClient) SyncClusterInfo() {
}
// Connect dials ip:port and performs the "RpcServer.Connect" handshake with an
// empty RpcBase request. On success the connection is stored in connMap under
// the name reported in the response and that node is added to the local
// cluster view. The original comment notes this is, for now, used by the
// master to connect to a slave.
//
// It returns the dial error or the rpc call error; on a call error the freshly
// dialed connection is closed rather than leaked.
func (this *RpcClient) Connect(ip string, port int) error {
	client, err := this.Dial(ip, port)
	if err != nil {
		return err
	}
	request := new(action.RpcBase)
	response := new(action.RpcBase)
	if err = client.Call("RpcServer.Connect", request, response); err != nil {
		// Nobody else holds this client, so it must be released here.
		client.Close()
		return err
	}
	this.connMap[response.NodeInfo.Name] = client
	this.node.AddNodeToCluster(response.NodeInfo)
	return nil
}
// StopNode sends "RpcServer.StopNode" to the node registered under nodeName,
// passing this node's NodeInfo in the request.
//
// If no connection is stored for nodeName it logs that fact and returns
// rpc.ErrShutdown instead of dereferencing a nil client. Any rpc call error is
// logged and returned.
func (this *RpcClient) StopNode(nodeName string) error {
	conn := this.connMap[nodeName]
	if conn == nil {
		log.Println("node:", nodeName, "has no rpc connection")
		return rpc.ErrShutdown
	}
	stopRequest := &action.StopRequest{}
	stopRequest.NodeInfo = this.node.NodeInfo
	stopResponse := &action.StopResponse{}
	err := conn.Call("RpcServer.StopNode", stopRequest, stopResponse)
	if err != nil {
		log.Println(err)
	}
	return err
}
/*
 * this is rpc method of request
 **/

// Distribute hands request to the node registered under nodeName through
// "RpcServer.AcceptRequest". When the call succeeds the request is also added
// to the local node's crawling queue via node.AddToCrawlingQuene.
//
// If no connection is stored for nodeName it logs that fact and returns
// rpc.ErrShutdown instead of dereferencing a nil client. Any rpc call error is
// logged and returned, and the request is then not queued.
func (this *RpcClient) Distribute(nodeName string, request *http.Request) error {
	conn := this.connMap[nodeName]
	if conn == nil {
		log.Println("node:", nodeName, "has no rpc connection")
		return rpc.ErrShutdown
	}
	distributeRequest := &action.DistributeRequest{}
	distributeRequest.NodeInfo = this.node.NodeInfo
	distributeRequest.Request = request
	distributeReqponse := &action.DistributeReqponse{}
	err := conn.Call("RpcServer.AcceptRequest", distributeRequest, distributeReqponse)
	if err != nil {
		log.Println(err)
		return err
	}
	this.node.AddToCrawlingQuene(request)
	return nil
}
// StartSpider sends "RpcServer.StartSpider" to the node registered under
// nodeName, with this node's NodeInfo in the request.
//
// NOTE(review): spiderName is accepted but never placed in the request, so the
// remote side cannot learn which spider is meant from this call — confirm
// whether the request type should carry it.
//
// If no connection is stored for nodeName it logs that fact and returns
// rpc.ErrShutdown instead of dereferencing a nil client. Any rpc call error is
// logged and returned.
func (this *RpcClient) StartSpider(nodeName, spiderName string) error {
	conn := this.connMap[nodeName]
	if conn == nil {
		log.Println("node:", nodeName, "has no rpc connection")
		return rpc.ErrShutdown
	}
	startRequest := &action.RpcBase{
		NodeInfo: this.node.NodeInfo,
	}
	startResponse := &action.RpcBase{}
	err := conn.Call("RpcServer.StartSpider", startRequest, startResponse)
	if err != nil {
		log.Println(err)
	}
	return err
}
// ReportResult sends a scrape result to the node registered under nodeName
// through "RpcServer.AcceptResult", together with this node's NodeInfo. The
// original comment describes it as the slave reporting a crawl result to the
// master.
//
// If no connection is stored for nodeName it logs that fact and returns
// rpc.ErrShutdown instead of dereferencing a nil client. Any rpc call error is
// logged and returned.
func (this *RpcClient) ReportResult(nodeName string, result *crawler.ScrapeResult) error {
	conn := this.connMap[nodeName]
	if conn == nil {
		log.Println("node:", nodeName, "has no rpc connection")
		return rpc.ErrShutdown
	}
	reportRequest := &action.ReportRequest{}
	reportRequest.NodeInfo = this.node.NodeInfo
	reportRequest.ScrapeResult = result
	reportResponse := &action.ReportResponse{}
	err := conn.Call("RpcServer.AcceptResult", reportRequest, reportResponse)
	if err != nil {
		log.Println(err)
	}
	return err
}