In [27]:
import sagemaker
import boto3

# S3 bucket, SageMaker session and execution role shared by the cells below.
bucket = "team-4-project-data"
sess = sagemaker.Session()
role = sagemaker.get_execution_role()

# Region reported by a default boto3 session; both clients are bound to it.
region = boto3.Session().region_name

s3 = boto3.Session().client("s3", region_name=region)
sm = boto3.Session().client("sagemaker", region_name=region)

In [28]:
import tensorflow as tf
import collections
import json
import os
import pandas as pd
import csv
from transformers import DistilBertTokenizer

# Module-level tokenizer loaded from the "distilbert-base-uncased" checkpoint;
# convert_input() below reads this global.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

# DataFrame columns that hold the record key and the free-text comment.
REVIEW_ID_COLUMN = "Record ID"
REVIEW_BODY_COLUMN = "Comment"
# DATE_COLUMN = 'date'

# Column holding the rating, and the full set of rating values it may take.
LABEL_COLUMN = "Overall Rating"
LABEL_VALUES = list(range(1, 11))

# Rating value -> zero-based class index (1 -> 0, 2 -> 1, ..., 10 -> 9).
label_map = {rating: index for index, rating in enumerate(LABEL_VALUES)}


class InputFeatures:
    """Holds the encoded vectors of one input together with its label and metadata."""

    def __init__(self, input_ids, input_mask, segment_ids, label_id, review_id, date, label):
        # The three per-token vectors.
        self.input_ids, self.input_mask, self.segment_ids = input_ids, input_mask, segment_ids
        # Class index and record key.
        self.label_id, self.review_id = label_id, review_id
        # Date value and the original (unmapped) label.
        self.date, self.label = date, label
        

class Input:
    """One raw example (text plus identifiers) for sequence classification."""

    def __init__(self, text, review_id, date, label=None):
        """Store the raw fields of a single example.

        Args:
          text: The untokenized text of the example.
          review_id: Identifier kept alongside the text.
          date: Value kept as the example's date.
          label: (Optional) The label of the example; None when omitted.
        """
        self.text, self.review_id = text, review_id
        self.date, self.label = date, label


def convert_input(the_input, max_seq_length):
    """Encode one input into fixed-length vectors and print every produced field.

    Args:
        the_input: object exposing ``text``, ``label``, ``review_id`` and ``date``.
        max_seq_length: length the encoding is padded/truncated to.

    Returns:
        An ``InputFeatures`` instance built from the encoding and the input's metadata.

    Raises:
        KeyError: if ``the_input.label`` is not a key of the module-level ``label_map``.
    """
    # Display-only view of the wordpieces, framed by the special tokens. The
    # module-level tokenizer handles lowercasing and wordpiece splitting.
    wordpieces = ["[CLS]"] + tokenizer.tokenize(the_input.text) + ["[SEP]"]
    print("**{} tokens**\n{}\n".format(len(wordpieces), wordpieces))

    # The actual encoding: padded up to, or truncated down to, max_seq_length.
    encoded = tokenizer.encode_plus(
        the_input.text,
        max_length=max_seq_length,
        padding='max_length',
        truncation=True,
    )

    features = InputFeatures(
        # Vocabulary ids of the tokens (padding included).
        input_ids=encoded["input_ids"],
        # Attention mask returned by the tokenizer for the same positions.
        input_mask=encoded["attention_mask"],
        # Single-sequence task: every position belongs to segment 0.
        segment_ids=[0] * max_seq_length,
        # Class index looked up from the label via `label_map`.
        label_id=label_map[the_input.label],
        review_id=the_input.review_id,
        date=the_input.date,
        label=the_input.label,
    )

    # Echo each field under a "**name**" heading.
    for field_name in ("input_ids", "input_mask", "segment_ids", "label_id", "review_id", "date", "label"):
        print("**{}**\n{}\n".format(field_name, getattr(features, field_name)))

    return features


