In [2]:
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
import sys
import os
import re
import pandas as pd
sys.path.append("../src")
from AvevaInsightLibrary import Aveva_Insight
load_dotenv()
import csv
import json
import time
from urllib import request, parse
import ssl
from dateutil import parser
import logging
import gzip

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# TLS certificate verification stays disabled by default (the notebook's previous
# behaviour), which exposes the ClientSecret and bearer tokens to interception.
# Set VERIFY_SSL=true in the environment / .env to keep Python's verified HTTPS
# context instead.
VERIFY_SSL = os.getenv("VERIFY_SSL", "false").strip().lower() in ("1", "true", "yes")

if not VERIFY_SSL:
    # Process-wide override: every urllib HTTPS request made in this kernel skips
    # certificate verification (not recommended for production).
    ssl._create_default_https_context = ssl._create_unverified_context
    logger.warning("SSL certificate verification is DISABLED (set VERIFY_SSL=true to enable)")

In [3]:
# Configuration for the OMF target at config['Resource']. Tenant, namespace and
# client credentials are read from environment variables (loaded from .env by
# load_dotenv()).
config = {
    "Resource": "https://euno.datahub.connect.aveva.com",
    "ApiVersion": "v1",
    "TenantId": os.getenv('TenantId'),
    "NamespaceId": os.getenv('NamespaceId'),
    "ClientId": os.getenv('ClientId'),
    "ClientSecret": os.getenv('ClientSecret')
}

# A missing variable would otherwise only surface later as a URL containing "None"
# or a rejected token request. Report the key names only, never the values.
missing_settings = [key for key, value in config.items() if not value]
if missing_settings:
    logger.warning(f"Missing configuration values (check .env): {', '.join(missing_settings)}")

base_endpoint = f"{config['Resource']}/api/{config['ApiVersion']}/tenants/{config['TenantId']}/namespaces/{config['NamespaceId']}"
omf_endpoint = f"{base_endpoint}/omf"

In [4]:
# Source-side client: AVEVA Insight, authenticated with the token stored in the
# USER_TOKEN_JARVIS environment variable (populated from .env by load_dotenv()).
INSIGHT_TOKEN_VAR = 'USER_TOKEN_JARVIS'
user_token = os.environ.get(INSIGHT_TOKEN_VAR)
aveva = Aveva_Insight(user_token=user_token)


In [5]:
# Discover the tags to transfer. The string is passed to get_Tag_List as-is;
# presumably it is a tag-name prefix for one Insight source — confirm against the library.
source_tagname = "PH-Tanauan.AOA.Philippines.Tanauan.UTI01.AHR01.AH01."

all_tags = aveva.get_Tag_List(source_tagname)

# For a quick trial run, restrict to a subset here, e.g. all_tags = all_tags[-15:]

tag_count = len(all_tags)
print(f"Total Tags found: {tag_count}")


Total Tags found: 12


In [6]:
def get_token(force_refresh=False, timeout=60):
    """Return an OAuth2 access token obtained with the client-credentials grant.

    The token endpoint is discovered from
    ``{config['Resource']}/identity/.well-known/openid-configuration``.
    The token is cached on this function and reused until 60 seconds before its
    ``expires_in`` lifetime ends. This matters because the OMF sender asks for a
    token on every message; without the cache, each batch costs two extra HTTP
    round trips. The cache is keyed by resource and client id, so changing
    ``config`` forces a new token.

    Args:
        force_refresh: ignore any cached token and request a new one.
        timeout: seconds to wait for each HTTP request.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        urllib.error.URLError / HTTPError: if discovery or the token request fails.
        KeyError: if a response lacks ``token_endpoint`` / ``access_token``.
    """
    cache = get_token.__dict__.setdefault("_cache", {})
    cache_key = (config['Resource'], config['ClientId'])
    now = time.monotonic()
    if (not force_refresh
            and cache.get("key") == cache_key
            and now < cache.get("expires_at", 0)):
        return cache["token"]

    discovery_url = f"{config['Resource']}/identity/.well-known/openid-configuration"
    with request.urlopen(discovery_url, timeout=timeout) as response:
        discovery_data = json.loads(response.read())

    token_endpoint = discovery_data["token_endpoint"]
    data = parse.urlencode({
        'client_id': config['ClientId'],
        'client_secret': config['ClientSecret'],
        'grant_type': 'client_credentials'
    }).encode()

    req = request.Request(token_endpoint, data=data, method='POST')
    with request.urlopen(req, timeout=timeout) as response:
        token_data = json.loads(response.read())

    access_token = token_data["access_token"]

    # Only cache when the server states a lifetime. Refresh a minute early so a
    # token never expires mid-request. `now` was taken before the requests, which
    # errs on the safe side.
    expires_in = token_data.get("expires_in")
    if expires_in:
        cache.update(key=cache_key, token=access_token,
                     expires_at=now + max(float(expires_in) - 60, 0))
    else:
        cache.clear()
    return access_token

