# imports

In [228]:
import pandas as pd
from pathlib import Path
import json

In [229]:
from lxml.etree import SubElement

In [230]:
import os

In [231]:
import copy

# load ressources

In [232]:
path = Path('.').absolute().parent.parent/'modele_donnee'

In [233]:
path_audit = path/"audit_energetique"

# load xsd 

In [234]:
from lxml.etree import ElementTree, XMLSchema,Element
from lxml import etree

path_xsd = r'D:\Developpement\observatoire-dpe\modele_donnee\DPE_complet.xsd'
# Compiling the schema first makes the cell fail early if the DPE XSD is invalid.
schema = XMLSchema(file=path_xsd)

# Blank text is dropped at parse time so that pretty_print can re-indent on write.
parser = etree.XMLParser(remove_blank_text=True)

et = etree.parse(path_xsd, parser)
root = et.getroot()

# Clark-notation prefix (for building tags) and prefix map (for find/xpath).
xs = '{http://www.w3.org/2001/XMLSchema}'
namespaces = {'xs': 'http://www.w3.org/2001/XMLSchema'}

all_doc = list(root.iterfind('*//xs:documentation', namespaces=namespaces))

# Collapse every documentation text onto a single, single-spaced line.
for doc in all_doc:
    # An empty <xs:documentation/> has text None: nothing to normalise.
    if doc.text is None:
        continue
    txt = ' '.join(doc.text.replace('\n', '').split())
    doc.text = txt

et.write('test.xsd', pretty_print=True, xml_declaration=True, encoding='utf-8')

all_el = list(root.iterfind('*//xs:element', namespaces=namespaces))

In [235]:
root_dpe = et.getroot()

# load audit basic structure xsd

In [236]:
path_xsd_audit = r'D:\Developpement\observatoire-dpe\modele_donnee\audit_energetique'

In [237]:
structure_audit_path = os.path.join(path_xsd_audit,'structure_audit.xsd')

In [238]:
schema = XMLSchema(file=structure_audit_path)
et_audit = etree.parse(structure_audit_path, parser)
root_audit = et_audit.getroot()

# load ressources

In [239]:
ressources_audit_path = os.path.join(path_xsd_audit,'audit_reg_ressources.xsd')

In [240]:
et_ressources = etree.parse(ressources_audit_path, parser)
root_ressources = et_ressources.getroot()

# Mise à jour de la version 

In [241]:
version = 'v0.0.0'
date = '2022-03-16'
text = 'test'

In [242]:
audit = root_audit.xpath(f'//xs:element[@name="audit"]', namespaces=namespaces)[0]

In [243]:
documentation = audit.find('xs:annotation/xs:documentation',namespaces=namespaces)

In [244]:
documentation.text = f'Version {version} - {date} : {text}.'

In [245]:
audit_reg_path = os.path.join(path_xsd_audit,'audit_reg_test.xsd')

In [246]:
et_audit.write(audit_reg_path, pretty_print=True, xml_declaration=True, encoding='utf-8')

# Récupère administratif du xsd ressources et l'ajoute dans l'audit

In [247]:
administratif = root_ressources.xpath(f'//xs:element[@name="administratif"]', namespaces=namespaces)[0]

In [248]:
root_audit.append(administratif)

# MAJ du logement issu du dpe (ajout de scenario_id...)

## Recupère elements du xsd dpe (logement + Type)

In [249]:
logement_dpe = root_dpe.xpath(f'//xs:element[@name="logement"]', namespaces=namespaces)[0]

## Récupération des types du dpe ("t_adresse" ...) et ajout dans l'audit

In [250]:
# Move every top-level xs:complexType / xs:simpleType of the DPE schema into
# the audit schema (appending an lxml element re-parents it).
# The tag is compared exactly rather than by substring: this cannot match an
# unrelated tag and does not raise on comment nodes, whose .tag is not a str.
type_tags = {f'{xs}complexType', f'{xs}simpleType'}
# Iterate over a snapshot because the loop removes children from root_dpe.
for el in list(root_dpe):
    if el.tag in type_tags:
        root_audit.append(el)

## Definition d'un minimum de 4 logements dans logement_collection

In [251]:
logement_dpe.attrib.update({'minOccurs':'4','maxOccurs':"unbounded"})

## Ajout de scenario et etape

In [252]:
caracteristique_generale = logement_dpe.xpath(f'//xs:element[@name="caracteristique_generale"]', namespaces=namespaces)[0]

In [253]:
_all = caracteristique_generale.getchildren()[0].getchildren()[0]

