# Infrastructure Testing Notebook

This notebook is designed to test the deployed AWS infrastructure, specifically focusing on S3 and Bedrock integration.

In [16]:
# Import necessary libraries
import boto3
import os
import json
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv(override=True)

print("‚úÖ Libraries imported and environment variables loaded.")

‚úÖ Libraries imported and environment variables loaded.


In [17]:
# Configuration
aws_region = os.getenv("AWS_REGION", "us-west-2")
aws_profile = os.getenv("AWS_PROFILE", "default")
bucket_name_source = os.getenv("S3_BUCKET_NAME_SOURCE")
bucket_name_destination = os.getenv("S3_BUCKET_NAME_DESTINATION")
bedrock_model_id = os.getenv("BEDROCK_MODEL_ID", "meta.llama3-1-8b-instruct-v1:0")

print(f"AWS Region: {aws_region}")
print(f"AWS Profile: {aws_profile}")
print(f"S3 Bucket Source: {bucket_name_source}")
print(f"S3 Bucket Destination: {bucket_name_destination}")
print(f"Bedrock Model: {bedrock_model_id}")

# Setup AWS Session
session = boto3.Session(profile_name=aws_profile, region_name=aws_region)
s3_client = session.client("s3")
bedrock_client = session.client("bedrock-runtime")

AWS Region: us-east-1
AWS Profile: warike-development
S3 Bucket Source: 5ry5v6lz-source-vector-s3-lambda
S3 Bucket Destination: 5ry5v6lz-destination-vector-s3-lambda
Bedrock Model: meta.llama3-1-8b-instruct-v1:0


## Test 1: S3 Connectivity
Check if we can list objects in the configured bucket.

In [None]:
try:
    if bucket_name_source:
        response = s3_client.list_objects_v2(Bucket=bucket_name_source, MaxKeys=5)
        if 'Contents' in response:
            print(f"‚úÖ Successfully listed objects in {bucket_name}:")
            for obj in response['Contents']:
                print(f" - {obj['Key']} ({obj['Size']} bytes)")
        else:
            print(f"‚úÖ Bucket {bucket_name} exists but is empty.")
    else:
        print("‚ö†Ô∏è S3_BUCKET_NAME_SOURCE not set in .env, skipping S3 test.")

    if bucket_name_destination:
        response = s3_client.list_objects_v2(Bucket=bucket_name_destination, MaxKeys=5)
        if 'Contents' in response:
            print(f"‚úÖ Successfully listed objects in {bucket_name}:")
            for obj in response['Contents']:
                print(f" - {obj['Key']} ({obj['Size']} bytes)")
        else:
            print(f"‚úÖ Bucket {bucket_name} exists but is empty.")
    else:
        print("‚ö†Ô∏è S3_BUCKET_NAME_DESTINATION not set in .env, skipping S3 test.")
except Exception as e:
    print(f"‚ùå Error accessing S3: {e}")

## Test 2: Bedrock Invocation
Test if we can invoke the Bedrock model (Llama 3).

In [None]:
try:
    prompt_text = "Hello, tell about Sneek, Netherlands."
    # Llama 3 Instruct format
    formatted_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{prompt_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
    
    body = json.dumps({
        "prompt": formatted_prompt,
        # "max_gen_len": 100,
        "temperature": 0.5,
        "top_p": 0.9
    })

    print(f"ü§ñ Invoking Bedrock model: {bedrock_model_id}...")
    response = bedrock_client.invoke_model(
        body=body,
        modelId=bedrock_model_id,
        accept="application/json",
        contentType="application/json"
    )

    response_body = json.loads(response.get("body").read())
    print("‚úÖ Bedrock Response:")
    # Llama 3 returns 'generation'
    print(response_body.get('generation'))

except Exception as e:
    print(f"‚ùå Error invoking Bedrock: {e}")