In [1]:
!pip install -q awscli

In [52]:
import os
import pandas as pd

In [29]:
%%writefile main.nf
#!/usr/bin/env nextflow

/* pipeline input parameters, update this to your data dir */
dicom_data = "$baseDir/path_to_local_dicom_files/*.dcm"

process inference {
    
    label 'inference'
    
    input:
    path dicom_files
    
    output:
    stdout emit: perform_inference_log
    path('*.csv'), emit: csv_files
    
    script:
    """
    #!/usr/bin/env python3

    import pandas as pd
    import os
    import argparse
    from dicom_csv import join_tree
    
    dicom_input = '$dicom_files'
    inference_results_csv = f'inference_results_{dicom_input}.csv'

    def main(dicom_input, inference_results_csv):
        metadata_df = join_tree('.', verbose=2)
        dicom_metadata_df = metadata_df.loc[metadata_df['PixelRepresentation'].notnull()]
        dicom_metadata_df.drop_duplicates(inplace=True)
        inference_results = pd.DataFrame({'Results': dicom_metadata_df['BodyPartExamined'] == 'CHEST'})
        return inference_results.to_csv(inference_results_csv)

    if __name__ == '__main__':
        main(dicom_input, inference_results_csv)    
    """

}


// Define the entry workflow (initial workflow for Nextflow to run)
workflow {
    // def dicom_files = Channel.fromPath(dicom_data).buffer(size: 2)
    dicom_files = Channel.fromPath(dicom_data)
    inference(dicom_files)
}

Overwriting main.nf


In [30]:
%%writefile nextflow.config

process {
    withLabel: inference {
        executor = 'awsbatch'
        queue = 'placeholder'
        container = 'public.ecr.aws/l5b8a5z6/nextflow-approved:batch_poc2'
    } 
}

aws {
    region = 'us-east-1'
    batch {
        cliPath = '/home/ec2-user/miniconda/bin/aws'
        jobRole = 'placeholder'
    }
}
workDir = 'placeholder'


docker.enabled = true

Overwriting nextflow.config


In [31]:
!nextflow run main.nf -dsl2

N E X T F L O W  ~  version 22.10.6
Launching `main.nf` [tender_kalam] DSL2 - revision: 05d4d0b9ff
Downloading plugin nf-amazon@1.11.3
[-        ] process > inference -[K
[2A
[-        ] process > inference [  0%] 0 of 2[K
[2A
[-        ] process > inference [  0%] 0 of 4[K
[2A
[-        ] process > inference [  0%] 0 of 5[K
[2A
executor >  awsbatch (5)[K
[26/ab25e0] process > inference (5) [  0%] 0 of 5[K
[3A
executor >  awsbatch (5)[K
[26/ab25e0] process > inference (5) [  0%] 0 of 5[K
[3A
executor >  awsbatch (5)[K
[26/ab25e0] process > inference (5) [  0%] 0 of 5[K
[3A
executor >  awsbatch (5)[K
[fc/a648ba] process > inference (1) [ 40%] 2 of 5[K
[3A
executor >  awsbatch (5)[K
[26/ab25e0] process > inference (5) [ 60%] 3 of 5[K
[3A
executor >  awsbatch (5)[K
[39/d6fbfc] process > inference (4) [ 80%] 4 of 5[K
[3A
executor >  awsbatch (5)[K
[f7/05506c] process > inference (2) [100%] 5 of 5 ✔[K
[32;1mCompleted at: 21-Dec-2023 19:24:43
Duration    : 3m 1s


In [90]:
# Get batch endpoints
end_points = []
with open(".nextflow.log", 'r') as f:
    for line in f:
        if "COMPLETED" in line:
            end_points.append(line.split(' ')[-1][:-2])

In [91]:
# Download inference results from each batch session
for i in range(len(end_points)):
    command = f'aws s3 cp {end_points[i]}/ ./place_holder/ --recursive --exclude "*" --include "inference_results_*" --quiet'
    os.system(command)

In [92]:
# Combine inference results from each batch session and remove the temporary files
files = os.listdir('place_holder/')
results_df = pd.DataFrame()
for file in files:
    if file[-3:] == 'csv':
        label = file.split('_')[-1].split('.')[0]
        temp_df = pd.read_csv('place_holder/' + file)
        temp_df.drop('Unnamed: 0', axis=1, inplace=True)
        temp_df['Label'] = label
        results_df = pd.concat([results_df, temp_df])

results_df.to_csv('midrc_batch_inference_results.csv', index=False)
os.system('rm -r place_holder')
results_df

Unnamed: 0,Results,Label
0,True,1-034
0,True,1-53
0,True,1-008
0,True,1-273
0,True,1-0163
