In [2]:
import boto3
import sagemaker
import pandas as pd
import os
import numpy as np
import json
import boto3
import string
import collections
import logging

import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri

print(f"sagemaker:version:{sagemaker.__version__}")
print(f"Pandas:version:{pd.__version__}")
print(f"Numpy:version:{np.__version__}")
import logging
import warnings
warnings.filterwarnings('ignore')



sagemaker:version:2.83.0
Pandas:version:1.3.5
Numpy:version:1.21.4


In [3]:
DEF_BUCKET="sagemaker-crossaccnt-train"
PREFIX="data/finance/distrib-multi"

In [4]:
import boto3
import sagemaker


region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sm_session = sagemaker.session.Session(default_bucket=DEF_BUCKET)
default_bucket = sm_session.default_bucket()
sm_client=sm_session.sagemaker_client
sm_session.default_bucket()

'sagemaker-crossaccnt-train'

## Generate Data
* Generate synthetic housing data -- with no Categorical columns for now -- 
    * SHARD is RANDOM INDEX and so everytime we run we get a new set of shard indexes
    * Generate Json File -- not included as yet
    * Generate Zip file - not included as yet

In [5]:
from random import choice
import numpy as np
import pandas as pd


NUM_HOUSES_PER_LOCATION = 1000
LOCATIONS = [
    "NewYork_NY",
    "LosAngeles_CA",
    "Chicago_IL",
    "Houston_TX",
    "Dallas_TX",
    "Phoenix_AZ",
    "Philadelphia_PA",
    "SanAntonio_TX",
    "SanDiego_CA",
    "SanFrancisco_CA",
]
MAX_YEAR = 2019


def generate_price(house):
    """Generate price based on features of the house"""

    if house["FRONT_PORCH"] == "y":
        garage = 1
    else:
        garage = 0

    if house["FRONT_PORCH"] == "y":
        front_porch = 1
    else:
        front_porch = 0

    price = int(
        150 * house["SQUARE_FEET"]
        + 10000 * house["NUM_BEDROOMS"]
        + 15000 * house["NUM_BATHROOMS"]
        + 15000 * house["LOT_ACRES"]
        + 10000 * garage
        + 10000 * front_porch
        + 15000 * house["GARAGE_SPACES"]
        - 5000 * (MAX_YEAR - house["YEAR_BUILT"])
    )
    return price


def generate_yes_no():
    """Generate values (y/n) for categorical features"""
    answer = choice([1, 0])
    return answer


def generate_random_house():
    """Generate a row of data (single house information)"""
    house = {
        "SHARD_PREFIX": np.random.randint(0, 4),
        "SQUARE_FEET": np.random.normal(3000, 750),
        "NUM_BEDROOMS": np.random.randint(2, 7),
        "NUM_BATHROOMS": np.random.randint(2, 7) / 2,
        "LOT_ACRES": round(np.random.normal(1.0, 0.25), 2),
        "GARAGE_SPACES": np.random.randint(0, 4),
        "YEAR_BUILT": min(MAX_YEAR, int(np.random.normal(1995, 10))),
        "FRONT_PORCH": generate_yes_no(),
        "DECK": generate_yes_no(),
    }

    price = generate_price(house)

    return [
        house["SHARD_PREFIX"],
        house["YEAR_BUILT"],
        house["SQUARE_FEET"],
        house["NUM_BEDROOMS"],
        house["NUM_BATHROOMS"],
        house["LOT_ACRES"],
        house["GARAGE_SPACES"],
        house["FRONT_PORCH"],
        house["DECK"],
        price,
    ]


def generate_houses(num_houses):
    """Generate housing dataset"""
    house_list = []

    for _ in range(num_houses):
        house_list.append(generate_random_house())
        #print(house_list)

    df = pd.DataFrame(
        house_list,
        columns=[
            "SHARD_PREFIX",
            "YEAR_BUILT",
            "SQUARE_FEET",
            "NUM_BEDROOMS",
            "NUM_BATHROOMS",
            "LOT_ACRES",
            "GARAGE_SPACES",
            "FRONT_PORCH",
            "DECK",
            "PRICE",
        ],
    )
    return df


