## 0.导入库

In [1]:
import os
import csv
import re
import chardet
import pandas as pd
from tqdm import tqdm

# Directory holding the raw input files (the .html and .txt files read below)
folder_path = './J'
# Directory where the converted CSV files are written
csv_path = './csv'
# Excel file directory (NOTE: no path variable is defined for it in this cell)

## 1.一级目录转换

### Informations

In [2]:
# Extract the "Contents." and "Informations." metadata of every HTML file in
# folder_path into one table and save it as Informations.csv.

# Fixed leading columns of the output table. A 'Relative To' column is added
# after them only when at least one file provides such entries.
base_columns = [
    'id', 'Type', 'Dates', 'Observatory',
    'Reference Frame', 'Centre of Frame', 'Epoch of Equinox',
    'Time Scale', 'Reduction', 'Coordinates', 'Diffraction', 'Receptor',
    'Telescope', 'Observers'
]

# Bookkeeping of what could and could not be extracted.
# 'success' receives the filename once per matched Contents field (so a file
# can appear up to three times); 'failures' maps filename -> missing items.
status = {
    'success': [],
    'failures': {}
}

# All patterns are loop-invariant, so they are compiled once up front.
contents_patterns = {
    'Type': re.compile(r'\btype:\s*(.+)'),
    'Dates': re.compile(r'\bdates:\s*(.+)'),
    'Observatory': re.compile(r'\bobservatory:\s*(.+)')
}

# Everything from "Informations." up to (not including) "Comments." or "Format."
informations_regex = re.compile(
    r'Informations\..*?(?=\b(Comments|Format)\.)', re.DOTALL
)

fields_patterns = {
    'Reference Frame': re.compile(r'\breference frame:\s*(.+)'),
    'Centre of Frame': re.compile(r'\bcentre of frame:\s*(.+)'),
    'Epoch of Equinox': re.compile(r'\bepoch of equinox:\s*(.+)'),
    'Time Scale': re.compile(r'\btime scale:\s*(.+)'),
    'Reduction': re.compile(r'\breduction:\s*(.+)'),
    'Coordinates': re.compile(r'\bcoordinates:\s*(.+)'),
    # The dot is escaped so it only matches a literal "diff."
    'Diffraction': re.compile(r'\bdiff\. refraction:\s*(.+)'),
    'Receptor': re.compile(r'\breceptor:\s*(.+)'),
    'Telescope': re.compile(r'\btelescope:\s*(.+)'),
    'Observers': re.compile(r'\bobservers:\s*(.+)')
}

# A section may contain several "relative to:" lines.
relative_regex = re.compile(r'\brelative to:\s*(.+)')

# One dict per file; the DataFrame is built once at the end because
# DataFrame.append no longer exists in pandas >= 2.0 and copied the whole
# frame on every call in older versions.
rows = []

# Sorted so that the row order of the CSV is reproducible between runs.
for filename in sorted(os.listdir(folder_path)):
    if not filename.endswith('.html'):
        continue

    file_path = os.path.join(folder_path, filename)
    file_id = os.path.splitext(filename)[0]  # file name without extension

    # Guess the encoding from the raw bytes; chardet may report None when it
    # cannot decide, in which case UTF-8 is used.
    with open(file_path, 'rb') as file:
        raw_data = file.read()
    encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'

    # Re-read in text mode so that line endings are normalised to '\n'.
    with open(file_path, 'r', encoding=encoding) as file:
        html_content = file.read()

    current_file_results = {'id': file_id}

    # Contents. section: type, dates, observatory
    for key, regex in contents_patterns.items():
        match = regex.search(html_content)
        if match:
            current_file_results[key] = match.group(1).strip()
            status['success'].append(filename)
        else:
            status['failures'].setdefault(filename, []).append(key)

    # Informations. section
    informations_match = informations_regex.search(html_content)
    if informations_match:
        informations_content = informations_match.group(0).strip()

        for field, regex in fields_patterns.items():
            match = regex.search(informations_content)
            # Fields absent from the section are stored as None
            current_file_results[field] = (
                match.group(1).strip() if match else None
            )

        # Stripped like every other field before being joined
        relative_matches = [
            value.strip()
            for value in relative_regex.findall(informations_content)
        ]
        if relative_matches:
            current_file_results['Relative To'] = '; '.join(relative_matches)
    else:
        status['failures'].setdefault(filename, []).append('Informations')

    rows.append(current_file_results)

# Build the table in one step, keeping the fixed columns first and any
# additional column (e.g. 'Relative To') after them.
results = pd.DataFrame(rows)
extra_columns = [col for col in results.columns if col not in base_columns]
results = results.reindex(columns=base_columns + extra_columns)

# Save the table
results.to_csv('Informations.csv', index=False)

# Report what was extracted and what was missing
print(f"成功提取 {len(status['success'])} 项")
if status['failures']:
    print("失败的项有：")
    for filename, missing_keys in status['failures'].items():
        print(f"文件 {filename} 缺失以下信息：{', '.join(missing_keys)}")

成功提取 687 项


### 坐标系统格式

In [None]:
# List every distinct value found in the 'Epoch of Equinox' column.
df = pd.read_csv('Informations.csv')

if 'Epoch of Equinox' in df.columns:
    # Empty cells are read back as NaN and numeric-looking cells as numbers;
    # str.join only accepts strings, so drop the former and stringify the rest.
    epoch_values = df['Epoch of Equinox'].dropna().astype(str).unique()

    # One value per line
    values_as_string = '\n'.join(epoch_values)

    print("'Epoch of Equinox'列的所有值：")
    print(values_as_string + '\n')
else:
    print("CSV文件中没有找到'Epoch of Equinox'列。")

### 时间系统格式

