In [1]:
"""
Run MeasureModuleEfficiency.py in parallel using concurrent.futures

Be really careful with this. 
"""

'\nRun MeasureModuleEfficiency.py in parallel using concurrent.futures\n\nBe really careful with this. \n'

In [2]:
# import sys
# from datetime import datetime
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from mu2etools import read_data as rd 
import numpy as np
import subprocess

In [3]:
# # Generate a timestamped log filename
# timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# logName = f"../Logs/output_{timestamp}.log"
# # Original standard output state
# print("Log file is", logName)

In [4]:
# 8 cores is safe, 16 cores works, 24 starts causing exceptions 
def Multiprocess(processFunction, fileList_, recon, particles_, PEs_, layers_, triggerModes_, quiet, resub, max_workers=16,
                 failedJobsFile="../Txt/MDC2020ae/FailedJobs/failures.csv", mp_context=None):
    """Run ``processFunction`` once per input file in a pool of worker processes.

    Every task is called as
    ``processFunction(fileName, recon, particles_, PEs_, layers_, triggerModes_, quiet, resub)``.
    A line is printed as each task finishes, showing the percentage of *submitted*
    tasks done so far. A task that raises is reported as failed but does not stop
    the others.

    Args:
        processFunction: Picklable callable executed in a worker process for each file.
        fileList_: Input file names.
        recon: Reconstruction tag forwarded to ``processFunction``.
        particles_, PEs_, layers_, triggerModes_: Configuration lists forwarded to
            ``processFunction``. Not used when ``resub`` is true (see below).
        quiet: Forwarded to ``processFunction``.
        resub: If false, every file in ``fileList_`` is submitted.
            If true, only files whose tag appears in the ``Tag`` column of
            ``failedJobsFile`` are submitted. The tag is the second-to-last
            '.'-separated field of the file name. Each such file is submitted with
            the Particle/PEs/Layer/Trigger values from its matching rows.
        max_workers: Number of worker processes.
        failedJobsFile: CSV of failed jobs with columns Tag, PEs, Particle, Layer,
            Trigger. Only read when ``resub`` is true.
        mp_context: Optional multiprocessing context for the pool. ``None`` uses
            Python's default start method. Functions defined in a notebook cell live
            in ``__main__`` and may not be importable by workers started with
            "spawn"/"forkserver". ``multiprocessing.get_context("fork")`` avoids that
            on platforms that support fork.
    """
    print("\n---> Starting multiprocessing...\n", flush=True)

    with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as executor:
        # Map each future to the file name it processes.
        futures_ = {}

        if not resub:
            for fileName in fileList_:
                future = executor.submit(processFunction, fileName, recon, particles_, PEs_, layers_, triggerModes_, quiet, resub)
                futures_[future] = fileName
        else:
            # Read Tag as text so numeric-looking tags (e.g. "007") keep their exact
            # spelling and compare equal to the tags parsed from the file names.
            failedJobs_ = pd.read_csv(failedJobsFile, dtype={"Tag": str})
            failedTags = set(failedJobs_["Tag"])  # hoisted: O(1) membership per file
            for fileName in fileList_:
                parts = fileName.split('.')
                # Skip names without a tag field and files that did not fail.
                if len(parts) < 2 or parts[-2] not in failedTags:
                    continue
                finTag = parts[-2]
                thisFailedJob = failedJobs_[failedJobs_["Tag"] == finTag]
                # Resubmit with only the configurations recorded as failed for this file.
                future = executor.submit(
                    processFunction, fileName, recon,
                    list(thisFailedJob["Particle"]),
                    list(thisFailedJob["PEs"]),
                    list(thisFailedJob["Layer"]),
                    list(thisFailedJob["Trigger"]),
                    quiet, resub,
                )
                futures_[future] = fileName

        # Progress is relative to what was actually submitted, which in resub mode
        # is only a subset of fileList_.
        totalFiles = len(futures_)
        finishedFiles = 0
        failedFiles = 0

        # Process results as they complete. flush=True keeps a redirected log file
        # current during long runs instead of holding lines in its buffer.
        for future in as_completed(futures_):
            fileName = futures_[future]
            finishedFiles += 1
            percentComplete = (finishedFiles / totalFiles) * 100
            try:
                future.result()  # re-raises any exception from the worker
                print(f'\n---> {fileName} processed successfully! ({percentComplete:.1f}% complete)', flush=True)
            except Exception as exc:
                failedFiles += 1
                print(f'\n---> {fileName} encountered an exception: {exc} ({percentComplete:.1f}% complete)', flush=True)

    print(f"\n---> {failedFiles} of {totalFiles} file(s) failed.")
    print("\n---> Multiprocessing completed!", flush=True)
    return

