Skip to content

Commit

Permalink
Adding support for sessions closing #14. Cleanup of client API. Impro…
Browse files Browse the repository at this point in the history
…vement of session methods.
  • Loading branch information
dareid committed Sep 3, 2015
1 parent c07407c commit 8b5f769
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 60 deletions.
33 changes: 0 additions & 33 deletions auth.go

This file was deleted.

40 changes: 24 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (
// RClient is the main Roger interface allowing interaction with R.
type RClient interface {

// Eval evaluates an R command synchronously returning the resulting object and any possible error
// Eval evaluates an R command synchronously returning the resulting object and any possible error. Creates a new session per command.
Eval(command string) (interface{}, error)

// Evaluate evaluates an R command asynchronously. The returned channel will resolve to a Packet once the command has completed.
// Evaluate evaluates an R command asynchronously. The returned channel will resolve to a Packet once the command has completed. Creates a new session per command.
Evaluate(command string) <-chan Packet

// EvaluateSync evaluates an R command synchronously, resulting in a Packet.
// EvaluateSync evaluates an R command synchronously, resulting in a Packet. Creates a new session per command.
EvaluateSync(command string) Packet

// GetReadWriteCloser obtains a connection to obtain data from the client
GetReadWriteCloser() (io.ReadWriteCloser, error)
// GetSession gets a session object which can be used to perform multiple commands in the same Rserve session.
GetSession() (Session, error)
}

type roger struct {
Expand Down Expand Up @@ -52,13 +52,29 @@ func NewRClientWithAuth(host string, port int64, user, password string) (RClient
return rClient, nil
}

func (r *roger) GetSession() (Session, error) {
rwc, err := r.getReadWriteCloser()
if err != nil {
return nil, err
}
return newSession(rwc, r.user, r.password)
}

func (r *roger) getReadWriteCloser() (io.ReadWriteCloser, error) {
connection, err := net.DialTCP("tcp", nil, r.address)
if err != nil {
return nil, err
}
return connection, nil
}

func (r *roger) EvaluateSync(command string) Packet {
sess, err := newSession(r, r.user, r.password)
sess, err := r.GetSession()
if err != nil {
return newErrorPacket(err)
}
defer sess.close()
packet := sess.sendCommand(cmdEval, command+"\n")
defer sess.Close()
packet := sess.SendCommand(command + "\n")
return packet
}

Expand All @@ -74,11 +90,3 @@ func (r *roger) Evaluate(command string) <-chan Packet {
func (r *roger) Eval(command string) (interface{}, error) {
return r.EvaluateSync(command).GetResultObject()
}

func (r *roger) GetReadWriteCloser() (io.ReadWriteCloser, error) {
connection, err := net.DialTCP("tcp", nil, r.address)
if err != nil {
return nil, err
}
return connection, nil
}
68 changes: 57 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ import (
"strings"
)

// Session is an interface to send commands to an RServe session. Sessions must be closed after use.
type Session interface {

// Sends a command to RServe which is evaluated synchronously resulting in a Packet.
SendCommand(command string) Packet

// Close closes a RServe session. Sessions must be closed after use.
Close()
}

type authType int

const (
atPlain authType = 1
atCrypt authType = 2
)

type session struct {
readWriteClose io.ReadWriteCloser
readWriter *bufio.ReadWriter
Expand All @@ -25,11 +42,7 @@ type session struct {
password string
}

func newSession(client RClient, user, password string) (*session, error) {
readWriteCloser, err := client.GetReadWriteCloser()
if err != nil {
return nil, err
}
func newSession(readWriteCloser io.ReadWriteCloser, user, password string) (*session, error) {
buffRead := bufio.NewReader(readWriteCloser)
buffWrite := bufio.NewWriter(readWriteCloser)
sess := &session{
Expand All @@ -38,11 +51,11 @@ func newSession(client RClient, user, password string) (*session, error) {
user: user,
password: password,
}
err = sess.handshake()
err := sess.handshake()
return sess, err
}

func (s *session) close() {
func (s *session) Close() {
s.connected = false
s.readWriter = nil
if s.readWriteClose != nil {
Expand Down Expand Up @@ -91,13 +104,36 @@ func (s *session) parseInitialMessage() error {
return nil
}

func (s *session) login() error {
if s.authReq == false {
return nil
}
if s.authReq == true && (s.user == "" || s.password == "") {
return errors.New("Authentication is required and no credentials have been specified")
}
if s.key == "" {
s.key = "rs"
}
cmd := s.user + "\n" + s.password
if s.authType == atCrypt {
cmd = s.user + "\n" + crypt(s.password, s.key)
}

packet := s.sendCommand(cmdLogin, cmd)
if packet.IsError() {
_, err := packet.GetResultObject()
return errors.New("Authentication failed: " + err.Error())
}
return nil
}

func (s *session) handshake() error {
err := s.parseInitialMessage()
if err != nil {
return err
}

err = login(s)
err = s.login()
if err != nil {
return err
}
Expand Down Expand Up @@ -152,9 +188,7 @@ func (s *session) exeCommand(cmdType command, cmd string) {
s.readWriter.Flush()
}

func (s *session) sendCommand(cmdType command, cmd string) Packet {
s.exeCommand(cmdType, cmd)

func (s *session) readResponse() Packet {
rep := binary.LittleEndian.Uint32(s.readNBytes(4))
r1 := binary.LittleEndian.Uint32(s.readNBytes(4))
s.readNBytes(8)
Expand All @@ -166,3 +200,15 @@ func (s *session) sendCommand(cmdType command, cmd string) Packet {
results := s.readNBytes(int(r1))
return newPacket(int(rep), results)
}

func (s *session) sendCommand(cmdType command, cmd string) Packet {
if s.connected == false && cmdType != cmdLogin {
return newErrorPacket(errors.New("Session was previously closed"))
}
s.exeCommand(cmdType, cmd)
return s.readResponse()
}

func (s *session) SendCommand(cmd string) Packet {
return s.sendCommand(cmdEval, cmd)
}
35 changes: 35 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package roger

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSessionCommand(t *testing.T) {
client, _ := NewRClient("localhost", 6311)
sess, err := client.GetSession()
defer sess.Close()
obj, err := sess.SendCommand("2.2").GetResultObject()
assert.Equal(t, obj, float64(2.2))
assert.Equal(t, err, nil)
}

func TestMultipleSessionCommands(t *testing.T) {
client, _ := NewRClient("localhost", 6311)
sess, err := client.GetSession()
defer sess.Close()
assert.Equal(t, err, nil)
sess.SendCommand("x <- 2.2")
obj, err := sess.SendCommand("x").GetResultObject()
assert.Equal(t, obj, float64(2.2))
assert.Equal(t, err, nil)
}

func TestSessionClose(t *testing.T) {
client, _ := NewRClient("localhost", 6311)
sess, err := client.GetSession()
sess.Close()
_, err = sess.SendCommand("2").GetResultObject()
assert.NotEqual(t, err, nil)
}

0 comments on commit 8b5f769

Please sign in to comment.