# HomeAssistant BERT Training Data generation

This notebook generates the data used to train a BERT model on sentences in Catalan.

It is based on the existing Catalan intent definitions in:
https://github.com/home-assistant/intents/tree/main/sentences/ca

The data in that repository is not meant for training a BERT system; it defines phrase structures that are used to interpret sentences and generate intents.

In this notebook, we will expand those phrases to be able to use them to train a BERT system.


## Install required dependencies

In [1]:
%pip install pyyaml pandas

Note: you may need to restart the kernel to use updated packages.


## Import required libs

In [2]:
import yaml
import os
import re
import itertools
import pandas as pd

In [3]:
def load_expansion_rules(common_file_path):
    """
    Load expansion rules from the _common.yaml file.

    Args:
        common_file_path: Path of the YAML file to read.

    Returns:
        The value stored under the top-level 'expansion_rules' key, or an
        empty dict when the file is empty or the key is missing or null.
    """
    print(f"Loading expansion rules from {common_file_path}")
    with open(common_file_path, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document; fall back to an empty
        # mapping so the lookup below cannot fail with AttributeError.
        content = yaml.safe_load(f) or {}
    # A key that is present but null would otherwise hand None to callers
    # that go on to call .get() on the result.
    return content.get('expansion_rules') or {}


def expand_rules(sentence, expansion_rules):
    """
    Expand rules in the sentence using the provided expansion rules.

    Every '<rule_name>' reference is replaced by expansion_rules[rule_name].
    References that appear inside an expansion are expanded in turn. A
    reference with no matching rule is left in place (a warning is printed)
    and the remaining references are still expanded.

    Args:
        sentence: Text that may contain '<rule_name>' references.
        expansion_rules: Mapping of rule name to its replacement text.

    Returns:
        The sentence with every known rule reference expanded.

    Raises:
        ValueError: If a rule refers to itself, directly or through other
            rules (expanding it would never terminate).
    """
    rule_reference = re.compile(r'<(.*?)>')

    def expand(text, active_rules):
        # active_rules holds the names of the rules currently being expanded
        # on this path; meeting one of them again means the rules are cyclic.
        def substitute(match):
            rule_name = match.group(1)
            if rule_name not in expansion_rules:
                print(f"Warning: No expansion found for {rule_name}. Keeping original.")
                return match.group(0)
            if rule_name in active_rules:
                raise ValueError(
                    f"Expansion rule <{rule_name}> refers to itself (directly or indirectly)"
                )
            return expand(expansion_rules[rule_name], active_rules | {rule_name})

        # A callable replacement is inserted literally (no backslash
        # processing), matching the plain str.replace semantics.
        return rule_reference.sub(substitute, text)

    return expand(sentence, frozenset())


def expand_blocks(sentence):
    """
    Expand one block of alternatives in the sentence.

    A block is the text between an opening character ('(' or '[') and the
    next closing character (')' or ']'); its alternatives are separated by
    '|'. A single call resolves exactly one block: the first one, or, when
    another block opens before the first closing character, that nested
    block (resolved recursively, innermost first).

    Args:
        sentence: Text that may contain blocks.

    Returns:
        A tuple (expanded_sentences, expanded). expanded_sentences holds one
        sentence per alternative of the resolved block, in their original
        order. expanded is False, and the list is [sentence], when the
        sentence has no opening character or no closing character after the
        first opening one.
    """
    # NOTE(review): '[' and '(' are handled identically, so a '[...]' block
    # always contributes one of its alternatives. If the upstream sentence
    # syntax treats '[...]' as optional, the variant without the block is
    # never generated -- confirm whether that is intended.
    initial_chars = ('(', '[')
    end_chars = (')', ']')

    first_open = next(
        (pos for pos, char in enumerate(sentence) if char in initial_chars), -1
    )
    if first_open == -1:
        return [sentence], False

    first_close = next(
        (pos for pos in range(first_open + 1, len(sentence)) if sentence[pos] in end_chars),
        -1,
    )
    if first_close == -1:
        # Unmatched opening character: there is nothing to expand. Returning
        # the sentence untouched avoids slicing with a missing end position.
        return [sentence], False

    nested_open = next(
        (pos for pos in range(first_open + 1, first_close) if sentence[pos] in initial_chars),
        -1,
    )
    if nested_open != -1:
        # Another block opens before the first closing character, so that
        # closing character belongs to the nested block: resolve it first.
        inner_variants, expanded = expand_blocks(sentence[nested_open:first_close + 1])
        prefix = sentence[:nested_open]
        suffix = sentence[first_close + 1:]
        return [prefix + variant + suffix for variant in inner_variants], expanded

    # Plain block: generate one sentence per '|'-separated alternative.
    prefix = sentence[:first_open]
    suffix = sentence[first_close + 1:]
    options = sentence[first_open + 1:first_close].split('|')
    return [prefix + option + suffix for option in options], True

def expand_sentence_blocks(sentence):
    """
    Expand every block in the sentence until none can be expanded further.

    expand_blocks resolves one block per call, so it is applied repeatedly
    to every intermediate sentence until a full round expands nothing.

    Args:
        sentence: Text that may contain blocks.

    Returns:
        The list of fully expanded sentences, without duplicates, keeping
        the first occurrence of each sentence in generation order.
    """
    pending = [sentence]
    expanded = True
    while expanded:
        expanded = False
        next_round = []
        for candidate in pending:
            variants, was_expanded = expand_blocks(candidate)
            if was_expanded:
                expanded = True
            next_round.extend(variants)
        # dict.fromkeys keeps the first occurrence of every sentence and
        # preserves insertion order, so this removes all duplicates in one
        # linear pass.
        pending = list(dict.fromkeys(next_round))
    return pending

def expand_sentence(sentence, expansion_rules):
    """
    Fully expand a sentence.

    The rule references are expanded first, so that any blocks contained in
    the rule expansions are resolved by the block expansion that follows.

    Args:
        sentence: The sentence to expand.
        expansion_rules: Mapping passed through to expand_rules.

    Returns:
        A new list with every sentence produced by expand_sentence_blocks.
    """
    with_rules = expand_rules(sentence, expansion_rules)
    return list(expand_sentence_blocks(with_rules))

def load_sentences_from_yaml(file_path, expansion_rules):
    """
    Load sentences from a YAML file and expand them using the provided expansion rules.

    The file is read as intents -> <intent name> -> data -> [items], where
    each item may carry a 'sentences' list. Missing or null levels are
    treated as empty, so a file without intents yields an empty list.

    Args:
        file_path: Path of the YAML file to read.
        expansion_rules: Mapping passed through to expand_sentence.

    Returns:
        A list of {'sentence': ..., 'intent': ...} dicts, one per expanded
        sentence, in file order.
    """
    print(f"Loading sentences from {file_path}")
    with open(file_path, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document.
        content = yaml.safe_load(f) or {}
    data = []
    # Navigate through the YAML structure; "or" guards cover keys that are
    # present but null, which .get(key, default) would not catch.
    for intent_name, intent_data in (content.get('intents') or {}).items():
        for item in (intent_data or {}).get('data') or []:
            for sentence in item.get('sentences') or []:
                for expanded_sentence in expand_sentence(sentence, expansion_rules):
                    data.append({'sentence': expanded_sentence, 'intent': intent_name})
    return data

def process_directory(yaml_dir):
    """
    Build the dataset from every YAML file in a directory.

    The expansion rules are read from '_common.yaml' inside yaml_dir, then
    each '.yaml' / '.yml' file in the directory is loaded and its expanded
    sentences are collected.

    Args:
        yaml_dir: Directory that contains the YAML files.

    Returns:
        The concatenated records returned by load_sentences_from_yaml for
        each file, with files visited in sorted name order.
    """
    all_data = []
    # Process general YAML files
    common_file_path = os.path.join(yaml_dir, "_common.yaml")
    expansion_rules = load_expansion_rules(common_file_path)

    # Process each YAML file in the directory. os.listdir returns entries in
    # arbitrary order, so sort them to make the dataset row order reproducible.
    for file_name in sorted(os.listdir(yaml_dir)):
        print(file_name)
        if file_name.endswith(('.yaml', '.yml')):
            path = os.path.join(yaml_dir, file_name)
            sentences = load_sentences_from_yaml(path, expansion_rules)
            all_data.extend(sentences)
            print(f"Loaded {len(sentences)} sentences from {file_name}")
    return all_data




## Process the directory where the intents are present

In [4]:
if __name__ == "__main__":
    # Build the path with os.path.join instead of hard-coding backslashes so
    # the notebook also runs on non-Windows systems (on Windows the resulting
    # string is unchanged).
    yaml_directory = os.path.join(".", "from_ha_intents", "sentences", "ca")
    #yaml_directory = os.path.join(".", "test_ca")
    output_csv = "hass_intents_ca.csv"

    data = process_directory(yaml_directory)
    # Naming the columns keeps the CSV header present even when no sentence
    # was loaded.
    df = pd.DataFrame(data, columns=["sentence", "intent"])
    df.to_csv(output_csv, index=False)
    print(f"Dataset generat amb {len(df)} frases i desat a: {output_csv}")

Loading expansion rules from .\from_ha_intents\sentences\ca\_common.yaml
assist_satellite_HassBroadcast.yaml
Loading sentences from .\from_ha_intents\sentences\ca\assist_satellite_HassBroadcast.yaml
Loaded 6 sentences from assist_satellite_HassBroadcast.yaml
climate_HassClimateGetTemperature.yaml
Loading sentences from .\from_ha_intents\sentences\ca\climate_HassClimateGetTemperature.yaml
Loaded 248 sentences from climate_HassClimateGetTemperature.yaml
climate_HassClimateSetTemperature.yaml
Loading sentences from .\from_ha_intents\sentences\ca\climate_HassClimateSetTemperature.yaml
Loaded 1098 sentences from climate_HassClimateSetTemperature.yaml
climate_HassTurnOff.yaml
Loading sentences from .\from_ha_intents\sentences\ca\climate_HassTurnOff.yaml
Loaded 14618 sentences from climate_HassTurnOff.yaml
climate_HassTurnOn.yaml
Loading sentences from .\from_ha_intents\sentences\ca\climate_HassTurnOn.yaml
Loaded 30948 sentences from climate_HassTurnOn.yaml
cover_HassGetState.yaml
Loading sen