In [None]:
class UserService(object):
    """Skeleton of a user registry; the mutating operations are not implemented yet."""

    def __init__(self):
        # key: user id, value: User
        self.users_by_id = dict()

    def add_user(self, user_id, name, pass_hash):
        """Placeholder: currently does nothing and returns None."""
        return None

    def remove_user(self, user_id):
        """Placeholder: currently does nothing and returns None."""
        return None
    
    

In [73]:
class Communicator(object):
    """Toy class contrasting a class-level counter with a per-instance counter."""

    #Counter shared by every instance (stored on the class itself)
    var_global = 123

    def __init__(self):
        self.myid, self.var_local = "id105", 345

    def get_data(self,source_direction) -> "data message":
        """Return a fixed placeholder message; `source_direction` is not used yet."""
        #Doing something to get data
        #in progress
        return "get data message"

    def plus_glob_counter(self):
        """Increase the class-level counter by one."""
        Communicator.var_global = Communicator.var_global + 1

    def plus_loc_counter(self):
        """Increase this instance's counter by one."""
        self.var_local = self.var_local + 1

    def get_glob_var(self):
        """Return the current class-level counter."""
        shared_value = Communicator.var_global
        return shared_value

    def get_loc_var(self):
        """Return this instance's counter."""
        own_value = self.var_local
        return own_value



347

In [1]:
import time
import logging
import queue
import threading
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor


#initialization of logging
#NOTE: the module-level name `format` shadows the builtin format() for the rest of the kernel session
format = "%(asctime)s: %(message)s"
#Root logger: messages prefixed with the time as HH:MM:SS, level INFO and above
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")


#Pipeline class for Communicator 
class Pipeline(queue.Queue):
    """Named FIFO buffer used by a Communicator.

    A thin `queue.Queue` subclass: `buffer_size` becomes the queue's `maxsize`,
    `name` is kept for display, and `client_name` is only used in the creation
    log message. Fill state is available through the inherited .full() / .empty().
    """

    def __init__(self,buffer_size,client_name,name):
        queue.Queue.__init__(self,maxsize=buffer_size)
        self.name = name
        logging.debug("%s's pipeline %s: created",client_name,name)

    def __str__(self):
        parts = ("pipeline_",self.name)
        return "".join(parts)
  
    
