# This notebook triggers a Data Wrangler preprocessing job with output pushed into a CAS in-memory table
#

## Step 1 - Upload the Data Wrangler .flow file, invoke a data preprocessing job, and save the output to S3.

In [1]:
import sagemaker 
import boto3
import os
import json
import time
from botocore.exceptions import ClientError
import swat

In [2]:
# Every input and customization is keyed in this cell; the remaining cells are meant to run unchanged.

import time
import uuid

# IAM role used to run the SageMaker job.
# It should have the AWS managed policy "AmazonSageMakerFullAccess" attached to it.
#sagemaker_iam_role='arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole-20210714T154124'
sagemaker_iam_role = 'arn:aws:iam::123456789012:role/fsbu-user1-sagemaker-fullaccess'

# Bucket - root folder of all processing.
bucket = "fsbu-user1-s3bucket-1"

#####
# The .flow file is produced by building a Data Wrangler pre-processing pipeline in SageMaker Studio;
# it carries all the transformations applied to the input file.
# output_name is "<processing node id>.default": the node whose output is saved and fed back to SAS
# model training. A flow can contain several processing nodes; typically the last node's output is
# taken, but an intermediate node works as well. The node id can be read from the .flow file.
dwflow_file_name = "dwrangler-preprocess-fraud-txn.flow"
output_name = "3a43fab9-5a35-451b-8734-dd519c72896f.default"
# Node id is everything before the first "." of the output name.
dwflow_node = output_name.partition(".")[0]

### Input file to be processed and its S3 location.
input_filename = "fraud_and_transactions.csv"
input_filename_path = f"s3://{bucket}/{input_filename}"

# Unique flow export id (UTC day-hour-minute-second plus 8 random hex chars), so that every
# submission gets a unique processing job name.
job_timestamp = time.strftime('%d-%H-%M-%S', time.gmtime())
job_suffix = uuid.uuid4().hex[:8]
flow_export_id = f"{job_timestamp}-{job_suffix}"
flow_export_name = f"flow-{flow_export_id}"
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

#### Output details
s3_output_path = f"s3://{bucket}/{dwflow_file_name}"
print(f"Final data wrangler processed file path: {s3_output_path}/{processing_job_name}/{dwflow_node}/default")



Final data wrangler processed file path: s3://fsbu-user1-s3bucket-1/dwrangler-preprocess-fraud-txn.flow/data-wrangler-flow-processing-09-03-06-17-c5dc088d/3a43fab9-5a35-451b-8734-dd519c72896f/default


In [3]:
# Create the boto3 session that the S3 upload and the SageMaker session are built from.

# Specific keys can be supplied instead (see the commented-out client call below). As written, the
# session is created with no arguments; the original author notes this relies on the default
# $HOME/.aws/config and credentials files. A profile can be selected through OS environment variables.
#os.environ['AWS_PROFILE'] = "123456789012-sandbox"
#print(os.environ)

#s3_client = boto3.client("s3",aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,aws_session_token=aws_session_token,region_name=aws_region)
boto3_session = boto3.Session()

In [4]:
# PART 0 - Upload the Data Wrangler .flow file to S3 so the processing job can reference it later.

# upload_file reports failed transfers as S3UploadFailedError, so catch it alongside ClientError.
from boto3.exceptions import S3UploadFailedError

# Build the object key and URI once so the upload target and the reported URI cannot drift apart.
flow_s3_key = f"data_wrangler_flows/{dwflow_file_name}"
flow_s3_uri = f"s3://{bucket}/{flow_s3_key}"

s3_client = boto3_session.client('s3')
try:
    s3_client.upload_file(dwflow_file_name, bucket, flow_s3_key)
except (ClientError, S3UploadFailedError) as e:
    print("Error with AWS call while uploading datawrangler.wflow file to S3: %s" % e)
    # Stop here: the processing job started further down needs this file in S3, and carrying on
    # would print a misleading "uploaded" message.
    raise

print(f"Data Wrangler flow {dwflow_file_name} uploaded to {flow_s3_uri}")


Data Wrangler flow dwrangler-preprocess-fraud-txn.flow uploaded to s3://fsbu-user1-s3bucket-1/data_wrangler_flows/dwrangler-preprocess-fraud-txn.flow


