Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

* Add lanes.lua.

  • Loading branch information...
commit 7a202d30dd96ab455831e87bd3a2070ccbb7aab4 1 parent 4c36838
@jjensen jjensen authored
Showing with 611 additions and 0 deletions.
  1. +611 −0 bin/lua/lua/lanes.lua
View
611 bin/lua/lua/lanes.lua
@@ -0,0 +1,611 @@
+--
+-- LANES.LUA
+--
+-- Multithreading and -core support for Lua
+--
+-- Author: Asko Kauppi <akauppi@gmail.com>
+--
+-- History:
+-- Jun-08 AKa: major revise
+-- 15-May-07 AKa: pthread_join():less version, some speedup & ability to
+-- handle more threads (~ 8000-9000, up from ~ 5000)
+-- 26-Feb-07 AKa: serialization working (C side)
+-- 17-Sep-06 AKa: started the module (serialization)
+--
+--[[
+===============================================================================
+
+Copyright (C) 2007-08 Asko Kauppi <akauppi@gmail.com>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
+===============================================================================
+]]--
+
+module( "lanes", package.seeall )
+
+require "lua51-lanes"
+assert( type(lanes)=="table" )
+
+local mm= lanes
+
+local linda_id= assert( mm.linda_id )
+
+local thread_new= assert(mm.thread_new)
+local thread_status= assert(mm.thread_status)
+local thread_join= assert(mm.thread_join)
+local thread_cancel= assert(mm.thread_cancel)
+
+local _single= assert(mm._single)
+local _version= assert(mm._version)
+
+local _deep_userdata= assert(mm._deep_userdata)
+
+local now_secs= assert( mm.now_secs )
+local wakeup_conv= assert( mm.wakeup_conv )
+local timer_gateway= assert( mm.timer_gateway )
+
+local max_prio= assert( mm.max_prio )
+
+-- This check is for sublanes requiring Lanes
+--
+-- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler.
+--
+if not string then
+ error( "To use 'lanes', you will also need to have 'string' available.", 2 )
+end
+
+--
+-- Cache globals for code that might run under sandboxing
+--
+local assert= assert
+local string_gmatch= assert( string.gmatch )
+local select= assert( select )
+local type= assert( type )
+local pairs= assert( pairs )
+local tostring= assert( tostring )
+local error= assert( error )
+local setmetatable= assert( setmetatable )
+local rawget= assert( rawget )
+
+ABOUT=
+{
+ author= "Asko Kauppi <akauppi@gmail.com>",
+ description= "Running multiple Lua states in parallel",
+ license= "MIT/X11",
+ copyright= "Copyright (c) 2007-08, Asko Kauppi",
+ version= _version,
+}
+
+
+-- Making copies of necessary system libs will pass them on as upvalues;
+-- only the first state doing "require 'lanes'" will need to have 'string'
+-- and 'table' visible.
+--
+local function WR(str)
+ io.stderr:write( str.."\n" )
+end
+
+local function DUMP( tbl )
+ if not tbl then return end
+ local str=""
+ for k,v in pairs(tbl) do
+ str= str..k.."="..tostring(v).."\n"
+ end
+ WR(str)
+end
+
+
+---=== Laning ===---
+
+-- lane_h[1..n]: lane results, same as via 'lane_h:join()'
+-- lane_h[0]: can be read to make sure a thread has finished (always gives 'true')
+-- lane_h[-1]: error message, without propagating the error
+--
+-- Reading a Lane result (or [0]) propagates a possible error in the lane
+-- (and execution does not return). Cancelled lanes give 'nil' values.
+--
+-- lane_h.state: "pending"/"running"/"waiting"/"done"/"error"/"cancelled"
+--
+local lane_mt= {
+ __index= function( me, k )
+ if type(k) == "number" then
+ -- 'me[0]=true' marks we've already taken in the results
+ --
+ if not rawget( me, 0 ) then
+ -- Wait indefinately; either propagates an error or
+ -- returns the return values
+ --
+ me[0]= true -- marker, even on errors
+
+ local t= { thread_join(me._ud) } -- wait indefinate
+ --
+ -- { ... } "done": regular return, 0..N results
+ -- { } "cancelled"
+ -- { nil, err_str, stack_tbl } "error"
+
+ local st= thread_status(me._ud)
+ if st=="done" then
+ -- Use 'pairs' and not 'ipairs' so that nil holes in
+ -- the returned values are tolerated.
+ --
+ for i,v in pairs(t) do
+ me[i]= v
+ end
+ elseif st=="error" then
+ assert( t[1]==nil and t[2] and type(t[3])=="table" )
+ me[-1]= t[2]
+ -- me[-2] could carry the stack table, but even
+ -- me[-1] is rather unnecessary (and undocumented);
+ -- use ':join()' instead. --AKa 22-Jan-2009
+ elseif st=="cancelled" then
+ -- do nothing
+ else
+ error( "Unexpected status: "..st )
+ end
+ end
+
+ -- Check errors even if we'd first peeked them via [-1]
+ -- and then came for the actual results.
+ --
+ local err= rawget(me, -1)
+ if err~=nil and k~=-1 then
+ -- Note: Lua 5.1 interpreter is not prepared to show
+ -- non-string errors, so we use 'tostring()' here
+ -- to get meaningful output. --AKa 22-Jan-2009
+ --
+ -- Also, the stack dump we get is no good; it only
+ -- lists our internal Lanes functions. There seems
+ -- to be no way to switch it off, though.
+
+ -- Level 3 should show the line where 'h[x]' was read
+ -- but this only seems to work for string messages
+ -- (Lua 5.1.4). No idea, why. --AKa 22-Jan-2009
+ --
+ error( tostring(err), 3 ) -- level 3 should show the line where 'h[x]' was read
+ end
+ return rawget( me, k )
+ --
+ elseif k=="status" then -- me.status
+ return thread_status(me._ud)
+ --
+ else
+ error( "Unknown key: "..k )
+ end
+ end
+ }
+
+-----
+-- h= lanes.gen( [libs_str|opt_tbl [, ...],] lane_func ) ( [...] )
+--
+-- 'libs': nil: no libraries available (default)
+-- "": only base library ('assert', 'print', 'unpack' etc.)
+-- "math,os": math + os + base libraries (named ones + base)
+-- "*": all standard libraries available
+--
+-- 'opt': .priority: int (-2..+2) smaller is lower priority (0 = default)
+--
+-- .cancelstep: bool | uint
+-- false: cancellation check only at pending Linda operations
+-- (send/receive) so no runtime performance penalty (default)
+-- true: adequate cancellation check (same as 100)
+-- >0: cancellation check every x Lua lines (small number= faster
+-- reaction but more performance overhead)
+--
+-- .globals: table of globals to set for a new thread (passed by value)
+--
+-- ... (more options may be introduced later) ...
+--
+-- Calling with a function parameter ('lane_func') ends the string/table
+-- modifiers, and prepares a lane generator. One can either finish here,
+-- and call the generator later (maybe multiple times, with different parameters)
+-- or add on actual thread arguments to also ignite the thread on the same call.
+--
+local lane_proxy
+
+local valid_libs= {
+ ["package"]= true,
+ ["table"]= true,
+ ["io"]= true,
+ ["os"]= true,
+ ["string"]= true,
+ ["math"]= true,
+ ["debug"]= true,
+ --
+ ["base"]= true,
+ ["coroutine"]= true,
+ ["*"]= true
+}
+
+function gen( ... )
+ local opt= {}
+ local libs= nil
+ local lev= 2 -- level for errors
+
+ local n= select('#',...)
+
+ if n==0 then
+ error( "No parameters!" )
+ end
+
+ for i=1,n-1 do
+ local v= select(i,...)
+ if type(v)=="string" then
+ libs= libs and libs..","..v or v
+ elseif type(v)=="table" then
+ for k,vv in pairs(v) do
+ opt[k]= vv
+ end
+ elseif v==nil then
+ -- skip
+ else
+ error( "Bad parameter: "..tostring(v) )
+ end
+ end
+
+ local func= select(n,...)
+ if type(func)~="function" then
+ error( "Last parameter not function: "..tostring(func) )
+ end
+
+ -- Check 'libs' already here, so the error goes in the right place
+ -- (otherwise will be noticed only once the generator is called)
+ --
+ if libs then
+ for s in string_gmatch(libs, "[%a*]+") do
+ if not valid_libs[s] then
+ error( "Bad library name: "..s )
+ end
+ end
+ end
+
+ local prio, cs, g_tbl
+
+ for k,v in pairs(opt) do
+ if k=="priority" then prio= v
+ elseif k=="cancelstep" then cs= (v==true) and 100 or
+ (v==false) and 0 or
+ type(v)=="number" and v or
+ error( "Bad cancelstep: "..tostring(v), lev )
+ elseif k=="globals" then g_tbl= v
+ --..
+ elseif k==1 then error( "unkeyed option: ".. tostring(v), lev )
+ else error( "Bad option: ".. tostring(k), lev )
+ end
+ end
+
+ -- Lane generator
+ --
+ return function(...)
+ return lane_proxy( thread_new( func, libs, cs, prio, g_tbl,
+ ... ) ) -- args
+ end
+end
+
+lane_proxy= function( ud )
+ local proxy= {
+ _ud= ud,
+
+ -- void= me:cancel()
+ --
+ cancel= function(me) thread_cancel(me._ud) end,
+
+ -- [...] | [nil,err,stack_tbl]= me:join( [wait_secs=-1] )
+ --
+ join= function( me, wait )
+ return thread_join( me._ud, wait )
+ end,
+ }
+ assert( proxy._ud )
+ setmetatable( proxy, lane_mt )
+
+ return proxy
+end
+
+
+---=== Lindas ===---
+
+-- We let the C code attach methods to userdata directly
+
+-----
+-- linda_ud= lanes.linda()
+--
+function linda()
+ local proxy= _deep_userdata( linda_id )
+ assert( (type(proxy) == "userdata") and getmetatable(proxy) )
+ return proxy
+end
+
+
+---=== Timers ===---
+
+--
+-- On first 'require "lanes"', a timer lane is spawned that will maintain
+-- timer tables and sleep in between the timer events. All interaction with
+-- the timer lane happens via a 'timer_gateway' Linda, which is common to
+-- all that 'require "lanes"'.
+--
+-- Linda protocol to timer lane:
+--
+-- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
+--
+local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
+local first_time_key= "first time"
+
+local first_time= timer_gateway:get(first_time_key) == nil
+timer_gateway:set(first_time_key,true)
+
+--
+-- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
+-- has 'table' always declared)
+--
+if first_time then
+ local table_remove= assert( table.remove )
+ local table_insert= assert( table.insert )
+
+ --
+ -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
+ -- [key]= { wakeup_secs [,period_secs] } [, ...] },
+ -- }
+ --
+ -- Collection of all running timers, indexed with linda's & key.
+ --
+ -- Note that we need to use the deep lightuserdata identifiers, instead
+ -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
+ -- entries for the same timer.
+ --
+ -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
+ -- also important to keep the Linda alive, even if all outside world threw
+ -- away pointers to it (which would ruin uniqueness of the deep pointer).
+ -- Now we're safe.
+ --
+ local collection= {}
+
+ --
+ -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
+ --
+ local function set_timer( linda, key, wakeup_at, period )
+
+ assert( wakeup_at==nil or wakeup_at>0.0 )
+ assert( period==nil or period>0.0 )
+
+ local linda_deep= linda:deep()
+ assert( linda_deep )
+
+ -- Find or make a lookup for this timer
+ --
+ local t1= collection[linda_deep]
+ if not t1 then
+ t1= { [linda_deep]= linda } -- proxy to use the Linda
+ collection[linda_deep]= t1
+ end
+
+ if wakeup_at==nil then
+ -- Clear the timer
+ --
+ t1[key]= nil
+
+ -- Remove empty tables from collection; speeds timer checks and
+ -- lets our 'safety reference' proxy be gc:ed as well.
+ --
+ local empty= true
+ for k,_ in pairs(t1) do
+ if k~= linda_deep then
+ empty= false; break
+ end
+ end
+ if empty then
+ collection[linda_deep]= nil
+ end
+
+ -- Note: any unread timer value is left at 'linda[key]' intensionally;
+ -- clearing a timer just stops it.
+ else
+ -- New timer or changing the timings
+ --
+ local t2= t1[key]
+ if not t2 then
+ t2= {}; t1[key]= t2
+ end
+
+ t2[1]= wakeup_at
+ t2[2]= period -- can be 'nil'
+ end
+ end
+
+ -----
+ -- [next_wakeup_at]= check_timers()
+ --
+ -- Check timers, and wake up the ones expired (if any)
+ --
+ -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
+ --
+ local function check_timers()
+
+ local now= now_secs()
+ local next_wakeup
+
+ for linda_deep,t1 in pairs(collection) do
+ for key,t2 in pairs(t1) do
+ --
+ if key==linda_deep then
+ -- no 'continue' in Lua :/
+ else
+ -- 't2': { wakeup_at_secs [,period_secs] }
+ --
+ local wakeup_at= t2[1]
+ local period= t2[2] -- may be 'nil'
+
+ if wakeup_at <= now then
+ local linda= t1[linda_deep]
+ assert(linda)
+
+ linda:set( key, now )
+
+ -- 'pairs()' allows the values to be modified (and even
+ -- removed) as far as keys are not touched
+
+ if not period then
+ -- one-time timer; gone
+ --
+ t1[key]= nil
+ wakeup_at= nil -- no 'continue' in Lua :/
+ else
+ -- repeating timer; find next wakeup (may jump multiple repeats)
+ --
+ repeat
+ wakeup_at= wakeup_at+period
+ until wakeup_at > now
+
+ t2[1]= wakeup_at
+ end
+ end
+
+ if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
+ next_wakeup= wakeup_at
+ end
+ end
+ end -- t2 loop
+ end -- t1 loop
+
+ return next_wakeup -- may be 'nil'
+ end
+
+ -----
+ -- Snore loop (run as a lane on the background)
+ --
+ -- High priority, to get trustworthy timings.
+ --
+ -- We let the timer lane be a "free running" thread; no handle to it
+ -- remains.
+ --
+ gen( "io", { priority=max_prio }, function()
+
+ while true do
+ local next_wakeup= check_timers()
+
+ -- Sleep until next timer to wake up, or a set/clear command
+ --
+ local secs= next_wakeup and (next_wakeup - now_secs()) or nil
+ local linda= timer_gateway:receive( secs, TGW_KEY )
+
+ if linda then
+ local key= timer_gateway:receive( 0.0, TGW_KEY )
+ local wakeup_at= timer_gateway:receive( 0.0, TGW_KEY )
+ local period= timer_gateway:receive( 0.0, TGW_KEY )
+ assert( key and wakeup_at and period )
+
+ set_timer( linda, key, wakeup_at, period>0 and period or nil )
+ end
+ end
+ end )()
+end
+
+-----
+-- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
+--
+function timer( linda, key, a, period )
+
+ if a==0.0 then
+ -- Caller expects to get current time stamp in Linda, on return
+ -- (like the timer had expired instantly); it would be good to set this
+ -- as late as possible (to give most current time) but also we want it
+ -- to precede any possible timers that might start striking.
+ --
+ linda:set( key, now_secs() )
+
+ if not period or period==0.0 then
+ timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
+ return -- nothing more to do
+ end
+ a= period
+ end
+
+ local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
+ or now_secs()+a
+ -- queue to timer
+ --
+ timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
+end
+
+
+---=== Lock & atomic generators ===---
+
+-- These functions are just surface sugar, but make solutions easier to read.
+-- Not many applications should even need explicit locks or atomic counters.
+
+--
+-- lock_f= lanes.genlock( linda_h, key [,N_uint=1] )
+--
+-- = lock_f( +M ) -- acquire M
+-- ...locked...
+-- = lock_f( -M ) -- release M
+--
+-- Returns an access function that allows 'N' simultaneous entries between
+-- acquire (+M) and release (-M). For binary locks, use M==1.
+--
+function genlock( linda, key, N )
+ linda:limit(key,N)
+ linda:set(key,nil) -- clears existing data
+
+ --
+ -- [true [, ...]= trues(uint)
+ --
+ local function trues(n)
+ if n>0 then return true,trues(n-1) end
+ end
+
+ return
+ function(M)
+ if M>0 then
+ -- 'nil' timeout allows 'key' to be numeric
+ linda:send( nil, key, trues(M) ) -- suspends until been able to push them
+ else
+ for i=1,-M do
+ linda:receive( key )
+ end
+ end
+ end
+end
+
+
+--
+-- atomic_f= lanes.genatomic( linda_h, key [,initial_num=0.0] )
+--
+-- int= atomic_f( [diff_num=1.0] )
+--
+-- Returns an access function that allows atomic increment/decrement of the
+-- number in 'key'.
+--
+function genatomic( linda, key, initial_val )
+ linda:limit(key,2) -- value [,true]
+ linda:set(key,initial_val or 0.0) -- clears existing data (also queue)
+
+ return
+ function(diff)
+ -- 'nil' allows 'key' to be numeric
+ linda:send( nil, key, true ) -- suspends until our 'true' is in
+ local val= linda:get(key) + (diff or 1.0)
+ linda:set( key, val ) -- releases the lock, by emptying queue
+ return val
+ end
+end
+
+
+--the end
Please sign in to comment.
Something went wrong with that request. Please try again.