# Using AWS Timestream to query sensor feed data

## Introduction

This Jupyter notebook guides you through the process of querying sensor data from AWS Timestream. Timestream is AWS's cutting-edge serverless database, purpose-built for efficiently storing time-series data, especially data originating from IoT devices.

## Why Timestream

Amazon Timestream is a purpose-built, fully managed time-series database service designed to simplify the storage and analysis of time-stamped data at scale. Here are several compelling reasons to choose Amazon Timestream:

1. **Time-Series Data Management:**
   - *Efficient Storage:* Timestream is optimized for handling time-series data, providing high efficiency in storage and query performance for timestamped information.

2. **Serverless and Fully Managed:**
   - *Ease of Use:* Timestream is fully managed, eliminating operational overhead. It is serverless, automatically handling tasks such as scaling, patching, and backups.

3. **Scalability:**
   - *Horizontal Scalability:* Timestream scales horizontally, accommodating a large volume of time-series data as your application grows.

4. **Cost-Effective:**
   - *Pay-as-You-Go Model:* With a pay-as-you-go pricing model, Timestream is cost-effective, especially for varying workloads.

5. **Built-in Analytics:**
   - *Query Language:* Timestream provides a SQL-like query language for easy data retrieval, supporting analytical queries for complex analysis on time-series data.

6. **Integration with AWS Ecosystem:**
   - *Seamless Integration:* Timestream integrates seamlessly with AWS services like Amazon CloudWatch, AWS IoT, and Amazon Kinesis, allowing the ingestion and analysis of data from various sources.

7. **Security and Compliance:**
   - *AWS Identity and Access Management (IAM):* Timestream integrates with IAM for access control, ensuring secure data handling. It also supports encryption at rest and in transit to meet compliance requirements.

8. **Versatility:**
   - *Multi-Resolution Storage:* Timestream supports multi-resolution storage, enabling optimization of storage costs based on data access patterns.

9. **Time-Windowed Retention Policies:**
   - *Data Retention Control:* Timestream allows the definition of retention policies based on time windows, automatically managing the lifecycle of time-series data.

10. **Real-time Data Ingestion:**
    - *High Throughput:* Timestream supports high-speed, continuous data ingestion, making it suitable for real-time applications and scenarios with rapidly generated data.

In summary, Amazon Timestream is an excellent choice for applications requiring efficient storage, analysis, and retrieval of time-series data at scale. It is particularly well-suited for use cases such as IoT, telemetry, monitoring, and analytics.

## Setup

In [25]:
%pip install --upgrade pip

Collecting pip
  Obtaining dependency information for pip from https://files.pythonhosted.org/packages/47/6a/453160888fab7c6a432a6e25f8afe6256d0d9f2cbd25971021da6491d899/pip-23.3.1-py3-none-any.whl.metadata
  Downloading pip-23.3.1-py3-none-any.whl.metadata (3.5 kB)
Downloading pip-23.3.1-py3-none-any.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.2.1
    Uninstalling pip-23.2.1:
      Successfully uninstalled pip-23.2.1
Successfully installed pip-23.3.1
Note: you may need to restart the kernel to use updated packages.


In [26]:
%pip install --no-build-isolation --force-reinstall \
    "boto3>=1.28.57" \
    "awscli>=1.29.57" \
    "botocore>=1.31.57"

Collecting boto3>=1.28.57
  Downloading boto3-1.33.10-py3-none-any.whl.metadata (6.7 kB)
Collecting awscli>=1.29.57
  Downloading awscli-1.31.10-py3-none-any.whl.metadata (11 kB)
Collecting botocore>=1.31.57
  Downloading botocore-1.33.10-py3-none-any.whl.metadata (6.1 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3>=1.28.57)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.9.0,>=0.8.2 (from boto3>=1.28.57)
  Downloading s3transfer-0.8.2-py3-none-any.whl.metadata (1.8 kB)
