### Thead

In [1]:
# use function pointer to manage thread:  ==>  **_my_function
def my_function(n):
    """Target function for the thread."""
    for i in range(n):
        # print("Running...")
        time.sleep(1)

In [2]:
import threading
import time

import sys
sys.path.append('/home/mohsen/frr/polar/codes/classes')
from color_text import ColorText
from time_util import TimeUtil

            
# TODO: how to array function into the list in
class ThreadJob():
    """
    To use this thread, follow the next step:

    # Run thread immediately
    # 10 ==> input parameter of function
    x = ThreadJob(my_function, 'my_function', 10, "/path/to/file.txt")

    # Start show thread
    x.show_thread_in_file()

    # Stop show thread
    x.write_to_file_thread_status = False
    """
    
    def __init__(self, func, func_name, func_input_value, path_thread_log):

        self.func = func
        self.thread = threading.Thread(target=self._run_func, args=(func_input_value,))
        self.thread.start()

        self.thread.name = func_name
        self.name = self.thread.name
        self.id = self.thread.ident
        self.write_to_file_thread_status = False
        self.path_thread_log = path_thread_log    

    def _run_func(self, func_input_value):
        """Runs the func function with the provided func_input_value in a background thread"""
        self.func(func_input_value)    
        
    def _write_to_file(self):
        with open(self.path_thread_log, "w") as f:
            f.write(self.__str__())

    def _thread_write_to_file(self):
        while self.write_to_file_thread_status:
            self._write_to_file()
            time.sleep(1)        
    
    def show_thread_in_file(self):
        # Runs only once
        # To start the thread, you need to call this method once.
        
        self.write_to_file_thread_status = True

        thread_signal = threading.Thread(target=self._thread_write_to_file)
        thread_signal.start()
            
    def __str__(self):
        # Makes the thread info
        
        info =  'Name:      ' + str(self.name)              + "\n"
        info += 'ID:        ' + str(self.id)                + "\n"

        # Makes the text colored
        # [32] => it's grean color
        x = ColorText('is_alive:  ' + str(self.thread.is_alive()), [32], )
        info += x.__str__()

        x = TimeUtil()
        info += x.miladi_str
        
        return info

In [3]:
# Run thread immediately
x = ThreadJob(my_function, 'my_function', 10, "/home/mohsen/frr/polar/codes/thread/thread_write.txt")

In [4]:
# Start show thread
x.show_thread_in_file()

In [5]:
# Stop show thread
x.write_to_file_thread_status = False

### Thread improvement

Yes, here are some technical tips for controlling threads in Python:

    Use threading primitives: To synchronize access to shared resources or coordinate between multiple threads, use threading primitives such as locks, semaphores, events, and condition variables. These provide low-level mechanisms for ensuring mutual exclusion, signaling between threads, and waiting for specific conditions to occur.

    Avoid blocking calls: Blocking calls can prevent other threads from executing, leading to poor performance and unpredictable behavior. Whenever possible, avoid making blocking calls inside critical sections or loops that need to respond quickly to changes in state.

    Limit the number of threads: Creating too many threads can lead to resource contention, increased overhead, and decreased overall throughput. As a rule of thumb, aim to limit the number of concurrently running threads to match the number of CPU cores available on the system. You can use tools like os.cpu_count() to determine the optimal number of threads dynamically.

    Monitor thread health: Regularly check the health of your threads by calling their is_alive() method or monitoring relevant attributes like exit codes or stack traces. This can help you detect issues early and take corrective action before they become serious problems.

    Implement graceful shutdown: When terminating a program, make sure to implement a graceful shutdown mechanism that allows all threads to complete their work and release any acquired resources before exiting. Failing to do so can result in data corruption, inconsistent state, orphaned processes, and other unexpected behaviors.

    Test thoroughly: Test your multithreaded programs extensively under various loads and scenarios to identify potential race conditions, deadlocks, livelocks, and other concurrency bugs. Make sure to test both positive cases (expected usage patterns) and negative cases (unusual or erroneous inputs).

    Profile and optimize: Profile your multithreaded applications to identify bottlenecks, hotspots, and areas for optimization. Use profiling tools like cProfile, PySnooper, or line_profiler to gather detailed statistics about execution times, memory usage, and other metrics. Based on the results, apply appropriate optimization techniques such as refactoring, memoization, caching, or parallelism.

    Consider alternative approaches: Depending on the problem domain and requirements, consider alternative approaches to multithreading such as multiprocessing, asyncio, gevent, or eventlet. Each approach has its own strengths and weaknesses, and choosing the right one requires careful consideration of factors like scalability, responsiveness, complexity, and maintainability.


