Skip to content

Commit

Permalink
Fixed all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
josephg committed Oct 31, 2011
1 parent 5f9481e commit 52be4fd
Show file tree
Hide file tree
Showing 17 changed files with 557 additions and 335 deletions.
1 change: 1 addition & 0 deletions src/server/db/couchdb.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ parseError = (err, resp, body, callback) ->
callback()

module.exports = (options) ->
options ?= {}
db = options.uri or "http://localhost:5984/sharejs"

uriForDoc = (docName) -> "#{db}/doc:#{encodeURIComponent docName}"
Expand Down
20 changes: 12 additions & 8 deletions src/server/db/index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@
# require('server/db').create {type:'redis'}

defaultType = 'redis'
Manager = require './manager'

module.exports = (options) ->
type = options?.type ? defaultType
options ?= {}
type = options.type ? defaultType

console.warn "Database type: 'memory' detected. This has been deprecated and will
be removed in a future version. Use 'none' instead, or just remove the db:{} block
from your options. (The behaviour has remained the same.)" if type is 'memory'

return null if type in ['none', 'memory']
db = if type in ['none', 'memory']
null
else
Db = switch type
when 'redis' then require './redis'
when 'couchdb' then require './couchdb'
else throw new Error "Invalid or unsupported database type: '#{type}'"
new Db options

Db = switch type
when 'redis' then require './redis'
when 'couchdb' then require './couchdb'
else throw new Error "Invalid or unsupported database type: '#{type}'"

new Db(options)
new Manager db
71 changes: 54 additions & 17 deletions src/server/doccache.coffee → src/server/db/manager.coffee
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# This file implements a simple document snapshot cache. It also does a sort of
# grab-bag of functionality we need from the sharejs database layer.
# This file caches open documents and manages the interactions with the database.
#
# The cache wraps your database and stores document snapshots in memory
# for all documents which are open by at least one client. If you don't
# Document snapshots are stored in memory for all documents which are open by at
# least one client, or if they have been accessed recently. If you don't
# have a database specified, the cache will store all documents which have
# ever been opened in memory.
# ever been opened.
#
# Because the database doesn't store metadata (like cursor positions), the
# cache stores this information too.
Expand All @@ -22,12 +21,14 @@
# - Managing when to save ops & when to save snapshots.
# - getOps has version numbers added to the returned ops

types = require '../types'
types = require '../../types'

# The cache wraps a database, or null.
module.exports = DocCache = (db, options) ->
return new DocCache(db, options) if !(this instanceof DocCache)

options ?= {}

# Map from docName -> {
# ops:[{op, meta}]
# data:{snapshot, type, v, meta}
Expand Down Expand Up @@ -177,9 +178,17 @@ module.exports = DocCache = (db, options) ->
process.nextTick ->
# This is an awkward way to find out the number of clients on a document. If this
# causes performance issues, add a numClients field to the document.
if Object.keys(doc.clients).length is 0 and (db or options.forceReaping)
#
# The first check is because its possible that between refreshReapingTimeout being called and this
# event being fired, someone called delete() on the document and hence the doc is something else now.
if doc == docs[docName] and Object.keys(doc.clients).length is 0 and (db or options.forceReaping)
clearTimeout doc.reapTimer
doc.reapTimer = setTimeout (-> delete docs[docName]), options.reapTime
doc.reapTimer = reapTimer = setTimeout ->
tryWriteSnapshot docName, ->
# If the reaping timeout has been refreshed while we're writing the snapshot,
# don't reap.
delete docs[docName] if docs[docName].reapTimer is reapTimer
, options.reapTime

# ** Public methods

Expand Down Expand Up @@ -228,7 +237,7 @@ module.exports = DocCache = (db, options) ->
version = docs[docName].data.v

# Ops contains an array of ops. The last op in the list is the last op applied
end ?= version - 1
end ?= version
start = Math.min start, end

# Base is the version number of the oldest op we have cached
Expand All @@ -239,7 +248,7 @@ module.exports = DocCache = (db, options) ->
refreshReapingTimeout docName
options.stats?.cacheHit 'getOps'

return callback null, ops[(start - base)..(end - base)]
return callback null, ops[(start - base)...(end - base)]

options.stats?.cacheMiss 'getOps'

Expand Down Expand Up @@ -300,8 +309,15 @@ module.exports = DocCache = (db, options) ->

tryWriteSnapshot = (docName, callback) ->
return callback?() unless db

doc = docs[docName]

# The doc is closed
return callback?() unless doc