In [None]:
# List every distinct value found in the 'Time Scale' column.
df = pd.read_csv('Informations.csv')

if 'Time Scale' in df.columns:
    # Empty cells are read back as NaN and numeric-looking cells as numbers;
    # str.join only accepts strings, so drop the former and stringify the rest.
    time_scale_values = df['Time Scale'].dropna().astype(str).unique()

    # One value per line
    values_as_string = '\n'.join(time_scale_values)

    print("'Time Scale'列的所有值：")
    print(values_as_string + '\n')
else:
    print("CSV文件中没有找到'Time Scale'列。")

## 2.二级目录转换

### TXT观测数据转换成CSV

In [27]:
# Make sure the CSV output directory exists; exist_ok avoids the separate
# existence check and the race between checking and creating.
os.makedirs(csv_path, exist_ok=True)

def detect_encoding(file_path):
    """Return the encoding chardet reports for the file at *file_path*.

    The whole file is read in binary mode and handed to ``chardet.detect``;
    the value stored under its ``'encoding'`` key is returned unchanged.
    """
    with open(file_path, 'rb') as raw_file:
        raw_bytes = raw_file.read()
        detection = chardet.detect(raw_bytes)
        return detection['encoding']

def detect_delimiter(file_content):
    """Guess the field delimiter of *file_content*.

    For each candidate (comma, tab, space) the number of delimiter-free runs
    in the text is counted, and the candidate with the highest count is
    returned. On a tie the earlier candidate in the order comma, tab, space
    wins. ``None`` is returned when no candidate yields any run (e.g. for an
    empty string).

    Note that the space candidate counts runs of non-whitespace (``\\s``
    covers tabs and newlines as well), so it also splits on tabs.
    """
    candidates = (
        (',', r'[^,]+'),
        ('\t', r'[^\t]+'),
        (' ', r'[^\s]+'),
    )
    scored = [
        (len(re.findall(token_regex, file_content)), symbol)
        for symbol, token_regex in candidates
    ]
    # max() keeps the first of several equal maxima, matching the
    # "strictly greater replaces" rule of a running-maximum loop.
    best_count, best_symbol = max(scored, key=lambda pair: pair[0])
    if best_count == 0:
        return None
    return best_symbol

# Convert every .txt file in folder_path into a CSV file in csv_path.
# Each non-blank line becomes one row, every field gets a trailing ']',
# and a generated header C1..Cn is written first.

# Names of the files that could not be converted
failed_files = []

txt_files = [f for f in os.listdir(folder_path) if f.endswith(".txt")]
for filename in tqdm(txt_files, desc='Processing TXT files'):
    file_path = os.path.join(folder_path, filename)

    # Detect the encoding, then read the text; undecodable bytes are dropped
    encoding = detect_encoding(file_path)
    with open(file_path, 'r', encoding=encoding, errors='ignore') as file:
        file_content = file.read()

    # The source file is closed at this point; everything below works on the
    # in-memory text only.
    delimiter = detect_delimiter(file_content)
    if delimiter is None:
        failed_files.append(filename)
        continue

    # Split each non-blank line into fields
    csv_lines = []
    for line in file_content.split('\n'):
        if not line.strip():
            continue
        if delimiter == '\t':
            fields = line.split('\t')
        elif delimiter == ' ':
            # split() without arguments collapses runs of whitespace
            fields = line.split()
        else:
            fields = line.split(',')
        # Append ']' to every field
        csv_lines.append([field + ']' for field in fields])

    # Without any data row there is nothing to size the header from; report
    # the file instead of leaving an empty CSV behind.
    if not csv_lines:
        failed_files.append(filename)
        print(f"Failed to write {filename}: no data rows found")
        continue

    # Swap only the extension; str.replace would also rewrite a '.txt'
    # occurring in the middle of the name.
    csv_filename = os.path.splitext(filename)[0] + '.csv'
    csv_file_path = os.path.join(csv_path, csv_filename)

    try:
        with open(csv_file_path, 'w', encoding='utf-8', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)

            # Header width follows the first data row
            headers = [f"C{i+1}" for i in range(len(csv_lines[0]))]
            csv_writer.writerow(headers)
            csv_writer.writerows(csv_lines)
    except (OSError, UnicodeError, csv.Error) as e:
        failed_files.append(filename)
        print(f"Failed to write {filename}: {e}")

if failed_files:
    print("Failed to convert files:", failed_files)
else:
    print("All files were converted successfully.")


Processing TXT files: 100%|██████████| 229/229 [00:00<00:00, 276.98it/s]

All files were converted successfully.





### 提取各目录列

In [1]:
# Record, for every CSV file in csv_path, its base name and the number of
# columns in its first row, and write the result to column_info.csv.

# [base name, number of columns] per readable CSV file
csv_info = []

csv_files = [f for f in os.listdir(csv_path) if f.endswith(".csv")]

for filename in csv_files:
    file_path = os.path.join(csv_path, filename)

    try:
        # newline='' is what the csv module expects for files it reads
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            # The first row determines the column count; None for an empty file
            first_row = next(csv.reader(file), None)
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"Failed to read {filename}: {e}")
        continue

    if first_row is None:
        print(f"Failed to read {filename}: file is empty")
        continue

    # splitext drops only the extension, so names containing further dots
    # are kept intact.
    csv_info.append([os.path.splitext(filename)[0], len(first_row)])

# Write the collected information
try:
    with open('column_info.csv', 'w', encoding='utf-8', newline='') as output_file:
        writer = csv.writer(output_file)
        writer.writerow(['Filename', 'Number of Columns'])  # header row
        writer.writerows(csv_info)
    print("Column information has been written to column_info.csv")
except OSError as e:
    print(f"Failed to write to column_info.csv: {e}")

NameError: name 'os' is not defined

## 3.三级数据整合