#Communicator class
class Communicator(object):
    """Buffered communication endpoint driven by background worker threads.

    Depending on `con_dir` the communicator owns an input buffer, an output
    buffer, or both (`Pipeline` objects). After `start_communication()` one worker
    keeps filling the input buffer with received messages and another one keeps
    draining the output buffer by sending its messages. Client code only talks
    to the buffers through `set_data()` / `get_data()`.

    Parameters:
        con_type: name of the communication partner type; used in log messages.
        con_dir: "inp", "out" or "both"; any other value raises ValueError.
        buffer_size_out: capacity of the output buffer.
        buffer_size_inp: capacity of the input buffer (0 means unbounded, as in queue.Queue).
        simulation: when True, sending/receiving is simulated with sleeps;
            when False the transfer functions raise ValueError("TODO").
    """

    #Number of threads of the internal executor
    _MAX_WORKERS = 3
    #Seconds the output worker waits for new data before it re-checks its stop event
    _OUT_POLL_INTERVAL = 0.1

    def __init__ (self,con_type:str,con_dir:str,buffer_size_out:int,buffer_size_inp:int=0,simulation = False):

        self.con_type = con_type
        self.con_dir = con_dir
        self.buffer_size_inp = buffer_size_inp
        self.buffer_size_out = buffer_size_out
        if con_dir not in ("inp","out","both"):
            raise ValueError("Communication direction must be 'inp'/'out'/'both'")
        #A direction that is not configured is represented by None
        self.pipeline_inp = Pipeline(buffer_size_inp,con_type,"inp") if con_dir in ("inp","both") else None
        self.pipeline_out = Pipeline(buffer_size_out,con_type,"out") if con_dir in ("out","both") else None
        #Real communication module, which depends of communication partner and choose proper library
        #TODO
        self.comm_module = object #TODO

        #Threading object
        self.executor = ThreadPoolExecutor(max_workers=self._MAX_WORKERS)

        #theading lock to secure set_data(), get_data() function access
        self.set_data_lock = threading.Lock()
        self.get_data_lock = threading.Lock()

        #Theading events to stop/start communication
        self.event_out_start = threading.Event()
        self.event_inp_start = threading.Event()

        #Simulation block
        self.simulation = simulation
        self.sim_recv_cnt = 0 #receive counter simulation

        logging.info("Communicator %s with %s direction: created",con_type,con_dir)

    def __send_data(self,data) -> bool:
        """Send one message to the partner; return True on success.

        Raises ValueError when not in simulation mode (real transfer is still TODO).
        """
        logging.debug("Communicator %s: __send_data: preparing for send",self.con_type)
        code_result = False

        if self.simulation:
            time.sleep(2) #Simulation
            code_result = True #Simulation
        else:
            logging.debug("Communicator %s: __send_data: Program stopped TODO",self.con_type)
            raise ValueError("TODO")
            #TODO
        return code_result

    def __recieve_data(self) -> tuple:
        """Receive one message from the partner; return (data, success).

        Raises ValueError when not in simulation mode (real transfer is still TODO).
        """
        logging.debug("Communicator %s: __recieve_data: preparing for recieve",self.con_type)
        code_result = False

        if self.simulation:
            #Recieve data simulation
            data = "Simulated message from PLC #"+str(self.sim_recv_cnt) #Simulation
            self.sim_recv_cnt += 1 #Simulation
            code_result = True #Simulation
            time.sleep(1)
        else:
            logging.debug("Communicator %s: __recieve_data: Program stopped TODO",self.con_type)
            raise ValueError("TODO")
            #TODO

        return data,code_result

    def __get_data_from_buffer(self,pipeline,timeout=None) -> object:
        """Take the oldest element from `pipeline`.

        Without `timeout` the call never blocks; with `timeout` it waits up to
        that many seconds. Raises queue.Empty when nothing is available.
        """
        if timeout is None:
            value = pipeline.get(block=False)
        else:
            value = pipeline.get(timeout=timeout)
        logging.debug("Communicator %s: __get_data_from_buffer: get data from %s ok",self.con_type,pipeline)
        return value

    def __set_data_to_buffer(self,pipeline,value):
        """Append `value` to `pipeline` without blocking (raises queue.Full when it is full)."""
        pipeline.put(value,block=False)
        logging.debug("Communicator %s: __set_data_to_buffer: set data ok to %s",self.con_type,pipeline)

    def __thread_update_inp_buffer(self,pipeline,run_event):
        """Worker: receive messages and store them in the input buffer while `run_event` is set."""
        logging.debug("Communicator %s: __thread_update_inp_buffer: checking communication event",self.con_type)
        while run_event.is_set():
            logging.debug("Communicator %s: __thread_update_inp_buffer: trying to recieve data",self.con_type)
            data,code_result = self.__recieve_data()
            if not code_result:
                logging.debug("Communicator %s: __thread_update_inp_buffer: no data recieved yet",self.con_type)
                continue
            logging.debug("Communicator %s: __thread_update_inp_buffer: recieved data %s",self.con_type,data)
            #the same lock as get_data(), so "full -> drop oldest -> append" is one atomic step
            with self.get_data_lock:
                if pipeline.full():
                    data_lost = self.__get_data_from_buffer(pipeline)
                    logging.warning("Communicator %s: __thread_update_inp_buffer: buffer %s is full, data lost: %s",self.con_type,pipeline,data_lost)
                self.__set_data_to_buffer(pipeline,data)

        logging.info("Communicator %s: __thread_update_inp_buffer: stop communication event",self.con_type)

    def __thread_update_out_buffer(self,pipeline,run_event):
        """Worker: send the messages of the output buffer while `run_event` is set."""
        while run_event.is_set():
            #TODO: also check that the communication partner is ready before taking data
            try:
                #wait with a timeout instead of spinning on an empty buffer;
                #the timeout bounds how long a stop request can go unnoticed
                data = self.__get_data_from_buffer(pipeline,timeout=self._OUT_POLL_INTERVAL)
            except queue.Empty:
                continue
            logging.debug("Communicator %s: __thread_update_out_buffer: trying to send data",self.con_type)
            if self.__send_data(data):
                logging.debug("Communicator %s: __thread_update_out_buffer: data send ok",self.con_type)
            else:
                logging.warning("Communicator %s: __thread_update_out_buffer: error sending data",self.con_type)
        logging.info("Communicator %s: __thread_update_out_buffer: stop communication event",self.con_type)

    def __log_worker_failure(self,future):
        """Done-callback: report an exception that ended a worker, which would otherwise stay hidden in the future."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error("Communicator %s: worker thread terminated with error: %r",self.con_type,error)

    def __submit_worker(self,target,pipeline,run_event):
        """Start `target(pipeline, run_event)` on the executor with failure logging attached."""
        future = self.executor.submit(target,pipeline,run_event)
        future.add_done_callback(self.__log_worker_failure)
        return future

    def set_data(self,data) -> bool:
        """Queue `data` for sending.

        Returns True when the message was added, False when the output buffer
        is full or this communicator has no output direction.
        """
        logging.debug("Communicator %s: set_data: trying to add data to %s",self.con_type,self.pipeline_out)
        if self.pipeline_out is None:
            logging.warning("Communicator %s: set_data: not added, no output buffer configured",self.con_type)
            return False
        #secure operation from multithreading access; `with` releases the lock even on errors
        with self.set_data_lock:
            if self.pipeline_out.full():
                logging.warning("Communicator %s: set_data: not added, %s is full",self.con_type,self.pipeline_out)
                return False
            self.__set_data_to_buffer(self.pipeline_out,data)
            logging.info("Communicator %s: set_data: add data to %s ok",self.con_type,self.pipeline_out)
            return True

    def get_data(self) -> tuple:
        """Take the oldest received message.

        Returns (data, True) on success and (None, False) when the input buffer
        is empty or this communicator has no input direction.
        """
        logging.debug("Communicator %s: get_data: trying to get data from %s",self.con_type,self.pipeline_inp)
        if self.pipeline_inp is None:
            logging.warning("Communicator %s: get_data: no data, no input buffer configured",self.con_type)
            return None,False
        #secure operation from multithreading access; `with` releases the lock even on errors
        with self.get_data_lock:
            if self.pipeline_inp.empty():
                logging.info("Communicator %s: get_data: no data, %s is empty",self.con_type,self.pipeline_inp)
                return None,False
            data = self.__get_data_from_buffer(self.pipeline_inp)
            logging.info("Communicator %s: get_data: get data from %s ok: %s",self.con_type,self.pipeline_inp, data)
            return data,True

    def get_out_buff_count(self) -> int:
        """Return the number of messages waiting in the output buffer (0 if there is none)."""
        if self.pipeline_out is None:
            logging.info("Communicator %s: get_out_buff_count: no output buffer configured",self.con_type)
            return 0
        with self.set_data_lock:
            result = self.pipeline_out.qsize()
            logging.info("Communicator %s: get_out_buff_count: qsize of %s is %s ",self.con_type,self.pipeline_out,result)
        return result

    def get_inp_buff_count(self) -> int:
        """Return the number of messages waiting in the input buffer (0 if there is none)."""
        if self.pipeline_inp is None:
            logging.info("Communicator %s: get_inp_buff_count: no input buffer configured",self.con_type)
            return 0
        with self.get_data_lock:
            result = self.pipeline_inp.qsize()
            logging.info("Communicator %s: get_inp_buff_count: qsize of %s is %s ",self.con_type,self.pipeline_inp,result)
        return result

    def start_communication(self) -> None:
        """Start the background workers for every configured direction."""
        logging.debug("Communicator %s: start_communication: try to start communication treading",self.con_type)
        #set communication on events
        self.event_inp_start.set() #start inp communication
        self.event_out_start.set() #start out communication
        #a worker is only started for a direction that actually has a buffer
        if self.pipeline_inp is not None:
            #this thread updated incoming communication buffer
            self.__submit_worker(self.__thread_update_inp_buffer,self.pipeline_inp,self.event_inp_start)
        if self.pipeline_out is not None:
            #this thread updated outcoming communication buffer
            self.__submit_worker(self.__thread_update_out_buffer,self.pipeline_out,self.event_out_start)
        logging.debug("Communicator %s: start_communication: communication threading started ok",self.con_type)

    def stop_communication(self):
        """Ask the workers to stop and block until they have finished."""
        logging.debug("Communicator %s: stop_communication: trying to stop threading",self.con_type)
        self.event_inp_start.clear() #stop inp communication
        self.event_out_start.clear() #stop out communication
        self.executor.shutdown(wait=True)
        #an executor that was shut down rejects new work, so prepare a fresh one
        #to keep a later start_communication() possible
        self.executor = ThreadPoolExecutor(max_workers=self._MAX_WORKERS)
        logging.debug("Communicator %s: stop_communication: threading stopped",self.con_type)
    
#Demo: run two simulated communicators side by side and read back what they buffered.
if __name__ == "__main__":
    #Both directions enabled, 3-slot output and input buffers, simulated transfer
    plc_comm =  Communicator("PLC","both",3,3,simulation = True)
    plc_comm2 =  Communicator("ABB","both",3,3,simulation = True)
    #plc_comm.set_data("Message1 to be sent")
    #plc_comm.set_data("Message2 to be sent")
    #plc_comm.get_out_buff_count()
    #plc_comm.get_data()
    #plc_comm.get_inp_buff_count()
    plc_comm.start_communication()
    plc_comm2.start_communication()
    #Let the workers run: a simulated receive takes about one second, so the
    #3-slot input buffers overflow and the oldest messages are dropped (see the log)
    time.sleep(5)
    #plc_comm.get_data()
    #plc_comm2.get_data()
    #stop_communication() blocks until the communicator's workers have finished
    plc_comm.stop_communication()
    time.sleep(3)
    plc_comm2.stop_communication()
    time.sleep(3)
    #Four reads per communicator, one more than the buffer capacity, so the
    #last read of each is expected to return (None, False)
    plc_comm.get_data()
    plc_comm.get_data()
    plc_comm.get_data()
    plc_comm.get_data()
    plc_comm2.get_data()
    plc_comm2.get_data()
    plc_comm2.get_data()
    plc_comm2.get_data()
    time.sleep(3)

18:42:23: Communicator PLC with both direction: created
18:42:23: Communicator ABB with both direction: created
18:42:27: Communicator PLC: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #0
18:42:27: Communicator ABB: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #0
18:42:28: Communicator PLC: __thread_update_out_buffer: stop communication event
18:42:28: Communicator PLC: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #1
18:42:28: Communicator PLC: __thread_update_inp_buffer: stop communication event
18:42:28: Communicator ABB: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #1
18:42:29: Communicator ABB: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #2
18:42:30: Communicator ABB: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: 

In [14]:
#Stop the PLC communicator's workers; the call blocks until they have finished.
#Relies on `plc_comm` created by the demo cell above.
plc_comm.stop_communication()

18:26:34: Communicator %s: stop_communication: trying to stop threading
18:26:35: Communicator %s: stop_communication: threading stopped
18:26:35: Communicator PLC: __thread_update_inp_buffer: recieved data Simulated message from PLC #70
18:26:35: Communicator PLC: __thread_update_inp_buffer: recieved data Simulated message from PLC #72
18:26:35: Communicator PLC: __get_data_from_buffer: get data from pipeline_inp ok
18:26:35: Communicator PLC: __get_data_from_buffer: get data from pipeline_inp ok
18:26:35: Communicator PLC: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #67
18:26:35: Communicator PLC: __set_data_to_buffer: set data ok to pipeline_inp
18:26:35: Communicator PLC: __thread_update_inp_buffer: trying to recieve data
18:26:35: Communicator PLC: __recieve_data: preparing for recieve
18:26:35: Communicator PLC: __thread_update_inp_buffer: buffer pipeline_inp is full, data lost: Simulated message from PLC #71
18:26:35: Communicat

In [11]:
#Read one message from the PLC input buffer; the result is a (data, success) tuple,
#(None, False) when the buffer is empty. Relies on `plc_comm` from the demo cell above.
plc_comm.get_data()

18:34:19: Communicator PLC: get_data: trying to get data from pipeline_inp
18:34:19: Communicator PLC: get_data: no data, pipeline_inp is empty


(None, False)

In [2]:
import select
import socket 
import sys 
import queue
import logging
import re
import time
import threading
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor

#initialization of logging
#NOTE: the module-level name `format` shadows the builtin format() for the rest of the kernel session
format = "%(asctime)s: %(message)s"
#NOTE(review): logging.basicConfig() does nothing when the root logger already has handlers,
#so the DEBUG level may not take effect if an earlier cell configured logging first - confirm
logging.basicConfig(format=format, level=logging.DEBUG,datefmt="%H:%M:%S")

#static parameters
TCP_PARTNERS_ALL = "0" #partner name that addresses every connected partner

#Communication module implemented as a TCP/IP socket server
class CommModuleSocketTcp(object):
    """Non-blocking TCP/IP server that exchanges raw byte messages with several partners.

    The server loop runs in a single background thread (see `start_communication`).
    Every accepted partner is identified by the string "IP.PORT" and gets its own
    input and output queue; in addition there is one cumulative input queue with
    `(partner, data)` tuples and one cumulative output queue with
    `(partner, data)` telegrams that the server loop distributes to the partners.

    Parameters:
        name: module name, used in log messages.
        HOST, PORT: address the server socket is bound to.
        listen_backlog: backlog passed to `listen()`.
        buffer_size: maximum number of bytes read per `recv()`.
        timeout: `select()` timeout in seconds; also how often a stop request is noticed.
        con_dir: "both"/"inp"/"out"; stored only, not evaluated by this class.
        main_buff_size_inp, main_buff_size_out: capacity of the cumulative queues.
        loc_buff_size_inp, loc_buff_size_out: capacity of the per-partner queues.
    """

    def __init__ (self,name,HOST="0.0.0.0",PORT=65432,listen_backlog=5,buffer_size=1024,timeout=0.2,con_dir="both", \
                  main_buff_size_inp=10,main_buff_size_out=10,loc_buff_size_out=10,loc_buff_size_inp=10):

        #General parameters
        self.comm_type = "Communication module socket TCP/IP" #static
        self.name = name
        self.con_dir=con_dir #can be "both"/"inp"/"out"
        self.PARTNERS_ALL = TCP_PARTNERS_ALL #partner name to send telegram to all partners

        #Socket configuration
        self.HOST = HOST # The server's hostname or IP address
        self.PORT = PORT # The port used by the server
        self.buffer_size = buffer_size
        self.listen_backlog = listen_backlog
        self.timeout=timeout

        #Main parameters
        self.partners_filter = []  #Use form eg for :'10.17.35.137.64762' or '10.17.35' or '10.17.35.137.647'
        self.partners = []         #read-only snapshot of the connected partner names
        self.partners_dict = {}    #partner name -> connection socket
        self.message_queues_inp = {} #connection socket -> input queue
        self.message_queues_out = {} #connection socket -> output queue
        self.loc_buff_size_inp=loc_buff_size_inp
        self.loc_buff_size_out=loc_buff_size_out
        self.all_message_queues_inp = queue.Queue(main_buff_size_inp) #commulative queue for input messages
        self.all_message_queues_out = queue.Queue(main_buff_size_out) #commulative queue for output messages

        #Threading parameters
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.event_comm_start = threading.Event()

        logging.debug("Communication module %s: created",self.name)

    def __str__(self):
        return  self.comm_type+ ":" + self.name

    @staticmethod
    def __peer_name(connection):
        """Return the partner name "IP.PORT" of `connection`, or "<unknown>" if it is no longer connected."""
        try:
            return '.'.join(map(str,connection.getpeername()))
        except OSError:
            return "<unknown>"

    @staticmethod
    def __put_drop_oldest(target_queue,item):
        """Put `item` into `target_queue`; if it is full, discard the oldest entry first.

        Returns True when data was lost, False when the item simply fitted.
        """
        try:
            target_queue.put_nowait(item)
            return False
        except queue.Full:
            pass
        try:
            target_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            target_queue.put_nowait(item)
        except queue.Full:
            #another producer refilled the queue in between: the new item is the one that is lost
            pass
        return True

    @staticmethod
    def __mark_for_termination(connection,connection_terminate_list):
        """Add `connection` to the list of connections to close, once."""
        if connection not in connection_terminate_list:
            connection_terminate_list.append(connection)

    def __accept_partner(self,server,inputs,peer_names,connection_terminate_list):
        """Accept a pending connection and register it if its name passes the partners filter."""
        try:
            connection, client_address = server.accept()
        except OSError as error:
            #e.g. the partner gave up between select() and accept()
            logging.warning("Communication module %s: accepting connection failed: %s",self.name,error)
            return
        connection.setblocking(0)
        #represent partner IP.PORT as name
        connection_name = self.__peer_name(connection)
        peer_names[connection] = connection_name
        logging.info("Communication module %s: incoming partner name %s",self.name,connection_name)
        #check if connection_name fit the filter XX.XX.XX.XX.XXXXX requirements
        #NOTE(review): filter entries are used as regular expressions, so '.' matches any character
        if any(re.search("^"+y, connection_name) is not None for y in self.partners_filter):
            logging.info("Communication module %s: partner name %s accepted",self.name,connection_name)
            #add new event source to the list
            inputs.append(connection)
            if connection_name in self.partners_dict:
                logging.warning("Communication module %s: partner %s reset",self.name,connection_name)
            #add to dictionary partners_dict: connection_name<->connection
            self.partners_dict[connection_name]=connection
            #create inp/out queue for new connection
            self.message_queues_inp[connection] = queue.Queue(self.loc_buff_size_inp)
            self.message_queues_out[connection] = queue.Queue(self.loc_buff_size_out)
        else:
            logging.warning("Communication module %s: partner name %s rejected",self.name,connection_name)
            self.__mark_for_termination(connection,connection_terminate_list)

    def __read_partner(self,s,peer_names,connection_terminate_list):
        """Read pending data from partner socket `s` into its own and the cumulative input queue."""
        connection_name = peer_names.get(s) or self.__peer_name(s)
        try:
            data = s.recv(self.buffer_size)
        except BlockingIOError:
            #nothing to read after all; try again in the next loop iteration
            return
        except OSError as error:
            logging.warning("Communication module %s: connection %s: receive failed: %s",self.name,connection_name,error)
            self.__mark_for_termination(s,connection_terminate_list)
            return
        #put to separate queue
        local_queue = self.message_queues_inp.get(s)
        if local_queue is not None and self.__put_drop_oldest(local_queue,data):
            logging.warning("Communication module %s: connection %s input buffer full, data lost",self.name,connection_name)
        #put to main input queue; this happens regardless of the state of the separate queue,
        #otherwise a reader of the main queue stops getting data once the separate queue is full
        if self.__put_drop_oldest(self.all_message_queues_inp,(connection_name,data)):
            logging.warning("Communication module %s: main input buffer full, data lost",self.name)
        else:
            logging.debug("Communication module %s: connection %s: data recieved",self.name,connection_name)
        #check if zero message, so, terminate connection
        #(the empty message is still queued, as before, so readers can observe the disconnect)
        if not data:
            logging.debug("Communication module %s: connection %s: empty message -> XXX disconnecting XXX",self.name,connection_name)
            self.__mark_for_termination(s,connection_terminate_list)

    def __write_partner(self,s,outputs,peer_names,connection_terminate_list):
        """Send the next queued message to partner socket `s`, or stop watching it when nothing is queued."""
        try:
            next_msg = self.message_queues_out[s].get_nowait()
        except (KeyError,queue.Empty):
            outputs.remove(s)
            return
        #name of the socket that is actually written to, not of the last one that was read
        connection_name = peer_names.get(s) or self.__peer_name(s)
        try:
            s.send(next_msg)
        except OSError as error:
            logging.warning("Communication module %s: connection %s: send failed: %s",self.name,connection_name,error)
            self.__mark_for_termination(s,connection_terminate_list)
        else:
            logging.debug("Communication module %s: connection %s: data sent",self.name,connection_name)

    def __dispatch_output(self,outputs):
        """Move every telegram of the cumulative output queue to the queue(s) of its partner(s)."""
        while True:
            try:
                #get data from main output queue, telegram format: (connection_name,data)
                (conn_name,data) = self.all_message_queues_out.get_nowait()
            except queue.Empty:
                break
            logging.debug("Communication module %s: checking main output buffer",self.name)
            #check for specific type of telegram
            if conn_name==self.PARTNERS_ALL:
                #send to all; iterate over a snapshot because remove_partner() may run concurrently
                targets = list(self.partners_dict.items())
            else:
                pc = self.partners_dict.get(conn_name)
                if pc is None:
                    logging.warning("Communication module %s: partner '%s' not connected anymore, data lost",self.name,conn_name)
                    continue
                logging.debug("Communication module %s: found data for %s",self.name,conn_name)
                targets = [(conn_name,pc)]
            for pn,pc in targets:
                try:
                    self.message_queues_out[pc].put_nowait(data)
                except KeyError:
                    continue
                except queue.Full:
                    logging.warning("Communication module %s: connection %s: output buffer full, data lost",self.name,pn)
                else:
                    if pc not in outputs:
                        outputs.append(pc)

    def __close_partner(self,s,inputs,outputs,peer_names):
        """Close partner socket `s` and remove every trace of it from the bookkeeping."""
        conn_name = peer_names.pop(s,None) or self.__peer_name(s)
        logging.warning("Communication module %s: closing connection %s",self.name,conn_name)
        if s in inputs:
            inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        #check if connection removed from partner_dict:
        self.partners_dict = {key:val for key, val in list(self.partners_dict.items()) if val != s}
        #clean connections queue
        self.message_queues_inp.pop(s,None)
        self.message_queues_out.pop(s,None)

    def __thread_run_server(self,run_event):
        """Server loop: accept partners and move data between sockets and queues while `run_event` is set."""
        logging.debug("Communication module %s: trying to start server",self.name)
        #initialistation of sockets
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setblocking(0)
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
            server.bind((self.HOST, self.PORT))
            server.listen(self.listen_backlog)
            logging.debug("Communication module %s: starting server...",self.name)
            #data exchange parameters
            inputs = [server]
            outputs = []
            #socket -> partner name, remembered at accept time because getpeername()
            #fails on a socket whose partner is already gone
            peer_names = {}

            logging.info("Communication module %s: server started",self.name)
            if not self.partners_filter:
                logging.warning("Communication module %s: partners filter is empty. No connections allowed",self.name)

            try:
                #start server proccess loop
                while run_event.is_set():
                    readable, writable, exceptional = select.select(inputs, outputs, inputs,self.timeout)
                    #list of connections to be terminated
                    connection_terminate_list = []

                    #get new data/connection
                    for s in readable:
                        if s is server:
                            self.__accept_partner(server,inputs,peer_names,connection_terminate_list)
                        else:
                            self.__read_partner(s,peer_names,connection_terminate_list)

                    #send data
                    for s in writable:
                        self.__write_partner(s,outputs,peer_names,connection_terminate_list)

                    #spread main output queue between separate queues
                    self.__dispatch_output(outputs)

                    #mark connection to be terminated if socker error
                    for s in exceptional:
                        logging.warning("Communication module %s:  error communication with: %s",self.name,peer_names.get(s) or self.__peer_name(s))
                        self.__mark_for_termination(s,connection_terminate_list)

                    #mark connection to be terminated which are not in the list partners_dict
                    allowed = set(self.partners_dict.values())
                    for s in inputs:
                        if s is not server and s not in allowed:
                            logging.warning("Communication module %s: connection %s is not allowed anymore",self.name,peer_names.get(s) or self.__peer_name(s))
                            self.__mark_for_termination(s,connection_terminate_list)

                    #terminate connection
                    for s in connection_terminate_list:
                        self.__close_partner(s,inputs,outputs,peer_names)

                    #update partners list (readonly):
                    self.partners = list(self.partners_dict.keys())
            finally:
                #stopping server (also after an error): close all active connections
                for s in inputs:
                    if s is server:
                        continue
                    logging.warning("Communication module %s: closing connection %s",self.name,peer_names.get(s,"<unknown>"))
                    s.close()
                #clean partner_dict and the read-only partners list
                self.partners_dict = {}
                self.partners = []
                #clean connections queue
                self.message_queues_inp = {}
                self.message_queues_out = {}

        logging.info("Communication module %s: server stopped",self.name)

    def __log_server_failure(self,future):
        """Done-callback: report an exception that ended the server thread (e.g. a failed bind)."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error("Communication module %s: server thread terminated with error: %r",self.name,error)

    def recieve_data(self,partner=TCP_PARTNERS_ALL):
        """Return the oldest received message, or None when there is none.

        With the default partner the cumulative queue is read and the result is
        a `(partner_name, data)` tuple; with a partner name only that partner's
        queue is read and the result is the raw data.
        """
        if partner==TCP_PARTNERS_ALL:
            try:
                data = self.all_message_queues_inp.get_nowait()
                logging.debug("Communication module %s: recieve_data: data recived from main inp buffer",self.name)
            except queue.Empty:
                logging.debug("Communication module %s: recieve_data: main input buffer empty",self.name)
                data = None
            return data

        #.get() instead of "in" + "[]": the server thread may drop the partner in between
        partner_queue = self.message_queues_inp.get(self.partners_dict.get(partner))
        if partner_queue is None:
            logging.debug("Communication module %s: recieve_data: no partner '%s' found",self.name,partner)
            return None
        try:
            data = partner_queue.get_nowait()
            logging.debug("Communication module %s: recieve_data: data recived from %s inp buffer",self.name,partner)
        except queue.Empty:
            logging.debug("Communication module %s: recieve_data: input buffer %s empty",self.name,partner)
            data = None
        return data

    def send_data(self,partner_data):
        """Queue the telegram `(partner, data)` for sending.

        `partner` is a connected partner name or `PARTNERS_ALL`. A malformed
        telegram, an unknown partner or a full output queue is only logged.
        """
        #check telegram structure tuple: (partner,data)
        if not (isinstance(partner_data,tuple) and len(partner_data)==2):
            logging.warning("Communication module %s: send_data: telegram structure doesn't fit requirements",self.name)
            return
        partner = partner_data[0]
        #check if partner exist
        if not ((partner in self.partners_dict) or (partner==self.PARTNERS_ALL)):
            logging.warning("Communication module %s: send_data: no partner '%s' found",self.name,partner)
            return
        try:
            self.all_message_queues_out.put_nowait(partner_data)
            logging.debug("Communication module %s: send_data: data to '%s' added to main output buffer",self.name,partner)
        except queue.Full:
            logging.warning("Communication module %s: send_data: main output buffer full",self.name)

    #get communication partners list
    def get_partners(self):
        """Return the list of partner names as last published by the server loop."""
        return self.partners

    #remove partner from communication
    def remove_partner(self,partner):
        """Remove `partner` from the communication list; the server loop then closes its connection."""
        if self.partners_dict.pop(partner,None) is not None:
            logging.info("Communication module %s: remove_partner: partner '%s' removed from communication list",self.name,partner)

    #set communication partners filter
    def set_partners_filter(self,partners_filter):
        """Replace the list of name patterns a new partner must match; non-list values are rejected."""
        if isinstance(partners_filter,list):
            self.partners_filter = partners_filter
            logging.debug("Communication module %s: set_partners_filter: new partner filter set: %s",self.name,partners_filter)
        else:
            logging.warning("Communication module %s: set_partners_filter: filter not accepted, list required",self.name)

    #start communication function
    def start_communication(self) -> None:
        """Start the server loop in the background thread."""
        logging.debug("Communication module %s: start_communication: try to start communication treading",self.name)
        #set communication on events
        self.event_comm_start.set() #start communication flag
        #start thread with socket server; failures of the thread are logged instead of being lost
        future = self.executor.submit(self.__thread_run_server,self.event_comm_start)
        future.add_done_callback(self.__log_server_failure)
        logging.debug("Communication module %s: start_communication: communication threading started ok",self.name)

    #stop communication function
    def stop_communication(self):
        """Ask the server loop to stop and block until it has finished."""
        logging.debug("Communication module %s: stop_communication: trying to stop threading",self.name)
        self.event_comm_start.clear() #stop communication flag
        self.executor.shutdown(wait=True)
        #an executor that was shut down rejects new work, so prepare a fresh one
        #to keep a later start_communication() possible
        self.executor = ThreadPoolExecutor(max_workers=1)
        logging.debug("Communication module %s: stop_communication: threading stopped",self.name)

    def is_data(self,partner=TCP_PARTNERS_ALL):
        """Return True when a received message is waiting (cumulative queue by default, else the partner's queue)."""
        if partner==TCP_PARTNERS_ALL:
            #TODO: change to: for in partners_dict?????
            return self.all_message_queues_inp.qsize()>0
        partner_queue = self.message_queues_inp.get(self.partners_dict.get(partner))
        if partner_queue is None:
            logging.debug("Communication module %s: is_data: no partner '%s' found",self.name,partner)
            return False
        return partner_queue.qsize()>0
    
    