In [7]:
def send_omf_message(message_type, message_json, action='create', timeout=120):
    """POST one OMF message to ``omf_endpoint`` and report whether it was accepted.

    Args:
        message_type: value of the OMF ``messagetype`` header, e.g. 'type',
            'container' or 'data'. 'data' messages are sent gzip-compressed.
        message_json: JSON-serializable message body.
        action: value of the OMF ``action`` header ('create' by default).
        timeout: seconds to wait for the HTTP request.

    Returns:
        True if the endpoint answered with a 2xx status. False for any HTTP
        error (including 409) or transport error. These errors are logged here
        rather than raised. A failure inside ``get_token()`` still propagates.
    """
    # urlopen raises HTTPError for 4xx/5xx replies instead of returning them,
    # so status-based handling of those codes must happen in an except clause.
    from urllib.error import HTTPError

    token = get_token()
    headers = {
        'Authorization': f'Bearer {token}',
        'messagetype': message_type,
        'action': action,
        'messageformat': 'JSON',
        'omfversion': '1.2',
        'Content-Type': 'application/json'
    }

    body_bytes = json.dumps(message_json).encode('utf-8')
    if message_type == 'data':
        headers['compression'] = 'gzip'
        headers["x-requested-with"] = 'xmlhttprequest'
        body_bytes = gzip.compress(body_bytes)

    # Mask the bearer token so it never ends up in log output.
    logged_headers = {**headers, 'Authorization': 'Bearer ***'}
    logger.debug(f"Sending {message_type} message. Headers: {logged_headers}")

    req = request.Request(omf_endpoint, data=body_bytes, headers=headers, method='POST')
    try:
        with request.urlopen(req, timeout=timeout) as response:
            status = response.status
            response_body = response.read().decode()
    except HTTPError as e:
        try:
            error_body = e.read().decode('utf-8', errors='replace')
        except Exception:
            error_body = ''
        if e.code == 409:
            logger.warning(f"Already exists (409) for {message_type} message: {error_body}")
        else:
            logger.error(f"Error sending {message_type} message: {e.code} {error_body}")
        return False
    except Exception as e:
        logger.exception(f"Exception when sending {message_type} message: {str(e)}")
        return False

    logger.debug(f"Response status: {status}")
    logger.debug(f"Response body: {response_body}")
    if 200 <= status < 300:
        logger.info(f"Successfully sent {message_type} message")
        return True
    logger.error(f"Error sending {message_type} message: {status} {response_body}")
    return False

In [8]:
def define_and_create_type(type_definition):
    """Send an OMF 'type' message for ``type_definition`` (a list of OMF type objects).

    Returns whatever ``send_omf_message`` reports, so callers can detect a failed
    type definition instead of silently continuing.
    """
    return send_omf_message("type", type_definition)

def create_stream(stream_definition):
    """Send an OMF 'container' message for ``stream_definition`` (a list of container objects).

    Returns whatever ``send_omf_message`` reports, so callers can detect a failed
    container creation instead of silently continuing.
    """
    return send_omf_message("container", stream_definition)

In [9]:
def _to_bool(value):
    """Interpret a Discrete sample as a boolean (numbers: non-zero; text: true/false/1/0/on/off/yes/no)."""
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ("true", "1", "on", "yes"):
            return True
        if text in ("false", "0", "off", "no"):
            return False
        raise ValueError(f"Cannot interpret {value!r} as a boolean")
    return bool(float(value))


# Value converters matching the "Value" schema of each JarvisType_* OMF type.
_VALUE_CONVERTERS = {"Analog": float, "Discrete": _to_bool, "String": str}


def create_data(df, tag_type="Analog"):
    """Convert an Insight data chunk into a chronologically sorted list of OMF values.

    Each record has the keys used by the JarvisType_* OMF types:
    ``datetime`` (ISO-8601 string in UTC), ``Value`` and ``UOM``.

    Args:
        df: DataFrame with 'DateTime', 'Value' and 'Unit' columns. It is NOT
            modified (the previous version overwrote its 'DateTime' column).
        tag_type: 'Analog' (default, Value -> float, the previous behaviour),
            'Discrete' (Value -> bool) or 'String' (Value -> str). Unknown
            values fall back to float.

    Returns:
        The list of records, or None if the DataFrame itself could not be
        processed (e.g. a missing column). Rows that fail to convert are logged
        and skipped.
    """
    convert = _VALUE_CONVERTERS.get(tag_type, float)
    timed_records = []
    try:
        for raw_dt, raw_value, raw_unit in zip(df['DateTime'], df['Value'], df['Unit']):
            try:
                # Parse the textual form, as before, whether the column holds strings
                # or datetime objects. Naive timestamps are interpreted in the
                # machine's local timezone by astimezone().
                dt_utc = parser.parse(str(raw_dt)).astimezone(timezone.utc)
                value = None if pd.isna(raw_value) else convert(raw_value)
                uom = None if pd.isna(raw_unit) else raw_unit
                timed_records.append((dt_utc, {
                    "datetime": dt_utc.isoformat(),
                    "Value": value,
                    "UOM": uom
                }))
            except Exception as e:
                logger.error(f"Error processing row: {str(e)}")
        # Sort on the parsed instant, not the raw text. Lexicographic order is
        # wrong when timestamps carry different UTC offsets or formats.
        timed_records.sort(key=lambda pair: pair[0])
    except Exception as e:
        logger.exception(f"Error processing DataFrame: {str(e)}")
        return None

    return [record for _, record in timed_records]

