@@ -692,15 +692,29 @@ def call_async(self, func, *args, **kwargs):
692
692
:param bool use_login_context:
693
693
If present and :data:`True`, send the call to the login account
694
694
context rather than the optional become user context.
695
+
696
+ :param bool no_reply:
697
+ If present and :data:`True`, send the call with no ``reply_to``
698
+ header, causing the context to execute it entirely asynchronously,
699
+ and to log any exception thrown. This allows avoiding a roundtrip
700
+ in places where the outcome of a call is highly likely to succeed,
701
+ and subsequent actions will fail regardless with a meaningful
702
+ exception if the no_reply call failed.
703
+
695
704
:returns:
696
- mitogen.core.Receiver that receives the function call result.
705
+ :class:` mitogen.core.Receiver` that receives the function call result.
697
706
"""
698
707
self ._connect ()
708
+
699
709
if kwargs .pop ('use_login_context' , None ):
700
710
call_context = self .login_context
701
711
else :
702
712
call_context = self .context
703
- return call_context .call_async (func , * args , ** kwargs )
713
+
714
+ if kwargs .pop ('no_reply' , None ):
715
+ return call_context .call_no_reply (func , * args , ** kwargs )
716
+ else :
717
+ return call_context .call_async (func , * args , ** kwargs )
704
718
705
719
def call (self , func , * args , ** kwargs ):
706
720
"""
@@ -713,7 +727,10 @@ def call(self, func, *args, **kwargs):
713
727
"""
714
728
t0 = time .time ()
715
729
try :
716
- return self .call_async (func , * args , ** kwargs ).get ().unpickle ()
730
+ recv = self .call_async (func , * args , ** kwargs )
731
+ if recv is None : # no_reply=True
732
+ return None
733
+ return recv .get ().unpickle ()
717
734
finally :
718
735
LOG .debug ('Call took %d ms: %r' , 1000 * (time .time () - t0 ),
719
736
mitogen .parent .CallSpec (func , args , kwargs ))
@@ -786,19 +803,33 @@ def fetch_file(self, in_path, out_path):
786
803
787
804
def put_data (self , out_path , data , mode = None , utimes = None ):
788
805
"""
789
- Implement put_file() by caling the corresponding
790
- ansible_mitogen.target function in the target.
806
+ Implement put_file() by caling the corresponding ansible_mitogen.target
807
+ function in the target, transferring small files inline .
791
808
792
809
:param str out_path:
793
810
Remote filesystem path to write.
794
811
:param byte data:
795
812
File contents to put.
796
813
"""
814
+ # no_reply=True here avoids a roundrip that 99% of the time will report
815
+ # a successful response. If the file transfer fails, the target context
816
+ # will dump an exception into the logging framework, which will appear
817
+ # on console, and the missing file will cause the subsequent task step
818
+ # to fail regardless. This is safe since CALL_FUNCTION is presently
819
+ # single-threaded for each target, so subsequent steps cannot execute
820
+ # until the transfer RPC has completed.
797
821
self .call (ansible_mitogen .target .write_path ,
798
822
mitogen .utils .cast (out_path ),
799
823
mitogen .core .Blob (data ),
800
824
mode = mode ,
801
- utimes = utimes )
825
+ utimes = utimes ,
826
+ no_reply = True )
827
+
828
+ #: Maximum size of a small file before switching to streaming file
829
+ #: transfer. This should really be the same as
830
+ #: mitogen.services.FileService.IO_SIZE, however the message format has
831
+ #: slightly more overhead, so just randomly subtract 4KiB.
832
+ SMALL_FILE_LIMIT = mitogen .core .CHUNK_SIZE - 4096
802
833
803
834
def put_file (self , in_path , out_path ):
804
835
"""
@@ -817,14 +848,14 @@ def put_file(self, in_path, out_path):
817
848
# If the file is sufficiently small, just ship it in the argument list
818
849
# rather than introducing an extra RTT for the child to request it from
819
850
# FileService.
820
- if st .st_size <= 32768 :
851
+ if st .st_size <= self . SMALL_FILE_LIMIT :
821
852
fp = open (in_path , 'rb' )
822
853
try :
823
- s = fp .read (32769 )
854
+ s = fp .read (self . SMALL_FILE_LIMIT + 1 )
824
855
finally :
825
856
fp .close ()
826
857
827
- # Ensure file was not growing during call .
858
+ # Ensure did not grow during read .
828
859
if len (s ) == st .st_size :
829
860
return self .put_data (out_path , s , mode = st .st_mode ,
830
861
utimes = (st .st_atime , st .st_mtime ))
0 commit comments