In [2]:
import docx
from docx import Document
import re
import pandas as pd
import numpy as np
import os

In [None]:
def unify_punctuation_in_list(text_list):
    replacements = {
        '车道道宽(m)':'车行道宽(m)',
        '桥梁桩号':'桥位桩号',
        '引道线形(m)':'引道线形',
        '引道线型':'引道线形',
        '存档案':'存档号',
        '桥下净空(m)':'桥下净高(m)',
        '历次维修资料':'历史维修资料',
        '机动车道宽':'车行道宽',
        'f支座形式':'支座形式',
        '（': '(',
        '）': ')',
        '跨经(m)':'跨径(m)',
        '线路编号':'路线编号',
        '线路名称':'路线名称',
        '建成年月':'建成年限',
        '引道路面宽':'引道路面宽(m)',
        '线路名称':'路线名称',
        '线路名称':'路线名称',
        
    }
    
    unified_list = []
    for text in text_list:
        for ch, replacement in replacements.items():
            text = text.replace(ch, replacement)
        unified_list.append(text)
    
    return unified_list


def remove_blanks(text:str):
    return re.sub(r'\s', '', text)


def remove_punctuations_and_blanks(text:str):
    pattern = re.compile(r'[\u4e00-\u9fffA-Za-z0-9]+')
    words = pattern.findall(text)
    return ''.join(words)


def find_doc_files(directory:str):
    doc_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".docx"):
                doc_files.append(os.path.join(root, file))
    return doc_files

 
def find_doc_files_without_walking(directory: str):
    doc_files = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if os.path.isfile(full_path) and entry.endswith(".docx"):
            doc_files.append(full_path)
    return doc_files


def clean_row(row_data:list):
    cleaned_row = []
    shang = []
    previous_text = None
    
    if row_data[0] in ["上", "部", "结","构","上部结构"]:
        shang.append(row_data)
        
    else:   
        for text in row_data:
            if text != previous_text:
                cleaned_row.append(text)
                previous_text = text
    return cleaned_row,shang


