In [1]:
#sandbox notebook for my study of the AWS training module,
#Build a Text Classification Model with AWS Glue and Amazon SageMaker

In [2]:
#data source is Amazon's product reviews,
#which is heavily biased towards books

In [3]:
#do this using an AWS Glue development endpoint
#and launch a notebook instance

In [4]:
#balancing strategies:
# copy records
# remove records
# more sophisticated approaches exist, but they don't work as well with text

In [5]:
#here, downsample every category to the size of the category with the lowest count,
#removing records at random:
#first find the category with the lowest count,
#then calculate a sampling factor N_i for each category

In [None]:
# Load the raw reviews as a Glue DynamicFrame, reading the table that Glue
# created in the Data Catalog.
# NOTE(review): `glueContext`, `database` and `table` are defined outside this
# section of the notebook; confirm they are set before running this cell.
datasource = glueContext.create_dynamic_frame.from_catalog(
    database=database,
    table_name=table,
)

In [None]:
# Convert the DynamicFrame read above into a Spark DataFrame so the regular
# DataFrame API (groupBy, sample, union, ...) can be used in the next cells.
df = datasource.toDF()

In [None]:
# Number of reviews in each product category, collected to the driver as a
# list of Rows carrying 'product_category' and 'count' fields.
per_category_count = (
    df
    .groupBy('product_category')
    .count()
    .collect()
)

In [None]:
# Find the size of the smallest category; every category will be
# downsampled to (roughly) this many reviews.
counts = [row['count'] for row in per_category_count]
if not counts:
    # min() on an empty list gives an opaque error; fail with a clear reason.
    raise ValueError("No product categories found in the source table; nothing to balance.")
# float so the later min_count / count division is a true division.
min_count = float(min(counts))

In [None]:
# Sampling fraction per category, as (category, min_count / count) tuples.
# The smallest category gets 1.0 (kept whole); larger categories get
# proportionally smaller fractions.
# Built as a list rather than a lazy `map` object: in Python 3 `map` returns a
# one-shot iterator. Re-running the sampling cell, or just displaying
# `factors`, would exhaust it, and the sampling loop would then silently
# collect nothing.
factors = [
    (row['product_category'], min_count / float(row['count']))
    for row in per_category_count
]

In [None]:
# Draw a random sample of each category using its sampling fraction.
# Each category is read separately with a push-down predicate on
# product_category. This lets Glue read only that category's partition
# instead of scanning the whole table, which keeps the loop fast.
# NOTE(review): assumes the source table is partitioned by product_category,
# and that category names contain no single quotes (the predicate is built
# with string formatting); confirm both.
samples = []
for category, n in factors:
    category_frame = glueContext.create_dynamic_frame.from_catalog(
        database = database,
        table_name = table,
        push_down_predicate = "product_category == '{}'".format(category))
    # Sample without replacement. Spark's sample() returns approximately
    # (not exactly) fraction * rows; the fixed seed makes it reproducible.
    sample = category_frame.toDF().sample(
        withReplacement=False,
        fraction=n,
        seed=42)
    # Collect into the `samples` list (not onto the DataFrame itself).
    samples.append(sample)

In [None]:
# Combine the per-category samples into one balanced Spark DataFrame and
# write it to S3 as Parquet, partitioned by product_category.
if not samples:
    # samples[0] would otherwise raise a bare "list index out of range".
    raise ValueError("No samples were collected; run the sampling cell first.")
balanced_df = samples[0]   #initialize sample collection
for sample in samples[1:]:
    # union() matches columns by position; every sample comes from the same
    # catalog table, so the column order is expected to agree.
    balanced_df = balanced_df.union(sample)

# Convert back into a Glue DynamicFrame so Glue's writers can be used.
balanced = DynamicFrame.fromDF(balanced_df, glueContext, "balanced")

# Write as Parquet to `target` (an S3 path defined outside this section),
# partitioned by product_category.
sampled_data_sink = glueContext.write_dynamic_frame.from_options(
    frame = balanced,
    connection_type = 's3',
    connection_options = {"path":target, "partitionKeys":["product_category"]},
    format = "parquet")

# SageMaker BlazingText
## Two modes:
### Unsupervised: a highly optimized implementation of Word2vec, used to convert words to vectors (word embeddings)
### Supervised: extends the fastText text classifier; used for multi-class/multi-label text classification

In [None]:
# BlazingText input requirements:
# single preprocessed text file
# space-separated tokens (a word or a punctuation symbol)
# single sentence per line
# labels alongside the sentence
## a label is a word prefixed by __label__

#target input:
#__label__10 this product is awesome....
#__label__1 wishing they all make products like this!

In [None]:
# Re-read the source table as a DynamicFrame, then keep only the two fields
# needed for BlazingText input: the product category (presumably turned into
# the __label__ later; confirm) and the review text.
datasource = glueContext.create_dynamic_frame.from_catalog(
    database=database,
    table_name=table,
)
select = SelectFields.apply(
    frame=datasource,
    paths=["product_category", "review_body"],
)
