In [1]:
import os
import sys
import argparse

def validate_directory(path):
    """深度验证目录有效性"""
    if not os.path.exists(path):
        raise NotADirectoryError(f"路径不存在: {path}")
    if not os.path.isdir(path):
        raise NotADirectoryError(f"不是有效目录: {path}")
    if not os.access(path, os.R_OK):
        raise PermissionError(f"无读取权限: {path}")
    return os.path.abspath(path)

def get_target_dir():
    """多途径获取目标目录"""
    # # 方式1：命令行参数
    # parser = argparse.ArgumentParser(description='合并指定目录的代码文件')
    # parser.add_argument('-d', '--directory', type=str, help='目标目录路径')
    # args = parser.parse_args()
    # 
    # if args.directory:
    #     return validate_directory(args.directory)
    # 
    # # 方式2：拖放目录到脚本图标
    # if len(sys.argv) > 1:
    #     drag_path = sys.argv[1]
    #     try:
    #         return validate_directory(drag_path)
    #     except Exception as e:
    #         print(f"拖放路径无效: {e}")
    
    # 方式3：交互式输入
    while True:
        input_path = input("请输入目标目录的完整路径（或拖放目录到这里）: ").strip('"')
        try:
            return validate_directory(input_path)
        except Exception as e:
            print(f"无效路径: {e}\n请重新输入")

def merge_files(target_dir):
    output_path = os.path.join(target_dir, "merged_files.txt")
    
    # 收集目标文件（保持大小写敏感）
    files = []
    for entry in os.scandir(target_dir):
        if entry.is_file() and entry.name.lower().endswith(('.py', '.json')):
            files.append(entry.path)
    
    # 智能排序：先按扩展名分类，再按文件名排序
    files.sort(key=lambda x: (os.path.splitext(x)[1].lower(), os.path.basename(x).lower()))
    
    with open(output_path, 'w', encoding='utf-8') as output:
        for file_path in files:
            # 写入带样式的文件头
            output.write(f"\n{'#' * 80}\n### 文件路径: {file_path}\n{'#' * 80}\n\n")
            
            try:
                # 自动检测编码
                with open(file_path, 'rb') as f:
                    raw_data = f.read()
                    try:
                        content = raw_data.decode('utf-8')
                    except UnicodeDecodeError:
                        content = raw_data.decode('gbk', errors='replace')
                
                # 统一换行符并移除末尾多余空行
                content = content.replace('\r\n', '\n').rstrip()
                output.write(f"{content}\n")
                
            except Exception as e:
                error_msg = f"【读取失败】{type(e).__name__}: {str(e)}"
                output.write(f"{error_msg}\n")
    
    print(f"合并完成！输出文件路径: {os.path.abspath(output_path)}")

if __name__ == '__main__':
    try:
        target_dir = get_target_dir()
        print(f"正在处理目录: {target_dir}")
        merge_files(target_dir)
    except KeyboardInterrupt:
        print("\n操作已取消")
    except Exception as e:
        print(f"严重错误: {str(e)}", file=sys.stderr)
        sys.exit(1)

正在处理目录: D:\竞赛\数据挖掘竞赛\其他参赛者的答案\baseline解答\DeepSeaGLM-main\DeepSeaGLM-main\baseline\mountain_baseline
合并完成！输出文件路径: D:\竞赛\数据挖掘竞赛\其他参赛者的答案\baseline解答\DeepSeaGLM-main\DeepSeaGLM-main\baseline\mountain_baseline\merged_files.txt


In [4]:
import pandas as pd
import os

def extract_status_values(directory):
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        # 过滤CSV和Excel文件
        if filename.lower().endswith(('.csv', '.xlsx')):
            print(f"\n文件名：{filename}")
            
            try:
                # 读取文件
                if filename.lower().endswith('.csv'):
                    df = pd.read_csv(filepath)
                else:
                    df = pd.read_excel(filepath, engine='openpyxl')
                
                # 检查status列是否存在
                if 'status' not in df.columns:
                    print("该文件没有status列")
                    continue
                
                # 提取唯一状态值并排除空值
                unique_status = df['status'].dropna().unique()
                
                # 转换并格式化输出
                if len(unique_status) > 0:
                    formatted_status = ' '.join(map(str, unique_status))
                    print(f"状态值：{formatted_status}")
                else:
                    print("status列无有效值")
                    
            except Exception as e:
                print(f"处理文件时发生错误：{str(e)}")

