# Spark processing with SageMaker Studio and local IDE

In [None]:
!pip install sagemaker-studio-image-build

In [None]:
# Adds processing registry login to the build template:
# - $(aws ecr get-login --no-include-email --region $AWS_DEFAULT_REGION --registry-ids 571004829621)

!cp /opt/conda/lib/python3.7/site-packages/sagemaker_studio_image_build/data/buildspec.template.yml /opt/conda/lib/python3.7/site-packages/sagemaker_studio_image_build/data/buildspec.template.yml~
!patch -p0 /opt/conda/lib/python3.7/site-packages/sagemaker_studio_image_build/data/buildspec.template.yml < ./spark-processing-image/sm_docker_ecr.patch

In [None]:
!cat /opt/conda/lib/python3.7/site-packages/sagemaker_studio_image_build/data/buildspec.template.yml

In [None]:
!cd ./spark-processing-image; sm-docker build . --repository sagemaker-studio-spark-glue:latest

In [None]:
!aws sagemaker create-image --image-name sagemaker-studio-spark-glue --role-arn arn:aws:iam::054035656282:role/service-role/AmazonSageMaker-ExecutionRole-20220215T072743

In [None]:
!aws sagemaker create-image-version --image-name sagemaker-studio-spark-glue --base-image 054035656282.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-studio-spark-glue:latest

In [None]:
!aws sagemaker describe-image-version --image-name sagemaker-studio-spark-glue

In [None]:
!aws sagemaker create-app-image-config --cli-input-json file://spark-processing-image/app-image-config-input.json

In [None]:
!aws sagemaker update-app-image-config --app-image-config-name sagemaker-studio-spark-glue-config --cli-input-json file://spark-processing-image/app-image-config-input.json

In [None]:
!aws sagemaker update-domain --cli-input-json file://spark-processing-image/update-domain-input.json

In [None]:
!aws sagemaker describe-app-image-config --app-image-config-name sagemaker-studio-spark-glue-config

In [None]:
!aws sagemaker list-images

At this point you should select Python3 (sagemaker-studio-spark-glue) kernel.

In [2]:
import sys
sys.path.append('/usr/lib/spark/python/lib/pyspark.zip')
sys.path.append('/usr/lib/spark/python/lib/py4j-0.10.9-src.zip')

## pyspark_estimate_pi.py

Code sample is adopted and modified from https://docs.aws.amazon.com/code-samples/latest/catalog/python-emr-pyspark_estimate_pi.py.html