#Demo: start the TCP server, wait for partners to connect, then broadcast ten messages.
comm_module = CommModuleSocketTcp("Sock#1") 
#A filter entry of '' matches every partner name, i.e. all incoming connections are accepted
comm_module.set_partners_filter([''])
comm_module.start_communication()
#Give partners time to connect before the first message is queued
time.sleep(10)
for k in range(10):
    #Partner name "0" is TCP_PARTNERS_ALL: the message goes to every connected partner
    comm_module.send_data(("0",b'Message to all %s\r\n'%bytes(str(k), 'utf-8')))
    time.sleep(2)
#Blocks until the server thread has finished
comm_module.stop_communication()    


17:49:13: Communication module Sock#1: created
17:49:13: Communication module Sock#1: set_partners_filter: new partner filter set: ['']
17:49:13: Communication module Sock#1: start_communication: try to start communication treading
17:49:13: Communication module Sock#1: trying to start server
17:49:13: Communication module Sock#1: start_communication: communication threading started ok
17:49:13: Communication module Sock#1: starting server...
17:49:13: Communication module Sock#1: server started
17:49:16: Communication module Sock#1: incoming partner name 10.17.35.137.53525
17:49:16: Communication module Sock#1: partner name 10.17.35.137.53525 accepted
17:49:23: Communication module Sock#1: send_data: data to '0' added to main output buffer
17:49:23: Communication module Sock#1: checking main output buffer
17:49:23: Communication module Sock#1: connection 10.17.35.137.53525: data sent
17:49:25: Communication module Sock#1: send_data: data to '0' added to main output buffer
17:49:25: Co

