In [2]:
import pandas as pd
import numpy as np
from datetime import datetime
import re

def calculate_course_conflicts():
    # 1. 读取数据
    enrollment_data = pd.read_excel('math_student_enrollment.xlsx')
    course_schedule = pd.read_excel('df_final_cleaned_1.xlsx')
    
    # 2. 数据预处理
    print(f"学生选课数据: {enrollment_data.shape[0]} 行, {enrollment_data.shape[1]} 列")
    print(f"课程时间表数据: {course_schedule.shape[0]} 行, {course_schedule.shape[1]} 列")
    
    # 查看有多少学生和课程
    unique_students = enrollment_data['student_id'].nunique()
    unique_courses = enrollment_data['course_id'].nunique()
    print(f"总学生数: {unique_students}")
    print(f"总课程数: {unique_courses}")
    
    # 3. 将两个数据集合并，只保留共有的课程
    common_courses = set(enrollment_data['course_id']).intersection(set(course_schedule['Course Code']))
    print(f"两个数据集共有课程数: {len(common_courses)}")
    
    enrollment_filtered = enrollment_data[enrollment_data['course_id'].isin(common_courses)]
    print(f"过滤后的选课记录数: {enrollment_filtered.shape[0]}")
    
    # 4. 解析课程时间，创建时间区间
    # 处理上课周数
    def parse_week_pattern(pattern):
        weeks = []
        if isinstance(pattern, str):
            # 处理多个周范围，比如 "1-5, 26-30"
            if ',' in pattern:
                parts = [p.strip() for p in pattern.split(',')]
                for part in parts:
                    if '-' in part:
                        try:
                            start, end = map(int, part.split('-'))
                            weeks.extend(list(range(start, end + 1)))
                        except ValueError:
                            # 无法解析的范围，跳过
                            continue
                    else:
                        try:
                            weeks.append(int(part))
                        except ValueError:
                            # 无法解析的数字，跳过
                            continue
            # 处理单个周范围，比如 "9-19" 
            elif '-' in pattern:
                try:
                    start, end = map(int, pattern.split('-'))
                    weeks = list(range(start, end + 1))
                except ValueError:
                    # 无法解析的范围，返回空列表
                    weeks = []
            # 处理单一周数
            else:
                try:
                    weeks = [int(pattern.strip())]
                except ValueError:
                    weeks = []
        return weeks
    
    # 解析时间
    def parse_time(time_str):
        if isinstance(time_str, str) and ':' in time_str:
            hour, minute = map(int, time_str.split(':'))
            return hour * 60 + minute  # 转换为分钟表示
        return None
    
    # 添加解析后的时间数据
    course_schedule['start_minutes'] = course_schedule['Scheduled Start Time'].apply(parse_time)
    course_schedule['end_minutes'] = course_schedule['Scheduled End Time'].apply(parse_time)
    course_schedule['weeks'] = course_schedule['Teaching Week Pattern'].apply(parse_week_pattern)
    
    # 5. 创建每个学生的课程表
    # 构建字典: {学生ID: {(星期, 周数): [(课程ID, 开始时间, 结束时间)]}}
    student_schedules = {}
    
    # 获取每门课的所有时间槽
    course_time_slots = {}  # {课程ID: [(星期, 周数, 开始时间, 结束时间)]}
    
    for _, course in course_schedule.iterrows():
        course_id = course['Course Code']
        day = course['Scheduled Days']
        weeks = course['weeks']
        start_time = course['start_minutes']
        end_time = course['end_minutes']
        
        # 跳过缺少时间信息的课程
        if not isinstance(day, str) or not weeks or start_time is None or end_time is None:
            continue
        
        # mapping
        day_map = {'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3, 'Friday': 4, 'Saturday': 5, 'Sunday': 6}
        day_num = day_map.get(day)
        
        if day_num is not None:
            # 为每周创建时间槽
            if course_id not in course_time_slots:
                course_time_slots[course_id] = []
                
            for week in weeks:
                course_time_slots[course_id].append((day_num, week, start_time, end_time))
    
   
    for _, enrollment in enrollment_filtered.iterrows():
        student_id = enrollment['student_id']
        course_id = enrollment['course_id']
        
        
        if course_id not in course_time_slots:
            continue
            
        if student_id not in student_schedules:
            student_schedules[student_id] = {}
            
        # 添加该学生所有的课程时间
        for day_num, week, start_time, end_time in course_time_slots[course_id]:
            key = (day_num, week)
            if key not in student_schedules[student_id]:
                student_schedules[student_id][key] = []
            student_schedules[student_id][key].append((course_id, start_time, end_time))
    
    # 6. 计算冲突
    total_conflicts = 0
    total_course_entries = 0
    students_with_conflicts = set()
    
    for student_id, schedule in student_schedules.items():
        for day_week, courses in schedule.items():
            if len(courses) > 1:  # 同一天同一周有多门课
                # 对当天的课程按开始时间排序
                sorted_courses = sorted(courses, key=lambda x: x[1])
                
                # 检查是否有时间冲突
                for i in range(len(sorted_courses) - 1):
                    current_course, current_start, current_end = sorted_courses[i]
                    next_course, next_start, next_end = sorted_courses[i + 1]
                    
                    # 如果当前课程的结束时间晚于下一节课的开始时间，则有冲突
                    if current_end > next_start:
                        total_conflicts += 1
                        students_with_conflicts.add(student_id)
                        break  # 一个时间段内只计算一次冲突
            
            total_course_entries += len(courses)
    
    # 7. 计算冲突率
    conflict_rate = total_conflicts / total_course_entries if total_course_entries > 0 else 0
    
    print(f"\n结果:")
    print(f"发生冲突的课程数: {total_conflicts}")
    print(f"总课程记录数: {total_course_entries}")
    print(f"课程冲突率: {conflict_rate:.4f} ({conflict_rate*100:.2f}%)")
    print(f"有冲突的学生数: {len(students_with_conflicts)}")
    print(f"有冲突的学生占比: {len(students_with_conflicts)/len(student_schedules):.4f} ({len(students_with_conflicts)/len(student_schedules)*100:.2f}%)")
    
    return conflict_rate

if __name__ == "__main__":
    calculate_course_conflicts()

学生选课数据: 9163 行, 5 列
课程时间表数据: 557 行, 21 列
总学生数: 1317
总课程数: 129
两个数据集共有课程数: 70
过滤后的选课记录数: 4991

结果:
发生冲突的课程数: 47464
总课程记录数: 612662
课程冲突率: 0.0775 (7.75%)
有冲突的学生数: 1011
有冲突的学生占比: 0.9233 (92.33%)


In [None]:
import pandas as pd
import numpy as np
import re

def parse_week_pattern(pattern):
    """解析教学周次"""
    if not isinstance(pattern, str):
        return []
    
    weeks = []
    for part in re.findall(r'\d+-\d+|\d+', pattern):
        if '-' in part:
            start, end = map(int, part.split('-'))
            weeks.extend(range(start, end + 1))
        else:
            weeks.append(int(part))
    return weeks

def parse_time(time_str):
    """解析时间字符串"""
    try:
        hour, minute = map(int, time_str.split(':'))
        return hour * 60 + minute  # 统一转换成分钟
    except (ValueError, AttributeError):
        return -1  # 返回 -1 作为无效值

def calculate_course_conflicts():
    # 1. 读取数据
    dtype_enrollment = {'student_id': str, 'course_id': str}
    dtype_schedule = {'Course Code': str, 'Scheduled Days': str, 'Scheduled Start Time': str, 
                      'Scheduled End Time': str, 'Teaching Week Pattern': str}

    enrollment_data = pd.read_excel('math_student_enrollment.xlsx', dtype=dtype_enrollment)
    course_schedule = pd.read_excel('df_final_cleaned_1.xlsx', dtype=dtype_schedule)

    # 2. 预处理数据
    enrollment_data.dropna(subset=['student_id', 'course_id'], inplace=True)
    course_schedule.dropna(subset=['Course Code', 'Scheduled Days', 'Scheduled Start Time', 'Scheduled End Time'], inplace=True)

    # 3. 过滤共有课程
    common_courses = set(enrollment_data['course_id']).intersection(set(course_schedule['Course Code']))
    enrollment_filtered = enrollment_data[enrollment_data['course_id'].isin(common_courses)]

    # 4. 解析时间
    course_schedule['start_minutes'] = course_schedule['Scheduled Start Time'].apply(parse_time)
    course_schedule['end_minutes'] = course_schedule['Scheduled End Time'].apply(parse_time)
    course_schedule['weeks'] = course_schedule['Teaching Week Pattern'].apply(parse_week_pattern)

    # 5. 创建时间表
    student_schedules = {}
    course_time_slots = {}

    day_map = {'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3, 'Friday': 4, 'Saturday': 5, 'Sunday': 6}

    for _, course in course_schedule.iterrows():
        if course['start_minutes'] < 0 or course['end_minutes'] < 0:
            continue
        day_num = day_map.get(course['Scheduled Days'])
        if day_num is None:
            continue
        
        for week in course['weeks']:
            course_time_slots.setdefault(course['Course Code'], []).append((day_num, week, course['start_minutes'], course['end_minutes']))

    for _, enrollment in enrollment_filtered.iterrows():
        student_schedules.setdefault(enrollment['student_id'], {})
        for day_num, week, start_time, end_time in course_time_slots.get(enrollment['course_id'], []):
            student_schedules[enrollment['student_id']].setdefault((day_num, week), []).append((start_time, end_time))

    # 6. 计算冲突
    total_conflicts = 0
    students_with_conflicts = set()

    for student_id, schedule in student_schedules.items():
        for courses in schedule.values():
            courses.sort()
            for i in range(len(courses) - 1):
                if courses[i][1] > courses[i + 1][0]:  # 当前课程结束时间 > 下一门课开始时间
                    total_conflicts += 1
                    students_with_conflicts.add(student_id)
                    break

    print(f"课程冲突数: {total_conflicts}, 有冲突学生数: {len(students_with_conflicts)}")
    return total_conflicts

if __name__ == "__main__":
    calculate_course_conflicts()


In [1]:
import pandas as pd
import re

def parse_week_pattern(pattern):
    """解析教学周次"""
    if not isinstance(pattern, str):
        return []
    
    weeks = []
    for part in re.findall(r'\d+-\d+|\d+', pattern):
        if '-' in part:
            start, end = map(int, part.split('-'))
            weeks.extend(range(start, end + 1))
        else:
            weeks.append(int(part))
    return weeks

def parse_time(time_str):
    """解析时间字符串"""
    try:
        hour, minute = map(int, time_str.split(':'))
        return hour * 60 + minute  # 统一转换成分钟
    except (ValueError, AttributeError):
        return -1  # 返回 -1 作为无效值

def calculate_course_conflicts():
    # 1. 读取数据
    dtype_enrollment = {'student_id': str, 'course_id': str}
    dtype_schedule = {'Course Code': str, 'Scheduled Days': str, 'Scheduled Start Time': str, 
                      'Scheduled End Time': str, 'Teaching Week Pattern': str}

    enrollment_data = pd.read_excel('math_student_enrollment.xlsx', dtype=dtype_enrollment)
    course_schedule = pd.read_excel('df_final_cleaned_1.xlsx', dtype=dtype_schedule)

    # 2. 预处理数据
    enrollment_data.dropna(subset=['student_id', 'course_id'], inplace=True)
    course_schedule.dropna(subset=['Course Code', 'Scheduled Days', 'Scheduled Start Time', 'Scheduled End Time'], inplace=True)

    # 3. 过滤共有课程
    common_courses = set(enrollment_data['course_id']).intersection(set(course_schedule['Course Code']))
    enrollment_filtered = enrollment_data[enrollment_data['course_id'].isin(common_courses)]

    # 4. 解析时间
    course_schedule['start_minutes'] = course_schedule['Scheduled Start Time'].apply(parse_time)
    course_schedule['end_minutes'] = course_schedule['Scheduled End Time'].apply(parse_time)
    course_schedule['weeks'] = course_schedule['Teaching Week Pattern'].apply(parse_week_pattern)

    # 5. 创建时间表
    student_schedules = {}
    course_time_slots = {}

    day_map = {'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3, 'Friday': 4, 'Saturday': 5, 'Sunday': 6}

    for _, course in course_schedule.iterrows():
        if course['start_minutes'] < 0 or course['end_minutes'] < 0:
            continue
        day_num = day_map.get(course['Scheduled Days'])
        if day_num is None:
            continue
        
        for week in course['weeks']:
            course_time_slots.setdefault(course['Course Code'], []).append((day_num, week, course['start_minutes'], course['end_minutes']))

    for _, enrollment in enrollment_filtered.iterrows():
        student_schedules.setdefault(enrollment['student_id'], {})
        for day_num, week, start_time, end_time in course_time_slots.get(enrollment['course_id'], []):
            student_schedules[enrollment['student_id']].setdefault((day_num, week), []).append((enrollment['course_id'], start_time, end_time))

    # 6. 计算冲突
    total_conflicts = 0
    total_courses = 0
    students_with_conflicts = set()

    for student_id, schedule in student_schedules.items():
        for courses in schedule.values():
            total_courses += len(courses)  # 计算总课程数
            courses.sort(key=lambda x: x[1])  # 按开始时间排序
            
            for i in range(len(courses) - 1):
                if courses[i][2] > courses[i + 1][1]:  # 结束时间 > 下一节课的开始时间
                    total_conflicts += 1
                    students_with_conflicts.add(student_id)
                    break  # 一旦发现冲突，跳出循环，减少计算量

    # 7. 计算统计数据
    conflict_rate = total_conflicts / total_courses if total_courses > 0 else 0
    total_students = len(student_schedules)
    student_conflict_rate = len(students_with_conflicts) / total_students if total_students > 0 else 0

    # 8. 输出结果
    print(f"发生冲突的课程数: {total_conflicts}")
    print(f"总课程记录数: {total_courses}")
    print(f"课程冲突率: {conflict_rate:.4f} ({conflict_rate * 100:.2f}%)")
    print(f"有冲突的学生数: {len(students_with_conflicts)}")
    print(f"有冲突的学生占比: {student_conflict_rate:.4f} ({student_conflict_rate * 100:.2f}%)")

    return {
        "total_conflicts": total_conflicts,
        "total_courses": total_courses,
        "conflict_rate": conflict_rate,
        "students_with_conflicts": len(students_with_conflicts),
        "student_conflict_rate": student_conflict_rate
    }

if __name__ == "__main__":
    calculate_course_conflicts()


发生冲突的课程数: 47464
总课程记录数: 612662
课程冲突率: 0.0775 (7.75%)
有冲突的学生数: 1011
有冲突的学生占比: 0.9233 (92.33%)