In [2]:
%%writefile pyspark_estimate_pi.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Shows how to write a script that calculates pi by using a large number of random
numbers run in parallel on an Amazon EMR cluster. This script is intended to be
uploaded to an Amazon S3 bucket so it can be run as a job step.
"""

import argparse
import logging
from operator import add
import random

import pyspark
from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')


def calculate_pi(spark, partitions, tries, output_uri):
    """
    Calculates pi by testing a large number of random numbers against a unit circle
    inscribed inside a square. The trials are partitioned so they can be run in
    parallel on cluster instances.

    :param partitions: The number of partitions to use for the calculation.
    :param output_uri: The URI where the output is written, typically an Amazon S3
                       bucket, such as 's3://example-bucket/pi-calc'.
    """
    def calculate_hit(_):
        x = random.randint(0, 256) / 256.0 * 2 - 1
        y = random.randint(0, 256) / 256.0 * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    logger.info(
        "Calculating pi with a total of %s tries in %s partitions.", tries, partitions)
    
    hits = spark.sparkContext.parallelize(range(tries), partitions)\
        .map(calculate_hit)\
        .reduce(add)
    pi = 4.0 * hits / tries
    logger.info("%s tries and %s hits gives pi estimate of %s.", tries, hits, pi)
    df = spark.createDataFrame(
        [(tries, hits, pi)], ['tries', 'hits', 'pi'])
    return df


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--partitions', default=2, type=int,
        help="The number of parallel partitions to use when calculating pi.")
    parser.add_argument(
        '--tries', default=0, type=int,
        help="The number of tries.")
    args, _ = parser.parse_known_args()
    
    if args.tries > 0:
        spark = SparkSession.builder.appName("My PyPi")\
            .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")\
            .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")\
            .config("hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")\
            .enableHiveSupport()\
            .getOrCreate()
        
        # Test Glue Data Catalog
        spark.sql('show databases').show()
        
        # Test S3 access
        df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books")
        df.first()
        
        calculate_pi(spark, args.partitions, args.tries, None)

Overwriting pyspark_estimate_pi.py


First let's make sure the script is working locally in the notebook instance.

In [3]:
%run -i pyspark_estimate_pi.py  # load calculate_pi() without running

In [4]:
import os
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'

In [5]:
import pyspark
from pyspark.sql import SparkSession

# Note: for Glue catalog to work Hive needs patching, see:
#   https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore#patching-apache-hive-and-installing-it-locally
# Also Glue jars need to be installed from emr-apps repo (package aws-hm-client)

conf = pyspark.SparkConf()\
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.0.3")\
    .set("spark.jars", "/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar")\
    .set("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")\
    .set("hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")\
    .set("spark.sql.catalogImplementation", "hive")\
    .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")\
    .set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

sc = pyspark.SparkContext("local[4]", "My PyPi", conf=conf)
spark = SparkSession(sc)

sc

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-54137fa3-0809-4a31-88f2-c276f80f015d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.0.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.271 in central
:: resolution report :: resolve 313ms :: artifacts dl 16ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.271 from central in [default]
	org.apache.hadoop#hadoop-aws;3.0.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------

In [6]:
spark.sql('show databases').show()

+-------------+
|    namespace|
+-------------+
|      default|
|glue-database|
+-------------+



In [7]:
%%time
df = calculate_pi(spark, 4, 10_000_000, None)

INFO: Calculating pi with a total of 10000000 tries in 4 partitions.
INFO: 10000000 tries and 7787180 hits gives pi estimate of 3.114872.            


CPU times: user 501 ms, sys: 44.6 ms, total: 546 ms
Wall time: 34.2 s


In [8]:
df.toPandas()

Unnamed: 0,tries,hits,pi
0,10000000,7787180,3.114872


In [7]:
%%time
%%sh

# NOTE: for some reason --conf doesn't override fs.s3a.aws.credentials.provider
#   when passed as a command line option, so we set it in code 

spark-submit \
    --conf "spark.jars=/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar" \
    --conf "spark.jars.packages=org.apache.hadoop:hadoop-aws:3.0.3" \
    --conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
    --conf "hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" \
    --conf "hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" \
    ./pyspark_estimate_pi.py --partitions 4 --tries 10000000

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3ba0876e-0c07-4538-bd84-f82663f4365e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.0.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.271 in central
:: resolution report :: resolve 331ms :: artifacts dl 15ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.271 from central in [default]
	org.apache.hadoop#hadoop-aws;3.0.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------

+-------------+
|    namespace|
+-------------+
|      default|
|glue-database|
+-------------+



2022-07-22 09:40:25,934 INFO Configuration.deprecation: fs.s3a.server-side-encryption-key is deprecated. Instead, use fs.s3a.server-side-encryption.key
2022-07-22 09:40:29,302 INFO datasources.InMemoryFileIndex: It took 873 ms to list leaf files for 1 paths.
2022-07-22 09:40:30,130 INFO spark.SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
2022-07-22 09:40:30,164 INFO scheduler.DAGScheduler: Got job 0 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
2022-07-22 09:40:30,171 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0)
2022-07-22 09:40:30,176 INFO scheduler.DAGScheduler: Parents of final stage: List()
2022-07-22 09:40:30,182 INFO scheduler.DAGScheduler: Missing parents: List()
2022-07-22 09:40:30,205 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
2022-07-22 09:40:30,323 INFO memory.Memo

INFO: Calculating pi with a total of 10000000 tries in 4 partitions.


2022-07-22 09:40:39,277 INFO spark.SparkContext: Starting job: reduce at /root/plexperiments-p-led4ckjkoxzx/sagemaker-plexperiments-modelbuild/./pyspark_estimate_pi.py:42
2022-07-22 09:40:39,279 INFO scheduler.DAGScheduler: Got job 2 (reduce at /root/plexperiments-p-led4ckjkoxzx/sagemaker-plexperiments-modelbuild/./pyspark_estimate_pi.py:42) with 4 output partitions
2022-07-22 09:40:39,279 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (reduce at /root/plexperiments-p-led4ckjkoxzx/sagemaker-plexperiments-modelbuild/./pyspark_estimate_pi.py:42)
2022-07-22 09:40:39,279 INFO scheduler.DAGScheduler: Parents of final stage: List()
2022-07-22 09:40:39,280 INFO scheduler.DAGScheduler: Missing parents: List()
2022-07-22 09:40:39,281 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (PythonRDD[7] at reduce at /root/plexperiments-p-led4ckjkoxzx/sagemaker-plexperiments-modelbuild/./pyspark_estimate_pi.py:42), which has no missing parents
2022-07-22 09:40:39,285 INFO memory.MemoryStor

INFO: 10000000 tries and 7788020 hits gives pi estimate of 3.115208.


2022-07-22 09:41:10,003 INFO spark.SparkContext: Invoking stop() from shutdown hook
2022-07-22 09:41:10,012 INFO server.AbstractConnector: Stopped Spark@5bffd05d{HTTP/1.1,[http/1.1]}{127.0.0.1:4041}
2022-07-22 09:41:10,014 INFO ui.SparkUI: Stopped Spark web UI at http://localhost:4041
2022-07-22 09:41:10,043 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2022-07-22 09:41:10,066 INFO memory.MemoryStore: MemoryStore cleared
2022-07-22 09:41:10,067 INFO storage.BlockManager: BlockManager stopped
2022-07-22 09:41:10,084 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2022-07-22 09:41:10,091 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2022-07-22 09:41:10,111 INFO spark.SparkContext: Successfully stopped SparkContext
2022-07-22 09:41:10,111 INFO util.ShutdownHookManager: Shutdown hook called
2022-07-22 09:41:10,112 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a50f17d3-

CPU times: user 307 ms, sys: 54 ms, total: 361 ms
Wall time: 1min 1s


Now we're ready to sumbit the bigger processing jobs for 200M records. If you're running the notebook on your local machine, replace the placeholder `<<YOUR_IAM_ROLE_ARN_FOR_SAGEMAKER_HERE>>` with the IAM role ARN. 

In [None]:
import sagemaker
import os

role = sagemaker.get_execution_role() \
    if os.path.exists('/opt/ml/metadata/resource-metadata.json') \
    else "<<YOUR_IAM_ROLE_ARN_FOR_SAGEMAKER_HERE>>"

In [None]:
%%time
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark-pi",
    framework_version="3.0",
    role=role,
    instance_count=4,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1800,
)

spark_processor.run(
    submit_app="pyspark_estimate_pi.py",
    arguments=[
        "--partitions", "16",
        "--tries", "200_000_000"
    ],
    logs=True
)

Finally, you can test your script in AWS Glue, which is a serverless data integration service with extra useful ETL capabilities like visualization of Spark-specific metrics in AWS Console and the catalog of data sets, but more loosely integrated with capabilities of SageMaker like SageMaker Pipelines and MLOps and gives less choice in the processing instance types.

Make sure that your IAM role also has the trust relationship with Glue.

In [None]:
input_source = spark_processor.latest_job.inputs[0].source
input_source

In [None]:
%%sh -s "$input_source" "$role"

aws glue create-job \
    --name "Glue Evaluate Pi" \
    --role $2 \
    --command '{"Name": "glueetl", "ScriptLocation": "'$1'", "PythonVersion": "3"}' \
    --max-retries 0 \
    --timeout 30 \
    --worker-type "G.1X" \
    --number-of-workers 2 \
    --glue-version "3.0" \
    --default-arguments '{"--partitions": "1", "--tries": "200_000_000", "--job-language": "python", "--class": "GlueApp", "--enable-metrics": "true", "--enable-glue-datacatalog": "true", "--enable-job-insights": "true", "--enable-spark-ui": "true", "--enable-continuous-cloudwatch-log": "true"}'

In [None]:
!aws glue start-job-run --job-name "Glue Evaluate Pi"

In [None]:
!aws glue delete-job --job-name "Glue Evaluate Pi"