Skip to content

Commit

Permalink
XME 4.2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
wacmkxiaoyi committed Feb 28, 2024
1 parent 65a1137 commit b0226a0
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 91 deletions.
90 changes: 53 additions & 37 deletions XME/MPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,67 @@
STATUS_ALLOCATED="<XME-MPI-STATUS: ALLOCATED>"
STATUS_INRUN="<XME-MPI-STATUS: INRUN>"
STATUS_END="<XME-MPI-STATUS: END>"
TASK_NO_REGISTERED="<XME-MPI-RUN: TASK_NO_REGISTERED>"
TASK_RUN_FAILURE="<XME-MPI-RUN: TASK_NO_REGISTERED>"
BUFFER_EMPTY="<XME-MPI-BUFFER: EMPTY>"
BUFFER_ALLOCATED="<XME-MPI-BUFFER: ALLOCATED>"
BUFFER_EOF="<XME-MPI-BUFFER: EOF"
@staticmethod
def get_process_mark(tpid):
if tpid==0: return "<XME-MPI-MARK: MAIN PROCESS>"
else: return f"<XME-MMPI-MARK: TASK PROCESS {tpid}>"

class __list__(list):
def to_end(self,key):
self.remove(key)
self.append(key)

class __XMEMPI__:
def __init__(self,mpi,status,Main_Connector,Main_Request_Connector):
self.mpi=mpi
self.status=status
self.Main_Connector=Main_Connector
self.Main_Request_Connector=Main_Request_Connector
def acquire(self, funid, args=(), kwargs={}, to=ANY_PROCESS,block=False): return self.mpi.acquire(self.status,self.Main_Connector,self.Main_Request_Connector, funid, args, kwargs, to, block)
def acquire(self, fun, args=(), kwargs={}, to=ANY_PROCESS,block=False): return self.mpi.acquire(self.status,self.Main_Connector,self.Main_Request_Connector, fun, args, kwargs, to, block)
def close(self, to=ALL_PROCESSES): return self.mpi.close(self.status,self.Main_Connector,self.Main_Request_Connector,to)
def get(self,pos): return self.mpi.get(pos)
def __getitem__(self,key): return self.get(key)
def __call__(self, fun, args=(), kwargs={}, to=ANY_PROCESS,block=False): return self.acquire(fun, args, kwargs, to, block)
def __iter__(self):
yield self.mpi
yield self.status
yield self.Main_Connector
yield self.Main_Request_Connector

class __XMEBUFFER__:
def __init__(self,buffersize):
self.size=buffersize
self.locks=None
self.last_alloc=-1
for pos in range(self.size): exec(f"self.buffer_{pos}=BUFFER_EMPTY")

def initial_locks(self): self.locks=[Lock() for _ in range(self.size)]
self.buffermap=__list__(range(self.size))
for pos in range(self.size): self[pos]=BUFFER_EMPTY

def allocate(self):
lls=list(range(self.size))
lls=lls[self.last_alloc+1:]+lls[:self.last_alloc+1]
for pos in lls:
if self.__getitem__(pos)==BUFFER_EMPTY:
if not self.locks: self.locks=[Lock() for _ in range(self.size)]
for pos in self.buffermap:
if self[pos]==BUFFER_EMPTY:
if self.locks[pos].acquire(blocking=False):
self.__setitem__(pos,BUFFER_ALLOCATED)
self[pos]=BUFFER_ALLOCATED
self.buffermap.to_end(pos)
self.locks[pos].release()
self.last_alloc=pos
if self.last_alloc== self.size-1: self.last_alloc=-1
return pos
return None

def __len__(self): return self.size

def __getitem__(self,pos): return eval(f"self.buffer_{pos}")

def __setitem__(self,pos,value): exec(f"self.buffer_{pos}=value")

def __iter__(self):
for pos in range(self.size):yield self[pos]