if __name__ == '__main__':
    import sys
    # 获取目录路径（通过命令行参数或手动输入）
    target_dir = input("请输入目录路径：")
    
    # 验证路径有效性
    if not os.path.isdir(target_dir):
        print("错误：提供的路径不是有效目录")
    else:
        extract_status_values(target_dir)



文件名：Ajia_plc_1.csv
状态值：False 关机 开机 A架摆出 缆绳挂妥 征服者出水 征服者起吊 征服者入水 缆绳解除 A架摆回 征服者落座 停电

文件名：device_13_11_meter_1311.csv
状态值：False 折臂吊车关机 折臂吊车开机 小艇检查完毕 小艇入水 小艇落座

文件名：device_13_14_meter_1314.csv
该文件没有status列

文件名：device_13_2_meter_1302.csv
该文件没有status列

文件名：device_13_3_meter_1303.csv
该文件没有status列

文件名：device_1_15_meter_115.csv
该文件没有status列

文件名：device_1_2_meter_102.csv
该文件没有status列

文件名：device_1_3_meter_103.csv
该文件没有status列

文件名：device_1_5_meter_105.csv
该文件没有status列

文件名：Jiaoche_plc_1.csv
该文件没有status列

文件名：Port1_ksbg_1.csv
该文件没有status列

文件名：Port1_ksbg_2.csv
该文件没有status列

文件名：Port1_ksbg_3.csv
该文件没有status列

文件名：Port1_ksbg_4.csv
该文件没有status列

文件名：Port1_ksbg_5.csv
该文件没有status列

文件名：Port2_ksbg_1.csv
该文件没有status列

文件名：Port2_ksbg_2.csv
该文件没有status列

文件名：Port2_ksbg_3.csv
该文件没有status列

文件名：Port2_ksbg_4.csv
该文件没有status列

文件名：Port3_ksbg_10.csv
该文件没有status列

文件名：Port3_ksbg_8.csv
该文件没有status列

文件名：Port3_ksbg_9.csv
状态值：False OFF_DP ON_DP

文件名：Port4_ksbg_7.csv
该文件没有status列

文件名：Port4_ksbg_8.csv
该文件没有statu

In [13]:
import pandas as pd
import os
from datetime import datetime

def parse_time_column(series):
    """增强型时间解析函数"""
    formats = [
        '%Y-%m-%d %H:%M:%S',    # 标准格式
        '%Y/%m/%d %H:%M:%S',    # 斜杠分隔
        '%Y%m%d%H%M%S',         # 紧凑格式
        '%d-%b-%y %H:%M',       # 日期-月份缩写-年份（01-OCT-23 12:30）
        '%m/%d/%Y %I:%M %p',    # 12小时制含AM/PM
        '%Y-%m-%dT%H:%M',       # ISO格式
        '%H:%M %Y-%m-%d',       # 时间在前
        '%Y%m%d_%H%M',          # 带下划线分隔
        '%Y-%m-%d',             # 仅日期
        '%H:%M:%S',             # 仅时间
        '%d.%m.%Y %H.%M'        # 点分隔格式
    ]
    
    # 多格式尝试解析
    for fmt in formats:
        try:
            return pd.to_datetime(series, format=fmt, errors='raise')
        except:
            continue
    
    # 最终尝试自动解析
    return pd.to_datetime(series, errors='coerce')

def find_hour_segments(hours):
    """生成连续小时时间段"""
    if not hours:
        return []
    
    # 转换numpy时间为pandas时间戳
    hours = [pd.Timestamp(h) for h in hours]
    
    segments = []
    start = current = hours[0]
    
    for h in hours[1:]:
        if h == current + pd.Timedelta(hours=1):
            current = h
        else:
            segments.append((start, current))
            start = current = h
    segments.append((start, current))
    return segments