In [18]:
#Stop the TCP server thread; relies on `comm_module` created in an earlier cell.
comm_module.stop_communication()
#comm_module.get_partners()
#comm_module.send_data(("0",b'Message to all\r\n'))

22:09:21: Communication module Sock#1: stop_communication: trying to stop threading
22:09:21: Communication module Sock#1: server stopped
22:09:21: Communication module Sock#1: stop_communication: threading stopped


In [4]:
from mtpbox import *
import mtpbox.comm_modules as cm

#initialization of logging
# NOTE(review): `logging` and `time` are not imported in this cell; presumably
# they arrive through `from mtpbox import *` - confirm.
# NOTE(review): the name `format` shadows the builtin format() for the rest of the kernel session.
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.DEBUG,datefmt="%H:%M:%S")

# Same demo as before, but through the packaged module and with a non-ASCII payload.
comm_module = cm.CommModuleSocketTcp("Sock#1")
comm_module.set_partners_filter([''])
comm_module.start_communication()
try:
    time.sleep(10)  # leave time for a partner to connect before sending
    for k in range(10):
        #comm_module.send_data(("0",b'Message to all %s\r\n'%bytes(str(k), 'utf-8')))
        # payload is the UTF-8 encoding of "El niño come camarón"
        comm_module.send_data(("0",b'El ni\xc3\xb1o come camar\xc3\xb3n'))
        time.sleep(2)
