Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

More work on Modbus connection class

  • Loading branch information...
commit 22414399797f1b54d9c519aa1ca0b92926417948 1 parent 56210d3
@tallakt authored
Showing with 79 additions and 49 deletions.
  1. +79 −49 lib/machines/io/{ModbusSlave.rb → modbus_slave.rb}
View
128 lib/machines/io/ModbusSlave.rb → lib/machines/io/modbus_slave.rb
@@ -6,16 +6,17 @@
module Machines
module IO
- # All methods are assumed to perform in the Scvheduler current thread
+ # All methods are assumed to perform in the Scheduler current thread
# Performing requests are done in private threads and when the results
# are ready, they are added to the queue of scheduled tasks in the
# Scheduler
class ModbusSlave
- ModbusEntry = Struct.new :signal, :format, :scangroup
+ ModbusEntry = Struct.new :signal, :format, :scangroup, :length
notify :exception
def initialize(options = {})
+ #TODO timeout values?
@opts = {
:port => 502,
:host => 'localhost',
@@ -25,7 +26,6 @@ def initialize(options = {})
:threads => 1
}
@opts.merge! options
- @connection = RModbus::TCPClient.new @opts[:host], @opts[:port], @opts[:slave_address]
@bool_inputs = MultiRBTree.new
@word_inputs = MultiRBTree.new
@bool_outputs = MultiRBTree.new
@@ -42,14 +42,7 @@ def initialize(options = {})
# Start the background threads
1.upto(@opts[:threads]) do
Thread.start do
- @thread_tasks[:queue].synchronize do
- @thread_tasks[:cond].wait_while { @thread_tasks[:queue].empty? }
- begin
- @thread_tasks[:queue].shift.call
- rescue Exception => ex
- notify_exception ex
- end
- end
+ communication_thread_start
end
end
end
@@ -72,27 +65,28 @@ def output(signal, address, options={})
end
end
- def close
- @connection.close
- end
-
-
- def update(scangroup = :all)
+ def update(filter = :all)
#TODO Should have an option to autoreset written bits
#TODO Should frist write all output groups, then wait before reading
#TODO For bool values written support options :only_re, :only_fe, :always, :only_change
#TODO Must support buffering of writes for bool and int values :only_change/:always
- write_wrapper(@bool_outputs, scangroup) do |group|
- values = @bool_outputs.values_at(group).map {|v| v ? 1 : 0})
- @connection.write_multiple_coils(group.first, values)
+ #FIXME Bug since values must be read from the signals in main thread, FIXED, TEST!
+ values_generator = Proc.new do |group|
+ @bool_outputs.values_at(group).map {|v| v ? 1 : 0})
+ end
+ update_helper(@bool_outputs, filter, :consecutive_groups, values_generator) do |group, conn, values|
+ conn.write_multiple_coils(group.first, values)
end
- write_wrapper(@word_outputs, scangroup) do |group|
- @connection.write_multiple_registers(group.first, @word_outputs.values_at(group))
+ values_generator = Proc.new do |group|
+ @bool_outputs.values_at(group)
+ end
+ update_helper(@word_outputs, filter, :consecutive_groups, values_generator) do |group, conn, values|
+ conn.write_multiple_registers(group.first, values)
end
- read_wrapper(@bool_inputs, scangroup) do |group|
- values = @connection.read_multiple_coils(group.first, group.count)
+ update_helper(@bool_inputs, filter, :block_groups) do |group, connection|
+ values = connection.read_multiple_coils(group.first, group.count)
Scheduler.at_once do
@bool_inputs.bound(group.first, group.last) do |address, entry|
entry.signal.v = (values[address - group.first] == 0 ? false : true)
@@ -100,16 +94,11 @@ def update(scangroup = :all)
end
end
- read_wrapper(@word_inputs, scangroup) do |group|
- values = @connection.read_multiple_registers(group.first, group.count)
+ update_helper(@word_inputs, filter, :block_groups) do |group, connection|
+ values = connection.read_multiple_registers(group.first, group.count)
Scheduler.at_once do
@word_inputs.bound(group.first, group.last) do |address, entry|
- #TODO Only supporting :int as yet
- case entry.format
- when :int
- entry.signal.v = values[address - group.first]
- else
- throw RuntimeError.new 'Unsupported format :%s' % entry.format.to_s
+ handle_word_data(address - group.first, entry, values)
end
end
end
@@ -117,30 +106,45 @@ def update(scangroup = :all)
private
- def write_wrapper(entries, filter, &action) do
- consecutive_groups(filter_scangroup(entries, filter)).each do |group|
- perform_in_background do
- yield group
- end
+ def handle_word_data(address, entry, raw_data)
+ #TODO Only supporting :int as yet
+ case entry.format
+ when :int
+ entry.signal.v = values[address]
+ when :uint
+ entry.signal.v = values[address] & 0xffff
+ when :long
+ entry.signal.v = values.values_at(address..(address+1)).pack('u*').unpack('i')
+ when :float
+ entry.signal.v = values.values_at(address..(address+1)).pack('u*').unpack('g')
+ when :string
+ throw RuntimeError.new 'No :length supplied for string value' unless entry.length
+ entry.signal.v = values.values_at(address..(address + entry.length)).pack('u*')
+ else
+ throw RuntimeError.new 'Unsupported format :%s' % entry.format.to_s
end
end
- def read_wrapper(entries, filter, &action) do
- block_groups(filter_scangroup(entries, filter)).each do |group|
- perform_in_background do
- yield group
+ def update_helper(entries, filter, group_method, val_generator = nil) do
+ send(group_method, filter_scangroup(entries, filter)).each do |group|
+ values = val_generator && val_generator.call(group)
+ perform_in_background do |connection|
+ if values
+ yield group, connection, values
+ else
+ yield group, connection
+ end
end
end
end
-
def filter_scangroup(entries, filter)
- # TODO filters :inputs, :outputs, :standard (nil)
+ # TODO filters :inputs, :outputs, :standard (nil), :constants
case filter
when :all
entries
else
- MultiRBTree[entries.select {|en| en.scangroup == filter }]
+ MultiRBTree[entries.select {|en| en.filter == filter }]
end
end
@@ -191,19 +195,26 @@ def bool_input(address, options)
def word_input(address, options)
result = Machines::TimeDomain::AnalogSignal.new
@bool_inputs[number_from_address address] =
- ModbusEntry.new result, options[:format] || :int, options[:scangroup]
+ ModbusEntry.new result, options[:format] || :int, options[:scangroup], options[:length]
result
end
def bool_output(source, address, options)
- @bool_outputs[number_from_address address] =
- ModbusEntry.new source, :bool, options[:scangroup]
+ addr = number_from_address address]
+ if @bool_outputs.key? addr
+ throw RuntimeError.new 'Output on bool address %d is already in use' % addr
+ end
+ @bool_outputs[addr] = ModbusEntry.new source, :bool, options[:scangroup]
source
end
def word_output(source, address, options)
- @word_outputs[number_from_address address] =
- ModbusEntry.new source, options[:format] || :int, options[:scangroup]
+ addr = number_from_address address]
+ if @word.key? addr
+ throw RuntimeError.new 'Output on word address %d is already in use' % addr
+ end
+ @word_outputs[addr] =
+ ModbusEntry.new source, options[:format] || :int, options[:scangroup], options[:length]
source
end
@@ -221,6 +232,25 @@ def type_from_address(address)
throw RuntimeError.new 'Invalid format for address, use :mw1 or :m1'
end
end
+
+ def communication_thread_start
+ while true
+ begin
+ connection = RModbus::TCPClient.new @opts[:host], @opts[:port], @opts[:slave_address]
+ @thread_tasks[:queue].synchronize do
+ @thread_tasks[:cond].wait_while { @thread_tasks[:queue].empty? }
+ begin
+ @thread_tasks[:queue].shift.call(connection)
+ rescue Exception => ex
+ notify_exception ex
+ end
+ end
+ rescue Exception => ex
+ notify_exception ex
+ sleep(10.0 + 5.0 * rand)
+ end
+ end
+ end
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.