# InfluxDB Integration Testing with AWS Bedrock

This notebook demonstrates:
- Connecting to AWS Timestream for InfluxDB
- Writing test data to InfluxDB buckets
- Querying time-series data
- Analyzing data with Claude 3 on Amazon Bedrock
- Visualizing metrics with Plotly

## Prerequisites
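- An IAM role that can invoke the configured Bedrock model (`bedrock:InvokeModel`), plus `secretsmanager:GetSecretValue` if you load credentials from AWS Secrets Manager
- InfluxDB credentials stored in environment variables or AWS Secrets Manager
- The required Python packages (installed in Section 1)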

## 1. Setup and Imports

In [None]:
# Install required packages (if not already installed)
!pip install -q influxdb-client==1.49.0 boto3 pandas plotly ipywidgets aiohttp

In [None]:
import os
import json
import time
import boto3
import pandas as pd
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import plotly.graph_objects as go
import plotly.express as px
from IPython.display import display, Markdown, HTML
import ipywidgets as widgets

## 2. Configuration

Set up InfluxDB and AWS credentials using any one of the following:
- Set environment variables (read with `os.getenv` in the next cell)
- Load them from AWS Secrets Manager (optional cell below)
- Enter them manually in the cells below

In [None]:
# InfluxDB Configuration
# Each InfluxDB setting is read from the environment variable of the same name,
# falling back to the default shown when the variable is unset.
INFLUXDB_URL = os.getenv('INFLUXDB_URL', 'https://Lhk52q7uoe-lktzzbuyksah47.timestream-influxdb.us-east-1.on.aws')
# Strip surrounding whitespace: a token pasted with a trailing newline/space would
# otherwise be sent verbatim and corrupt the Authorization header.
INFLUXDB_TOKEN = os.getenv('INFLUXDB_TOKEN', '').strip()  # Set your token here or in environment
INFLUXDB_ORG = os.getenv('INFLUXDB_ORG', 'superapp-org')
INFLUXDB_BUCKET = os.getenv('INFLUXDB_BUCKET', 'test-bucket')

# AWS Configuration
AWS_REGION = 'us-east-1'
# Overridable so another Claude model enabled in your account can be used
# without editing the notebook; the default is unchanged.
BEDROCK_MODEL_ID = os.getenv('BEDROCK_MODEL_ID', 'anthropic.claude-3-sonnet-20240229-v1:0')

print(f"✓ InfluxDB URL: {INFLUXDB_URL}")
print(f"✓ InfluxDB Org: {INFLUXDB_ORG}")
print(f"✓ InfluxDB Bucket: {INFLUXDB_BUCKET}")
print(f"✓ AWS Region: {AWS_REGION}")
print(f"✓ Bedrock Model: {BEDROCK_MODEL_ID}")
print(f"✓ Token configured: {'Yes' if INFLUXDB_TOKEN else 'No - Please set INFLUXDB_TOKEN'}")

### Optional: Load credentials from AWS Secrets Manager

In [None]:
# Uncomment to load from Secrets Manager
# def load_influxdb_credentials_from_secrets_manager():
#     secrets_client = boto3.client('secretsmanager', region_name=AWS_REGION)
#     try:
#         response = secrets_client.get_secret_value(SecretId='superapp-influxdb-credentials')
#         secrets = json.loads(response['SecretString'])
#         return secrets
#     except Exception as e:
#         print(f"Error loading secrets: {e}")
#         return None

# Load and update configuration
# secrets = load_influxdb_credentials_from_secrets_manager()
# if secrets:
#     INFLUXDB_URL = secrets.get('INFLUXDB_URL', INFLUXDB_URL)
#     INFLUXDB_TOKEN = secrets.get('INFLUXDB_TOKEN', INFLUXDB_TOKEN)
#     INFLUXDB_ORG = secrets.get('INFLUXDB_ORG', INFLUXDB_ORG)
#     INFLUXDB_BUCKET = secrets.get('INFLUXDB_BUCKET', INFLUXDB_BUCKET)
#     print("✓ Credentials loaded from Secrets Manager")

## 3. Initialize Clients

In [None]:
# Initialize InfluxDB client
influx_client = InfluxDBClient(
    url=INFLUXDB_URL,
    token=INFLUXDB_TOKEN,
    org=INFLUXDB_ORG,
    timeout=30_000  # 30 second timeout (value is in milliseconds)
)

# Create API instances
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
query_api = influx_client.query_api()

# Connectivity check. `InfluxDBClient.health()` is deprecated in favour of
# `ping()`, which returns True/False instead of a HealthCheck object.
# NOTE(review): /ping is typically unauthenticated, so a successful ping does
# not prove INFLUXDB_TOKEN is valid; the first write/query will.
if influx_client.ping():
    print("✓ InfluxDB Health Status: pass")
    print(f"✓ InfluxDB Version: {influx_client.version()}")
else:
    print("✗ InfluxDB Health Status: unreachable - check INFLUXDB_URL and network access")

In [None]:
# Initialize AWS Bedrock client: the runtime endpoint used for model
# invocation, created in the same region as the rest of the configuration.
bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_REGION)

print("✓ AWS Bedrock client initialized")

## 4. Test Data Writing

Write sample energy metrics and sensor data to InfluxDB

In [None]:
def write_sample_energy_metrics(count=10, interval_minutes=6):
    """
    Write sample energy metrics to InfluxDB.

    Generates ``count`` random ``energy_metrics`` points spaced
    ``interval_minutes`` apart, with the newest point at the current time.
    Anchoring the series at *now* keeps every point in the past. Flux
    ``range(start: -Nh)`` stops at ``now()`` by default, so points written
    in the future would never be returned by the queries below.

    Args:
        count: Number of points to write.
        interval_minutes: Spacing between consecutive points, in minutes.

    Returns:
        True if the write succeeded, False otherwise.
    """
    import random
    from datetime import timezone

    # Timezone-aware UTC "now" (datetime.utcnow() is deprecated since Python 3.12).
    now = datetime.now(timezone.utc)

    points = []
    for i in range(count):
        # Oldest point first; the last point (i == count - 1) lands on `now`.
        timestamp = now - timedelta(minutes=(count - 1 - i) * interval_minutes)

        point = (
            Point("energy_metrics")
            .tag("location", "datacenter-1")
            .tag("source", "jupyter-test")
            .field("power_kw", round(random.uniform(50, 150), 2))
            .field("voltage", round(random.uniform(220, 240), 2))
            .field("current", round(random.uniform(10, 30), 2))
            .field("temperature", round(random.uniform(20, 35), 2))
            .time(timestamp, WritePrecision.NS)
        )
        points.append(point)

    try:
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
        print(f"✓ Successfully wrote {count} energy metric points")
        return True
    except Exception as e:
        print(f"✗ Error writing data: {e}")
        return False

# Write sample data
write_sample_energy_metrics(20)

In [None]:
def write_sample_sensor_data(count=10, interval_minutes=6):
    """
    Write sample sensor data to InfluxDB.

    Generates ``count`` random ``sensor_data`` points spaced
    ``interval_minutes`` apart, with the newest point at the current time.
    Each point gets a random ``sensor_id`` from sensor-1 to sensor-3. Anchoring
    at *now* keeps every point in the past, so Flux ``range(start: -Nh)``
    (whose default stop is ``now()``) returns them all.

    Args:
        count: Number of points to write.
        interval_minutes: Spacing between consecutive points, in minutes.

    Returns:
        True if the write succeeded, False otherwise.
    """
    import random
    from datetime import timezone

    # Timezone-aware UTC "now" (datetime.utcnow() is deprecated since Python 3.12).
    now = datetime.now(timezone.utc)

    points = []
    for i in range(count):
        # Oldest point first; the last point (i == count - 1) lands on `now`.
        timestamp = now - timedelta(minutes=(count - 1 - i) * interval_minutes)

        point = (
            Point("sensor_data")
            .tag("sensor_id", f"sensor-{random.randint(1, 3)}")
            .tag("source", "jupyter-test")
            .field("temperature", round(random.uniform(15, 30), 2))
            .field("humidity", round(random.uniform(30, 70), 2))
            .field("pressure", round(random.uniform(980, 1020), 2))
            .time(timestamp, WritePrecision.NS)
        )
        points.append(point)

    try:
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
        print(f"✓ Successfully wrote {count} sensor data points")
        return True
    except Exception as e:
        print(f"✗ Error writing data: {e}")
        return False

# Write sample data
write_sample_sensor_data(20)

## 5. Query and Visualize Data

In [None]:
def query_energy_metrics(hours=1):
    """
    Query energy metrics from InfluxDB.

    Args:
        hours: Length of the look-back window in hours; the window ends now.

    Returns:
        A DataFrame with columns ``time``, ``location``, ``power_kw`` and
        ``temperature``, ordered by ``time``. When nothing matches or the query
        fails, the DataFrame is empty but still carries those columns.
    """
    columns = ['time', 'location', 'power_kw', 'temperature']
    flux_query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -{hours}h)
      |> filter(fn: (r) => r._measurement == "energy_metrics")
      |> filter(fn: (r) => r._field == "power_kw" or r._field == "temperature")
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    try:
        tables = query_api.query(flux_query, org=INFLUXDB_ORG)
    except Exception as e:
        print(f"✗ Error querying data: {e}")
        return pd.DataFrame(columns=columns)

    # Convert to pandas DataFrame
    records = [
        {
            'time': record.get_time(),
            'location': record.values.get('location'),
            'power_kw': record.values.get('power_kw'),
            'temperature': record.values.get('temperature'),
        }
        for table in tables
        for record in table.records
    ]

    # Flux returns one table per series. Concatenated, the rows are not globally
    # time-ordered, which would make the line plots zig-zag.
    df = pd.DataFrame(records, columns=columns).sort_values('time', ignore_index=True)
    print(f"✓ Retrieved {len(df)} energy metric records")
    return df

# Query data
energy_df = query_energy_metrics(hours=2)
if not energy_df.empty:
    display(energy_df.head())

In [None]:
# Visualize energy metrics: a single power trace over time, or a notice when
# the query returned nothing.
if energy_df.empty:
    print("No data to visualize")
else:
    power_trace = go.Scatter(
        x=energy_df['time'],
        y=energy_df['power_kw'],
        mode='lines+markers',
        name='Power (kW)',
        line={'color': '#3b82f6', 'width': 2},
    )
    layout_options = {
        'title': 'Energy Consumption Over Time',
        'xaxis_title': 'Time',
        'yaxis_title': 'Power (kW)',
        'hovermode': 'x unified',
        'template': 'plotly_white',
        'height': 400,
    }

    fig = go.Figure(data=[power_trace])
    fig.update_layout(**layout_options)
    fig.show()

In [None]:
def query_sensor_data(hours=1):
    """
    Query sensor data from InfluxDB.

    Args:
        hours: Length of the look-back window in hours; the window ends now.

    Returns:
        A DataFrame with columns ``time``, ``sensor_id``, ``temperature``,
        ``humidity`` and ``pressure``, ordered by ``time``. When nothing matches
        or the query fails, the DataFrame is empty but still carries those
        columns.
    """
    columns = ['time', 'sensor_id', 'temperature', 'humidity', 'pressure']
    flux_query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -{hours}h)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    try:
        tables = query_api.query(flux_query, org=INFLUXDB_ORG)
    except Exception as e:
        print(f"✗ Error querying data: {e}")
        return pd.DataFrame(columns=columns)

    records = [
        {
            'time': record.get_time(),
            'sensor_id': record.values.get('sensor_id'),
            'temperature': record.values.get('temperature'),
            'humidity': record.values.get('humidity'),
            'pressure': record.values.get('pressure'),
        }
        for table in tables
        for record in table.records
    ]

    # One Flux table per sensor series; sort so the combined frame is chronological.
    df = pd.DataFrame(records, columns=columns).sort_values('time', ignore_index=True)
    print(f"✓ Retrieved {len(df)} sensor data records")
    return df

# Query sensor data
sensor_df = query_sensor_data(hours=2)
if not sensor_df.empty:
    display(sensor_df.head())

In [None]:
# Visualize sensor data by sensor: one temperature line per sensor_id.
if sensor_df.empty:
    print("No sensor data to visualize")
else:
    axis_labels = {'temperature': 'Temperature (°C)', 'time': 'Time'}
    fig = px.line(
        sensor_df,
        x='time',
        y='temperature',
        color='sensor_id',
        labels=axis_labels,
        title='Sensor Temperature Readings',
        template='plotly_white',
    )
    fig.update_layout(height=400)
    fig.show()

## 6. AI Analysis with AWS Bedrock

Use Claude 3 to analyze the time-series data

In [None]:
def invoke_bedrock_claude(prompt, max_tokens=2048, temperature=0.7):
    """
    Invoke Claude 3 via Bedrock for data analysis.

    Sends a single-turn Anthropic Messages API request to ``BEDROCK_MODEL_ID``.

    Args:
        prompt: Text of the user message.
        max_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature (defaults to the previous fixed 0.7).

    Returns:
        The concatenated text of every ``text`` content block in the response,
        or None if the call fails or no text is returned.
    """
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": temperature
    })

    try:
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=body,
            contentType='application/json',
            accept='application/json',
        )
        response_body = json.loads(response['body'].read())
    except Exception as e:
        print(f"✗ Error invoking Bedrock: {e}")
        return None

    # `content` is a list of blocks. Join every text block instead of assuming the
    # answer is entirely in the first one (and failing on an empty list).
    text = ''.join(
        block.get('text', '')
        for block in response_body.get('content', [])
        if block.get('type') == 'text'
    )
    return text or None

In [None]:
# Prepare data summary for AI analysis: summary statistics of power_kw plus the
# covered time span are sent to Claude, and the reply is rendered as Markdown.
if energy_df.empty:
    print("No energy data available for analysis")
else:
    stats = energy_df['power_kw'].describe()
    period_start = energy_df['time'].min()
    period_end = energy_df['time'].max()

    prompt = f"""
    Analyze the following energy consumption data and provide insights:
    
    Data Summary:
    - Total readings: {len(energy_df)}
    - Time period: {period_start} to {period_end}
    - Average power: {stats['mean']:.2f} kW
    - Min power: {stats['min']:.2f} kW
    - Max power: {stats['max']:.2f} kW
    - Std deviation: {stats['std']:.2f} kW
    
    Please provide:
    1. Key observations about the energy consumption pattern
    2. Any anomalies or unusual patterns
    3. Recommendations for optimization
    4. Potential cost-saving opportunities
    """

    print("Analyzing data with Claude 3...\n")
    analysis = invoke_bedrock_claude(prompt)

    if analysis:
        display(Markdown(f"## AI Analysis Results\n\n{analysis}"))

## 7. Interactive Testing Widget

Interactive interface for writing and querying data

In [None]:
# Create interactive widgets.
# Every labelled control uses the 'initial' description width so its label is
# not truncated.
measurement_dropdown = widgets.Dropdown(
    options=['energy_metrics', 'sensor_data'],
    value='energy_metrics',
    description='Measurement:',
    style=dict(description_width='initial'),
)


def _int_slider(label, default, upper):
    """Build a step-1 integer slider over [1, upper] with an untruncated label."""
    return widgets.IntSlider(
        value=default,
        min=1,
        max=upper,
        step=1,
        description=label,
        style=dict(description_width='initial'),
    )


write_count = _int_slider('Data Points:', 10, 100)
query_hours = _int_slider('Query Hours:', 1, 24)

write_button = widgets.Button(description='Write Test Data', button_style='success', icon='database')
query_button = widgets.Button(description='Query Data', button_style='info', icon='search')
analyze_button = widgets.Button(description='AI Analysis', button_style='warning', icon='brain')

# Shared area where every button callback prints its results.
output = widgets.Output()

def on_write_button_clicked(b):
    """Button callback: write ``write_count`` sample points to the selected measurement."""
    with output:
        output.clear_output()
        print(f"Writing {write_count.value} points to {measurement_dropdown.value}...")

        # Anything other than energy_metrics is treated as sensor data.
        writer = (
            write_sample_energy_metrics
            if measurement_dropdown.value == 'energy_metrics'
            else write_sample_sensor_data
        )
        writer(write_count.value)

def on_query_button_clicked(b):
    """Button callback: query the selected measurement and preview up to 10 rows."""
    with output:
        output.clear_output()
        print(f"Querying {measurement_dropdown.value} for last {query_hours.value} hour(s)...")

        query_fn = (
            query_energy_metrics
            if measurement_dropdown.value == 'energy_metrics'
            else query_sensor_data
        )
        df = query_fn(query_hours.value)

        if df.empty:
            return
        display(df.head(10))
        print(f"\nTotal records: {len(df)}")

def on_analyze_button_clicked(b):
    """
    Button callback: summarize the selected measurement and ask Claude for insights.

    Energy metrics are summarized by ``power_kw``. Sensor data is summarized by
    ``temperature``, ``humidity`` and ``pressure``. A status line is printed when
    no data is found or no analysis is returned, so the output area never shows
    only the progress banner.
    """
    with output:
        output.clear_output()
        print("Running AI analysis...\n")

        if measurement_dropdown.value == 'energy_metrics':
            df = query_energy_metrics(query_hours.value)
            if df.empty:
                print("No energy data available for analysis")
                return
            stats = df['power_kw'].describe()
            prompt = f"Analyze this energy data: mean={stats['mean']:.2f}kW, min={stats['min']:.2f}kW, max={stats['max']:.2f}kW. Provide brief insights."
        else:
            df = query_sensor_data(query_hours.value)
            if df.empty:
                print("No sensor data available for analysis")
                return
            summary = ", ".join(
                f"{col}: mean={df[col].mean():.2f}, min={df[col].min():.2f}, max={df[col].max():.2f}"
                for col in ('temperature', 'humidity', 'pressure')
            )
            prompt = f"Analyze this sensor data ({summary}). Provide brief insights."

        analysis = invoke_bedrock_claude(prompt, max_tokens=500)
        if analysis:
            display(Markdown(analysis))
        else:
            print("No analysis returned from Bedrock")

# Attach each button to its callback.
for _button, _handler in (
    (write_button, on_write_button_clicked),
    (query_button, on_query_button_clicked),
    (analyze_button, on_analyze_button_clicked),
):
    _button.on_click(_handler)

# Display widgets: title, input controls, the button row, then the shared output area.
controls = [measurement_dropdown, write_count, query_hours]
button_row = widgets.HBox([write_button, query_button, analyze_button])
display(widgets.VBox([
    widgets.HTML("<h3>InfluxDB Interactive Testing</h3>"),
    *controls,
    button_row,
    output,
]))

## 8. Performance Testing

Test write and query performance

In [None]:
def performance_test_write(num_points=100):
    """
    Test InfluxDB write performance.

    Writes ``num_points`` points of the ``perf_test`` measurement in one
    synchronous batch and reports the elapsed time and throughput.

    Each point gets its own explicit nanosecond timestamp. Without one, the
    server stamps the whole request with a single time. Because all points share
    the same measurement and tag set, they would overwrite each other and leave
    one stored point instead of ``num_points``.

    Returns:
        Elapsed seconds for the write call, or None if the write failed.
    """
    import random

    print(f"Performance test: Writing {num_points} points...")

    # Space points 1 µs apart, ending at the current time, so none lie in the future.
    end_ns = time.time_ns()
    points = [
        Point("perf_test")
        .tag("test_id", "performance")
        .field("value", random.random() * 100)
        .field("iteration", i)
        .time(end_ns - (num_points - 1 - i) * 1_000, WritePrecision.NS)
        for i in range(num_points)
    ]

    # perf_counter is monotonic and high-resolution, unlike time.time().
    start_time = time.perf_counter()
    try:
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
    except Exception as e:
        print(f"✗ Write failed: {e}")
        return None
    elapsed = time.perf_counter() - start_time

    # Guard against a zero reading so the throughput division cannot fail.
    throughput = num_points / elapsed if elapsed > 0 else float('inf')
    print(f"✓ Wrote {num_points} points in {elapsed:.3f} seconds")
    print(f"  Throughput: {throughput:.1f} points/second")
    return elapsed

# Run performance test
performance_test_write(100)

In [None]:
def performance_test_query():
    """
    Test InfluxDB query performance.

    Counts the ``perf_test`` points written in the last 24 hours. Only the
    ``value`` field is counted, so the total equals the number of stored points
    rather than one total per field.

    Returns:
        Elapsed seconds for the query, or None if it failed.
    """
    flux_query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "perf_test")
      |> filter(fn: (r) => r._field == "value")
      |> count()
    '''

    print("Performance test: Querying data...")
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start_time = time.perf_counter()

    try:
        tables = query_api.query(flux_query, org=INFLUXDB_ORG)
    except Exception as e:
        print(f"✗ Query failed: {e}")
        return None
    elapsed = time.perf_counter() - start_time

    # count() emits one row per input table (series) whose _value is that series'
    # point count, so the row count is not the number of points processed.
    result_rows = [record for table in tables for record in table.records]
    points_counted = sum(record.get_value() for record in result_rows)

    print(f"✓ Query completed in {elapsed:.3f} seconds")
    print(f"  Series returned: {len(result_rows)}")
    print(f"  Points counted: {points_counted}")
    return elapsed

# Run query performance test
performance_test_query()

## 9. Cleanup and Close Connections

In [None]:
# Release the InfluxDB client now that all tests have run.
influx_client.close()
print("✓ InfluxDB client closed", "\nTesting complete!", sep="\n")

## Summary

This notebook demonstrated:
- ✓ Connecting to AWS Timestream for InfluxDB
- ✓ Writing time-series data (energy metrics and sensor data)
- ✓ Querying data using Flux query language
- ✓ Visualizing metrics with Plotly
- ✓ AI-powered data analysis with AWS Bedrock (Claude 3)
- ✓ Interactive testing interface with ipywidgets
- ✓ Performance testing

### Next Steps
1. Customize the data models for your specific use case
2. Implement more advanced Flux queries and aggregations
3. Set up automated data collection pipelines
4. Create dashboards for real-time monitoring
5. Integrate with other AWS services (Lambda, Step Functions, etc.)