In [10]:
df = generate_houses(10)
df

Unnamed: 0,SHARD_PREFIX,YEAR_BUILT,SQUARE_FEET,NUM_BEDROOMS,NUM_BATHROOMS,LOT_ACRES,GARAGE_SPACES,FRONT_PORCH,DECK,PRICE
0,3,2002,2587.245611,2,3.0,1.65,1,0,0,407836
1,0,2004,3932.424543,2,2.0,1.3,1,1,1,599363
2,1,2002,2110.358082,6,2.5,1.02,2,1,1,374353
3,2,2009,2688.848561,2,2.0,0.84,2,1,1,445927
4,1,1995,2356.615287,2,2.0,1.41,3,0,1,349642
5,0,1987,2806.100714,3,1.5,0.69,1,0,0,338765
6,1,1986,3359.818625,3,1.0,1.18,3,0,0,446672
7,2,2002,3482.570149,4,3.0,1.26,3,1,1,586285
8,3,2000,2275.129275,6,1.0,0.4,0,0,0,327269
9,2,1995,2868.584367,4,3.0,0.91,1,0,0,423937


In [11]:
print(df['SHARD_PREFIX'].to_list())

[3, 0, 1, 2, 1, 0, 1, 2, 3, 2]


In [None]:
for shard in range(0,4):
    df_shard = df[df['SHARD_PREFIX'] == shard]
    if len(df_shard) > 0:
        data_loc = f"./data/test_{shard}.csv"
        s3_uri = f"s3://{DEF_BUCKET}/{PREFIX}/directshard"
        df_shard.to_csv(data_loc, na_rep=0 )

        s3_upload_uri = sagemaker.s3.S3Uploader.upload(
            local_path=data_loc,
            desired_s3_uri=s3_uri,
            kms_key=None,
            sagemaker_session=sm_session
        )
        print(f"S3:::data_loc={data_loc}::Uploaded:={s3_upload_uri}")

In [17]:
%%writefile ./scripts/distprocess.py

"""Feature engineers the distributed data set """
import argparse
import logging
import os
import pathlib
import requests
import tempfile

import boto3
import numpy as np
import pandas as pd

import json

import multiprocessing

_logger = logging.getLogger()
_logger.setLevel(logging.INFO)
_logger.addHandler(logging.StreamHandler())

try:
    _logger.info(f"Pandas:version:{pd.__version__}")
    _logger.info(f"Numpy:version:{np.__version__}")
except:
    pass


# Since we get a headerless CSV file we specify the column names here.

# Since we get a headerless CSV file we specify the column names here.

columns=[
    "SHARD_PREFIX",
    "YEAR_BUILT",
    "SQUARE_FEET",
    "NUM_BEDROOMS",
    "NUM_BATHROOMS",
    "LOT_ACRES",
    "GARAGE_SPACES",
    "FRONT_PORCH",
    "DECK",
    "PRICE"
]


columns_dtype = {
    'SHARD_PREFIX': np.int, 
    "YEAR_BUILT": np.int,
    "SQUARE_FEET": np.float64,
    "NUM_BEDROOMS": np.int,
    "NUM_BATHROOMS": np.int,
    "LOT_ACRES": np.float64,
    "GARAGE_SPACES": np.int,
    "FRONT_PORCH": np.int,
    "DECK": np.int,
    "PRICE": np.float64
    #   'body': str, 
}

def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z

def listLocalDirectory(dirPath="."):
    for path, dnames, fnames in os.walk(dirPath):
        _logger.info(f"List::path={path}::dirNames={dnames}::fileNames={fnames}::")

    print("list:done:")
       