class MPI:
def __init__(self,mainfun,**xmeargs):
self.Tasks={}
self.buffer=__XMEBUFFER__(XME.get_par(xmeargs,"mpi_buffer_size",0xff))
self.mainfun=mainfun
self.xmeargs=xmeargs
Expand All @@ -92,6 +105,7 @@ def run(self,*args,**kwargs):
Main_Request_Connector={}
Task_Request_Connector=[] #pipes to tasks processes
tpids=[]
self.task_alloc_map=__list__()
for i in range(1,self.pnum):
tpid=get_process_mark(i)
tpids.append(tpid)
Expand All @@ -102,19 +116,22 @@ def run(self,*args,**kwargs):
pc,cc=Pipe()
Main_Request_Connector[tpid]=pc
Task_Request_Connector.append(cc)
self.task_alloc_map.append(tpid)
xme=XME.XME(self.__task__,self.mainfun,**self.xmeargs)
kwargs.update({"XMEMPI":__XMEMPI__(self,status,Main_Connector,Main_Request_Connector)})
return xme.gfun(xme.Array(tpids),status,xme.Array(Task_Connector),xme.Array(Task_Request_Connector),garg=args,gargs=kwargs)[0][0]

def __str__(self): return f"<class XME::MPI @ {hex(id(self))}: Task processes: {self.pnum}; Registed Tasks: {len(self.Tasks)}>"
def __setitem__(self,key,value): self.Tasks[key]=value
def __getitem__(self,key): return self.Tasks[key]
return xme.gfun(xme.Array(tpids),status,xme.Array(Task_Connector),xme.Array(Task_Request_Connector),gargs=args,gkwargs=kwargs)[0][0]

def __str__(self): return f"<class XME::MPI @ {hex(id(self))}: Task processes: {self.pnum}; Buffer size: {len(self)}>"
def __setitem__(self,key,value): self.buffer[key]=value
def __getitem__(self,key): return self.buffer[key]
def __len__(self): return len(self.buffer)
def __call__(self,*args,**kwargs): return self.run(*args,**kwargs)

def acquire(self, status,connets,req, funid, args=(), kwargs={}, to=ANY_PROCESS,block=False):
def acquire(self, status,connets,req, fun, args=(), kwargs={}, to=ANY_PROCESS,block=False):
def initial_locks():
#each Event() object has 48 bytes size
if self.global_buffer_lock: return
self.buffer.initial_locks()
self.global_buffer_lock=Event()
self.global_task_lock=Event()
self.tasklocks={tpid:Event() for tpid in status.keys()}
Expand All @@ -127,31 +144,32 @@ def allocate_buffer():

def send_and_recv(to,pos):
with self.buffer.locks[pos]:
connets[to].send(self.__encode__((funid,args,kwargs)))
self.buffer[pos]=connets[to].recv()
connets[to].send(self.__encode__((fun,args,kwargs)))
self[pos]=connets[to].recv()
self.global_task_lock.set()
self.tasklocks[to].set()

initial_locks()
if to==ALL_PROCESSES: return [self.acquire(status, connets, funid, args, kwargs,tpid) for tpid in status.keys()]
elif type(to) in (list,tuple): return [self.acquire(status, connets, funid, args, kwargs,tpid) for tpid in to]
if to==ALL_PROCESSES: return [self.acquire(status, connets, fun, args, kwargs,tpid) for tpid in status.keys()]
elif type(to) in (list,tuple): return [self.acquire(status, connets, fun, args, kwargs,tpid) for tpid in to]
elif to==ANY_PROCESS:
closed=[]
while True:
wait=True
for tpid,sta in status.items():
if sta==STATUS_BEGIN: #means at least one task processes can be called in the future
for tpid in self.task_alloc_map:
if status[tpid]==STATUS_BEGIN: #means at least one task processes can be called in the future
wait=False
continue
elif sta==STATUS_IDLE:
elif status[tpid]==STATUS_IDLE:
req[tpid].send(self.__encode__(REQUEST_ACQUIRE))
if self.__decode__(req[tpid].recv())!=REQUEST_SUCCESSFUL:continue
status[tpid]=STATUS_ALLOCATED
self.task_alloc_map.to_end(tpid)
pos=allocate_buffer()
if not block: Thread(target=send_and_recv,args=(tpid,pos)).start()
else: send_and_recv(tpid,pos)
return pos
elif sta==STATUS_END and tpid not in closed: closed.append(tpid)
elif status[tpid]==STATUS_END and tpid not in closed: closed.append(tpid)
if set(status.keys())==set(closed):return
if wait: self.global_task_lock.wait()
else:
Expand All @@ -170,11 +188,11 @@ def send_and_recv(to,pos):

