In [4]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

# Default S3 bucket of the SageMaker session; later cells build S3 URIs from it.
bucket = sagemaker.Session().default_bucket()
# Region of the current boto3 session; used to pick the regional sample-data bucket.
region = boto3.session.Session().region_name

# Execution role attached to this notebook environment.
role = get_execution_role()

Read the first 10 rows of the raw census data from a public S3 bucket and save them locally as two sample CSV files.

In [5]:
import pandas as pd

# Region-specific public bucket that hosts the census sample data.
input_data = f's3://sagemaker-sample-data-{region}/processing/census/census-income.csv'

# Only the first 10 rows are read; this is a small sample, not the full data set.
df = pd.read_csv(input_data, nrows=10)

# Write the same sample under two file names; later cells upload both to S3.
for sample_file in ('dataset1.csv', 'dataset2.csv'):
    df.to_csv(sample_file)

In [6]:
# Build the destination from the session's default bucket instead of a
# hard-coded, account-specific bucket so the notebook runs in any account.
s3_uri_prefix = f's3://{bucket}/processing-job-files'

In [7]:
# Key prefix (folder) inside the bucket under which the sample files are stored.
prefix = 'processing-job-files'

In [8]:
# Full S3 URI of the upload destination: default bucket + key prefix.
s3_uri_prefix = f's3://{bucket}/{prefix}'

In [9]:
# Upload the first sample file; IPython expands $s3_uri_prefix from the Python variable.
!aws s3 cp dataset1.csv $s3_uri_prefix/

upload: ./dataset1.csv to s3://sagemaker-us-east-1-355151823911/processing-job-files/dataset1.csv


In [10]:
# Upload the second sample file to the same S3 prefix.
!aws s3 cp dataset2.csv $s3_uri_prefix/

upload: ./dataset2.csv to s3://sagemaker-us-east-1-355151823911/processing-job-files/dataset2.csv


In [72]:
%%writefile pyspark_script.py
import pyspark
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import desc, dense_rank
from pyspark.sql import SparkSession, DataFrame
from  argparse import Namespace, ArgumentParser
from pyspark.sql.window import Window
import argparse
import logging
import boto3
from botocore.config import Config 
import time
import sys
import os
import glob
from pyspark import SparkFiles
import numpy as np

