In [None]:
# Set AWS_DEFAULT_REGION for this kernel so the boto3 clients and the default session
# created below (bucket name, region_code) resolve to us-east-1.
%env AWS_DEFAULT_REGION=us-east-1

In [2]:
import json
from collections.abc import Sequence
from pprint import pprint

import boto3
import pandas as pd

In [3]:
# Sample planning-permit condition text. It is the input for the single on-demand
# embedding call and the word pool from which the synthetic batch records are sampled.
# NOTE(review): "section 62(1)(aa" appears to be missing its closing ")" — harmless for
# embedding purposes, but confirm against the source document.
build_permit_text = """
Commencement – Breach of registered restrictive covenant
This permit will not come into effect until the covenant contained in Instrument of Transfer [insert details] in the Register of Titles is removed or varied to avoid a breach of the covenant by this permit
Include in any permit where the grant of the permit would authorise anything that would result in a breach of a registered restrictive covenant Mandatory condition required by Planning and Environment Act 1987 section 62(1)(aa
Expiry – Development
This permit as it relates to development (buildings and works) will expire if one of the following circumstances applies: a) The development is not started within 2 years of the issued date of this permit. b) The development is not completed within 4 years of the issued date of this permit. In accordance with Section 69 of the Planning and Environment Act 1987, an application may be submitted to the responsible authority for an extension of the periods referred to in this condition
Include in all permits for development for buildings and works with appropriate modification to time for starting and completion. Where development is to be undertaken in stages, include additional conditions stating when each stage of development is to be started and completed. See chapter 6.2.1
"""

In [4]:
# `bedrock` manages batch jobs (create/list); `bedrock_runtime` invokes models directly.
bedrock, bedrock_runtime = (
    boto3.client(service_name) for service_name in ("bedrock", "bedrock-runtime")
)

# Amazon Titan Text Embeddings V2.
model_id = "amazon.titan-embed-text-v2:0"

### On-demand invocation of the embedding model

In [None]:
# Embed the sample text with one synchronous InvokeModel call and sanity-check the result.
sample_input = {
    "inputText": build_permit_text,
    "dimensions": 256,
    "normalize": True,
}
body = json.dumps(sample_input)
response = bedrock_runtime.invoke_model(
    modelId=model_id,
    body=body,
    accept='application/json',
    contentType='application/json',
)

# Index directly: a missing body should fail here with a KeyError, not later with an AttributeError.
response_body = json.loads(response["body"].read())
embeddings = response_body['embedding']
assert len(embeddings) == sample_input["dimensions"], (
    f"expected {sample_input['dimensions']} dimensions, got {len(embeddings)}"
)

print(f"embedding length: {len(embeddings)}")
# Preview only: the full vector (repeated inside the response body) floods the cell output.
print(f"embedding (first 5 of {len(embeddings)}): {embeddings[:5]}")
response_metadata = {
    key: value
    for key, value in response_body.items()
    if key not in ("embedding", "embeddingsByType")
}
print(f"response metadata: {response_metadata}")

### Synthesize data


In [None]:
# Whitespace-tokenised word pool for the synthetic records.
# NOTE: despite its name, `words_count` holds the list of words; its length is the count.
words_count = list(build_permit_text.split(sep=None))
print(f"words count: {len(words_count)}")

In [7]:
# Number of words drawn (without replacement) for each synthetic record.
sample_text_length = 150

# random.sample(population, k) only requires len(population) >= k, so equality is valid.
assert len(words_count) >= sample_text_length, (
    f"need at least {sample_text_length} words to sample from, got {len(words_count)}"
)

In [None]:
import random

# Number of synthetic records written to the jsonl file.
# NOTE(review): why 101 rather than 100 is not stated here — confirm the intent.
record_count = 101
# Seeded, local RNG so Restart & Run All regenerates exactly the same records
# without touching the global `random` state.
random_seed = 42
rng = random.Random(random_seed)

# Each record is `sample_text_length` words taken from distinct positions of the
# permit text, joined in random order.
records = [
    ' '.join(rng.sample(words_count, sample_text_length))
    for _ in range(record_count)
]

print(f"records count: {len(records)}")


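Write the records to a `jsonl` file for the batch inference job: one JSON object per line, each holding a `recordId` and the `modelInput` request body.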

In [9]:
output_file = 'synthetic-data.jsonl'

# Titan Text Embeddings V2 request fields shared by every record, matching the
# on-demand request above. `embeddingTypes` (a list) is the request field;
# `embeddingsByType` is only a key in the model's *response*.
model_params = {"dimensions": 256, "normalize": True, "embeddingTypes": ["float"]}

with open(output_file, 'w', encoding='utf-8') as f:
    for record_index, record in enumerate(records):
        batch_record = {
            "recordId": str(record_index),
            "modelInput": {"inputText": record, **model_params},
        }
        f.write(json.dumps(batch_record) + '\n')



In [10]:
# Account ID of the current credentials; it makes the bucket name below unique per account.
sts_client = boto3.client("sts")
account_id = (response := sts_client.get_caller_identity())["Account"]

In [None]:
region_code = boto3.session.Session().region_name
# With no configured region, region_name is None and the bucket name built from it
# would silently contain the text "None".
assert region_code, "No AWS region configured; set AWS_DEFAULT_REGION before continuing."
region_code

In [12]:
# One bucket per account and region; batch input files live under its input/ prefix.
bucket_name = "embedding-batch-job-{}-{}".format(account_id, region_code)
data_uri_prefix = f"s3://{bucket_name}/input"

### Prepare batch job test data

Upload one copy of the `jsonl` file per batch to its own S3 prefix, then register each uploaded file in DynamoDB with status `Pending`.

In [13]:
# Number of S3 batch prefixes (and matching DynamoDB registry records) to create.
batches: int = 25

In [None]:
import uuid

s3_client = boto3.client("s3")

# Upload one copy of the synthetic jsonl file per batch, each under its own random prefix:
#   <data_uri_prefix>/<batch_id>/data.jsonl
# upload_file raises on failure, so a URI is recorded only after its object exists in S3.
# A `!aws s3 cp` shell call would ignore a non-zero exit status and still record the URI.
data_file_uris = []
for _ in range(batches):
    batch_id = str(uuid.uuid4())[:8]
    input_file_uri = f"{data_uri_prefix}/{batch_id}/data.jsonl"
    # "s3://<bucket>/<key...>" -> ["s3:", "", "<bucket>", "<key...>"]
    _, _, upload_bucket, upload_key = input_file_uri.split("/", 3)
    s3_client.upload_file(output_file, upload_bucket, upload_key)
    data_file_uris.append(input_file_uri)

In [None]:
# Display the S3 URIs of the uploaded batch input files.
data_file_uris

In [None]:
from datetime import datetime, timezone

dynamodb = boto3.client('dynamodb')

# One UTC ISO-8601 timestamp shared by every record registered in this run.
current_time = datetime.now(timezone.utc).isoformat()


def _registry_item(data_s3_uri: str) -> dict:
    """Build the low-level DynamoDB item (attribute-value form) for one uploaded batch file."""
    return {
        'data_s3_uri': {'S': data_s3_uri},
        'created_dt': {'S': current_time},
        'status': {'S': 'Pending'},
        # batch_id is the path segment just before the file name: .../<batch_id>/data.jsonl
        'id': {'S': data_s3_uri.split('/')[-2]},
    }


for registry_item in map(_registry_item, data_file_uris):
    dynamodb.put_item(TableName='embedding-batch-registry', Item=registry_item)

print(f"Stored {len(data_file_uris)} records in DynamoDB")

In [None]:
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table("embedding-batch-registry")

# Query the status-index GSI for every Pending record. One Query call returns at most
# one page of results, so keep following LastEvaluatedKey until it is absent.
query_kwargs = {
    "IndexName": "status-index",
    "KeyConditionExpression": Key('status').eq('Pending'),
}
pending_items = []
while True:
    response = table.query(**query_kwargs)
    pending_items.extend(response.get("Items", []))
    last_evaluated_key = response.get("LastEvaluatedKey")
    if not last_evaluated_key:
        break
    query_kwargs["ExclusiveStartKey"] = last_evaluated_key

print(f"Pending records: {len(pending_items)}")
pending_items

### Reference

#### Create Batch Inference Job

In [30]:
# IAM role Bedrock assumes for the batch job. The suffix looks stack-generated
# (CloudFormation/CDK) — TODO confirm it matches the currently deployed stack.
iam_role_arn = f"arn:aws:iam::{account_id}:role/SolutionStack-BedrockBatchJobRole98C7DFA0-iyjwcI9nLVu3"

bucket_uri = f"s3://{bucket_name}"
# NOTE(review): this notebook does not write this object (it uploads synthetic-data.jsonl
# under input/<batch_id>/); it must already exist in the bucket.
input_file_uri = f"{bucket_uri}/batch/input/synthetic-data-50k.jsonl"
output_data_uri = f"{bucket_uri}/batch-inference/embedding/building/output/"

In [33]:
from datetime import datetime, timezone

inputDataConfig = {
    "s3InputDataConfig": {
        "s3Uri": input_file_uri
    }
}

outputDataConfig = {
    "s3OutputDataConfig": {
        "s3Uri": output_data_uri
    }
}

# A fixed job name makes every re-run of this cell collide with the earlier job.
# Suffix a UTC timestamp (49 chars total, within Bedrock's 63-char limit). The
# "building-text-embedding" prefix keeps the job matched by get_batch_inference_jobs()'s
# default name filter.
job_name = f"building-text-embedding-batch-job-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}"

response = bedrock.create_model_invocation_job(
    roleArn=iam_role_arn,
    modelId=model_id,
    jobName=job_name,
    inputDataConfig=inputDataConfig,
    outputDataConfig=outputDataConfig,
)

print(response)

In [10]:


# Default statuses to keep: jobs that have not yet completed, failed, or stopped.
ACTIVE_INVOCATION_JOB_STATUSES = ("Submitted", "Validating", "Scheduled", "InProgress", "Stopping")


def get_batch_inference_jobs(
    status_list: Sequence[str] = ACTIVE_INVOCATION_JOB_STATUSES,
    name_contains: str = "building-text-embedding",
    client=None,
) -> list[dict]:
    """
    Get Bedrock batch inference (model invocation) jobs filtered by status and name.

    Args:
        status_list: Job statuses to keep. Defaults to the in-flight statuses
            (Submitted, Validating, Scheduled, InProgress, Stopping). An empty
            sequence returns [] without calling the API.
        name_contains: Substring passed to the API's ``nameContains`` filter.
        client: Bedrock control-plane client to query. Defaults to the
            notebook's module-level ``bedrock`` client.

    Returns:
        Job summary dicts from ``invocationJobSummaries``, across all result
        pages, whose ``status`` is in ``status_list``, in API order.
    """
    if not status_list:
        return []
    if client is None:
        client = bedrock
    wanted_statuses = set(status_list)

    request = {"nameContains": name_contains}
    matching_jobs = []
    while True:
        page = client.list_model_invocation_jobs(**request)
        # A page may omit the summaries key when it holds no jobs; treat that as empty.
        matching_jobs.extend(
            job
            for job in page.get("invocationJobSummaries") or []
            if job.get("status") in wanted_statuses
        )
        # Stop on a missing *or* empty token; re-requesting without a token would loop forever.
        next_token = page.get("nextToken")
        if not next_token:
            return matching_jobs
        request["nextToken"] = next_token


In [None]:
# Show jobs named "building-text-embedding…" that are in the function's default statuses.
pprint(results := get_batch_inference_jobs())


In [16]:
from datetime import datetime
from datetime import timezone

In [None]:
# Scratch check: current UTC time in the same ISO-8601 form stored as created_dt in DynamoDB.
datetime.now(timezone.utc).isoformat()
