# SageMaker Aqua Processing Container

In [None]:
import pandas as pd
import boto3
import os
import csv 
import re

## First, test access to your target data

In [None]:
# Load the source data for a quick local test before launching the full job.
# The S3 client is created here but is not otherwise used in this cell.
s3_client = boto3.client('s3')

# Read the parquet file ('your data' is a placeholder path); `data` is the
# name the following cells sample from.
data = notes_df = pd.read_parquet('your data')

data

### Next, if needed, create a small test set of data for a local code test

In [None]:
# Keep only the first 4000 rows so the local dry run finishes quickly, and
# write them to the file the local test below reads.
sample_size = 4000
df = data[:sample_size]
df.to_parquet('test_data.parquet')

### Test your code on a single file locally first,
### because a processing job takes more than 5 minutes to launch before you see any results

### Install your dependencies

In [None]:
# Install the NLP dependencies: spaCy, scispaCy and the scispaCy
# en_core_sci_sm model wheel (v0.5.0).
# NOTE(review): spaCy is upgraded to the latest release (-U) while the model
# wheel is pinned to v0.5.0; confirm the resulting spaCy version is compatible.
# NOTE(review): `!pip` runs whichever pip is first on PATH, which may not be the
# kernel's interpreter; IPython's `%pip` magic targets the running kernel —
# consider switching if the imports in the next cell fail.
! pip install -U spacy
! pip install scispacy
! pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.0/en_core_sci_sm-0.5.0.tar.gz

# Build the central code base
Here we set up a named-entity recognition (NER) model from scispaCy and run it across multiple processes:


* 1 We load the data from the target file.
* 2 We define a method that runs the model on the data and collects its output.
* 3 We split the data into smaller chunks for multiprocessing (here, 4 chunks).
* 4 We start one process per chunk in a loop.
* 5 We create a multiprocessing manager whose shared list collects the output, so our multiple processes can all write their results to it.
* 6 We join each process so the main process waits for all of them to finish.
* 7 We convert the manager's shared list into a regular list.
* 8 We convert the list of dicts into a DataFrame and pass it to our save method, which saves all the data.

In [None]:
""" Code Summary
This code is an example template for running a model on data sets in paralele using multiprcessing 
the example uses a sci spacy NER model but can be leveraged on any model 
"""

# View path of site-packages
import sys
import os
import pandas as pd 
import spacy
import boto3
import time
import multiprocessing as mp
import scispacy
import en_core_sci_sm   #The model we are going to use
from spacy import displacy
from scispacy.abbreviation import AbbreviationDetector
from scispacy.umls_linking import UmlsEntityLinker
from IPython.display import display, clear_output
from multiprocessing import Process, Manager
# NOTE(review): this bare expression is not the last statement in the cell, so
# the notebook presumably does not display it; it appears to have no effect.
sys.path