def analyze_hour_ranges(directory):
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if not filename.lower().endswith(('.csv', '.xlsx')):
            continue
            
        print(f"\n{'='*30}\n文件：{filename}")
        
        try:
            # 读取数据
            if filename.endswith('.csv'):
                df = pd.read_csv(filepath)
            else:
                df = pd.read_excel(filepath, engine='openpyxl')
            
            # 检查目标列
            if 'csvTime' not in df.columns:
                print("缺少 csvTime 列")
                continue
                
            # 转换时间列
            time_series = parse_time_column(df['csvTime'])
            valid_times = time_series.dropna()
            
            if valid_times.empty:
                print("没有有效时间数据")
                continue
                
            # 提取整点小时并去重排序
            valid_hours = valid_times.dt.floor('H')
            unique_hours = [pd.Timestamp(h) for h in valid_hours.sort_values().unique()]  # 关键修改
            
            # 生成时间段
            segments = find_hour_segments(unique_hours)
            
            # 格式化输出
            if not segments:
                print("未找到有效时间段")
                continue
                
            print(f"发现 {len(segments)} 个时间段：")
            for i, (start, end) in enumerate(segments, 1):
                start = pd.Timestamp(start)  # 类型保险
                end = pd.Timestamp(end)
                
                date_str = start.strftime("%Y-%m-%d")  # 新增日期
                start_time = start.strftime("%H:%M")
                end_time = end.strftime("%H:%M")
                
                if start == end:
                    print(f"[日期 {date_str}] 时段{i} {start_time}")
                else:
                    duration = (end - start).total_seconds() / 3600 + 1
                    print(f"[日期 {date_str}] 时段{i} {start_time} → {end_time} （持续{duration:.0f}小时）")
                    
        except Exception as e:
            print(f"处理异常：{str(e)}")


if __name__ == '__main__':
    import sys
    target_dir =  input("请输入目录路径：")
    
    if not os.path.exists(target_dir):
        print("路径不存在")
    else:
        analyze_hour_ranges(target_dir)



文件：Ajia_plc_1.csv
发现 93 个时间段：
[日期 2024-05-16] 时段1 16:00 → 20:00 （持续5小时）
[日期 2024-05-16] 时段2 22:00 → 00:00 （持续3小时）
[日期 2024-05-17] 时段3 02:00 → 08:00 （持续7小时）
[日期 2024-05-17] 时段4 10:00 → 14:00 （持续5小时）
[日期 2024-05-17] 时段5 17:00
[日期 2024-05-17] 时段6 19:00 → 20:00 （持续2小时）
[日期 2024-05-17] 时段7 22:00 → 02:00 （持续5小时）
[日期 2024-05-18] 时段8 04:00 → 06:00 （持续3小时）
[日期 2024-05-18] 时段9 08:00 → 09:00 （持续2小时）
[日期 2024-05-18] 时段10 11:00
[日期 2024-05-18] 时段11 14:00 → 19:00 （持续6小时）
[日期 2024-05-18] 时段12 21:00
[日期 2024-05-18] 时段13 23:00 → 00:00 （持续2小时）
[日期 2024-05-19] 时段14 02:00 → 09:00 （持续8小时）
[日期 2024-05-19] 时段15 11:00 → 19:00 （持续9小时）
[日期 2024-05-19] 时段16 21:00 → 05:00 （持续9小时）
[日期 2024-05-20] 时段17 07:00 → 10:00 （持续4小时）
[日期 2024-05-20] 时段18 12:00
[日期 2024-05-20] 时段19 17:00 → 20:00 （持续4小时）
[日期 2024-05-20] 时段20 22:00 → 02:00 （持续5小时）
[日期 2024-05-21] 时段21 04:00 → 05:00 （持续2小时）
[日期 2024-05-21] 时段22 09:00
[日期 2024-05-21] 时段23 11:00 → 13:00 （持续3小时）
[日期 2024-05-21] 时段24 15:00 → 16:00 （持续2小时）
[日期 2024-05-21] 时段25 18:00

验证新增api选择逻辑是否有用

In [1]:
import os
import sys
import json
from datetime import datetime
import pandas as pd
from ai_brain import get_answer