Collecting docutils<0.17,>=0.10 (from awscli>=1.29.57)
  Downloading docutils-0.16-py2.py3-none-any.whl (548 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m548.2/548.2 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting PyYAML<6.1,>=3.10 (from awscli>=1.29.57)
  Downloading PyYAML-6.0.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (2.1 kB)
Collecting colorama<0.4.5,>=0.2.5 (from awscli>=1.29.57)
  Downloading colorama-0.4.4-py2.py3-none-any.

## Setup AWS Credentials in Jupyter Notebook

To set up boto3 credentials in a Jupyter notebook:# Setting up Boto3 Credentials in a Jupyter Notebook

To establish Boto3 credentials in a Jupyter notebook, follow these steps:

1. **Configure AWS Credentials on the EC2 Instance:**
   - Create a credentials file at `~/.aws/credentials`.
   - Add the `aws_access_key_id` and `aws_secret_access_key` to this file.

     ```plaintext
     [default]
     aws_access_key_id = YOUR_ACCESS_KEY_ID
     aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
     ```

2. **Alternative Configuration using Environment Variables:**
   - Set environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` with your credentials.

3. **Install Boto3 in the Jupyter Notebook Environment:**
   - Execute the following command in a Jupyter notebook cell:

     ```bash
     !pip install boto3
     ```
These steps will enable you to set up and use Boto3 credentials within your Jupyter notebook environment.

In [27]:
import boto3

# ---- ⚠️ Un-comment and edit the below lines as needed for your AWS setup ⚠️ ----

# os.environ["AWS_DEFAULT_REGION"] = "<REGION_NAME>"  # E.g. "us-east-1"
# os.environ["AWS_PROFILE"] = "<YOUR_PROFILE>"

session = boto3.Session()

## Set environment and region variable to connect to the right database 

Replace the `ENVIRONMENT` variable with the specific environment used during deployment.

In [28]:
ENVIRONMENT='dev' # Replace this with the environment that you used when deploying the solution 

# Insert your database name here
DATABASE_NAME=f'afriset-{ENVIRONMENT}'
TABLE_NAME=f'afriset-{ENVIRONMENT}-sensor-feeds'

## Query Helper Class

Upon completing the file upload through the user interface (UI), the sensor data has been successfully ingested into Timestream. To retrieve this data, we will proceed by creating a Query object capable of executing any written query. The output will be iterated through and displayed on the screen. You can find the relevant code on GitHub at https://github.com/awslabs/amazon-timestream-tools/blob/master/sample_apps/python/QueryExample.py.

In [29]:
class Query(object):

    def __init__(self, client):
        self.client = client
        self.paginator = client.get_paginator('query')

    # See records ingested into this table so far
    SELECT_ALL = f"SELECT * FROM {DATABASE_NAME}.{TABLE_NAME}"

    def run_query(self, query_string):
        try:
            page_iterator = self.paginator.paginate(QueryString=query_string)
            for page in page_iterator:
                self._parse_query_result(page)
        except Exception as err:
            print("Exception while running query:", err)

    def _parse_query_result(self, query_result):
        column_info = query_result['ColumnInfo']

        print("Metadata: %s" % column_info)
        print("Data: ")
        for row in query_result['Rows']:
            print(self._parse_row(column_info, row))

    def _parse_row(self, column_info, row):
        data = row['Data']
        row_output = []
        for j in range(len(data)):
            info = column_info[j]
            datum = data[j]
            row_output.append(self._parse_datum(info, datum))

        return "{%s}" % str(row_output)

    def _parse_datum(self, info, datum):
        if datum.get('NullValue', False):
            return "%s=NULL" % info['Name'],

        column_type = info['Type']

        # If the column is of TimeSeries Type
        if 'TimeSeriesMeasureValueColumnInfo' in column_type:
            return self._parse_time_series(info, datum)

        # If the column is of Array Type
        elif 'ArrayColumnInfo' in column_type:
            array_values = datum['ArrayValue']
            return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values))

        # If the column is of Row Type
        elif 'RowColumnInfo' in column_type:
            row_column_info = info['Type']['RowColumnInfo']
            row_values = datum['RowValue']
            return self._parse_row(row_column_info, row_values)

        # If the column is of Scalar Type
        else:
            return self._parse_column_name(info) + datum['ScalarValue']

    def _parse_time_series(self, info, datum):
        time_series_output = []
        for data_point in datum['TimeSeriesValue']:
            time_series_output.append("{time=%s, value=%s}"
                                      % (data_point['Time'],
                                         self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
                                                           data_point['Value'])))
        return "[%s]" % str(time_series_output)

    def _parse_array(self, array_column_info, array_values):
        array_output = []
        for datum in array_values:
            array_output.append(self._parse_datum(array_column_info, datum))

        return "[%s]" % str(array_output)

    def run_query_with_multiple_pages(self, limit):
        query_with_limit = self.SELECT_ALL + " LIMIT " + str(limit)
        print("Starting query with multiple pages : " + query_with_limit)
        self.run_query(query_with_limit)

    def cancel_query(self):
        print("Starting query: " + self.SELECT_ALL)
        result = self.client.query(QueryString=self.SELECT_ALL)
        print("Cancelling query: " + self.SELECT_ALL)
        try:
            self.client.cancel_query(QueryId=result['QueryId'])
            print("Query has been successfully cancelled")
        except Exception as err:
            print("Cancelling query failed:", err)

    @staticmethod
    def _parse_column_name(info):
        if 'Name' in info:
            return info['Name'] + "="
        else:
            return ""

In [23]:
query_client = session.client('timestream-query', region_name=os.environ.get("AWS_DEFAULT_REGION", None))

query = Query(query_client)

## Querying the sensor data from database

In [24]:
QUERY_1 = f"""
        SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}" WHERE time between ago(15m) and now() ORDER BY time DESC LIMIT 10 
        """

query_output = query.run_query(QUERY_1)

Metadata: [{'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}]
Data: 