In [254]:
# Build the two extra elements (scenario id and step id) that are appended to
# the xs:all group of caracteristique_generale in the next cell.
scenario = Element(f'{xs}element', name='enum_scenario_id')
etape = Element(f'{xs}element', name='enum_etape_id')

In [255]:
_all.append(copy.copy(scenario))
_all.append(copy.copy(etape))

# Recupère etape_travaux du xsd ressources et ajout dans logement_dpe

In [256]:
etape_travaux = root_ressources.xpath(f'//xs:element[@name="etape_travaux"]', namespaces=namespaces)[0]

In [257]:
logement_dpe.find('xs:complexType/xs:all',namespaces=namespaces).append(etape_travaux)

# MAJ de logement_collection à partir du xsd dpe

In [258]:
complexType = audit.find('xs:complexType',namespaces=namespaces)

In [259]:
sequence = Element(f'{xs}sequence')
complexType.insert(0,sequence)

In [260]:
# The audit sequence holds a reference to 'administratif' followed by a
# 'logement_collection' element with its own complexType/sequence.
ref_administratif = SubElement(sequence, f'{xs}element', ref='administratif')
logement_collection = SubElement(sequence, f'{xs}element', name='logement_collection')
complexType = SubElement(logement_collection, f'{xs}complexType')
# From here on `sequence` designates the inner sequence of logement_collection.
sequence = SubElement(complexType, f'{xs}sequence')

In [261]:
sequence.append(logement_dpe)

In [262]:
et_audit.write(audit_reg_path, pretty_print=True, xml_declaration=True, encoding='utf-8')

## enum_etat_composant_id 

In [263]:
# création de mon état_composant
etat_composant=Element(f'{xs}element')
etat_composant.attrib.update({'name':'enum_etat_composant_id'})

In [264]:
## récupère les children de l'enum enum_type_isolation_id pour aller plus vite
# enum_type_isolation_id = root_dpe.xpath(f'//xs:element[@name="enum_type_isolation_id"]', namespaces=namespaces)[0]
# etat_composant.extend(enum_type_isolation_id.getchildren())

In [265]:
# Every donnee_entree element now gets a mandatory etat_composant child.
all_donnee_entree = root_audit.xpath('//xs:element[@name = "donnee_entree"]', namespaces=namespaces)
for donnee_entree in all_donnee_entree:
    # A donnee_entree is a complexType wrapping an xs:all that lists its
    # children, so a fresh copy of etat_composant is appended to that group.
    xs_all = donnee_entree.find('xs:complexType/xs:all', namespaces=namespaces)
    xs_all.append(copy.copy(etat_composant))

In [266]:
et_audit.write(audit_reg_path, pretty_print=True, xml_declaration=True, encoding='utf-8')

## Rendre les références des composants obligatoires

In [267]:
# Make the `reference` of every component mandatory by dropping the attributes
# that allowed it to be absent or nil.
for reference in root_audit.xpath('//xs:element[@name = "reference"]', namespaces=namespaces):
    # Expected nesting: reference -> xs:all -> xs:complexType -> owning element.
    reference_great_grandparents_name = reference.getparent().getparent().getparent().attrib.get('name',None)
    if reference_great_grandparents_name == 'donnee_entree':
        # pop() with a default instead of `del`: an attribute that is already
        # absent (e.g. when this cell is re-run) must not raise KeyError.
        reference.attrib.pop('minOccurs', None)
        reference.attrib.pop('nillable', None)

## Ajout d'une reference pour les ponts thermiques (en plus de reference_1 et reference_2)

In [268]:
# Derive a plain `reference` element from the first `reference_1` found and
# insert it next to it (per the original comment: in the donnee_entree of
# pont_thermique).
reference_1 = root_audit.xpath('//xs:element[@name = "reference_1"]', namespaces=namespaces)[0]
reference = copy.copy(reference_1)

reference.attrib.update({'name':'reference'})
# Removing minOccurs/nillable makes the new element mandatory; pop() with a
# default tolerates a reference_1 that does not carry these attributes.
reference.attrib.pop('minOccurs', None)
reference.attrib.pop('nillable', None)
# Update the documentation text
doc_text = "reference projet de l'objet (cette référence permet de faire d'éventuels liens entre objets). La codification et utilisation des références peut différer entre logiciels."
documentation = reference.find('xs:annotation/xs:documentation',namespaces=namespaces)
documentation.text = doc_text
# Update the appinfo source path so that it points at the new element name
appinfo = reference.find('xs:annotation/xs:appinfo',namespaces=namespaces)
source = appinfo.attrib['source']
appinfo.attrib.update({'source':source.replace('reference_1','reference')})
# Insert `reference` as second child of the group that holds reference_1
reference_1.getparent().insert(1,copy.copy(reference))

