Autor: Kaike Castro Carvalho

Date: 12-Jun-2024

# Import Libraries

In [None]:
import requests
import json
import urllib3
from requests_ntlm import HttpNtlmAuth

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

## Base Code

In [None]:
class VisionApi:
    def __init__(self, url_endpoint, username, password):
        self.url_endpoint = url_endpoint
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.verify = False  # Be cautious with this in a production environment

    def _request(self, method, custom_url, body=None):
        """
        Generic request method to handle all types of HTTP requests.
        """
        # Set up authentication and headers
        auth = HttpNtlmAuth(self.username, self.password)
        headers = {
            'Content-Type': 'application/json',
            'X-Requested-With': 'XMLHttpRequest'
        }

        # Construct the full API URL
        api_url = f"{self.url_endpoint}/{custom_url}"

        # Try to make the HTTP request
        try:
            if method.upper() == 'GET':
                response = self.session.get(api_url, auth=auth, headers=headers)
            elif method.upper() == 'POST':
                response = self.session.post(api_url, auth=auth, headers=headers, json=body)
            elif method.upper() == 'PUT':
                response = self.session.put(api_url, auth=auth, headers=headers, json=body)
            elif method.upper() == 'DELETE':
                response = self.session.delete(api_url, auth=auth, headers=headers)
            else:
                raise ValueError("Unsupported HTTP method provided.")

            # Handle responses
            if response.status_code == 200:
                return response.json()
            if response.status_code == 204:
                print(f"204 No Content: No content returned from the API: {api_url}")
                return None
            print(f"{response.status_code} Error: {response.text} from the API: {api_url}")
            return None

        except requests.exceptions.RequestException as e:
            print(f"Connection Error while accessing the API: {api_url}: {e}")
            return None

    def get_pi_vision_api(self, custom_url):
        return self._request("GET", custom_url)

    def post_pi_vision_api(self, custom_url, data):
        return self._request("POST", custom_url, body=data)

    def put_pi_vision_api(self, custom_url, data):
        return self._request("PUT", custom_url, body=data)

    def delete_pi_vision_api(self, custom_url):
        return self._request("DELETE", custom_url)


In [None]:
api = VisionApi("https://<server>/PIVision/Utility/api/v1", "username", "password")

# Searching PI TAGs in Display

In [None]:
display_id = "0000"
data = api.get_pi_vision_api(f"displays/{display_id}/export")
print(json.dumps(data, indent=4))

Getting the PI Tags

In [None]:
tags = []
for symbol in data['Display']['Symbols']:
    if 'DataSources' in symbol:
        for data_source in symbol['DataSources']:
            try:
                tag = data_source.split('\\')[3].split('?')[0]
                print(tag)
                tags.append(tag)
            except IndexError:
                continue