Skip to content

Commit

Permalink
Implement package
Browse files Browse the repository at this point in the history
  • Loading branch information
ysugimoto committed Apr 4, 2016
1 parent ef79b03 commit 1ae6d8e
Show file tree
Hide file tree
Showing 16 changed files with 382 additions and 277 deletions.
18 changes: 18 additions & 0 deletions aun.go
@@ -0,0 +1,18 @@
// Simple WebSocket implements over TCP/TLS.
//
// References:
// https://tools.ietf.org/html/rfc6455
// http://www.hcn.zaq.ne.jp/___/WEB/RFC6455-ja.html
package aun

// Connection state constants
const (
INITIALIZE = iota
OPENING
CONNECTED
CLOSING
CLOSED
)

// Sec-WebSocket-Accept key calculate seed
const ACCEPTKEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
83 changes: 57 additions & 26 deletions src/aun/connection.go → connection.go
Expand Up @@ -8,29 +8,43 @@ import (
"time"
)

const (
INITIALIZE = iota
OPENING
CONNECTED
CLOSING
CLOSED
)

// Client connection socket wrapper struct.
type Connection struct {
// socket max buffer size
maxDataSize int
state int
conn net.Conn
Read chan MessageReadable
Write chan MessageReadable
Close chan struct{}
broadcast chan MessageReadable
manager chan *Connection
join chan *Connection
frameStack FrameStack

// connection status
state int

// TCP socket connection
conn net.Conn

// Read message channel
Read chan Readable

// Send socket channel
Write chan Readable

// Close channel
Close chan struct{}

// broadcasting channnel ( supply from Server )
broadcast chan Readable

// leave channnel ( supply from Server )
manager chan *Connection

// join channnel ( supply from Server )
join chan *Connection

// Frame queue stack ( treats FIN = 0 message queue )
frameStack FrameStack
}

// Create new connection.
func NewConnection(conn net.Conn, maxDataSize int) *Connection {
if maxDataSize == 0 {
// Default buffer size is 1024 bytes.
maxDataSize = 1024
}

Expand All @@ -39,13 +53,14 @@ func NewConnection(conn net.Conn, maxDataSize int) *Connection {
maxDataSize: maxDataSize,
conn: conn,
frameStack: FrameStack{},
Read: make(chan MessageReadable, 1),
Write: make(chan MessageReadable, 1),
Read: make(chan Readable, 1),
Write: make(chan Readable, 1),
Close: make(chan struct{}),
}
}