# Génération du json des enums

In [269]:
# Read every sheet of the audit enum workbook as strings (one DataFrame per sheet).
enum_table_audit = pd.read_excel(path_audit / 'enum_tables_audit.xlsx', sheet_name=None,dtype=str)

# Per-enum documentation: the first non-null cell of the 'doc' column, for
# sheets that have both a 'doc' and an 'id' column.
enums_audit_doc = {
    sheet_name: table['doc'].astype(str)[table['doc'].notna()].iloc[0]
    for sheet_name, table in enum_table_audit.items()
    if 'doc' in table and 'id' in table and table['doc'].notna().any()
}

enums_audit_doc

# Per-enum {id: label} mapping; rows without a label are dropped and labels are
# stripped and lower-cased.
enums_audit_dict = {
    sheet_name: (
        table[table['lib'].notna()]
        .astype(str)
        .set_index('id')['lib']
        .str.strip()
        .str.lower()
        .to_dict()
    )
    for sheet_name, table in enum_table_audit.items()
    if 'lib' in table and 'id' in table
}

enums_audit_dict

In [274]:
###### NON utilisé, car pas d'enum commun avec dpe
# # Update des enums en commun avec le dpe 
# enum_table = pd.read_excel(path / 'enum_tables.xlsx', sheet_name=None,dtype=str)
# enums_dpe_dict = {k: v.astype(str).set_index('id').lib.str.strip().str.lower().to_dict() for k, v in enum_table.items() if 'lib' in v and 'id' in v}

# # Supprime les enums hors audit de l'enums_dpe_dict
# enums_dpe_dict_filtered = {enum_name:id_lib for (enum_name,id_lib) in enums_dpe_dict.copy().items() if enum_name in enums_audit_dict.keys()}

# enums_concat_dict = enums_dpe_dict_filtered.copy().copy()

# for enum_name,id_lib in enums_concat_dict.items():
#     id_lib.update(enums_audit_dict[enum_name])

# enums_audit_dict.update(enums_concat_dict)

In [275]:
# Save the audit enums to a .json file, after replacing non-breaking spaces
# in the labels by regular spaces (done in place on each inner dict).
for id_lib in enums_audit_dict.values():
    id_lib.update({
        _id: lib.replace('\xa0', ' ')
        for _id, lib in id_lib.items()
        if isinstance(lib, str)
    })

with (path_audit / 'enums_audit.json').open('w', encoding='utf-8') as f:
    f.write(json.dumps(enums_audit_dict, indent=4, ensure_ascii=False))

## Compléter les énums

In [276]:
# 'methode_saisie_u' in enums_dpe_dict.keys()

# enum_name_audit_only = [enum_name for enum_name in enums_audit_dict.keys() if enum_name not in enums_dpe_dict.keys()]

# enum_name = 'enum_etat_composant_id'

# el = root_audit.xpath(f'//xs:element[@name ="{enum_name}"]', namespaces=namespaces)[0]

# el

# simpletype = el.find('xs:simpleType', namespaces=namespaces)

# if simpletype is not None:
#     el.remove(simpletype)
# simpletype = SubElement(el, f"{xs}simpleType")

# enum_ids = list(enums_audit_dict[enum_name].keys())

# apply_restriction_enum(simpletype, enum_name, enum_ids)

In [277]:
def _normalize_enum_id(raw_id):
    """Return *raw_id* as an int when it holds a whole number, else as a str."""
    try:
        as_float = float(raw_id)
        as_int = int(as_float)
    except (TypeError, ValueError, OverflowError):
        # Not numeric at all, or nan/inf: keep the textual form.
        return str(raw_id)
    return as_int if as_int == as_float else str(raw_id)


