In [None]:
import threading
import time
import random
import logging
from colorlog import ColoredFormatter

In [None]:
# Colourised log line: level tag, name of the emitting thread, then the message.
formatter = ColoredFormatter(
    "%(log_color)s[%(levelname)s] %(reset)s (%(threadName)s) %(blue)s %(message)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG':    'cyan',
        'INFO':     'green',
        'WARNING':  'yellow',
        'ERROR':    'red',
        'CRITICAL': 'red,bg_black',
    }
)

logger = logging.getLogger('example')
# logging.getLogger() returns the same object for the same name, so without
# this cleanup every re-run of the cell would stack another StreamHandler on
# the logger and each record would be printed once per accumulated handler.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

In [None]:
def worker():
    """Minimal thread target: write the line 'Worker' to stdout."""
    message = 'Worker'
    print(message)

In [None]:
# Build five threads that all run worker(), keep their handles in `threads`,
# then start them in creation order.
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()

In [None]:
def worker_wiht_arg(num):
    """Thread target that prints 'Worker: <num>' to stdout.

    Args:
        num: value interpolated into the printed line.
    """
    message = "Worker: {}".format(num)
    print(message)

In [None]:
# One thread per index 0..4; the index reaches the target through `args`,
# which must be a tuple (hence the trailing comma).
threads = [
    threading.Thread(target=worker_wiht_arg, args=(i,))
    for i in range(5)
]
for t in threads:
    t.start()

In [None]:
def worker_0_thread_name():
    """Print '<thread name> Starting' and, 0.3 s later, '<thread name> Exiting'.

    The name is taken from the thread that is executing this function, so the
    output shows which Thread object ran it.
    """
    # Thread.getName() is deprecated since Python 3.10; the `name` attribute
    # returns the same value without a DeprecationWarning.
    me = threading.current_thread()
    print('{} Starting\n'.format(me.name))
    time.sleep(.3)
    print('{} Exiting\n'.format(me.name))

In [None]:
def worker_1_thread_name():
    """Print '<thread name> Starting' and, 0.4 s later, '<thread name> Exiting'.

    The name is taken from the thread that is executing this function, so the
    output shows which Thread object ran it.
    """
    # Thread.getName() is deprecated since Python 3.10; the `name` attribute
    # returns the same value without a DeprecationWarning.
    me = threading.current_thread()
    print('{} Starting\n'.format(me.name))
    time.sleep(.4)
    print('{} Exiting\n'.format(me.name))

In [None]:
# Two explicitly named threads ('T0', 'T2') and two that keep the name
# threading assigns by default. Construction order is kept because default
# names are numbered as Thread objects are created.
t0 = threading.Thread(target=worker_0_thread_name, name='T0')
t1 = threading.Thread(target=worker_1_thread_name)
t2 = threading.Thread(target=worker_0_thread_name)
t3 = threading.Thread(target=worker_0_thread_name, name='T2')

for thread in (t0, t1, t2, t3):
    thread.start()

In [None]:
def worker_0_thread_logger():
    """Log 'Starting' and, after a 0.2 s pause, 'Exiting' at DEBUG level.

    Uses the module-level `logger`, whose format includes the thread name, so
    no explicit thread lookup is needed here.
    """
    pause_seconds = .2
    logger.debug('Starting\n')
    time.sleep(pause_seconds)
    logger.debug('Exiting\n')

In [None]:
def worker_1_thread_logger():
    """Log 'Starting' and, after a 0.3 s pause, 'Exiting' at DEBUG level.

    Uses the module-level `logger`, whose format includes the thread name, so
    no explicit thread lookup is needed here.
    """
    pause_seconds = .3
    logger.debug('Starting\n')
    time.sleep(pause_seconds)
    logger.debug('Exiting\n')

In [None]:
# Same experiment as with the print-based workers, but the thread name now
# comes from the log format instead of being printed by the target itself.
t0 = threading.Thread(target=worker_0_thread_logger, name='T0')
t1 = threading.Thread(target=worker_0_thread_logger)
t2 = threading.Thread(target=worker_1_thread_logger)
t3 = threading.Thread(target=worker_1_thread_logger, name='T2')

for thread in (t0, t1, t2, t3):
    thread.start()

In [None]:
def daemon():
    """Long-running target: log 'Starting', sleep 3 s, then log 'Exiting'."""
    pause_seconds = 3
    logger.debug("Starting")
    time.sleep(pause_seconds)
    logger.debug("Exiting")

In [None]:
def non_daemon():
    """Short-running target: log a start message, sleep 1 s, log an exit message."""
    pause_seconds = 1
    logger.debug("Staarting")
    time.sleep(pause_seconds)
    logger.debug("Exiiting")

