@@ -8,16 +8,17 @@ type Msg struct {
Src string //från noden som kalla
Dst string //destinationsadress
Bytes []byte //transport funktionen, msg.Bytes
Adress string //EVENTUELLT PEKA PÅ TINYNODE?
Id string
Type string // type of message thats is being sent
liteNode *Finger

Type string // type of message thats is being sent
}

func message(t, origin, dst, src, key string, bytes []byte) *Msg {
msg := &Msg{}
msg.Type = t
msg.Adress = ""
msg.Id = ""
msg.liteNode = &Finger{}
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = src
msg.Dst = dst
@@ -26,11 +27,10 @@ func message(t, origin, dst, src, key string, bytes []byte) *Msg {
return msg
}

func joinMessage(dst string) *Msg {
func joinMessage(dst, adress, id string) *Msg {
msg := &Msg{}
msg.Type = "addToRing"
msg.Adress = ""
msg.Id = ""
msg.Type = "join"
msg.liteNode = &Finger{id, adress}
msg.Origin = "" //origin?
msg.Src = ""
msg.Dst = dst
@@ -42,8 +42,8 @@ func joinMessage(dst string) *Msg {
func printMessage(origin, dst string) *Msg {
msg := &Msg{}
msg.Type = "printRing"
msg.Adress = ""
msg.Id = ""
//msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = ""
msg.Dst = dst
@@ -55,8 +55,7 @@ func printMessage(origin, dst string) *Msg {
func notifyMessage(src, dst, adress, id string) *Msg {
msg := &Msg{}
msg.Type = "notify"
msg.Adress = ""
msg.Id = ""
msg.liteNode = &Finger{id, adress}
msg.Origin = ""
msg.Key = ""
msg.Src = src
@@ -68,8 +67,8 @@ func notifyMessage(src, dst, adress, id string) *Msg {
func getNodeMessage(src, dst string) *Msg {
msg := &Msg{}
msg.Type = "pred"
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = ""
msg.Src = src
msg.Dst = dst
@@ -80,8 +79,7 @@ func getNodeMessage(src, dst string) *Msg {
func responseMessage(src, dst, adress, id string) *Msg {
msg := &Msg{}
msg.Type = "response"
msg.Adress = adress
msg.Id = id
msg.liteNode = &Finger{id, adress}
msg.Origin = ""
msg.Src = src
msg.Dst = dst
@@ -93,8 +91,8 @@ func lookUpMessage(origin, key, src, dst string) *Msg {
msg := &Msg{}
msg.Type = "lookup"
msg.Key = key
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = src
msg.Dst = dst
@@ -106,8 +104,8 @@ func fingerLookUpMessage(origin, key, src, dst string) *Msg {
msg := &Msg{}
msg.Type = "fingerLookup"
msg.Key = key
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = src
msg.Dst = dst
@@ -119,8 +117,8 @@ func fingerPrintMessage(origin, dst string) *Msg {
msg := &Msg{}
msg.Type = "fingerPrint"
msg.Key = ""
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = ""
msg.Dst = dst
@@ -132,32 +130,71 @@ func heartBeatMessage(origin, dst string) *Msg {
msg := &Msg{}
msg.Type = "heartBeat"
msg.Key = ""
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = ""
msg.Dst = dst
msg.Bytes = nil
return msg
}

func pingMsg(dst, adress string) *Msg{
Msg := &Msg{}
Msg.Dst = dst
Msg.Adress = adress
return Msg
}

func heartBeatAnswer(origin, dst string) *Msg {
msg := &Msg{}
msg.Type = "heartBeatAnswer"
msg.Type = "heartAnswer"
msg.Key = ""
msg.Adress = ""
msg.Id = ""
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
msg.Src = ""
msg.Dst = dst
msg.Bytes = nil
return msg
}

func AliveMessage(origin, dst string) *Msg {
msg := &Msg{}
msg.Type = "isAlive"
// msg.liteNode.adress = ""
// msg.liteNode.id = ""
msg.Origin = origin
//msg.Src = ""
msg.Dst = dst
msg.Bytes = nil
return msg
}

func nodeFoundMessage(origin, dst, adress, key string) *Msg {
msg := &Msg{}
msg.Type = "nodeFound"
msg.liteNode = &Finger{key, adress}
//msg.Key = key
msg.Origin = origin
msg.Src = ""
msg.Dst = dst
msg.Bytes = nil
return msg
}

func ackMsg(src, dst string) *Msg {
msg := &Msg{}
msg.Type = "ack"
//msg.liteNode.adress = ""
//msg.liteNode.id = ""
msg.Origin = ""
msg.Src = src
msg.Dst = dst
msg.Bytes = nil
return msg
}

func fingerStartMessage(src, dst, adress, id string) *Msg {
msg := &Msg{}
msg.Type = "fingerStart"
msg.liteNode = &Finger{id, adress}
msg.Origin = ""
msg.Src = src
msg.Dst = dst
msg.Bytes = nil
return msg
}
@@ -23,9 +23,11 @@ type DHTNode struct {
responseQ chan *Msg
TaskQ chan *Task
heartBeatQ chan *Msg
alive bool
}
fingerQ chan *Finger
nodeQ chan *Msg

alive bool
}
type tinyNode struct {
nodeId string
adress string
@@ -40,7 +42,6 @@ func makeDHTNode(nodeId *string, ip string, port string) *DHTNode {
dhtNode := new(DHTNode)
dhtNode.contact.ip = ip
dhtNode.contact.port = port
dhtNode.alive = true

if nodeId == nil {
genNodeId := generateNodeId()
@@ -51,15 +52,18 @@ func makeDHTNode(nodeId *string, ip string, port string) *DHTNode {

dhtNode.successor = &tinyNode{dhtNode.nodeId, ip + ":" + port}
dhtNode.predecessor = &tinyNode{dhtNode.nodeId, ip + ":" + port}
dhtNode.fingers = new(FingerTable)
dhtNode.fingers = &FingerTable{}
//ska new användas eller raden under?
//dhtNode.fingers.nodefingerlist = [bits]*DHTNode{}
//eller denna kanske
//dhtNode.fingers = &FingerTable{}
dhtNode.createTransport()
dhtNode.alive = true
dhtNode.responseQ = make(chan *Msg)
dhtNode.TaskQ = make(chan *Task)
dhtNode.heartBeatQ = make(chan *Msg)
dhtNode.fingerQ = make(chan *Finger)
dhtNode.nodeQ = make(chan *Msg)
dhtNode.createTransport()
return dhtNode
}

@@ -71,13 +75,22 @@ func (dhtNode *DHTNode) createTransport() {

func (dhtNode *DHTNode) join(master *tinyNode) {
src := dhtNode.contact.ip + ":" + dhtNode.contact.port
fmt.Println("src:", src)
fmt.Println("master adress:", master.adress)
fmt.Println("nodeId:",dhtNode.nodeId)
fmt.Println("")
message := message("join", src, master.adress, src, dhtNode.nodeId, nil)
//message = message
dhtNode.transport.send(message)
for {
select {
case r := <-dhtNode.responseQ:
fmt.Println("heeej")
dhtNode.successor.adress = r.Src
dhtNode.successor.nodeId = r.Key
dhtNode.setNetworkFingers(&Msg{"", "", "", "", nil, &Finger{dhtNode.successor.nodeId, dhtNode.successor.adress}, ""})
fingerStart := fingerStartMessage(src, dhtNode.successor.adress, dhtNode.transport.bindAddress, dhtNode.nodeId)
go func() { dhtNode.transport.send(fingerStart) }()
return
//fmt.Println(dhtNode.nodeId, dhtNode.successor)
}
@@ -108,10 +121,10 @@ func (node *DHTNode) printNetworkRing(msg *Msg) {
}

func (dhtNode *DHTNode) start_server() {
go dhtNode.heartTimer()
go dhtNode.initTaskQ()
go dhtNode.stableTimmer()
go dhtNode.fingerTimer()
go dhtNode.heartTimer()
go dhtNode.transport.listen()
}

@@ -133,17 +146,18 @@ func (node *DHTNode) initTaskQ() {
//node.improvePrintRing(node.msg)
//transport.send(&Msg{"printRing", "", v.Src, []byte("tjuuu")})
case "join":
node.findSucc(t.message)

fmt.Println("TaskQ: join")
go node.findSucc(t.message)
case "stabilize":
// fmt.Println("stabilize case: ", node.nodeId)
node.stabilize()
go node.stabilize()
case "updateFingers":
node.updateNetworkFingers()

go node.updateNetworkFingers()
case "heartBeat":
node.heartBeat()

//fmt.Println("initTask hearbeat")
go node.heartBeat()
case "alive":
fmt.Println("fuck")
}
}
}
@@ -153,30 +167,31 @@ func (node *DHTNode) initTaskQ() {
func (node *DHTNode) stabilize() {
nodeAdress := node.contact.ip + ":" + node.contact.port
predOfSucc := getNodeMessage(nodeAdress, node.successor.adress) // id eller adress?
go func() { node.transport.send(predOfSucc) }()
go node.transport.send(predOfSucc)
time := time.NewTimer(time.Millisecond * 5000)
for {
select {
case r := <-node.responseQ:
//fmt.Println("case 1 stab: ")

between := ((between([]byte(node.nodeId), []byte(node.successor.nodeId), []byte(r.Key))) && r.Key != "" && node.nodeId != r.Key) //r.key = "" för att connecta sista nodens successor
between := (between([]byte(node.nodeId), []byte(node.successor.nodeId), []byte(r.Key))) && r.Key != "" //r.key = "" för att connecta sista nodens successor
if between {
node.successor.adress = r.Src //origin eller source
//node.successor.adress = msg.Origin
//node.successor.nodeId = msg.Key
node.successor.nodeId = r.Key
// fmt.Println("beetween")
//return
}
//ska notifymessage ha fler variabler?
N := notifyMessage(nodeAdress, node.successor.adress, nodeAdress, node.nodeId)

go func(){
node.transport.send(N) }()
go node.transport.send(N)
// fmt.Println("node id:", node.nodeId, "node successor id:", node.successor, "node predecessor id:", node.predecessor)
return
case timer := <-time.C: //timer
fmt.Println("TIMER ERROR:", timer)
node.updateSucc(1)
return
}
}
@@ -186,7 +201,9 @@ func (dhtnode *DHTNode) stableTimmer() {
for {
if dhtnode.alive {
time.Sleep(time.Millisecond * 5000)
dhtnode.createNewTask(nil, "stabilize")
go dhtnode.createNewTask(nil, "stabilize")
} else {
return
}
}
}
@@ -233,38 +250,6 @@ func (node *DHTNode) PrintRingProc() {
}()
}

func (dhtnode *DHTNode) networkLookup(msg *Msg) {
nodeAdress := dhtnode.contact.ip + ":" + dhtnode.contact.port

if between([]byte(dhtnode.nodeId), []byte(dhtnode.successor.nodeId), []byte(msg.Key)) {
if dhtnode.nodeId == msg.Key {
//fmt.Println(dhtnode.nodeId)
respMsg := responseMessage(nodeAdress, msg.Origin, nodeAdress, dhtnode.nodeId)
go func() { dhtnode.transport.send(respMsg) }()
//return
} else {
//fmt.Println(dhtnode.successor.nodeId)
respMsg := responseMessage(nodeAdress, msg.Origin, dhtnode.successor.adress, dhtnode.successor.nodeId)
go func() { dhtnode.transport.send(respMsg) }()
//return
}
} else {
//fmt.Println("lookup else ")
lookUpMsg := lookUpMessage(msg.Origin, msg.Key, nodeAdress, dhtnode.successor.adress)
go func() { dhtnode.transport.send(lookUpMsg) }()
}
//fmt.Println(dhtnode.successor.nodeId)
}

//skicka till taskQ!!!
func (node *DHTNode) initNetworkLookUp(key string, dhtnode *DHTNode) {
lookUpMsg := lookUpMessage(node.transport.bindAddress, key, node.transport.bindAddress, dhtnode.transport.bindAddress)
fmt.Println("hej")
go func() {
dhtnode.transport.send(lookUpMsg)
}()
}

func (dhtnode *DHTNode) killTheNode() {
fmt.Println("killing node ", dhtnode.nodeId)
dhtnode.alive = false
@@ -274,12 +259,37 @@ func (dhtnode *DHTNode) killTheNode() {
dhtnode.predecessor.nodeId = ""
}

/*func (dhtnode *DHTNode) alive() bool {
/*func (dhtnode *DHTNode) isTheNodeAlive() bool {
if dhtnode.alive {
return true
} else {
return false
}
}*/

//vi har ingen updatesuccessor funktion!?
func (dhtnode *DHTNode) updateSucc(key int) {
dhtAdress := dhtnode.contact.ip + ":" + dhtnode.contact.port
getPredOfFinger := getNodeMessage(dhtAdress, dhtnode.fingers.nodefingerlist[key].adress)
go dhtnode.transport.send(getPredOfFinger)

timerResp := time.NewTimer(time.Second * 1)
for {
select {
case r := <-dhtnode.responseQ:
if r.liteNode.id == "" {
dhtnode.successor.adress = dhtnode.fingers.nodefingerlist[key].adress
dhtnode.successor.nodeId = dhtnode.fingers.nodefingerlist[key].id
fmt.Println("update succ done")
}
notify := notifyMessage(dhtAdress, dhtnode.fingers.nodefingerlist[key].adress, dhtAdress, dhtnode.nodeId)
go dhtnode.transport.send(notify)
return

case <-timerResp.C:
if key < (bits - 1) {
dhtnode.updateSucc(key + 1)
}
return
}
}
}
@@ -3,7 +3,7 @@ package dht
//go test -test.run TestDHT1

import (
//"fmt"
"fmt"
"testing"
"time"
)
@@ -25,6 +25,15 @@ func TestDHT2(t *testing.T) {
// node6 := makeDHTNode(nil, "localhost", "1116")
node7 := makeDHTNode(&id7, "localhost", "1117")

/*node0 := makeDHTNode(nil, "localhost", "1110")
node1 := makeDHTNode(nil, "localhost", "1111")
node2 := makeDHTNode(nil, "localhost", "1112")
node3 := makeDHTNode(nil, "localhost", "1113")
node4 := makeDHTNode(nil, "localhost", "1114")
// node5 := makeDHTNode(nil, "localhost", "1115")
// node6 := makeDHTNode(nil, "localhost", "1116")
node7 := makeDHTNode(nil, "localhost", "1117")*/

// key1 := "2b230fe12d1c9c60a8e489d028417ac89de57635"
// key2 := "87adb987ebbd55db2c5309fd4b23203450ab0083"
// key3 := "74475501523a71c34f945ae4e87d571c2c57f6f3"
@@ -43,28 +52,47 @@ func TestDHT2(t *testing.T) {
// fmt.Println("TEST: " + node1.lookup(key3).nodeId + " is responsible for " + key3)

node1.start_server()
fmt.Println("start 1")
node2.start_server()
fmt.Println("start 2")
node3.start_server()
fmt.Println("start 3")
node7.start_server()
fmt.Println("start 7")
node0.start_server()
fmt.Println("start 0")

src := node1.contact.ip + ":" + node1.contact.port
//dst := node2.contact.ip + ":" + node2.contact.port
src := node1.contact.ip + ":" + node1.contact.port
master := &tinyNode{node1.nodeId, src}
//node1.PrintRingProc()

fmt.Println("src:", src)
fmt.Println("master:", master)
fmt.Println("node 2:", node2)
node2.join(master)
fmt.Println("")
fmt.Println("node 2 join, node 3 soon join")
node3.join(master)
node0.join(master)
node7.join(master)
//node1.isTheNodeAlive()

time.Sleep(time.Second * 5)
//node1.killTheNode()
//node1.PrintOutNetworkFingers()
//node1.isTheNodeAlive()

//node1.initLookUpNetworkFingers("08", node3)

//node1.initNetworkLookUp("01", node1)
//time.Sleep(time.Second * 10)
//node1.initPrintNetworkFingers(node2)

node4.transport.listen()

//Glöm inte lägga till en timer på "20000sek" så inte allt dör.

time.Sleep(2000 * time.Second)
}

}
@@ -35,15 +35,17 @@ func (transport *Transport) listen() {
}

func (transport *Transport) send(msg *Msg) {
udpAddr, err := net.ResolveUDPAddr("udp", msg.Dst)
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
fmt.Println("error SEND function is:", err)
if transport.node.alive {
// fmt.Println("transport send msg.dst:", msg.Dst)
udpAddr, err := net.ResolveUDPAddr("udp", msg.Dst)
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
fmt.Println("error SEND function is:", err)
}
encoded, err := json.Marshal(msg)
defer conn.Close()
_, err = conn.Write(encoded)
}
encoded, err := json.Marshal(msg)
defer conn.Close()
_, err = conn.Write(encoded)

}

func (transport *Transport) initmsgQ() {
@@ -59,34 +61,47 @@ func (transport *Transport) initmsgQ() {
case "reply": //test
fmt.Println("hej:", string(msg.Bytes))
case "printRing":
transport.node.TaskQ <- &Task{msg, "printRing"} //transport.node.printRing()
go func() { transport.node.TaskQ <- &Task{msg, "printRing"} }() //transport.node.printRing()
//transport.send(&Msg{"ring", "", v.Src, []byte(transport.node.printRing())})
case "addToRing":
transport.node.printNetworkRing(msg)
case "response":
transport.node.responseQ <- msg
go func() { transport.node.responseQ <- msg }()
case "join":
transport.node.TaskQ <- &Task{msg, "join"}
fmt.Println("transport join")
go func() { transport.node.TaskQ <- &Task{msg, "join"} }()
case "notify":
// fmt.Println("notify network")
transport.node.notifyNetwork(msg)
go transport.node.notifyNetwork(msg)
case "pred":
transport.node.getPred(msg)
go transport.node.getPred(msg)
case "lookup":
go transport.node.improvedNetworkLookUp(msg)
//fmt.Println("initmsgQ lookup: ")
go transport.node.networkLookup(msg)
//go transport.node.networkLookup(msg)
case "fingerLookup":
go transport.node.LookUpNetworkFinger(msg)
//go transport.node.LookUpNetworkFinger(msg)
//go transport.node.lookupFingers(msg)
case "heartBeat":
if transport.node.alive{
if transport.node.alive {
transport.node.transport.send(heartBeatAnswer(msg.Origin, msg.Dst))
}
case "heartBeatAnswer":
transport.node.heartBeatQ <- msg

case "heartAnswer":
go func() { transport.node.heartBeatQ <- msg }()
case "isAlive":
if transport.node.alive {
transport.node.transport.send(responseMessage(msg.Dst, msg.Origin, transport.bindAddress, transport.node.nodeId))
}
case "nodeFound":
transport.node.transport.send(ackMsg(msg.Dst, msg.Origin))
//eller är det msg.liteNode.id i &finger!?
go func() { transport.node.fingerQ <- msg.liteNode }()
case "ack":
go func() { transport.node.responseQ <- msg }()
case "fingerStart":
go transport.node.setNetworkFingers(msg)
}
}
}
}()
}
}
@@ -103,4 +103,4 @@ func generateNodeId() string {
hasher.Write([]byte(u.String()))

return fmt.Sprintf("%x", hasher.Sum(nil))
}
}