#### 系统设计
##### 功能：用户管理，角色管理，权限管理，访问控制
1. 用户管理：包括用户信息的增删改查，分配角色，分配权限等功能。
2. 角色管理：包括角色信息的增删改查，分配权限等功能。
3. 权限管理：包括权限信息的增删改查，分配角色等功能。
4. 访问控制：包括登录验证，权限管理等功能。
##### 系统实现
1. 登录界面：输入用户名和密码，点击登录按钮，判断用户名和密码是否正确，正确则进入主界面，错误则提示错误信息；同时提供注册新用户的入口。
2. 主界面：显示当前登录用户的权限，包括增删改查和搜索功能。
3. 增删改查：分别对应增、删、改、查功能，可以对用户信息进行增删改查。操作对象暂定为用户信息表。
4. 表设计：
- 功能表：包括功能ID、功能名称等信息。
- 角色表：包括角色ID、角色名称等信息。
- 权限表：包括权限ID、权限名称等信息。
- 用户表：包括用户ID、用户名、密码、权限等信息。
- 用户角色表：包括用户ID和角色ID等信息。
5. 登录验证：登录时，首先验证用户名和密码是否正确，正确则进入主界面，错误则提示错误信息。
6. 权限管理：管理员可以对用户的权限进行管理，包括分配角色和分配权限。


In [None]:
import os
import pymysql

class Fruit:
    """Plain record for one fruit: its name, price and quantity."""

    def __init__(self, name, price, quantity):
        # Bind all three fields in a single tuple assignment.
        self.name, self.price, self.quantity = name, price, quantity


class Auth_Manager:
    """CRUD access to the ``fruits`` table over a single pymysql connection.

    Rows are exchanged with callers as ``Fruit`` objects. The database
    itself must already exist; the table is created on first use.
    """

    def __init__(self):
        # Open the connection; the database must have been created beforehand.
        self.connection = pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='my_db',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        self.create_table()

    def create_table(self):
        """Create the ``fruits`` table if it does not exist yet.

        ``name`` is the primary key because ``add_fruit`` relies on an
        integrity error to detect that a fruit is already stored.
        """
        # NOTE(review): the column types are an assumption (the original file
        # called this method without defining it) -- confirm against the
        # intended schema.
        with self.connection.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS fruits (
                    name VARCHAR(255) NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    quantity INT NOT NULL DEFAULT 0,
                    PRIMARY KEY (name)
                )
            """)
        self.connection.commit()

    def add_fruit(self, fruit):
        """Insert ``fruit``; if the insert is rejected, add to its stock instead.

        Returns:
            True once either the insert or the stock update was committed.
        """
        with self.connection.cursor() as cursor:
            # Try a plain insert first.
            insert_sql = "INSERT INTO fruits (name, price, quantity) VALUES (%s, %s, %s)"
            try:
                cursor.execute(insert_sql, (fruit.name, fruit.price, fruit.quantity))
                self.connection.commit()
                return True
            except pymysql.err.IntegrityError:
                # The fruit already exists: increase its quantity instead.
                update_sql = "UPDATE fruits SET quantity = quantity + %s WHERE name = %s"
                cursor.execute(update_sql, (fruit.quantity, fruit.name))
                self.connection.commit()
                print(f"水果 {fruit.name} 已存在，已增加 {fruit.quantity} 个库存")
                return True

    def remove_fruit(self, fruit_name):
        """Delete the fruit called ``fruit_name``; return True if a row was removed."""
        with self.connection.cursor() as cursor:
            sql = "DELETE FROM fruits WHERE name = %s"
            # cursor.execute() returns the number of affected (deleted) rows.
            affected_rows = cursor.execute(sql, (fruit_name,))
            self.connection.commit()
            return affected_rows > 0

    def update_fruit(self, fruit_name, new_price, new_quantity):
        """Overwrite price and quantity of ``fruit_name``.

        Returns:
            True if the statement reported at least one affected row.
        """
        with self.connection.cursor() as cursor:
            sql = "UPDATE fruits SET price = %s, quantity = %s WHERE name = %s"
            affected_rows = cursor.execute(sql, (new_price, new_quantity, fruit_name))
            self.connection.commit()
            return affected_rows > 0

    def get_all_fruits(self):
        """Return every stored fruit as a list of ``Fruit`` objects."""
        with self.connection.cursor() as cursor:
            sql = "SELECT name, price, quantity FROM fruits"
            cursor.execute(sql)
            # With DictCursor every fetched row is a dict keyed by column name.
            results = cursor.fetchall()
            return [Fruit(result['name'], result['price'], result['quantity']) for result in results]

    def get_fruit(self, fruit_name):
        """Return the ``Fruit`` named ``fruit_name``, or None if there is no such row."""
        with self.connection.cursor() as cursor:
            sql = "SELECT name, price, quantity FROM fruits WHERE name = %s"
            cursor.execute(sql, (fruit_name,))
            result = cursor.fetchone()
            if result:
                return Fruit(result['name'], result['price'], result['quantity'])
            return None

    def sort_by_price(self):
        """Return all fruits ordered by ascending price."""
        # Sorting is delegated to SQL (ORDER BY ... ASC ascending, DESC descending).
        with self.connection.cursor() as cursor:
            sql = "SELECT name, price, quantity FROM fruits ORDER BY price ASC"
            cursor.execute(sql)
            results = cursor.fetchall()
            return [Fruit(result['name'], result['price'], result['quantity']) for result in results]

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        # __init__ may have failed before ``connection`` was assigned.
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except pymysql.Error:
            # The connection was already closed; nothing left to release.
            pass


=== 主菜单 ===
当前用户: 未登录
1. 登录
2. 退出登录
3. 创建用户
4. 删除用户
5. 查看权限
6. 退出系统


In [None]:
import pymysql
from pymysql.cursors import DictCursor
from hashlib import md5

class User:
    """An account: identifier, user name and an optional password."""

    def __init__(self, id, username, password=None):
        self.id, self.username = id, username
        # Stays None when the caller does not pass a password.
        self.password = password

class Role:
    """A role, described by its identifier and display name."""

    def __init__(self, id, name):
        self.id, self.name = id, name

class Permission:
    """A permission: identifier, display name and the code used for lookups."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code