def read_data_from_row(row_data:list):
    result = {}
    if len(row_data)%3 == 0:
        for i in range(len(row_data)//3):
            result[row_data[3*i+1]] = [row_data[3*i+2]]
    elif len(row_data) == 4:
        result[row_data[2]] = [row_data[3]]
    return result


def read_data_from_shangbujiegou(row_data:list):  
    result = {}
    targets = [['下部结构','下部结构','下部结构','下部结构'],['下','部','结','构']]
    for target in targets:
        if target in row_data:
            index = row_data.index(target)
    for i in range(index-3):
        result[f'{row_data[2]}{i+1}'] = "@".join(row_data[i+3])
    for j in range(len(row_data)-index-3):
        result[f'{row_data[index+2]}{j+1}'] = "@".join(row_data[index+j+3])        
    return result


def clean_cell(cell):
    if isinstance(cell, list) and len(cell) > 0:
        # 如果是非空列表，去掉列表符号和内容的引号
        return str(cell[0]).strip("'[]")
    elif isinstance(cell, list):
        # 如果是空列表，返回空字符串
        return ''
    else:
        # 如果不是列表，直接返回内容
        return str(cell).strip("'[]")

In [None]:
def extract_static_1(document: docx.Document)->pd.DataFrame:
    """
    Function responsible for extracting the static data table from inspection reports
    This function works for reports in 2023 and 2022, where there is one large table containing all the information

    Args:
        document (docx.Document): inspection report document

    Returns:
        extracted_data (pd.DataFrame): extracted data, in which 1 row represents 1 bridge
    """

    extracted_data = pd.DataFrame([])

    for table in document.tables:
        if "桥梁所处行政区划代码" in remove_blanks(table.rows[0].cells[0].text):
            table_content = {}
            for row in table.rows:
                row_content = clean_row([remove_blanks(cell.text) for cell in row.cells])
                
                table_content.update(read_data_from_row(row_content))
            table_content = pd.DataFrame(table_content)
            extracted_data = pd.concat([extracted_data, table_content], ignore_index=True, axis=0)

    return extracted_data


def extract_static_2(document: docx.Document)->pd.DataFrame:
    """
    Function responsible for extracting the static data table from inspection reports
    This function works for reports in 2021 S32,同三,沪芦, where there is one large table containing all the information

    Args:
        document (docx.Document): inspection report document

    Returns:
        extracted_data (pd.DataFrame): extracted data, in which 1 row represents 1 bridge
    """

    columns_to_delete = ['竣工', '工程范围','质量评定','施工单位','主要负责人','填卡人','填卡日期']
    extracted_data = pd.DataFrame([])

    for table in document.tables:
        if "行政识别数据" in remove_blanks(table.rows[0].cells[0].text):
            table_content = {}
            for row in table.rows:
                row_content = clean_row([remove_blanks(cell.text) for cell in row.cells])
                print(f'row_content:{row_content}')
                table_content.update(read_data_from_row(row_content))
                print(f'table_content:{table_content}')
            table_content = pd.DataFrame(table_content)
            table_content.drop(columns=columns_to_delete, inplace=True)
            extracted_data = pd.concat([extracted_data, table_content], ignore_index=True, axis=0)

    return extracted_data


def extract_static_3(document: docx.Document) -> pd.DataFrame:
    """
    Function responsible for extracting the static data table from inspection reports

    Args:
        document (docx.Document): inspection report document

    Returns:
        extracted_data (pd.DataFrame): extracted data, where 1 row represents 1 bridge
    """

    extracted_data = pd.DataFrame([])
    one_bridge_data = {}

    for block in document.iter_inner_content():
        if type(block) == docx.text.paragraph.Paragraph:  # if it is a paragraph
            text = remove_punctuations_and_blanks(block.text)
            if "桥梁基本状况卡片" in text:
                # If there is already data in one_bridge_data, add it to extracted_data before starting new bridge
                if one_bridge_data:
                    extracted_data = pd.concat([extracted_data, pd.DataFrame([one_bridge_data])], ignore_index=True)
                    one_bridge_data = {}  # Reset for new bridge
        elif type(block) == docx.table.Table:
            if ("行政识别数据" in remove_blanks(block.rows[0].cells[0].text)) or ("结构技术数据" in remove_blanks(block.rows[0].cells[0].text)):
                vertical_content = []
                for row in block.rows:
                    row_content = clean_row([remove_blanks(cell.text) for cell in row.cells])
                    if row_content[0] in ["上", "部", "结","构"]:  # Collect vertical data
                        vertical_content.append(row_content)
                    one_bridge_data.update(read_data_from_row(row_content))  # Process and update the data
                

    # Add the last bridge's data if any remains
    if one_bridge_data:
        extracted_data = pd.concat([extracted_data, pd.DataFrame([one_bridge_data])], ignore_index=True)

    return extracted_data


def extract_static_4(document: docx.Document) -> pd.DataFrame:
    """
    Function responsible for extracting the static data table from inspection reports
    This function works for reports in 2021、2020、2019、2018, where there is one large table containing all the information

    Args:
        document (docx.Document): inspection report document

    Returns:
        extracted_data (pd.DataFrame): extracted data, where 1 row represents 1 bridge
    """
    
   
    columns_to_delete = ['竣工', '工程范围','质量评定','施工单位']
    target_paragraphs = ['行政识别数据', '结构技术数据','档案资料（全、不全或无）','档案资料(全、不全或无)']
    extracted_data = pd.DataFrame([])      
    table_content = {}
    current_paragraph = None 
    shangbujiegou = []
    cleaned_row = []
    previous_text = None 
    
    for element in document.element.body:
        if element.tag.endswith('tbl') and current_paragraph == None:
            table = docx.table.Table(element, document)
            if ("行政识别数据" in remove_blanks(table.rows[0].cells[0].text)) or ("结构技术数据" in remove_blanks(table.rows[0].cells[0].text)) or ("档案资料（全、不全或无）" in remove_blanks(table.rows[0].cells[0].text)):
                for row in table.rows:
                    row_content,shang = clean_row([remove_blanks(cell.text) for cell in row.cells])
                    if shang == []:
                        pass
                    else:
                        shang[0] = unify_punctuation_in_list(shang[0])
                        shangbujiegou.append(shang)
                    row_content = unify_punctuation_in_list(row_content)
                    print(f"row_content: {row_content}")
                    print(f"shang: {shang}")
                    if row_content:
                       print('进row_content了')
                       table_content.update(read_data_from_row(row_content))
                    elif shang: 
                        print('进shang了')
                        print(shangbujiegou)        
                        if len(shangbujiegou) == 4:  # Collect vertical data 
                            l1 = shangbujiegou[0][0]
                            l2 = shangbujiegou[1][0]
                            l3 = shangbujiegou[2][0]
                            l4 = shangbujiegou[3][0]
                            print(l1)
                            print(l2)
                            print(l3)
                            print(l4)
                            a = list(zip(l1, l2, l3, l4))
                            b = [list(item) for item in a if not any(x == '' for x in item)]
                            print(a)
                            print(b)
                            for text in b:
                                if text != previous_text:
                                    cleaned_row.append(text)
                                    previous_text = text#处理
                            print(cleaned_row)
                            table_content.update(read_data_from_shangbujiegou(cleaned_row))
                            shangbujiegou = []
                            cleaned_row = []
                    else:
                        print('wrong')
                        
                    print(f'table_content: {table_content}')
                if table_content:
                    extracted_data = pd.concat([extracted_data, pd.DataFrame(table_content)], ignore_index=True)
                    extracted_data.drop(columns=columns_to_delete, inplace=True)
                    
                    
        elif element.tag.endswith('p'):
                paragraph = element.xpath(".//text()")
                paragraph_text = ''.join(paragraph).strip()
                for target in target_paragraphs:
                    if target in paragraph_text:
                        current_paragraph = target
                        print(current_paragraph)
                        break                                  
        elif element.tag.endswith('tbl') and current_paragraph: 
            table = docx.table.Table(element, document)
            for row in table.rows:
                row_content,shang = clean_row([remove_blanks(cell.text) for cell in row.cells])
                if shang == []:
                    pass
                else:
                    shang[0] = unify_punctuation_in_list(shang[0])
                    shangbujiegou.append(shang)
                row_content = unify_punctuation_in_list(row_content)
                print(f"row_content: {row_content}")
                print(f"shang: {shang}")
                if row_content:
                    print('进row_content了')
                    table_content.update(read_data_from_row(row_content))
                elif shang:  
                    print('进shang了')
                    print(f'shangbujiegou:{shangbujiegou}')       
                    if len(shangbujiegou) == 4:  # Collect vertical data 
                        l1 = shangbujiegou[0][0]
                        l2 = shangbujiegou[1][0]
                        l3 = shangbujiegou[2][0]
                        l4 = shangbujiegou[3][0]
                        print(l1)
                        print(l2)
                        print(l3)
                        print(l4)
                        a = list(zip(l1, l2, l3, l4))
                        b = [list(item) for item in a if not any(x == '' for x in item)]
                        print(a)
                        print(b)
                        for text in b:
                            if text != previous_text:
                                cleaned_row.append(text)
                                previous_text = text#处理
                        print(cleaned_row)
                        table_content.update(read_data_from_shangbujiegou(cleaned_row))
                        shangbujiegou = []
                        cleaned_row = []
                else:
                    print('wrong')
                print(f'table_content: {table_content}')
            current_paragraph = None                               
            if table_content:
                extracted_data = pd.concat([extracted_data, pd.DataFrame(table_content)], ignore_index=True)
                extracted_data.drop(columns=columns_to_delete, inplace=True)
               
            
        

    return extracted_data

    

In [None]:
column_mapping = {
    '线路编号': '路线编号', 
    '线路名称': '路线名称',
    '建成时间':'建成年限',
    '建成年月':'建成年限',
    '引道路面宽':'引道路面宽(m)',
    '引道总宽（m）':'引道总宽(m)',
    '调制构造':'调制构造物',
    '设计文献':'设计文件',
    '历次维修资料':'历史维修资料',
    '历次维修、加固资料':'历史维修资料',
    '290':'墩台',
    '291':'墩台',
    '300':'形式',
    '301':'形式',
    '310':'材料',
    '311':'材料',
    '桥梁分孔':'桥梁分孔（m）',
    '桥下实际净空':'桥下实际净空(m)',
    '车道宽度':'车道宽度(m)',
    '人行道宽度(m)':'人行道宽度（m）',
    '中央分隔带宽度(m)':'中央分隔带宽度（m）',
    '桥面实际净空(m)':'桥面实际净空（m）',
    '桥下通航等级及标准净空(m)':'桥下通航等级及标准净空（m）',
    '桥下实际净空(m)':'桥下实际净空（m）',
    '引道线形或曲线半径(m)':'引道线形或曲线半径（m）',
    '桥梁全宽(m)':'桥面总宽(m)',
   
    

    # 添加更多列名映射...
}

input_dir = "D:\\statics\\Reports\\2022\\沪莘枫1标-.docx"
output_dir = "D:\\statics\\statics\\沪莘枫1标"
doc_files = find_doc_files_without_walking(input_dir)
for doc_file in doc_files: 
    try:       
        input_file = os.path.abspath(doc_file)
        relative_path = os.path.relpath(doc_file, start=input_dir)
        output_file_path = os.path.join(output_dir, relative_path[0:-4] + 'xlsx')
        document = Document(doc_file)
        extracted_data = extract_static_1(document=document)
        extracted_data.rename(columns=column_mapping, inplace=True)
        for col in ['...','立面照','平面照','定期检查','2类`','1类','3类','','2类','/','桥梁总体照片','桥梁正面照片','桥梁侧面照片','37','较好','下次检测','预应力T梁','桥面总宽(m).1']:
            if col in extracted_data.columns:
                print(doc_file)
                extracted_data = extracted_data.drop(columns = col)
            extracted_data.to_excel(output_file_path)
        
    except Exception as e:
        print(f"error occured at file {doc_file}\nError mesasage {e}")    
    
    

In [None]:
input_dir = "D:\\statics\\Reports\\2020\\20年S32"
output_dir = "D:\\statics\\statics\\S32\\20年S32"
doc_files = find_doc_files(input_dir)

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

for doc_file in doc_files:
    input_file = os.path.abspath(doc_file)
    relative_path = os.path.relpath(doc_file, start=input_dir)
    output_file_path = os.path.join(output_dir, relative_path[0:-4] + 'xlsx')
    document = Document(doc_file)
    extracted_data = extract_static_4(document=document)
    for col in ['定期检测','修复养护、预防养护','立面照','平面照','定期检查','1类','3类','','2类','/','2010','北京市海龙公路工程公司','']:
            if col in extracted_data.columns:
                print(doc_file)
                extracted_data = extracted_data.drop(columns = col)
            cleaned_data = extracted_data.apply(lambda x: x.map(clean_cell))
    
    cleaned_data.to_excel(output_file_path, index=False)