In [5]:
# PART 1 SageMaker - inputs and outputs the SageMaker Python SDK needs to launch the job
#

import sagemaker

from sagemaker.processing import ProcessingInput, ProcessingOutput
#from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

# S3 settings shared by every input of this job.
s3_file_input_settings = dict(
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated",
)

## Input - the Data Wrangler flow definition uploaded in PART 0
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    **s3_file_input_settings
)

# Input - the data file to be processed
data_sources = [
    ProcessingInput(
        source=input_filename_path,
        destination=f"/opt/ml/processing/{input_filename}",
        input_name=input_filename,
        **s3_file_input_settings
    )
]

# Output - the selected flow node's result, uploaded to s3_output_path at the end of the job
processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)


In [6]:
# PART 2 - Configure SageMaker job settings

# SageMaker session built on the boto3 session created earlier.
sess = sagemaker.Session(boto_session=boto3_session)

# IAM role for executing the processing job. The following is only needed when running from SageMaker Studio.
#iam_role = sagemaker.get_execution_role()

# Data Wrangler container image: floating "1.x" tag, plus a pinned alternative.
# NOTE(review): only container_uri is passed to the Processor in this notebook; container_uri_pinned
# is defined but not used in the visible cells.
container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.6.2"

# Processing job instance type and count.
instance_type = "ml.m5.4xlarge"
instance_count = 2

# Size in GB of the EBS volume used for storing data during processing.
volume_size_in_gb = 100

# Network isolation mode; default is off.
enable_network_isolation = False

# Content type for each output. Data Wrangler supports CSV (default) and Parquet.
output_content_type = "CSV"

# Output configuration, passed to the processing container as an argument.
output_config = {output_name: {"content_type": output_content_type}}

In [7]:
# PART 3 - Create the processing job and start it

from sagemaker.network import NetworkConfig
from sagemaker.processing import Processor

job_network_config = NetworkConfig(enable_network_isolation=enable_network_isolation)

processor = Processor(
    role=sagemaker_iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=job_network_config,
    sagemaker_session=sess
)

# The output configuration is handed to the container as one JSON-encoded argument.
container_arguments = [f"--output-config '{json.dumps(output_config)}'"]

# Start the job without blocking (wait=False); a later cell waits for completion.
processor.run(
    inputs=[flow_input, *data_sources],
    outputs=[processing_job_output],
    arguments=container_arguments,
    wait=False,
    logs=False,
    job_name=processing_job_name
)