def _int64_feature(values):
    """Wrap a list of ints in a ``tf.train.Feature`` holding an ``Int64List``."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=values))


# We'll need to transform our data into a format that BERT understands.
# - `text` is the text we want to classify, which in this case, is the `Comment` column in our Dataframe.
# - `label` is the Overall Rating label (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) for our training input data
def transform_inputs_to_tfrecord(inputs, output_file, max_seq_length):
    """Convert every input, write it to a TFRecord file and return plain records.

    Args:
        inputs: sized iterable of objects accepted by ``convert_input``.
        output_file: path of the TFRecord file to write.
        max_seq_length: sequence length forwarded to ``convert_input``.

    Returns:
        A list with one dict per input, keyed by ``input_ids``, ``input_mask``,
        ``segment_ids``, ``label_id``, ``review_id``, ``date`` and ``label``.
    """
    records = []
    print(output_file)

    # The context manager closes the writer even when converting or writing
    # one of the inputs raises, so the file handle is never leaked.
    with tf.io.TFRecordWriter(output_file) as tf_record_writer:
        for input_idx, the_input in enumerate(inputs):
            if input_idx % 10000 == 0:
                print("Writing input {} of {}\n".format(input_idx, len(inputs)))

            features = convert_input(the_input, max_seq_length)

            # Create TFRecord With input_ids, input_mask, segment_ids, and label_ids
            all_features = collections.OrderedDict()
            all_features["input_ids"] = _int64_feature(features.input_ids)
            all_features["input_mask"] = _int64_feature(features.input_mask)
            all_features["segment_ids"] = _int64_feature(features.segment_ids)
            # The scalar label is wrapped in a list because Int64List takes a sequence.
            all_features["label_ids"] = _int64_feature([features.label_id])

            tf_record = tf.train.Example(features=tf.train.Features(feature=all_features))
            tf_record_writer.write(tf_record.SerializeToString())

            # Create Record For Feature Store With All Features
            records.append(
                {
                    "input_ids": features.input_ids,
                    "input_mask": features.input_mask,
                    "segment_ids": features.segment_ids,
                    "label_id": features.label_id,
                    "review_id": the_input.review_id,
                    "date": the_input.date,
                    "label": features.label,
                }
            )

    return records

In [29]:
from datetime import datetime, timezone
from time import strftime

# The trailing "Z" designates UTC, so the clock must be read in UTC as well.
# Reading local time would label a non-UTC instant as UTC on any kernel whose
# local zone is not UTC.
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
print(timestamp)

2024-04-20T01:35:17Z


In [30]:
import pandas as pd

# Three hand-written sample rows. Each row is ordered to match the column list
# passed to pd.DataFrame below: [Overall Rating, Record ID, Comment].
data = [
    [
        5,
        0,
        """I needed an "antivirus" application and know the quality of Norton products.  This was a no brainer for me and I am glad it was so simple to get.""",
    ],
    [
        3,
        1,
        """The problem with ElephantDrive is that it requires the use of Java. Since Java is notorious for security problems I haveit removed from all of my computers. What files I do have stored are photos.""",
    ],
    [
        1,
        2,
        """Terrible, none of my codes worked, and I can't uninstall it.  I think this product IS malware and viruses""",
    ],
]

# Column names equal LABEL_COLUMN, REVIEW_ID_COLUMN and REVIEW_BODY_COLUMN defined earlier.
df = pd.DataFrame(data, columns=["Overall Rating", "Record ID", "Comment"])

# Wrap every DataFrame row in an `Input`; all rows share `timestamp` as their date.
inputs = df.apply(
    lambda row: Input(
        text=row[REVIEW_BODY_COLUMN],
        review_id=row[REVIEW_ID_COLUMN],
        date=timestamp,
        label=row[LABEL_COLUMN],
    ),
    axis=1,
)

In [31]:
# Make sure the date is in the correct ISO-8601 format for Feature Store
print(inputs[0].date)

2024-04-20T01:35:17Z


In [32]:
# Path handed to transform_inputs_to_tfrecord() as the TFRecord file to write.
output_file = "data.tfrecord"

In [33]:
# Client for the "sagemaker-featurestore-runtime" service in the notebook's region.
featurestore_runtime = boto3.Session().client("sagemaker-featurestore-runtime", region_name=region)

In [34]:
from time import gmtime, strftime, sleep