class Feature:
    """A feature entry: identifier, display name and code."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code

class AuthManager:
    """Login, registration and role-based permission checks backed by MySQL.

    Holds one pymysql connection and the currently logged-in ``User`` in
    ``current_user`` (``None`` while nobody is logged in).
    """

    def __init__(self):
        self.connection = pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='my_proj_db',
            charset='utf8mb4',
            cursorclass=DictCursor
        )
        self.current_user = None
        self.create_tables()

    def create_tables(self):
        """Create every table the other methods query, if not already present."""
        statements = (
            # ``id`` is AUTO_INCREMENT because ``register`` inserts rows
            # without supplying an id.
            """
                CREATE TABLE IF NOT EXISTS users (
                    id INT NOT NULL AUTO_INCREMENT,
                    username VARCHAR(255) DEFAULT NULL,
                    password VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id),
                    UNIQUE KEY username (username)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS roles (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            # The three tables below are read by check_permission,
            # get_user_roles and revise_role.
            # NOTE(review): column types mirror users/roles -- confirm.
            """
                CREATE TABLE IF NOT EXISTS permissions (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    code VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS user_roles (
                    user_id INT NOT NULL,
                    role_id INT NOT NULL,
                    PRIMARY KEY (user_id, role_id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS role_permissions (
                    role_id INT NOT NULL,
                    permission_id INT NOT NULL,
                    PRIMARY KEY (role_id, permission_id)
                )
            """,
        )
        with self.connection.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)
            self.connection.commit()

    def hash_password(self, password):
        """Return the hexadecimal MD5 digest of ``password``."""
        # NOTE(review): unsalted MD5 is too weak for password storage. It is
        # kept so hashes already stored in the database keep matching; plan a
        # migration to a salted KDF such as hashlib.scrypt.
        return md5(password.encode()).hexdigest()

    def login(self, username, password):
        """Log in with ``username``/``password``.

        Returns:
            True and sets ``current_user`` when a matching row exists,
            otherwise False.
        """
        with self.connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE username = %s AND password = %s"
            cursor.execute(sql, (username, self.hash_password(password)))
            user = cursor.fetchone()

        if user:
            self.current_user = User(user['id'], user['username'])
            return True
        return False

    def logout(self):
        """Forget the current user; always returns True."""
        self.current_user = None
        return True

    def register(self, username, password):
        """Store a new user with a hashed password.

        Returns:
            True on success, False when the insert violates an integrity
            constraint (``username`` is declared unique).
        """
        with self.connection.cursor() as cursor:
            try:
                sql = "INSERT INTO users (username, password) VALUES (%s, %s)"
                cursor.execute(sql, (username, self.hash_password(password)))
            except pymysql.IntegrityError:
                # Leave no half-finished transaction behind.
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def revise_role(self, user_id, role_id):
        """Set the role of ``user_id`` to ``role_id``.

        Requires a logged-in user holding the ``revise_roles`` permission.
        If the user has no ``user_roles`` row yet, one is inserted so that
        the call does not silently do nothing.

        Returns:
            True when the change was committed, False when not permitted or
            when an integrity constraint rejected it.
        """
        if not self.current_user or not self.check_permission('revise_roles'):
            return False

        with self.connection.cursor() as cursor:
            try:
                sql = "update user_roles set role_id = %s where user_id = %s"
                updated = cursor.execute(sql, (role_id, user_id))
                if not updated:
                    # Zero affected rows: either the user already has this
                    # role or has no row at all -- insert only in the latter case.
                    cursor.execute(
                        "SELECT 1 FROM user_roles WHERE user_id = %s LIMIT 1",
                        (user_id,),
                    )
                    if cursor.fetchone() is None:
                        cursor.execute(
                            "INSERT INTO user_roles (user_id, role_id) VALUES (%s, %s)",
                            (user_id, role_id),
                        )
            except pymysql.IntegrityError:
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def check_permission(self, permission_code):
        """Return True if the current user has a role granting ``permission_code``."""
        if not self.current_user:
            return False

        with self.connection.cursor() as cursor:
            # Walk user -> roles -> permissions and count matching codes.
            sql = """
            SELECT COUNT(*) AS has_permission
            FROM user_roles ur
            JOIN role_permissions rp ON ur.role_id = rp.role_id
            JOIN permissions p ON rp.permission_id = p.id
            WHERE ur.user_id = %s AND p.code = %s
            """
            cursor.execute(sql, (self.current_user.id, permission_code))
            result = cursor.fetchone()

        return result['has_permission'] > 0

    def get_all_users(self):
        """Return all users as ``User`` objects (id and username only)."""
        with self.connection.cursor() as cursor:
            sql = "SELECT id, username FROM users"
            cursor.execute(sql)
            return [User(row['id'], row['username']) for row in cursor.fetchall()]

    def get_user_roles(self, user_id):
        """Return the ``Role`` objects linked to ``user_id`` via ``user_roles``."""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT r.id, r.name 
            FROM roles r
            JOIN user_roles ur ON r.id = ur.role_id
            WHERE ur.user_id = %s
            """
            cursor.execute(sql, (user_id,))
            return [Role(row['id'], row['name']) for row in cursor.fetchall()]

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        # __init__ may have failed before ``connection`` was assigned.
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except pymysql.Error:
            # The connection was already closed; nothing left to release.
            pass

# Interactive console demo for AuthManager.
if __name__ == '__main__':
    auth_manager = AuthManager()

    # Seed data (only needed on the very first run).
    # NOTE: menu option 2 checks the 'revise_roles' permission code, while the
    # seed below creates 'user_management' and links no user to the role --
    # adjust it before relying on option 2.
    # with auth_manager.connection.cursor() as cursor:
    #     cursor.execute("INSERT INTO roles (id, name) VALUES (1001, '超级管理员')")
    #     cursor.execute("INSERT INTO permissions (id, name, code) VALUES (1001, '用户管理', 'user_management')")
    #     cursor.execute("INSERT INTO role_permissions (role_id, permission_id) VALUES (1001, 1001)")
    #     auth_manager.connection.commit()

    while True:
        # Show the menu that matches the current login state.
        print("\n=== 权限管理系统 ===")
        if auth_manager.current_user:
            print(f"当前用户: {auth_manager.current_user.username}")
            print("1. 查看所有用户")
            print("2. 分配角色")
            print("3. 查看我的权限")
            print("4. 退出登录")
        else:
            print("1. 登录")
            print("2. 注册")

        print("0. 退出系统")

        choice = input("请输入您的选择: ")

        # "0" leaves the program from either menu.
        if choice == "0":
            break

        if not auth_manager.current_user:
            if choice == "1":
                print("\n=== 登录 ===")
                username = input("用户名: ")
                password = input("密码: ")
                if auth_manager.login(username, password):
                    print("登录成功")
                else:
                    print("用户名或密码错误")

            elif choice == "2":
                print("\n=== 注册 ===")
                username = input("用户名: ")
                password = input("密码: ")
                if auth_manager.register(username, password):
                    print("注册成功")
                else:
                    print("用户名已存在")
            continue

        if choice == "1":
            print("\n=== 用户列表 ===")
            users = auth_manager.get_all_users()
            for user in users:
                roles = auth_manager.get_user_roles(user.id)
                role_names = ", ".join([role.name for role in roles])
                print(f"ID: {user.id}, 用户名: {user.username}, 角色: {role_names}")

        elif choice == "2":
            if auth_manager.check_permission('revise_roles'):
                print("\n=== 分配角色 ===")
                user_id = input("输入用户ID: ")
                role_id = input("输入角色ID: ")
                # This AuthManager exposes revise_role; the former call to
                # assign_role raised AttributeError.
                if auth_manager.revise_role(user_id, role_id):
                    print("角色分配成功")
                else:
                    print("角色分配失败")
            else:
                print("权限不足")

        elif choice == "3":
            print("\n=== 我的权限 ===")
            roles = auth_manager.get_user_roles(auth_manager.current_user.id)
            print("我的角色:", ", ".join([role.name for role in roles]))

        elif choice == "4":
            auth_manager.logout()
            print("已退出登录")


=== 权限管理系统 ===
1. 登录
2. 注册
0. 退出系统

=== 登录 ===
用户名或密码错误

=== 权限管理系统 ===
1. 登录
2. 注册
0. 退出系统


In [None]:
import pymysql
from pymysql.cursors import DictCursor
from hashlib import md5


class User:
    """An account: identifier, user name and an optional password."""

    def __init__(self, id, username, password=None):
        self.id, self.username = id, username
        # Stays None when the caller does not pass a password.
        self.password = password


class Role:
    """A role, described by its identifier and display name."""

    def __init__(self, id, name):
        self.id, self.name = id, name


class Permission:
    """A permission: identifier, display name and the code used for lookups."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code