Job Name:  data-wrangler-flow-processing-09-03-06-17-c5dc088d
Inputs:  [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://fsbu-user1-s3bucket-1/data_wrangler_flows/dwrangler-preprocess-fraud-txn.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'fraud_and_transactions.csv', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://fsbu-user1-s3bucket-1/fraud_and_transactions.csv', 'LocalPath': '/opt/ml/processing/fraud_and_transactions.csv', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': '3a43fab9-5a35-451b-8734-dd519c72896f.default', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://fsbu-user1-s3bucket-1/dwrangler-preprocess-fraud-txn.flow', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]


In [8]:
# PART 4 - Wait for job to complete 
# Echo the S3 prefix under which the processed output is expected once the job finishes.
print(f"Final data wrangler processed file path: {s3_output_path}/{processing_job_name}/{dwflow_node}/default")
# Wait on the job started in PART 3 (it was launched with wait=False) and keep the returned description.
job_result = sess.wait_for_processing_job(processing_job_name)
# Bare expression so the notebook displays the job description.
job_result

Final data wrangler processed file path: s3://fsbu-user1-s3bucket-1/dwrangler-preprocess-fraud-txn.flow/data-wrangler-flow-processing-09-03-06-17-c5dc088d/3a43fab9-5a35-451b-8734-dd519c72896f/default
.....................................................................!

{'ProcessingInputs': [{'InputName': 'flow',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://fsbu-user1-s3bucket-1/data_wrangler_flows/dwrangler-preprocess-fraud-txn.flow',
    'LocalPath': '/opt/ml/processing/flow',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'fraud_and_transactions.csv',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://fsbu-user1-s3bucket-1/fraud_and_transactions.csv',
    'LocalPath': '/opt/ml/processing/fraud_and_transactions.csv',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': '3a43fab9-5a35-451b-8734-dd519c72896f.default',
    'S3Output': {'S3Uri': 's3://fsbu-user1-s3bucket-1/dwrangler-preprocess-fraud-txn.flow',
     'LocalPath': '/opt/ml/processing/output',
     'S3UploadMode': 'EndOfJob'},
  

## Step 2 - Pull processed data into Viya in-memory for next step of model training

In [9]:
##
# SAS SWAT takes over from here
import os
import swat
import pandas
import json
from getpass import getpass

# TLS settings for the CAS client: CA certificate of the CAS controller and the OpenSSL library to load.
#os.environ['CAS_CLIENT_SSL_CA_LIST'] = '/sgrid/openssl_certs/cas_controller_certs/fsbuviya4.fsbu-openstack-k8s.unx.sas.com.pem'
# os.environ['SSLREQCERT']='ALLOW'    ==> if you want to skip certificate verification.
os.environ.update({
    'CAS_CLIENT_SSL_CA_LIST': '/sgrid/openssl_certs/cas_controller_certs/fsds-viya34lab-controller.pem',
    'TKESSL_OPENSSL_LIB': '/usr/lib64/libssl.so.10',
})

# CAS connection settings. The password is prompted for rather than stored in the notebook.
cashost = "fsds-viya34lab-controller.fsl.sashq-d.openstack.sas.com"
casport = 5570
protocol = "cas"
username = "user1"
password = getpass()
#password = ""

 ···········


In [10]:
# Open the CAS session with the settings from the previous cell and print the server status
# as a quick connectivity check.
conn = swat.CAS(cashost, casport, protocol=protocol,username=username,password=password)
out = conn.serverstatus()
print("status : ", out)

# Handy while exploring the available CAS actions:
# conn.help()
#conn.help(actionset='table')
#conn.help(actionset='table',action='fileinfo');


NOTE: Grid node action status report: 5 nodes, 8 total actions executed.
status :  [About]

 {'CAS': 'Cloud Analytic Services',
  'Version': '3.05',
  'VersionLong': 'V.03.05M0P11112019',
  'Copyright': 'Copyright © 2014-2018 SAS Institute Inc. All Rights Reserved.',
  'ServerTime': '2021-08-09T03:13:42Z',
  'System': {'Hostname': 'fsds-viya34lab-controller.fsl.sashq-d.openstack.sas.com',
   'OS Name': 'Linux',
   'OS Family': 'LIN X64',
   'OS Release': '3.10.0-1062.1.1.el7.x86_64',
   'OS Version': '#1 SMP Fri Sep 13 22:55:44 UTC 2019',
   'Model Number': 'x86_64',
   'Linux Distribution': 'CentOS Linux release 7.7.1908 (Core)'},
  'license': {'site': 'Viya 3.5 FSBU GA Shipped',
   'siteNum': 70180938,
   'expires': '11Feb2022:00:00:00',
   'gracePeriod': 45,

[server]

 Server Status
 
    nodes  actions
 0      5        8

[nodestatus]

 Node Status
 
                                                 name        role  uptime  \
 0  fsds-viya34lab-worker-1.fsl.sashq-d.openstack....  

In [14]:
# Load the processed S3 data into CAS in-memory for the next processing step.

# S3 data source pointing at the folder the processing job wrote for the selected flow node.
# It is built as a plain dict rather than a hand-escaped string passed through eval(), so a quote
# in any configuration value cannot break it or be executed as code.
# `bucket` is taken from the configuration cell instead of being hardcoded again here.
datasource = {
    "srctype": "s3",
    "bucket": bucket,
    "objectPath": f"/{dwflow_file_name}/{processing_job_name}/{dwflow_node}/default/",
    "awsConfigPath": '/sgrid/home/user1/.aws/config',
    "awsCredentialsPath": '/sgrid/home/user1/.aws/credentials',
}

# Session-scoped caslib on top of that S3 folder; it becomes the active caslib.
conn.table.addCaslib(name='S3CASLIB3', description='', subDirectories='false', session='true', activeOnAdd='true', dataSource=datasource, createDirectory='false')

# Only the first file listed in the caslib is loaded.
filename = conn.table.fileinfo(caslib='s3caslib3')['FileInfo']['Name'][0]
conn.table.loadtable(caslib='s3caslib3',path=filename,casout={"replication":0,"replace":"True"})

# Bare expression so the notebook displays the tables now present in the caslib.
conn.table.tableinfo(caslib='s3caslib3')


NOTE: 'S3CASLIB3' is now the active caslib.
NOTE: Cloud Analytic Services added the caslib 'S3CASLIB3'.
NOTE: Cloud Analytic Services made the file part-00000-6dfc3862-f368-4d76-a6b8-4f5db094b8a9-c000.csv in AWS S3 bucket fsbu-user1-s3bucket-1 available as table PART-00000-6DFC3862-F368-4D76-A6B8-4F5DB094B8A9-C000 in caslib S3CASLIB3.


Unnamed: 0,Name,Rows,Columns,IndexedColumns,Encoding,CreateTimeFormatted,ModTimeFormatted,AccessTimeFormatted,JavaCharSet,CreateTime,...,Repeated,View,MultiPart,SourceName,SourceCaslib,Compressed,Creator,Modifier,SourceModTimeFormatted,SourceModTime
0,PART-00000-6DFC3862-F368-4D76-A6B8-4F5DB094B8A...,171408,4,0,utf-8,2021-08-08T23:14:41-04:00,2021-08-08T23:14:41-04:00,2021-08-08T23:14:41-04:00,UTF8,1944098000.0,...,0,0,0,part-00000-6dfc3862-f368-4d76-a6b8-4f5db094b8a...,S3CASLIB3,0,user1,,,


In [15]:
# Plain-text dump of the files behind the caslib, then of the tables loaded into it.
caslib_files = conn.table.fileinfo(caslib='s3caslib3')
print(caslib_files)
caslib_tables = conn.table.tableinfo(caslib='s3caslib3')
print(caslib_tables)

[FileInfo]

    Permission             Owner Group  \
 0  ----------  awssandboxroot01         
 
                                                 Name     Size Encryption  \
 0  part-00000-6dfc3862-f368-4d76-a6b8-4f5db094b8a...  6127514              
 
                         Time       ModTime  
 0  2021-08-08T23:12:06-04:00  1.944098e+09  

+ Elapsed: 0.647s, user: 0.0268s, sys: 0.00931s, mem: 1.82mb
[TableInfo]

                                                 Name    Rows  Columns  \
 0  PART-00000-6DFC3862-F368-4D76-A6B8-4F5DB094B8A...  171408        4   
 
    IndexedColumns Encoding        CreateTimeFormatted  \
 0               0    utf-8  2021-08-08T23:14:41-04:00   
 
             ModTimeFormatted        AccessTimeFormatted JavaCharSet  \
 0  2021-08-08T23:14:41-04:00  2021-08-08T23:14:41-04:00        UTF8   
 
      CreateTime  ...  Repeated  View  MultiPart  \
 0  1.944098e+09  ...         0     0          0   
 
                                           SourceName  Sour

In [16]:
# Cleanup: drop the session caslib created for the S3 data, then close the CAS connection.
# Handy while exploring the available CAS actions:
# conn.help()
#conn.help(actionset='table')
#conn.help(actionset='table',action='fileinfo');
conn.table.dropcaslib(caslib='s3caslib3')
conn.close()

NOTE: 'CASUSERHDFS(user1)' is now the active caslib.
NOTE: Cloud Analytic Services removed the caslib 's3caslib3'.


In [13]:
# NOTE(review): this cell carries execution count In [13] but sits after In [16], which already
# drops 's3caslib3' and calls conn.close(). On a top-to-bottom run this call is therefore issued
# on a closed connection for a caslib that is already gone; it looks like a leftover from
# out-of-order execution. Consider deleting this cell.
conn.table.dropcaslib(caslib='s3caslib3')

NOTE: 'CASUSERHDFS(user1)' is now the active caslib.
NOTE: Cloud Analytic Services removed the caslib 's3caslib3'.
