In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
import boto3
import os 

# uncomment this for debugging and experimenting

args = {}
args["fraud_samples"] = 12
args["legit_samples"] = 130
args["bucket"] = "fraud-sample-data"
args["train-source-key"] = "input/fraudTrain.csv"
args["test-source-key"] = "input/fraudTest.csv"
args["train-dest-key"] = "input/fraudTrain_glue_transformed.csv"
args["test-dest-key"]=  "input/fraudTest_glue_transformed.csv"
args["train_max_cut_off"]= "2020-04-30 00:00:00"
args["test_min_cut_off"]= "2020-08-30 00:00:00"
args["catalog_db"] =  "default"
args["catalog_table"] = "fraud-raw-input"


sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# uncomment this for running job in glue

#job = Job(glueContext)
#args = getResolvedOptions(sys.argv, ["JOB_NAME", "train-source", "test-source", "train-dest","test-dest", "train_max_cut_off", "test_min_cut_off"])
#job.init(args["JOB_NAME"], args)



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1656267673903_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:

def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(
        "(select * from source1) UNION " + unionType + " (select * from source2)"
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


def write_output_to_s3(dyf, s3_path, prefix, renamed_key, transformation_ctx):
    
    client = boto3.client('s3')
    resource = boto3.resource('s3')

    print(f"saving dyanmic frame to S3 bucket with prefix path: {prefix}")
    # Script generated for node S3 bucket
    S3bucket_dyf = glueContext.write_dynamic_frame.from_options(
        frame=dyf ,
        connection_type="s3",
        format="csv",
        connection_options={"path": s3_path},
        transformation_ctx=transformation_ctx,
    )

    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy
    response = client.list_objects(
        Bucket=args['bucket'],
        Prefix=f"{prefix}/run-",
    )

    objectkey_to_rename = response['Contents'][0]['Key']

    copy_output = {
        'Bucket': args['bucket'],
        'Key': objectkey_to_rename
    }

    print(f"renaming filename to {renamed_key} as glue output filename is random")

    resource.meta.client.copy(copy_output, args['bucket'], renamed_key)
    print(f"deleting original output {objectkey_to_rename}....")
    response = client.delete_object(
        Bucket=args['bucket'],
        Key=objectkey_to_rename 
    )
    
# set these but they should not overlap
TRAIN_MAX_TIMESTAMP = args["train_max_cut_off"]
TEST_MIN_TIMESTAMP = args["test_min_cut_off"]

fraud_samples = args["fraud_samples"]
legit_samples = args["legit_samples"]


# Script generated for node SQL
SqlQuery0 = f'''
select * from 
(
    (
    select * from myDataSource
    where EVENT_LABEL == 'fraud'
    order BY RAND() 
    limit {fraud_samples}
    ) 
    union all
    (
    select * from myDataSource
    where EVENT_LABEL == 'legit'
    order BY RAND() 
    limit {legit_samples}
    )
)

'''


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

###### Using S3 as source

In [None]:

bucket = args['bucket']
train_input_key = args["train-source-key"]
test_input_key = args["test-source-key"]


train_dyF = glueContext.create_dynamic_frame.from_options(
        's3',
        {'paths': [f"s3://{bucket}/{train_input_key}"]},
        'csv',
        {'withHeader': True})
test_dyF = glueContext.create_dynamic_frame.from_options(
        's3',
        {'paths': [f"s3://{bucket}/{test_input_key}"]},
        'csv',
        {'withHeader': True})


Union_node_dyf = sparkUnion(
    glueContext,
    unionType="ALL",
    mapping={
        "source1": train_dyF,
        "source2": test_dyF,
    },
    transformation_ctx="Union_train_test",
)


Union_node_dyf.count()



mappings=[
        ("trans_date_trans_time", "string", "trans_date_trans_time", "timestamp"),
        ("cc_num", "string", "cc_num", "bigint"),
        ("merchant", "string", "merchant", "string"),
        ("category", "string", "category", "string"),
        ("amt", "string", "amt", "float"),
        ("first", "string", "first", "string"),
        ("late", "string", "late", "string"),
        ("gender", "string", "gender", "string"),
        ("street", "string", "street", "string"),
        ("city", "string", "city", "string"),
        ("state", "string", "state", "string"),
        ("zip", "string", "zip", "int"),
        ("lat", "string", "lat", "float"),
        ("long", "string", "long", "float"),
        ("city_pop", "string", "city_pop", "int"),
        ("job", "string", "job", "string"),
        ("dob", "string", "dob", "date"),
        ("trans_num", "string", "trans_num", "string"),
        ("unix_time", "string", "unix_time", "int"),
        ("merch_lat", "string", "merch_lat", "float"),
        ("merch_long", "string", "merch_long", "float"),
        ("is_fraud", "string", "is_fraud", "binary"),
]


###### Using glue data catalog as source

In [3]:

# comment out this if uncommenting out the code above which reads from S3 as source
Union_node_dyf = glueContext.create_dynamic_frame_from_catalog(
    database = args["catalog_db"],
    table_name = args["catalog_table"],
    transformation_ctx = "Read fraud train and test combined data from catalog table ")

# This mapping is customised for catalog table inferred schema.
# comment this out if uncommenting out the code above which reads from S3 as source
mappings = [
    ("trans_date_trans_time", "string", "trans_date_trans_time", "timestamp"),
    ("cc_num", "long", "cc_num", "long"),
    ("merchant", "string", "merchant", "string"),
    ("category", "string", "category", "string"),
    ("amt", "double", "amt", "double"),
    ("first", "string", "first", "string"),
    ("last", "string", "last", "string"),
    ("gender", "string", "gender", "string"),
    ("street", "string", "street", "string"),
    ("city", "string", "city", "string"),
    ("state", "string", "state", "string"),
    ("zip", "long", "zip", "long"),
    ("lat", "double", "lat", "double"),
    ("long", "double", "long", "double"),
    ("city_pop", "long", "city_pop", "int"),
    ("job", "string", "job", "string"),
    ("dob", "string", "dob", "date"),
    ("trans_num", "string", "trans_num", "string"),
    ("unix_time", "long", "unix_time", "int"),
    ("merch_lat", "double", "merch_lat", "double"),
    ("merch_long", "double", "merch_long", "double"),
    ("is_fraud", "long", "is_fraud", "short"), #seems to drop all rows if casting to binary so use short
]

Union_node_dyf.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1852394

##### ApplyMapping 

This uses the glue applymapping transform on the DynamicDataframe to rename some columns and cast them to different types as per the mapping list of tuples defined above

In [4]:

# Script generated for node ApplyMapping
ApplyMapping_dyf = ApplyMapping.apply(
    frame=Union_node_dyf,
    mappings=mappings,
    transformation_ctx="ApplyMapping",
)
ApplyMapping_dyf.printSchema()

ApplyMapping_dyf.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- trans_date_trans_time: timestamp
|-- cc_num: long
|-- merchant: string
|-- category: string
|-- amt: double
|-- first: string
|-- last: string
|-- gender: string
|-- street: string
|-- city: string
|-- state: string
|-- zip: long
|-- lat: double
|-- long: double
|-- city_pop: int
|-- job: string
|-- dob: date
|-- trans_num: string
|-- unix_time: int
|-- merch_lat: double
|-- merch_long: double
|-- is_fraud: short

1852394

##### DropFields Transform 

Dropping fields which only add to noise in model - after checking mdoel variable importance plot after first run on all data

In [5]:

# Script generated for node Drop Fields
DropFields_dyf = DropFields.apply(
    frame=ApplyMapping_dyf,
    paths=['col0', "merch_lat",  "merch_long","lat","long","unix_time", "dob"],
    transformation_ctx="DropFields",
)

DropFields_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- trans_date_trans_time: timestamp
|-- cc_num: long
|-- merchant: string
|-- category: string
|-- amt: double
|-- first: string
|-- last: string
|-- gender: string
|-- street: string
|-- city: string
|-- state: string
|-- zip: long
|-- city_pop: int
|-- job: string
|-- trans_num: string
|-- is_fraud: short

In [6]:
DropFields_dyf.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1852394

##### RenameField Transform

Rename fileds corresponding to timestamp and label to their required names as understood by AWS Fraud Detector online insights model type

In [7]:
# Script generated for node Rename Field
RenameField_timestamp= RenameField.apply(
    frame=DropFields_dyf,
    old_name="trans_date_trans_time",
    new_name="EVENT_TIMESTAMP",
    transformation_ctx="RenameField_timestamp",
)

# Script generated for node Rename Field
RenameField_label= RenameField.apply(
    frame=RenameField_timestamp,
    old_name="is_fraud",
    new_name="EVENT_LABEL",
    transformation_ctx="RenameField_label",
)

RenameField_label.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- cc_num: long
|-- merchant: string
|-- category: string
|-- amt: double
|-- first: string
|-- last: string
|-- gender: string
|-- street: string
|-- city: string
|-- state: string
|-- zip: long
|-- city_pop: int
|-- job: string
|-- trans_num: string
|-- EVENT_TIMESTAMP: timestamp
|-- EVENT_LABEL: short

###### Filter timeframes

Convert to pyspark dataframe and use pyspark methods to create the train and test datasest by using defined timestamp
filter ranges. We also rename the 0,1 values in the EVENT_LABEL column to 'fraud' and 'legit'

In [8]:
from pyspark.sql.functions import *

df = RenameField_label.toDF()

train_df = df.filter(df.EVENT_TIMESTAMP <  "2020-04-30 00:00:00").withColumn("EVENT_LABEL", when(df.EVENT_LABEL == '0',"legit").when(df.EVENT_LABEL == '1',"fraud "))
test_df = df.filter(df.EVENT_TIMESTAMP >  "2020-08-30 00:00:00").withColumn("EVENT_LABEL", when(df.EVENT_LABEL == '0',"legit").when(df.EVENT_LABEL == '1',"fraud "))

train_df.select(col('EVENT_TIMESTAMP'), col('EVENT_LABEL')).orderBy(desc('EVENT_TIMESTAMP')).show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----------+
|EVENT_TIMESTAMP    |EVENT_LABEL|
+-------------------+-----------+
|2020-04-29 23:59:18|legit      |
|2020-04-29 23:58:29|fraud      |
|2020-04-29 23:58:25|legit      |
|2020-04-29 23:58:21|legit      |
|2020-04-29 23:57:55|legit      |
|2020-04-29 23:57:55|legit      |
|2020-04-29 23:57:48|legit      |
|2020-04-29 23:57:47|legit      |
|2020-04-29 23:56:49|fraud      |
|2020-04-29 23:56:45|legit      |
|2020-04-29 23:55:03|legit      |
|2020-04-29 23:52:52|legit      |
|2020-04-29 23:52:46|legit      |
|2020-04-29 23:51:29|legit      |
|2020-04-29 23:50:06|legit      |
|2020-04-29 23:49:59|legit      |
|2020-04-29 23:49:39|legit      |
|2020-04-29 23:46:49|legit      |
|2020-04-29 23:46:18|fraud      |
|2020-04-29 23:44:47|legit      |
+-------------------+-----------+
only showing top 20 rows

##### Create batch sample from test dataframe

We convert the pyspark dataframes created earlier into DynamicDataFrames and use the sparkSqlQuery class to the run sql query (defined in the first block of this notebook), to take a random sample of the the data such that 
we have x number of fraud samples and y number of legit samples. We do not need the entire dataset for batch predictions to keep billing cost down.

In [9]:

test_dyf = DynamicFrame.fromDF(test_df, glueContext, "test_dyf")
train_dyf = DynamicFrame.fromDF(train_df, glueContext, "train_dyf")

sampled_test_dyf = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"myDataSource":test_dyf}, transformation_ctx = "SQLQuery_test_sample")
sampled_test_dyf.count()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