func (c *Connection) Wait(broadCast chan MessageReadable, join, manager chan *Connection) {
// Waiting incoming message, receive channel.
func (c *Connection) Wait(broadCast chan Readable, join, manager chan *Connection) {
c.broadcast = broadCast
c.manager = manager
c.join = join
Expand All @@ -54,30 +69,37 @@ func (c *Connection) Wait(broadCast chan MessageReadable, join, manager chan *Co
c.loop()
}

// Main channael message waiting
func (c *Connection) loop() {
defer c.conn.Close()
// Outer loop label
OUTER:
for {
select {
// Message incoming
case msg := <-c.Read:
switch c.state {
// When state is INITIALIZE, process handshake.
case INITIALIZE:
if err := c.handshake(msg); err == nil {
c.join <- c
}
// When state is CONNECTED, incoming message.
case CONNECTED:
m, err := NewMessageFrame(msg.getData())
frame := NewFrame()
err := frame.parse(msg.getData())
if err != nil {
fmt.Println(err)
break OUTER
}

if err := c.handleFrame(m.Frame); err != nil {
if err := c.handleFrame(frame); err != nil {
fmt.Println(err)
break OUTER
}
}
go c.readSocket()
// Message sending
case msg := <-c.Write:
data := msg.getData()
size := len(data)
Expand All @@ -94,14 +116,15 @@ OUTER:
break
}
}
// Connection closing
case <-c.Close:
break OUTER
}
c.conn.SetDeadline(time.Now().Add(1 * time.Minute))
}
fmt.Println("connection will closing")
}

// Read message from socket.
func (c *Connection) readSocket() {
dat := make([]byte, 0)
buf := make([]byte, c.maxDataSize)
Expand All @@ -119,29 +142,35 @@ func (c *Connection) readSocket() {
}
}

func (c *Connection) handshake(msg MessageReadable) error {
// Processing handshake.
func (c *Connection) handshake(msg Readable) error {
c.state = OPENING

request := NewRequest(string(msg.getData()))
if !isValidHandshake(request) {
fmt.Println("Error")

// Check valid handshke request
if !request.isValid() {
c.Close <- struct{}{}
return errors.New("Invalid handshake request")
}
response := NewResponse(request)
c.Write <- response
// state changed to CONNECTED
c.state = CONNECTED
return nil
}

// Processing incoming message frame
func (c *Connection) handleFrame(frame *Frame) error {
switch frame.Opcode {

// text / binary frame
case 1, 2:
c.frameStack = append(c.frameStack, frame)
if frame.Fin == 0 {
return nil
}
// synthesize queueing frames (if exists)
message := c.frameStack.synthesize()
c.frameStack = FrameStack{}
frames, err := BuildFrame(message, c.maxDataSize)
Expand All @@ -151,9 +180,11 @@ func (c *Connection) handleFrame(frame *Frame) error {
for _, frame := range frames {
c.broadcast <- NewMessage(frame.toFrameBytes())
}

// closing frame
case 8:
c.Close <- struct{}{}

// ping frame
case 9:
c.broadcast <- NewMessage(NewPongFrame().toFrameBytes())
Expand Down
22 changes: 21 additions & 1 deletion src/aun/frame.go → frame.go
Expand Up @@ -26,6 +26,8 @@ import (
)

type Frame struct {
// embeded interface
Readable
Fin int
RSV1 int
RSV2 int
Expand All @@ -37,10 +39,12 @@ type Frame struct {
PayloadData []byte
}

// Create new frame
func NewFrame() *Frame {
return &Frame{}
}

// Create "pong" frame
func NewPongFrame() *Frame {
return &Frame{
Fin: 1,
Expand All @@ -54,6 +58,7 @@ func NewPongFrame() *Frame {
}
}

// Create Message frame for S->C sending
func BuildFrame(message []byte, maxSize int) (FrameStack, error) {
stack := FrameStack{}

Expand All @@ -75,6 +80,9 @@ func BuildFrame(message []byte, maxSize int) (FrameStack, error) {
return stack, nil
}

// Create single message frame.
// If finBit is equal to zero,
// Message was split deriverling.
func BuildSingleFrame(message []byte, finBit int, opcode int) (*Frame, error) {
return &Frame{
Fin: finBit,
Expand All @@ -88,6 +96,7 @@ func BuildSingleFrame(message []byte, finBit int, opcode int) (*Frame, error) {
}, nil
}

// Parse them incoming message frame.
func (f *Frame) parse(buffer []byte) error {
bits := int(buffer[0])
f.Fin = (bits >> 7) & 1
Expand All @@ -102,16 +111,20 @@ func (f *Frame) parse(buffer []byte) error {

index := 2
switch {
// payload length = 126, using length of 2 bytes
case f.PayloadLength == 126:
n := binary.BigEndian.Uint16(buffer[index:(index + 2)])
f.PayloadLength = int(n)
index += 2
// payload length = 127, using length of 8 bytes
case f.PayloadLength == 127:
n := binary.BigEndian.Uint64(buffer[index:(index + 8)])
f.PayloadLength = int(n)
index += 8
}

// Masking check.
// C->S message has always need to be masking.
if f.Mask > 0 {
f.MaskingKey = buffer[index:(index + 4)]
index += 4
Expand All @@ -120,6 +133,8 @@ func (f *Frame) parse(buffer []byte) error {
for i := 0; i < size; i++ {
f.PayloadData = append(
f.PayloadData,
// Unmasking payload:
// payload-i ^ masking-key-j mod 4
byte((int(payload[i]) ^ int(f.MaskingKey[i%4]))),
)
}
Expand All @@ -130,6 +145,12 @@ func (f *Frame) parse(buffer []byte) error {
return nil
}

// Implement Readable interface
func (f *Frame) getData() []byte {
return f.toFrameBytes()
}

// Binarify the frame to send.
func (f *Frame) toFrameBytes() (data []byte) {
bin := 0
bin |= (f.Fin << 7)
Expand Down Expand Up @@ -172,5 +193,4 @@ func (f *Frame) toFrameBytes() (data []byte) {
}
data = append(data, f.PayloadData...)
return

}
2 changes: 2 additions & 0 deletions src/aun/frame_stack.go → frame_stack.go
@@ -1,7 +1,9 @@
package aun

// Define type the *Frame's slice.
type FrameStack []*Frame

// Synthesize the queueing frames.
func (f FrameStack) synthesize() []byte {
var payload []byte

Expand Down
15 changes: 0 additions & 15 deletions main.go

This file was deleted.

5 changes: 4 additions & 1 deletion src/aun/message.go → message.go
@@ -1,16 +1,19 @@
package aun

// Simple message struct.
type Message struct {
MessageReadable
Readable
Data string
}

// Create []byte wrapped pointer
func NewMessage(data []byte) *Message {
return &Message{
Data: string(data),
}
}

// Readable interface implement
func (m *Message) getData() []byte {
return []byte(m.Data)
}
9 changes: 9 additions & 0 deletions readable.go
@@ -0,0 +1,9 @@
package aun

// Readable Interface

// This package's channel messaging type must be implement this interface.
// Messages will get the bytes for messaage through the "getData" method.
type Readable interface {
getData() []byte
}

0 comments on commit 1ae6d8e

Please sign in to comment.