def get(self,pos):
if not self.global_buffer_lock:raise ValueError("The XME.MPI Locks are undefinded that buffer cannot be accessed, please use acquire function to initialize.")
if pos==None or pos>=self.buffer.size: return BUFFER_EOF
elif self.buffer[pos]==BUFFER_EMPTY: return BUFFER_EOF
if pos==None or pos>=len(self): return BUFFER_EOF
elif self[pos]==BUFFER_EMPTY: return BUFFER_EOF
with self.buffer.locks[pos]:
result=self.__decode__(self.buffer[pos])
self.buffer[pos]=BUFFER_EMPTY
result=self.__decode__(self[pos])
self[pos]=BUFFER_EMPTY
self.global_buffer_lock.set()
return result

Expand Down Expand Up @@ -226,11 +244,9 @@ def process_request():
if request==TASK_END:
status[tpid]=STATUS_END
break
funid, arg, kwargs=request
if funid in self.Tasks:
result=self.Tasks[funid](*arg,**kwargs)
connect.send(self.__encode__(result))
else: connect.send(self.__encode__(TASK_NO_REGISTERED))
fun, arg, kwargs=request
result=fun(*arg,**kwargs)
connect.send(self.__encode__(result))
except Exception as err:
traceback.print_exc()
connect.send(self.__encode__(TASK_RUN_FAILURE))
1 change: 1 addition & 0 deletions XME/Manager/Object.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from multiprocessing.managers import BaseManager
class __Manager__(BaseManager): pass
@staticmethod
def __get_manager__():
m=__Manager__()
m.start()
Expand Down
2 changes: 1 addition & 1 deletion XME/Manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def status(self,Operator=None,status=None):
if status==None: status=f"<-HeartJump::{time.time()}->"
self.append(__XMEHJ__,Operator,status=status)
def get_buffers(self):
return deepcopy(self.__Buffer_Call__)
return iter(self.__Buffer_Call__.items())
def len(self):
return len(self.__Buffer_Call__["GUARD"])
def __len__(self):
Expand Down
6 changes: 1 addition & 5 deletions XME/XMElib/ArrayOperator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from multiprocessing import cpu_count,Manager
def get_par(args,name,default=None):
try:
return args[name]
except:
return default
from . import get_par
class ArrayOperator:
def __init__(self,**args):
self.pnum=get_par(args,"pnum",cpu_count())
Expand Down
12 changes: 1 addition & 11 deletions XME/XMElib/Executor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
from multiprocessing import Pool,cpu_count#,Lock
from . import get_par,tuplize
import numpy,traceback
def tuplize(array):
if type(array) in (list,tuple, numpy.ndarray):
result=[]
for i in array:
if type(i) in (list, numpy.ndarray): result.append(tuplize(i))
else: result.append(i)
return tuple(result)
return array
def get_par(args,name,default=None):
try: return args[name]
except: return default
class Executor:
default_fun_strc="fun"
def __init__(self,*fun,**args):
Expand Down
7 changes: 1 addition & 6 deletions XME/XMElib/Logputter.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import datetime
import os
from XME.XMElib import XME_Version
def get_par(args,name,default=None):
try:
return args[name]
except:
return default
from . import get_par,XME_Version
class Logputter:
logfile=None
time=True
Expand Down
18 changes: 17 additions & 1 deletion XME/XMElib/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
XME_Version="4.2.1"
XME_Version="4.2.5"
@staticmethod
def get_par(args,name,default=None):
try:
return args[name]
except:
return default

@staticmethod
def tuplize(array):
if type(array) in (list,tuple, numpy.ndarray):
result=[]
for i in array:
if type(i) in (list, numpy.ndarray): result.append(tuplize(i))
else: result.append(i)
return tuple(result)
return array
49 changes: 19 additions & 30 deletions XME/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,13 @@
from .XMElib.ArrayOperator import ArrayOperator
from .XMElib.Executor import Executor
from .XMElib.Logputter import Logputter

def get_par(args,name,default=None):
try: return args[name]
except: return default
from .XMElib import get_par

class XME:
aoobj_array=[]
exobj_array=[]
class Array:
def __init__(self,array):
self.array=array
self.length=len(array)
def __len__(self): return self.length
def __str__(self): return str(self.array)
def __getitem__(self,key): return self.array[key]
def __delitem__(self,key): del self.array[key]
def __setitem__(self,key,value): self.array[key]=value