finally:
    # Always stop the communication threads, even when sending fails or the
    # cell is interrupted, so nothing keeps running inside the kernel.
    comm_module.stop_communication()


18:55:54: Communication module Sock#1: created
18:55:54: Communication module Sock#1: set_partners_filter: new partner filter set: ['']
18:55:54: Communication module Sock#1: start_communication: try to start communication treading
18:55:54: Communication module Sock#1: trying to start server
18:55:54: Communication module Sock#1: starting server...
18:55:54: Communication module Sock#1: start_communication: communication threading started ok
18:55:54: Communication module Sock#1: server started
18:55:56: Communication module Sock#1: incoming partner name 10.17.35.137.54175
18:55:56: Communication module Sock#1: partner name 10.17.35.137.54175 accepted
18:56:04: Communication module Sock#1: send_data: data to '0' added to main output buffer
18:56:04: Communication module Sock#1: checking main output buffer
18:56:04: Communication module Sock#1: connection 10.17.35.137.54175: data sent
18:56:06: Communication module Sock#1: send_data: data to '0' added to main output buffer
18:56:06: Co

In [19]:
# Scratch cell for poking at an already running `comm_module`: only the last
# line is active, the commented-out calls are earlier experiments.
# `recieve_data` (sic) is the spelling the module itself uses.
#comm_module.all_message_queues_inp.get_nowait()
#comm_module.stop_communication()
#comm_module.all_message_queues_inp.qsize()
#comm_module.all_message_queues_out.put_nowait(('192.168.102.216.51210',b'HiThere\r\n'))
#comm_module.send_data(("0",b'Message to all\r\n'))
#comm_module.recieve_data()
#comm_module.remove_partner('10.17.35.137.51437')
#comm_module.recieve_data()
#comm_module.get_partners()
#comm_module.recieve_data('10.17.35.137.51442')
#comm_module.remove_partner('10.17.35.137.51921')
comm_module.recieve_data('10.17.35.137.51923')