# Name suffix is the current UTC day-hour-minute-second.
feature_group_name = "reviews-feature-group-{}".format(strftime("%d-%H-%M-%S", gmtime()))
print(feature_group_name)

reviews-feature-group-20-01-35-17


In [35]:
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum,
)

# Feature group schema, declared as (feature name, feature type) pairs.
# The three vector features are declared as strings; `review_body` is not part of the schema.
feature_definitions = [
    FeatureDefinition(feature_name=name, feature_type=kind)
    for name, kind in (
        ("input_ids", FeatureTypeEnum.STRING),
        ("input_mask", FeatureTypeEnum.STRING),
        ("segment_ids", FeatureTypeEnum.STRING),
        ("label_id", FeatureTypeEnum.INTEGRAL),
        ("review_id", FeatureTypeEnum.STRING),
        ("date", FeatureTypeEnum.STRING),
        ("label", FeatureTypeEnum.INTEGRAL),
        ("split_type", FeatureTypeEnum.STRING),
    )
]

In [36]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Bind the generated name, the schema and the SageMaker session into a FeatureGroup object.
feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=sess,
)
print(feature_group)

FeatureGroup(name='reviews-feature-group-20-01-35-17', sagemaker_session=<sagemaker.session.Session object at 0x7f05e1246350>, feature_definitions=[FeatureDefinition(feature_name='input_ids', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='input_mask', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='segment_ids', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='label_id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>), FeatureDefinition(feature_name='review_id', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='date', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='label', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>), FeatureDefinition(feature_name='split_type', feature_type=<FeatureTypeEnum.STRING: 'String'>)])


In [37]:
# Feature passed to feature_group.create() as the record identifier.
record_identifier_feature_name = "review_id"
# Feature passed to feature_group.create() as the event time.
event_time_feature_name = "date"

In [38]:
# S3 key prefix for this run, made unique by the notebook-level timestamp.
prefix = f"reviews-feature-store-{timestamp}"
print(prefix)

reviews-feature-store-2024-04-20T01:35:17Z


In [39]:
# Create the feature group with the online store disabled, storing its data
# under s3://<bucket>/<prefix>.
feature_group.create(
    role_arn=role,
    s3_uri="s3://{}/{}".format(bucket, prefix),
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    enable_online_store=False,
)

ClientError: An error occurred (ValidationException) when calling the CreateFeatureGroup operation: Invalid S3Uri provided. Exception from S3: {The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: NK2QEMKCJD2V4Z83; S3 Extended Request ID: GAz7b+i0gtxH5d76YekOE0yHLKykrFdTVabjM3YbF4qGYw+q6ebV0C3MQ0Kwo+f0zybhuMnl28o=; Proxy: null)}. Please ensure that the OfflineStore S3 bucket exists and that the given RoleArn has the 'AmazonSageMakerFeatureStoreAccess' managed policy attached, with access to the bucket and objects in the bucket, with the principal 'sagemaker.amazonaws.com' as a trusted entity. If a KMS Key is provided for the offline store, please ensure that the RoleArn has 'kms:GenerateDataKey' permission and has access to the KMS Key.

In [None]:
# Display the feature group's description; its "FeatureGroupStatus" entry is
# what the wait helper below polls.
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:211125778552:feature-group/reviews-feature-group-11-21-16-31',
 'FeatureGroupName': 'reviews-feature-group-11-21-16-31',
 'RecordIdentifierFeatureName': 'review_id',
 'EventTimeFeatureName': 'date',
 'FeatureDefinitions': [{'FeatureName': 'input_ids', 'FeatureType': 'String'},
  {'FeatureName': 'input_mask', 'FeatureType': 'String'},
  {'FeatureName': 'segment_ids', 'FeatureType': 'String'},
  {'FeatureName': 'label_id', 'FeatureType': 'Integral'},
  {'FeatureName': 'review_id', 'FeatureType': 'String'},
  {'FeatureName': 'date', 'FeatureType': 'String'},
  {'FeatureName': 'label', 'FeatureType': 'Integral'},
  {'FeatureName': 'split_type', 'FeatureType': 'String'}],
 'CreationTime': datetime.datetime(2024, 4, 11, 21, 16, 31, 950000, tzinfo=tzlocal()),
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://team-4-project-data/reviews-feature-store-2024-04-11T21:16:31Z',
   'ResolvedOutputS3Uri': 's3://team-4-project-data/reviews-feat

In [None]:
import time


def wait_for_feature_group_creation_complete(feature_group, poll_interval_seconds=5, timeout_seconds=None):
    """Block until ``feature_group`` leaves the "Creating" state.

    Args:
        feature_group: object exposing ``describe()`` (returning a dict with a
            "FeatureGroupStatus" entry) and a ``name`` attribute.
        poll_interval_seconds: pause between two ``describe()`` calls.
        timeout_seconds: optional upper bound on the total wait. ``None``
            (the default) keeps waiting for as long as the status is "Creating".

    Raises:
        TimeoutError: ``timeout_seconds`` elapsed while the status was still "Creating".
        RuntimeError: the final status is anything other than "Created".
    """
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds

    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Timed out after {timeout_seconds}s waiting for feature group {feature_group.name}"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_seconds)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")

    if status != "Created":
        # Surface the terminal status and, when the description carries one,
        # the failure reason, so the error is actionable without another call.
        reason = description.get("FailureReason")
        detail = f" (status: {status}" + (f", reason: {reason}" if reason else "") + ")"
        raise RuntimeError(f"Failed to create feature group {feature_group.name}" + detail)
    print(f"FeatureGroup {feature_group.name} successfully created.")

In [None]:
# Block until the feature group is no longer "Creating"; raises if it did not end up "Created".
wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup reviews-feature-group-11-21-16-31 successfully created.


In [None]:
# Length every encoded sequence is padded or truncated to.
max_seq_length = 183
records = transform_inputs_to_tfrecord(
    inputs=inputs,
    output_file=output_file,
    max_seq_length=max_seq_length,
)

data.tfrecord
Writing input 0 of 3

**37 tokens**
['[CLS]', 'i', 'needed', 'an', '"', 'anti', '##virus', '"', 'application', 'and', 'know', 'the', 'quality', 'of', 'norton', 'products', '.', 'this', 'was', 'a', 'no', 'brain', '##er', 'for', 'me', 'and', 'i', 'am', 'glad', 'it', 'was', 'so', 'simple', 'to', 'get', '.', '[SEP]']

**input_ids**
[101, 1045, 2734, 2019, 1000, 3424, 23350, 1000, 4646, 1998, 2113, 1996, 3737, 1997, 10770, 3688, 1012, 2023, 2001, 1037, 2053, 4167, 2121, 2005, 2033, 1998, 1045, 2572, 5580, 2009, 2001, 2061, 3722, 2000, 2131, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

**input_mask**
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

**segment_ids**
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [None]:
import pandas as pd

# One row per record, every row tagged with the "train" split.
df_records = pd.DataFrame(records).assign(split_type="train")
df_records

Unnamed: 0,input_ids,input_mask,segment_ids,label_id,review_id,date,label,split_type
0,"[101, 1045, 2734, 2019, 1000, 3424, 23350, 100...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",4,0,2024-04-11T21:16:31Z,5,train
1,"[101, 1996, 3291, 2007, 10777, 23663, 2003, 20...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",2,1,2024-04-11T21:16:31Z,3,train
2,"[101, 6659, 1010, 3904, 1997, 2026, 9537, 2499...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,2,2024-04-11T21:16:31Z,1,train


In [None]:
def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to pandas' "string" dtype.

    The frame is modified in place and nothing is returned. Each value is first
    turned into its ``str`` form, so list-valued cells become their text
    representation.
    """
    object_columns = [name for name in data_frame.columns if data_frame.dtypes[name] == "object"]
    for name in object_columns:
        as_text = data_frame[name].astype("str")
        data_frame[name] = as_text.astype("string")

In [None]:
# Converts the object-dtype columns of df_records to "string" dtype, in place.
cast_object_to_string(df_records)

In [None]:
df_records

Unnamed: 0,input_ids,input_mask,segment_ids,label_id,review_id,date,label,split_type
0,"[101, 1045, 2734, 2019, 1000, 3424, 23350, 100...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",4,0,2024-04-11T21:16:31Z,5,train
1,"[101, 1996, 3291, 2007, 10777, 23663, 2003, 20...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",2,1,2024-04-11T21:16:31Z,3,train
2,"[101, 6659, 1010, 3904, 1997, 2026, 9537, 2499...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,2,2024-04-11T21:16:31Z,1,train


In [None]:
# Ingest the prepared rows into the feature group with 3 workers; wait=True is
# passed so the call waits for ingestion to finish.
feature_group.ingest(data_frame=df_records, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='reviews-feature-group-11-21-16-31', sagemaker_session=<sagemaker.session.Session object at 0x7fec077e9550>, data_frame=                                           input_ids  \
0  [101, 1045, 2734, 2019, 1000, 3424, 23350, 100...   
1  [101, 1996, 3291, 2007, 10777, 23663, 2003, 20...   
2  [101, 6659, 1010, 3904, 1997, 2026, 9537, 2499...   

                                          input_mask  \
0  [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...   
1  [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...   
2  [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...   

                                         segment_ids  label_id  review_id  \
0  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...         4          0   
1  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...         2          1   
2  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...         0          2   

                   date  label split_type  
0  2024-04-11T21:16:31Z      5     

In [None]:
# Poll S3 until more than one object exists under the feature-store prefix.
# The wait is bounded so the notebook fails loudly, instead of hanging forever,
# when nothing is ever written (for example because ingestion failed).
offline_store_contents = None
max_polls = 30  # 30 polls x 60 s between polls = at most ~30 minutes; adjust as needed

for _ in range(max_polls):
    objects_in_bucket = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    # "Contents" is absent from the response when no key matches the prefix.
    listed_objects = objects_in_bucket.get("Contents", [])
    if len(listed_objects) > 1:
        offline_store_contents = listed_objects
        break
    print("Waiting for data in offline store...\n")
    sleep(60)
else:
    # Reached only when the loop finished without a `break`.
    raise TimeoutError(
        "No offline store data found under s3://{}/{} after {} polls".format(bucket, prefix, max_polls)
    )

print("Data available.")

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Data available.


In [None]:
# Athena query object for this feature group; later cells use it to run SQL and fetch the result.
feature_store_query = feature_group.athena_query()

In [None]:
# Table name reported by the query object; interpolated into the SQL statement below.
feature_store_table = feature_store_query.table_name

In [None]:
# Print the Hive DDL statement generated for this feature group.
print(feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.reviews-feature-group-11-21-16-31 (
  input_ids STRING
  input_mask STRING
  segment_ids STRING
  label_id INT
  review_id STRING
  date STRING
  label INT
  split_type STRING
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://team-4-project-data/reviews-feature-store-2024-04-11T21:16:31Z/211125778552/sagemaker/us-east-1/offline-store/reviews-feature-group-11-21-16-31'


In [None]:
# Fetch up to five rows of the "train" split from the feature group's table.
query_string = f"""
SELECT input_ids, input_mask, segment_ids, label_id, split_type  FROM "{feature_store_table}" WHERE split_type='train' LIMIT 5
"""

print("Running " + query_string)

Running 
SELECT input_ids, input_mask, segment_ids, label_id, split_type  FROM "reviews_feature_group_11_21_16_31_1712870191" WHERE split_type='train' LIMIT 5



In [None]:
# Run the query with its output location under the feature-store prefix, then wait on it.
feature_store_query.run(
    query_string=query_string,
    output_location=f"s3://{bucket}/{prefix}/query_results/",
)
feature_store_query.wait()

In [None]:
# Load the query result into a DataFrame and display it. The former empty
# DataFrame placeholder was dropped because it was overwritten immediately.
dataset = feature_store_query.as_dataframe()

dataset

Unnamed: 0,input_ids,input_mask,segment_ids,label_id,split_type
0,"[101, 6659, 1010, 3904, 1997, 2026, 9537, 2499...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,train
1,"[101, 1996, 3291, 2007, 10777, 23663, 2003, 20...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",2,train
2,"[101, 1045, 2734, 2019, 1000, 3424, 23350, 100...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",4,train