# The document is already saved.
return callback?() if doc.committedVersion is doc.data.v

return callback? 'Another snapshot write is in progress' if doc.snapshotWriteLock

doc.snapshotWriteLock = true
Expand All @@ -310,14 +326,20 @@ module.exports = DocCache = (db, options) ->

writeSnapshot = db?.writeSnapshot or (docName, docData, dbMeta, callback) -> callback()

# We have to cache the version in the closure because the version in the doc object could
# be updated between now and when the callback returns
version = doc.data.v
data =
v: doc.data.v
meta: doc.data.meta
snapshot: doc.data.snapshot
# The database doesn't know about object types.
type: doc.data.type.name

# Commit snapshot.
writeSnapshot docName, doc.data, doc.dbMeta, (error, dbMeta) ->
writeSnapshot docName, data, doc.dbMeta, (error, dbMeta) ->
doc.snapshotWriteLock = false
doc.committedVersion = version

# We have to use data.v here because the version in the doc could
# have been updated between the call to writeSnapshot() and now.
doc.committedVersion = data.v
doc.dbMeta = dbMeta

callback? error
Expand Down Expand Up @@ -349,12 +371,17 @@ module.exports = DocCache = (db, options) ->
writeOp = db?.writeOp or (docName, newOpData, callback) -> callback()

writeOp docName, newOpData, (error) ->

if error
# The user should probably know about this.
console.warn "Error writing ops to database: #{error}"
return callback? error

doc.data = newDocData
# Not copying in the type.
doc.data.v = newDocData.v
doc.data.snapshot = newDocData.snapshot
doc.data.meta = newDocData.meta

doc.ops.push newOpData
doc.ops.shift() if db and doc.ops.length > options.numCachedOps

Expand All @@ -378,6 +405,16 @@ module.exports = DocCache = (db, options) ->
else
callback null, doc.data

# Get the version of the document. This is a convenience method. Internally, it loads the
# whole document. (It doesn't need to do that much work, but its always used in situations
# where we're going to need a bunch of other information about the document anyway, so there's
# no benefit optimising it.)
#
# callback(error, version)
@getVersion = (docName, callback) ->
@getSnapshot docName, (error, data) ->
callback error, data?.v

# Flush saves all snapshot data to the database
@flush = (callback) ->
return callback?() unless db
Expand All @@ -397,7 +434,7 @@ module.exports = DocCache = (db, options) ->
callback?() if pendingWrites is 0

@close = (callback) ->
flush ->
@flush ->
db?.close()
callback?()

Expand Down
6 changes: 3 additions & 3 deletions src/server/db/redis.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module.exports = RedisDb = (options) ->
return callback? err if err

if result
callback? null
callback?()
else
callback? 'Document already exists'

Expand Down Expand Up @@ -111,8 +111,8 @@ module.exports = RedisDb = (options) ->
throw err if err?

if response != null
doc_data = JSON.parse(response)
callback null, doc_data
docData = JSON.parse(response)
callback null, docData
else
callback 'Document does not exist'

Expand Down
5 changes: 3 additions & 2 deletions src/server/index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ module.exports = create = (options, model = createModel(options)) ->
create.createModel = createModel = (options) ->
dbOptions = options?.db

db = new Db(dbOptions)
new Model(db, options)
db = new Db dbOptions

new Model db, options


# Attach the OT server frontends to the provided Node HTTP server. Use this if you
Expand Down
64 changes: 18 additions & 46 deletions src/server/model.coffee
Original file line number Diff line number Diff line change
@@ -1,79 +1,59 @@
# The model of all the ops. Responsible for applying & transforming remote deltas
# and managing the storage layer.
#
# Actual storage is handled by the database wrappers in db/*.

p = -> #require('util').debug
i = -> #require('util').inspect
# Actual storage is handled by the database wrappers in db/*, wrapped by DocCache

hat = require 'hat'

queue = require './syncqueue'
types = require '../types'
db = require './db'
Events = require './events'
DocCache = require './doccache'

module.exports = Model = (db, options) ->
return new Model(db) if !(this instanceof Model)
return new Model(db, options) if !(this instanceof Model)

options ?= {}

cache = new DocCache db, options

# Callback is called with (error, deltas)
# Deltas is a list of the deltas from versionFrom to versionTo, or
# to the most recent version if versionTo is null.
#
# At time of writing, error is always null. If the document doesn't exist or if
# start / end are too long, the ops are trimmed.
@getOps = (docName, start, end, callback) ->
db.getVersion docName, (error, v) ->
if error
callback error, null
else
db.getOps docName, start, end, callback
@getOps = (docName, start, end, callback) -> db.getOps docName, start, end, callback