19:55:16: Communication module Sock#1: recieve_data: input buffer 10.17.35.137.51923 empty


In [3]:
# Stop the communication module; relies on `comm_module` from a previously executed cell.
comm_module.stop_communication()

18:51:57: Communication module Sock#1: stop_communication: trying to stop threading
18:51:57: Communication module Sock#1: closing connection 10.17.35.137.54165
18:51:57: Communication module Sock#1: server stopped
18:51:57: Communication module Sock#1: stop_communication: threading stopped


In [13]:
import re
# Check a connection name against a partner filter: the name passes when at
# least one filter entry matches at the start of the name.
# NOTE(review): entries are spliced into the regex unescaped, so '.' matches
# any character - confirm that this is intended.
conname = "192.168.102.216.50924"
partners_filter = [""]
any(re.search("^"+entry, conname) is not None for entry in partners_filter)

True

In [6]:
# Scratch dict keyed by numeric strings.
# NOTE(review): the saved output of this cell (`list`) cannot come from an
# assignment, which displays nothing - the stored output is stale.
di = {"0": 123, "1": 1241}


list

In [60]:
import re
# Evaluate every filter entry against one connection name, first as an
# explicit list of per-entry results, then as a single any() over a generator.
x = "10.17.35.137.64762"
partners_filter = ["10.17.35.137.64762","10.17.35.130","10.19"]
#partners_filter = "10.17.35.137.64762"

