# Install dependencies

# Import Libraries

In [1]:
# Environment variable handling
import os
import yaml
from dotenv import load_dotenv
from pprint import pprint
import pandas as pd

# For class
import requests
import numpy as np
from requests.auth import HTTPBasicAuth
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
import pytz

from WebID_mapping import WEBID_MAPPING

# Static parameters

In [2]:
DST_DOT_ENV_FILE = './.env'
TIMEZONE = 'Europe/Berlin'
CHARGE_START_HOUR = 6

# Load environment Parameters

In [3]:
load_dotenv()
OSI_PI_BASE_URL = os.environ.get('OSI_PI_BASE_URL')
OSI_PI_USERNAME = os.environ.get('OSI_PI_USERNAME')
OSI_PI_PASSWORD = os.environ.get('OSI_PI_PASSWORD')
OSI_PI_WEBID_MAPPING = {
    "ACTUAL_DRUCK_DUESENWAND_LANZE_1" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwQPOiVRhh30aTjtLVjXTS0wUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzE",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_2" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwl0J_sWSu8kukBbE3kqL1vwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzI",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_3" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwazz1nrFi20S2KGygwaoXawUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzM",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_4" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwkqrjPdijB0uRsJAYEQmFvwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzQ",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_5" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwRYVI0n4IiES79ukW4HfitAUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzU",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_6" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwqeZODiJuiUicm61G8eHzBgUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzY",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_7" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIweHwFJsmoUUKCrHLRMahMvAUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzc",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_8" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwangCTfv6wkeoQmtsCKYv0AUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzg",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_9" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwD9vou8iklE-cuUeP2qHiuwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzk",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_10" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwV0s6Lhx1S0-vaGIGPx7QegUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9EUlVDS19EVUVTRU5XQU5EX0xBTlpFXzEw",
    "ACTUAL_TEMP_DECKE_1" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwP0WpGWUwGkOJU5jF2lndRwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzE",
    "ACTUAL_TEMP_DECKE_2" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIw9B5-5LGsk02YxSRpJI-SpgUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzI",
    "ACTUAL_TEMP_DECKE_3" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwg-QduNglt0CshyM6OBwVNgUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzM",
    "ACTUAL_TEMP_DECKE_4" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwX_T2IBMmT0OMp-c1nJL3ZgUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzQ",
    "ACTUAL_TEMP_DECKE_5" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwT5BrhA6xXkiPNIUQmaDyaAUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzU",
    "ACTUAL_TEMP_DECKE_6" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIw_75KN1i8uUusatimBt_tKwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0RFQ0tFXzY",
    "ACTUAL_O2_CELOX" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIw_3HD_UOyg0-9YKIfdAz4bwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9PMl9DRUxPWA",
    "ACTUAL_TEMP_CELOX" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwGlZ45ljIwEurz9GCo74WiQUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0NFTE9Y",
    "ACTUAL_S02_KAMIN_100" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwAijkDTKOIkm6zrqB0X_8ZQUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9TMDJfS0FNSU5fMTAw",
    "ACTUAL_SO2_KESSEL_7" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwvgZn1B7uEk2lUMI3NFoCkwUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9TTzJfS0VTU0VMXzc",
    "ACTUAL_TEMP_LECKAGE_RLA" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwlD7J2zmpcUiaicRyAsfm7AUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0xFQ0tBR0VfUkxB",
    "ACTUAL_TEMP_LECKAGE_RLB" : "F1AbEFdD4Wibe7USdVhXAbVfEOgzeySUjld7hGDXEwdloWrIwbM1BkedgnECBWjueD1MuOgUElBRkNMU1xBVVIgTFVFXEFGX1NUUlVLVFVSX0VGRVNPX0FEVklTT1JcTFUtMDkwIEFOT0RFTkjDnFRURVxBTy1BRFZJU09SfEFDVFVBTF9URU1QX0xFQ0tBR0VfUkxC",
}
OSI_PI_PARAMETERS_REQUESTED = [key for key in OSI_PI_WEBID_MAPPING.keys()]
CACHE_STORAGE_DURATION_HOURS = 24*3

# Class deffinition

