@@ -333,6 +333,111 @@ def failed(self) -> bool:
333333 return self .proc .returncode != 0
334334
335335
class _PopenHandler:
    """
    Handles the creation of ``subprocess.Popen`` for each replica process with
    configuration extracted from ``ReplicaParam``.

    The handler keeps no state of its own; every method works purely off the
    ``ReplicaParam`` it is given.
    """

    def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]:
        """
        Opens ``file`` for unbuffered binary write and returns the handle.

        Args:
            file: path of the log file to create, or ``None``/empty for no file.

        Returns:
            The opened file, or ``None`` if no file name is given.

        Raises:
            FileExistsError: if ``file`` is already present as a regular file.
        """

        if not file:
            return None

        if os.path.isfile(file):
            raise FileExistsError(
                f"log file: {file} already exists,"
                " specify a different log_dir, app_name, or remove the file and retry"
            )

        # A bare file name has an empty dirname and ``os.makedirs("")`` raises
        # ``FileNotFoundError``, so only create the parent when there is one.
        parent_dir = os.path.dirname(file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        return io.open(file, mode="wb", buffering=0)

    def _popen(
        self,
        role_name: RoleName,
        replica_id: int,
        replica_params: ReplicaParam,
    ) -> _LocalReplica:
        """
        Starts the replica process described by ``replica_params``.

        Works like ``subprocess.Popen`` except that ``stdout`` and ``stderr``
        are given as file names (``str``) in ``replica_params`` rather than as
        file-like objects; the files are opened here.

        Args:
            role_name: name of the role the replica belongs to.
            replica_id: id of the replica within the role.
            replica_params: args, env, cwd and output file names of the replica.

        Returns:
            A ``_LocalReplica`` wrapping the started process and its log handles.

        Raises:
            FileExistsError: if one of the output files already exists.
        """

        stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

        try:
            args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
            log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
            env = self._get_replica_env(replica_params)

            proc = subprocess.Popen(
                args=replica_params.args,
                env=env,
                stdout=stdout_,
                stderr=stderr_,
                # runs the child in its own session (setsid) on POSIX
                start_new_session=True,
                cwd=replica_params.cwd,
            )
        except BaseException:
            # The replica never started, so no ``_LocalReplica`` exists to own
            # the log files; close them here instead of leaking the descriptors.
            # NOTE(review): ``combined_`` (a ``Tee``) is not released here since
            # its teardown API is not visible from this class -- confirm
            # whether it needs an explicit close as well.
            for handle in (stdout_, stderr_):
                if handle is not None:
                    handle.close()
            raise

        return _LocalReplica(
            role_name,
            replica_id,
            proc,
            stdout=stdout_,
            stderr=stderr_,
            combined=combined_,
            error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
        )

    def _get_replica_env(
        self,
        replica_params: ReplicaParam,
    ) -> Dict[str, str]:
        """
        Returns the environment variables for the ``_LocalReplica``.

        The result is a copy of the parent's environment overlaid with
        ``replica_params.env``; ``PATH`` is combined via ``_join_PATH`` and
        ``PYTHONUNBUFFERED`` is defaulted when not already set.
        """

        # inherit parent's env vars since 99.9% of the time we want this behavior
        # just make sure we override the parent's env vars with the user_defined ones
        env = os.environ.copy()
        env.update(replica_params.env)
        # PATH is a special one: instead of overriding, join the user-defined
        # value with the parent's
        env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))

        # default to unbuffered python for faster responsiveness locally
        env.setdefault("PYTHONUNBUFFERED", "x")

        return env

    def _get_replica_output_handles(
        self,
        replica_params: ReplicaParam,
    ) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
        """
        Returns the stdout, stderr, and combined outputs of the replica.
        If the combined output file is not specified, then the combined output is ``None``.

        Raises:
            FileExistsError: if any of the output files already exists. Files
                opened before the failure are closed again.
        """

        stdout_: Optional[io.FileIO] = None
        stderr_: Optional[io.FileIO] = None
        combined_file: Optional[io.FileIO] = None
        combined_: Optional[Tee] = None
        try:
            stdout_ = self._get_file_io(replica_params.stdout)
            stderr_ = self._get_file_io(replica_params.stderr)
            combined_file = self._get_file_io(replica_params.combined)
            if combined_file is not None:
                # the Tee is handed the stdout/stderr file *names*, so both
                # must be configured whenever a combined file is requested
                combined_ = Tee(
                    combined_file,
                    none_throws(replica_params.stdout),
                    none_throws(replica_params.stderr),
                )
        except BaseException:
            # do not leak the files that were opened before the failure
            for handle in (stdout_, stderr_, combined_file):
                if handle is not None:
                    handle.close()
            raise
        return stdout_, stderr_, combined_