In [None]:
# Start one daemon thread and one regular thread. Neither is joined here, so
# the cell finishes as soon as both have been started.
d = threading.Thread(target=daemon, name="daemon", daemon=True)
t = threading.Thread(target=non_daemon, name="non-daemon")

for thread in (d, t):
    thread.start()

In [None]:
# Same pair of threads, but this time the cell waits for both to finish:
# join() without a timeout blocks until the thread's target has returned.
d = threading.Thread(target=daemon, name="daemon", daemon=True)
t = threading.Thread(target=non_daemon, name="non-daemon")

for thread in (d, t):
    thread.start()

for thread in (d, t):
    thread.join()

In [None]:
# Start the daemon / non-daemon pair again, but only wait briefly for the
# daemon thread before checking whether it is still running.
d = threading.Thread(name="daemon", target=daemon, daemon=True)
t = threading.Thread(name="non-daemon", target=non_daemon)

d.start()
t.start()

# join() with a timeout returns after at most 0.1 s whether or not the thread
# finished, so its liveness has to be queried explicitly afterwards.
d.join(.1)
# Thread.isAlive() was removed in Python 3.9 (AttributeError on current
# interpreters); is_alive() is the supported spelling. The printed label is
# updated to name the method that is actually called.
print('d.is_alive()', d.is_alive())
t.join()

In [None]:
def worker():
    """Sleep for a randomly chosen 0.1-0.5 s, logging the delay and completion.

    The delay is drawn with random.randint(1, 5) and divided by ten; no seed
    is set here, so the value differs from run to run.
    """
    delay = random.randint(1, 5) / 10
    logger.debug("sleeping {}".format(delay))
    time.sleep(delay)
    logger.debug("ending")

In [None]:
# Start three daemon workers and remember their handles so that only threads
# created by this cell are waited on below.
started = []
for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    started.append(t)

main_thread = threading.main_thread()

# threading.enumerate() lists every live thread in the process, which can
# include threads this cell did not create (e.g. still-sleeping daemon threads
# from earlier cells, and presumably the notebook kernel's own helper
# threads). Joining those could block for a long time or forever, so anything
# that is not one of our workers is skipped along with the main thread.
for t in threading.enumerate():
    if t is main_thread or t not in started:
        continue
    # Use the configured `logger`: the module-level logging.debug() goes to
    # the root logger, whose default WARNING level drops the message.
    # `t.name` replaces the deprecated t.getName().
    logger.debug("joining {}".format(t.name))
    t.join()

In [None]:
class MyThread(threading.Thread):
    """Thread subclass whose work is defined by overriding run()."""

    def run(self):
        """Executed in the new thread after start(); logs a single DEBUG line."""
        message = "running"
        logger.debug(message)

In [None]:
# Create and immediately start five MyThread instances, one after another.
for i in range(5):
    t = MyThread()
    t.start()  # start() runs MyThread.run() in the new thread

In [None]:
class MyThreadWithArgs(threading.Thread):
    """Thread subclass that keeps its positional/keyword arguments on the instance.

    The constructor mirrors threading.Thread's signature. `args` and `kwargs`
    are not forwarded to the base class; they are stored as `self.args` and
    `self.kwargs` for run() to read.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        """Initialise the base Thread, then record `args` and `kwargs` as given.

        `kwargs` is stored unchanged, so it stays None when not supplied.
        """
        base_options = dict(group=group, target=target, name=name, daemon=daemon)
        super().__init__(**base_options)
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Log the stored arguments at DEBUG level; `target` is not invoked."""
        message = "\nRunning wiith {} and {}".format(self.args, self.kwargs)
        logger.debug(message)
    

In [None]:
# Five threads, each carrying its own index as a positional argument and a
# small keyword mapping derived from that index.
for i in range(5):
    thread_kwargs = {'a': i, 'b': i+1}
    t = MyThreadWithArgs(args=(i,), kwargs=thread_kwargs)
    t.start()

In [None]:
def delayed():
    """Timer callback: emit one DEBUG record when the timer fires."""
    message = 'worker ruuning'
    logger.debug(message)

In [None]:
# Two timers that would each call delayed() 0.3 s after being started.
# Thread.setName()/getName() are deprecated since Python 3.10, so the `name`
# attribute is assigned and read directly instead.
t1 = threading.Timer(.3, delayed)
t1.name = 't1'
t2 = threading.Timer(.3, delayed)
t2.name = 't2'

logger.debug('Start timers')
t1.start()
t2.start()

# Cancel t2 after 0.2 s, i.e. before its 0.3 s delay has elapsed, so only t1
# is expected to run delayed().
logger.debug('Wait befor canceling {}'.format(t2.name))
time.sleep(.2)
logger.debug('Canceling {}'.format(t2.name))
t2.cancel()
logger.debug('done')