class Array(list):pass

def __init__(self,*fun,**args):
if get_par(args,"pnum",cpu_count())>cpu_count(): args["pnum"]=cpu_count()
Expand All @@ -35,49 +25,47 @@ def func(funum,*targ,**args):
calnum=get_par(args,"calnum",0)
if calnum==0:
for i in targ:
if type(i)==self.Array: calnum=max(calnum,i.length)
if type(i)==self.Array: calnum=max(calnum,len(i))
for i in args.keys():
if type(args[i])==self.Array: calnum=max(calnum,args[i].length)
if type(args[i])==self.Array: calnum=max(calnum,len(args[i]))
if calnum==0: calnum+=1 #at least run once
ao=self.ao(calnum,self.pnum)
for i in targ: #first set
if type(i)!=self.Array: ao.add_common_args(i)
else: ao.add_argscut(i.array)
else: ao.add_argscut(i)
for i in fun[funum].__code__.co_varnames: #follow sequence
if i in args.keys():
if type(args[i])!=self.Array: ao.add_common_args(args[i])
else: ao.add_agrscut(args[i].array)
else: ao.add_argscut(args[i])
return ao
if len(fun)>0:
def single_fun(*targ,**args):
ao=func(0,*targ,**args)
ex=self.ex(fun[0],pnum=self.pnum)
ex.build_from_ao(ao)
ao.result_combine()
return ao.results
self.fun=single_fun
def multi_funs(funum_array=range(len(fun)),targ_array=[[]]*len(fun),args_array=[{}]*len(fun)):
if len(funum_array)!=len(targ_array) or len(funum_array)!=len(args_array):
def run_funs(funum_array=range(len(fun)),args_array=[[]]*len(fun),kwargs_array=[{}]*len(fun)):
if len(funum_array)!=len(args_array) or len(funum_array)!=len(kwargs_array):
print("Error parameters number")
return ()
ao=[]
results=[]
tfuns=[]
for i in range(len(funum_array)):
tfuns.append(fun[funum_array[i]])
ao.append(func(funum_array[i],*(targ_array[i]),**(args_array[i])))
ao.append(func(funum_array[i],*(args_array[i]),**(kwargs_array[i])))
ex=self.ex(*tfuns,pnum=self.pnum)
ex.build_from_ao(ao)
results=[]
for i in range(len(funum_array)):
ao[i].result_combine()
results.append(ao[i].results)
return tuple(results)
self.funs=multi_funs
def gfun(*farg,garg=[],gargs={},**fargs): return multi_funs([-1,0],targ_array=[garg,farg],args_array=[gargs,fargs])
self.funs=run_funs
def single_fun(*targ,**args): return run_funs([0],[targ],[args])[0]
self.fun=single_fun
def gfun(*fargs,gargs=[],gkwargs={},**fkwargs): return run_funs([-1,0],args_array=[gargs,fargs],kwargs_array=[gkwargs,fkwargs])
self.gfun=gfun
def gfuns(funum_array=range(len(fun)-1),farg_array=[[]]*(len(fun)-1),fargs_array=[{}]*(len(fun)-1),garg=[],gargs={}): return multi_funs([-1]+funum_array,targ_array=[garg]+farg_array,args_array=[gargs]+fargs_array)
def gfuns(funum_array=range(len(fun)-1),fargs_array=[[]]*(len(fun)-1),fkwargs_array=[{}]*(len(fun)-1),gargs=[],gkwargs={}): return run_funs([-1]+funum_array,args_array=[gargs]+fargs_array,kwargs_array=[gkwargs]+fkwargs_array)
self.gfuns=gfuns

def __call__(self,*args,**kwargs): return self.fun(*args,**kwargs)

def ao(self,calnum,pnum=None):
if pnum==None: pnum=self.pnum
self.aoobj_array.append(ArrayOperator(cal_num=calnum,pnum=pnum))
Expand All @@ -88,5 +76,6 @@ def ex(self,*fun,**args):
def clean(self):
self.aoobj_array=[]
self.exobj_array=[]

def build(*fun,**args):
return XME(*fun,**args)

0 comments on commit b0226a0

Please sign in to comment.