Skip to content

Commit

Permalink
app: return mapserver output to client if /render is called with matc…
Browse files Browse the repository at this point in the history
…hing ID
  • Loading branch information
olt committed Jul 26, 2017
1 parent 0283029 commit 65efb14
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 42 deletions.
171 changes: 134 additions & 37 deletions cmd/magnaserv/magnaserv.go
Expand Up @@ -17,6 +17,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/omniscale/magnacarto/mml"
Expand All @@ -43,6 +44,18 @@ type magnaserv struct {
defaultMaker builder.MapMaker
mapnikRenderer *render.Mapnik
mapserverRenderer *render.MapServer

// feedbackChan maps random websocket IDs (wsID) to channels
feedbackChans map[string]chan feedback
feedbackChansMu sync.Mutex
}

// feedback is a struct to pass information from the render handler to the websocket handler
type feedback struct {
err error
warnings []string
mml string
mss []string
}

func (s *magnaserv) styleParams(r *http.Request) (mml string, mss []string) {
Expand Down Expand Up @@ -89,9 +102,11 @@ func (s *magnaserv) render(w http.ResponseWriter, r *http.Request) {
}
}

wsID := mapReq.Query.Get("WSID")
styleFile := mapReq.Query.Get("FILE")
mml, mss := s.styleParams(r)