result = [re.search("^"+entry, x) is not None for entry in partners_filter]
print(result)
any(result)

any(re.search("^"+entry, x) is not None for entry in partners_filter)

[True, False, False]


True

In [28]:
# A two-element tuple; its length is the number of elements.
abc = 'ada', 141412535
len(abc)

2

In [23]:
# dict.pop with a default can be repeated safely: the second call finds no
# key "a" and returns the default instead of raising KeyError.
a = dict(a="1", b="2")
a.pop("a", None)
a.pop("a", None)
a

{'b': '2'}

In [56]:
import queue
# Fill a FIFO queue, then drain it without blocking; queue.Empty marks the end.
items=[10,9,8,7,6,5,4,3,2,1]
qwe = queue.Queue()
# Explicit loop: put() is called only for its side effect, so there is no
# reason to build a throwaway list of None values with list(map(...)).
for item in items:
    qwe.put(item)

while True:
    try:
        print(qwe.get_nowait())
    except queue.Empty:
        break


10
9
8
7
6
5
4
3
2
1


In [45]:
import queue
# Fill a FIFO queue and take the first item without blocking.
items=[10,9,8,7,6,5,4,3,2,1]
qwe = queue.Queue()
# Explicit loop: put() is called only for its side effect, so there is no
# reason to build a throwaway list of None values with list(map(...)).
for item in items:
    qwe.put(item)
qwe.get_nowait()

10

In [6]:
import queue
# Move items from an unbounded queue into a bounded one (capacity 3) without
# blocking, reporting when the target is full and when the source runs dry.
items=[10,9,8,7,6,5,4,3,2,1]
qwe = queue.Queue()
dfg = queue.Queue(3)
# Explicit loop: put() is called only for its side effect, so there is no
# reason to build a throwaway list of None values with list(map(...)).
for item in items:
    qwe.put(item)

while True:
    # Keep the try narrow: only get_nowait() can raise queue.Empty here.
    try:
        #get data from main output queue, telegram format: (connection_name,data)
        data = qwe.get_nowait()
    except queue.Empty:
        print("#1 empty")
        break
    print(data)
    try:
        dfg.put_nowait(data)
    except queue.Full:
        # target is at capacity; the item is dropped, only the notice is printed
        print("#2 full")


10
9
8
7
#2 full
6
#2 full
5
#2 full
4
#2 full
3
#2 full
2
#2 full
1
#2 full
#1 empty
