# Large Scale Distributed Seismic Processing

In this notebook we will work with a much larger dataset to push the limits a bit.  Here we have a 180GB SEGY file containing raw pre-stack seismic data sitting in an S3 bucket.  Let's take a look at this dataset and then do the same mean amplitude calculations, using only a basic T2.Medium EC2 instance.

### Import packages

In [None]:
import sys
import time
import json
import boto3
import struct
import pickle
import botocore
import array as arr
import numpy as np
import matplotlib.pyplot as plt
from struct import Struct

### Functions

We will reuse the SEGY loading functions from before, so let's define them all in one go.

In [None]:
def DecodeTextHeader(text_header_raw):
    # The textual header is EBCDIC (code page 500); decode it to a Python string
    text_header = text_header_raw.decode('cp500')

    # Put each card label ("C 1 " through "C40 ") at the start of its own line
    for card_no in range(1, 41):
        label = f"C{card_no:>2} "
        text_header = text_header.replace(label, "\n" + label)

    return text_header


def DecodeBinHeader(bin_header_raw):
    bin_header = {}

    bin_header['job_id']                  = int.from_bytes(bin_header_raw[0:4], byteorder='big', signed=False)
    bin_header['line_no']                 = int.from_bytes(bin_header_raw[4:8], byteorder='big', signed=False)
    bin_header['reel_no']                 = int.from_bytes(bin_header_raw[8:12], byteorder='big', signed=False)
    bin_header['data_traces']             = int.from_bytes(bin_header_raw[12:14], byteorder='big', signed=False)
    bin_header['aux_traces']              = int.from_bytes(bin_header_raw[14:16], byteorder='big', signed=False)
    bin_header['sample_interval']         = int.from_bytes(bin_header_raw[16:18], byteorder='big', signed=False)
    bin_header['sample_interval_orig']    = int.from_bytes(bin_header_raw[18:20], byteorder='big', signed=False)
    bin_header['samples_per_trace']       = int.from_bytes(bin_header_raw[20:22], byteorder='big', signed=False)
    bin_header['samples_per_trace_orig']  = int.from_bytes(bin_header_raw[22:24], byteorder='big', signed=False)
    bin_header['data_sample_format']      = int.from_bytes(bin_header_raw[24:26], byteorder='big', signed=False)
    bin_header['ensemble_fold']           = int.from_bytes(bin_header_raw[26:28], byteorder='big', signed=False)
    bin_header['trace_sorting']           = int.from_bytes(bin_header_raw[28:30], byteorder='big', signed=False)
    bin_header['vert_sum_code']           = int.from_bytes(bin_header_raw[30:32], byteorder='big', signed=False)
    bin_header['sweep_hz_start']          = int.from_bytes(bin_header_raw[32:34], byteorder='big', signed=False)
    bin_header['sweep_hz_end']            = int.from_bytes(bin_header_raw[34:36], byteorder='big', signed=False)
    bin_header['sweep_length']            = int.from_bytes(bin_header_raw[36:38], byteorder='big', signed=False)
    bin_header['sweep_type']              = int.from_bytes(bin_header_raw[38:40], byteorder='big', signed=False)
    bin_header['sweep_trace_ch']          = int.from_bytes(bin_header_raw[40:42], byteorder='big', signed=False)
    bin_header['sweep_trace_taper_start'] = int.from_bytes(bin_header_raw[42:44], byteorder='big', signed=False)
    bin_header['sweep_trace_taper_end']   = int.from_bytes(bin_header_raw[44:46], byteorder='big', signed=False)
    bin_header['taper_type']              = int.from_bytes(bin_header_raw[46:48], byteorder='big', signed=False)
    bin_header['data_traces_correlated']  = int.from_bytes(bin_header_raw[48:50], byteorder='big', signed=False)
    bin_header['binary_gain_recovered']   = int.from_bytes(bin_header_raw[50:52], byteorder='big', signed=False)
    bin_header['amp_recovery_method']     = int.from_bytes(bin_header_raw[52:54], byteorder='big', signed=False)
    bin_header['measurement_system']      = int.from_bytes(bin_header_raw[54:56], byteorder='big', signed=False)
    bin_header['impulse_sig_polarity']    = int.from_bytes(bin_header_raw[56:58], byteorder='big', signed=False)
    bin_header['vib_polarity']            = int.from_bytes(bin_header_raw[58:60], byteorder='big', signed=False)
    bin_header['unassigned']              = int.from_bytes(bin_header_raw[60:300], byteorder='big', signed=False)
    bin_header['segy_format']             = int.from_bytes(bin_header_raw[300:302], byteorder='big', signed=False)
    bin_header['fixed_length_flag']       = int.from_bytes(bin_header_raw[302:304], byteorder='big', signed=False)
    bin_header['extended_text_header_no'] = int.from_bytes(bin_header_raw[304:306], byteorder='big', signed=False)
    bin_header['unassigned2']             = int.from_bytes(bin_header_raw[306:400], byteorder='big', signed=False)
    
    return bin_header


def PrintBinHeader(bin_header):
    print("job_id                  = ", bin_header['job_id']                 )
    print("line_no                 = ", bin_header['line_no']                )
    print("reel_no                 = ", bin_header['reel_no']                )
    print("data_traces             = ", bin_header['data_traces']            )
    print("aux_traces              = ", bin_header['aux_traces']             )
    print("sample_interval         = ", bin_header['sample_interval']        )
    print("sample_interval_orig    = ", bin_header['sample_interval_orig']   )
    print("samples_per_trace       = ", bin_header['samples_per_trace']      )
    print("samples_per_trace_orig  = ", bin_header['samples_per_trace_orig'] )
    print("data_sample_format      = ", bin_header['data_sample_format']     )
    print("ensemble_fold           = ", bin_header['ensemble_fold']          )
    print("trace_sorting           = ", bin_header['trace_sorting']          )
    print("vert_sum_code           = ", bin_header['vert_sum_code']          )
    print("sweep_hz_start          = ", bin_header['sweep_hz_start']         )
    print("sweep_hz_end            = ", bin_header['sweep_hz_end']           )
    print("sweep_length            = ", bin_header['sweep_length']           )
    print("sweep_type              = ", bin_header['sweep_type']             )
    print("sweep_trace_ch          = ", bin_header['sweep_trace_ch']         )
    print("sweep_trace_taper_start = ", bin_header['sweep_trace_taper_start'])
    print("sweep_trace_taper_end   = ", bin_header['sweep_trace_taper_end']  )
    print("taper_type              = ", bin_header['taper_type']             )
    print("data_traces_correlated  = ", bin_header['data_traces_correlated'] )
    print("binary_gain_recovered   = ", bin_header['binary_gain_recovered']  )
    print("amp_recovery_method     = ", bin_header['amp_recovery_method']    )
    print("measurement_system      = ", bin_header['measurement_system']     )
    print("impulse_sig_polarity    = ", bin_header['impulse_sig_polarity']   )
    print("vib_polarity            = ", bin_header['vib_polarity']           )
    print("unassigned              = ", bin_header['unassigned']             )
    print("segy_format             = ", bin(bin_header['segy_format'])[2:]   )    
    print("fixed_length_flag       = ", bin_header['fixed_length_flag']      )
    print("extended_text_header_no = ", bin_header['extended_text_header_no'])
    print("unassigned2             = ", bin_header['unassigned2']            )
    
    
def DecodeTraceHeader(trace_header_raw):
    trace_header = {}
    trace_header['trace_seq_no_all']            = int.from_bytes(trace_header_raw[0:4], byteorder='big', signed=False)
    trace_header['trace_seq_no_file']           = int.from_bytes(trace_header_raw[4:8], byteorder='big', signed=False)
    trace_header['field_record_no_orig']        = int.from_bytes(trace_header_raw[8:12], byteorder='big', signed=False)
    trace_header['trace_no_field_orig']         = int.from_bytes(trace_header_raw[12:16], byteorder='big', signed=False)
    trace_header['energy_source_point_no']      = int.from_bytes(trace_header_raw[16:20], byteorder='big', signed=False)
    trace_header['ensemble_no']                 = int.from_bytes(trace_header_raw[20:24], byteorder='big', signed=False)
    trace_header['ensemble_trace_no']           = int.from_bytes(trace_header_raw[24:28], byteorder='big', signed=False)
    trace_header['trace_id']                    = int.from_bytes(trace_header_raw[28:30], byteorder='big', signed=False)
    trace_header['sum_vertical_traces']         = int.from_bytes(trace_header_raw[30:32], byteorder='big', signed=False)
    trace_header['sum_horizontal_traces']       = int.from_bytes(trace_header_raw[32:34], byteorder='big', signed=False)
    trace_header['data_use']                    = int.from_bytes(trace_header_raw[34:36], byteorder='big', signed=False)
    trace_header['distance_from_source_center'] = int.from_bytes(trace_header_raw[36:40], byteorder='big', signed=False)
    # ... incomplete
    trace_header['group_x']                     = int.from_bytes(trace_header_raw[80:84], byteorder='big', signed=False)
    trace_header['group_y']                     = int.from_bytes(trace_header_raw[84:88], byteorder='big', signed=False)
    trace_header['coord_units']                 = int.from_bytes(trace_header_raw[88:90], byteorder='big', signed=False)
    trace_header['trace_samples']               = int.from_bytes(trace_header_raw[114:116], byteorder='big', signed=False)
    trace_header['sample_interval']             = int.from_bytes(trace_header_raw[116:118], byteorder='big', signed=False)
    trace_header['gain_type']                   = int.from_bytes(trace_header_raw[118:120], byteorder='big', signed=False)
    trace_header['CDP_X']                       = int.from_bytes(trace_header_raw[180:184], byteorder='big', signed=False)
    trace_header['CDP_Y']                       = int.from_bytes(trace_header_raw[184:188], byteorder='big', signed=False)
    trace_header['inline']                      = int.from_bytes(trace_header_raw[188:192], byteorder='big', signed=False)
    trace_header['xline']                       = int.from_bytes(trace_header_raw[192:196], byteorder='big', signed=False)
    trace_header['trace_unit']                  = int.from_bytes(trace_header_raw[202:204], byteorder='big', signed=False) 
    trace_header['inline_custom']               = int.from_bytes(trace_header_raw[232:236], byteorder='big', signed=False)
    trace_header['xline_custom']                = int.from_bytes(trace_header_raw[236:240], byteorder='big', signed=False)

    return trace_header
    

def PrintTraceHeaders(trace_header):
    print("trace_seq_no_all            = ", trace_header['trace_seq_no_all'])
    print("trace_seq_no_file           = ", trace_header['trace_seq_no_file'])
    print("field_record_no_orig        = ", trace_header['field_record_no_orig'])
    print("trace_no_field_orig         = ", trace_header['trace_no_field_orig'])
    print("energy_source_point_no      = ", trace_header['energy_source_point_no'])
    print("ensemble_no                 = ", trace_header['ensemble_no'])
    print("ensemble_trace_no           = ", trace_header['ensemble_trace_no'])
    print("trace_id                    = ", trace_header['trace_id'])
    print("sum_vertical_traces         = ", trace_header['sum_vertical_traces'])
    print("sum_horizontal_traces       = ", trace_header['sum_horizontal_traces'])
    print("data_use                    = ", trace_header['data_use'])
    print("distance_from_source_center = ", trace_header['distance_from_source_center'])
    # ... incomplete
    print("group_x                     = ", trace_header['group_x'])
    print("group_y                     = ", trace_header['group_y'])
    print("coord_units                 = ", trace_header['coord_units'])
    print("trace_samples               = ", trace_header['trace_samples'])
    print("sample_interval             = ", trace_header['sample_interval'])
    print("gain_type                   = ", trace_header['gain_type'])
    print("CDP_X                       = ", trace_header['CDP_X'])          
    print("CDP_Y                       = ", trace_header['CDP_Y'])          
    print("inline                      = ", trace_header['inline'])         
    print("xline                       = ", trace_header['xline'])          
    print("trace_unit                  = ", trace_header['trace_unit'])     
    print("inline_custom               = ", trace_header['inline_custom'])         
    print("xline_custom                = ", trace_header['xline_custom'])          
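
The decoders above read every field as an unsigned integer.  The SEG-Y standard describes header values as two's complement integers, so fields such as coordinates can be negative.  Below is a minimal table-driven sketch of a signed decode using `struct`.  The names `TRACE_FIELDS` and `decode_fields` are hypothetical, the byte offsets are copied from `DecodeTraceHeader`, and keeping `trace_samples` unsigned is an assumption made here for illustration.

```python
import struct

# (name, byte offset, struct format) for a few fields; offsets copied from
# DecodeTraceHeader above. '>i' = big-endian signed 32-bit, '>h' = signed 16-bit.
TRACE_FIELDS = [
    ("trace_seq_no_all", 0, ">i"),
    ("group_x", 80, ">i"),
    ("group_y", 84, ">i"),
    ("coord_units", 88, ">h"),
    ("trace_samples", 114, ">H"),  # Assumption: kept unsigned, a sample count is never negative
    ("CDP_X", 180, ">i"),
    ("CDP_Y", 184, ">i"),
]

def decode_fields(raw, fields):
    """Decode the listed fields from a raw header buffer into a dict."""
    return {name: struct.unpack_from(fmt, raw, offset)[0]
            for name, offset, fmt in fields}

# Usage with a synthetic 240-byte header (not real data)
header = bytearray(240)
struct.pack_into(">i", header, 80, -1250)   # A negative group_x coordinate
struct.pack_into(">H", header, 114, 4001)   # Number of samples in the trace
decoded = decode_fields(bytes(header), TRACE_FIELDS)
print(decoded["group_x"], decoded["trace_samples"])
```

Keeping the layout in a table means a file with custom byte locations only needs a different table, not a different function.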


### Load SEGY File

Let's load the large 180GB SEGY file as a data stream.  Again, when you attempt this, adjust the variables to point to your own seismic files residing in S3.

In [None]:
# Load in the 180GB raw pre-stack SEGY data

source_bucket   = 'equinor-volve-data-village'                  # S3 bucket name with input data
source_folder   = 'Seismic/ST10010/Raw_data/ST10010+NAV_MERGE'  # Folder path
source_filename = 'ST10010_1150780_40203.sgy'                   # Filename

s3 = boto3.resource('s3')
segy_obj = s3.Object(source_bucket, f"{source_folder}/{source_filename}")
segy_stream = segy_obj.get()['Body']

### Read Headers

Let's read the text and binary headers.  Unlike the file used before, this file does not follow the SEGY revision 1 standard and stores some trace header information at custom byte locations.  We will not be using that information in this notebook.  If we did, we would need to adapt the functions above to read from the correct locations.

In [None]:
text_header_raw = segy_stream.read(3200)
bin_header_raw = segy_stream.read(400)

text_header = DecodeTextHeader(text_header_raw)

print("Text Header:")
print(text_header)

bin_header = DecodeBinHeader(bin_header_raw)
print("\nBinary Header:")
PrintBinHeader(bin_header)
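
The `data_sample_format` value in the binary header determines how wide each sample is, which matters for any byte-offset arithmetic on the file.  As a sketch, the lookup below maps the format codes defined in SEG-Y revision 1 to a sample width.  `SAMPLE_FORMATS`, `bytes_per_sample` and `trace_record_size` are hypothetical helper names, not part of the notebook's existing functions.

```python
# Data sample format codes from SEG-Y revision 1: code -> (description, bytes per sample)
SAMPLE_FORMATS = {
    1: ("4-byte IBM floating point", 4),
    2: ("4-byte two's complement integer", 4),
    3: ("2-byte two's complement integer", 2),
    4: ("4-byte fixed point with gain (obsolete)", 4),
    5: ("4-byte IEEE floating point", 4),
    8: ("1-byte two's complement integer", 1),
}

def bytes_per_sample(format_code):
    """Return the sample width in bytes, or raise for an unknown code."""
    try:
        return SAMPLE_FORMATS[format_code][1]
    except KeyError:
        raise ValueError(f"Unsupported data sample format code: {format_code}") from None

def trace_record_size(samples_per_trace, format_code, trace_header_size=240):
    """Size in bytes of one trace header plus its samples."""
    return trace_header_size + samples_per_trace * bytes_per_sample(format_code)

# Example: a 1000-sample trace stored as IEEE floats (format code 5)
print(trace_record_size(1000, 5))
```

With the headers read above, a call such as `trace_record_size(bin_header['samples_per_trace'], bin_header['data_sample_format'])` would give the record size without hard-coding 4 bytes per sample.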

### Trace Headers

Let's read the first trace header.  Notice that the CDP X/Y and inline/xline values are not correct: per the text header, this file stores them at different byte locations.

In [None]:
trace_header_raw = segy_stream.read(240)

trace_header = DecodeTraceHeader(trace_header_raw)
PrintTraceHeaders(trace_header)

### Trace Data

Let's read in the first trace and plot it.  In this file the trace data is stored as IEEE 4-byte floats, which Python can use directly.  We do not need to convert the values, only unpack them from their byte representation.

This raw data looks very different from the processed data we saw before, as it does not have any amplitude gain applied.

In [None]:
trace_raw = segy_stream.read(trace_header['trace_samples']*4)

trace = []
for x in range(4, trace_header['trace_samples']*4+4, 4):
    trace.append(struct.unpack('>f', trace_raw[x-4:x])[0])

# Let's plot the trace
limits = np.amax(np.absolute(trace))
plt.figure(figsize=(5, 10))
plt.plot(trace, range(trace_header['trace_samples']), 'red')
plt.xlim(-limits, limits)
plt.ylim(trace_header['trace_samples'], 0)
plt.axvline(0, c='grey')


segy_stream.close()
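
The cell above unpacks one sample per `struct.unpack` call.  Since every sample has the same big-endian 4-byte float layout, the whole trace can be unpacked in a single call.  A minimal sketch, where `unpack_trace` is a hypothetical helper name:

```python
import struct

def unpack_trace(trace_raw, n_samples):
    """Unpack n_samples big-endian IEEE 4-byte floats in a single call."""
    return list(struct.unpack(f">{n_samples}f", trace_raw[:n_samples * 4]))

# Synthetic example: pack three samples, then unpack them again
samples = [0.5, -1.25, 3.0]
raw = struct.pack(">3f", *samples)
print(unpack_trace(raw, 3))
```

Since NumPy is already imported in this notebook, `np.frombuffer(trace_raw, dtype='>f4')` is another option that returns an array directly.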

### Process Data

Great, we are loading the data correctly.  Now to scale this out across multiple Lambdas.

First, for a benchmark, let's start processing here and see how long it would take on a single vCPU.

In [None]:
segy_stream = segy_obj.get()['Body']

text_header_raw = segy_stream.read(3200)
bin_header_raw = segy_stream.read(400)
text_header = DecodeTextHeader(text_header_raw)
bin_header = DecodeBinHeader(bin_header_raw)

In [None]:
trace_amp_mean = {}
start_time = time.time()

# Iterate through all the traces
while True:
    trace_header_raw = segy_stream.read(240)
    trace_header = DecodeTraceHeader(trace_header_raw)
    trace_raw = segy_stream.read(trace_header['trace_samples']*4)

    trace = []
    for x in range(4, trace_header['trace_samples']*4+4, 4):
        trace.append(struct.unpack('>f', trace_raw[x-4:x])[0])
    
    trace_amp_mean[trace_header['trace_seq_no_all']] = np.mean(np.absolute(trace))
    
    if trace_header['trace_seq_no_all']%1000 == 0:
        print("Trace #{} has a mean amplitude of {}, elapsed time is {:.2f} seconds.".format(
                                        trace_header['trace_seq_no_all'], 
                                        trace_amp_mean[trace_header['trace_seq_no_all']], 
                                        time.time() - start_time))
        
    if trace_header['trace_seq_no_all'] > 5000:
        print("Stopping here.")
        break

segy_stream.close()
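
The benchmark loop above stops after about 5,000 traces, so it never has to deal with the end of the file.  For a full pass, the loop needs an explicit end-of-data check.  The generator below is a sketch under a few assumptions: traces are fixed length, samples are 4-byte IEEE floats, and `read(n)` returns fewer than `n` bytes only at the end of the stream.  `iter_trace_mean_amps` and `make_record` are hypothetical names, and the stream here is synthetic.

```python
import io
import struct

def iter_trace_mean_amps(stream, samples_per_trace, trace_header_size=240):
    """Yield (trace_seq_no, mean absolute amplitude) until the stream ends."""
    record_size = trace_header_size + samples_per_trace * 4
    fmt = f">{samples_per_trace}f"
    while True:
        record = stream.read(record_size)
        if len(record) < record_size:   # End of data (or a truncated last record)
            return
        seq_no = int.from_bytes(record[0:4], byteorder="big", signed=False)
        samples = struct.unpack(fmt, record[trace_header_size:])
        yield seq_no, sum(abs(s) for s in samples) / samples_per_trace

def make_record(seq_no, samples):
    """Build a synthetic trace record: 240-byte header followed by the samples."""
    header = seq_no.to_bytes(4, "big") + bytes(236)
    return header + struct.pack(f">{len(samples)}f", *samples)

# Synthetic stream with two 4-sample traces (not real data)
stream = io.BytesIO(make_record(1, [1.0, -1.0, 2.0, -2.0]) +
                    make_record(2, [0.5, 0.5, 0.5, 0.5]))
results = list(iter_trace_mean_amps(stream, samples_per_trace=4))
print(results)
```

Reading the header and samples as one record also halves the number of `read` calls per trace.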

### Scale Out

So that's about 2-3 seconds per 1,000 traces.  The problem is that there are about 9.3 million traces in this file, so a full pass would take about 6 hours.  On top of that, the entire raw dataset for the seismic cube contains many 180GB files.
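
The runtime estimate is simple arithmetic on the figures quoted above.  As a quick sketch, with `estimate_hours` being a hypothetical helper and a constant processing rate assumed:

```python
def estimate_hours(trace_count, seconds_per_1000_traces):
    """Estimated single-vCPU runtime in hours at a constant processing rate."""
    return trace_count / 1000 * seconds_per_1000_traces / 3600

# Figures quoted above: about 9.3 million traces at 2-3 seconds per 1,000 traces
for rate in (2.0, 2.5, 3.0):
    print(f"{rate} s per 1,000 traces -> {estimate_hours(9_300_000, rate):.1f} hours")
```

The range works out to roughly 5 to 8 hours, consistent with the figure of about 6 hours.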

We could scale up to a 16xLarge instance (64 CPUs) and thread the calculations.  The compute might then take about 6 minutes, but we would hit a file bandwidth bottleneck reading the file into the EC2 instance: it takes about 75 minutes to read through the file from S3 to this notebook.  A further downside is that you have to do the undifferentiated heavy lifting of provisioning the hardware resources for the notebook and starting/stopping them as needed to save costs.  The T3.16xLarge instance costs about \$3/hour, versus $0.03/hour for our T2.Medium.  Also, once your testing is done, all this code can be moved to a Lambda function to be called only as needed.

So let's use Lambda again to process the file.  There is no need to provision resources, and we can take advantage of S3's file transfer bandwidth.  With our current default limits we can have 1000 concurrent Lambdas running, so we will use them all!  This limit can be increased if needed (and should be, if doing seismic calculations like this).

Let's get things set up below.  Adjust the variables to point to an S3 bucket you have access to.

In [None]:
lambda_name        = "SegyBatchProcessMeanAmp"             # Name of the Lambda function to invoke
results_bucket     = "vavourak-testing"                    # Bucket to use
mean_amp_folder    = "temp-trace-bundles-st10010-mean-amp" # Subfolder to place the calculation results
concurrent_lambdas = 1000                                  # Number of Lambdas to invoke

Now let's split up the SEGY file into byte ranges and invoke 1000 Lambda functions.

In [None]:
# Get S3 object
segy_obj = s3.Object(source_bucket, f"{source_folder}/{source_filename}")

# Define some needed variables based off the above parameters
start_time = time.time()
header_size = 3600
trace_header_size = 240
trace_size = bin_header['samples_per_trace'] * 4
trace_size_with_headers = trace_size + trace_header_size
filesize = segy_obj.content_length
trace_count = (filesize - header_size) // trace_size_with_headers
# Ceiling division, so we never need more than `concurrent_lambdas` bundles
bundle_size = -(-trace_count // concurrent_lambdas)
lambda_counter = 0

lambda_client = boto3.client('lambda')

results_file_list = [] # Let's keep track of the output file names, so we can grab them later

print(f"Total traces in file: {trace_count}")
print(f"Traces per Lambda for {concurrent_lambdas} concurrency (not rounded): {trace_count/concurrent_lambdas}")
print(f"Traces per Lambda, rounded up: {bundle_size}")

# Send the trace bundle information over to Lambda
for bundle in range(0, trace_count, bundle_size):
    lambda_counter = lambda_counter + 1
    bundle_end = min(bundle + bundle_size, trace_count)   # The last bundle may be shorter
    bytes_start = bundle * trace_size_with_headers + header_size
    bytes_stop = bundle_end * trace_size_with_headers + header_size - 1
    print(f"Lambda #{lambda_counter}, bundled traces: {bundle}-{bundle_end}, bytes: {bytes_start}-{bytes_stop}.")
    
    # Build the message for the Lambda to find the files
    payload = {
        "bucket_in"          : source_bucket,
        "folder_in"          : source_folder,
        "filename_in"        : source_filename,
        "bucket_out"         : results_bucket,
        "folder_out"         : mean_amp_folder,
        "bytes_start"        : bytes_start,
        "bytes_stop"         : bytes_stop,
        "use_custom_lines"   : 1,
        "data_sample_format" : bin_header['data_sample_format']
    }

    # Invoke the Lambda SegyBatchProcessMeanAmp
    response = lambda_client.invoke(FunctionName=lambda_name,
                                    InvocationType='Event',
                                    Payload=json.dumps(payload))

    results_file_list.append(f"{mean_amp_folder}/{source_filename}.{bytes_start}-{bytes_stop}.pkl")
    
print("Done!  Elapsed time to gather traces and send to Lambda: {:0.2f} seconds, now waiting a bit for processing to complete.".format(time.time() - start_time))

time.sleep(400)      # Waiting before carrying on next steps, to allow time for Lambda to finish.
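
A fixed sleep either waits too long or not long enough.  An alternative is to poll for the expected result files until they all exist or a timeout passes.  The sketch below keeps the existence check as an injected callable so it can run without AWS access.  `wait_for_keys` and the example keys are hypothetical, not part of this notebook's Lambda setup.

```python
import time

def wait_for_keys(keys, key_exists, timeout_s=900, poll_interval_s=10,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll until key_exists(key) is true for every key, or the timeout passes.

    Returns the set of keys still missing (empty on success).
    """
    pending = set(keys)
    deadline = clock() + timeout_s
    while True:
        # Only re-check keys that have not been seen yet
        pending = {key for key in pending if not key_exists(key)}
        if not pending or clock() >= deadline:
            return pending
        sleep(poll_interval_s)

# Example with an in-memory stand-in for the results bucket (hypothetical keys)
finished = {"results/a.pkl", "results/b.pkl"}
missing = wait_for_keys(["results/a.pkl", "results/b.pkl"], finished.__contains__,
                        timeout_s=1, poll_interval_s=0)
print(missing)
```

In this notebook, `key_exists` could wrap a boto3 S3 client's `head_object` call on the results bucket and return `False` when it raises `botocore.exceptions.ClientError`, with `results_file_list` passed in as `keys`.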

With 1000 Lambdas taking roughly 330 seconds each, plus 60 seconds to start them, our total time is about 6.5 minutes.  The cost to perform this calculation on this 180GB file is about $0.67.  Since it takes 60 seconds to invoke the Lambdas, there is room to optimize by invoking them faster.  Having more Lambda concurrency available would help further, as each Lambda could process a smaller portion of the file.

Our current architecture:


![title](images/Page-2.png)


As mentioned before, leveraging Step Functions would be beneficial here to monitor the status of the Lambdas and notify us once they are all complete to trigger the next steps automatically.  Furthermore, using EventBridge, the Step Function can automatically run when a new seismic file gets uploaded to the S3 bucket or if there is an external trigger:


![title](images/Page-3.png)


### Load Results
The Lambdas should be done by now. Let's load in the results from S3.

In [None]:
s3_client = boto3.client('s3')

traces = []
start_time = time.time()

# Iterate through the files
for x in range(0, len(results_file_list)):
    print("Reading file: ", results_file_list[x])
    
    # Get file from S3, convert from Pickle format
    # (`response` avoids shadowing the built-in name `object`)
    response = s3_client.get_object(Bucket=results_bucket, Key=results_file_list[x])
    serialized_object = response['Body'].read()
    trace_bundle_temp = pickle.loads(serialized_object)
    
    for y in range(0, len(trace_bundle_temp)):
        traces.append(trace_bundle_temp[y])

print("Elapsed time: {:0.2f} seconds.".format(time.time() - start_time))


Plotting over 9 million traces will be taxing for Matplotlib.  Let's only show the maximum mean amplitude at each inline-xline location.

In [None]:
traces_grouped = {}
traces_max = {}

# Go through every trace and make a dictionary with the values at each unique inline/xline location
counter = 0
for trace in traces:
    counter = counter + 1
    group = f"{trace[1]}-{trace[2]}"
    
    if group in traces_grouped:
        traces_grouped[str(group)].append(trace[0])
    else:
        traces_grouped[str(group)] = [trace[0]]

# Find the max amplitude at each location
for group in traces_grouped:
    traces_max[group] = np.amax(traces_grouped[group])
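
The cell above stores every value per location before taking the maximum.  A lighter variant keeps only a running maximum and uses an `(inline, xline)` tuple as the key, which avoids building and later splitting a string.  This is a sketch with a hypothetical function name, `max_by_location`, and it assumes each trace is indexed the same way as above: `trace[0]` is the mean amplitude, `trace[1]` the inline and `trace[2]` the xline.

```python
def max_by_location(traces):
    """Return {(inline, xline): max mean amplitude}, one running value per location."""
    best = {}
    for trace in traces:
        mean_amp, inline, xline = trace[0], trace[1], trace[2]
        key = (inline, xline)
        if key not in best or mean_amp > best[key]:
            best[key] = mean_amp
    return best

# Synthetic example (not real data)
example = [(0.2, 10, 5), (0.7, 10, 5), (0.4, 11, 5)]
print(max_by_location(example))
```

Memory use then scales with the number of unique locations rather than the number of traces.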

In [None]:
# Convert the dictionary back to arrays for easy consumption
traces_max_value = []
traces_max_inline = []
traces_max_xline = []

counter = 0
for group in traces_max:
    inline = int(group.split("-")[0])
    xline = int(group.split("-")[1])
    
    traces_max_inline.append(inline)
    traces_max_xline.append(xline)
    traces_max_value.append(traces_max[group])
    counter = counter + 1


Let's map out the maximum mean amplitudes using a Matplotlib scatter plot.

In [None]:
plt.figure(figsize=(5, 10))
plt.scatter(traces_max_inline, traces_max_xline, c=traces_max_value)
plt.show()

### Conclusion
We have successfully performed a mathematical calculation on a 180GB seismic SEGY file without having to spin up or down a single computer.  All at the cost of $0.67 and 6.5 minutes of processing time.  This process is scalable and repeatable.  It can be expanded upon with AWS Step Functions to orchestrate multiple steps and SageMaker to perform ML inference on the data.  Lambda concurrency can even be increased from the default 1000 to hundreds of thousands, greatly increasing performance.  

Of course, this calculation is not at the complexity of Full Waveform Inversion or other computations that require high performance computing (HPC).  However, if those calculations can be adjusted to work with Lambda (up to 6 vCPU, 10GB RAM, 15 minute limit), a huge amount of undifferentiated heavy lifting can be removed, offering an alternative to the complexity of scheduling thousands of EC2 spot instances.  Cost differences would have to be explored too.

### Cleanup

Let's clean up the files we generated, so that we can delete the CloudFormation stack without issue.

In [None]:
for x in range(0, len(results_file_list)):
    _ = s3_client.delete_object(Bucket=results_bucket, Key=results_file_list[x])
    print("Deleting: {}".format(results_file_list[x]))
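
Deleting one object per request means about a thousand round trips here.  The S3 `delete_objects` call accepts up to 1000 keys per request, so the cleanup can be batched.  A minimal sketch, where `delete_keys_in_batches` is a hypothetical helper that takes the client as a parameter and ignores any per-key errors reported in the response:

```python
def delete_keys_in_batches(s3_client, bucket, keys, batch_size=1000):
    """Delete keys using one delete_objects request per batch; return the request count."""
    requests_made = 0
    for start in range(0, len(keys), batch_size):
        batch = keys[start:start + batch_size]
        s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in batch], "Quiet": True},
        )
        requests_made += 1
    return requests_made

# In this notebook the call would look like:
# delete_keys_in_batches(s3_client, results_bucket, results_file_list)
```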