439+
440+
336441class _LocalAppDef :
337442 """
338443 Container object used by ``LocalhostScheduler`` to group the pids that
@@ -600,6 +705,8 @@ def __init__(
600705 self ._cache_size = cache_size
601706 _register_termination_signals ()
602707
708+ self ._popen_handler = _PopenHandler ()
709+
603710 self ._extra_paths : List [str ] = extra_paths or []
604711
605712 # sets lazily on submit or dryrun based on log_dir cfg
@@ -660,101 +767,6 @@ def _evict_lru(self) -> bool:
660767 log .debug (f"no apps evicted, all { len (self ._apps )} apps are running" )
661768 return False
662769
663- def _get_file_io (self , file : Optional [str ]) -> Optional [io .FileIO ]:
664- """
665- Given a file name, opens the file for write and returns the IO.
666- If no file name is given, then returns ``None``
667- Raises a ``FileExistsError`` if the file is already present.
668- """
669-
670- if not file :
671- return None
672-
673- if os .path .isfile (file ):
674- raise FileExistsError (
675- f"log file: { file } already exists,"
676- f" specify a different log_dir, app_name, or remove the file and retry"
677- )
678-
679- os .makedirs (os .path .dirname (file ), exist_ok = True )
680- return io .open (file , mode = "wb" , buffering = 0 )
681-
def _popen(
    self,
    role_name: RoleName,
    replica_id: int,
    replica_params: ReplicaParam,
) -> _LocalReplica:
    """
    Starts the replica process described by ``replica_params``.

    Works like ``subprocess.Popen`` except that ``stdout`` and ``stderr``
    are given as file names (``str``) in ``replica_params`` rather than as
    file-like objects; the files are opened here.

    Args:
        role_name: name of the role the replica belongs to.
        replica_id: id of the replica within the role.
        replica_params: args, env, cwd and output file names of the replica.

    Returns:
        A ``_LocalReplica`` wrapping the started process and its log handles.

    Raises:
        FileExistsError: if one of the output files already exists.
    """

    stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

    try:
        args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
        log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
        env = self._get_replica_env(replica_params)

        proc = subprocess.Popen(
            args=replica_params.args,
            env=env,
            stdout=stdout_,
            stderr=stderr_,
            # runs the child in its own session (setsid) on POSIX
            start_new_session=True,
            cwd=replica_params.cwd,
        )
    except BaseException:
        # The replica never started, so no ``_LocalReplica`` exists to own the
        # log files; close them here instead of leaking the descriptors.
        # NOTE(review): ``combined_`` (a ``Tee``) is not released here since its
        # teardown API is not visible from this method -- confirm whether it
        # needs an explicit close as well.
        for handle in (stdout_, stderr_):
            if handle is not None:
                handle.close()
        raise

    return _LocalReplica(
        role_name,
        replica_id,
        proc,
        stdout=stdout_,
        stderr=stderr_,
        combined=combined_,
        error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
    )
716-
def _get_replica_output_handles(
    self,
    replica_params: ReplicaParam,
) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
    """
    Returns the stdout, stderr, and combined outputs of the replica.
    If the combined output file is not specified, then the combined output is ``None``.

    Raises:
        FileExistsError: if any of the output files already exists. Files
            opened before the failure are closed again.
    """

    stdout_: Optional[io.FileIO] = None
    stderr_: Optional[io.FileIO] = None
    combined_file: Optional[io.FileIO] = None
    combined_: Optional[Tee] = None
    try:
        stdout_ = self._get_file_io(replica_params.stdout)
        stderr_ = self._get_file_io(replica_params.stderr)
        combined_file = self._get_file_io(replica_params.combined)
        if combined_file is not None:
            # the Tee is handed the stdout/stderr file *names*, so both must
            # be configured whenever a combined file is requested
            combined_ = Tee(
                combined_file,
                none_throws(replica_params.stdout),
                none_throws(replica_params.stderr),
            )
    except BaseException:
        # do not leak the files that were opened before the failure
        for handle in (stdout_, stderr_, combined_file):
            if handle is not None:
                handle.close()
        raise
    return stdout_, stderr_, combined_
737-
def _get_replica_env(
    self,
    replica_params: ReplicaParam,
) -> Dict[str, str]:
    """
    Builds the environment variables the ``_LocalReplica`` is launched with.

    The result is the parent's environment overlaid with
    ``replica_params.env``; ``PATH`` is combined via ``_join_PATH`` and
    ``PYTHONUNBUFFERED`` is defaulted when not already set.
    """

    user_env = replica_params.env

    # the parent's env vars are inherited (wanted 99.9% of the time) and the
    # user-defined ones win on conflict because they are unpacked last
    merged = {**os.environ, **user_env}

    # PATH is special: rather than letting the user value replace the
    # parent's, the two are joined
    merged["PATH"] = _join_PATH(user_env.get("PATH"), os.getenv("PATH"))

    # unbuffered python by default, for faster responsiveness locally
    if "PYTHONUNBUFFERED" not in merged:
        merged["PYTHONUNBUFFERED"] = "x"

    return merged
757-
758770 def _get_app_log_dir (self , app_id : str , cfg : LocalOpts ) -> str :
759771 """
760772 Returns the log dir. We redirect stdout/err
@@ -802,7 +814,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
802814 replica_log_dir = role_log_dirs [replica_id ]
803815
804816 os .makedirs (replica_log_dir )
805- replica = self ._popen (
817+ replica = self ._popen_handler . _popen (
806818 role_name ,
807819 replica_id ,
808820 replica_params ,
0 commit comments