def apply_restriction_enum(simpletype, enum_ids:list):
    """Rebuild the ``xs:restriction`` child of *simpletype* from a list of ids.

    Any existing ``xs:restriction`` is removed and a new one is appended.
    When every id is a non-negative whole number the restriction is based on
    ``xs:int``: a contiguous set of ids is expressed as a
    ``minInclusive``/``maxInclusive`` pair, any other set as one
    ``xs:enumeration`` per id. In all other cases the restriction is based on
    ``xs:string`` with one ``xs:enumeration`` per id.

    Args:
        simpletype: the ``xs:simpleType`` element to modify in place.
        enum_ids: the allowed ids (numbers or strings).

    Raises:
        ValueError: if *enum_ids* is empty. The check is done before the tree
            is touched, so *simpletype* is left unchanged in that case.
    """
    enum_ids = list(enum_ids)
    if not enum_ids:
        raise ValueError('enum_ids must contain at least one id')

    restriction = simpletype.find('xs:restriction', namespaces=namespaces)
    if restriction is not None:
        simpletype.remove(restriction)
    restriction = SubElement(simpletype, f"{xs}restriction")

    enum_ids_checked = [_normalize_enum_id(el) for el in enum_ids]

    # Integer mode only when every id was converted to a non-negative int.
    # Testing the type (rather than str.isdigit) avoids accepting characters
    # such as '²' that are "digits" but cannot be parsed by int().
    is_int = all(isinstance(el, int) and el >= 0 for el in enum_ids_checked)

    if is_int:
        restriction.attrib.update({"base": 'xs:int'})
        min_value = min(enum_ids_checked)
        max_value = max(enum_ids_checked)
        # All ids lie in [min, max], so they fill the range exactly when the
        # number of distinct ids equals the width of the range.
        is_contiguous = len(set(enum_ids_checked)) == max_value - min_value + 1
        if is_contiguous:
            minInclusive = SubElement(restriction, f"{xs}minInclusive")
            minInclusive.attrib.update({'value': str(min_value)})
            maxInclusive = SubElement(restriction, f"{xs}maxInclusive")
            maxInclusive.attrib.update({'value': str(max_value)})
        else:
            for value in enum_ids_checked:
                enumeration = SubElement(restriction, f"{xs}enumeration")
                enumeration.attrib.update({'value': str(value)})

    # Case ids are not (all) non-negative ints
    else:
        restriction.attrib.update({"base": 'xs:string'})
        for value in enum_ids_checked:
            enumeration = SubElement(restriction, f"{xs}enumeration")
            enumeration.attrib.update({'value': str(value)})

In [278]:
def apply_documentation_enum(annotation,enum_name,enums_audit_doc:dict):
    """Make sure *annotation* has an ``xs:documentation`` and fill it if known.

    A missing ``xs:documentation`` child is created and inserted first in
    *annotation*. When *enum_name* has an entry in *enums_audit_doc*, that
    text replaces the current one (a message is printed if a text was already
    present). A warning is printed when the element ends up with no text.

    Args:
        annotation: the ``xs:annotation`` element to update in place.
        enum_name: enum name without the ``enum_`` prefix and ``_id`` suffix.
        enums_audit_doc: mapping of enum name to documentation text.
    """
    documentation = annotation.find('xs:documentation', namespaces=namespaces)
    if documentation is None:
        documentation = Element(f'{xs}documentation')
        annotation.insert(0, documentation)
    if enum_name in enums_audit_doc:
        if documentation.text is not None:
            print(f'La documentation dpe pour enum_{enum_name}_id est écrasée')
        documentation.text = enums_audit_doc[enum_name]
    # The text is None (not '') for a newly created or empty element, so a
    # truthiness test is used: len(None) would raise instead of warning.
    if not documentation.text:
        print(f"===WARNING : enum_{enum_name}_id est sans documentation !===")

In [279]:
# For every audit enum, rewrite each matching enum_<name>_id element of the
# audit schema: new simpleType restriction, appinfo holding the id -> label
# table as JSON, and documentation.
for enum_name, ids_lib in enums_audit_dict.items():
    enum_ids = list(ids_lib)
    enum_name_xsd = f"enum_{enum_name}_id"
    matching_elements = root_audit.xpath(f'//xs:element[@name ="{enum_name_xsd}"]', namespaces=namespaces)
    for el in matching_elements:
        # Always start from a brand-new simpleType.
        simpletype = el.find('xs:simpleType', namespaces=namespaces)
        if simpletype is not None:
            el.remove(simpletype)
        simpletype = SubElement(el, f"{xs}simpleType")
        apply_restriction_enum(simpletype, enum_ids)

        # The annotation, when created, goes first in the element.
        annotation = el.find('xs:annotation', namespaces=namespaces)
        if annotation is None:
            annotation = Element(f'{xs}annotation')
            el.insert(0, annotation)

        # An existing appinfo is reused; only its text is replaced.
        appinfo = annotation.find('xs:appinfo', namespaces=namespaces)
        if appinfo is None:
            appinfo = SubElement(annotation, f"{xs}appinfo")
        enum_json = json.dumps(ids_lib, ensure_ascii=False, indent=4)
        appinfo.text = '\n' + enum_json + '\n'

        apply_documentation_enum(annotation, enum_name, enums_audit_doc)