def main(input_parquet: str, chunk_count: int = 4) -> None:
    """Run the NER model over a parquet file using parallel worker processes.

    Splits the rows into at most ``chunk_count`` contiguous chunks, runs
    ``run_all_data`` on each chunk in its own process, concatenates the
    per-chunk results in input order and hands them to ``save_data``.

    Args:
        input_parquet (str): path to the parquet file to process.
        chunk_count (int): number of chunks, i.e. worker processes (default 4).

    Raises:
        ValueError: if ``chunk_count`` is less than 1.
        RuntimeError: if any worker process exits with a non-zero exit code.
    """
    if chunk_count < 1:
        raise ValueError("chunk_count must be >= 1, got {}".format(chunk_count))

    df = pd.read_parquet(input_parquet)

    # Ceiling division: yields at most ``chunk_count`` chunks (the last may be
    # shorter), and the step is never 0, so ``range`` does not raise when the
    # file has fewer rows than ``chunk_count``.
    size = max(1, -(-len(df) // chunk_count))
    chunks = [df[i:i + size] for i in range(0, df.shape[0], size)]
    start_time = time.time()

    # The Manager is owned here and shut down on exit rather than relying on
    # a module-level ``manager`` defined elsewhere.
    with Manager() as manager:
        # One shared list per chunk: appends from several processes into a
        # single list interleave in arbitrary order, scrambling the rows.
        outputs = [manager.list() for _ in chunks]

        processes = []
        for chunk, chunk_output in zip(chunks, outputs):
            p = mp.Process(target=run_all_data, args=[chunk, chunk_output])
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print("--- %s seconds ---" % (time.time() - start_time))

        # A crashed worker would otherwise leave its chunk silently missing.
        failed = [p.exitcode for p in processes if p.exitcode != 0]
        if failed:
            raise RuntimeError(
                "{} worker process(es) exited with codes {}".format(len(failed), failed)
            )

        # Copy results out of the proxies before the manager shuts down.
        final_list = [item for chunk_output in outputs for item in chunk_output]

    result = pd.DataFrame(final_list)
    save_data(result, input_parquet, 'test_data_')
    


def run_all_data(df_small: pd.DataFrame, output: list) -> None:
    """Annotate one chunk of notes with the scispaCy NER model.

    The model is loaded inside this function, so every worker process gets
    its own instance. For each row, ``cleaned_notes`` is stringified,
    lower-cased and parsed, and ``{'med data': str(doc.ents)}`` is appended
    to ``output``.

    Args:
        df_small (pd.DataFrame): chunk of input rows; must provide a
            ``cleaned_notes`` column.
        output (list): list the results are appended to (a Manager list proxy
            when called from a worker process).
    """
    model = spacy.load("en_core_sci_sm")

    row_number = 0
    for record in df_small.itertuples():
        row_number += 1
        # Progress counter; clear_output(wait=True) is meant to keep only the
        # latest number visible in the notebook output.
        print(row_number)
        clear_output(wait=True)

        note = str(record.cleaned_notes).lower()
        entities = model(note).ents
        output.append({'med data': str(entities)})
    


def save_data(new_df: pd.DataFrame, input_parquet: str, name: str,
              bucket: str = 'your bucket', prefix: str = 'your folder',
              local_dir: str = '.') -> tuple:
    """Save processed data to a local parquet file and upload it to S3.

    The file is named ``<name>_<base name of input_parquet>``, written to
    ``local_dir`` and uploaded to ``s3://<bucket>/<prefix>/<file name>``.
    Errors are printed and returned rather than raised.

    Args:
        new_df (pd.DataFrame): data to save.
        input_parquet (str): path of the file the data came from; only its
            base name is used in the output file name.
        name (str): tag prepended to the saved file name.
        bucket (str): destination S3 bucket (placeholder - set it).
        prefix (str): destination key prefix ("folder") inside the bucket
            (placeholder - set it); may be empty.
        local_dir (str): directory the parquet file is written to before upload.

    Returns:
        tuple: ``(True, None)`` on success, ``(False, exception)`` on failure.
    """
    try:
        # Take the base name first so the tag survives when input_parquet
        # contains directories.
        file_name = '{}_{}'.format(name, os.path.basename(input_parquet))
        # Write under local_dir instead of the filesystem root, which may not
        # be writable for the notebook user.
        local_path = os.path.join(local_dir, file_name)
        new_df.to_parquet(local_path)

        print("finished processing {}".format(input_parquet))

        # One object per input file under the chosen prefix.
        clean_prefix = prefix.strip('/')
        key = '{}/{}'.format(clean_prefix, file_name) if clean_prefix else file_name

        s3_client = boto3.client("s3")
        s3_client.upload_file(local_path, bucket, key)
        print("The final complete output can be downloaded from: s3://{}/{}".format(bucket, key))
        print()
        return True, None

    except Exception as e:
        print(e)
        return False, e

    
if __name__ == '__main__':
    # Keep a module-level ``manager`` available for ``main`` (which may read
    # it), and use a ``with`` block so the Manager's server process is shut
    # down when the run finishes instead of lingering for the kernel session.
    with Manager() as manager:
        main('test_data.parquet')

## Test Output
Next, check the output of your run and make sure it looks correct.

In [None]:
# Download the output of the test run from S3 and check its format.
s3_client = boto3.client('s3')

# s3://vh-sagemaker/users/matt-cintron/aqua-mirador-test/predictions/test_data__part-00000-0d26bf94-832f-4470-bc77-84f99015e8ef-c000.snappy.parquet_processed.parquet
output_file = 'test_data__test_data.parquet'
s3_client.download_file(
    'vh-sagemaker',
    'users/matt-cintron/aqua-mirador-test/multi_process_test/' + output_file,
    output_file,
)

# Load the downloaded parquet file and display it.
data = notes_df = pd.read_parquet(output_file)

data

## Build the full processing code file
Here we write the full code file that the processing job will actually run.

In [None]:
%%writefile preprocess.py

""" Code Summary
This code is an example template for running a model on data sets in paralele using multiprcessing 
the example uses a sci spacy NER model but can be leveraged on any model 
"""

import sys
import os
import boto3
import subprocess
import glob

# Install the NLP stack inside the processing container before importing it.
# Each requirement is installed with its own pip call, in this order.
# NOTE(review): spaCy is installed unpinned while the model wheel is pinned to
# v0.5.0; confirm the resolved spaCy version is compatible with that model.
for _requirement in (
    'spacy',
    'scispacy',
    'https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.0/en_core_sci_sm-0.5.0.tar.gz',
):
    subprocess.check_call([sys.executable, "-m", "pip", "install", _requirement])


import pandas as pd
import spacy
import time
import multiprocessing as mp
import scispacy
import en_core_sci_sm   # The model we are going to use
from spacy import displacy
from scispacy.abbreviation import AbbreviationDetector
from scispacy.umls_linking import UmlsEntityLinker
from multiprocessing import Process, Manager

# IPython is a notebook package that the processing image may not ship, and
# nothing in this script's processing path calls it. Fall back to no-ops so a
# missing install does not abort the job at import time.
try:
    from IPython.display import display, clear_output
except ImportError:
    def display(*args, **kwargs):
        """No-op stand-in for IPython.display.display outside a notebook."""

    def clear_output(wait=False):
        """No-op stand-in for IPython.display.clear_output outside a notebook."""



def main(input_parquet: str, chunk_count: int = 4) -> tuple:
    """Run the NER model over one parquet file using parallel worker processes.

    Splits the rows into at most ``chunk_count`` contiguous chunks, runs
    ``run_all_data`` on each chunk in its own process, concatenates the
    per-chunk results in input order and writes them out with ``save_data``.
    Errors are printed and returned rather than raised.

    Args:
        input_parquet (str): path to the parquet file to process.
        chunk_count (int): number of chunks, i.e. worker processes (default 4).

    Returns:
        tuple: ``(True, None)`` on success, ``(False, exception)`` on failure,
        including when ``save_data`` reports a failure.
    """
    try:
        if chunk_count < 1:
            raise ValueError("chunk_count must be >= 1, got {}".format(chunk_count))

        df = pd.read_parquet(input_parquet)

        # Ceiling division: at most ``chunk_count`` chunks, and never a 0 step
        # (which made ``range`` raise for files with fewer rows than chunks).
        size = max(1, -(-len(df) // chunk_count))
        chunks = [df[i:i + size] for i in range(0, df.shape[0], size)]
        start_time = time.time()

        # Shut the Manager's server process down after each file instead of
        # leaking one per input file.
        with Manager() as manager:
            # One shared list per chunk so the output keeps the input row order;
            # appends from several processes into one list interleave.
            outputs = [manager.list() for _ in chunks]

            processes = []
            for chunk, chunk_output in zip(chunks, outputs):
                p = mp.Process(target=run_all_data, args=[chunk, chunk_output])
                p.start()
                processes.append(p)

            for p in processes:
                p.join()

            print("--- %s seconds ---" % (time.time() - start_time))

            # A crashed worker would otherwise leave its chunk silently missing.
            failed = [p.exitcode for p in processes if p.exitcode != 0]
            if failed:
                raise RuntimeError(
                    "{} worker process(es) exited with codes {}".format(len(failed), failed)
                )

            # Copy results out of the proxies before the manager shuts down.
            final_list = [item for chunk_output in outputs for item in chunk_output]

        result = pd.DataFrame(final_list)
        # Propagate save_data's (ok, error) result rather than always
        # reporting success.
        return save_data(result, input_parquet, 'test_data_')

    except Exception as e:
        print(e)
        return False, e
    
    
def run_all_data(df_small: pd.DataFrame, output: list) -> None:
    """Annotate one chunk of notes with the scispaCy NER model.

    Loads ``en_core_sci_sm`` once per call (so once per worker process), then
    for every row parses the lower-cased, stringified ``cleaned_notes`` value
    and appends ``{'med data': str(doc.ents)}`` to ``output``.

    Args:
        df_small (pd.DataFrame): chunk of input rows; must provide a
            ``cleaned_notes`` column.
        output (list): list the results are appended to (a Manager list proxy
            when called from a worker process).
    """
    model = spacy.load("en_core_sci_sm")

    for record in df_small.itertuples():
        parsed = model(str(record.cleaned_notes).lower())
        output.append({'med data': str(parsed.ents)})



def save_data(new_df: pd.DataFrame, input_parquet: str, name: str,
              local_dir: str = '/opt/ml/processing/processed_data',
              bucket: "str | None" = None, prefix: str = '') -> tuple:
    """Write processed data to parquet in the job's output directory.

    The file is named ``<name>_<base name of input_parquet>`` and written to
    ``local_dir``. The default local_dir should match the ``ProcessingOutput``
    source of the processing job, so SageMaker uploads the file itself. If
    ``bucket`` is given, the file is also uploaded directly to
    ``s3://<bucket>/<prefix>/<file name>``. Errors are printed and returned
    rather than raised.

    Args:
        new_df (pd.DataFrame): data to save.
        input_parquet (str): path of the file the data came from; only its
            base name is used in the output file name.
        name (str): tag prepended to the saved file name.
        local_dir (str): directory to write the parquet file to (created if
            missing).
        bucket (str | None): optional S3 bucket for a direct upload.
        prefix (str): key prefix ("folder") used with ``bucket``; may be empty.

    Returns:
        tuple: ``(True, None)`` on success, ``(False, exception)`` on failure.
    """
    try:
        # Apply the tag to the base name; the input path is a full
        # /opt/ml/processing/input_data/... path inside the container.
        file_name = '{}_{}'.format(name, os.path.basename(input_parquet))
        os.makedirs(local_dir, exist_ok=True)
        local_path = os.path.join(local_dir, file_name)
        new_df.to_parquet(local_path)

        print("finished processing {}".format(input_parquet))

        if bucket:
            clean_prefix = prefix.strip('/')
            key = '{}/{}'.format(clean_prefix, file_name) if clean_prefix else file_name
            s3_client = boto3.client("s3")
            s3_client.upload_file(local_path, bucket, key)
            print("The final complete output can be downloaded from: s3://{}/{}".format(bucket, key))
        else:
            print("Output written to {}".format(local_path))
        print()
        return True, None

    except Exception as e:
        print(e)
        return False, e
    

if __name__ == "__main__":

    # List the input files copied onto this instance's input folder.
    print("The files we found were:")
    print("\n")
    files = glob.glob("/opt/ml/processing/input_data/*.parquet")
    print(files)
    if not files:
        print("No .parquet files found in /opt/ml/processing/input_data")

    # Your total number of files will be input_files_number / instance_count.
    failures = []
    for path in files:
        # main reports failures as (False, error) instead of raising.
        ok, err = main(path)
        if ok:
            print("successfully processed", path)
        else:
            print("failed to process", path, "-", err)
            failures.append(path)

    # Exit non-zero so the processing job reports failure instead of
    # completing normally when any file could not be processed.
    if failures:
        sys.exit("{} of {} file(s) failed: {}".format(len(failures), len(files), failures))

## Build out the processing job
Next, we build the processing job. Make note of the input and output sections:
these need to be set to your desired locations.

* Note: you also need to make sure the output location in the processing code's save data method is correct.

Also set the number of instances and the instance type to use for this processing job.

In [None]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

# Job settings: set the S3 locations to your own paths. The container paths
# must match where preprocess.py reads its input and writes its output.
base_job_name='test-job-AQUA-Mirador-matt-c'

image_uri = "763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-trcomp-training:1.9.0-transformers4.11.0-gpu-py38-cu111-ubuntu20.04"
# previous input: s3://vh-sagemaker/users/matt-cintron/aqua-mirador-test/test_1/
input_s3_uri = 's3://vh-sagemaker/users/matt-cintron/Test_multiprocess'
# previous output: s3://vh-sagemaker/users/matt-cintron/aqua-mirador-test/predictions/
output_s3_uri = 's3://vh-sagemaker/users/matt-cintron/aqua-mirador-test/multi_process_test'

# NOTE(review): preprocess.py loads the spaCy model without requesting a GPU;
# confirm a GPU image/instance is actually needed for this job.
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri=image_uri,
    role=get_execution_role(),
    base_job_name=base_job_name,
    instance_type='ml.g4dn.xlarge',  # alternative: 'ml.r5.8xlarge'
    instance_count=1,
)

# With ShardedByS3Key, each instance receives a subset of the input objects.
job_input = ProcessingInput(
    source=input_s3_uri,
    destination='/opt/ml/processing/input_data',
    s3_data_distribution_type='ShardedByS3Key',
    s3_data_type='S3Prefix',
)
job_output = ProcessingOutput(
    source='/opt/ml/processing/processed_data',
    destination=output_s3_uri,
    s3_upload_mode="EndOfJob",
)

script_processor.run(code='preprocess.py', inputs=[job_input], outputs=[job_output])

## Final Test
Check the output data after the job completes: make sure your files
are formatted correctly and that all data was processed correctly. With that, you are all set.

In [None]:
# Final check: load an output file produced by the processing job and confirm
# its format ('your file' is a placeholder path).
s3_client = boto3.client('s3')

data = notes_df = pd.read_parquet('your file')

data