# Gets the snapshot data for the specified document.
# getSnapshot(docName, callback)
# Callback is called with (error, {v: <version>, type: <type>, snapshot: <snapshot>, meta: <meta>})
@getSnapshot = getSnapshot = (docName, callback) ->
db.getSnapshot docName, (error, data) ->
p "getSnapshot #{i data}"
if error
callback error
else
data.type = types[data.type] if typeof data.type is 'string'
callback null, data
@getSnapshot = getSnapshot = (docName, callback) -> db.getSnapshot docName, callback

# Gets the latest version # of the document. May be more efficient than getSnapshot.
# Gets the latest version # of the document.
# getVersion(docName, callback)
# callback is called with (error, version).
@getVersion = db.getVersion
@getVersion = (docName, callback) -> db.getVersion docName, callback

# Create a document.
@create = (docName, type, meta, callback) ->
type = types[type] if typeof type == 'string'
if typeof meta == 'function'
callback = meta
meta = {}
[meta, callback] = [{}, meta] if typeof meta == 'function'

if docName.match /\//
callback 'Invalid document name', false
return
type = types[type] if typeof type == 'string'
return callback? 'Type not found' unless type

meta ||= {}
return callback? 'Invalid document name' if docName.match /\//

newDocData =
docData =
snapshot:type.create()
type:type.name
type:type
meta:meta || {}
v:0

p "db.create #{docName}, #{i newDocData}"

db.create docName, newDocData, callback
db.create docName, docData, (error) ->
if error
callback error
else
callback null, docData

# applyOp is not re-entrant for the same docName. Hence its logic is wrapped in a queue structure.
queues = {} # docName -> syncQueue

# Apply an op to the specified document.
Expand All @@ -84,11 +64,8 @@ module.exports = Model = (db, options) ->
# model.applyOp 'doc', OPA, -> model.applyOp 'doc', OPB
# model.applyOp 'doc', OPC
@applyOp = (docName, opData, callback) ->
p "applyOp #{docName} op #{i opData}"

# Its important that all ops are applied in order.
queues[docName] ||= queue (opData, callback) ->
p "applyOpInternal v#{opData.v} #{i opData.op} to #{docName}."
getSnapshot docName, (error, docData) ->
return callback error if error

Expand All @@ -98,7 +75,6 @@ module.exports = Model = (db, options) ->
meta.ts = Date.now()

{v:version, snapshot, type} = docData
p "applyOp hasdata v#{opVersion} #{i op} to #{docName}."

submit = ->
try
Expand All @@ -111,10 +87,8 @@ module.exports = Model = (db, options) ->
newOpData = {op, v:opVersion, meta}
newDocData = {snapshot, type:type.name, v:opVersion + 1, meta:docData.meta}

p "submit #{i newOpData}"
db.append docName, newOpData, newDocData, ->
# Success!
p "appended v#{opVersion} to #{docName}. Calling callback..."
events.onApplyOp docName, newOpData
callback null, opVersion

Expand All @@ -129,10 +103,8 @@ module.exports = Model = (db, options) ->

try
for realOp in ops
p "XFORM Doc #{docName} op #{i op} by #{i realOp.op}"
op = docData.type.transform op, realOp.op, 'left'
opVersion++
p "-> #{i op}"

catch error
console.error error.stack
Expand Down
16 changes: 8 additions & 8 deletions src/server/rest.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ router = (app, model, options) ->
unless typeof type == 'string' and (meta == undefined or typeof meta == 'object')
send400 res, 'Type invalid'
else
model.clientCreate req._client, req.params.name, type, meta, (error, result) ->
if result
send200 res
else
model.clientCreate req._client, req.params.name, type, meta, (error) ->
if error
sendError res, error
else
send200 res

# POST submits an op to the document.
app.post '/doc/:name', auth, (req, res) ->
Expand All @@ -121,11 +121,11 @@ router = (app, model, options) ->
sendJSON res, {v:newVersion}

app.delete '/doc/:name', auth, (req, res) ->
model.clientDelete req._client, req.params.name, (error, result) ->
if result
send200 res
else
model.clientDelete req._client, req.params.name, (error) ->
if error
sendError res, error
else
send200 res

# Attach the frontend to the supplied http.Server.
#
Expand Down
Loading

0 comments on commit 52be4fd

Please sign in to comment.