In [10]:
def process_and_send_data(data, container_id, batch_size=1000):
    """Send ``data`` to OMF container ``container_id`` in batches of ``batch_size`` values.

    Args:
        data: list of OMF value records (e.g. the output of ``create_data``).
        container_id: id of the OMF container (stream) the values belong to.
        batch_size: maximum number of values per OMF data message; must be >= 1.

    Returns:
        The number of batches that failed (0 when every batch was accepted). A
        batch counts as failed when sending raises, or when ``send_omf_message``
        explicitly returns False.

    Raises:
        ValueError: if ``batch_size`` < 1. Before this check, 0 made range()
            raise and negative values silently sent nothing.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    failed_batches = 0
    for batch_number, start in enumerate(range(0, len(data), batch_size), start=1):
        batch = data[start:start + batch_size]
        data_message = [{
            "containerid": container_id,
            "values": batch
        }]

        try:
            sent = send_omf_message("data", data_message)
            first_date = batch[0]['datetime'] if batch else None
        except Exception as e:
            logger.exception(f"Error sending data message for batch {batch_number}: {str(e)}")
            failed_batches += 1
            continue

        if sent is False:
            # The sender already logged the HTTP detail; don't report this batch as completed.
            logger.error(f"Batch {batch_number} - Container: {container_id} - FAILED - date: {first_date}")
            failed_batches += 1
        else:
            logger.info(f"Batch {batch_number} - Container: {container_id} - completed - date: {first_date}")

    return failed_batches


In [11]:
# OMF dynamic types for Analog, Discrete and String tags. All three share the
# same layout (indexed datetime, Value, UOM) and differ only in the schema of "Value".
_VALUE_SCHEMAS = {
    "Analog": {"type": "number", "format": "float64"},
    "Discrete": {"type": "boolean"},
    "String": {"type": "string"},
}


def _build_type_definition(tag_type, value_schema):
    """Return the one-element OMF type message for ``tag_type`` (id JarvisType_<tag_type>)."""
    return [{
        "id": f"JarvisType_{tag_type}",
        "version": "1.0.0.0",
        "type": "object",
        "classification": "dynamic",
        "properties": {
            "datetime": {"type": "string", "format": "date-time", "isindex": True},
            "Value": dict(value_schema),
            "UOM": {"type": "string"},
        },
    }]


type_definitions = {
    tag_type: _build_type_definition(tag_type, schema)
    for tag_type, schema in _VALUE_SCHEMAS.items()
}

# Sending the type definitions is off by default. Set CREATE_TYPES = True to send
# all three to the OMF endpoint.
CREATE_TYPES = False
if CREATE_TYPES:
    for tag_type, type_def in type_definitions.items():
        define_and_create_type(type_def)

In [None]:
# Process DATA: for every discovered tag, make sure its OMF container exists and
# copy the tag's history between start_time and end_time into it.
# NOTE(review): both bounds are naive datetimes; how get_Insight_Data interprets
# their timezone is not visible here — confirm.
start_time = datetime(2024, 5, 1)
end_time = datetime.now()

for index, row in all_tags.iterrows():
    tag = row['FQN']
    tag_type = row['TagType']
    # Container ids are the tag's FQN with '.' replaced by '_'.
    streamName = tag.replace('.', '_')

    # A container can only reference one of the OMF types defined in type_definitions.
    if tag_type not in type_definitions:
        logger.warning(f"Skipping {tag}: no OMF type defined for TagType {tag_type!r}")
        continue

    try:
        # Create stream for each tag
        stream_definition = [{
            "id": streamName,
            "typeid": f"JarvisType_{tag_type}"
        }]
        create_stream(stream_definition)

        for chunk in aveva.get_Insight_Data(tag, start_time, end_time, RetrievalMode="DELTA"):
            data = create_data(chunk)
            if data:
                process_and_send_data(data, streamName, batch_size=5000)
    except Exception:
        # Isolate failures per tag so one bad tag does not abort the whole transfer.
        logger.exception(f"Failed to transfer tag {tag}")