In [1]:
import os
import datetime as dt
import boto3
import math
import pandas as pd
import numpy as np
import timestreamquery as timestream
from IPython import display
from dotenv import load_dotenv

load_dotenv()

DATABASE_NAME = os.getenv("DATABASE_NAME")
TABLE_NAME = os.getenv("TABLE_NAME")
PAST_TABLE_NAME = os.getenv("PAST_TABLE_NAME")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

assert DATABASE_NAME
assert TABLE_NAME
assert PAST_TABLE_NAME
assert AWS_ACCESS_KEY_ID
assert AWS_SECRET_ACCESS_KEY



TIMESTREAM_DATATYPES = ("DOUBLE", "BIGINT", "VARCHAR", "BOOLEAN")

In [2]:
#################################################
##### Timestream Configurations.  ###############
#################################################
ENDPOINT = "eu-west-1" # <--- specify the region service endpoint
client = timestream.createQueryClient(ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

#################################################

Using credentials from the environment
eu-west-1


In [3]:
query_describe = """
DESCRIBE {}.{}
""".format(DATABASE_NAME, PAST_TABLE_NAME)

describe_table = timestream.executeQueryAndReturnAsDataframe(client, query_describe, True)

display.display(describe_table)

QueryId: AEDQCANIJXXWUMCP2L625HHE5Q2WEK64DTJ5F23IC4KD4Y37EZHJQPUOTJTRYKY
2022-11-14 13:35:48. QueryId: AEDQCANIJXXWUMCP2L625HHE5Q2WEK64DTJ5F23IC4KD4Y37EZHJQPUOTJTRYKY Time: 2.144. First result: 2.144. Time to read results: 0.0.


Unnamed: 0,Column,Type,Timestream attribute type
0,lon,varchar,DIMENSION
1,gateway_id,varchar,DIMENSION
2,lat,varchar,DIMENSION
3,measure_name,varchar,MEASURE_NAME
4,time,timestamp,TIMESTAMP
5,sum(precipitation_amount PT1H),double,MULTI
6,max(wind_speed PT1H),double,MULTI
7,wind_direction_sin,double,MULTI
8,wind_direction_cos,double,MULTI
9,max(air_temperature PT1H),double,MULTI


In [41]:
query_get_all_data_last_60_days = f"""
SELECT time, wind_direction_sin, wind_direction_cos
FROM {DATABASE_NAME}.{PAST_TABLE_NAME}
WHERE time > ago(60d)
"""

df = timestream.executeQueryAndReturnAsDataframe(client, query_get_all_data_last_60_days, True)
display.display(df)

QueryId: AEDQCANHYE7D6UNFRZE3Z2LSSWHUUXCHD5666WP6ASQMVH7ZYZK4RENDTCQF4TI
2022-10-20 13:18:19. QueryId: AEDQCANHYE7D6UNFRZE3Z2LSSWHUUXCHD5666WP6ASQMVH7ZYZK4RENDTCQF4TI Time: 0.693. First result: 0.693. Time to read results: 0.0.


Unnamed: 0,time,wind_direction_sin,wind_direction_cos
0,2022-08-21 14:00:00.000000000,-0.669131,0.743145
1,2022-08-21 15:00:00.000000000,-0.829038,0.559193
2,2022-08-21 16:00:00.000000000,-0.961262,-0.275637
3,2022-08-21 17:00:00.000000000,-0.978148,-0.207912
4,2022-08-21 18:00:00.000000000,-0.559193,-0.829038
...,...,...,...
1435,2022-10-03 00:00:00.000000000,0.121869,-0.992546
1436,2022-10-03 01:00:00.000000000,0.173648,-0.984808
1437,2022-10-03 02:00:00.000000000,-0.422618,-0.906308
1438,2022-10-03 03:00:00.000000000,-0.156434,-0.987688


In [84]:
def get_measurement(df):
    """Extract measurements from json object
    """
    measurements = []
    for name in df.index:
        measurements.append({
            "Name": f"{name}",
            "Value": str(df.loc[name]),
            "Type": "DOUBLE"
        })
    
    # wind direction is set together of sin and cos to avoid 359 -> 0 
    return measurements


In [43]:
df_csv = pd.read_csv("WeatherForecast.csv")
df_csv.time = pd.to_datetime(df_csv.time, )
df_csv.head()


Unnamed: 0,time,1h_air_temperature,1h_percipitation,1h_wind_speed,1h_relative_humidity,2h_air_temperature,2h_percipitation,2h_wind_speed,2h_relative_humidity,3h_air_temperature,...,22h_wind_speed,22h_relative_humidity,23h_air_temperature,23h_percipitation,23h_wind_speed,23h_relative_humidity,24h_air_temperature,24h_percipitation,24h_wind_speed,24h_relative_humidity
0,2022-09-02 14:00:00+00:00,17.7,0.0,4.7,57.8,17.8,0.0,4.6,59.0,17.5,...,5.1,62.9,16.1,0.0,5.7,61.2,16.6,0.0,5.9,56.9
1,2022-09-02 15:00:00+00:00,18.2,0.0,4.6,59.0,17.8,0.0,4.9,59.7,17.1,...,5.7,61.2,16.7,0.0,5.9,56.9,17.1,0.0,6.0,51.9
2,2022-09-02 16:00:00+00:00,18.2,0.0,4.9,59.7,17.4,0.0,5.5,61.2,16.4,...,5.9,56.9,17.1,0.0,6.0,51.9,17.3,0.0,5.5,49.8
3,2022-09-02 17:00:00+00:00,17.2,0.0,5.8,59.8,16.2,0.0,6.9,63.8,15.4,...,6.9,49.6,16.7,0.0,7.0,48.2,16.5,0.0,6.9,51.0
4,2022-09-02 18:00:00+00:00,16.5,0.0,6.9,63.8,15.7,0.0,6.8,68.1,15.0,...,7.0,48.2,16.5,0.0,6.9,51.0,16.2,0.0,7.0,54.2


In [44]:
df_csv.time = df_csv.time.dt.tz_localize(None)
df_csv.time

0      2022-09-02 14:00:00
1      2022-09-02 15:00:00
2      2022-09-02 16:00:00
3      2022-09-02 17:00:00
4      2022-09-02 18:00:00
               ...        
1098   2022-10-18 08:00:00
1099   2022-10-18 09:00:00
1100   2022-10-18 10:00:00
1101   2022-10-18 11:00:00
1102   2022-10-18 12:00:00
Name: time, Length: 1103, dtype: datetime64[ns]

In [45]:
df.head()
for j in range(24):
        df[f'{j+1}h_wind_direction_sin'] = 0
        df[f'{j+1}h_wind_direction_cos'] = 0
for i in range(df.shape[0]-24):
    for j in range(24):
        df.loc[i, f'{j+1}h_wind_direction_sin'] = df.loc[i+j,'wind_direction_sin']
        df.loc[i, f'{j+1}h_wind_direction_cos'] = df.loc[i+j,'wind_direction_cos']
df.drop(columns=['wind_direction_sin', 'wind_direction_cos'], inplace=True)



In [46]:
df.time = pd.to_datetime(df.time)
df.time

0      2022-08-21 14:00:00
1      2022-08-21 15:00:00
2      2022-08-21 16:00:00
3      2022-08-21 17:00:00
4      2022-08-21 18:00:00
               ...        
1435   2022-10-03 00:00:00
1436   2022-10-03 01:00:00
1437   2022-10-03 02:00:00
1438   2022-10-03 03:00:00
1439   2022-10-03 04:00:00
Name: time, Length: 1440, dtype: datetime64[ns]

In [47]:
df_merged = df_csv.merge(df, on="time", how="left")

In [50]:
df_merged.set_index("time", inplace=True)
df_merged

Unnamed: 0_level_0,1h_air_temperature,1h_percipitation,1h_wind_speed,1h_relative_humidity,2h_air_temperature,2h_percipitation,2h_wind_speed,2h_relative_humidity,3h_air_temperature,3h_percipitation,...,20h_wind_direction_sin,20h_wind_direction_cos,21h_wind_direction_sin,21h_wind_direction_cos,22h_wind_direction_sin,22h_wind_direction_cos,23h_wind_direction_sin,23h_wind_direction_cos,24h_wind_direction_sin,24h_wind_direction_cos
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2022-09-02 14:00:00,17.7,0.0,4.7,57.8,17.8,0.0,4.6,59.0,17.5,0.0,...,0.681998,0.731354,0.707107,0.707107,0.529919,0.848048,0.629320,0.777146,0.390731,0.920505
2022-09-02 15:00:00,18.2,0.0,4.6,59.0,17.8,0.0,4.9,59.7,17.1,0.0,...,0.707107,0.707107,0.529919,0.848048,0.629320,0.777146,0.390731,0.920505,0.500000,0.866025
2022-09-02 16:00:00,18.2,0.0,4.9,59.7,17.4,0.0,5.5,61.2,16.4,0.0,...,0.529919,0.848048,0.629320,0.777146,0.390731,0.920505,0.500000,0.866025,0.587785,0.809017
2022-09-02 17:00:00,17.2,0.0,5.8,59.8,16.2,0.0,6.9,63.8,15.4,0.0,...,0.629320,0.777146,0.390731,0.920505,0.500000,0.866025,0.587785,0.809017,0.694658,0.719340
2022-09-02 18:00:00,16.5,0.0,6.9,63.8,15.7,0.0,6.8,68.1,15.0,0.0,...,0.390731,0.920505,0.500000,0.866025,0.587785,0.809017,0.694658,0.719340,0.601815,0.798636
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-10-18 08:00:00,7.5,0.0,1.9,85.5,7.8,0.0,2.3,82.0,8.0,0.0,...,-0.484810,-0.874620,-0.515038,-0.857167,-0.719340,-0.694658,-0.694658,-0.719340,-0.559193,-0.829038
2022-10-18 09:00:00,7.8,0.0,2.3,82.0,8.0,0.0,3.7,85.8,7.8,0.0,...,-0.515038,-0.857167,-0.719340,-0.694658,-0.694658,-0.719340,-0.559193,-0.829038,-0.544639,-0.838671
2022-10-18 10:00:00,7.9,0.0,3.7,85.8,7.7,0.0,5.1,79.5,7.8,0.0,...,-0.719340,-0.694658,-0.694658,-0.719340,-0.559193,-0.829038,-0.544639,-0.838671,-0.587785,-0.809017
2022-10-18 11:00:00,7.8,0.0,5.4,82.6,7.6,0.0,4.5,79.5,7.8,0.0,...,-0.694658,-0.719340,-0.559193,-0.829038,-0.544639,-0.838671,-0.587785,-0.809017,-0.681998,-0.731354


In [81]:

def get_records(df):
    # add all columns as columns in the timestream database
    records = []
    measure_values = []
    for timestamp, row in df.iterrows():
        measure_values = get_measurement(df=row)
        record = {
            'Dimensions': [
                {'Name': 'lat', 'Value': str(63.44256)},
                {'Name': 'lon', 'Value': str(10.4285)},
                {'Name': 'gateway_id', 'Value': str(8)},

            ],
            'Time': str(int(timestamp.timestamp()*1000)),
            'MeasureName': 'yr_prediction',
            'MeasureValueType': 'MULTI',
            'MeasureValues': measure_values
        }
        if len(measure_values) != 0:
            records.append(record)
    return records

In [82]:


def insert_data(df):
    records = get_records(df)
    client = boto3.client("timestream-write")
    for window_i in range(0, len(records), 100):
        record_window = records[window_i: window_i + 100]
        try:
            result = client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME,
                                                Records=record_window, CommonAttributes={'Version': round(dt.datetime.now().timestamp())})
            print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
        except client.exceptions.RejectedRecordsException as err:
            print(err.response)
        except Exception as err:
            print("Error:", err)

In [85]:
insert_data(df_merged)

WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
WriteRecords Status: [200]