### Note

In [1]:
import threading
import time

def func1(number):
    for i in range(1, number):
        print("...!")
        time.sleep(1)

class ThreadJob():
    def __init__(self, func, func_input_value):
        self.func = func
        self.thread = threading.Timer(target=self._run_func, args=(func_input_value,))
        self.thread.start()  

    def _run_func(self, func_input_value):
        self.func(func_input_value)    
            
    def __str__(self):       
        info = 'Name:'
        return info


x = ThreadJob(func1, 5)

TypeError: Timer.__init__() got an unexpected keyword argument 'target'

In [1]:
import threading
import time
lock = threading.Lock()

def func1(number):
    with lock:
        for i in range(1, number):
            print("...!")
            time.sleep(1)
        
    tr_1 = threading.Timer(10, func1, args=(3,))
    tr_1.start()


func1(3)

...!
...!


In [1]:
# threading.event.wait()

In [1]:
import threading
import time

lock = threading.Lock()
timer = None

def func1(number):
    global timer
    
    with lock:
        for i in range(1, number):
            print("...!")
            time.sleep(1)

    # Create a new timer object and update the global variable
    timer = threading.Timer(10, func1, args=(3,))
    timer.start()

# Start the initial call to func1
func1(3)

...!
...!


In [1]:
import threading
import time

lock = threading.Lock()
timer = None

def func1(number):
    for i in range(1, number):
        print("...!")
        time.sleep(1)

class ThreadJob():
    def __init__(self, repeat_period, func, func_input_value):
        self.func = func
        self.timer = threading.Timer(repeat_period, self._run_func, args=(func_input_value,))
        self.timer.start()

    def _run_func(self, func_input_value):
        while True:
            self.func(func_input_value)
            time.sleep(10)

    def __str__(self):
        info = 'Name:'
        return info

x = ThreadJob(10, func1, 3)


time.sleep(1)  # Add a delay to ensure the timer has started
x.timer.cancel()

In [2]:
x.timer.is_alive()
time.sleep(1)  # Add a delay to ensure the timer has started
x.timer.cancel()

...!
...!


In [1]:
import threading
import time

class ThreadJob():
    def __init__(self, repeat_period, func, func_input_value):
        self.func = func
        self.repeat_period = repeat_period
        self.func_input_value = func_input_value
        self.timer = None
        self.stop_thread = False

    def start(self):
        self.stop_thread = False
        self.timer = threading.Timer(self.repeat_period, self._run_func)
        self.timer.start()

    def _run_func(self):
        while not self.stop_thread:
            self.func(self.func_input_value)
            time.sleep(self.repeat_period)

    def stop(self):
        self.stop_thread = True
        if self.timer is not None:
            self.timer.cancel()

def func1(number):
    for i in range(1, number):
        print("...!")
        time.sleep(1)

x = ThreadJob(10, func1, 3)
x.start()

# time.sleep(1)  # Add a delay to ensure the timer has started
# x.stop()

In [2]:
x.timer.is_alive()

True

...!
...!
...!
...!
...!
...!


In [3]:
time.sleep(1)  # Add a delay to ensure the timer has started
x.stop()

In [4]:
x.timer.is_alive()

False