In [4]:
class OSIPIConnector:
    def __init__(self, base_url: str, username: str, password: str, timezone: str = TIMEZONE, request_timeout_seconds: int = 120):
        if not base_url:
            raise ValueError("Missing required argument: base_url")
        if not username:
            raise ValueError("Missing required argument: username")
        if not password:
            raise ValueError("Missing required argument: password")

        # Check that base_url has no leading and trailing slash
        if base_url.endswith('/'):
            base_url = base_url.rstrip('/')
        self.base_url = base_url

        self.auth = HTTPBasicAuth(username.encode('utf-8'), password.encode('utf-8'))
        self.request_timeout_seconds = request_timeout_seconds
        self.webid_mapping = OSI_PI_WEBID_MAPPING
        self.timezone = pytz.timezone(timezone)

        # On instantiation, the current time is set as the start_timestamp
        start_timestamp = datetime.now(self.timezone)
        end_timestamp = self._generate_batch_start_timestamp()

        self.cached_data = self._batch_retriev_data(
            start_time = start_timestamp, 
            end_time = end_timestamp,
            object_names = OSI_PI_PARAMETERS_REQUESTED,
        )

    def _convert_timestamps(self, timestamp: datetime) -> str:
        """
        Convert a timestamp to the correct format for the OSI PI system.
        OSI-PI syntax is * equals now, *-1h equals one hour ago, *-1d equals one day ago, etc.
        """

        # Ensure that the timestamp is ts aware
        if timestamp.tzinfo is None or timestamp.tzinfo.utcoffset(timestamp) is None:
            timestamp = timestamp.replace(tzinfo=self.timezone)

        # Calculate the difference in hours
        now = datetime.now(self.timezone)
        difference_in_hours = int((now - timestamp).total_seconds() / 3600)

        return f'*-{difference_in_hours}h'

    def _generate_batch_start_timestamp(self, charge_hour_start: int = CHARGE_START_HOUR) -> datetime:
        """
        Generate a timestamp for the start of the current charge which is set to (yesterday, h=CHARGE_START_HOUR).
        """
        now = datetime.now(self.timezone)
        timestamp = now - timedelta(days=1)
        timestamp = timestamp.replace(hour=charge_hour_start, minute=0, second=0, microsecond=0)
        return timestamp

    def _batch_retriev_data(self, start_time: datetime, end_time: datetime, verify_cert: bool = False, object_names: list = OSI_PI_PARAMETERS_REQUESTED) -> pd.DataFrame:
        """
        Retrieve multiple data-ranges for same timespan and returns a Pandas DataFrame.
        """

        # Create timestamps
        start_time_converted = self._convert_timestamps(start_time)
        end_time_converted = self._convert_timestamps(end_time)

        # Generate a list of tuples for the params of the request. the WebID field will have multiple values.
        params = [(('webid', self.webid_mapping.get(object_name, None))) for object_name in object_names]
        params.append(('startTime', start_time_converted,))
        params.append(('endTime', end_time_converted,))
        
        # Construct and validate the URL
        url = f"{self.base_url}/piwebapi/streamsets/recorded"

        # Fetch data from API
        response = requests.get(url, params=params, auth=self.auth, verify=verify_cert, timeout=self.request_timeout_seconds)
        
        if response.status_code == 200:
            # Extract the response into a dict and convert into pandas DataFrame, Ugly to look at but most efficient as a dict-comprehension.
            response_dict = {item.get('Name', None): {pd.Timestamp(entry.get('Timestamp', None)): entry.get('Value', None) for entry in item.get('Items', [])} for item in response.json().get('Items', [])}

            # Turn the response into a pandas DataFrame
            df_response = pd.DataFrame(response_dict)
            df_response.index = pd.to_datetime(df_response.index)
            return df_response
        else:
            return pd.DataFrame() # Return empty DataFrame if nothing was returned

    def get_data(self, start_time: datetime = None, end_time: datetime = None, verify_cert: bool = False) -> pd.DataFrame:
        """
        Retrieves data from OSI PI system for a given time range.

        Args:
            start_time (datetime): Start time of the time range. If not provided, the default start time is generated.
            end_time (datetime): End time of the time range. If not provided, the current time is used.
            verify_cert (bool): Whether to verify the SSL certificate of the PI server.

        Returns:
            pd.DataFrame: A Pandas DataFrame containing the requested data.
        """

        # Ensure that the timestamps are tz aware
        if start_time is not None and (start_time.tzinfo is None or start_time.tzinfo.utcoffset(start_time) is None):
            start_time = start_time.replace(tzinfo=self.timezone)
        if end_time is not None and (end_time.tzinfo is None or end_time.tzinfo.utcoffset(end_time) is None):
            end_time = end_time.replace(tzinfo=self.timezone)

        # If no timestamps are provided, use the default timestamps.
        if start_time is None:
            start_time = self._generate_batch_start_timestamp()
        if end_time is None:
            end_time = datetime.now(self.timezone)

        # # Only during development
        # return self._batch_retriev_data(
        #     start_time=start_time, 
        #     end_time = end_time,
        #     object_names =OSI_PI_PARAMETERS_REQUESTED,
        #     verify_cert=verify_cert,
        # )

        # Check if cache is initialized and get the max and min timestamps in the cache
        cache_has_data = False
        if self.cached_data is not None and not self.cached_data.empty:
            cached_start_time = self.cached_data.index.min().to_pydatetime()
            cached_end_time = self.cached_data.index.max().to_pydatetime()
            cache_has_data = True

        # If cache has no data, the data is loaded between start_time and end_time and updates the cache
        if not cache_has_data:
            self.cached_data = self._batch_retriev_data(OSI_PI_PARAMETERS_REQUESTED, start_time, end_time, verify_cert)

        # If cache has data, the data is queried between start_time and cache_start_time as well as cache_end_time and end_time and cache is updated
        elif cache_has_data:
            if start_time < cached_start_time:
                new_data = self._batch_retriev_data(OSI_PI_PARAMETERS_REQUESTED, start_time, cached_start_time - timedelta(seconds=1), verify_cert)
                self.cached_data = pd.concat([new_data, self.cached_data])
            if end_time > cached_end_time:
                new_data = self._batch_retriev_data(OSI_PI_PARAMETERS_REQUESTED, cached_end_time + timedelta(seconds=1), end_time, verify_cert)
                self.cached_data = pd.concat([self.cached_data, new_data])

        # Return the data between start_time and end_time
        return self.cached_data[(self.cached_data.index >= start_time) & (self.cached_data.index <= end_time)]

