## 1. Set up

In [137]:
import boto3
import sagemaker
import pandas as pd
from time import gmtime, strftime

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
print(bucket)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker-us-east-1-477886989750


In [211]:
num_files=1000
num_instances=10
num_spark_instances=9
num_spark_cores=9

# 2. Generate Data

In [212]:
!mkdir data

mkdir: cannot create directory ‘data’: File exists


In [190]:
#generate a method that generates 10 csv files each of which has two numbers on each line. The files have an ascending name. The files are saved in the directory preprocessed_data
import random
def generate_data():
    for i in range(0,num_files):
        with open(f'data/file{i}.csv', 'w') as f:
                f.write(f'{random.randint(1,100)}\n{random.randint(1,100)}')
    return True

In [191]:
generate_data()

True

In [213]:
input_prefix = "sagemaker/spark-preprocess-demo/input/raw/data".format(timestamp_prefix)
output_prefix = "sagemaker/spark-preprocess-demo/output".format(timestamp_prefix)

In [193]:
#generate a method that takes all the files from the data local folder and saves them in the S3 bucket with name bucket under the prefixe
def upload_data():
    s3_client = boto3.client('s3')
    for i in range(0,num_files):
        s3_client.upload_file(f'data/file{i}.csv', bucket, f'{input_prefix}/file{i}.csv')
    return "s3://{}/{}/".format(bucket, input_prefix)

In [194]:
data_s3_uri=upload_data()
print (data_s3_uri)
s3_output_uri= f"s3://{bucket}/{output_prefix}"
print(s3_output_uri)

s3://sagemaker-us-east-1-477886989750/sagemaker/spark-preprocess-demo/input/raw/data/
s3://sagemaker-us-east-1-477886989750/sagemaker/spark-preprocess-demo/output


## 3. Write the Processing Script

In [130]:
!mkdir code

mkdir: cannot create directory ‘code’: File exists


In [215]:
%%writefile ./code/preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row

def transform(spark, s3_input_data,s3_output_train_data):
    print('Processing {} => {}'.format(s3_input_data, s3_output_train_data))

    rdd = spark.sparkContext.wholeTextFiles(s3_input_data, 9)
    sum_rdd = rdd.map(lambda x: sum(int(y) for y in x[1].split("\n")))

    row = Row("sum")
    df = sum_rdd.map(row).toDF()  
    df.show()

    print('Saving to output file {}'.format(s3_output_train_data))
    df.write.format('csv').option('header','true').save(f'{s3_output_train_data}/output.csv',mode='overwrite')

    print('Wrote to output file:  {}'.format(s3_output_train_data))



def main():
    spark = SparkSession.builder.appName("pyspark-demo").getOrCreate()

    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    print(args.keys())
    # Retrieve the args and replace 's3://' with 's3a://'
    s3_input_data = args['s3_input_data'].replace('s3://', 's3a://')
    print(s3_input_data)

    s3_output_data = args['s3_output_data'].replace('s3://', 's3a://')
    print(s3_output_data)
    
    transform(spark,s3_input_data, s3_output_data)
    

if __name__ == "__main__":
    main()
    

Overwriting ./code/preprocess.py


## 4. Run the Processing with Amazon SageMaker

In [216]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingOutput

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.3",
    role=role,
    instance_count=num_instances, #pyspark_process_instance_count
    instance_type="ml.m5.4xlarge", #pyspark_process_instance_type
    max_runtime_in_seconds=2400
)


configuration = [
    {
        "Classification": "spark-defaults",
        "Properties":{
            "spark.executor.memory":"10g",
            "spark.executor.memoryOverhead":"5g",
            "spark.driver.memory":"10g",
            "spark.driver.memoryOverhead":"10g",
            "spark.driver.maxResultSize":"10g",
            "spark.executor.cores":num_spark_cores,
            "spark.executor.instances":num_spark_instances,
            "spark.yarn.maxAppAttempts":1
        }
    }
]

# configuration = [
#     {
#         "Classification": "spark-defaults",
#         "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
#     }
# ]

spark_processor.run(
    submit_app="./code/preprocess.py", #pyspark_process_code
    spark_event_logs_s3_uri="s3://{}/sagemaker/spark-preprocess-demo/spark_event_logs".format(bucket),
    arguments=[
        "s3_input_data", data_s3_uri,
        "s3_output_data", s3_output_uri
    ],
    configuration=configuration,
    outputs=[
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='process-job',
                                        source='/opt/ml/processing/output')
              ],          
    logs=True,
    wait=False
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


INFO:sagemaker:Creating processing-job with name sm-spark-2024-01-22-16-31-48-998


ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateProcessingJob operation: The account-level service limit 'ml.m5.4xlarge for processing job usage' is 10 Instances, with current utilization of 0 Instances and a request delta of 11 Instances. Please use AWS Service Quotas to request an increase for this quota. If AWS Service Quotas is not available, contact AWS support to request an increase for this quota.

In [206]:
spark_processing_job_name = spark_processor.jobs[-1].describe()['ProcessingJobName']

In [207]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(region, spark_processing_job_name)))

  from IPython.core.display import display, HTML


In [208]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, spark_processing_job_name)))


  from IPython.core.display import display, HTML


In [210]:
from IPython.core.display import display, HTML

# This is different than the job name because we are not using ProcessingOutput's in this Spark ML case.
spark_processing_job_s3_output_prefix = output_prefix

display(HTML('<b>Review <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/?region={}&bucketType=general&prefix={}">S3 Output Data</a> After The Processing Job Has Completed</b>'.format(bucket,region, output_prefix)))


  from IPython.core.display import display, HTML
