# Multitasking and Instrumentation Data I/O

J. M. Hughes. Real World Instrumentation with Python, O'Reilly Media, December 2010, ISBN 978-0-596-80956-0

* CHAPTER 11 Instrumentation Data I/O

  * Data I/O: Acquiring and Writing Data



## 1 Blocking Versus Nonblocking Calls 

Now it’s time to introduce some concepts that you will need to use later to build robust and reliable software. We’ll start with a discussion of blocking and nonblocking function calls, and then take a look at some basic techniques for handling errors.

One way to describe the behavior of a function or method is `in terms of how quickly it will return after it has been invoked`. Some only return `after` a result of some type is obtained, while others may return `immediately` without waiting for something else downstream to produce a particular response.

In other words, a function is either

* **blocking** : the calling code must **wait** for a response 

* **nonblocking** : the call returns **immediately**, usually with a response that indicates success or failure

Actually, all software functions (and methods, too) can be classified as either `blocking or nonblocking`, and the majority of functions within a typical software application are of the blocking variety—that is, they don’t return until the intended action is complete or an error is detected. 
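The difference can be illustrated with the standard library's `queue.Queue`, whose `get()` blocks by default and whose `get_nowait()` returns at once (raising `queue.Empty` if nothing is there). This is only a sketch: the queue stands in for a device, and the helper name `try_read` is made up for this example.

```python
import queue

q = queue.Queue()

def try_read(source):
    """Nonblocking read: return (True, item) or (False, None) right away."""
    try:
        return True, source.get_nowait()
    except queue.Empty:
        return False, None

# Nothing has been queued yet, so the nonblocking call reports failure at once.
ok_before, item_before = try_read(q)

q.put(42)

# A blocking call returns only when an item is available; the timeout keeps
# it from waiting forever if nothing ever arrives.
item_blocking = q.get(timeout=1.0)

print(ok_before, item_before, item_blocking)
```

The nonblocking variant pushes the decision about what to do next back to the caller, which is why it usually needs more supporting code.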

### 1.1 Blocking 

You can see this in the **message sequence chart (MSC)** shown in `Figure 11-9`.

Here we have `Function1()` calling `Function2()`, which in turn calls `Function3()` and finally `Function4()`. The time required for `Function1()` to receive a response from `Function2()` is dependent on how long it takes for functions 2, 3, and 4 to complete their processing and return. During this entire time, **Function1()** is blocked. (In an MSC diagram, events in a function or process occur in top-to-bottom order, and transactions between functions or processes are the horizontal lines.)

![data-io-msc](./img/data-io-msc.jpg)

>**Message sequence charts**
>
>The message sequence chart (MSC) is defined by the guidelines document Z.120, maintained by the `International Telecommunication Union (ITU)`. In its current form, an MSC is a powerful tool for modeling command-response transactions between multiple entities. The UML equivalent of the MSC is the **sequence diagram**.

**Blocking** allows functions to maintain **synchronization and honor the intended flow of execution** through the code. The action or data that the call is requesting may or may **not be available** at the time the call is made, so a `blocking` call will `wait` for the other end to respond in some fashion before returning to the caller. As a side effect, it will also effectively `suspend your application` until it returns.



#### 1.1.1 Blocking IO

**psutil**  https://github.com/giampaolo/psutil

* Cross-platform lib for process and system monitoring in Python 

In [None]:
import time
import psutil
import datetime

def io_cpu_percent():
    time.sleep(2)
    return psutil.cpu_percent()

def io_mem_percent():
    time.sleep(2)
    return psutil.virtual_memory().percent

def io_sensors_battery_percent():
    time.sleep(2)
    battery = psutil.sensors_battery()  # None on machines without a battery
    return battery.percent if battery is not None else None

tagfuncs={"CPU_PERCENT": io_cpu_percent,
           "MEM_PERCENT": io_mem_percent,
           "BAT_PERCENT": io_sensors_battery_percent
          }

print("Begin of submitting the monitoring tasks")
for tag in tagfuncs.keys():
    str_curtime=datetime.datetime.now().strftime('%H:%M:%S')
    print('\n\tBegin of {} at {}'.format(tag,str_curtime))
    value=tagfuncs[tag]()  # blocks for 2 seconds before returning
    print("\t",tag,value)
    str_curtime=datetime.datetime.now().strftime('%H:%M:%S')
    print('\tEnd of {} at {}'.format(tag,str_curtime))

print("\nEnd of submitting the monitoring tasks")

In the following diagram, the three `tasks` are represented as `boxes`. The time spent by the <b style="color:orange">CPU processing and submitting the request</b> is in <b style="color:orange">orange</b> while the <b style="color:blue">waiting</b> times are in <b style="color:blue">blue</b>. You can see how most of the time is spent waiting for the resources while our machine sits idle without doing anything else:

![three_tasks](./img/three_tasks.jpg)

#### 1.1.2 The Simple Monitor using Matplotlib.Plot


**Deques**
https://docs.python.org/3.7/library/collections.html#collections.deque

Deques are a generalization of stacks and queues (the name is pronounced “deck” and is short for “**double-ended queue**”). Deques support thread-safe, memory efficient appends and pops from **either side of the deque** with approximately the same O(1) performance in either direction.

```python
from collections import deque

```
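As a small illustration of the double-ended behavior, the sketch below uses the standard `maxlen` argument so that the deque acts as a sliding window over the most recent samples. The sample values are arbitrary.

```python
from collections import deque

# A deque with maxlen keeps only the most recent N samples.
window = deque(maxlen=3)
for sample in [10, 20, 30, 40, 50]:
    window.append(sample)   # once full, the oldest item is dropped from the left

print(list(window))          # [30, 40, 50]

# Both ends can also be used explicitly.
window.appendleft(0)         # the deque is full, so 50 falls off the right end
oldest = window.popleft()    # removes the 0 that was just added
print(oldest, list(window))
```

With `maxlen` set, the explicit "pop from the left when the length reaches the limit" step used for a rolling plot buffer becomes unnecessary.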
**matplotlib.animation.FuncAnimation**

https://matplotlib.org/api/_as_gen/matplotlib.animation.FuncAnimation.html#matplotlib.animation.FuncAnimation

Makes an animation by repeatedly calling a function **func**.

```python
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
```
**matplotlib.pyplot.table**

https://matplotlib.org/api/_as_gen/matplotlib.pyplot.table.html

Add a table to the current axes.


**Test**

Set `interval=1000` (the delay between frames, in milliseconds):
```python
ani = FuncAnimation(fig, update,init_func=init, blit=True,interval=1000)
```

1. The blocking I/O takes longer than the frame interval (`interval=1000`):

```python
time.sleep(2)
```

2. The blocking I/O takes less time than the frame interval (`interval=1000`):

```python
time.sleep(0.1)
```    

In [4]:
%%file ./code/concurrency/demo_blocking_io_matplotlib.py
import time
from collections import deque
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import psutil

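def virtual_interface_data(tag):
    # time.sleep(0.1)
    # Map each tag to a callable so that only the requested value is read
    tagfuncs={"CPU_PERCENT": psutil.cpu_percent,
               "MEM_PERCENT": lambda: psutil.virtual_memory().percent,
               "BAT_PERCENT": lambda: psutil.sensors_battery().percent}
    try:
        value= tagfuncs[tag]()
        rc=1
    except (KeyError, AttributeError):
        # unknown tag, or sensors_battery() returned None (no battery)
        rc,value=0,None
    return (rc,value)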
            
tag="CPU_PERCENT"
y = deque()

columns = ()
col_labels = ['Tag', 'Unit', 'Value']
table_vals = [[tag,"%",""]]


fig, ax = plt.subplots()
ax.set_title("The Simple Monitor:"+tag)
ln, = plt.plot([], [], 'b-o')
str_cursecond=str(time.localtime(time.time()).tm_sec)   
time_text = ax.text(0.5, 80, "")

tbl = ax.table(cellText=table_vals,
               colLabels=col_labels,
               colWidths=[0.2] * 3,
               cellLoc='center',
               loc='best')

def init():
    ax.set_xlim(0, 9)
    ax.set_ylim(0, 100)
    return ln,

def update(frames):
    rc,value = virtual_interface_data(tag)
    if len(y) < 10:
        y.append(value)
    else:
        y.popleft()
        y.append(value)

    str_curtime=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    time_text.set_text("Time:"+str_curtime)
    
    table_vals = [[tag,"%",str(value)]]
    tbl = ax.table(cellText=table_vals,
               colLabels=col_labels,
               colWidths=[0.2] *3,
               cellLoc='center',
               loc='best')

    ln.set_xdata(np.arange(len(y)))
    ln.set_ydata(np.array(y))
    return ln,time_text, tbl

ani = FuncAnimation(fig, update,init_func=init, blit=True,interval=1000)
plt.show()


Writing ./code/concurrency/demo_blocking_io_matplotlib.py


#### 1.1.3 Blocking IO with Timeout

**The type of blocking** we’re most interested in is when an application process is forced to **wait for an interface**, which in turn waits for a `hardware device to respond`. 

This is shown in Figure 11-10. Notice that there is a **timer** symbol in this diagram. 

This means that if the **hardware does not respond**  within some preset period of time, the interface process will **terminate and return an error**

![data-io-io](./img/data-io-io.jpg)

In some cases it `may not matter` if a blocking call `waits for a bit` before returning to the caller, and allowing this is more convenient than writing the necessary code to support continual query and retry actions.

A **warning** is in order here, however:

* when working with I/O devices, **a blocking call without a timeout of some sort** can potentially **hang forever**. 

This is usually a bad thing, and often the only way to get out of the situation is to shut down Python and restart the application. If your code is running on an unattended machine somewhere in the middle of nowhere, a fault that hangs a blocking call can be really, really bad.
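As a hedged illustration of the point, the sketch below uses a pair of connected sockets from the standard library in place of real hardware. The `device` end never sends anything, so a `recv()` without a timeout would never return; with `settimeout()` the call gives up and the program can report the failure. The names `host` and `device` are just labels chosen for this example.

```python
import socket

# socketpair() gives two connected sockets; `device` never sends anything,
# which stands in for a piece of hardware that has stopped responding.
host, device = socket.socketpair()
host.settimeout(0.2)          # without this, recv() below would block forever

try:
    reply = host.recv(16)
except socket.timeout:
    reply = None              # timed out: report the failure instead of hanging
finally:
    host.close()
    device.close()

print("reply:", reply)
```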

---

One way to deal with this is to use **nonblocking** function calls. This entails some `additional` code, but it’s very useful when dealing with network communications and data acquisition. 

To implement concurrency, it is necessary to think and code differently; in the following sections, we'll demonstrate techniques and best practices to implement **robust concurrent applications**.


### 1.2 Non-blocking call IO 

We’ll look at some ways to implement **nonblocking** functions using **concurrent programming**.

#### 1.2.1  Process and Thread

When you start a program, the operating system creates a new **process** in which the program is executed. A process consists of one or more **threads**. Each thread is a part of the process that executes a sequence of instructions **independently** of the other parts of the process.

When the process begins, its **main thread** is active. From then on, any running thread can launch other threads.

Every process has its own address space in memory, and has other exclusive resources, such as open files. All the threads of a process inherit its resources. Most significantly, several **threads** in one process `share` the **same address space.**

Different tasks are performed simultaneously by the **concurrent execution** of parts of the program. Especially on modern multiprocessor systems (including multicore processors, of course), it is increasingly important for programs to take advantage of concurrency to use the system’s resources efficiently.

#### 1.2.2 Non-Blocking IO using Thread

The threading module provides APIs for managing several threads of execution, which allows a program to run multiple operations **concurrently** in the same process space.

* [threading — Thread-based parallelism](https://docs.python.org/3/library/threading.html)

Threads can be used to improve the responsiveness of applications that accept user input while other tasks run in the background. 

* A related use case is running I/O in parallel with computations in another thread.

The simplest way to use a **Thread** is to instantiate it with a **target** function and call **start()** to let it begin working:


In [None]:
import threading
import time
import psutil
import datetime

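def virtual_interface_data(tag):
    time.sleep(2)
    # Map each tag to a callable so that only the requested value is read
    tagfuncs={"CPU_PERCENT": psutil.cpu_percent,
                   "MEM_PERCENT": lambda: psutil.virtual_memory().percent,
                   "BAT_PERCENT": lambda: psutil.sensors_battery().percent}
    try:
        value= tagfuncs[tag]()
        rc=1
    except (KeyError, AttributeError):
        # unknown tag, or sensors_battery() returned None (no battery)
        rc,value=0,None
    return (rc,value)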

def nonblocking_io_worker(tag):
    """thread worker function"""
    str_curtime=datetime.datetime.now().strftime('%H:%M:%S.%f')
    print('\n\tBegin of {} at {}'.format(tag,str_curtime),end="")
    value=virtual_interface_data(tag)
    print("\n\n\t",tag,value)
    str_curtime=datetime.datetime.now().strftime('%H:%M:%S.%f')
    print('\tEnd of {} at {}'.format(tag,str_curtime),end="")

tags=["CPU_PERCENT","MEM_PERCENT","BAT_PERCENT"]    
print("Begin of Submitting tags")
for tag in tags:
    t = threading.Thread(target=nonblocking_io_worker,args=(tag,))
    t.start()
print("\n\nEnd of submitting tags")

In the following figure, you can see that as soon as we submit our request `virtual_interface_data(tag1)` in one worker thread, we can start preparing the next worker thread for `virtual_interface_data(tag2)`, and so on.

This allows us to reduce the CPU waiting time and to start processing the results as soon as they become available:

![three_tasks_concurrency](./img/three_tasks_concurrency.jpg)

>The time spent by the  <b style="color:orange">CPU processing and submitting the request</b> is in  <b style="color:orange">orange</b>  while the  <b style="color:blue">waiting</b> times are in <b style="color:blue">blue</b>.
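
The overlap of the waiting periods can be checked with a small timing sketch. It assumes nothing about real hardware: `slow_read` is a made-up stand-in that sleeps for 0.2 seconds and stores a placeholder value. Because the three waits overlap, the total elapsed time should be close to one wait rather than three, and `join()` is used so that the results are read only after every worker has finished.

```python
import threading
import time

def slow_read(tag, results):
    time.sleep(0.2)            # stand-in for waiting on a device
    results[tag] = len(tag)    # placeholder value

results = {}
tags = ["CPU_PERCENT", "MEM_PERCENT", "BAT_PERCENT"]

start = time.perf_counter()
threads = [threading.Thread(target=slow_read, args=(tag, results)) for tag in tags]
for t in threads:
    t.start()
for t in threads:
    t.join()                   # wait for every worker before using the results
elapsed = time.perf_counter() - start

print(sorted(results), "elapsed: {:.2f} s".format(elapsed))
```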


#### 1.2.3 Blocking IO with timeout using concurrent.futures

concurrent.futures

* [concurrent.futures — Launching parallel tasks](https://docs.python.org/3/library/concurrent.futures.html)

The `concurrent.futures` module provides interfaces for running tasks using pools of thread or process workers. The APIs are the same for both options, so applications can switch between threads and processes with minimal changes.

The module provides two types of classes for interacting with the pools.

* **Executors** are used for managing pools of workers, 

* **Futures** are used for managing results computed by the workers.

To use a pool of workers, an application creates an instance of the appropriate **executor** class and then submits tasks for it to run. When each task is submitted, a **Future** instance is returned.

When the result of the task is needed, an application can use the **Future** to block until the result becomes available. 

Various APIs are provided that make it convenient to wait for tasks to complete, so the Future objects do not need to be managed directly.
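A minimal sketch of the executor/future pattern follows. The function `slow_square` is a made-up placeholder for a slow I/O operation; `submit()`, `as_completed()` and `Future.result()` are the standard `concurrent.futures` calls.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def slow_square(n):
    time.sleep(0.1)     # stand-in for a slow I/O operation
    return n * n

with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns a Future immediately; the work runs in the pool.
    futures = {pool.submit(slow_square, n): n for n in (1, 2, 3)}
    squares = {}
    # as_completed() yields each Future as soon as its result is ready.
    for future in as_completed(futures, timeout=5):
        squares[futures[future]] = future.result()

print(squares)
```

Using the executor as a context manager shuts the pool down once the block exits, so no worker threads are left behind.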

In [1]:
%%file  ./code/concurrency/io_timeout.py 
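from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
import psutil

def virtual_interface_data(tag):
    time.sleep(1)
    # Map each tag to a callable so that only the requested value is read
    monitoringios={"CPU_PERCENT": psutil.cpu_percent,
                   "MEM_PERCENT": lambda: psutil.virtual_memory().percent,
                   "BAT_PERCENT": lambda: psutil.sensors_battery().percent}
    try:
        value= monitoringios[tag]()
        rc=1
    except (KeyError, AttributeError):
        # unknown tag, or sensors_battery() returned None (no battery)
        rc,value=0,None
    return (rc,value)

def get_data_with_timeout(timeout=1,tag=None):
    pool=ThreadPoolExecutor(max_workers=1)
    future = pool.submit(virtual_interface_data,tag)
    try:
        rc,value = future.result(timeout=timeout)
    except TimeoutError:
        # no result within `timeout` seconds
        rc,value = 0,None
    except Exception:
        # the worker itself raised an error
        rc,value = 0,None
    finally:
        # do not wait for a worker that may still be blocked
        pool.shutdown(wait=False)
    return (rc, value)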


Overwriting ./code/concurrency/io_timeout.py


In [None]:
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

if __name__ == '__main__':
    timeouts=[1,2,3]
    tags=["CPU_PERCENT","MEM_PERCENT","BAT_PERCENT"]
    for timeout,tag in zip(timeouts,tags):
        rc,value = get_data_with_timeout(timeout,tag)
        print(rc,value)

## 2 Data I/O Methods

Now that we’ve seen what blocking and nonblocking functions entail, let’s look at how these concepts are involved with various operational modes of interface I/O. We’ll start with the simplest form, **on-demand I/O**, then proceed to **polled I/O**, and finally take a quick look at **multithreaded I/O**.

### 2.1 On-demand data I/O 

As I stated earlier, the two most obvious ways to move data into or out of your application are just a matter of reading from or writing to a port or device. 

When sending (writing) data using a serial (RS-232 or RS-485) or GPIB-type interface, there usually is `no need to worry about the use of a blocking call`. In the case of an RS-232 interface that does not use hardware handshaking, the data is sent out through the hardware port `immediately`. 

An RS-485 interface with `a single master and multiple listeners` should never block on a write by the master device, but the listeners may be unresponsive for a period of time. GPIB can also get into a situation where there are no listeners responding to the sender, but most GPIB interface APIs and the associated hardware can detect this and return an error code. 

Writing to a hardware interface API for a device such as a PCI interface card is usually not a problem in terms of blocking, but the call might still return an error code if something is amiss.

If your software uses `on-demand calls` to read data, they should be `blocking calls`, and your software should always check the return codes. If `timeout parameters` are available for a blocking function call they should definitely be used, but not every API provides blocking calls with timeouts (perhaps it was assumed that a timeout couldn’t possibly happen). 

For those situations you’ll need to use a **nonblocking** version of the API function and employ a different approach to **implement a `timeout` in your own software**.

In [None]:
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

timeout=3
tag="CPU_PERCENT"
rc,value=get_data_with_timeout(timeout,tag)
print(rc,value)

### 2.2 Polled data I/O 

**A `nonblocking` call will return `immediately`**, and its return code or return value will (hopefully) let the caller know whether or not it succeeded. A nonblocking call can be used to `avoid an I/O hang`, but it requires more code to support it. 

For example, let’s assume that the API we’re using has both blocking and nonblocking versions of I/O functions to read data from a device, or perhaps that the I/O functions have a parameter that can be set to control blocking. You can then put a nonblocking call into a loop that also checks for a timeout, like this:

In [None]:
import time
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

def GetData(timeout,tag):
    tstart = time.time()
    rc,value = 0,None
    # keep polling until data arrives or the overall timeout expires
    while time.time() - tstart < timeout:
        rc,value = get_data_with_timeout(timeout,tag)
        if rc == 1:
            break
        time.sleep(0.05) # wait 50 ms between checks
    return rc, value

timeout=3
tag="CPU_PERCENT"
rc,value=GetData(timeout,tag)
print(rc,value)

This is an example of **polling**: this function will attempt to get data from a specific data acquisition device by continually polling the port (using the **`get_data_with_timeout()`** function call) until valid data appears.

In between each read attempt it will `sleep` for 50 milliseconds. The delay is mainly for the benefit of the device being read, as many devices can’t tolerate being `hammered continuously for data`. 

In order to actually have a polling function that doesn’t cause the rest of an application to **suspend** while it’s active, you need to use **a thread**.

### 2.3  Acquiring data using a thread 

So far we’ve looked at `on-demand` and `polled` data I/O. Now let’s take a quick look at how we might check for incoming data `without bogging down the entire system` in a continuous polling loop. 



There is one API function, `GetData()`. It is assumed to exist as part of the API for the data acquisition hardware and to do what its name implies. Also, the type of data being acquired isn’t specified, primarily because it doesn’t really matter for this example. It could be anything, just so long as the specified number of samples are acquired and no errors occur:

In [None]:
import time
import threading
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

class AcqData:
    
    def __init__(self, timeout,tag):
        self.timeout = timeout
        self.tag=tag
        self.dvals = [] # list for acquired data values
        self.dsamps = 0 # number of values actually read
        self.get_rc = 0 # 1 is OK, 0 is an error
        self.get_done = False # True if thread is finished
   
    def _get_data(self, numsamples):
        cnt = 0
        acqfail = False
        
        while not acqfail:
            self.get_rc, dataval = GetData(self.timeout,self.tag)
            if self.get_rc == 1:
                self.dsamps = cnt + 1
                self.dvals.append(dataval)
                cnt += 1
                if cnt >= numsamples:
                    break
            else:
                acqfail = True
        
        self.get_done = True

    def StartDataSamples(self, samplecnt):
        try:
            acq_thread = threading.Thread(target=self._get_data,args=(samplecnt,))
            acq_thread.start()
        except Exception as e:
            print("Acquire fault: {}".format(str(e)))

    def GetDataSamples(self):
        if self.get_done:
            return (self.get_rc, self.dsamps, self.dvals)
        else:
            return (None, 0, [])

This bit of code uses `a thread`, in the form of the function `_get_data()`, to continuously read the external device to obtain some number of data samples. Notice that `the hypothetical API function GetData()` supports the use of a `timeout` parameter, and we can assume that it will return an error code if a timeout does occur.

The key thing in this simple example is 

* **how we can check to see if the data acquisition is complete**.

The accessor function `GetDataSamples()` checks the variable `self.get_done` to determine if the thread has finished. 

* If so, `GetDataSamples()` will return the data collected.

* If the thread is still running, it will return a 3-tuple with the first item set to `None`. It is up to the caller to determine if the sample count returned matches the sample count requested.
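
The completion check described above uses a plain boolean flag. A variant of the same idea, sketched here under the assumption that only "finished or not" needs to be communicated, is to use `threading.Event`, which also lets the caller wait with a timeout. The class `Acquirer` and its fake samples are hypothetical and exist only for this illustration.

```python
import threading
import time

class Acquirer:
    """Hypothetical stand-in for AcqData that signals completion with an Event."""

    def __init__(self):
        self.values = []
        self.done = threading.Event()

    def _acquire(self, count):
        for i in range(count):
            time.sleep(0.05)          # pretend to read one sample
            self.values.append(i)
        self.done.set()               # tell any waiter that acquisition finished

    def start(self, count):
        threading.Thread(target=self._acquire, args=(count,)).start()

acq = Acquirer()
acq.start(3)

polls = 0
while not acq.done.wait(timeout=0.02):   # returns False on timeout, True once set
    polls += 1                            # the caller is free to do other work here

print(acq.values, "polled", polls, "times")
```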

In [None]:
timeout=3
tag="MEM_PERCENT"
AcqDataClient= AcqData(timeout,tag)

samplecnt=3
AcqDataClient.StartDataSamples(samplecnt)

while True:
    get_rc,dsamps,dvals=AcqDataClient.GetDataSamples()
    if get_rc is not None:
        print(get_rc,dsamps,dvals)
        break
    print("AcqData ON -- ")
    time.sleep(1)

### 2.4 Acquiring data using Queue

A common need when working with multiple threads is to safely **communicate or exchange data** between them.

One such interaction is the **producer/consumer** relationship. 

Perhaps the safest way to send data from one thread to another is to use a **Queue** from the **queue** library. To do this, you create a Queue instance that is shared by the threads.

Threads then use `put()` or `get()` operations to add or remove items from the `queue`.

>The principal challenge of multi-threaded applications is coordinating threads that **share data or other resources**. To that end, the threading module provides a number of synchronization primitives including locks, events, condition variables, and semaphores.
While those tools are powerful, minor design errors can result in problems that are difficult to reproduce.
>
>So, the preferred approach to task coordination is to concentrate all access to a resource in a single thread
and then use the queue module to feed that thread with requests from other threads. Applications using
Queue objects for inter-thread communication and coordination are easier to design, more readable, and
more reliable.

#### 2.4.1 Queue

**Queue** instances already have all of the required `locking`, so they can be safely shared by as many threads as you wish.

>[queue — A synchronized queue class](https://docs.python.org/3/library/queue.html)
>The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads

When using `queues`, it can be somewhat tricky to coordinate the shutdown of the producer and consumer. A common solution to this problem is to rely on a special sentinel value, which when placed in the queue, causes consumers to terminate.
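The sentinel approach can be sketched as follows. The names `producer`, `consumer` and `_SENTINEL` are chosen for this example only; the point is that the consumer leaves its loop when it sees the marker, so both threads can be joined cleanly.

```python
from queue import Queue
from threading import Thread

_SENTINEL = object()      # unique marker that cannot be confused with real data

def producer(out_q, items):
    for item in items:
        out_q.put(item)
    out_q.put(_SENTINEL)  # signal that no more data will follow

def consumer(in_q, received):
    while True:
        item = in_q.get()
        if item is _SENTINEL:
            break         # clean shutdown
        received.append(item)

q = Queue()
received = []
workers = [Thread(target=consumer, args=(q, received)),
           Thread(target=producer, args=(q, [1, 2, 3]))]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(received)
```

With several consumers on one queue, each consumer would typically put the sentinel back before exiting so that the others see it too.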

If a thread needs to know **immediately** when a consumer thread has processed a particular item of data, you should pair the sent data with an **Event** object that allows the producer to monitor its progress.

**Event**
```python
evt = Event()
evt.wait()
```

```python
rcvalue, evt = in_q.get()
evt.set()
```

**join() Method**

```python
join(timeout=None)
```
Wait until the thread terminates. This blocks the calling thread until the thread whose `join()` method is called terminates,

* either normally or through an unhandled exception,

* or until the optional timeout occurs.
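
Because `join()` always returns `None`, the caller has to call `is_alive()` afterwards to find out whether the thread finished or the timeout expired. A short sketch, with a made-up `slow_task` that simply sleeps:

```python
import threading
import time

def slow_task():
    time.sleep(0.3)

t = threading.Thread(target=slow_task)
t.start()

t.join(timeout=0.05)            # give up waiting after 50 ms
still_running = t.is_alive()    # join() returns None, so check is_alive()

t.join()                        # now wait without a timeout
finished = not t.is_alive()

print(still_running, finished)
```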


In [None]:
from queue import Queue
from threading import Thread, Event
import time
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

def DataProducer(out_q,timeout,tag):
    checking = True
    while checking:
        (rc,value) = get_data_with_timeout(timeout,tag)
        if rc == 1:
            # Make an (data, event) pair and hand it to the consumer
            evt = Event()
            out_q.put(((rc,value), evt))
            # Wait for the consumer to process the item
            evt.wait()
            break
        else:
            time.sleep(0.05) # wait 50 ms between checks

# A thread that consumes data
def DataConsumer(in_q):
    while True:
        # Get some data
        rcvalue, evt = in_q.get()
        print(rcvalue[0],rcvalue[1])
        # Indicate completion
        evt.set()
        in_q.task_done() # lets q.join() return once the item is handled

# Create the shared queue and launch both threads
q = Queue()
timeout=3
tag="BAT_PERCENT"
# daemon=True so the endless consumer loop does not keep the program alive
t1 = Thread(target=DataConsumer, args=(q,), daemon=True)
t2 = Thread(target=DataProducer, args=(q,timeout,tag))
t1.start()
t2.start()
# Wait for the producer to finish, then for all produced items to be consumed
t2.join()
q.join()

#### 2.4.2 Timer and Callback

**Timer Objects**

This class represents an action that should be run only after a certain amount of time has passed — a timer. `Timer` is a subclass of `Thread` and as such also functions as an example of creating custom threads.

Timers are started, as with threads, by calling their `start()` method. The timer can be stopped (before its action has begun) by calling the `cancel()` method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
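
A brief sketch of starting and cancelling timers is shown here. The `action` function and the timer names are arbitrary; `Timer(interval, function, args=...)`, `start()`, `cancel()` and `join()` are the standard `threading` API.

```python
import threading

fired = []

def action(name):
    fired.append(name)

t1 = threading.Timer(0.1, action, args=("t1",))
t2 = threading.Timer(0.5, action, args=("t2",))
t1.start()
t2.start()

t2.cancel()     # cancelled before its interval elapsed, so it never runs
t1.join()       # Timer is a Thread, so join() waits for the action to finish
t2.join()

print(fired)    # ['t1']
```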

**Callback**

If we want to write the function in a **non-blocking** way, we can use the `threading.Timer` class.

We can initialize a `threading.Timer` instance by passing the amount of time we want to `wait and a callback`. 

A `callback` is simply a function that will be called when the timer expires. Note that we have to also call the `Timer.start` method to activate the timer:

In [None]:
from queue import Queue
from threading import Thread,Timer
import time
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

def PeriodDataProducer(delay,tag,out_q):
    timeout=delay
    rc,value= get_data_with_timeout(timeout,tag)
    if rc == 1:
        out_q.put((rc,value))
    # schedule the next reading; daemon so it stops with the main program
    t=Timer(delay, PeriodDataProducer,(delay,tag,out_q))
    t.daemon=True
    t.start()

def DataConsumer(in_q):
    while True:
        rcvalue = in_q.get()
        print(rcvalue[0],rcvalue[1])
        in_q.task_done()

q = Queue()
tag="BAT_PERCENT"
delay=2
PeriodDataProducer(delay,tag,q) # takes the first reading and starts the timer chain
c= Thread(target= DataConsumer, args=(q,), daemon=True)
c.start()
time.sleep(10) # let the monitor run for about 10 seconds

### 2.5 Reactive programming for Acquiring Data

**Reactive programming** is a paradigm that aims at building better concurrent systems. Reactive applications are designed to comply with the requirements exemplified by the reactive manifesto:

* **Responsive**: The system responds **immediately** to the user.

* **Elastic**: The system is capable of handling different levels of load and is able to adapt to accommodate increasing demands.

* **Resilient**: The system deals with failure gracefully. This is achieved by modularity and avoiding having a single point of failure.

* **Message driven**: The system should not block and take advantage of `events` and `messages`. A message-driven application helps achieve all the previous requirements.

As you can see, the intent of reactive systems is quite noble, but how exactly does reactive programming work? In this section, we will learn about the principles of reactive programming using the **RxPy** library.

```
python -m pip install "rx<3"
```

>The RxPy library is part of [ReactiveX](http://reactivex.io), which is a project that implements reactive programming tools for a large variety of languages.

```python
from io_timeout import get_data_with_timeout
```

Set `delay=1`:
```python
rc, value = get_data_with_timeout(delay,tag)
```
**Test**

1. time.sleep < timeout

```python
def virtual_interface_data(tag):
    time.sleep(0.1)
```
2. time.sleep > timeout

```python
def virtual_interface_data(tag):
    time.sleep(2)
```


In [2]:
%%file  ./code/concurrency/demo_io_timeout_reactive_matplotlib.py
from rx import Observable
import numpy as np
import matplotlib.pyplot as plt
import time
import sys  
sys.path.append('./code/concurrency/')  
from io_timeout import get_data_with_timeout

intervalTime=100
delay=1
tag="CPU_PERCENT"
cpu_data = (Observable
            .interval(intervalTime) 
            .map(lambda rc,value: get_data_with_timeout(delay,tag))
            .publish())

cpu_data.connect()

def monitor_cpu(npoints):
    plt.figure()
    plt.title("The Simple CPU Percent Monitor")
    lines, = plt.plot([], [],"b-o")
    time_text = plt.text(0.5, 80, "")
    plt.xlim(0, npoints-1)
    plt.ylim(0, 100)
    
    columns = ()
    col_labels = ['Tag', 'Unit', 'Value']
    table_vals = [[tag,"%",""]]
    
    tbl = plt.table(cellText=table_vals,
               colLabels=col_labels,
               colWidths=[0.2] * 3,
               cellLoc='center',
               loc='best')

    cpu_data_window = cpu_data.buffer_with_count(npoints, 1)
    
    def update_plot(cpu_readings):
        lines.set_xdata(np.arange(len(cpu_readings)))
        lines.set_ydata(np.array(cpu_readings)[:,1])
        str_curtime=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
        
        #if np.array(cpu_readings)[-1,1] is None: 
         #   str_cursecond=str_cursecond+" (Timeout)"
        
        time_text.set_text("Time:"+str_curtime)
        
        table_vals = [[tag,"%",str(np.array(cpu_readings)[:,1][-1])]]
        tbl = plt.table(cellText=table_vals,
               colLabels=col_labels,
               colWidths=[0.2] * 3,
               cellLoc='center',
               loc='best')
         
        plt.draw()
    
    cpu_data_window.subscribe(update_plot)
    plt.show()

if __name__ == '__main__':
    monitor_cpu(10)


Writing ./code/concurrency/demo_io_timeout_reactive_matplotlib.py


## 3 Handling Data I/O Errors

No matter how unlikely it may seem, errors can still happen, especially when dealing with interfaces to the real world. They might be the result of spurious noise on a serial interface, an out-of-range voltage level on an analog input, or a fault in an external instrument. How the software detects and handles errors is directly related to its robustness. Another way to put it would be to say that robust software tends to exhibit a high degree of fault tolerance.

For a system (be it software, hardware, or a combination of the two) to be called **fault-tolerant** implies that it has the ability to detect a fault condition, take action to correct or bypass the fault, and continue to function (perhaps at a reduced level of functionality) instead of just crashing or abruptly halting. The ability to `continue to function at reduced levels of capability` in the presence of an increasing level of errors is called **graceful degradation**. Of course, if the errors continue to mount, at some point the system will eventually come to a halt, but the idea is that it will do so after giving ample notice and it will `not do it in a catastrophic fashion`.

The reality is that there are almost always faults, and most things will eventually break or wear out. How much planning you should do for the most likely faults and the resulting errors is largely down to how much of a problem a failure will create. It might be insignificant (just ignore it and move on), or it could be a really big deal (something might explode, catch fire, or otherwise fail to stop an impending disaster). If you’ve done your up-front planning, as discussed in Chapter 8, you should be able to identify the nastiest scenarios and give some thought to how your system might deal with them should they arise.

### 3.1 Classes of errors

Errors can be grouped into two broad categories: **nonfatal** and **fatal**. A nonfatal error might be something like an intermittent communications channel, perhaps due to noise or other perturbations in the medium, or someone’s foot occasionally kicking a connector under a desk. Depending on the speed of the system and the duration of the failure, it may be possible to continue operation without adverse effects until communications can be reestablished. Another example might be an instrument that occasionally does not respond in a timely fashion, for whatever reason. If the command or query can be retried successfully with no ill effect, the error could be considered nonfatal. (Note that nonfatal does not mean nonannoying!)

A **fatal** error is one that requires significant intervention if the system is to continue functioning. Lacking that, it will need to perform a complete shutdown. An example of a fatal error would be the loss of control for the primary DC power supply used in an experiment. Unless there is a backup supply available that can automatically take over, the system will need to shut down until the problem can be resolved. Another example might be the failure of the control system for the liquid nitrogen supply used for the sorption pumps on a vacuum chamber, perhaps due to a failure in the control interface electronics, or a failure in the command communications channel. In either case, the system will begin to lose vacuum and potentially damage things like ion gauges or sputter emitters. At the very least, the current activity should be stopped until the problem is resolved.
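
One way to carry the nonfatal/fatal distinction into code is to give each category its own exception class, so the calling code can decide whether to continue or to begin a shutdown. The following is a minimal sketch; the class names, `run_step()`, and the two failing steps are all hypothetical and exist only for illustration.

```python
class InstrumentError(Exception):
    """Base class for the hypothetical instrument errors in this sketch."""


class NonfatalError(InstrumentError):
    """Transient problem, e.g. a late reply; the system may continue."""


class FatalError(InstrumentError):
    """Problem that needs intervention, e.g. loss of power-supply control."""


def run_step(step, log):
    """Run one step and return True if the system may keep running."""
    try:
        step()
    except NonfatalError as err:
        log.append(f"nonfatal: {err}")   # record it and keep going
        return True
    except FatalError as err:
        log.append(f"fatal: {err}")      # caller should start a shutdown
        return False
    return True


# Two simulated steps (assumptions, not real instrument calls)
def late_reply():
    raise NonfatalError("instrument did not respond in time")


def supply_lost():
    raise FatalError("lost control of primary DC supply")


log = []
results = [run_step(step, log) for step in (late_reply, supply_lost)]
print(results)   # [True, False]
```

Which conditions count as fatal is a design decision for each system; the classes only make that decision explicit at the point where the error is raised.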

### 3.2 Error retry and system termination

Sometimes it may make sense to retry an operation if an error is detected, perhaps after altering a parameter to compensate for the error. While this might sound clever (and it can be), it’s not something that should be done without some serious consideration of the context, cause, and consequences of the error. Blithely attempting to retry a failed operation can sometimes cause serious damage.

The `more error-detection and self-recovery capabilities` one attempts to build into a system, the `more complicated` the system becomes. This is fairly obvious, to be sure, but what isn’t obvious is how that complexity will manifest, and the subsequent implications it might have, not only for a particular subsystem, but for the system as a whole. As complexity increases, so too does the chance of new defects being introduced. Increased complexity can also increase the number of possible execution paths in the software, some of which may be unintended.

**Figure 11-11** shows a scheme for handling a data I/O error in a **fault-tolerant** fashion. While this approach may not be suitable for every application, it does show why robust or fault-tolerant software tends to be **an order of magnitude (or more) `more expensive` to implement than something that just does the I/O operation and returns either `pass` or `fail`**. This is particularly true when performing testing to verify the fault-tolerant behavior. In **Figure 11-11**, there are `three possible paths` that can be taken should an error occur. In addition to the I/O operation itself, each of these paths must be tested by simulating the I/O and the error context. This rigorous testing involves a lot of work, but if you need that level of robustness there really is no other way to achieve it.

![fault-tolerant fashion](./img/data-io-fault-resistant.jpg)

An interesting point to note about `Figure 11-11` is the amount of code it implies. The data I/O operation and its return code (pass or fail, perhaps) are simple and straightforward, and might take no more than a line or two of code to implement. With the error handling included in the design, the code for performing a data I/O operation will grow by anywhere from 10 to 100 times in size. `This is typical of fault-tolerant software`. A large portion of it is concerned with error detection and handling, and only a fraction actually deals directly with the I/O. Also note that the last decision block, **“Backup active?,”** means that if the backup is already in use (i.e., the test is True), there are no more options left except to fail.
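
The figure's logic can be approximated in a few lines of Python. This is a rough sketch, not a transcription of Figure 11-11: the three paths (retry, switch to a backup channel, fail) are an assumed reading of the text above, and `fault_tolerant_read()`, `IOFailure`, the `state` dictionary, and the simulated channels are all hypothetical names.

```python
class IOFailure(Exception):
    """Raised when no recovery path is left."""


def fault_tolerant_read(primary, backup, state, max_retries=3):
    """Read a value, retrying first and then falling back to a backup.

    `state` is a dict whose 'backup_active' flag persists across calls.
    """
    channel = backup if state["backup_active"] else primary
    for _ in range(max_retries):            # path 1: retry the operation
        try:
            return channel()
        except OSError:
            continue
    if state["backup_active"]:              # path 3: backup already in use
        raise IOFailure("backup channel failed; no options left")
    state["backup_active"] = True           # path 2: switch to the backup
    return fault_tolerant_read(primary, backup, state, max_retries)


# Simulated channels (assumptions standing in for real I/O calls)
def dead_primary():
    raise OSError("no response")


def good_backup():
    return 4.2


state = {"backup_active": False}
value = fault_tolerant_read(dead_primary, good_backup, state)
print(value, state["backup_active"])   # 4.2 True
```

Even this reduced version is several times longer than the bare `channel()` call it protects, and each of the three paths needs its own test with a simulated failure.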

When detecting and attempting to deal with an error, the system has to make a decision as to whether to attempt to recover from the error (and what recovery strategy to use) or just try to shut down gracefully. The logic making that decision must have inputs in the form of data describing the context in which the error occurred and the current state of the system, and there may also be a need to define excluded operations that should not be used.

For example, it may not be a good idea for a system controlling a pressure vessel to just relinquish control of the system without first performing some kind of check to determine if the pressure needs to be released. If the pressure continues to build even after the pumps and heaters are disabled (this can happen), there is a risk that the vessel may explode, especially if the error involved an over-pressure-related situation to start with. A graceful shutdown could possibly involve some type of venting action before control is completely terminated.
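
The ordering matters more than the individual actions, which a small simulation can make concrete. In this sketch, `SimulatedVessel`, its attributes, and the `safe_kpa` threshold are invented for illustration and do not correspond to any real controller interface.

```python
class SimulatedVessel:
    """Stand-in for real hardware; every attribute here is hypothetical."""

    def __init__(self, pressure_kpa):
        self.pressure_kpa = pressure_kpa
        self.heaters_on = True
        self.pumps_on = True
        self.vent_open = False
        self.under_control = True


def graceful_shutdown(vessel, safe_kpa=150.0):
    """Shut down in a safe order and return the list of actions taken."""
    actions = []
    vessel.heaters_on = False
    vessel.pumps_on = False
    actions.append("disabled heaters and pumps")
    if vessel.pressure_kpa > safe_kpa:      # check before letting go
        vessel.vent_open = True
        vessel.pressure_kpa = safe_kpa      # simulation: venting relieves pressure
        actions.append("vented to safe pressure")
    vessel.under_control = False            # only now relinquish control
    actions.append("released control")
    return actions


vessel = SimulatedVessel(pressure_kpa=400.0)
steps = graceful_shutdown(vessel)
print(steps)
```

A real shutdown routine would also have to cope with the vent itself failing, which is exactly the kind of additional path that drives up the cost of fault-tolerant code.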


Similarly, if an error occurs in a system that is moving a mass of some type, does it make sense for the system to just stop? If the action of lifting or moving the mass entails control of power to a motor or servo, it might not be a good idea to just kill the power. The system may need to engage some type of braking or locking mechanism, or it might make sense for the mass to be lowered to a safe position prior to shutdown (if possible).

These considerations also come into play when attempting to retry a failed operation. Retries may not be appropriate after some types of failures, such as the loss of direct positional feedback, or the failure of a temperature sensor. Other failures may be known to be transient, and the operations can be retried some number of times before the situation is declared hopeless.

Consider the situation where the position of a secondary mechanism is dependent on the position of a primary mechanism, both of which are moving at a slow and relatively continuous rate for extended periods of time. The link between the two is a communications channel that is known to occasionally drop out due to system load or other factors. In a situation like this, the secondary mechanism that is following the primary one might be able to predict where it should be over short periods of time. This allows it to continue to function without an update from the primary mechanism. If after some period of time the communications with the primary mechanism cannot be reestablished, the secondary mechanism will enter an error condition. If it does reestablish the communications channel with the primary mechanism before the timeout period, it can update its position, if necessary, and reset the timeout.
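
The follower's behavior can be sketched as a small state machine. The `Follower` class below is hypothetical: it assumes the primary reports a `(position, rate)` pair, predicts linearly from the last known rate while the link is down, and takes the current time as an argument so the example stays deterministic.

```python
class Follower:
    """Secondary mechanism that tracks a primary over an unreliable link."""

    def __init__(self, position, rate, timeout_s):
        self.position = position      # last known or predicted position
        self.rate = rate              # last known rate, units per second
        self.timeout_s = timeout_s
        self.last_update = 0.0        # time of the last good update
        self.last_time = 0.0          # time of the last call to step()
        self.error = False

    def step(self, now, update=None):
        """Advance to time `now`; `update` is (position, rate) or None."""
        if self.error:
            return self.position      # stay put once in the error state
        if update is not None:        # link is up: resynchronize
            self.position, self.rate = update
            self.last_update = now    # reset the timeout
        elif now - self.last_update > self.timeout_s:
            self.error = True         # link has been down too long
        else:                         # link is down: predict the position
            self.position += self.rate * (now - self.last_time)
        self.last_time = now
        return self.position


f = Follower(position=0.0, rate=2.0, timeout_s=3.0)
f.step(1.0, update=(2.0, 2.0))   # good update from the primary
print(f.step(2.0))               # 4.0 (predicted, no update)
print(f.step(3.0))               # 6.0 (predicted, no update)
f.step(3.5, update=(6.8, 2.0))   # link restored, timeout reset
f.step(7.0)                      # 3.5 s without an update
print(f.error)                   # True
```

Linear prediction is only reasonable here because the text assumes slow, continuous motion; a mechanism with rapid changes in rate would need a shorter timeout or a better model.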


Failure analysis, which we discussed briefly in the section **“Handling Errors and Faults” on page 272 in Chapter 8**, comes into play when making decisions like these. If done correctly, it can provide the guidance needed to make the decision to terminate abruptly, terminate gracefully, or attempt to recover. Lacking a failure analysis, the best choice is often to just terminate gracefully, and provide sufficient information (typically in a crash log or something similar) to allow someone to go back and ascertain the cause of the problem later.
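
A crash log does not need to be elaborate to be useful. The sketch below appends one JSON record per failure using only the standard library; the function names, the record fields, and the `context` dictionary are assumptions chosen for illustration.

```python
import json
import time
import traceback


def write_crash_log(path, exc, context):
    """Append one JSON record describing the failure to `path`."""
    record = {
        "time": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "error": repr(exc),
        "traceback": traceback.format_exception(
            type(exc), exc, exc.__traceback__),
        "context": context,           # system state supplied by the caller
    }
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
    return record


def run_with_crash_log(task, path, context):
    """Run `task`; on any exception, log it and report a graceful stop."""
    try:
        task()
        return True
    except Exception as exc:
        write_crash_log(path, exc, context)
        return False
```

A caller might use it as `run_with_crash_log(acquire, "crash.log", {"channel": 3})`, where `acquire` and the context keys are placeholders. The value of the log depends mostly on how much of the system state is captured in `context` at the moment of failure.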

## Reference

Allen B. Downey. [Think OS: A Brief Introduction to Operating Systems](http://greenteapress.com/wp/think-os/)

[The Python Standard Library: Concurrent Execution](https://docs.python.org/3/library/concurrency.html)

* [threading — Thread-based parallelism](https://docs.python.org/3/library/threading.html)

* [multiprocessing — Process-based parallelism](https://docs.python.org/3/library/multiprocessing.html)

* [concurrent.futures — Launching parallel tasks](https://docs.python.org/3/library/concurrent.futures.html)

* [queue — A synchronized queue class](https://docs.python.org/3/library/queue.html)


J. M. Hughes. Real World Instrumentation with Python, O'Reilly Media, December 2010

Doug Hellmann. The Python 3 Standard Library by Example, Pearson Education, Inc., 2017

* http://doughellmann.com/blog/the-python-3-standard-library-by-example

* Python 3 Module of the Week https://pymotw.com/3/
   
Gabriele Lanaro. Python High Performance, Second Edition, Packt Publishing, 2017