# Helper Functions

In [5]:
def structure(d):
    if isinstance(d, dict):
        return {k: structure(v) for k, v in d.items()}
    elif isinstance(d, list):
        return [structure(item) for item in d] if d else []
    else:
        return type(d).__name__

# Connection test

In [6]:
con = OSIPIConnector(OSI_PI_BASE_URL, OSI_PI_USERNAME, OSI_PI_PASSWORD)



In [7]:
test_start_date = datetime(year=2023, month=8, day=1)
test_end_date = datetime(year=2023, month=8, day=2)
parameters_to_be_requested = [
    "ACTUAL_DRUCK_DUESENWAND_LANZE_1",
    "ACTUAL_DRUCK_DUESENWAND_LANZE_2",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_3",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_4",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_5",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_6",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_7",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_8",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_9",
    # "ACTUAL_DRUCK_DUESENWAND_LANZE_10",
    # "ACTUAL_TEMP_DECKE_1",
    # "ACTUAL_TEMP_DECKE_2",
    # "ACTUAL_TEMP_DECKE_3",
    # "ACTUAL_TEMP_DECKE_4",
    # "ACTUAL_TEMP_DECKE_5",
    # "ACTUAL_TEMP_DECKE_6",
    # "ACTUAL_O2_CELOX",
    # "ACTUAL_TEMP_CELOX",
    # "ACTUAL_S02_KAMIN_100",
    # "ACTUAL_SO2_KESSEL_7",
    # "ACTUAL_TEMP_LECKAGE_RLA",
    # "ACTUAL_TEMP_LECKAGE_RLB",
]

In [8]:
try:
    data = con.get_data(test_start_date, test_end_date)
    print(data)
except requests.exceptions.RequestException as e:
    print(f'Error: {e}')

  cached_start_time = self.cached_data.index.min().to_pydatetime()


AttributeError: 'list' object has no attribute 'tzinfo'

In [None]:
df.plot()