class AnswerVerifier:
    def __init__(self):
        # 测试用例
        self.test_cases = [
            {
                "id": "7",
                "question": "2024/8/23 A架开机的时间点（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"  # 期望格式：XX:XX
            },
            {
                "id": "8", 
                "question": "2024/8/24 征服者入水时间点（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            },
            {
                "id": "9",
                "question": "2024/8/24 揽绳解除的时间点（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            },
            {
                "id": "10",
                "question": "2024/8/24 A架摆回的时间点（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            },
            {
                "id": "11",
                "question": "2024/8/24 A架摆出的时间点（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            },
            {
                "id": "12",
                "question": "2024/8/24 8:45 ~ 8:55之间，发生了哪些关键动作？",
                "expected_format": r".*【.*】.*"  # 期望包含【】格式的动作
            },
            {
                "id": "13",
                "question": "2024/8/24 16:00 ~ 16:30之间，发生了哪些关键动作？",
                "expected_format": r".*【.*】.*"
            },
            {
                "id": "15",
                "question": "以征服者落座为标志，2024/8/23 的深海作业A作业结束的时间（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            },
            {
                "id": "19",
                "question": "2024/8/24 下午，A架开机发生在折臂吊车开机之前，是否正确？",
                "expected_format": r".*"  # 任意回答格式
            },
            {
                "id": "99",
                "question": "在2024年8月24日，小艇最后一次落座是什么时候（请以XX:XX输出）？",
                "expected_format": r"^\d{2}:\d{2}$"
            }
        ]
        
    def verify_answer_format(self, answer, expected_format):
        """验证答案格式是否符合预期"""
        import re
        return bool(re.match(expected_format, answer.strip()))

    def run_tests(self):
        """运行所有测试用例并记录结果"""
        results = []
        
        for case in self.test_cases:
            print(f"\n测试问题 {case['id']}:")
            print(f"问题: {case['question']}")
            
            try:
                # 获取答案
                answer = get_answer(case['question'])
                print(f"模型回答: {answer}")
                
                # 验证格式
                format_correct = self.verify_answer_format(answer, case['expected_format'])
                
                result = {
                    "id": case['id'],
                    "question": case['question'],
                    "answer": answer,
                    "format_correct": format_correct,
                    "expected_format": case['expected_format']
                }
                
                results.append(result)
                
                # 打印验证结果
                status = "✓" if format_correct else "✗"
                print(f"{status} 格式验证: {'通过' if format_correct else '失败'}")
                
            except Exception as e:
                print(f"Error processing question {case['id']}: {str(e)}")
                results.append({
                    "id": case['id'],
                    "question": case['question'],
                    "error": str(e)
                })
        
        return results

    def generate_report(self, results):
        """生成测试报告"""
        total = len(results)
        format_passed = sum(1 for r in results if r.get('format_correct', False))
        
        report = {
            "总测试数": total,
            "格式正确数": format_passed,
            "通过率": f"{(format_passed/total*100):.2f}%",
            "详细结果": results
        }
        
        # 保存报告
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_file = f'test_report_{timestamp}.json'
        
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
            
        return report

def main():
    print("开始验证答案改进效果...")
    verifier = AnswerVerifier()
    
    print("\n运行测试用例...")
    results = verifier.run_tests()
    
    print("\n生成测试报告...")
    report = verifier.generate_report(results)
    
    print("\n测试总结:")
    print(f"总测试数: {report['总测试数']}")
    print(f"格式正确数: {report['格式正确数']}")
    print(f"通过率: {report['通过率']}")

if __name__ == "__main__":
    main()

所有文件夹均已存在。不再重新预处理数据。
需要预处理数据，请删除文件夹后重新运行。
开始验证答案改进效果...

运行测试用例...

测试问题 7:
问题: 2024/8/23 A架开机的时间点（请以XX:XX输出）？
问题内容： 2024/8/23 A架开机的时间点（请以XX:XX输出）？
API列表： ['get_device_status_by_time_range']


KeyboardInterrupt: 

测试回答的不好的问题

In [5]:
import os
import sys
import json
from datetime import datetime
import pandas as pd
from ai_brain import get_answer

test_questions = [
    "2024/8/24 8:45 ~ 8:55之间，发生了哪些关键动作？",
    "2024/8/24 揽绳解除的时间点（请以XX:XX输出）？",
    "2024/8/24，小艇最后一次落座是什么时候（请以XX:XX输出）？"
]

for q in test_questions:
    result = get_answer(q)
    print(f"问题: {q}")
    print(f"答案: {result}\n")

问题: 2024/8/24 8:45 ~ 8:55之间，发生了哪些关键动作？
答案: 在2024年8月24日8:45到8:55之间，没有记录到任何关键动作。
问题: 2024/8/24 揽绳解除的时间点（请以XX:XX输出）？
答案: 根据提供的信息，2024年8月24日没有记录显示设备进行了“解除”动作。因此，无法提供具体的揽绳解除时间点。如果需要进一步的帮助，请提供更多信息或检查设备记录的准确性。
问题: 2024/8/24，小艇最后一次落座是什么时候（请以XX:XX输出）？
答案: 根据提供的信息，2024年8月24日小艇没有记录到落座的状态。因此，无法提供具体的落座时间。如果您需要进一步的帮助或有其他查询需求，请告知。


测试新的api

In [2]:
import os
import sys
import json
from datetime import datetime
import pandas as pd
from ai_brain import get_answer

def run_single_status_tests():
    """
    运行单一状态时间查询的测试用例
    """
    test_cases = [
        # 基础状态查询
        # {
        #     "id": "7",
        #     "question": "2024/8/23 A架开机的时间点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'  # 期望格式：XX:XX
        # },
        # {
        #     "id": "8",
        #     "question": "2024/8/24 征服者入水时间点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "9",
        #     "question": "2024/8/24 揽绳解除的时间点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "10",
        #     "question": "2024/8/24 A架摆回的时间点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "11",
        #     "question": "2024/8/24 A架摆出的时间点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # # 变体格式测试
        # {
        #     "id": "61",
        #     "question": "2024年5月20日征服者入水的时间是？（请以XX:XX输出）",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "80",
        #     "question": "8月15日早上，A架的开机时间是几点（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "82",
        #     "question": "在2024年8月28日，A架的关机时间是几点（请以XX:XX输出）",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # # 特殊条件测试
        # {
        #     "id": "93",
        #     "question": "在2024年9月3日，折臂吊车最后一次开机是什么时候（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # },
        # {
        #     "id": "95",
        #     "question": "在2024年8月24日，A架第二次开机是什么时候（请以XX:XX输出）？",
        #     "expected_pattern": r'^\d{2}:\d{2}$'
        # }
        
        
        # 新增测试
        {
    "id": "gysxdmx_00015",
    "question": "以征服者落座为标志，2024/8/23 的深海作业A作业结束的时间（请以XX:XX输出）？",
    "expected_pattern": r'^\d{2}:\d{2}$'  # 严格匹配时间格式（如 15:30）
        },  
        
               {
    "id": "gysxdmx_00019",
    "question": "2024/8/24 下午，A架开机发生在折臂吊车开机之前，是否正确？",
    "expected_pattern": r'^\d{2}:\d{2}$'  # 严格匹配时间格式（如 15:30）
        },  
        
        
        
                    {
    "id": "gysxdmx_00022",
    "question":  "2024/8/23/上午A架的运行时长和下午A架开机时长相比，哪个长，长多少（以整数分钟输出）？",
    "expected_pattern": r'^\d{2}:\d{2}$'  # 严格匹配时间格式（如 15:30）
        },  
    ]
    
    test_results = {
        "total": len(test_cases),
        "passed": 0,
        "failed": 0,
        "details": []
    }
    
    print("开始运行单一状态时间查询测试...\n")
    
    for case in test_cases:
        print(f"测试用例 {case['id']}:")
        print(f"问题: {case['question']}")
        
        # 获取答案
        result = get_answer(case['question'])
        print(f"答案: {result}")
        
        # 验证答案格式
        import re
        format_correct = bool(re.match(case['expected_pattern'], result.strip()))
        
        # 记录结果
        test_results['details'].append({
            "id": case['id'],
            "question": case['question'],
            "answer": result,
            "format_correct": format_correct
        })
        
        if format_correct:
            test_results['passed'] += 1
            print("✓ 格式验证通过")
        else:
            test_results['failed'] += 1
            print("✗ 格式验证失败")
        print()
    
    # 打印测试总结
    print("\n测试总结:")
    print(f"总用例数: {test_results['total']}")
    print(f"通过数量: {test_results['passed']}")
    print(f"失败数量: {test_results['failed']}")
    print(f"通过率: {(test_results['passed']/test_results['total']*100):.2f}%")
    
    return test_results

if __name__ == "__main__":
    test_results = run_single_status_tests()
    
    # 保存测试报告
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = f'single_status_test_report_{timestamp}.json'
    


开始运行单一状态时间查询测试...

测试用例 gysxdmx_00015:
问题: 以征服者落座为标志，2024/8/23 的深海作业A作业结束的时间（请以XX:XX输出）？
问题内容： 以征服者落座为标志，2024/8/23 的深海作业A作业结束的时间（请以XX:XX输出）？
API列表： ['get_operation_end_time']
答案: 2024年8月23日的深海作业A以征服者落座为标志的结束时间是19:05。
✗ 格式验证失败

测试用例 gysxdmx_00019:
问题: 2024/8/24 下午，A架开机发生在折臂吊车开机之前，是否正确？
问题内容： 2024/8/24 下午，A架开机发生在折臂吊车开机之前，是否正确？
设备开机顺序比较问题
API列表： ['compare_device_startup_order']
答案: 2024年8月24日下午，A架开机确实发生在折臂吊车开机之前。具体时间显示，A架开机时间为16:03:08，而折臂吊车的开机时间为16:04:28。因此，A架开机比折臂吊车开机早了1分20秒。
✗ 格式验证失败

测试用例 gysxdmx_00022:
问题: 2024/8/23/上午A架的运行时长和下午A架开机时长相比，哪个长，长多少（以整数分钟输出）？
问题内容： 2024/8/23/上午A架的运行时长和下午A架开机时长相比，哪个长，长多少（以整数分钟输出）？
API列表： ['compare_operation_durations']
-------compute_operational_duration执行-------
Error comparing durations: 'check_current_presence'
答案: 很抱歉，根据提供的信息，我无法确定2024年8月23日上午A架的运行时长和下午A架开机时长哪个更长，也无法计算两者之间的具体时间差。如果需要解决这个问题，请提供具体的时长数据或确保相关系统能够正常返回所需信息。如果有其他问题或需要帮助，请告诉我。
✗ 格式验证失败


测试总结:
总用例数: 3
通过数量: 0
失败数量: 3
通过率: 0.00%


输出特定日期的全部数据

In [2]:
import pandas as pd
from datetime import datetime

# 读取CSV文件（请将'data.csv'替换为实际文件名）
df = pd.read_csv(r'D:\竞赛\数据挖掘竞赛\初赛数据1231\初赛数据\Port3_ksbg_9_2.csv')

# 转换时间列为datetime类型
df['csvTime'] = pd.to_datetime(df['csvTime'], format='%Y/%m/%d %H:%M')

# 定义目标日期
target_date = datetime(2024, 8, 23).date()

# # 过滤数据
# filtered_data = df[(df['csvTime'].dt.date == target_date) & (df['status'] != 'False')]

# 过滤数据
filtered_data = df[(df['csvTime'].dt.date == target_date)]

# 输出结果
if not filtered_data.empty:
    print(f"找到{len(filtered_data)}条2024年8月15日的记录：")
    print(filtered_data.to_string(index=False))
else:
    print("未找到2024年8月15日的相关记录")

找到1440条2024年8月15日的记录：
 Unnamed: 0             csvTime  P3_33  P3_18  P3_17  P3_16
      10798 2024-08-23 00:00:06      0      0     -1      0
      10799 2024-08-23 00:01:06      0      0     -1      0
      10800 2024-08-23 00:02:06      0      0      0      0
      10801 2024-08-23 00:03:06      0      0      0      0
      10802 2024-08-23 00:04:06      0      0      0      0
      10803 2024-08-23 00:05:06      0      0      0      0
      10804 2024-08-23 00:06:06      0      0      0      0
      10805 2024-08-23 00:07:06      0      0      0      0
      10806 2024-08-23 00:08:06      0      0     -1      0
      10807 2024-08-23 00:09:06      0      0      0      0
      10808 2024-08-23 00:10:06      0      0      0      0
      10809 2024-08-23 00:11:06      0      0      0      0
      10810 2024-08-23 00:12:06      0      0     -1      0
      10811 2024-08-23 00:13:06      0      0      0      0
      10812 2024-08-23 00:14:06      0      0      0      0
      10813 2024-0

输出特定日期的全部数据(三个表，时间可以精确到分钟)

In [1]:
import pandas as pd
from datetime import datetime
import os

def query_equipment_data(base_path, tables, target_time, time_format='%Y-%m-%d %H:%M'):
    """
    多设备时间维度精确查询函数
    :param base_path: 数据库基础路径
    :param tables: 要查询的设备表列表
    :param target_time: 目标时间字符串（支持多种格式）
    :param time_format: 时间解析格式
    """
    # 时间解析与精度检测
    try:
        dt_obj = datetime.strptime(target_time, time_format)
        time_components = {
            'year': dt_obj.year,
            'month': dt_obj.month,
            'day': dt_obj.day,
            'hour': dt_obj.hour if '%H' in time_format else None,
            'minute': dt_obj.minute if '%M' in time_format else None
        }
    except ValueError as e:
        raise ValueError(f"时间格式错误: {str(e)}")

    results = {}
    
    for device in tables:
        file_path = os.path.join(base_path, f"{device}.csv")
        try:
            # 动态读取数据并优化内存
            df = pd.read_csv(file_path, parse_dates=['csvTime'])
            df['csvTime'] = pd.to_datetime(df['csvTime'], format='%Y/%m/%d %H:%M')

            # 构建动态过滤条件
            time_filter = (
                (df['csvTime'].dt.year == time_components['year']) &
                (df['csvTime'].dt.month == time_components['month']) &
                (df['csvTime'].dt.day == time_components['day'])
            )
            
            if time_components['hour'] is not None:
                time_filter &= (df['csvTime'].dt.hour == time_components['hour'])
                if time_components['minute'] is not None:
                    time_filter &= (df['csvTime'].dt.minute == time_components['minute'])

            filtered_df = df[time_filter & (df['status'] != 'False')]

            # 结果格式化
            if not filtered_df.empty:
                results[device] = {
                    'count': len(filtered_df),
                    'data': filtered_df.to_dict('records'),
                    'time_range': {
                        'start': filtered_df['csvTime'].min().strftime('%Y-%m-%d %H:%M:%S'),
                        'end': filtered_df['csvTime'].max().strftime('%Y-%m-%d %H:%M:%S')
                    }
                }
                
        except Exception as e:
            print(f"查询设备 {device} 时发生错误: {str(e)}")
            continue

    return results

# 使用示例
if __name__ == "__main__":
    # 配置参数
    database_path = r'D:\竞赛\数据挖掘竞赛\其他参赛者的答案\baseline解答\DeepSeaGLM-main\DeepSeaGLM-main\baseline\mountain_baseline\database_in_use'
    device_list = ['Ajia_plc_1', 'device_13_11_meter_1311', 'Port3_ksbg_9']
    
    # 时间查询示例（支持不同精度）
    # query_time = '2024-08-24 09:10'  # 完整时间
    query_time = '2024-08-23'       # 仅日期
    # query_time = '2024-08-23 08'    # 到小时

    # 执行查询
    query_results = query_equipment_data(
        base_path=database_path,
        tables=device_list,
        target_time=query_time,
        # time_format='%Y-%m-%d %H:%M'  # 根据输入调整格式
        time_format='%Y-%m-%d'
    )

    # 可视化输出
    for device, data in query_results.items():
        print(f"\n设备 {device} 查询结果 ({data['count']} 条记录)")
        print(f"时间范围: {data['time_range']['start']} 至 {data['time_range']['end']}")
        print(pd.DataFrame(data['data']).to_string(index=False))
        print("\n" + "="*60)


设备 Ajia_plc_1 查询结果 (12 条记录)
时间范围: 2024-08-23 08:03:08 至 2024-08-23 19:18:08
 Unnamed: 0 Ajia-0_v Ajia-2_v Ajia-1_v Ajia-4_v  Ajia-3_v  Ajia-5_v             csvTime status
      11094        0        0        0        0    0.0000    0.0000 2024-08-23 08:03:08     开机
      11221  37.2364  692.736  30.2588  692.909   93.9909   92.4600 2024-08-23 10:12:08  征服者起吊
      11225  37.2364  692.616  30.2588  692.813   94.5375   94.2634 2024-08-23 10:16:08  征服者入水
      11226  37.2364  693.799  30.2588  694.262   60.6053   56.3365 2024-08-23 10:17:08   缆绳解除
      11229  37.2364  692.219  30.2588  692.522  106.1710  104.5650 2024-08-23 10:20:08   A架摆回
      11239    error    error    error    error   -1.0000   -1.0000 2024-08-23 10:30:08     关机
      11680        0        0        0        0    0.0000    0.0000 2024-08-23 17:58:08     开机
      11697  37.2364  694.325  30.2588  694.363   87.6729   86.3214 2024-08-23 18:16:08   A架摆出
      11741  37.2364  694.251  30.2365   694.15   55.2905   58.0159 

In [34]:
import re
question = "2024/8/24 下午，A架开机发生在折臂吊车开机之前，是否正确？"
a = re.search(r"(\d{4}/\d{1,2}/\d{1,2})\s*(上午|下午)?.*开机发生在.*开机之(前|后)", question)

print(a or False)

<re.Match object; span=(0, 28), match='2024/8/24 下午，A架开机发生在折臂吊车开机之前'>