def parse_args():
    """Parse the job's command-line options from ``sys.argv``.

    Recognised options:
        --num_processes       int, defaults to 1
        --s3_uri_prefix       str, defaults to None
        --feature_group_name  str, defaults to None

    Returns:
        argparse.Namespace holding the recognised options. Unrecognised
        arguments are ignored rather than treated as errors.
    """
    cli = argparse.ArgumentParser()

    option_specs = (
        ("--num_processes", {"type": int, "default": 1}),
        ("--s3_uri_prefix", {"type": str}),
        ("--feature_group_name", {"type": str}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)

    # parse_known_args returns (namespace, leftovers); the leftovers are dropped.
    known, _unrecognised = cli.parse_known_args()
    return known

def ingest_df_to_fg(feature_group_name, rows, columns):
    """Write every row of one partition to a SageMaker Feature Store feature group.

    Each row is sent with its own ``put_record`` call through a
    ``sagemaker-featurestore-runtime`` client configured for up to 10 attempts
    in ``standard`` retry mode. Progress and latency are printed for the first
    row and for every 1000th row after it.

    Args:
        feature_group_name: Name of the feature group to write to.
        rows: Iterable of row objects that expose ``__fields__`` and support
            ``row[name]`` lookup. The iterable is materialised into a list.
        columns: Accepted for interface compatibility but not used; feature
            names are taken from each row's ``__fields__``.

    Returns:
        int: ``0`` when ``rows`` is empty, otherwise the average wall-clock time
        per row in milliseconds (total elapsed time divided by the row count).

    Raises:
        RuntimeError: If a ``put_record`` response reports an HTTP status other
            than 200.
    """
    rows = list(rows)
    total_rows = len(rows)
    if total_rows == 0:
        print('\nEMPTY df')
        return 0

    print(f'*** ingesting {total_rows} rows to {feature_group_name}')

    latencies = []

    session = boto3.session.Session()
    runtime = session.client(
        service_name='sagemaker-featurestore-runtime',
        config=Config(retries={'max_attempts': 10, 'mode': 'standard'}),
    )

    start_time = time.time()

    for index, row in enumerate(rows):
        # Fields whose value is None are left out of the record entirely.
        record = [{"FeatureName": column, "ValueAsString": str(row[column])}
                  for column in row.__fields__ if row[column] is not None]

        one_start_time = time.time()
        resp = runtime.put_record(FeatureGroupName=feature_group_name, Record=record)
        one_end_time = time.time()

        latencies.append(one_end_time - one_start_time)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            # Must raise an exception instance: raising a plain string is a
            # TypeError in Python 3 and would hide the real failure.
            raise RuntimeError(f'PutRecord failed: {resp}')

        # Report on the first row and then on every 1000th row.
        if (index % 1000) == 0:
            avg_of_each_latency = int(np.mean(latencies) * 1000)
            pct = (index / total_rows) * 100
            stage = 'COLD START' if index == 0 else 'INTERIM avg'

            seconds_so_far = time.time() - start_time
            if (seconds_so_far == 0) or (pct == 0):
                # No basis for an estimate yet; print a sentinel value.
                mins_remaining = 99999
            else:
                # Extrapolate total duration from the fraction completed so far.
                multiplier = 100 / pct
                total_mins = (seconds_so_far/60) * multiplier
                mins_remaining = total_mins - (seconds_so_far/60)
            print(f'{stage} latency {avg_of_each_latency:3d} ms,' + \
                  f' with {pct:4.1f}% complete [{index}/{total_rows}, Remaining: {mins_remaining:1.1f} mins]')

    avg_of_each_latency = int(np.mean(latencies) * 1000)
    print(f'...avg latency from individual latencies: {avg_of_each_latency:3d}')

    # The returned figure covers the whole loop, not only time inside put_record.
    total_put_latency = time.time() - start_time
    avg_put_latency_ms = int((total_put_latency / total_rows) * 1000)
    return avg_put_latency_ms

def ingest(fg_name, rows):
    """Report the partition size and echo the integer stored in ``tmp.txt``.

    Args:
        fg_name: Feature group name; only used in the log message.
        rows: Iterable of rows; materialised solely to count them.

    The file ``tmp.txt`` is resolved through ``SparkFiles.get`` and its first
    line is parsed as an integer. Nothing is returned.
    """
    row_count = len(list(rows))
    print(f'*** ingesting {row_count} rows to {fg_name}')

    marker_path = SparkFiles.get("tmp.txt")
    with open(marker_path) as marker_file:
        first_line = marker_file.readline()
        fileVal = int(first_line)
        print(f'**** read value: {fileVal}')
    
def run_spark_job():
    """Read the CSV input with Spark and ingest it partition by partition.

    Steps:
        1. Obtain (or create) the Spark session named ``PySparkJob``.
        2. Parse the command-line options.
        3. Load the CSV data at ``--s3_uri_prefix`` with a header row.
        4. Print the inferred schema.
        5. Call ``ingest_df_to_fg`` once per partition, targeting
           ``--feature_group_name``.
    """
    session = SparkSession.builder.appName('PySparkJob').getOrCreate()
    cli_args = parse_args()

    source_uri = cli_args.s3_uri_prefix
    print(f'*** Starting processing job, received the following input files: \n{source_uri}')

    csv_reader = session.read.options(Header=True)
    input_df = csv_reader.csv(source_uri)

    print('*** Files to ingest have the following schema:')
    input_df.printSchema()

    expected_columns = ['tid', 'datetime', 'cc_num', 'amount', 'fraud_label']
    target_group = cli_args.feature_group_name

    def ingest_partition(partition_rows):
        # Called once per partition with an iterator over that partition's rows.
        return ingest_df_to_fg(target_group, partition_rows, expected_columns)

    input_df.foreachPartition(ingest_partition)

# Script entry point: run the job only when executed directly, not when imported.
if __name__ == '__main__':
    run_spark_job()
    print('Finished running processing job')

Overwriting pyspark_script.py


In [65]:
# Parameters for the processing job below.
# NOTE: this rebinds s3_uri_prefix, replacing the default-bucket value set in an
# earlier cell; the bucket name here is account-specific.
s3_uri_prefix = 's3://roymark-ohio/feature-store/raw-one-day' #smoke-test' #by-day'
# Feature group the job ingests into (passed as --feature_group_name).
feature_group_name = 'trans-both-fg'

In [66]:
# Upload tmp.txt to S3; the processing job below references this object in submit_files.
!aws s3 cp tmp.txt s3://roymark-ohio/feature-store/tmp.txt

upload: ./tmp.txt to s3://roymark-ohio/feature-store/tmp.txt   


In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.spark.processing import PySparkProcessor
import boto3

region = boto3.session.Session().region_name
role = get_execution_role()

# Environment variables for the processing containers. The submitted script
# creates its boto3 client without an explicit region, so it presumably relies
# on AWS_DEFAULT_REGION being set here.
processor_env = {
    'AWS_DEFAULT_REGION': region,
    'mode': 'python',
}

pyspark_processor = PySparkProcessor(
    framework_version='2.4',  # spark version
    role=role,
    instance_type='ml.m4.4xlarge',
    instance_count=2,
    base_job_name='sm-proc-pyspark',
    env=processor_env,
    max_runtime_in_seconds=3600,
)

# Optional Spark tuning, currently disabled. To enable, pass
# `configuration=configuration` to run():
# configuration = [{
#   "Classification": "spark-defaults",
#   "Properties": {"spark.driver.memoryOverhead": "768m",
#                  "spark.executor.memory": "2g",
#                  "spark.driver.memory": "4096m",
#                  "spark.executor.cores":"60"},
# }]

# Command-line options forwarded to pyspark_script.py.
job_arguments = [
    '--feature_group_name', feature_group_name,
    '--s3_uri_prefix', s3_uri_prefix,
]

# `bucket` is defined in the first cell of the notebook.
pyspark_processor.run(
    submit_app='pyspark_script.py',
    submit_files=['s3://roymark-ohio/feature-store/tmp.txt'],
    arguments=job_arguments,
    spark_event_logs_s3_uri=f's3://{bucket}/spark-logs',
    logs=True,
)


Job Name:  sm-proc-pyspark-2021-05-12-19-06-28-404
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-355151823911/sm-proc-pyspark-2021-05-12-19-06-28-404/input/code/pyspark_script.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-355151823911/spark-logs', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]
.............................[35m05-12 19:11 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '--files', 's3://roymark-ohio/feature-store/tmp.txt', '--local-spark-event-logs-dir', '/opt/ml/processing/spark-events/', '/opt/ml/processing/input/code/pyspark_script.py', '--feature_group_name', 'trans-both-fg', '--s3_uri_prefix', 's3://roymark-ohio/