class Feature:
    """A feature entry: identifier, display name and code."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code


class AuthManager:
    """Login, registration and role-based permission checks backed by MySQL.

    Holds one pymysql connection and the currently logged-in ``User`` in
    ``current_user`` (``None`` while nobody is logged in).
    """

    def __init__(self):
        self.connection = pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='my_db',
            charset='utf8mb4',
            cursorclass=DictCursor
        )
        self.current_user = None
        self.create_tables()

    def create_tables(self):
        """Create every table the other methods query, if not already present."""
        statements = (
            # ``id`` is AUTO_INCREMENT because ``register`` inserts rows
            # without supplying an id.
            """
                CREATE TABLE IF NOT EXISTS users (
                    id INT NOT NULL AUTO_INCREMENT,
                    username VARCHAR(255) DEFAULT NULL,
                    password VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id),
                    UNIQUE KEY username (username)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS roles (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            # The three tables below are read or written by check_permission,
            # get_user_roles and assign_role.
            # NOTE(review): column types mirror users/roles -- confirm.
            """
                CREATE TABLE IF NOT EXISTS permissions (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    code VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS user_roles (
                    user_id INT NOT NULL,
                    role_id INT NOT NULL,
                    PRIMARY KEY (user_id, role_id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS role_permissions (
                    role_id INT NOT NULL,
                    permission_id INT NOT NULL,
                    PRIMARY KEY (role_id, permission_id)
                )
            """,
        )
        with self.connection.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)
            self.connection.commit()

    def hash_password(self, password):
        """Return the hexadecimal MD5 digest of ``password``."""
        # NOTE(review): unsalted MD5 is too weak for password storage. It is
        # kept so hashes already stored in the database keep matching; plan a
        # migration to a salted KDF such as hashlib.scrypt.
        return md5(password.encode()).hexdigest()

    def login(self, username, password):
        """Log in with ``username``/``password``.

        Returns:
            True and sets ``current_user`` when a matching row exists,
            otherwise False.
        """
        with self.connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE username = %s AND password = %s"
            cursor.execute(sql, (username, self.hash_password(password)))
            user = cursor.fetchone()

        if user:
            self.current_user = User(user['id'], user['username'])
            return True
        return False

    def logout(self):
        """Forget the current user; always returns True."""
        self.current_user = None
        return True

    def register(self, username, password):
        """Store a new user with a hashed password.

        Returns:
            True on success, False when the insert violates an integrity
            constraint (``username`` is declared unique).
        """
        with self.connection.cursor() as cursor:
            try:
                sql = "INSERT INTO users (username, password) VALUES (%s, %s)"
                cursor.execute(sql, (username, self.hash_password(password)))
            except pymysql.IntegrityError:
                # Leave no half-finished transaction behind.
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def assign_role(self, user_id, role_id):
        """Link ``user_id`` to ``role_id`` in ``user_roles``.

        Requires a logged-in user holding the ``assign_roles`` permission.

        Returns:
            True when the row was committed, False when not permitted or
            when an integrity constraint rejected the insert.
        """
        if not self.current_user or not self.check_permission('assign_roles'):
            return False

        with self.connection.cursor() as cursor:
            try:
                sql = "INSERT INTO user_roles (user_id, role_id) VALUES (%s, %s)"
                cursor.execute(sql, (user_id, role_id))
            except pymysql.IntegrityError:
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def check_permission(self, permission_code):
        """Return True if the current user has a role granting ``permission_code``."""
        if not self.current_user:
            return False

        with self.connection.cursor() as cursor:
            # Walk user -> roles -> permissions and count matching codes.
            sql = """
            SELECT COUNT(*) AS has_permission
            FROM user_roles ur
            JOIN role_permissions rp ON ur.role_id = rp.role_id
            JOIN permissions p ON rp.permission_id = p.id
            WHERE ur.user_id = %s AND p.code = %s
            """
            cursor.execute(sql, (self.current_user.id, permission_code))
            result = cursor.fetchone()

        return result['has_permission'] > 0

    def get_all_users(self):
        """Return all users as ``User`` objects (id and username only)."""
        with self.connection.cursor() as cursor:
            sql = "SELECT id, username FROM users"
            cursor.execute(sql)
            return [User(row['id'], row['username']) for row in cursor.fetchall()]

    def get_user_roles(self, user_id):
        """Return the ``Role`` objects linked to ``user_id`` via ``user_roles``."""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT r.id, r.name 
            FROM roles r
            JOIN user_roles ur ON r.id = ur.role_id
            WHERE ur.user_id = %s
            """
            cursor.execute(sql, (user_id,))
            return [Role(row['id'], row['name']) for row in cursor.fetchall()]

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        # __init__ may have failed before ``connection`` was assigned.
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except pymysql.Error:
            # The connection was already closed; nothing left to release.
            pass


# Interactive console demo for AuthManager.
if __name__ == '__main__':
    auth_manager = AuthManager()

    # Seed data (only needed on the very first run).
    # with auth_manager.connection.cursor() as cursor:
    #     cursor.execute("INSERT INTO roles (id, name) VALUES (1001, '超级管理员')")
    #     cursor.execute("INSERT INTO permissions (id, name, code) VALUES (1001, '用户管理', 'user_management')")
    #     cursor.execute("INSERT INTO role_permissions (role_id, permission_id) VALUES (1001, 1001)")
    #     auth_manager.connection.commit()

    while True:
        # Show the menu that matches the current login state.
        print("\n=== 权限管理系统 ===")
        if not auth_manager.current_user:
            print("1. 登录")
            print("2. 注册")
        else:
            print(f"当前用户: {auth_manager.current_user.username}")
            print("1. 查看所有用户")
            print("2. 分配角色")
            print("3. 查看我的权限")
            print("4. 退出登录")

        print("0. 退出系统")

        choice = input("请输入您的选择: ")

        # "0" leaves the program from either menu.
        if choice == "0":
            break

        # Anonymous menu: log in or register, then redraw.
        if not auth_manager.current_user:
            if choice == "1":
                print("\n=== 登录 ===")
                username = input("用户名: ")
                password = input("密码: ")
                logged_in = auth_manager.login(username, password)
                print("登录成功" if logged_in else "用户名或密码错误")

            elif choice == "2":
                print("\n=== 注册 ===")
                username = input("用户名: ")
                password = input("密码: ")
                registered = auth_manager.register(username, password)
                print("注册成功" if registered else "用户名已存在")
            continue

        # Logged-in menu.
        if choice == "1":
            print("\n=== 用户列表 ===")
            for user in auth_manager.get_all_users():
                roles = auth_manager.get_user_roles(user.id)
                role_names = ", ".join(role.name for role in roles)
                print(f"ID: {user.id}, 用户名: {user.username}, 角色: {role_names}")

        elif choice == "2":
            if not auth_manager.check_permission('assign_roles'):
                print("权限不足")
            else:
                print("\n=== 分配角色 ===")
                user_id = input("输入用户ID: ")
                role_id = input("输入角色ID: ")
                assigned = auth_manager.assign_role(user_id, role_id)
                print("角色分配成功" if assigned else "角色分配失败")

        elif choice == "3":
            print("\n=== 我的权限 ===")
            roles = auth_manager.get_user_roles(auth_manager.current_user.id)
            print("我的角色:", ", ".join(role.name for role in roles))

        elif choice == "4":
            auth_manager.logout()
            print("已退出登录")

In [None]:
# 某可运行版本
import pymysql
from pymysql.cursors import DictCursor
from hashlib import md5


class User:
    """An account: identifier, user name and an optional password."""

    def __init__(self, id, username, password=None):
        self.id, self.username = id, username
        # Stays None when the caller does not pass a password.
        self.password = password


class Role:
    """A role, described by its identifier and display name."""

    def __init__(self, id, name):
        self.id, self.name = id, name


class Permission:
    """A permission: identifier, display name and the code used for lookups."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code


