Skip to content
This repository has been archived by the owner on Oct 29, 2023. It is now read-only.

Commit

Permalink
Decrypt SURB replies
Browse files Browse the repository at this point in the history
  • Loading branch information
david415 committed Aug 13, 2018
1 parent fce6f6c commit 92c1639
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 36 deletions.
16 changes: 3 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/beeker1121/goque"
"github.com/katzenpost/client/config"
cConstants "github.com/katzenpost/client/constants"
"github.com/katzenpost/client/poisson"
Expand Down Expand Up @@ -58,7 +57,7 @@ type Client struct {
opCh chan workerOp
onlineAt time.Time
hasPKIDoc bool
egressQueue *goque.Queue
egressQueue EgressQueue
surbKeys map[[sConstants.SURBIDLength]byte][]byte
surbEtas map[time.Duration][sConstants.SURBIDLength]byte

Expand Down Expand Up @@ -120,17 +119,7 @@ func New(cfg *config.Config) (*Client, error) {
c.surbIDMap = make(map[[sConstants.SURBIDLength]byte]*MessageRef)
c.messageIDMap = make(map[[cConstants.MessageIDLength]byte]*MessageRef)
c.replyNotifyMap = make(map[[cConstants.MessageIDLength]byte]*sync.Mutex)

const egressQueueName = "egress_queue"
egressQueueDir := filepath.Join(c.cfg.Proxy.DataDir, egressQueueName)
err := utils.MkDataDir(egressQueueDir)
if err != nil {
return nil, err
}
c.egressQueue, err = goque.OpenQueue(egressQueueDir)
if err != nil {
return nil, err
}
c.egressQueue = new(Queue)

// make some synchronised conditions
c.condGotPKIDoc = sync.NewCond(new(sync.Mutex))
Expand All @@ -148,6 +137,7 @@ func New(cfg *config.Config) (*Client, error) {
basePath := c.cfg.Proxy.DataDir
linkPriv := filepath.Join(basePath, "link.private.pem")
linkPub := filepath.Join(basePath, "link.public.pem")
var err error
if c.linkKey, err = ecdh.Load(linkPriv, linkPub, rand.Reader); err != nil {
c.log.Errorf("Failure to load link keys: %s", err)
return nil, err
Expand Down
68 changes: 68 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// queue.go - Client egress queue.
// Copyright (C) 2018 David Stainton.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package client

import (
"errors"
)

const MAX_QUEUE_SIZE = 40

var QueueFullError = errors.New("Error, queue is full.")
var QueueEmptyError = errors.New("Error, queue is empty.")

type EgressQueue interface {
Peek() (*MessageRef, error)
Pop() (*MessageRef, error)
Push(*MessageRef) error
}

type Queue struct {
content [MAX_QUEUE_SIZE]MessageRef
readHead int
writeHead int
len int
}

func (q *Queue) Push(e *MessageRef) error {
if q.len >= MAX_QUEUE_SIZE {
return QueueFullError
}
q.content[q.writeHead] = *e
q.writeHead = (q.writeHead + 1) % MAX_QUEUE_SIZE
q.len++
return nil
}

func (q *Queue) Pop() (*MessageRef, error) {
if q.len <= 0 {
return nil, QueueEmptyError
}
result := q.content[q.readHead]
q.content[q.readHead] = MessageRef{}
q.readHead = (q.readHead + 1) % MAX_QUEUE_SIZE
q.len--
return &result, nil
}

func (q *Queue) Peek() (*MessageRef, error) {
if q.len <= 0 {
return nil, QueueEmptyError
}
result := q.content[q.readHead]
return &result, nil
}
30 changes: 13 additions & 17 deletions send.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,28 @@ type MessageRef struct {
SURBID *[sConstants.SURBIDLength]byte
Key []byte
Reply []byte
ACK bool // XXX not yet used
SURBType int
}

// WaitForReply blocks until a reply is received.
func (c *Client) WaitForReply(msgRef *MessageRef) {
func (c *Client) WaitForReply(msgRef *MessageRef) []byte {
c.replyNotifyMap[*msgRef.ID].Lock()
return c.messageIDMap[*msgRef.ID].Reply
}

func (c *Client) sendNext() error {
item, err := c.egressQueue.Peek()
msgRef, err := c.egressQueue.Peek()
if err != nil {
return err
}
msgRef := new(MessageRef)
err = item.ToObject(msgRef)
if err != nil {
return err
if msgRef.Provider == "" {
panic("wtf")
}
err = c.send(msgRef)
if err != nil {
return err
}
_, err = c.egressQueue.Dequeue()
_, err = c.egressQueue.Pop()
return err
}

Expand All @@ -79,7 +78,6 @@ func (c *Client) send(msgRef *MessageRef) error {
msgRef.ReplyETA = eta
c.surbIDMap[surbID] = msgRef
c.messageIDMap[*msgRef.ID] = msgRef
c.log.Infof("SENT MESSAGE ID %x", *msgRef.ID)
} else {
err = c.minclient.SendUnreliableCiphertext(msgRef.Recipient, msgRef.Provider, msgRef.Payload)
}
Expand Down Expand Up @@ -126,21 +124,21 @@ func (c *Client) SendUnreliable(recipient, provider string, message []byte) (*Me
Payload: message,
WithSURB: false,
}
_, err := c.egressQueue.EnqueueObject(msgRef)
err := c.egressQueue.Push(&msgRef)
return &msgRef, err
}

func (c *Client) SendKaetzchenQuery(recipient, provider string, message []byte, wantResponse bool) (*MessageRef, error) {
c.log.Info("SEND KAETZCHEN QUERY")

if provider == "" {
panic("wtf")
}
// Ensure the request message is under the maximum for a single
// packet, and pad out the message so that it is the correct size.
if len(message) > constants.UserForwardPayloadLength {
return nil, fmt.Errorf("invalid message size: %v", len(message))
}
payload := make([]byte, constants.UserForwardPayloadLength)
copy(payload, message)

id := [cConstants.MessageIDLength]byte{}
io.ReadFull(rand.Reader, id[:])
var msgRef = MessageRef{
Expand All @@ -149,12 +147,10 @@ func (c *Client) SendKaetzchenQuery(recipient, provider string, message []byte,
Provider: provider,
Payload: payload,
WithSURB: wantResponse,
SURBType: surbTypeKaetzchen,
}
c.log.Info("-----------------------------------------------------------")
c.log.Infof("Storing reply notification mutex at message ID %x", id)
c.replyNotifyMap[*msgRef.ID] = new(sync.Mutex)
c.replyNotifyMap[*msgRef.ID].Lock()

_, err := c.egressQueue.EnqueueObject(msgRef)
err := c.egressQueue.Push(&msgRef)
return &msgRef, err
}
43 changes: 37 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@
package client

import (
"encoding/hex"
"errors"
"fmt"
mrand "math/rand"
"time"

"github.com/katzenpost/client/internal/pkiclient"
cconstants "github.com/katzenpost/core/constants"
"github.com/katzenpost/core/pki"
"github.com/katzenpost/core/sphinx"
"github.com/katzenpost/core/sphinx/constants"
"github.com/katzenpost/minclient"
)

const (
surbTypeACK = 0
surbTypeKaetzchen = 1
surbTypeInternal = 2
)

// NewSession establishes a session with provider using key.
// This method will block until session is connected to the Provider.
func (c *Client) NewSession() error {
Expand Down Expand Up @@ -82,6 +91,9 @@ func (c *Client) GetService(serviceName string) (*ServiceDescriptor, error) {
return nil, errors.New("pki doc is nil")
}
serviceDescriptors := FindServices(serviceName, doc)
if len(serviceDescriptors) == 0 {
return nil, errors.New("GetService failure, service not found in pki doc.")
}
return &serviceDescriptors[mrand.Intn(len(serviceDescriptors))], nil
}

Expand Down Expand Up @@ -113,14 +125,15 @@ func (c *Client) onMessage(ciphertextBlock []byte) error {

// OnACK is called by the minclient api whe
// we receive an ACK message
func (c *Client) onACK(surbid *[constants.SURBIDLength]byte, message []byte) error {
c.log.Infof("OnACK with SURBID %x", *surbid)
msgRef, ok := c.surbIDMap[*surbid]
func (c *Client) onACK(surbID *[constants.SURBIDLength]byte, ciphertext []byte) error {
idStr := fmt.Sprintf("[%v]", hex.EncodeToString(surbID[:]))
c.log.Infof("OnACK with SURBID %x", idStr)

msgRef, ok := c.surbIDMap[*surbID]
if !ok {
c.log.Debug("wtf, received reply with unexpected SURBID")
return nil
}
c.log.Infof("reply with message ID %x", *msgRef.ID)
_, ok = c.replyNotifyMap[*msgRef.ID]
if !ok {
c.log.Infof("wtf, received reply with no reply notification mutex, map len is %d", len(c.replyNotifyMap))
Expand All @@ -129,8 +142,26 @@ func (c *Client) onACK(surbid *[constants.SURBIDLength]byte, message []byte) err
}
return nil
}
msgRef.Reply = message
c.replyNotifyMap[*msgRef.ID].Unlock()

plaintext, err := sphinx.DecryptSURBPayload(ciphertext, msgRef.Key)
if err != nil {
c.log.Infof("SURB Reply decryption failure: %s", err)
return err
}
if len(plaintext) != cconstants.ForwardPayloadLength {
c.log.Warningf("Discarding SURB %v: Invalid payload size: %v", idStr, len(plaintext))
return nil
}

switch msgRef.SURBType {
case surbTypeACK:
// XXX TODO fix me
case surbTypeKaetzchen, surbTypeInternal:
msgRef.Reply = plaintext[2:]
c.replyNotifyMap[*msgRef.ID].Unlock()
default:
c.log.Warningf("Discarding SURB %v: Unknown type: 0x%02x", idStr, msgRef.SURBType)
}
return nil
}

Expand Down

0 comments on commit 92c1639

Please sign in to comment.