# 1. Data Preprocessing

> **Purpose:**  
> This notebook guides the user through the preprocessing steps required for the perturbation experiments.  
> It takes the raw downloads from **Step 0** and prepares them for further processing using the **Eclipse BaSyx Python SDK**, while also collecting and organizing relevant metadata.

**Notes and Requirements**

- Ensure that the directory structure created in **Step 0** (`/app/data/raw/sample/`) is preserved.  
- Preprocessing scripts assume the presence of `.aasx` and `.pdf` files in each product folder.
- The preprocessing pipeline has been tested with a random sample of the manufacturers’ current AASX data (as of 16.10.2025).  
  However, **manufacturers may update their AASX structures or file formats**.  
  In such cases, code adjustments may be required to correctly parse or extract the relevant information.


## 1.0 Imports

In [1]:
import os
from pathlib import Path
from pathlib import PurePosixPath
import shutil
import zipfile
import re
from tqdm import tqdm

import csv
import json 
import pandas as pd
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader
import basyx
from basyx.aas import model
from basyx.aas.adapter import aasx
import xml.etree.ElementTree as ET
from collections import defaultdict

# absolute paths inside the container
data_path = "/app/data/"
raw_data_path = os.path.join(data_path, "raw/sample") # Contains raw downloaded files 
processing_path = os.path.join(data_path, "processed/sample") # Folder where files are processed
metadata_path = os.path.join(processing_path, "metadata.csv") # metadata file

## 1.1 Setup Product Folders

The product folders are **copied to the processing directory**, and all contained files are **renamed according to the product IDs**.  
This ensures consistent naming and avoids conflicts in later processing steps.

In [None]:
if not os.path.exists(processing_path):
    shutil.copytree(raw_data_path, processing_path)

# rename files to have consistent format
for product_id in tqdm(os.listdir(processing_path)):
    product_path = os.path.join(processing_path, product_id)
    if os.path.isdir(product_path):
        for file_name in os.listdir(product_path):
            if file_name.endswith(".pdf"):
                new_file_name = f"{product_id}.pdf"
                os.rename(os.path.join(product_path, file_name), os.path.join(product_path, new_file_name))
            elif file_name.endswith(".aasx"):
                new_file_name = f"{product_id}.aasx"
                os.rename(os.path.join(product_path, file_name), os.path.join(product_path, new_file_name))
            else:
                print(f"Unknown file type: {file_name} in {product_id}")
                #os.remove(os.path.join(product_path, file_name))  # remove non-pdf and non-aasx files

## 1.2 Raw Text-Level Preprocessing

In this step, the downloaded `*.aasx` files are **normalized and corrected** to ensure they can be successfully parsed using the **BaSyx Python SDK**.

### 1.2.1 Decompress `*.aasx` Files

Each `*.aasx` archive is unzipped to allow direct access to its internal files.  
The AASX format follows the Open Packaging Convention (OPC), meaning it is effectively a structured ZIP container.
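
Because the container is a plain ZIP file, Python's `zipfile` module can open it regardless of the `.aasx` extension. The following minimal sketch lists the parts of a package. The in-memory archive and its three part names are a hypothetical stand-in for a real download, and `list_aasx_parts` is an illustrative helper, not part of the pipeline.

```python
import io
import zipfile


def list_aasx_parts(aasx_file):
    """Return the part names stored in an AASX (ZIP/OPC) package."""
    with zipfile.ZipFile(aasx_file) as package:
        return package.namelist()


# Build a tiny stand-in package in memory (hypothetical content)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as package:
    package.writestr("[Content_Types].xml", "<Types/>")
    package.writestr("_rels/.rels", "<Relationships/>")
    package.writestr("aasx/aasx-origin", "")

parts = list_aasx_parts(buffer)
print(parts)
```

The same helper accepts a file path, so it can be pointed at any `*.aasx` file in a product folder to inspect its layout before extraction.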

In [8]:
def decompress_aasx_files(directory='.'):
    """Extract every .aasx archive in `directory` into a folder named after the file."""
    for filename in os.listdir(directory):
        if filename.lower().endswith('.aasx'):
            base_name = os.path.splitext(filename)[0]
            folder_name = os.path.join(directory, base_name)
            aasx_path = os.path.join(directory, filename)

            # zipfile identifies archives by content, not by extension,
            # so the .aasx file can be opened without renaming it to .zip
            with zipfile.ZipFile(aasx_path, 'r') as zip_ref:
                zip_ref.extractall(folder_name)

for product_id in tqdm(os.listdir(processing_path)):
    product_path = os.path.join(processing_path, product_id)
    if os.path.isdir(product_path):
        decompress_aasx_files(product_path)

100%|██████████| 15/15 [00:04<00:00,  3.70it/s]


### 1.2.2 Fix Relationship Files

`*.rels` **relationship files** define the internal structure and references between documents inside the AASX container.  
This preprocessing step verifies and corrects these relationship definitions to ensure that all AASX files can be read without parsing errors.
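
To illustrate the kind of correction involved, the sketch below applies the `www.` prefix replacement to a single relationship entry and parses the result. The sample `.rels` content is hypothetical; only the relationship type URLs are taken from the mapping used in this step.

```python
import xml.etree.ElementTree as ET

# Shared prefixes of the relationship type URLs handled in this step
WWW_PREFIX = "http://www.admin-shell.io/aasx/relationships/"
FIXED_PREFIX = "http://admin-shell.io/aasx/relationships/"

# Hypothetical package-level .rels content using the "www." variant
sample_rels = (
    '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
    '<Relationship Type="http://www.admin-shell.io/aasx/relationships/aasx-origin" '
    'Target="/aasx/aasx-origin" Id="R1" />'
    '</Relationships>'
)

fixed_rels = sample_rels.replace(WWW_PREFIX, FIXED_PREFIX)

# Parse the corrected content and read back the relationship type
ns = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
relationship = ET.fromstring(fixed_rels).find("r:Relationship", ns)
fixed_type = relationship.attrib["Type"]
print(fixed_type)
```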

In [9]:
# Define the mappings to fix
URL_REPLACEMENTS = {
    "http://www.admin-shell.io/aasx/relationships/aasx-origin":
        "http://admin-shell.io/aasx/relationships/aasx-origin",
    "http://www.admin-shell.io/aasx/relationships/aas-spec":
        "http://admin-shell.io/aasx/relationships/aas-spec",
    "http://www.admin-shell.io/aasx/relationships/aas-spec-split":
        "http://admin-shell.io/aasx/relationships/aas-spec-split",
    "http://www.admin-shell.io/aasx/relationships/aas-suppl":
        "http://admin-shell.io/aasx/relationships/aas-suppl"
}

def fix_urls_in_file(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        original_content = content
        for old_url, new_url in URL_REPLACEMENTS.items():
            content = content.replace(old_url, new_url)

        if content != original_content:
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(content)
            print(f"Updated: {file_path}")
        else:
            pass
            #print(f"No changes: {file_path}")

    except Exception as e:
        print(f"Error processing {file_path}: {e}")


for product_path in tqdm(Path(processing_path).iterdir()):
    if os.path.isdir(product_path):
        for rels_file in product_path.rglob("*.rels"):
            fix_urls_in_file(rels_file)


15it [00:01, 12.07it/s]


### 1.2.3 Collect Metadata

During preprocessing, relevant metadata is collected for later analysis.  
Specifically, this step:

- Checks the **AAS version** (e.g., v2.0, v3.0).  
- Identifies the **Technical Data Submodel**, if present.
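
Both checks are plain text searches on the main AAS file. As a minimal sketch, the snippet below runs the same version pattern against two sample strings; the sample contents and the helper names `detect_aas_version` and `mentions_technical_data` are illustrative assumptions.

```python
import re

# Version is read from the namespace URL, e.g. https://admin-shell.io/aas/3/0
version_pattern = re.compile(r"https?://(?:www\.)?admin-shell\.io/aas/([1-3])/0")


def detect_aas_version(text):
    """Return the AAS version as 'N.0', or 'Not Found' if no namespace URL matches."""
    match = version_pattern.search(text)
    return f"{match.group(1)}.0" if match else "Not Found"


def mentions_technical_data(text):
    """Return True if the text contains the string 'TechnicalData'."""
    return "TechnicalData" in text


# Hypothetical file contents
sample_v3 = '<environment xmlns="https://admin-shell.io/aas/3/0"><idShort>TechnicalData</idShort></environment>'
sample_v2 = '<aas:aasenv xmlns:aas="http://www.admin-shell.io/aas/2/0"/>'

print(detect_aas_version(sample_v3), mentions_technical_data(sample_v3))
print(detect_aas_version(sample_v2), mentions_technical_data(sample_v2))
```

Since this is a substring test, any occurrence of `TechnicalData` (for example inside a semantic ID) counts as a hit, so the flag is best read as an indicator rather than a guarantee.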

In [10]:
def read_rels(folder_path, rels_path):
    """Parse a .rels file from the extracted AASX folder."""
    rels_file = Path(folder_path) / rels_path
    if not rels_file.exists():
        raise FileNotFoundError(f"Missing .rels file: {rels_file}")
    
    xml = ET.parse(rels_file).getroot()
    ns = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
    rels = []
    for rel in xml.findall("r:Relationship", ns):
        rels.append({
            "Id": rel.attrib["Id"],
            "Type": rel.attrib["Type"],
            "Target": rel.attrib["Target"],
        })
    return rels

def find_aas_file(folder_path):
    """Find the main AAS file path in an extracted AASX folder.
    Returns:
        main_aas_path (str): Path to the main AAS XML/JSON file.
        origin_rels_path (str): Path to the rels file pointing to it.
        aas_rels_path (str | None): Path to the .rels file next to the AAS file, if it exists.
    """
    folder_path = Path(folder_path)

    # 1️⃣ Read package-level relationships
    root_rels = read_rels(folder_path, "_rels/.rels")
    origin_rel = next((r for r in root_rels if "aasx-origin" in r["Target"]), None)
    if not origin_rel:
        raise FileNotFoundError("No aasx-origin found in _rels/.rels")

    # 2️⃣ Follow to the next relationship file
    origin_rel_path = PurePosixPath(origin_rel["Target"])
    if origin_rel_path.is_absolute():
        origin_rel_path = PurePosixPath(str(origin_rel_path)[1:])  # strip leading slash

    rels_for_origin = f"{origin_rel_path.parent}/_rels/{origin_rel_path.name}.rels"
    rels_for_origin = str(rels_for_origin).lstrip("/")  # ensure relative
    origin_rels = read_rels(folder_path, rels_for_origin)

    # 3️⃣ Find the main AAS entry (XML or JSON)
    main_rel = next(
        (r for r in origin_rels if "aas" in r["Target"] or "content" in r["Target"]),
        None
    )
    if not main_rel:
        raise FileNotFoundError("No main AAS relationship found")

    main_target = origin_rel_path.parent / main_rel["Target"]
    if main_target.is_absolute():
        main_target = str(main_target)[1:]
    else:
        main_target = str(main_target)

    main_aas_path = folder_path / main_target

    # 4️⃣ Find the .rels file next to the main AAS file (if it exists)
    aas_rels_path = (
        main_aas_path.parent / "_rels" / f"{main_aas_path.name}.rels"
    )
    if not aas_rels_path.exists():
        aas_rels_path = None  # not all AASX files include this

    # 5️⃣ Return all three
    return str(main_aas_path), str(folder_path / rels_for_origin), str(aas_rels_path) if aas_rels_path else None

find_aas_file(os.path.join(processing_path,"Wago_2000-3228","Wago_2000-3228"))
find_aas_file(os.path.join(processing_path,"Festo_8062201","Festo_8062201"))
#find_aas_file(os.path.join(processing_path,"Harting_24024070000","Harting_24024070000"))
#find_aas_file(os.path.join(processing_path,"RStahl_261385","RStahl_261385")) 

('/app/data/processed/sample/Festo_8062201/Festo_8062201/aasx/AAS_Type_VFFG_T_F6_A_V1/AAS_Type_VFFG_T_F6_A_V1.aas.xml',
 '/app/data/processed/sample/Festo_8062201/Festo_8062201/aasx/_rels/aasx-origin.rels',
 '/app/data/processed/sample/Festo_8062201/Festo_8062201/aasx/AAS_Type_VFFG_T_F6_A_V1/_rels/AAS_Type_VFFG_T_F6_A_V1.aas.xml.rels')
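
The returned paths reflect the OPC convention that the relationships of a part are stored in a `_rels` folder next to it, in a file named after the part with `.rels` appended. A small sketch of that path derivation, using a hypothetical helper `rels_path_for`:

```python
from pathlib import PurePosixPath


def rels_path_for(part_name):
    """Return the package-relative path of the .rels file belonging to a part."""
    part = PurePosixPath(part_name.lstrip("/"))  # part names start with "/"
    return str(part.parent / "_rels" / f"{part.name}.rels")


origin_rels = rels_path_for("/aasx/aasx-origin")
aas_rels = rels_path_for("/aasx/AAS_Type/AAS_Type.aas.xml")  # hypothetical part name
print(origin_rels)
print(aas_rels)
```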

In [13]:
# Collect Metadata

# Regex to extract version from URL
version_pattern = re.compile(r"https?://(?:www\.)?admin-shell\.io/aas/([1-3])/0")
#TODO find version also from *.json files

def analyze_file(filepath):
    """Reads the file and returns (filepath, version, has_technical_data)."""
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()

        # Extract version
        version_match = version_pattern.search(content)
        version = f"{version_match.group(1)}.0" if version_match else "Not Found"

        # Check for 'TechnicalData'
        has_technical_data = "Yes" if "TechnicalData" in content else "No"

        return (str(filepath), version, has_technical_data)
    except Exception as e:
        return (str(filepath), f"Error: {e}", "Error")

metadata = []
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path, product_id)) or product_id == "configs":
        continue
    product_path = os.path.join(processing_path, product_id, product_id)
    aas_file, _, _ = find_aas_file(product_path)
    if not os.path.exists(aas_file):
        print(f"AAS file not found for {product_id}")
        continue
    aas_path, aas_version, has_technical_data = analyze_file(aas_file)
    metadata.append({"Product_Id": product_id, "AAS_File": aas_path, "AAS_Version": aas_version, "Has_Technical_Data": has_technical_data})
metadata_df = pd.DataFrame(metadata)
metadata_df.to_csv(metadata_path, index=False)



100%|██████████| 15/15 [00:00<00:00, 22.57it/s]


In [14]:
# Move datasheets to product folder
# Currently, only the RStahl AASX files contain the product datasheets
# A specific regex on the first page is used to identify the correct datasheet


no_datasheet = []
en_pattern = re.compile(r'V\s*([\d.]+)\s+EN')  # capture only the numeric version so float() can parse it

for product_id in os.listdir(processing_path):
    if not product_id.startswith("RStahl"):
        continue
    file_versions = {}

    product_files_path = os.path.join(processing_path, product_id, product_id, "aasx", "files")

    for file in os.listdir(product_files_path):
        if file.startswith("ZDB") and file.endswith(".pdf"):
            file_path = os.path.join(product_files_path, file)
            reader = PdfReader(file_path)

            highest_version = None

            for page in reader.pages[:1]:
                text = page.extract_text()
                if text:
                    matches = en_pattern.findall(text)
                    if matches:
                        # Assuming version appears once per page, take first match
                        version_str = matches[0]
                        try:
                            version = float(version_str)
                        except ValueError:
                            version = -1  # fallback if version string can't be converted
                        highest_version = version  # Keep latest found version in document

            if highest_version is not None:
                file_versions[file] = highest_version

    if not file_versions:
        no_datasheet.append(product_id) # should be empty
    else:
        if len(file_versions) == 1:
            selected_file = list(file_versions.keys())[0]
        else:
            # Select file with highest version
            selected_file = max(file_versions.items(), key=lambda x: x[1])[0]
        # Move selected file to product folder
        selected_file_path = os.path.join(product_files_path, selected_file)
        product_folder = os.path.join(processing_path, product_id)
        new_file_path = os.path.join(product_folder, f"{product_id}.pdf")
        shutil.copy(selected_file_path, new_file_path)

### 1.2.4 Remove Unnecessary Files

Files that **inflate the AASX size** and are **not required for subsequent processing** are removed.  
References to these files are also deleted to avoid broken links or parsing errors.
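
As a minimal sketch of the reference cleanup, the snippet below removes the `<Relationship>` entry that points at a deleted file from a `.rels` string. The sample content and file names are hypothetical, and `drop_relationships` is an illustrative helper that uses the same kind of pattern as the cells in this section.

```python
import re


def drop_relationships(rels_content, deleted_names):
    """Remove every <Relationship .../> whose Target ends with a deleted file name."""
    for name in deleted_names:
        pattern = rf'<Relationship\s[^>]*Target="[^"]*{re.escape(name)}"[^>]*/>'
        rels_content = re.sub(pattern, "", rels_content)
    return rels_content


# Hypothetical .rels content with two supplementary files
sample_rels = (
    '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
    '<Relationship Type="http://admin-shell.io/aasx/relationships/aas-suppl" '
    'Target="/aasx/files/cad_model.zip" Id="R1" />'
    '<Relationship Type="http://admin-shell.io/aasx/relationships/aas-suppl" '
    'Target="/aasx/files/datasheet.pdf" Id="R2" />'
    '</Relationships>'
)

cleaned = drop_relationships(sample_rels, ["cad_model.zip"])
print(cleaned)
```

The pattern assumes self-closing `<Relationship ... />` elements; entries written with a separate closing tag would need a different expression.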

In [15]:
# Remove files to save space (RStahl datasheets are now in product folder)
for product_id in os.listdir(processing_path):
    if not product_id.startswith("RStahl"):
        continue
    product_files_path = os.path.join(processing_path, product_id, product_id, "aasx", "files")
    if os.path.exists(product_files_path):
        for file in os.listdir(product_files_path):
            if file.endswith(".pdf"):
                file_path = os.path.join(product_files_path, file)
                os.remove(file_path)

In [16]:
# Remove CAD files to avoid license issues
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path,product_id)):
        continue
    if not product_id.startswith("Festo"):
        continue

    product_files_path = os.path.join(processing_path, product_id, product_id, "aasx")
    files_to_delete = []
    for file in Path(product_files_path).rglob("*"):
        if str(file).endswith((".zip", ".edz")):
            files_to_delete.append(file)
            os.remove(file)

    aas_path, _, rels_path = find_aas_file(os.path.join(processing_path, product_id, product_id))

    # remove references to deleted files in the .rels file
    with open(rels_path, 'r', encoding='utf-8') as file:
        rels_content = file.read()
    for filename in files_to_delete:
        pattern = rf'<Relationship\s[^>]*Target="[^"]*{re.escape(filename.name)}"[^>]*/>'
        rels_content = re.sub(pattern, '', rels_content)
    with open(rels_path, 'w', encoding='utf-8') as file:
        file.write(rels_content)

    # remove references to deleted files in the main AAS file
    with open(aas_path, 'r', encoding='utf-8') as file:
        aas_content = file.read()
    for filename in files_to_delete:
        pattern = rf'/aasx(?:/[^/\s]+)*/{re.escape(filename.name)}'
        aas_content = re.sub(pattern, '', aas_content)

    # save the cleaned AAS file
    with open(aas_path, 'w', encoding='utf-8') as file:
        file.write(aas_content)



100%|██████████| 16/16 [00:00<00:00, 29.92it/s]


In [17]:
def remove_missing_file_urls(product_id,xml_path):
    ns = {'aas': 'https://admin-shell.io/aas/3/0'}  # assumes AAS version 3.0 only
    tree = ET.parse(xml_path)
    root = tree.getroot()
    # Iterate over all <aas:file> elements
    for file_elem in root.findall('.//aas:file', ns):
        # Find the <aas:value> element
        value_elem = file_elem.find('aas:value', ns)
        if value_elem is not None:
            if value_elem.text is not None:
                full_path = os.path.join(processing_path, product_id, product_id, value_elem.text[1:])
                if not os.path.isfile(full_path):
                    print(f"File not found: {full_path}")
                    value_elem.text = ''
                    tree.write(xml_path, encoding='UTF-8', xml_declaration=True)


for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path,product_id)):
        continue
    if not product_id.startswith("Harting"):
        continue
    # folder_path = os.path.join(harting_path, product_id, product_id, "aasx")
    # folder_path = Path(folder_path)
    # files = folder_path.rglob("*.xml")
    file_path = find_aas_file(os.path.join(processing_path, product_id, product_id))[0]
    remove_missing_file_urls(product_id, file_path)
 

100%|██████████| 16/16 [00:00<00:00, 48.94it/s]


### 1.2.5 Clean AAS Files

Each AAS file is standardized to improve downstream compatibility:

- Merge duplicate **`GeneralTechnicalData`** Submodel Element Collections into a single collection.  
- Merge or delete **properties with duplicate `idShort`**.  
- Replace all invalid characters in `idShort` fields so they only contain **alphanumeric characters** and underscores (`_`).  
- Convert **German decimal commas (`,`)** to **periods (`.`)** for numeric values.  
- Convert all **language tags to lowercase**.
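
Two of these rules are simple regular-expression substitutions. The sketch below applies them to a single hypothetical property; the helper names and the sample XML are assumptions for illustration.

```python
import re


def normalize_decimal_commas(xml_text):
    """Replace a comma that sits directly between two digits with a period."""
    return re.sub(r"(?<=\d),(?=\d)", ".", xml_text)


def sanitize_id_shorts(xml_text):
    """Replace every character outside [a-zA-Z0-9_] in idShort values with '_'."""
    def _clean(match):
        cleaned_id = re.sub(r"[^a-zA-Z0-9_]", "_", match.group(1))
        return f"<idShort>{cleaned_id}</idShort>"
    return re.sub(r"<idShort>([^<]*)</idShort>", _clean, xml_text)


# Hypothetical property with an invalid idShort and a German decimal comma
sample = "<property><idShort>Max. Pressure (bar)</idShort><value>1,5</value></property>"
cleaned = sanitize_id_shorts(normalize_decimal_commas(sample))
print(cleaned)
```

Note that the comma rule is applied to the whole text, so any digit-comma-digit sequence (for example in an enumeration such as `1,2`) is rewritten as well; it may be worth spot-checking the output for such cases.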

In [18]:
def merge_general_technical_data(xml_content):
    """
    Some AAS files contain two "GeneralTechnicalData" blocks which cannot be handled by basyx.
    This function merges these two blocks into one.
    """
    pattern = re.compile(
        r'(<submodelElementCollection>\s*<idShort>GeneralTechnicalData</idShort>.*?<value>.*?</value>\s*</submodelElementCollection>)',
        re.DOTALL
    )
    matches = pattern.findall(xml_content)

    if len(matches) <= 1:
        return xml_content  # Nothing to merge

    # Identify preferred block: the one with "Allgemeine Technische Daten"
    preferred_block = None
    other_block = None

    for block in matches:
        if "Allgemeine Technische Daten" in block:
            preferred_block = block
        else:
            other_block = block

    if not preferred_block or not other_block:
        raise ValueError("Unexpected XML Structure: Can't identify both blocks.")

    # Extract properties from both blocks
    value_pattern = re.compile(
        r'<value>\s*((?:\s*<(property|multiLanguageProperty)>.*?</\2>\s*)+)</value>',
        re.DOTALL
    )

    preferred_value = value_pattern.search(preferred_block).group(1)
    other_value = value_pattern.search(other_block).group(1)

    merged_value = preferred_value + other_value
    merged_block = preferred_block.replace(preferred_value,merged_value)
    xml_content = xml_content.replace(preferred_block,merged_block)
    xml_content = xml_content.replace(other_block, "")
    return xml_content


def merge_duplicate_properties(xml_text):
    """
    Some submodels contain duplicate properties which cannot be handled by basyx.
    This function merges these duplicate properties into one, combining their values or removing duplicates."""
    # Match property or multiLanguageProperty blocks
    pattern = re.compile(
        r'(?P<block><(?P<tag>property|multiLanguageProperty).*?</\2>)',
        re.DOTALL
    )

    # Group by idShort
    grouped = defaultdict(list)

    # These properties are manually identified to be merged or removed
    # This could possibly be done more elegantly
    # Possible improvements:
    # - Automatically identify properties with duplicate idShort
    # - Split timeconstant into Electric, Thermal
    # - Split range properties into min and max
    properties_to_merge = ['Assembly', 'TypeOfPistonRod', 'ModelOfRotatingElectricalMachines_AccordingToCodeI',
                           'Certificate_Approval', 'Certification', 'AmbientTemperature',
                           'ControllerFunction', 'PneumaticOutputPort','PneumaticInputPort',
                           'PneumaticPilotPort','PilotMedium', 'OperatingMedium', 'PressureMedium', 'CompressedAirQualityClassAtInlet',
                           'DegreeOfProtection_IP_Mounted',
                           'DesignOfTheElectricalConnection_head2_','DesignOfTheElectricalConnection_head1_', 'FieldBus_system',
                           'Markings','InterfaceDesign','AnalogOutputVoltage', 'AnalogOutput', 'IO_LinkDeviceProfile', 'SupportedProtocol',
                           'MaterialOfHousing', 'HousingMaterial', 'TypeOfAdjustment', 'StyleOfCommunicationInterface', 'ConductorConnectionMethod','TypeOfConnector',
                           'SignalStatusDisplay', 'DegreeOfProtection', 'ManualOverride', 'TypeOfPneumaticConnections', 'IPProtectionClassWithConnector',
                           'ConnectionType', 'DesignOfTheElectricalConnection', 'ThreadSizeConnector', 'Coding', 'EncoderProtocol', 'DesignOfFeedbackSystem',
                           'ModeOfOperationEnd_PositionLocking', 'TypeOfPistonRodEnd', 'Labs_ConformityToVDMA24364','TypeOfCushioning', 'PistonRodThread',
                           'CondensateDrain', 'CompressedAirQualityClassAtOutlet', 'ArrangementOfTheCableLead_In_head2_', 'TypeOfPlug_InContactHead1',
                           'DesignOfTheProcessConnection','Lap', 'DirectionalControlValveFunction', 'ControlCharacteristics', 'FunctionInNormalPosition',
                           'MountingOrientation', 'ValveReturn', 'OperatingVoltageType', 'MeasurementMethodForPressureFlow','IPProtectionClassWithoutConnector', 'PneumaticExhaustPort']
    properties_max_one = ['MaxWorkingPressure','PressureRegulationRange','OperatingPressure','MaxPressureHysteresis', 'MaximumOutputPressure',
                            'MinimumOutputPressure','MaximumOperatingPressure','MinimumOperatingPressure', 'MinimumPilotPressure','MaximumPilotPressure']
    properties_to_delete = ['TimeConstant']

    for match in pattern.finditer(xml_text):
        block = match.group('block')
        tag = match.group('tag')
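        idshort_match = re.search(r'<idShort>(.*?)</idShort>', block)
        # skip blocks without an idShort or with an idShort that is not handled below
        if idshort_match is None or idshort_match.group(1) not in properties_to_merge + properties_max_one + properties_to_delete:
            continue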

        # Parse the XML fragment inside a root element
        root = ET.fromstring(f"<root>{block}</root>")
        content = {}
        if tag == 'property':
            # Find the <property> element
            property_elem = root.find('property')
            # Loop through direct children to find the correct <value>
            for child in property_elem:
                if child.tag == 'value':
                    content['de'] = child.text.strip()
        elif tag == 'multiLanguageProperty':
            # Find the multiLanguageProperty block
            ml_prop = root.find('multiLanguageProperty')
            # Navigate to value > langStringTextType
            for lang_entry in ml_prop.find('value').findall('langStringTextType'):
                lang = lang_entry.find('language').text.strip()
                text = lang_entry.find('text').text.strip()
                content[lang] = text

        if idshort_match:
            idshort = idshort_match.group(1)
            grouped[idshort].append([match.start(), match.end(), block, tag, content])
    ops = []
    for dprop in properties_max_one:
        if dprop in grouped:
            for i in grouped[dprop][1:]:
                ops.append((i[0],i[1],''))
    for dprop in properties_to_delete:
        if dprop in grouped:
            for i in grouped[dprop]:
                ops.append((i[0],i[1],''))
    for mprop in properties_to_merge:
        if mprop in grouped:
            for i in grouped[mprop][1:]:
                if 'de' in i[-1]:
                    grouped[mprop][0][2] = grouped[mprop][0][2].replace(grouped[mprop][0][-1]['de'], i[-1]['de'] + "\n" +grouped[mprop][0][-1]['de'])
                    grouped[mprop][0][-1]['de'] = i[-1]['de']+ '\n' + grouped[mprop][0][-1]['de']
                if 'en' in i[-1] and 'en' in grouped[mprop][0][-1]:
                    grouped[mprop][0][2] = grouped[mprop][0][2].replace(grouped[mprop][0][-1]['en'], i[-1]['en'] + "\n" +grouped[mprop][0][-1]['en'])
                    grouped[mprop][0][-1]['en'] = i[-1]['en']+ '\n' + grouped[mprop][0][-1]['en']
            for i in grouped[mprop][1:][::-1]:
                ops.append((i[0],i[1],''))
            ops.append((grouped[mprop][0][0],grouped[mprop][0][1], grouped[mprop][0][2]))
    ops.sort(key=lambda x: x[0], reverse=True)
    for start, end, text in ops:
        xml_text = xml_text[:start] + text + xml_text[end:]
    return xml_text


In [19]:
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path,product_id)):
        continue
    if not product_id.startswith("Festo"):
        # TODO: scan all manufacturers for the following issues
        continue
    aas_path, _, _ = find_aas_file(os.path.join(processing_path, product_id, product_id))
    with open(aas_path, 'r', encoding='utf-8') as file:
        aas_content = file.read()


    # convert German decimal commas between digits to periods
    pattern = r'(?<=\d),(?=\d)'
    aas_content = re.sub(pattern, '.', aas_content)
    # restrict every idShort to alphanumeric characters and underscores
    aas_content = re.sub(r'<idShort>([^<]*)</idShort>', lambda m: f"<idShort>{re.sub(r'[^a-zA-Z0-9_]', '_', m.group(1))}</idShort>", aas_content)
    # merge duplicate properties and duplicate GeneralTechnicalData blocks
    aas_content = merge_duplicate_properties(aas_content)
    aas_content = merge_general_technical_data(aas_content)

    with open(aas_path, 'w', encoding='utf-8') as file:
        file.write(aas_content)

100%|██████████| 16/16 [00:00<00:00, 52.02it/s]


In [20]:
def fix_language_tags_in_dict(obj):
    """
    Recursively correct 'value' fields with language tags to lowercase.
    """
    if isinstance(obj, list):
        for item in obj:
            fix_language_tags_in_dict(item)
    elif isinstance(obj, dict):
        for key, value in obj.items():
            if isinstance(value, dict):
                fix_language_tags_in_dict(value)
            elif isinstance(value, list):
                for item in value:
                    fix_language_tags_in_dict(item)
            elif isinstance(value, str) and len(value) == 2 and key == "language":
                # If the value is a language tag, convert it to lowercase
                obj[key] = value.lower()
                #print(f"Corrected language tag: {value} to {obj[key]}")
                
            if 'idShort' in obj and obj['idShort'].startswith("Language") and key == "value" and isinstance(value, str):
                obj[key] = value.lower()
                #print(f"Corrected language tag: {value} to {obj[key]}")


def fix_json_file(file_path):
    """
    Load JSON, fix language tags, and overwrite the file.
    """
    with open(file_path, encoding="utf-8-sig") as f:
        data = json.load(f)

    fix_language_tags_in_dict(data)

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)



In [21]:
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path,product_id)):
        continue
    if not product_id.startswith("RStahl"):
        continue
    json_file_path = find_aas_file(os.path.join(processing_path, product_id, product_id))[0]
    fix_json_file(json_file_path)

100%|██████████| 16/16 [00:00<00:00, 19.96it/s]
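
The effect of the language-tag fix can be checked on a small structure. The following is a simplified, standalone variant for illustration only: unlike the function above, it lowercases every string stored under a `language` key regardless of its length, and the sample data is hypothetical.

```python
def lowercase_language_tags(node):
    """Recursively lowercase every string stored under a "language" key."""
    if isinstance(node, list):
        for item in node:
            lowercase_language_tags(item)
    elif isinstance(node, dict):
        for key, value in node.items():
            if key == "language" and isinstance(value, str):
                node[key] = value.lower()  # value replaced in place, dict size unchanged
            else:
                lowercase_language_tags(value)
    return node


# Hypothetical multi-language property as it might appear in a JSON AAS file
sample = {
    "idShort": "ProductName",
    "value": [
        {"language": "EN", "text": "Valve"},
        {"language": "De", "text": "Ventil"},
    ],
}

lowercase_language_tags(sample)
languages = [entry["language"] for entry in sample["value"]]
print(languages)
```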


### 1.2.6 Compress to AASX Archive Again

After cleaning and validation, each directory is **zipped back into a `.aasx` archive**, and the temporary unzipped folders are removed.
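
When repacking, the archive names must be relative to the extracted folder so that parts such as `_rels/.rels` end up at the package root again. A minimal round-trip sketch in a temporary directory, where the folder layout and the helper `zip_folder` are hypothetical:

```python
import os
import tempfile
import zipfile


def zip_folder(folder_path, archive_path):
    """Write all files below folder_path into a ZIP archive with relative names."""
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(folder_path):
            for name in files:
                full_path = os.path.join(root, name)
                zf.write(full_path, os.path.relpath(full_path, start=folder_path))


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for an extracted package containing a single part
    folder = os.path.join(tmp, "package")
    os.makedirs(os.path.join(folder, "_rels"))
    with open(os.path.join(folder, "_rels", ".rels"), "w", encoding="utf-8") as f:
        f.write("<Relationships/>")

    archive = os.path.join(tmp, "package.aasx")
    zip_folder(folder, archive)

    # Read the archive back to confirm the part sits at the package root
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()

print(names)
```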


In [22]:
# compress and rename to product_id.aasx
for product_id in tqdm(os.listdir(processing_path)):
    folder_path = os.path.join(processing_path, product_id, product_id)
    aasx_path = os.path.join(processing_path, product_id, product_id+".aasx")
    if os.path.exists(folder_path):
        # Create the ZIP (aasx) file
        with zipfile.ZipFile(aasx_path, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
            # Walk the folder and add files
            for root, _, files in os.walk(folder_path):
                for file in files:
                    full_path = os.path.join(root, file)
                    # Compute the archive name (relative path inside the ZIP)
                    arcname = os.path.relpath(full_path, start=folder_path)
                    zf.write(full_path, arcname)
        shutil.rmtree(folder_path)



100%|██████████| 16/16 [00:08<00:00,  1.95it/s]


## 1.3 Object-Level Preprocessing

At this stage, the cleaned `*.aasx` files can be processed using the **BaSyx Python SDK** to extract individual submodels and gather additional metadata.

### 1.3.1 Extract Technical Data Submodels

The **TechnicalData** submodels are extracted from each AAS and saved in **JSON format** for downstream processing and analysis.  

In [None]:
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path,product_id)):
        continue

    object_store = model.DictObjectStore()
    file_store = aasx.DictSupplementaryFileContainer()    
    with aasx.AASXReader(os.path.join(processing_path,product_id, product_id+".aasx")) as reader:
        # Read all contained AAS objects and all referenced auxiliary files
        reader.read_into(object_store=object_store,
                         file_store=file_store)
    for i in object_store:
        if i.id_short == "TechnicalData":
            technical_data_url = i.id
            break
    else:
        raise ValueError(f"TechnicalData submodel not found for {product_id}")
    
    submodel = object_store.get_identifiable(technical_data_url)

    with open(os.path.join(processing_path,product_id,product_id+"_technical_data.json"), "w") as file:
        json.dump(submodel, file, cls=basyx.aas.adapter.json.AASToJsonEncoder)

### 1.3.2 Collect Metadata

For each processed product, the following metadata is collected:

- **Product Classification** according to the ECLASS system (version and identifier)  
- **Number of technical properties** present in the TechnicalData submodel
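
To make the target format concrete, the sketch below normalizes one raw class ID and one version string. The helpers `split_class_id` and `normalize_eclass_version` are condensed illustrations of the parsing idea, and the input values are examples rather than data from a specific product.

```python
import re


def split_class_id(raw_value):
    """Split 'NN-NN-NN-NN | IRDI' into (class ID without dashes, IRDI or None)."""
    parts = [p.strip() for p in raw_value.split("|")]
    class_id = parts[0].replace("-", "")
    irdi = parts[1] if len(parts) > 1 else None
    return class_id, irdi


def normalize_eclass_version(version):
    """Drop suffixes such as '(BASIC)' and turn a plain integer into 'N.0'."""
    version = re.sub(r"\s*\([^)]*\)", "", version.strip()).strip()
    return f"{version}.0" if re.fullmatch(r"\d+", version) else version


class_id, irdi = split_class_id("27-44-02-17 | 0173-1#01-AFR572#009")
version = normalize_eclass_version("12.0 (BASIC)")
print(class_id, irdi, version)
```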


In [None]:
def walk_elements(element):
    """Recursively yield all SubmodelElements."""
    if isinstance(element, model.SubmodelElementCollection):
        for child in element.value:
            yield from walk_elements(child)
    else:
        yield element

def normalize_name(name: str):
    """Normalize element names for easier matching."""
    return name.strip().lower() if name else ""

def parse_class_id(raw_value: str):
    """
    Parse a class ID like '27-44-02-17 | 0173-1#01-AFR572#009'
    into (normalized_id, irdi).
    """
    parts = [p.strip() for p in raw_value.split("|")]
    class_id = parts[0].replace("-", "").strip()
    irdi = parts[1] if len(parts) > 1 else None
    return class_id, irdi

def looks_like_eclass(class_id: str):
    """Heuristic to detect if a class ID pattern resembles an ECLASS code."""
    if not class_id:
        return False
    if re.match(r"^\d{8}$", class_id):  # e.g., 51030401
        return True
    if re.match(r"^\d{2}-\d{2}-\d{2}-\d{2}$", class_id):  # e.g., 27-44-02-17
        return True
    if "|" in class_id and "0173-1#" in class_id:
        return True
    return False

def normalize_version(version: str):
    """Normalize ECLASS version strings like '12.0 (BASIC)' or '12' → '12.0'."""
    if not version:
        return ""
    version = version.strip()
    # Remove things like (BASIC), (ADVANCED), etc.
    version = re.sub(r"\s*\([^)]*\)", "", version).strip()
    # If it's a plain integer, add .0
    if re.fullmatch(r"\d+", version):
        version = f"{version}.0"
    return version

def extract_classifications(submodel):
    """Extract classification records (system, version, class ID) from the ProductClassifications collection."""
    results = []
    current = {}
    
    for elem in walk_elements(submodel):
        name = normalize_name(elem.id_short)
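        raw_value = getattr(elem, "value", None)
        # Treat a missing value as empty instead of the literal string "None"
        value = "" if raw_value is None else str(raw_value).strip()
        if not value:
            continue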

        if "systemversion" in name:
            current["version"] = value
        elif "system" in name:
            current["system"] = value
        elif "classid" in name:
            current["class_id_raw"] = value

        if all(k in current for k in ("system", "version", "class_id_raw")):
            results.append(current)
            current = {}

    if current and "class_id_raw" in current:
        results.append(current)

    return results

def get_eclass_info(submodel):
    """Return the ECLASS classification info (system, version, class_id, irdi)."""
    classifications = extract_classifications(submodel)
    if not classifications:
        return None

    # Filter for ECLASS
    eclass_entries = [
        c for c in classifications if "eclass" in c.get("system", "").lower()
    ]

    # If no explicit ECLASS but only one system, infer from pattern
    if not eclass_entries and len(classifications) == 1:
        c = classifications[0]
        class_id_raw = c.get("class_id_raw", "")
        if looks_like_eclass(class_id_raw):
            c["system"] = "ECLASS"
            eclass_entries = [c]

    if not eclass_entries:
        return None

    chosen = eclass_entries[-1]  # if several ECLASS entries exist, use the last one
    class_id_raw = chosen.get("class_id_raw", "")
    class_id, irdi = parse_class_id(class_id_raw)
    version = normalize_version(chosen.get("version", ""))

    result = {
        "Classification_System": "ECLASS",
        "Classification_System_Version": version,
        "Class_Id": class_id,
        "IRDI": irdi
    }
    return result
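
The grouping logic in `extract_classifications` can be tried without any AASX file. The sketch below is a simplified illustration under stated assumptions: it replaces BaSyx elements with plain `(id_short, value)` tuples, and both the function name `group_classifications` and the second toy record are hypothetical.

```python
def group_classifications(pairs):
    """Group a flat sequence of (id_short, value) pairs into classification records."""
    results, current = [], {}
    for id_short, value in pairs:
        name = id_short.strip().lower()
        # "systemversion" must be tested before "system", which it contains
        if "systemversion" in name:
            current["version"] = value
        elif "system" in name:
            current["system"] = value
        elif "classid" in name:
            current["class_id_raw"] = value
        # A record is complete once all three fields have been seen
        if all(k in current for k in ("system", "version", "class_id_raw")):
            results.append(current)
            current = {}
    # Keep a trailing partial record if it at least carries a class ID
    if "class_id_raw" in current:
        results.append(current)
    return results


toy_pairs = [
    ("ProductClassificationSystem", "ECLASS"),
    ("ClassificationSystemVersion", "12.0 (BASIC)"),
    ("ProductClassId", "27-44-02-17"),
    ("ProductClassificationSystem", "OtherSystem"),  # hypothetical second system
    ("ClassificationSystemVersion", "1"),
    ("ProductClassId", "EXAMPLE-ID"),
]
records = group_classifications(toy_pairs)
print(records)
```

Because records are delimited purely by the order of elements, this approach relies on each classification item listing its three fields contiguously.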


In [None]:
metadata = []
for product_id in tqdm(os.listdir(processing_path)):
    if not os.path.isdir(os.path.join(processing_path, product_id)) or product_id=="configs":
        continue
    object_store = model.DictObjectStore()
    file_store = aasx.DictSupplementaryFileContainer()    
    with aasx.AASXReader(os.path.join(processing_path,product_id, product_id+".aasx")) as reader:
        # Read all contained AAS objects and all referenced auxiliary files
        reader.read_into(object_store=object_store,
                        file_store=file_store)
    for i in object_store:
        if i.id_short == "TechnicalData":
            technical_data_url = i.id
            break
    else:
        raise ValueError(f"TechnicalData submodel not found for {product_id}")


    submodel = object_store.get_identifiable(technical_data_url)
    technical_properties = submodel.get_referable("TechnicalProperties")
    properties = []
    def extract_properties(elements):
        for elem in elements:
            if isinstance(elem, model.submodel.SubmodelElementCollection):
                extract_properties(elem)  # Recursive call for nested collections
            elif isinstance(elem, (model.submodel.MultiLanguageProperty, model.submodel.Property)):
                properties.append(elem)
            else:
                print("Unknown Element", elem)
    extract_properties(technical_properties)


    # get_eclass_info returns None when no ECLASS entry is found; fall back to an empty record
    data = get_eclass_info(submodel.get_referable("ProductClassifications")) or {}

    data["Product_Id"] = product_id
    data["Company"] = product_id.split("_")[0]
    data['n_Properties'] = len(properties)
    general_information = submodel.get_referable("GeneralInformation")
    for elem in general_information:
        if elem.id_short == "ManufacturerArticleNumber":
            data['Article_Number'] = elem.value
    metadata.append(data)

new_metadata_df = pd.DataFrame(metadata)
metadata_df = pd.read_csv(metadata_path)
metadata_df = pd.merge(metadata_df, new_metadata_df, on="Product_Id", how="left")
metadata_df.to_csv(metadata_path, index=False)
metadata_df


## 1.4 Miscellaneous Preprocessing Code

This section contains auxiliary code.

### 1.4.1 Retrieve Product URLs

The **source URL** of each product is reconstructed from its product ID using a manufacturer-specific URL pattern.  

In [None]:
def company_from_product_id(product_id):
    """Return the manufacturer prefix of a product folder name."""
    return product_id.split("_")[0]

metadata_df = pd.read_csv(os.path.join(processing_path, "metadata.csv"))
for product_id in os.listdir(processing_path):
    if not os.path.isdir(os.path.join(processing_path, product_id)):
        continue
    company = company_from_product_id(product_id)
    # Everything after the first underscore is the manufacturer's own identifier
    article_id = product_id.partition("_")[2]
    if company == "Wago":
        url = f"http://www.wago.com/global/p/{article_id}"
    elif company == "Harting":
        url = f"https://www.harting.com/{article_id}"
    elif company == "RStahl":
        url = f"https://dt.r-stahl.com/type/{article_id}"
    elif company == "Festo":
        url = f"https://www.festo.com/de/en/a/{article_id}"
    else:
        # Skip folders without a known manufacturer prefix (e.g. "configs")
        continue
    print(url)
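
As an alternative to the `if`/`elif` chain, the URL patterns can be kept in a lookup table so that adding a manufacturer only requires a new entry. This is a sketch rather than the notebook's method: `URL_TEMPLATES`, `build_product_url`, and the example folder name `Wago_2002-1201` are hypothetical, while the URL patterns are the ones used above.

```python
# URL patterns per manufacturer prefix, copied from the cell above
URL_TEMPLATES = {
    "Wago": "http://www.wago.com/global/p/{article}",
    "Harting": "https://www.harting.com/{article}",
    "RStahl": "https://dt.r-stahl.com/type/{article}",
    "Festo": "https://www.festo.com/de/en/a/{article}",
}


def build_product_url(folder_name):
    """Return the source URL for a '<Company>_<article>' folder name, or None if unknown."""
    company, _, article = folder_name.partition("_")
    template = URL_TEMPLATES.get(company)
    if template is None or not article:
        return None
    return template.format(article=article)


print(build_product_url("Wago_2002-1201"))  # hypothetical folder name
print(build_product_url("configs"))
```

Returning `None` for unknown prefixes lets the caller decide whether to skip the folder or report it.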