# In[11]:
from typing import List, Dict, Any
import pandas as pd
from pathlib import Path

# In[14]:
from typing import List, Dict, Any
import pandas as pd
from pathlib import Path

class Principle7Extractor:
    """Flatten selected facts from per-company Excel workbooks into one row each.

    Every workbook is read as a long table with three columns: an element
    name, a unit (context) label, and the fact value.  For each workbook the
    extractor collects the company name, the trade/industry association
    affiliations, and the anti-competitive-conduct and public-policy blocks,
    and returns them as a single-row DataFrame.  ``process_directory`` stacks
    those rows for every workbook found in a folder.
    """

    def __init__(self):
        # Column headers expected in each workbook (after whitespace trimming).
        self.ELEMENT_COL = 'Element Name'
        self.UNIT_COL = 'Unit'
        self.VALUE_COL = 'Fact Value'
        # Element names looked up in ELEMENT_COL.
        self.COMPANY_ELEMENT = 'NameOfTheCompany'
        self.AFFILIATION_COUNT_FIELD = 'NumberOfAffiliationsWithTradeAndIndustryChambersOrAssociations'
        self.TRADE_ASSOC_NAME_FIELD = 'NameOfTheTradeAndIndustryChambersOrAssociations'
        self.TRADE_ASSOC_REACH_FIELD = 'ReachOfTradeAndIndustryChambersOrAssociations'
        # Unit labels are "<prefix><n>"; affiliations are read for n = 1..MAX_AFFILIATIONS.
        self.AFFILIATION_UNIT_PREFIX = 'D_IndustryChambersOrAssociations'
        self.MAX_AFFILIATIONS = 10
        self.ANTI_COMPETITIVE_PREFIX = 'D_IssuesRelatedToAntiCompetitiveAxis'
        self.ANTI_COMPETITIVE_FIELDS = [
            'NameOfAuthority',
            'BriefOfAnyIssuesRelatedToAntiCompetitiveConductByTheEntity',
            'CorrectiveActionTakenForAnyIssuesRelatedToAntiCompetitiveConductByTheEntity',
        ]
        self.PUBLIC_POLICY_PREFIX = 'D_PublicPolicyAxis'
        self.PUBLIC_POLICY_FIELDS = [
            'PublicPolicyAdvocated',
            'MethodResortedForSuchAdvocacy',
            'WhetherInformationAvailableInPublicDomain',
            'FrequencyOfReviewByBoard',
            'WebLinkPublicPolicyPositionAdvocated'
        ]

    def _first_value(self, rows: pd.DataFrame, default: Any = None) -> Any:
        """Return the value cell of the first row in *rows*, or *default* if there is none."""
        return rows[self.VALUE_COL].values[0] if not rows.empty else default

    def _get_company_name(self, df: pd.DataFrame) -> str:
        """Return the trimmed company name, or ``"Unknown Company"``.

        The fallback is used both when the company element is absent and when
        its value cell is blank (NaN/None), so a blank cell no longer yields
        the literal string ``"nan"``.
        """
        value = self._get_value(df, self.COMPANY_ELEMENT)
        if value is None or pd.isna(value):
            return "Unknown Company"
        return str(value).strip()

    def _get_value(self, df: pd.DataFrame, element: str) -> Any:
        """Return the value of the first row whose element name equals *element*, else ``None``."""
        return self._first_value(df[df[self.ELEMENT_COL] == element])

    def _extract_affiliations(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Collect the affiliation count plus name/reach for slots 1..MAX_AFFILIATIONS.

        Returns a dict with ``'Number of Affiliations'`` (an int, or ``None``
        when the value is missing or not numeric) and, for every slot ``i``,
        ``'Affiliation {i} Name'`` and ``'Affiliation {i} Reach'`` (empty
        string when the workbook has no such row).
        """
        result: Dict[str, Any] = {}

        raw_count = self._get_value(df, self.AFFILIATION_COUNT_FIELD)
        try:
            # Go through float first so values such as "3.0" are accepted.
            result['Number of Affiliations'] = int(float(raw_count))
        except (TypeError, ValueError, OverflowError):
            # None, non-numeric text, NaN, or infinity.
            result['Number of Affiliations'] = None

        # Filter by element once; only the unit label varies per slot.
        name_rows = df[df[self.ELEMENT_COL] == self.TRADE_ASSOC_NAME_FIELD]
        reach_rows = df[df[self.ELEMENT_COL] == self.TRADE_ASSOC_REACH_FIELD]

        for i in range(1, self.MAX_AFFILIATIONS + 1):
            unit = f'{self.AFFILIATION_UNIT_PREFIX}{i}'
            result[f'Affiliation {i} Name'] = self._first_value(
                name_rows[name_rows[self.UNIT_COL] == unit], "")
            result[f'Affiliation {i} Reach'] = self._first_value(
                reach_rows[reach_rows[self.UNIT_COL] == unit], "")

        return result

    def _extract_dynamic_block(self, df: pd.DataFrame, unit_prefix: str, fields: List[str]) -> Dict[str, Any]:
        """Collect *fields* for every unit label that starts with *unit_prefix*.

        Matching units are numbered from 1 in order of first appearance in the
        table.  For each unit and each field present under it, the result gets
        a ``'{field}_{n}'`` key holding the first matching value.  Fields that
        are absent for a unit produce no key.
        """
        result: Dict[str, Any] = {}
        units = df[self.UNIT_COL].dropna().unique()
        matching_units = [u for u in units if isinstance(u, str) and u.startswith(unit_prefix)]

        for idx, unit in enumerate(matching_units, start=1):
            block = df[df[self.UNIT_COL] == unit]
            for field in fields:
                rows = block[block[self.ELEMENT_COL] == field]
                if not rows.empty:
                    result[f'{field}_{idx}'] = rows[self.VALUE_COL].values[0]
        return result

    def _process_single_file(self, file_path: Path) -> pd.DataFrame:
        """Read one workbook and return its extracted facts as a one-row DataFrame.

        Returns an empty DataFrame (after printing the reason, where there is
        one) when the workbook cannot be loaded, has no rows, or lacks one of
        the required columns.
        """
        # openpyxl cannot read legacy .xls files; let pandas choose the engine for those.
        engine = None if file_path.suffix.lower() == ".xls" else "openpyxl"
        try:
            df = pd.read_excel(file_path, engine=engine)
        except Exception as e:
            print(f"Failed to load {file_path.name}: {e}")
            return pd.DataFrame()

        if df.empty:
            return pd.DataFrame()

        # Headers may be non-strings (numbers, dates); coerce before trimming.
        df.columns = [str(col).strip() for col in df.columns]

        required = (self.ELEMENT_COL, self.UNIT_COL, self.VALUE_COL)
        missing = [col for col in required if col not in df.columns]
        if missing:
            # Skip this workbook instead of aborting the whole run with a KeyError.
            print(f"Failed to load {file_path.name}: missing column(s) {missing}")
            return pd.DataFrame()

        result = {'Company Name': self._get_company_name(df)}
        result.update(self._extract_affiliations(df))
        result.update(self._extract_dynamic_block(
            df, self.ANTI_COMPETITIVE_PREFIX, self.ANTI_COMPETITIVE_FIELDS))
        result.update(self._extract_dynamic_block(
            df, self.PUBLIC_POLICY_PREFIX, self.PUBLIC_POLICY_FIELDS))

        return pd.DataFrame([result])

    def process_directory(self, directory: str) -> pd.DataFrame:
        """Process every ``.xlsx``/``.xls`` file directly inside *directory*.

        Files are visited in sorted order so the output row order is
        reproducible.  Excel lock files (names starting with ``~$``) are
        skipped.  Returns the concatenated per-file rows, or an empty
        DataFrame when the directory is missing, is not a directory, or
        yields no data.
        """
        dir_path = Path(directory)
        if not dir_path.is_dir():
            print(f"Directory '{directory}' not found.")
            return pd.DataFrame()

        all_records = []
        for file in sorted(dir_path.iterdir()):
            if file.name.startswith("~$"):
                continue
            if file.suffix.lower() not in (".xlsx", ".xls"):
                continue
            df = self._process_single_file(file)
            if not df.empty:
                all_records.append(df)

        return pd.concat(all_records, ignore_index=True) if all_records else pd.DataFrame()

    def export_to_excel(self, df: pd.DataFrame, output_file: str):
        """Write *df* to *output_file* without the index, printing the outcome.

        Nothing is written when *df* is empty; write errors are printed rather
        than raised.
        """
        if df.empty:
            print("No data to export.")
            return
        try:
            df.to_excel(output_file, index=False)
            print(f"\n✅ Exported to '{output_file}' successfully.")
        except Exception as e:
            print(f"Export failed: {e}")


# In[ ]:
# Driver cell: extract every workbook under the input folder and save the
# combined table as a single Excel file.
input_dir = "excel_files"
output_file = "p7_all.xlsx"

extractor = Principle7Extractor()
df = extractor.process_directory(input_dir)
extractor.export_to_excel(df, output_file)