if __name__ == "__main__":
    _logger.info("Starting distprocess.py.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-size", type=int, default=100)
    parser.add_argument("--input-path", type=str, default='/opt/ml/processing/input') # "input_path", "/opt/ml/processing/input",
    args = parser.parse_args()
    data_size = args.data_size
    input_dir = args.input_path

    # Split list of files into sub-lists
    cpu_count = multiprocessing.cpu_count()

    
    BASE_DIR = "/opt/ml/processing"
    
    print(input_dir)
    _logger.info(f"Input:data:path:={input_dir}::")


    #fn = os.path.join("/opt/ml/processing/input", "combined_tweets.csv")
    
    onlyFiles = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
    _logger.info(f"Data Downloaded::Now Reading downloaded data.:dir:{input_dir}::And:FILES:ARE::{onlyFiles}")
    
    fn = os.path.join(input_dir, onlyFiles[0])
    
    # read in csv
    df = pd.read_csv(fn, names=columns)
    #_logger.info(f"df:head={df.head()}")
    _logger.info(f"DF:SHARD:INDEX={df['SHARD_PREFIX'].to_list()}::")
    
    _logger.info(f"AFTER:READ:Going to write it to {BASE_DIR}/output : file will be {BASE_DIR}/output/{onlyFiles[0]}:: ")
    _logger.info(f"READ:Data:len={len(df)}::  shape={df.shape}::")
    df.to_csv(
        f"{BASE_DIR}/output/{onlyFiles[0]}", header=True, index=False
    )
    
    _logger.info("All Done !! written out !!")
    
    # ----------------   OLD TEMPLATE CODE --------------------#

Writing ./scripts/distprocess.py


In [8]:
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
print(f"Role:SageMaker:session={role}::")


Role:SageMaker:session=arn:aws:iam::622343165275:role/service-role/AmazonSageMaker-ExecutionRole-20220208T115633::


## Run 
Run the pre processingjob with shardbys3

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


print(f"Using:role={role}:")
print(f"using SageMaker session={sm_session}:")

BASE_JOB_PREFIX="smjobs-dist",  # Choose any name

processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)


# Processing step for feature engineering
sklearn_processor = SKLearnProcessor(
        framework_version="0.23-1",
        instance_type=processing_instance_type,
        instance_count=2,
        base_job_name="dist-proc" ,
        sagemaker_session=sm_session,
        role=role,
    )

from sagemaker.processing import ProcessingInput, ProcessingOutput
inputs_p=[
    ProcessingInput(
        source=f"s3://{DEF_BUCKET}/{PREFIX}/directshard",
        destination='/opt/ml/processing/input',
        s3_data_distribution_type='ShardedByS3Key'
    ),
 ]
outputs_p=[
    ProcessingOutput(
        s3_upload_mode="EndOfJob",
        output_name='final_output',
        source='/opt/ml/processing/output',
        destination=f's3://{DEF_BUCKET}/{PREFIX}/directshard/output'
    ),
    
    
]
# -- if we d onot create a output then this directory is never creatd on tbe processing job

preproc_script =  "./scripts/distprocess.py"

print(f"Preprocessing:script:loc={preproc_script}:")
job_arguments=["--input-path", "/opt/ml/processing/input", 
              "--data-size", "100"]
sklearn_processor.run(
    code=preproc_script,
    inputs=inputs_p,
    outputs=outputs_p,
    arguments=job_arguments, 
    wait=True,
    logs="All",
)


Using:role=arn:aws:iam::622343165275:role/service-role/AmazonSageMaker-ExecutionRole-20220208T115633:
using SageMaker session=<sagemaker.session.Session object at 0x7fb854d983d0>:
Preprocessing:script:loc=./scripts/distprocess.py:

Job Name:  dist-proc-2022-06-09-22-52-19-655
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-crossaccnt-train/data/finance/distrib-multi/directshard', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-crossaccnt-train/dist-proc-2022-06-09-22-52-19-655/input/code/distprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'final_output', 'AppManaged': False, 'S3Output': {'