### Step 1
- Install the `databricks-industrial-automation-suite` Python package.
- _See documentation_: [PyPI](https://pypi.org/project/databricks-industrial-automation-suite/#description)

In [0]:
%pip install databricks-industrial-automation-suite mlflow --quiet
%pip install https://github.com/irfanghat/Databricks-Hackathon-Nov-2025/releases/download/v0.0.1/opcua_manufacturing_server-0.0.1-py3-none-any.whl --quiet

In [0]:
dbutils.library.restartPython()

In [0]:
%pip list | grep databricks

### Step 2
- Set up a basic OPC UA client.
- We can:
  * Connect to a remote OPC UA server via IP address (e.g. 132.65.41.78).
  * Use a prebuilt simulation.

In [0]:
from databricks_industrial_automation_suite.integrations.opcua import OPCUAClient
import asyncio
import mlflow

In [0]:
# -------------------------------------------
# Configure the OPC UA endpoint (host:port)
# -------------------------------------------

dbutils.widgets.text("Endpoint", "localhost:4840", "OPC UA endpoint")
endpoint = dbutils.widgets.get("Endpoint")
print(endpoint)

In [0]:
# -----------------------------------------------------------------------------------------
# Specify the OPC UA server URL and optionally, the path to the certificate file
# to connect to a remote or on-prem OPC UA Server.
#
# The following is an example that connects to a remote OPC UA Server without a certificate.
# -------------------------------------------------------------------------------------------

oc = OPCUAClient(server_url=f"opc.tcp://{endpoint}/freeopcua/server/")

In [0]:
# ------------------------------------------------------------------- 
# We can also simulate an OPC UA Server locally.
# Start the server by running: !opcua-manufacturing-server
# -------------------------------------------------------------------

# !opcua-manufacturing-server

### Browsing OPC UA Nodes with Databricks Industrial Automation Suite

This example demonstrates how to **connect to an OPC UA server** and **browse its nodes** using the `databricks_industrial_automation_suite`. This is a core capability in industrial automation, enabling you to **discover devices, sensors, and production equipment programmatically** from within Databricks.

---

#### Why This Matters

* Industrial plants generate **huge volumes of real-time data** from PLCs, sensors, and other devices.
* Using OPC UA, a **standard industrial protocol**, you can easily integrate this data into **Databricks**, enabling analytics, predictive maintenance, and optimization.
* Browsing nodes is the **first step** to understanding your plant's data structure before you start streaming, monitoring, or controlling devices.

#### Connecting to a Server

You can connect to a server with or without authentication. For testing and prototyping, a local test server works well:

```python
oc = OPCUAClient(server_url="opc.tcp://localhost:4840/freeopcua/server/")
```

**Important**: For **secure** servers, you can provide `security_policy`, `message_security_mode`, `certificate_path`, and `private_key_path`.
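Not every combination of these parameters is valid. In OPC UA, a `message_security_mode` of `None` pairs only with the `None` security policy, while `Sign` and `SignAndEncrypt` need a real policy (such as `Basic256Sha256`) plus a client certificate and private key. The sketch below checks a configuration before the client is built. `check_security_config` is a hypothetical helper, not part of `databricks_industrial_automation_suite`, and it assumes policies and modes are passed as the short string names used in this notebook.

```python
from typing import List, Optional

VALID_MODES = {"None", "Sign", "SignAndEncrypt"}


def check_security_config(
    security_policy: str,
    message_security_mode: str,
    certificate_path: Optional[str] = None,
    private_key_path: Optional[str] = None,
) -> List[str]:
    """Return a list of problems with an OPC UA security configuration (empty if none found)."""
    problems = []
    if message_security_mode not in VALID_MODES:
        problems.append(f"unknown message_security_mode: {message_security_mode!r}")
        return problems

    if message_security_mode == "None":
        # An unsecured channel must use the None policy.
        if security_policy != "None":
            problems.append("message_security_mode 'None' requires security_policy 'None'")
    else:
        # Signing and/or encryption need a real policy and client credentials.
        if security_policy == "None":
            problems.append(
                f"message_security_mode {message_security_mode!r} needs a security_policy other than 'None'"
            )
        if not certificate_path:
            problems.append("certificate_path is required for Sign/SignAndEncrypt")
        if not private_key_path:
            problems.append("private_key_path is required for Sign/SignAndEncrypt")
    return problems


print(check_security_config("Basic256Sha256", "SignAndEncrypt", "client_cert.pem", "client_key.pem"))
print(check_security_config("None", "SignAndEncrypt"))
```

An empty list means the combination is consistent; it does not prove the server accepts it, only that the client-side settings agree with each other.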

#### Browsing Nodes

We can get **all nodes and their hierarchy** by calling `await oc.browse_all()` after `await oc.connect()`:

* `nodes` is a **nested list of dictionaries** representing the full node hierarchy.
* Each node contains:

  * `id`: OPC UA node identifier
  * `browse_name`: Human-readable name
  * `children`: List of child nodes (empty if leaf node)
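Because each node nests its children, a short recursive walk turns the hierarchy into flat rows, for example to load into a DataFrame or Delta table. The sketch assumes only the `id` / `browse_name` / `children` shape described above; `flatten_nodes` is a hypothetical helper and the sample tree (including its node IDs) is made up for illustration.

```python
from typing import Dict, Iterator, List, Tuple


def flatten_nodes(nodes: List[Dict], path: Tuple[str, ...] = ()) -> Iterator[Dict]:
    """Depth-first walk over OPC UA browse results, yielding one row per node."""
    for node in nodes:
        name = str(node.get("browse_name", ""))
        node_path = path + (name,)
        children = node.get("children") or []
        yield {
            "id": node.get("id"),
            "browse_name": name,
            "path": "/".join(node_path),
            "depth": len(path),
            "is_leaf": not children,
        }
        # Recurse into the children, extending the path with this node's name.
        yield from flatten_nodes(children, node_path)


# Illustrative tree (made-up IDs) with the same shape as the browse output.
sample = [
    {"id": "ns=4;i=1240", "browse_name": "Boilers", "children": [
        {"id": "ns=4;i=1241", "browse_name": "Boiler1", "children": [
            {"id": "ns=4;i=1242", "browse_name": "Pipe1", "children": []},
        ]},
    ]},
]

for row in flatten_nodes(sample):
    print(row["depth"], row["path"], row["is_leaf"])
```

With a live server, `list(flatten_nodes(await oc.browse_all()))` could be passed straight to `pd.DataFrame`. Real browse names may arrive as `QualifiedName(...)` strings, so they may need cleaning before being used as path segments.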

---

#### Sample Output

```json
[
  {
    "id": "i=87",
    "browse_name": "QualifiedName(NamespaceIndex=0, Name='Views')",
    "children": []
  },
  {
    "id": "i=85",
    "browse_name": "QualifiedName(NamespaceIndex=0, Name='Objects')",
    "children": [
      {"id": "ns=4;i=1240", "browse_name": "Boilers", "children": [...]}
    ]
  }
]
```

The output shows the **structure of an industrial plant**: Boilers → Individual Boiler → Pipes → Sensors. With this, you can **start building dashboards, analytics pipelines, or predictive maintenance models in Databricks**.

In [0]:
# -------------------------------------------------------------------------------------------------------------------
# With a single call, we can map an entire industrial plant's sensor network.
# Scaled to multiple factories and integrated with Databricks' ML workflows, this makes real-time insights
# and predictive analytics possible at enterprise scale.
# -------------------------------------------------------------------------------------------------------------------

# -------------------------------------------------------------------------------------------------------
# Secure OPC UA Connection (with Databricks UC Volumes)
#
# In this example, we use Unity Catalog Volumes to securely store and access client certificates.
# This ensures our encryption keys and certificates are managed centrally under Databricks governance.
# -------------------------------------------------------------------------------------------------------

# -------------------------------------------------------------------------------------------
# Example UC volume paths
# Note: Replace <catalog>, <schema>, and <volume> with your actual UC volume identifiers.
# -------------------------------------------------------------------------------------------

# ----------------------------------------------------------------------------
# Volume path structure: /Volumes/<catalog>/<schema>/<volume>/<file>
# Here: schema "industrial_automation", volume "security" (see Quick Setup below).
# ----------------------------------------------------------------------------
cert_path = "/Volumes/<catalog>/industrial_automation/security/client_cert.pem"
key_path  = "/Volumes/<catalog>/industrial_automation/security/client_key.pem"

# -----------------------------------------------------------------------------------------
# Secure OPC UA client configuration
#
# oc = OPCUAClient(
#     server_url="opc.tcp://opcua.demo-this.com:51210/UA/SampleServer",
#     security_policy="Basic256Sha256",         # Strong encryption policy
#     message_security_mode="SignAndEncrypt",   # Ensures confidentiality + integrity
#     certificate_path=cert_path,               # Certificate stored in UC volume
#     private_key_path=key_path,                # Private key stored in UC volume
# )
# ---------------------------------------------------------------------------------------

# oc = OPCUAClient(server_url=f"opc.tcp://{endpoint}/freeopcua/server/")


# async def main():
#     await oc.connect()
#     nodes = await oc.browse_all()
#     print(nodes)


# await main()

### Checking Security Policies in OPC UA

In industrial automation, **security is critical**. OPC UA supports multiple **security policies and message security modes** to ensure safe communication between clients and servers. Using the `databricks_industrial_automation_suite`, you can inspect and configure these policies when connecting to OPC UA servers.

---

#### Why This Matters

* Industrial plants often transmit **sensitive operational data** (sensor readings, machine states).
* Proper **encryption and authentication** prevent eavesdropping, tampering, or unauthorized control.
* Databricks integration allows you to **safely ingest and analyze industrial data at scale** without compromising security.

The client parameters that control this are:

* `security_policy` → Determines the encryption algorithm and strength.
* `message_security_mode` → Controls whether messages are signed, encrypted, or both.
* `certificate_path` & `private_key_path` → Client credentials for authentication.
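The sample output further down lists some policy URIs several times, most likely because the server exposes several endpoints (for example, one per message security mode) that share a policy. A small helper can reduce the list to unique short names and flag whether the unsecured `None` policy is offered. `summarize_policies` is hypothetical and assumes `get_security_policies()` returns URI strings like those in the sample output.

```python
from typing import Dict, List


def summarize_policies(policy_uris: List[str]) -> Dict[str, object]:
    """Deduplicate security policy URIs and keep only the name after '#'."""
    names = []
    for uri in policy_uris:
        name = uri.rsplit("#", 1)[-1]
        if name not in names:  # preserve first-seen order
            names.append(name)
    return {"policies": names, "allows_unsecured": "None" in names}


uris = [
    "http://opcfoundation.org/UA/SecurityPolicy#None",
    "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256",
    "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256",
    "http://opcfoundation.org/UA/SecurityPolicy#Aes128_Sha256_RsaOaep",
]
print(summarize_policies(uris))
# {'policies': ['None', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep'], 'allows_unsecured': True}
```

A server that reports `allows_unsecured: True` in production is worth a closer look, since any client could then connect without signing or encryption.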

In [0]:
# -----------------------------------------------------------------
# Evaluating Security Policies programmatically
# -----------------------------------------------------------------

oc = OPCUAClient(server_url=f"opc.tcp://{endpoint}/freeopcua/server/")

import pandas as pd

async def main():
    policies = await oc.get_security_policies()
    df = pd.DataFrame({'available_opcua_server_security_policies': policies})
    spark_df = spark.createDataFrame(df)
    display(spark_df)
    spark_df.write.format("delta").mode("overwrite").saveAsTable("opcua_security_policies")

await main()

# -------------------------------------------------------------------------------
# Sample Output
#
# http://opcfoundation.org/UA/SecurityPolicy#None
# http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256
# http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256
# http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256
# http://opcfoundation.org/UA/SecurityPolicy#Aes128_Sha256_RsaOaep
# http://opcfoundation.org/UA/SecurityPolicy#Aes256_Sha256_RsaPss
# http://opcfoundation.org/UA/SecurityPolicy#Aes128_Sha256_RsaOaep
# http://opcfoundation.org/UA/SecurityPolicy#Aes256_Sha256_RsaPss
#
# -------------------------------------------------------------------------------

In [0]:
%sql

SELECT * FROM default.opcua_security_policies;


#### Why Store Certificates in Unity Catalog Volumes

* In traditional factory environments, certificates often live on local machines or control systems, where they are less secure and hard to manage.
* With **Unity Catalog Volumes**, we can:

  * Store security artifacts centrally.
  * Control access with fine-grained permissions.
  * Audit all data movements and authentications.
* It's the foundation for **secure, enterprise-grade industrial connectivity at scale**.

---

### Quick Setup

In Databricks:

1. Create a volume in Unity Catalog:

   ```sql
   CREATE VOLUME industrial_automation.security;
   ```
2. Upload your certificate and key:

   ```bash
   databricks fs cp client_cert.pem dbfs:/Volumes/<catalog>/industrial_automation/security/
   databricks fs cp client_key.pem  dbfs:/Volumes/<catalog>/industrial_automation/security/
   ```
3. The secure-connection code above can then read them through `cert_path` and `key_path` like regular files.
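Unity Catalog volume paths always have the form `/Volumes/<catalog>/<schema>/<volume>/<path>`, so a path with fewer components cannot point inside a volume. A quick check like the hypothetical `parse_volume_path` below catches that mistake before the client tries to open the certificate; the `main` catalog in the example is an assumption for illustration.

```python
from typing import Dict


def parse_volume_path(path: str) -> Dict[str, str]:
    """Split a UC volume file path into catalog, schema, volume and relative file path."""
    parts = path.strip("/").split("/")
    # Expect: "Volumes", catalog, schema, volume, then at least one file component,
    # with no empty components (e.g. from "//").
    if len(parts) < 5 or parts[0] != "Volumes" or not all(parts):
        raise ValueError(f"not a /Volumes/<catalog>/<schema>/<volume>/<file> path: {path!r}")
    return {
        "catalog": parts[1],
        "schema": parts[2],
        "volume": parts[3],
        "file": "/".join(parts[4:]),
    }


print(parse_volume_path("/Volumes/main/industrial_automation/security/client_cert.pem"))
```

This only validates the shape of the path; whether the file exists and the current user can read it is still governed by Unity Catalog permissions.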


### Subscribing to OPC UA Nodes (Streaming PLC Data in Real Time)

This example shows how to use the `databricks_industrial_automation_suite` to **subscribe to real-time OPC UA nodes**, simulating streaming data from sensors, PLCs, or control systems in an industrial plant.

By subscribing to node updates, Databricks can continuously ingest live telemetry into Delta tables or directly into MLflow pipelines for **predictive maintenance, anomaly detection, and process optimization**.

---

### Subscribing to Nodes

You can subscribe to any number of OPC UA nodes (for instance, temperature, pressure, and RPM sensors) to stream live updates directly into your Databricks environment.

```python
async def main():
    await oc.connect()

    node_ids = [
        "ns=2;i=5",  # Temperature
        "ns=2;i=6",  # Pressure
        "ns=2;i=7",  # RPM
    ]

    # Subscribe to multiple nodes at once
    for node_id in node_ids:
        await oc.subscribe_to_node(node_id)

    # Stream incoming events (real-time sensor updates)
    async for event in oc.stream():
        print(event)  # Structured dicts with timestamp, value, and node info
        

await main()
```

---

### Example Event Payload

Each event streamed back from the OPC UA server will look like this:

```python
{
    "timestamp": "2025-11-12T13:32:45.123Z",
    "node_id": "ns=2;i=5",
    "browse_name": "Temperature",
    "value": 88.4,
    "unit": "°C"
}
```

Each event can then be:

* written to a **Delta table** for analytics,
* **visualized in real-time dashboards**, or
* **fed into Databricks ML pipelines** for model training or live inference.
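Before writing events to Delta, it helps to give them explicit types: parse the ISO-8601 timestamps and coerce values to numbers, so a bad reading becomes a null instead of breaking the table schema. A minimal pandas sketch, assuming events shaped like the payload above; `events_to_frame` is a hypothetical helper and the second event is a made-up bad reading.

```python
from typing import Dict, List

import pandas as pd

COLUMNS = ["timestamp", "node_id", "browse_name", "value", "unit"]


def events_to_frame(events: List[Dict]) -> pd.DataFrame:
    """Convert OPC UA event dicts into a typed pandas DataFrame."""
    # Missing keys become NaN; extra keys are dropped.
    df = pd.DataFrame(events, columns=COLUMNS)
    # ISO-8601 strings ending in 'Z' become timezone-aware UTC timestamps.
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    # Non-numeric readings become NaN rather than failing the whole batch.
    df["value"] = pd.to_numeric(df["value"], errors="coerce")
    return df


events = [
    {"timestamp": "2025-11-12T13:32:45.123Z", "node_id": "ns=2;i=5",
     "browse_name": "Temperature", "value": 88.4, "unit": "°C"},
    {"timestamp": "2025-11-12T13:32:46.123Z", "node_id": "ns=2;i=5",
     "browse_name": "Temperature", "value": "n/a", "unit": "°C"},
]
df = events_to_frame(events)
print(df.dtypes)
```

The resulting frame could then be passed to `spark.createDataFrame` and appended to a Delta table, as the telemetry cell below does with its own rows.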


To turn this into a continuous pipeline, we can:

* Append each event to a **Delta table** (or build a **Delta Live Tables** pipeline on top of it) to create an event history.
* Use **Structured Streaming**, or **Auto Loader** if events are first landed as files, to persist incoming events.
* Track the anomaly detection models that consume this stream with **MLflow**.

The result is a complete industrial data pipeline: OPC UA → Delta → Analytics.

In [0]:
import re
import time

import numpy as np
import pandas as pd

oc = OPCUAClient(server_url=f"opc.tcp://{endpoint}/freeopcua/server/")

MAX_ROWS = 100
MAX_WAIT_SECONDS = 60

# ---------------------------------------------------------------------------
# Nodes to subscribe to, mapped to fully qualified sensor names.
# Browse names alone can be ambiguous (Pump2 and Compressor may both expose
# "Temperature"), so thresholds are looked up by node ID instead.
# ---------------------------------------------------------------------------
NODES = {
    "ns=2;i=44": "Pump2.Temperature",
    "ns=2;i=45": "Pump2.Vibration",
    "ns=2;i=48": "Compressor.Temperature",
    "ns=2;i=47": "Compressor.DischargePressure",
    "ns=2;i=52": "QualityControl.PassRate",
    "ns=2;i=57": "Energy.TotalPowerConsumption",
    "ns=2;i=63": "Environmental.AmbientTemperature",
}

# ---------------------------------------------------------------------------
# Threshold definitions for each sensor (Warning & Critical)
# ---------------------------------------------------------------------------
THRESHOLDS = {
    "Pump2.Temperature": {"warning": 65, "critical": 75, "description": "Pump 2 overheating"},
    "Pump2.Vibration": {"warning": 4.5, "critical": 6.0, "description": "Pump 2 excessive vibration"},
    "Compressor.Temperature": {"warning": 80, "critical": 90, "description": "Compressor overheating"},
    "Compressor.DischargePressure": {"warning": 14, "critical": 18, "description": "Compressor overpressure"},
    "QualityControl.PassRate": {"warning": 97, "critical": 95, "direction": "below", "description": "Quality below target"},
    "Energy.TotalPowerConsumption": {"warning": 220, "critical": 250, "description": "High energy usage"},
    "Environmental.AmbientTemperature": {"warning": 35, "critical": 40, "description": "Ambient temperature high"},
}


# ---------------------------------------------------------------------------
# Extract clean node name
# ---------------------------------------------------------------------------
def extract_node_name(browse_name):
    """Extract the clean name from a QualifiedName string."""
    if not browse_name:
        return "Unknown"
    match = re.search(r"Name='([^']+)'", str(browse_name))
    return match.group(1) if match else str(browse_name)


# ---------------------------------------------------------------------------
# Detect anomaly level (Normal / Warning / Critical) for a qualified sensor name
# ---------------------------------------------------------------------------
def check_threshold(sensor, value):
    t = THRESHOLDS.get(sensor)
    if t is None:
        return "normal", ""
    try:
        value = float(value)
    except (TypeError, ValueError):
        return "normal", ""  # missing or non-numeric reading

    # Check critical before warning so critical readings are not downgraded.
    if t.get("direction", "above") == "below":
        if value < t["critical"]:
            return "critical", t["description"]
        if value < t["warning"]:
            return "warning", t["description"]
    else:
        if value > t["critical"]:
            return "critical", t["description"]
        if value > t["warning"]:
            return "warning", t["description"]
    return "normal", ""


# ---------------------------------------------------------------------------
# Predictive indicator based on short-term trend
# ---------------------------------------------------------------------------
def compute_trend_forecast(df, window=5, horizon=2):
    """Fit a line to each node's last `window` readings and project `horizon` steps ahead."""
    forecasts = []
    for node_id, group in df.groupby("node_id"):
        y = pd.to_numeric(group["value"], errors="coerce").dropna().tail(window).to_numpy()
        if len(y) < window:
            continue
        slope = np.polyfit(np.arange(window), y, 1)[0]
        forecasts.append({"node_id": node_id, "predicted_next": y[-1] + slope * horizon})
    # Keep the columns even when no node has enough readings, so the merge still works.
    return pd.DataFrame(forecasts, columns=["node_id", "predicted_next"])


async def main():
    await oc.connect()
    print("🔌 Connected to OPC UA server.")

    for node_id in NODES:
        await oc.subscribe_to_node(node_id)
    print(f"📡 Subscribed to {len(NODES)} nodes")

    data = []
    start_time = time.time()

    try:
        async for event in oc.stream():
            node_id = event.get("node_id")
            name = extract_node_name(event.get("browse_name"))
            value = event.get("value")
            sensor = NODES.get(node_id, name)

            severity, desc = check_threshold(sensor, value)

            data.append({
                "timestamp": event.get("timestamp"),
                "node_id": node_id,
                "name": name,
                "value": value,
                "severity": severity,
                "description": desc,
            })

            if severity != "normal":
                emoji = "🔴" if severity == "critical" else "🟡"
                print(f"{emoji} {severity.upper()} - {sensor}: {float(value):.2f} ({desc})")

            if len(data) >= MAX_ROWS or (time.time() - start_time > MAX_WAIT_SECONDS):
                break
    finally:
        await oc.disconnect()

    print(f"📊 Collected {len(data)} rows in {time.time() - start_time:.1f}s")

    df = pd.DataFrame(data)
    if df.empty:
        return df

    # -----------------------------------------------------------
    # Add rolling predictions (per node ID, not per browse name)
    # -----------------------------------------------------------
    df = df.merge(compute_trend_forecast(df), on="node_id", how="left")

    # -------------------------------------------------------------------------
    # Flag predicted threshold violations with the same rules as live readings
    # -------------------------------------------------------------------------
    def predicted_status(row):
        severity, _ = check_threshold(NODES.get(row["node_id"]), row["predicted_next"])
        if severity == "critical":
            return "🚨 predicted_critical"
        if severity == "warning":
            return "⚠️ predicted_warning"
        return "normal"

    df["predicted_status"] = df.apply(predicted_status, axis=1)

    return df


try:
    df = await asyncio.wait_for(main(), timeout=MAX_WAIT_SECONDS + 2)
except asyncio.TimeoutError:
    print("⚠️ Timeout")
    df = pd.DataFrame()

display(df)

if not df.empty:
    spark_df = spark.createDataFrame(df)
    spark_df.write.format("delta").mode("append").saveAsTable("factory_telemetry")

In [0]:

%sql

SELECT
  timestamp,
  node_id,
  name,
  value,
  severity,
  description,
  predicted_next,
  predicted_status
FROM default.factory_telemetry
WHERE severity = 'critical';

#### MLflow Analysis
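The next cell frames forecasting as ordinary regression: each reading is predicted from the five readings before it (`lag_1` to `lag_5`, built with `shift`), and the train/test split keeps time order (`shuffle=False`) so the model is never evaluated on readings older than those it was trained on. A toy example with three lags and made-up values shows the resulting layout:

```python
import pandas as pd

# Made-up readings, in time order.
values = pd.Series([70.0, 70.5, 71.0, 71.5, 72.0, 72.5], name="value")

frame = values.to_frame()
for lag in range(1, 4):
    # lag_k holds the reading k steps earlier; the first k rows have no history.
    frame[f"lag_{lag}"] = frame["value"].shift(lag)
frame = frame.dropna()

print(frame)
```

Each surviving row pairs a target (`value`) with its own recent history, which is exactly the `X` / `y` split used in the training cell; the first rows are dropped because they lack a full history.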

In [0]:
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from mlflow.models.signature import infer_signature
import pandas as pd
import numpy as np

df = spark.table("factory_telemetry").toPandas()

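print("Available metrics:", df["name"].unique())

# -------------------------------------------------------------------------
# Several assets may share the browse name "Temperature" (e.g. Pump2 and the
# compressor), so select a single sensor by node ID to avoid mixing series.
# ns=2;i=44 is Pump2.Temperature in the telemetry cell above.
# -------------------------------------------------------------------------
metric = "Pump2.Temperature"
node_id = "ns=2;i=44"
data = df[df["node_id"] == node_id].sort_values("timestamp").reset_index(drop=True)
data["value"] = pd.to_numeric(data["value"], errors="coerce")
data = data.dropna(subset=["value"]).reset_index(drop=True)

if len(data) < 10:
    raise ValueError(f"Not enough samples for {metric}: only {len(data)} rows")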


# ------------------------------------
# Lag features
# ------------------------------------
for lag in range(1, 6):
    data[f"lag_{lag}"] = data["value"].shift(lag)
data = data.dropna()

X = data[[f"lag_{i}" for i in range(1, 6)]]
y = data["value"]

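# R² needs at least two test samples; with test_size=0.2 that means at least 10 rows.
if len(X) < 10:
    raise ValueError(f"Not enough lagged samples for {metric}: {len(X)} rows (need at least 10)")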

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

model = LinearRegression()
model.fit(X_train, y_train)
r2 = model.score(X_test, y_test)

# ------------------------------------
# Prepare MLflow metadata
# ------------------------------------
input_example = X_test.iloc[[0]].to_dict(orient="records")[0]  # sample row as dict
signature = infer_signature(X_test, model.predict(X_test))

with mlflow.start_run(run_name=f"{metric}_forecast_model"):
    mlflow.log_param("metric", metric)
    mlflow.log_metric("r2_score", r2)
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        input_example=input_example,
        signature=signature
    )

print(f"{metric}: R² = {r2:.3f}")
print("Model training complete with input example and signature.")

In [0]:
import mlflow
import mlflow.sklearn
import pandas as pd

# ------------------------------------------------------------------------------------------------------------
# Replace with the run ID of your training run (shown on the run's page in the MLflow experiment UI).
# In the same notebook session, mlflow.last_active_run().info.run_id returns the most recent run's ID.
# ------------------------------------------------------------------------------------------------------------
run_id = '2e7a7a133c7848c69d1ea22814911b9e'
model_uri = f"runs:/{run_id}/model"
model = mlflow.sklearn.load_model(model_uri)

X_example = pd.DataFrame([{
    "lag_1": 70.3,
    "lag_2": 70.1,
    "lag_3": 69.8,
    "lag_4": 69.5,
    "lag_5": 69.2
}])

y_pred = model.predict(X_example)
print(f"Predicted Temperature: {y_pred[0]:.2f}")
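The loaded model predicts one step ahead. To look further ahead, a common approach is recursive forecasting: append each prediction to the history and shift the lags by one. The sketch below works with any object that has a scikit-learn style `predict` method accepting a DataFrame with columns `lag_1` to `lag_5`, as the model above expects. `forecast_recursive` and `DriftModel` are hypothetical stand-ins for illustration. Errors compound with each step, so longer horizons deserve caution.

```python
from typing import List, Sequence

import pandas as pd


def forecast_recursive(model, history: Sequence[float], steps: int, n_lags: int = 5) -> List[float]:
    """Forecast `steps` readings ahead by feeding each prediction back in as lag_1."""
    if len(history) < n_lags:
        raise ValueError(f"need at least {n_lags} past readings, got {len(history)}")
    window = list(history[-n_lags:])  # oldest ... newest
    predictions = []
    for _ in range(steps):
        # lag_1 is the most recent reading, lag_n the oldest.
        row = {f"lag_{i}": window[-i] for i in range(1, n_lags + 1)}
        y_next = float(model.predict(pd.DataFrame([row]))[0])
        predictions.append(y_next)
        # Slide the window: drop the oldest reading, append the prediction.
        window = window[1:] + [y_next]
    return predictions


class DriftModel:
    """Stand-in model for illustration: next value = lag_1 + 0.3."""

    def predict(self, X: pd.DataFrame):
        return (X["lag_1"] + 0.3).to_numpy()


print(forecast_recursive(DriftModel(), [69.2, 69.5, 69.8, 70.1, 70.3], steps=3))
```

With the MLflow model loaded above, `forecast_recursive(model, [69.2, 69.5, 69.8, 70.1, 70.3], steps=3)` would give a three-step projection from the same readings as `X_example`, listed oldest first.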