# Prepare the processing code

Below is placeholder code that verifies the dependencies were properly installed in our custom Spark container.

In [17]:
%%writefile preprocess.py
import pyspark
import pandas as pd


def main():
    """Print the installed versions of pandas and pyspark, one per line."""
    print("Printing version of pre-installed packages")
    # Same order as before: pandas first, then pyspark.
    for package in (pd, pyspark):
        print(package.__version__)


# Run the version check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Overwriting preprocess.py


# Create custom Spark container

This container extends the default SageMaker Spark container and installs popular packages such as `pandas` and `pyspark`.

In [28]:
import sagemaker

# Resolve the session-level settings used by the rest of the notebook.
sm_session = sagemaker.Session()
region = sm_session.boto_region_name  # AWS region the session is bound to
account = sm_session.account_id()  # AWS account ID of the current credentials
role = sagemaker.get_execution_role()  # IAM role passed to the processing job
# Use the session created above; no `sagemaker_session` name is defined in this
# notebook, so referencing it raises NameError and leaves `bucket` unset.
bucket = sm_session.default_bucket()


In [24]:
# Print the Dockerfile for the custom image, syntax-highlighted by pygmentize.
! pygmentize Dockerfile

[34mFROM[39;49;00m [33m173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:2.4-cpu-py37-v1.2[39;49;00m

[34mRUN[39;49;00m yum install -y epel-release
[34mRUN[39;49;00m yum install -y python-pip
[34mRUN[39;49;00m pip install pandas pyspark

[34mARG[39;49;00m [31mcode_dir[39;49;00m=/opt/ml/code
[34mRUN[39;49;00m mkdir -p [31m$code_dir[39;49;00m
[34mCOPY[39;49;00m preprocess.py [31m$code_dir[39;49;00m/preprocess.py
[34mWORKDIR[39;49;00m[33m $code_dir[39;49;00m

[34mENTRYPOINT[39;49;00m [[33m"python"[39;49;00m,[33m"preprocess.py"[39;49;00m]


In [25]:
# loging to Sagemaker ECR with Deep Learning Containers
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin 173754725891.dkr.ecr.{region}.amazonaws.com
# loging to your private ECR
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin 553020858742.dkr.ecr.{region}.amazonaws.com

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [None]:
# Run the helper script that builds the custom image and (presumably) pushes it to the private ECR registry.
# NOTE(review): arguments look like <image name> <tag> <Dockerfile path> — confirm against build_and_push.sh.
! ./build_and_push.sh custom-pyspark latest Dockerfile

# Run a Processing job on SageMaker

In [29]:
from time import gmtime, strftime

# UTC timestamp (second resolution) embedded in every prefix below.
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Key prefixes for this run: raw input, preprocessed data and model.
prefix = f"sagemaker/spark-preprocess-demo/{timestamp_prefix}"
input_prefix = f"{prefix}/input/raw/test"
input_preprocessed_prefix = f"{prefix}/input/preprocessed/test"
model_prefix = f"{prefix}/model"

In [None]:

from sagemaker.processing import ScriptProcessor, ProcessingInput
from sagemaker.processing import Processor

# Processor that runs the custom `custom-pyspark:latest` image on two ml.r5.xlarge instances.
# The image URI is built from the session's account and region rather than a
# hard-coded account ID and `us-east-1`, so it works in other accounts and regions.
spark_processor = Processor(
    base_job_name='spark-preprocessor',
    image_uri=f"{account}.dkr.ecr.{region}.amazonaws.com/custom-pyspark:latest",
    role=role,
    instance_count=2,
    instance_type='ml.r5.xlarge',
    max_runtime_in_seconds=1200,  # 20-minute cap on the job's runtime
    env={'mode': 'python'},
)

# Start the processing job. The bucket and key prefixes are passed to the
# container as alternating name/value arguments.
spark_processor.run(
    arguments=[
        's3_input_bucket', bucket,
        's3_input_key_prefix', input_prefix,
        's3_output_bucket', bucket,
        's3_output_key_prefix', input_preprocessed_prefix,
    ],
    logs=False,
)


Job Name:  spark-preprocessor-2021-02-08-17-25-36-448
Inputs:  []
Outputs:  []
......................................................