class Feature:
    """A feature entry: identifier, display name and code."""

    def __init__(self, id, name, code):
        self.id, self.name, self.code = id, name, code


class AuthManager:
    """Login, registration and role-based permission checks backed by MySQL.

    Holds one pymysql connection and the currently logged-in ``User`` in
    ``current_user`` (``None`` while nobody is logged in).
    """

    def __init__(self):
        self.connection = pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='my_db',
            charset='utf8mb4',
            cursorclass=DictCursor
        )
        self.current_user = None
        self.create_tables()

    def create_tables(self):
        """Create every table the other methods query, if not already present."""
        statements = (
            # ``id`` is AUTO_INCREMENT because ``register`` inserts rows
            # without supplying an id.
            """
                CREATE TABLE IF NOT EXISTS users (
                    id INT NOT NULL AUTO_INCREMENT,
                    username VARCHAR(255) DEFAULT NULL,
                    password VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id),
                    UNIQUE KEY username (username)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS roles (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            # The three tables below are read by check_permission,
            # get_user_roles and revise_role.
            # NOTE(review): column types mirror users/roles -- confirm.
            """
                CREATE TABLE IF NOT EXISTS permissions (
                    id INT NOT NULL,
                    name VARCHAR(255) DEFAULT NULL,
                    code VARCHAR(255) DEFAULT NULL,
                    PRIMARY KEY (id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS user_roles (
                    user_id INT NOT NULL,
                    role_id INT NOT NULL,
                    PRIMARY KEY (user_id, role_id)
                )
            """,
            """
                CREATE TABLE IF NOT EXISTS role_permissions (
                    role_id INT NOT NULL,
                    permission_id INT NOT NULL,
                    PRIMARY KEY (role_id, permission_id)
                )
            """,
        )
        with self.connection.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)
            self.connection.commit()

    def hash_password(self, password):
        """Return the hexadecimal MD5 digest of ``password``."""
        # NOTE(review): unsalted MD5 is too weak for password storage. It is
        # kept so hashes already stored in the database keep matching; plan a
        # migration to a salted KDF such as hashlib.scrypt.
        return md5(password.encode()).hexdigest()

    def login(self, username, password):
        """Log in with ``username``/``password``.

        Returns:
            True and sets ``current_user`` when a matching row exists,
            otherwise False.
        """
        with self.connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE username = %s AND password = %s"
            cursor.execute(sql, (username, self.hash_password(password)))
            user = cursor.fetchone()

        if user:
            self.current_user = User(user['id'], user['username'])
            return True
        return False

    def logout(self):
        """Forget the current user; always returns True."""
        self.current_user = None
        return True

    def register(self, username, password):
        """Store a new user with a hashed password.

        Returns:
            True on success, False when the insert violates an integrity
            constraint (``username`` is declared unique).
        """
        with self.connection.cursor() as cursor:
            try:
                sql = "INSERT INTO users (username, password) VALUES (%s, %s)"
                cursor.execute(sql, (username, self.hash_password(password)))
            except pymysql.IntegrityError:
                # Leave no half-finished transaction behind.
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def revise_role(self, user_id, role_id):
        """Set the role of ``user_id`` to ``role_id``.

        Requires a logged-in user holding the ``revise_roles`` permission.
        If the user has no ``user_roles`` row yet, one is inserted so that
        the call does not silently do nothing.

        Returns:
            True when the change was committed, False when not permitted or
            when an integrity constraint rejected it.
        """
        if not self.current_user or not self.check_permission('revise_roles'):
            return False

        with self.connection.cursor() as cursor:
            try:
                sql = "update user_roles set role_id = %s where user_id = %s"
                updated = cursor.execute(sql, (role_id, user_id))
                if not updated:
                    # Zero affected rows: either the user already has this
                    # role or has no row at all -- insert only in the latter case.
                    cursor.execute(
                        "SELECT 1 FROM user_roles WHERE user_id = %s LIMIT 1",
                        (user_id,),
                    )
                    if cursor.fetchone() is None:
                        cursor.execute(
                            "INSERT INTO user_roles (user_id, role_id) VALUES (%s, %s)",
                            (user_id, role_id),
                        )
            except pymysql.IntegrityError:
                self.connection.rollback()
                return False
            self.connection.commit()
            return True

    def check_permission(self, permission_code):
        """Return True if the current user has a role granting ``permission_code``."""
        if not self.current_user:
            return False

        with self.connection.cursor() as cursor:
            # Walk user -> roles -> permissions and count matching codes.
            sql = """
            SELECT COUNT(*) AS has_permission
            FROM user_roles ur
            JOIN role_permissions rp ON ur.role_id = rp.role_id
            JOIN permissions p ON rp.permission_id = p.id
            WHERE ur.user_id = %s AND p.code = %s
            """
            cursor.execute(sql, (self.current_user.id, permission_code))
            result = cursor.fetchone()

        return result['has_permission'] > 0

    def get_all_users(self):
        """Return all users as ``User`` objects (id and username only)."""
        with self.connection.cursor() as cursor:
            sql = "SELECT id, username FROM users"
            cursor.execute(sql)
            return [User(row['id'], row['username']) for row in cursor.fetchall()]

    def get_user_roles(self, user_id):
        """Return the ``Role`` objects linked to ``user_id`` via ``user_roles``."""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT r.id, r.name 
            FROM roles r
            JOIN user_roles ur ON r.id = ur.role_id
            WHERE ur.user_id = %s
            """
            cursor.execute(sql, (user_id,))
            return [Role(row['id'], row['name']) for row in cursor.fetchall()]

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        # __init__ may have failed before ``connection`` was assigned.
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except pymysql.Error:
            # The connection was already closed; nothing left to release.
            pass


# Interactive console demo for AuthManager.
if __name__ == '__main__':
    auth_manager = AuthManager()

    # Seed data (only needed on the very first run).
    # NOTE: menu option 2 checks the 'revise_roles' permission code, while the
    # seed below creates 'user_management' and links no user to the role --
    # adjust it before relying on option 2.
    # with auth_manager.connection.cursor() as cursor:
    #     cursor.execute("INSERT INTO roles (id, name) VALUES (1001, '超级管理员')")
    #     cursor.execute("INSERT INTO permissions (id, name, code) VALUES (1001, '用户管理', 'user_management')")
    #     cursor.execute("INSERT INTO role_permissions (role_id, permission_id) VALUES (1001, 1001)")
    #     auth_manager.connection.commit()

    while True:
        # Show the menu that matches the current login state.
        print("\n=== 权限管理系统 ===")
        if auth_manager.current_user:
            print(f"当前用户: {auth_manager.current_user.username}")
            print("1. 查看所有用户")
            print("2. 分配角色")
            print("3. 查看我的权限")
            print("4. 退出登录")
        else:
            print("1. 登录")
            print("2. 注册")

        print("0. 退出系统")

        choice = input("请输入您的选择: ")

        # "0" leaves the program from either menu.
        if choice == "0":
            break

        if not auth_manager.current_user:
            if choice == "1":
                print("\n=== 登录 ===")
                username = input("用户名: ")
                password = input("密码: ")
                if auth_manager.login(username, password):
                    print("登录成功")
                else:
                    print("用户名或密码错误")

            elif choice == "2":
                print("\n=== 注册 ===")
                username = input("用户名: ")
                password = input("密码: ")
                if auth_manager.register(username, password):
                    print("注册成功")
                else:
                    print("用户名已存在")
            continue

        if choice == "1":
            print("\n=== 用户列表 ===")
            users = auth_manager.get_all_users()
            for user in users:
                roles = auth_manager.get_user_roles(user.id)
                role_names = ", ".join([role.name for role in roles])
                print(f"ID: {user.id}, 用户名: {user.username}, 角色: {role_names}")

        elif choice == "2":
            # This AuthManager defines revise_role, guarded by the
            # 'revise_roles' permission; the former assign_role/'assign_roles'
            # pair does not exist on it and raised AttributeError.
            if auth_manager.check_permission('revise_roles'):
                print("\n=== 分配角色 ===")
                user_id = input("输入用户ID: ")
                role_id = input("输入角色ID: ")
                if auth_manager.revise_role(user_id, role_id):
                    print("角色分配成功")
                else:
                    print("角色分配失败")
            else:
                print("权限不足")

        elif choice == "3":
            print("\n=== 我的权限 ===")
            roles = auth_manager.get_user_roles(auth_manager.current_user.id)
            print("我的角色:", ", ".join([role.name for role in roles]))

        elif choice == "4":
            auth_manager.logout()
            print("已退出登录")