Extract the imported zip file

In [62]:
import zipfile
import os

# Archive to unpack and the folder its contents are extracted into
extract_dir = 'cmt-files/imported-data'
zip_path = os.path.join('cmt-files/imported-data', 'data-data.zip')

# Unpack the archive when it is present; otherwise report the missing file
if os.path.exists(zip_path):
    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(extract_dir)
        print(f"Extracted files to '{extract_dir}'")
else:
    print(f"Error: The file '{zip_path}' does not exist.")

Error: The file 'cmt-files/imported-data\data-data.zip' does not exist.


Convert data.xml into a DataFrame

In [None]:
import pandas as pd
import xml.etree.ElementTree as ET
from pathlib import Path

# Location of the XML data file read by the parsing step
data_file = Path("cmt-files/imported-data") / "data.xml"

# Parse the XML file and extract fields with their parent record IDs
def parse_xml_with_all_attributes(file_path):
    """Flatten the <record>/<field> elements of an XML file into a long DataFrame.

    Each <field> directly under a <record> becomes one row holding every
    attribute of that field plus a ``record_id`` column taken from the parent
    record's ``id`` attribute. A field that carries a non-empty
    ``lookupentity`` attribute additionally yields a companion row named
    ``<field name>_name`` whose ``value`` is the field's ``lookupentityname``
    attribute; the companion row is placed immediately before the field's own
    row.

    Args:
        file_path: Path or file object accepted by ``ET.parse``.

    Returns:
        A ``(DataFrame, table_name)`` tuple. ``table_name`` is the ``name``
        attribute of the first <entity> descendant of the root, or ``None``
        when no such element exists. The DataFrame is empty (no columns) when
        the file contains no fields.

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    records = []
    root = ET.parse(file_path).getroot()

    # Look the entity element up once; its name labels the whole table.
    entity = root.find(".//entity")
    table_name = entity.get("name") if entity is not None else None

    for record in root.findall(".//record"):
        record_id = record.get("id")
        for field in record.findall("field"):
            # Copy every attribute of the field, then tag it with its record.
            field_data = dict(field.attrib)
            field_data["record_id"] = record_id

            # A lookup field without a name cannot produce a "<name>_name"
            # companion row; skip it instead of failing on None + "_name".
            field_name = field.get("name")
            if field.get("lookupentity") and field_name is not None:
                records.append({
                    "record_id": record_id,
                    "name": field_name + "_name",
                    "value": field.get("lookupentityname")
                })

            records.append(field_data)

    return pd.DataFrame(records), table_name

# Build the long-format frame (one row per field) and read the entity name
df, table_name = parse_xml_with_all_attributes(data_file)

# Reshape to one row per record with one column per field name,
# then turn the record_id index back into an ordinary column
df_pivoted = df.pivot(index="record_id", columns="name", values="value").reset_index()

# Persist as Parquet named after the entity; skip when the name is unknown
if not table_name:
    print("Table name is not available. DataFrame not saved.")
else:
    parquet_file = f"cmt-files/imported-data/{table_name}.parquet"
    df_pivoted.to_parquet(parquet_file, engine="pyarrow")
    print(f"DataFrame saved to {parquet_file}")

# Show the entity name and the reshaped frame
print(f"Table Name: {table_name}")
print(df_pivoted)

Create a difference file

In [10]:
import pandas as pd
from pathlib import Path

# Define paths for the imported, formatted and difference Parquet files.
# All three are named after table_name, which must already be defined.
imported_data_path = Path(f"cmt-files/imported-data/{table_name}.parquet")
formatted_data_path = Path(f"cmt-files/formatted-data/{table_name}.parquet")
difference_data_path = Path(f"cmt-files/difference-data/{table_name}.parquet")
print(imported_data_path)
print(formatted_data_path)

# Load the DataFrames from the Parquet files
if imported_data_path.exists() and formatted_data_path.exists():
    imported_df = pd.read_parquet(imported_data_path)
    formatted_df = pd.read_parquet(formatted_data_path)

    # Both frames need a 'modifiedon' column to be compared
    if 'modifiedon' in imported_df.columns and 'modifiedon' in formatted_df.columns:
        # Convert 'modifiedon' columns to datetime for comparison
        imported_df['modifiedon'] = pd.to_datetime(imported_df['modifiedon'])
        formatted_df['modifiedon'] = pd.to_datetime(formatted_df['modifiedon'])

        # Find records in imported_df that are newer than the latest in formatted_df
        latest_modifiedon = formatted_df['modifiedon'].max()
        if pd.isna(latest_modifiedon):
            # The formatted frame has no usable timestamp (empty or all null).
            # Comparing against NaT is always False and would silently drop
            # everything, so treat every imported record as new instead.
            new_records_df = imported_df
        else:
            new_records_df = imported_df[imported_df['modifiedon'] > latest_modifiedon]

        # Make sure the output folder exists before writing into it
        difference_data_path.parent.mkdir(parents=True, exist_ok=True)
        new_records_df.to_parquet(difference_data_path, engine="pyarrow")
        print(f"New records saved to {difference_data_path}")
    else:
        print("Error: 'modifiedon' column is missing in one of the DataFrames.")
else:
    print("Error: One or both Parquet files do not exist.")

cmt-files\imported-data\account.parquet
cmt-files\formatted-data\account.parquet
New records saved to cmt-files\difference-data\account.parquet


Create an export file

In [None]:
import pandas as pd
from pathlib import Path
import xml.etree.ElementTree as ET
from xml.dom import minidom
from datetime import datetime

# Inputs: the formatted table and its schema, both named after table_name
formatted_data_path = Path(f"cmt-files/formatted-data/{table_name}.parquet")
schema_file_path = Path(f"cmt-files/schema/{table_name}-schema.xml")

# Outputs: fixed file names inside the export folder
export_data_path = Path("cmt-files", "data-to-be-exported", "export.parquet")
export_xml_path = Path("cmt-files", "data-to-be-exported", "data.xml")
schema_export_path = Path("cmt-files", "data-to-be-exported", "data_schema.xml")

# Read the formatted table that will be exported
formatted_df = pd.read_parquet(formatted_data_path)

# Parse the schema file to get field attributes and entity details
def parse_schema(schema_path):
    """Read field and entity metadata from a schema XML file.

    Args:
        schema_path: Path or file object accepted by ``ET.parse``.

    Returns:
        A ``(field_attributes, entity_name, display_name)`` tuple.
        ``field_attributes`` maps each named <field> to a dict of all its
        attributes; fields without a non-empty ``name`` are ignored.
        ``entity_name`` and ``display_name`` are the ``name`` and
        ``displayname`` attributes of the first <entity> descendant, or the
        placeholders ``"unknown_entity"`` / ``"Unknown Display Name"`` when no
        such element exists.
    """
    root = ET.parse(schema_path).getroot()

    # One entry per named field; a repeated name keeps the last definition.
    field_attributes = {
        node.get("name"): dict(node.attrib)
        for node in root.findall(".//field")
        if node.get("name")
    }

    entity_node = root.find(".//entity")
    if entity_node is None:
        return field_attributes, "unknown_entity", "Unknown Display Name"

    return field_attributes, entity_node.get("name"), entity_node.get("displayname")

# Precompute lookupentityname_map using schema
def precompute_lookupentityname_map(df, field_attributes):
    """Map schema field names to their ``<field>_name`` companion columns.

    Args:
        df: DataFrame whose column labels are inspected.
        field_attributes: Mapping keyed by schema field name.

    Returns:
        Dict of ``base field name -> column name`` for every column ending in
        ``"_name"`` whose prefix (the label minus that suffix) is a key of
        ``field_attributes``.
    """
    suffix = "_name"
    return {
        column[:-len(suffix)]: column
        for column in df.columns
        if column.endswith(suffix) and column[:-len(suffix)] in field_attributes
    }

# Read field/entity metadata from the schema, then pair each lookup field
# with its "<field>_name" column in the formatted frame
field_attributes, entity_name, display_name = parse_schema(schema_path=schema_file_path)
lookupentityname_map = precompute_lookupentityname_map(
    df=formatted_df,
    field_attributes=field_attributes,
)

# Write a Parquet copy of the formatted frame into the export folder
formatted_df.to_parquet(path=export_data_path, engine="pyarrow")
print(f"Exported data saved to {export_data_path}")

# Convert the DataFrame to XML and save it
def dataframe_to_custom_xml(df, entity_name, display_name, timestamp, field_attributes, lookupentityname_map):
    """Build an <entities>/<entity>/<records> XML tree from a DataFrame.

    Every row becomes a <record>; every non-null cell whose column label does
    not end in ``"_name"`` becomes a <field> with ``name`` and ``value``
    attributes. Schema attributes for that column (other than ``name`` and
    ``value``) are copied onto the field, and columns listed in
    ``lookupentityname_map`` get a ``lookupentityname`` attribute read from
    the mapped column (empty string when that cell is null).

    Args:
        df: DataFrame to serialise.
        entity_name: Value of the <entity> ``name`` attribute.
        display_name: Value of the <entity> ``displayname`` attribute.
        timestamp: Value of the root ``timestamp`` attribute.
        field_attributes: Mapping of column name -> dict of schema attributes.
        lookupentityname_map: Mapping of column name -> companion column name.

    Returns:
        An ``ET.ElementTree`` rooted at the <entities> element.
    """
    root_attrs = {
        "xmlns:xsd": "http://www.w3.org/2001/XMLSchema",
        "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
        "timestamp": timestamp
    }
    entities = ET.Element("entities", attrib=root_attrs)
    entity = ET.SubElement(entities, "entity", attrib={"name": entity_name, "displayname": display_name})
    records = ET.SubElement(entity, "records")

    # Attributes set from the cell itself; the schema must not override them.
    reserved = ("name", "value")

    for _, row in df.iterrows():
        # NOTE(review): records are emitted without an id attribute - confirm
        # whether the consumer of this file needs one.
        record_elem = ET.SubElement(records, "record")

        for col, val in row.items():
            # Companion "_name" columns and null cells produce no field.
            if col.endswith("_name") or pd.isna(val):
                continue

            field_elem = ET.SubElement(record_elem, "field", attrib={"name": col, "value": str(val)})

            # Copy the schema attributes for this column, if any.
            for attr_key, attr_val in field_attributes.get(col, {}).items():
                if attr_key not in reserved:
                    field_elem.set(attr_key, attr_val)

            # Lookup columns carry the display text from their companion column.
            if col in lookupentityname_map:
                label = row[lookupentityname_map[col]]
                field_elem.set("lookupentityname", str(label) if pd.notna(label) else "")

    return ET.ElementTree(entities)

# Get the current UTC timestamp in ISO 8601 form with a trailing "Z".
# datetime.utcnow() is deprecated since Python 3.12, so take a timezone-aware
# UTC time and drop the tzinfo before formatting; otherwise isoformat() would
# append "+00:00" in front of the "Z".
from datetime import timezone

timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

# Build the export XML tree from the formatted frame and the schema metadata
# (arguments follow the parameter order of dataframe_to_custom_xml)
xml_tree = dataframe_to_custom_xml(
    formatted_df,
    entity_name,
    display_name,
    timestamp,
    field_attributes,
    lookupentityname_map,
)

# Pretty print XML and save it
def pretty_print_xml(tree, file_path):
    """Write an ElementTree to ``file_path`` as indented, UTF-8 encoded XML.

    The tree is serialised, re-parsed with minidom and written out with a
    two-space indent per nesting level.

    Args:
        tree: ``ET.ElementTree`` to serialise.
        file_path: Destination path; an existing file is overwritten.
    """
    document = minidom.parseString(ET.tostring(tree.getroot(), encoding="utf-8"))
    with open(file_path, "w", encoding="utf-8") as out:
        out.write(document.toprettyxml(indent="  "))

# Write the indented export XML to disk
pretty_print_xml(tree=xml_tree, file_path=export_xml_path)
print(f"Exported data saved to {export_xml_path}")

# Re-serialise the schema into the export folder under its fixed file name
schema_tree = ET.parse(schema_file_path)
schema_tree.write(schema_export_path, xml_declaration=True, encoding="utf-8")
print(f"Schema file saved to {schema_export_path}")

In [3]:
import shutil
from pathlib import Path

# Files that make up the export package, and the folder they are gathered in
data_folder_path = Path("cmt-files/data-to-be-exported/data")
content_types_path = Path("cmt-files/data-to-be-exported/[Content_Types].xml")
data_schema_path = Path("cmt-files/data-to-be-exported/data_schema.xml")
data_xml_path = Path("cmt-files/data-to-be-exported/data.xml")

# Make sure the target folder is there (no error if it already exists)
data_folder_path.mkdir(parents=True, exist_ok=True)

# Copy each file into the target folder, keeping its file name
for source_path in (content_types_path, data_schema_path, data_xml_path):
    shutil.copy(source_path, data_folder_path / source_path.name)

print(f"Files copied to {data_folder_path}")

Files copied to cmt-files\data-to-be-exported\data


In [4]:
import zipfile
from pathlib import Path

# Define the path of the folder to be compressed and the output zip file
data_folder_path = Path("cmt-files/data-to-be-exported/data")
data_zip_path = Path("cmt-files/data-to-be-exported/data.zip")

# Without this check a missing folder would silently produce an empty archive
if not data_folder_path.is_dir():
    print(f"Error: Folder {data_folder_path} does not exist or is not a directory.")
else:
    # ZipFile defaults to ZIP_STORED (no compression), so request DEFLATE
    # explicitly. Entries are stored relative to the data folder.
    with zipfile.ZipFile(data_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for file in data_folder_path.rglob("*"):
            zipf.write(file, arcname=file.relative_to(data_folder_path))

    print(f"Folder {data_folder_path} compressed into {data_zip_path}")

Folder cmt-files\data-to-be-exported\data compressed into cmt-files\data-to-be-exported\data.zip


In [None]:
import os
import zipfile
from pathlib import Path

# Define the path of the folder to be compressed and the output zip file
data_folder_path = Path("cmt-files/data-to-be-exported/data")
data_zip_path = Path("cmt-files/data-to-be-exported/data.zip")

# Ensure the folder exists
if not data_folder_path.exists() or not data_folder_path.is_dir():
    print(f"Error: Folder {data_folder_path} does not exist or is not a directory.")
else:
    # ZipFile defaults to ZIP_STORED (no compression), so request DEFLATE
    # explicitly. Only regular files are added; each entry is stored under
    # its path relative to the data folder.
    with zipfile.ZipFile(data_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(data_folder_path):
            for file in files:
                file_path = Path(root) / file
                zipf.write(file_path, arcname=file_path.relative_to(data_folder_path))

    print(f"Folder {data_folder_path} compressed into {data_zip_path}")