Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serial port reconnection logic (Closes #1213) #1222

Merged
merged 3 commits into from Jan 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -157,7 +157,6 @@ public void initialize() {
} else {
healTime = -1;
}
initializeHeal();

// We must set the state
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, ZWaveBindingConstants.OFFLINE_CTLR_OFFLINE);
Expand Down Expand Up @@ -190,6 +189,33 @@ protected void initializeNetwork() {
}
}

initializeHeal();

}

/**
* Common stopping point for all ZWave controllers.
* Called by bridges when their interfaces are unavailable.
*/
protected void stopNetwork() {
logger.debug("Stopping ZWave network");
if (healJob != null) {
healJob.cancel(true);
healJob = null;
}

ZWaveController controller = this.controller;
if (controller != null) {
this.controller = null;
synchronized (listeners) {
for (ZWaveEventListener listener : listeners) {
controller.removeEventListener(listener);
}
}
controller.removeEventListener(this);
controller.shutdown();
}
logger.debug("ZWave network stopped");
}

private void initializeHeal() {
Expand Down
216 changes: 154 additions & 62 deletions src/main/java/org/openhab/binding/zwave/handler/ZWaveSerialHandler.java
Expand Up @@ -15,9 +15,16 @@
import static org.openhab.binding.zwave.ZWaveBindingConstants.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import java.util.TooManyListenersException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNull;
import org.apache.commons.io.IOUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.library.types.DecimalType;
import org.eclipse.smarthome.core.thing.Bridge;
import org.eclipse.smarthome.core.thing.ChannelUID;
Expand All @@ -44,15 +51,22 @@
*
* @author Chris Jackson - Initial contribution
*/
@NonNullByDefault
public class ZWaveSerialHandler extends ZWaveControllerHandler {

private static final int SERIAL_RECEIVE_TIMEOUT = 250;
private static final long WATCHDOG_INIT_SECONDS = 5;
private static final long WATCHDOG_CHECK_SECONDS = 30;

private final Logger logger = LoggerFactory.getLogger(ZWaveSerialHandler.class);

private SerialPortManager serialPortManager;
private final SerialPortManager serialPortManager;

private String portId;
private String portId = "";
bodiroga marked this conversation as resolved.
Show resolved Hide resolved

private org.eclipse.smarthome.io.transport.serial.SerialPort serialPort;
private @Nullable SerialPort serialPort;
private @Nullable InputStream inputStream;
private @Nullable OutputStream outputStream;

private int SOFCount = 0;
private int CANCount = 0;
Expand All @@ -61,15 +75,10 @@ public class ZWaveSerialHandler extends ZWaveControllerHandler {
private int OOFCount = 0;
private int CSECount = 0;

private static final int SERIAL_RECEIVE_TIMEOUT = 250;

private ZWaveReceiveThread receiveThread;

public ZWaveSerialHandler(Bridge bridge) {
super(bridge);
}
private @Nullable ZWaveReceiveThread receiveThread;
private @NonNullByDefault({}) ScheduledFuture<?> watchdog;

public ZWaveSerialHandler(@NonNull Bridge thing, SerialPortManager serialPortManager) {
public ZWaveSerialHandler(Bridge thing, SerialPortManager serialPortManager) {
super(thing);
this.serialPortManager = serialPortManager;
}
Expand All @@ -80,49 +89,79 @@ public void initialize() {

portId = (String) getConfig().get(CONFIGURATION_PORT);

if (portId == null || portId.length() == 0) {
logger.debug("ZWave port is not set.");
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR,
ZWaveBindingConstants.OFFLINE_PORT_UNSET);
return;
super.initialize();
watchdog = scheduler.scheduleWithFixedDelay(this::watchSerialPort, WATCHDOG_INIT_SECONDS,
WATCHDOG_CHECK_SECONDS, TimeUnit.SECONDS);

}

/**
* Gets a serial port identifier for a given name.
* Workaround for getIdentifier in SerialPortManager class, because it doesn't detected correctly that the device is
* unplugged.
*
* @param the name
* @return a serial port identifier or null
*/
private @Nullable SerialPortIdentifier getSerialPortIdentifier(final String name) {
bodiroga marked this conversation as resolved.
Show resolved Hide resolved
Optional<SerialPortIdentifier> opt = serialPortManager.getIdentifiers().filter(id -> id.getName().equals(name))
.findFirst();
if (opt.isPresent()) {
return opt.get();
} else {
return null;
}
}

super.initialize();
logger.info("Connecting to serial port '{}'", portId);
private void watchSerialPort() {
try {
SerialPortIdentifier portIdentifier = serialPortManager.getIdentifier(portId);
SerialPortIdentifier portIdentifier = getSerialPortIdentifier(portId);
if (portIdentifier == null) {
bodiroga marked this conversation as resolved.
Show resolved Hide resolved
if (serialPort != null) {
onSerialPortError(ZWaveBindingConstants.OFFLINE_SERIAL_NOTFOUND);
}
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
ZWaveBindingConstants.OFFLINE_SERIAL_NOTFOUND + " [\"" + portId + "\"]");
return;
}

if (serialPort != null) {
return;
}

logger.debug("Connecting to serial port '{}'", portId);
SerialPort commPort = portIdentifier.open("org.openhab.binding.zwave", 2000);
serialPort = commPort;
serialPort.setSerialPortParams(115200, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
SerialPort.PARITY_NONE);
serialPort.enableReceiveThreshold(1);
serialPort.enableReceiveTimeout(SERIAL_RECEIVE_TIMEOUT);
commPort.setSerialPortParams(115200, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
commPort.enableReceiveThreshold(1);
commPort.enableReceiveTimeout(SERIAL_RECEIVE_TIMEOUT);
inputStream = commPort.getInputStream();
outputStream = commPort.getOutputStream();
logger.debug("Starting receive thread");
receiveThread = new ZWaveReceiveThread();
receiveThread.start();

ZWaveReceiveThread zWaveReceiveThread = new ZWaveReceiveThread();
receiveThread = zWaveReceiveThread;
zWaveReceiveThread.start();

// RXTX serial port library causes high CPU load
// Start event listener, which will just sleep and slow down event loop
serialPort.addEventListener(receiveThread);
serialPort.notifyOnDataAvailable(true);
commPort.addEventListener(zWaveReceiveThread);
commPort.notifyOnDataAvailable(false);

logger.info("Serial port is initialized");
logger.debug("Serial port is initialized");

initializeNetwork();
} catch (PortInUseException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
ZWaveBindingConstants.OFFLINE_SERIAL_INUSE + " [\"" + portId + "\"]");
onSerialPortError(ZWaveBindingConstants.OFFLINE_SERIAL_INUSE);
} catch (UnsupportedCommOperationException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
ZWaveBindingConstants.OFFLINE_SERIAL_UNSUPPORTED + " [\"" + portId + "\"]");
onSerialPortError(ZWaveBindingConstants.OFFLINE_SERIAL_UNSUPPORTED);
} catch (IOException e) {
onSerialPortError(ZWaveBindingConstants.OFFLINE_CTLR_OFFLINE);
} catch (TooManyListenersException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
ZWaveBindingConstants.OFFLINE_SERIAL_LISTENERS + " [\"" + portId + "\"]");
onSerialPortError(ZWaveBindingConstants.OFFLINE_SERIAL_LISTENERS);
} catch (RuntimeException e) {
logger.debug("Unexpected runtime exception during serial port initialized ", e);
onSerialPortError(ZWaveBindingConstants.OFFLINE_CTLR_OFFLINE);
}
}

Expand All @@ -131,6 +170,56 @@ public void initialize() {
*/
@Override
public void dispose() {
disposeReceiveThread();
disposeSerialConnection();

if (watchdog != null && !watchdog.isCancelled()) {
watchdog.cancel(true);
}
logger.debug("Stopped ZWave serial handler");

super.dispose();
}

/**
* Stops resources when serial port is disconnected.
*/
private void onSerialPortError(String errorMessage) {
stopNetwork();
disposeReceiveThread();
disposeSerialConnection();

updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
errorMessage + " [\"" + portId + "\"]");
}

/**
* Closes serial port connection
*/
private void disposeSerialConnection() {
logger.debug("Disposing serial connection");
if (inputStream != null) {
IOUtils.closeQuietly(inputStream);
inputStream = null;
}
if (outputStream != null) {
IOUtils.closeQuietly(outputStream);
outputStream = null;
}

if (serialPort != null) {
serialPort.removeEventListener();
serialPort.close();
serialPort = null;
}
logger.debug("Serial connection disposed");
}

/**
* Closes receive thread
*/
private void disposeReceiveThread() {
logger.debug("Disposing receive thread");
if (receiveThread != null) {
receiveThread.interrupt();
try {
Expand All @@ -139,13 +228,7 @@ public void dispose() {
}
receiveThread = null;
}
if (serialPort != null) {
serialPort.close();
serialPort = null;
}
logger.info("Stopped ZWave serial handler");

super.dispose();
logger.debug("Receive thread dispose");
}

@Override
Expand All @@ -158,21 +241,21 @@ public void handleCommand(ChannelUID channelUID, Command command) {
*/
private class ZWaveReceiveThread extends Thread implements SerialPortEventListener {

private static final int SOF = 0x01;
private static final int ACK = 0x06;
private static final int NAK = 0x15;
private static final int CAN = 0x18;

private final Logger logger = LoggerFactory.getLogger(ZWaveReceiveThread.class);

private static final int SEARCH_SOF = 0;
private static final int SEARCH_LEN = 1;
private static final int SEARCH_DAT = 2;

private int rxState = SEARCH_SOF;
private int messageLength;
private int rxLength;
private byte[] rxBuffer;

private static final int SOF = 0x01;
private static final int ACK = 0x06;
private static final int NAK = 0x15;
private static final int CAN = 0x18;

private final Logger logger = LoggerFactory.getLogger(ZWaveReceiveThread.class);
private byte @Nullable [] rxBuffer;

ZWaveReceiveThread() {
super("ZWaveReceiveInputThread");
Expand All @@ -195,9 +278,12 @@ public void serialEvent(SerialPortEvent arg0) {
*/
private void sendResponse(int response) {
try {
synchronized (serialPort.getOutputStream()) {
serialPort.getOutputStream().write(response);
serialPort.getOutputStream().flush();
if (serialPort == null) {
return;
}
synchronized (outputStream) {
outputStream.write(response);
outputStream.flush();
logger.trace("Response SENT {}", response);
}
} catch (IOException e) {
Expand Down Expand Up @@ -227,7 +313,10 @@ public void run() {
int nextByte;

try {
nextByte = serialPort.getInputStream().read();
if (serialPort == null) {
break;
}
nextByte = inputStream.read();
// logger.debug("SERIAL:: STATE {}, nextByte {}, count {} ", rxState, nextByte, rxLength);

// If byte value is -1, this is a timeout
Expand Down Expand Up @@ -343,17 +432,19 @@ public void run() {
}

}
} catch (Exception e) {
} catch (RuntimeException e) {
logger.warn("Exception during ZWave thread. ", e);
} finally {
logger.debug("Stopped ZWave thread: Receive");
if (serialPort != null) {
serialPort.removeEventListener();
}
}
logger.debug("Stopped ZWave thread: Receive");

serialPort.removeEventListener();
}
}

@Override
public void sendPacket(SerialMessage serialMessage) {
public void sendPacket(@Nullable SerialMessage serialMessage) {
byte[] buffer = serialMessage.getMessageBuffer();
if (serialPort == null) {
logger.debug("NODE {}: Port closed sending REQUEST Message = {}", serialMessage.getMessageNode(),
Expand All @@ -365,13 +456,14 @@ public void sendPacket(SerialMessage serialMessage) {
SerialMessage.bb2hex(buffer));

try {
synchronized (serialPort.getOutputStream()) {
serialPort.getOutputStream().write(buffer);
serialPort.getOutputStream().flush();
synchronized (outputStream) {
outputStream.write(buffer);
outputStream.flush();
logger.debug("Message SENT");
}
} catch (IOException e) {
logger.warn("Got I/O exception {} during sending. exiting thread.", e.getLocalizedMessage());
onSerialPortError(ZWaveBindingConstants.OFFLINE_CTLR_OFFLINE);
}
}
}