130

In [10]:
sampled_test_dyf.toDF().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+-------------+------+---------+---------+------+--------------------+-----------------+-----+-----+--------+--------------------+--------------------+-------------------+-----------+
|             cc_num|            merchant|     category|   amt|    first|     last|gender|              street|             city|state|  zip|city_pop|                 job|           trans_num|    EVENT_TIMESTAMP|EVENT_LABEL|
+-------------------+--------------------+-------------+------+---------+---------+------+--------------------+-----------------+-----+-----+--------+--------------------+--------------------+-------------------+-----------+
|     36890292963032|     fraud_Rau-Grant|    kids_pets|  7.04| Kimberly|     Webb|     F|  380 Martin Mission|           Girard|   GA|30426|    1100|Conservator, muse...|985bea2e56a683feb...|2020-09-28 19:51:36|      legit|
|   2229378226512508| fraud_Schmeler-Howe|personal_care| 81.68|   Thomas|     Hale|     M|   949 Smi

##### Repartiton

Reparition the dynamicdataframe to single partition dataframe to avoid saving multiple paritions in s3 bucket. We would need to convert to pyspark dataframe to use the 'repartition' method and then convert back to Dynamic DF.

In [11]:


single_part_train_dyf = DynamicFrame.fromDF(train_dyf.toDF().repartition(1), glueContext, "single_partition_train")
single_part_test_dyf = DynamicFrame.fromDF(sampled_test_dyf.toDF().repartition(1), glueContext, "single_partition_test_sample")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Saving Train and sampled batch data to S3 