In [280]:
et_audit.write(audit_reg_path, pretty_print=True, xml_declaration=True, encoding='utf-8')

# Mise à jour de appinfo source

In [281]:
all_appinfo = list(audit.iterfind('*//xs:appinfo', namespaces=namespaces))

In [282]:
# Re-root the source paths inherited from the DPE schema under the audit tree.
for appinfo in all_appinfo:
    if 'source' not in appinfo.attrib:
        continue
    source = appinfo.attrib['source']
    # A path starting with 'dpe' gets that prefix replaced by
    # 'audit/logement_collection'.
    if source.startswith('dpe'):
        new_audit_source = 'audit/logement_collection' + source[len('dpe'):]
        appinfo.set('source', new_audit_source)

In [283]:
# appinfo_no_source = []
# for appinfo in all_appinfo:
#     if 'source' not in appinfo.attrib.keys():
#         appinfo_no_source.append(appinfo)

In [284]:
# Create the `source` attribute of every appinfo that lacks one, by taking the
# source path of a sibling element and swapping its last segment for the name
# of the current element.
for appinfo in all_appinfo:
    if 'source' in appinfo.attrib:
        continue

    # appinfo -> xs:annotation -> owning element
    element = appinfo.getparent().getparent()
    el_name = element.attrib['name']

    # First sibling (child of the same parent) whose appinfo carries a source.
    source_brother = None
    for el in element.getparent():
        brother_appinfo = el.find('xs:annotation/xs:appinfo', namespaces=namespaces)
        if brother_appinfo is not None and 'source' in brother_appinfo.attrib:
            source_brother = brother_appinfo.attrib['source']
            break
    if source_brother is None:
        raise IndexError(f"no sibling of '{el_name}' has an appinfo source to derive its path from")

    # Only the LAST path segment is replaced. A plain str.replace would also
    # rewrite earlier occurrences of the same text (e.g. 'mur' inside
    # 'mur_collection') and corrupt the path.
    source_path = '/'.join(source_brother.split('/')[:-1] + [el_name])

    appinfo.attrib.update({'source': source_path})

In [285]:
et_audit.write(audit_reg_path, pretty_print=True, xml_declaration=True, encoding='utf-8')

# Validation du xml par le xsd

In [286]:
# Validate the example audit XML against the schema file written to
# audit_reg_path by the previous cells.
# path_xsd_audit = r'..\modele_donnee\audit_energetique'
# xsd_path = os.path.join(path_xsd_audit,'audit_reg_test.xsd')
xml_path = os.path.join(path_xsd_audit,'exemples_metier','xml_version_0.xml')

# The generated schema is re-read from disk and compiled before use.
xmlschema_doc = etree.parse(audit_reg_path)
xmlschema = etree.XMLSchema(xmlschema_doc)
xml_doc = etree.parse(xml_path)

# Evaluates to a boolean; the details of a failure are in xmlschema.error_log.
xmlschema.validate(xml_doc)

False

In [287]:
xmlschema.error_log

file:/D:/Developpement/observatoire-dpe/modele_donnee/audit_energetique/exemples_metier/xml_version_0.xml:100:0:ERROR:SCHEMASV:SCHEMAV_ELEMENT_CONTENT: Element 'logement': Missing child element(s). Expected is one of ( production_elec_enr, etape_travaux ).
file:/D:/Developpement/observatoire-dpe/modele_donnee/audit_energetique/exemples_metier/xml_version_0.xml:706:0:ERROR:SCHEMASV:SCHEMAV_ELEMENT_CONTENT: Element 'logement': Missing child element(s). Expected is one of ( production_elec_enr, etape_travaux ).
file:/D:/Developpement/observatoire-dpe/modele_donnee/audit_energetique/exemples_metier/xml_version_0.xml:1312:0:ERROR:SCHEMASV:SCHEMAV_ELEMENT_CONTENT: Element 'logement': Missing child element(s). Expected is one of ( production_elec_enr, etape_travaux ).
file:/D:/Developpement/observatoire-dpe/modele_donnee/audit_energetique/exemples_metier/xml_version_0.xml:1918:0:ERROR:SCHEMASV:SCHEMAV_ELEMENT_CONTENT: Element 'logement': Missing child element(s). Expected is one of ( producti