In [5]:
def processFunction(fileName, recon, particles_, PEs_, layers_, triggerModes_, quiet, resub,
                    python_executable="/home/sgrant/.conda/envs/mu2e_env/bin/python",
                    script_path="/home/sgrant/CRVSim/Analyses/PyMacros/MeasureModuleEfficiency.py"):
    """Run MeasureModuleEfficiency.py on one file in a separate Python process.

    The script is invoked as::

        <python_executable> <script_path> fileName recon particles PEs layers triggerModes quiet resub

    Each list argument is passed as a single comma-separated string, and ``quiet``
    and ``resub`` are passed as ``str(bool)`` ("True"/"False").

    Args:
        fileName: Input file to process.
        recon: Reconstruction tag.
        particles_, PEs_, layers_, triggerModes_: Sequences, joined with ','.
        quiet, resub: Flags forwarded as strings.
        python_executable: Interpreter used to run the script.
        script_path: Path of the script to run.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
            It is re-raised (after printing) so callers such as a process-pool
            future see the failure instead of treating the file as processed.
    """
    # Convert list arguments to comma-separated strings for the command line.
    particles_str = ','.join(map(str, particles_))
    PEs_str = ','.join(map(str, PEs_))
    layers_str = ','.join(map(str, layers_))
    triggerModes_str = ','.join(map(str, triggerModes_))

    command = [
        python_executable, script_path,
        fileName, recon, particles_str, PEs_str, layers_str, triggerModes_str, str(quiet), str(resub),
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as exc:
        print(f'---> Exception!\n{exc}')
        # Propagate the failure; swallowing it made failed runs look successful.
        raise
    return

In [6]:
#########################################################
# Run configuration. The product of particles_, layers_, PEs_ and triggerModes_
# gives the set of configurations evaluated for each input file.
defname = "nts.sgrant.CosmicCRYExtractedCatTriggered.MDC2020ae_best_v1_3.root"
recon = "MDC2020ae"

particles_ = ["all", "muons", "non_muons"]
layers_ = [2, 3]
# PE thresholds 10, 15, ..., 130 (np.arange excludes the stop value 135).
PEs_ = np.arange(10, 135, 5)
triggerModes_ = [
    "crv_2layers_trigger",
    "crv_3layers_trigger",
    "trk_crv_2layers_trigger",
    "trk_crv_3layers_trigger",
]
# Other trigger modes used previously:
# ["crv_trigger", "trk_trigger", "trk_crv_trigger", "trk_crv2_trigger", "trk_crv3_trigger", "trk_crv2_2layers_trigger", "trk_crv2_3layers_trigger"]

quiet = True
resub = False

In [7]:
#########################################################
# Testing
# defname = "nts.sgrant.CosmicCRYExtractedCatTriggered.MDC2020ae_best_v1_3.root"
# recon = "MDC2020ae"
# particles_ = ["all"] #, "muons", "non_muons"]
# layers_ = [3] # , 3] 
# PEs_ = [10] #np.arange(10, 135, 5)
# triggerModes_ = ["trk_crv2_3layers_trigger"] 
# quiet = False

In [8]:
# Build the list of input files.
# The rd.get_file_list(defname) lookup below is disabled because the VOMS
# certificate was not working; the list is read from a pre-generated text file instead.
# fileList_ = rd.get_file_list(defname) #[:2]
# print(f"---> Got {len(fileList_)} files.")
fileListPath = "../Txt/FileLists/MDC2020aeOnExpData.txt"

# One file name per line: strip surrounding whitespace/newlines and skip blank
# lines (e.g. a trailing empty line) so "" is never submitted as a file name.
with open(fileListPath, 'r') as file:
    fileList_ = [line.strip() for line in file if line.strip()]
# print(fileList_)

In [9]:
import itertools 

# One configuration = one (particle, layer, PE threshold, trigger mode) tuple.
combinations = list(itertools.product(particles_, layers_, PEs_, triggerModes_))
nConfigs = len(combinations)
nFiles = len(fileList_)

print(f"{nFiles} files")
print(f"{nConfigs} configurations / file")
print(f"{nConfigs * nFiles} tasks")

# NOTE(review): assumes roughly 50 s per configuration. The estimate covers the
# configurations of a single file only and does not scale with the number of
# files or workers — confirm this is the intended meaning.
secondsPerConfig = 50
print(f"---> Run time ~ {nConfigs * secondsPerConfig / 3600:.2f} hours")

96 files
600 configurations / file
57600 tasks
---> Run time ~ 8.33 hours


In [10]:
import sys
from datetime import datetime

In [11]:
# Name the log file after the current local time (YYYYmmdd_HHMMSS) so each run
# writes to its own file (assuming runs start at least one second apart).
timestamp = f"{datetime.now():%Y%m%d_%H%M%S}"
logName = "../Logs/output_" + timestamp + ".log"
print("Log file is", logName)

Log file is ../Logs/output_20240918_201628.log


In [None]:
import contextlib
import os

# Make sure the log directory exists; open(..., "w") does not create it.
os.makedirs(os.path.dirname(logName), exist_ok=True)

# Send everything this process prints to the log file. redirect_stdout restores
# the original sys.stdout on exit, even if Multiprocess raises.
# NOTE: only the Python-level sys.stdout is redirected; file descriptor 1 is
# unchanged, so output from the MeasureModuleEfficiency.py subprocesses does not
# reach this log.
with open(logName, "w") as log, contextlib.redirect_stdout(log):
    Multiprocess(processFunction
                 , fileList_=fileList_
                 , recon=recon
                 , particles_=particles_
                 , PEs_=PEs_
                 , layers_=layers_
                 , triggerModes_=triggerModes_
                 , quiet=quiet
                 , resub=resub)