The next code blocks save the train and sampled test dataset to path in S3.  By default `glueContext.write_dynamic_frame.from_options` saves the file with a random name.
The code in `write_output_to_s3` renames the file to something sensible and then deletes the original file.

In [12]:

train_dest_split = args["train-dest-key"].split('/')
train_filename = train_dest_split.pop(-1)
renamed_key = args['train-dest-key']
transformation_ctx = "S3bucket_write_train_dyf"
prefix = '/'.join(train_dest_split)
s3_path = os.path.join("s3://", args["bucket"], prefix)

print("")
print("Saving training data ......")
write_output_to_s3(train_dyf, s3_path, prefix, renamed_key, transformation_ctx)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Saving training data ......
saving dyanmic frame to S3 bucket with prefix path: input
renaming filename to input/fraudTrain_glue_transformed.csv as glue output filename is random
deleting original output input/run-1656278830255-part-r-00000....

In [13]:
test_dest_split = args["test-dest-key"].split('/')
test_filename = test_dest_split.pop(-1)
renamed_key = args['test-dest-key']
transformation_ctx = "S3bucket_write_test_dyf"
prefix = '/'.join(test_dest_split)

print("Saving test data ......")
write_output_to_s3(sampled_test_dyf, s3_path, prefix, renamed_key, transformation_ctx)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Saving test data ......
saving dyanmic frame to S3 bucket with prefix path: input
renaming filename to input/fraudTest_glue_transformed.csv as glue output filename is random
deleting original output input/run-1656278830255-part-r-00001....

In [14]:
#uncomment this when running job in glue
#job.commit()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…