if styleFile == "" {
mml, mss := s.styleParams(r)
if mml == "" {
log.Println("missing mml param in request")
http.Error(w, "missing mml param", http.StatusBadRequest)
Expand All @@ -100,19 +115,20 @@ func (s *magnaserv) render(w http.ResponseWriter, r *http.Request) {

styleFile, err = s.builderCache.StyleFile(maker, mml, mss)
if err != nil {
s.sendFeedback(wsID, err, nil, mml, mss)
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

var warnings []string
w.Header().Add("Content-Type", "image/png")
if renderer == "mapserver" {
mapReq.Format = mapReq.Query.Get("FORMAT") // use requested format, not internal mapnik format
if s.mapserverRenderer == nil {
err = errors.New("mapserver not initialized")
}
err = s.mapserverRenderer.Render(styleFile, w, renderReq(mapReq))
warnings, err = s.mapserverRenderer.Render(styleFile, w, renderReq(mapReq))
} else {
if s.mapnikRenderer == nil {
err = errors.New("mapnik not initialized")
Expand All @@ -121,12 +137,46 @@ func (s *magnaserv) render(w http.ResponseWriter, r *http.Request) {
}
}

s.sendFeedback(wsID, err, warnings, mml, mss)

if err != nil {
s.internalError(w, r, err)
return
}
}

func (s *magnaserv) sendFeedback(wsID string, err error, warnings []string, mml string, mss []string) {
s.feedbackChansMu.Lock()
defer s.feedbackChansMu.Unlock()
c, ok := s.feedbackChans[wsID]
if !ok {
return
}
c <- feedback{
err: err,
warnings: warnings,
mml: mml,
mss: mss,
}
}

func (s *magnaserv) makeFeedbackChan(wsID string) chan feedback {
s.feedbackChansMu.Lock()
defer s.feedbackChansMu.Unlock()
if s.feedbackChans == nil {
s.feedbackChans = make(map[string]chan feedback)
}
c := make(chan feedback)
s.feedbackChans[wsID] = c
return c
}

func (s *magnaserv) removeFeedbackChan(wsID string) {
s.feedbackChansMu.Lock()
defer s.feedbackChansMu.Unlock()
delete(s.feedbackChans, wsID)
}

func (s *magnaserv) projects(w http.ResponseWriter, r *http.Request) {
projects, err := findProjects(s.config.StylesDir)
if err != nil {
Expand Down Expand Up @@ -276,6 +326,15 @@ func renderReq(r *maps.Request) render.Request {
return result
}

func randomID() string {
letterBytes := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, 32)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}

func (s *magnaserv) changes(ws *websocket.Conn) {
mml, mss := s.styleParams(ws.Request())
if mml == "" {
Expand All @@ -296,10 +355,10 @@ func (s *magnaserv) changes(ws *websocket.Conn) {
maker = s.defaultMaker
}

done := make(chan struct{})
updatec, errc := s.builderCache.Notify(maker, mml, mss, done)
closeNotify := make(chan struct{})
updatec, errc := s.builderCache.Notify(maker, mml, mss, closeNotify)

// read and discard anything from client, signal close on any error
// Read and discard anything from client. Signal close on any error.
closeWs := make(chan struct{})
go func() {
readbuf := make([]byte, 0, 16)
Expand All @@ -313,63 +372,101 @@ func (s *magnaserv) changes(ws *websocket.Conn) {
}
}
}()
defer func() {
closeNotify <- struct{}{}
ws.Close()
}()

var lastMsgSent time.Time
var lastMsg []byte
// sendJSON sends JSON struct as websocket message.
// Messages are debounced. Websocket will be closed if
// the message cannot be sent.
sendJSON := func(msg interface{}) {
buf, err := json.Marshal(msg)
if err != nil {
log.Println("error encoding json", err)
closeWs <- struct{}{}
return
}
// Debounce notifications.
if time.Since(lastMsgSent) > 2*time.Second || bytes.Compare(buf, lastMsg) != 0 {
if _, err := ws.Write(buf); err != nil {
log.Println("error sending message", err)
closeWs <- struct{}{}
return
}
lastMsg = buf
lastMsgSent = time.Now()
}
}

// Create new feedback channel and send the ID to client.
// The render handler can sendFeedback with this ID. Feedback message
// will be send to the websocket client.
wsid := randomID()
feedbackC := s.makeFeedbackChan(wsid)
defer s.removeFeedbackChan(wsid)

sendJSON(struct {
WsID string `json:"wsid"`
}{WsID: wsid})

for {
select {
case <-closeWs:
done <- struct{}{}
ws.Close()
return
break
case update := <-updatec:
var msg []byte
var err error
var msg interface{}
if update.Err != nil {
if parseErr, ok := update.Err.(*mssPkg.ParseError); ok {
msg, err = json.Marshal(struct {
msg = struct {
FullError string `json:"full_error"`
Error string `json:"error"`
Filename string `json:"filename"`
Line int `json:"line"`
Column int `json:"column"`
}{parseErr.Error(), parseErr.Err, parseErr.Filename, parseErr.Line, parseErr.Column})
}{parseErr.Error(), parseErr.Err, parseErr.Filename, parseErr.Line, parseErr.Column}
} else if missingFilesErr, ok := update.Err.(*builder.FilesMissingError); ok {
msg, err = json.Marshal(struct {
msg = struct {
Error string `json:"error"`
Files []string `json:"files"`
}{"missing files", missingFilesErr.Files})
}{"missing files", missingFilesErr.Files}
} else {
msg, err = json.Marshal(struct {
msg = struct {
Error string `json:"error"`
}{update.Err.Error()})
}{update.Err.Error()}
}
} else {
msg, err = json.Marshal(struct {
msg = struct {
UpdatedAt time.Time `json:"updated_at"`
UpdatedMML bool `json:"updated_mml"`
}{update.Time, update.UpdatedMML})

}{update.Time, update.UpdatedMML}
}
if err != nil {
log.Println("error encoding json", err)
ws.Close()
return
}
// debounce notifications
if !lastMsgSent.Add(2*time.Second).After(time.Now()) || bytes.Compare(msg, lastMsg) != 0 {
if _, err := ws.Write(msg); err != nil {
done <- struct{}{}
ws.Close()
return
}
lastMsg = msg
lastMsgSent = time.Now()
sendJSON(msg)

case feedback := <-feedbackC:
var errStr string
if feedback.err != nil {
errStr = feedback.err.Error()
}
sendJSON(struct {
Error string `json:"error"`
MSS []string `json:"mss"`
MML string `json:"mml"`
Warnings []string `json:"warnings"`
}{
Error: errStr,
MSS: feedback.mss,
MML: feedback.mml,
Warnings: feedback.warnings,
})
case err := <-errc:
log.Println("error:", err)
ws.Close()
return
log.Println("watcher notify:", err)
sendJSON(struct {
Error string `json:"error"`
}{err.Error()})
break
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions render/mapserver.go
@@ -1,6 +1,7 @@
package render

import (
"bytes"
"fmt"
"io"
"net/http"
Expand All @@ -24,7 +25,7 @@ func NewMapServer() (*MapServer, error) {
return &MapServer{bin: bin}, nil
}

func (m *MapServer) Render(mapfile string, dst io.Writer, mapReq Request) error {
func (m *MapServer) Render(mapfile string, dst io.Writer, mapReq Request) ([]string, error) {
if !filepath.IsAbs(mapfile) {
if wd, err := os.Getwd(); err == nil {
mapfile = filepath.Join(wd, mapfile)
Expand Down Expand Up @@ -52,6 +53,9 @@ func (m *MapServer) Render(mapfile string, dst io.Writer, mapReq Request) error
}

wd := filepath.Dir(mapfile)
buf := bytes.Buffer{}
stderr := io.MultiWriter(os.Stderr, &buf)

handler := cgi.Handler{
Path: m.bin,
Dir: wd,
Expand All @@ -61,6 +65,7 @@ func (m *MapServer) Render(mapfile string, dst io.Writer, mapReq Request) error
"PROJ_LIB",
"CURL_CA_BUNDLE",
},
Stderr: stderr,
}

w := &responseRecorder{
Expand All @@ -69,18 +74,19 @@ func (m *MapServer) Render(mapfile string, dst io.Writer, mapReq Request) error

req, err := http.NewRequest("GET", "/?"+q.Encode(), nil)
if err != nil {
return err
return nil, err
}

handler.ServeHTTP(w, req)

warnings := strings.Split(buf.String(), "\n")
if w.Code != 200 {
return fmt.Errorf("error while calling mapserv CGI (status %d)", w.Code)
return warnings, fmt.Errorf("error while calling mapserv CGI (status %d)", w.Code)
}
if ct := w.Header().Get("Content-type"); ct != "" && !strings.HasPrefix(ct, "image") {
return fmt.Errorf(" mapserv CGI did not return image (%v)", w.Header())
return warnings, fmt.Errorf(" mapserv CGI did not return image (%v)", w.Header())
}
return nil
return warnings, nil
}

// responseRecorder from net/http/httptest
Expand Down

0 comments on commit 65efb14

Please sign in to comment.