# 检查可用内核

In [None]:
from jupyter_client.kernelspec import KernelSpecManager

ksm = KernelSpecManager()
print("可用内核:", ksm.get_all_specs().keys())
 #获取默认内核
default_kernel = ksm.get_kernel_spec('python3')
print(f"默认Python内核路径: {default_kernel.resource_dir}")


# 同步方式

In [None]:
from jupyter_client.manager import KernelManager
import time

# 创建内核管理器
km = KernelManager(kernel_name='python3')

# 启动内核
km.start_kernel()
print("内核已启动")

# 获取客户端
kc = km.client()


In [None]:

try:
    # 执行简单代码
    msg_id = kc.execute('print(Hello jupyter_client!")')
    print(f"消息ID: {msg_id}")
    
    # 获取输出
    while True:
        try:
            msg = kc.get_iopub_msg(timeout=5)
            if msg['parent_header'].get('msg_id') == msg_id:
                print(f"消息类型: {msg['msg_type']}")
                if msg['msg_type'] == 'stream':
                    print(f"输出: {msg['content']['text']}")
                elif msg['msg_type'] == 'execute_reply':
                    print("执行完成")
                    break
        except Exception as e:
            print(f"获取消息超时: {e}")
            break

finally:
    # 清理资源
    km.shutdown_kernel()
    print("内核已关闭")

# bash

In [None]:
from jupyter_client.manager import KernelManager

class IPythonExecutor:
    def __init__(self):
        self.km = KernelManager(kernel_name='python3')
        self.km.start_kernel()
        self.kc = self.km.client()
    
    def execute_bash(self, command: str) -> dict:
        """执行bash命令"""
        # 使用!魔法命令
        code = f"!{command}"
        return self._execute_code(code)
    
    def _execute_code(self, code: str) -> dict:
        """通用代码执行方法"""
        msg_id = self.kc.execute(code)
        
        outputs = []
        errors = []
        
        while True:
            try:
                msg = self.kc.get_iopub_msg(timeout=30)
                if msg['parent_header'].get('msg_id') == msg_id:
                    msg_type = msg['msg_type']
                    content = msg['content']
                    
                    if msg_type == 'stream':
                        outputs.append(content['text'])
                    elif msg_type == 'error':
                        errors.append({
                            'name': content['ename'],
                            'value': content['evalue'],
                            'traceback': '\n'.join(content['traceback'])
                        })
                    elif msg_type == 'execute_reply':
                        status = content['status']
                        break
                        
            except Exception as e:
                errors.append({'name': 'TimeoutError', 'value': str(e), 'traceback': ''})
                break
        
        return {
            'success': len(errors) == 0,
            'output': ''.join(outputs).strip(),
            'errors': errors,
            'code': code
        }

# 使用示例
executor = IPythonExecutor()

# 执行各种bash命令
commands = [
    "ls -la",
    "pwd", 
    "echo 'Hello from bash'",
    "python --version",
    "which python"
]

for cmd in commands:
    result = executor.execute_bash(cmd)
    print(f"命令: {cmd}")
    print(f"成功: {result['success']}")
    print(f"输出: {result['output']}")
    if result['errors']:
        print(f"错误: {result['errors']}")
    print("-" * 50)

# 异步方式

In [None]:
import asyncio
from jupyter_client.manager import AsyncKernelManager

async def async_execute_example():
    # 创建异步内核管理器
    km = AsyncKernelManager(kernel_name='python3')
    
    # 启动内核
    await km.start_kernel()
    print("异步内核已启动")
    
    # 获取客户端
    kc = km.client()
    
    try:
        # 执行代码
        msg_id = kc.execute('import time\nfor i in range(3):\n    print(f"计数: {i}")\n    time.sleep(1)')
        
        # 异步获取输出
        while True:
            try:
                msg = await kc.get_iopub_msg(timeout=10)
                if msg['parent_header'].get('msg_id') == msg_id:
                    if msg['msg_type'] == 'stream':
                        print(f"异步输出: {msg['content']['text'].strip()}")
                    elif msg['msg_type'] == 'execute_reply':
                        print("异步执行完成")
                        break
            except Exception as e:
                print(f"异步获取消息异常: {e}")
                break
    
    finally:
        await km.shutdown_kernel()
        print("异步内核已关闭")

# 运行异步示例
# asyncio.run(async_execute_example())


## 并发执行多个任务

In [None]:
async def execute_concurrent():
    """并发执行多个代码片段"""
    km = AsyncKernelManager(kernel_name='python3')
    await km.start_kernel()
    kc = km.client()
    
    # 定义多个任务
    tasks = [
        'import math\nprint(f"Pi = {math.pi}")',
        'import random\nprint(f"随机数: {random.randint(1, 100)}")',
        'print("当前时间:", __import__("datetime").datetime.now())',
    ]
    
    async def execute_task(code, task_id):
        """执行单个任务"""
        print(f"任务 {task_id} 开始执行")
        msg_id = kc.execute(code)
        
        while True:
            try:
                msg = await kc.get_iopub_msg(timeout=5)
                if msg['parent_header'].get('msg_id') == msg_id:
                    if msg['msg_type'] == 'stream':
                        print(f"任务 {task_id} 输出: {msg['content']['text'].strip()}")
                    elif msg['msg_type'] == 'execute_reply':
                        print(f"任务 {task_id} 完成")
                        break
            except:
                break
    
    try:
        # 并发执行所有任务
        await asyncio.gather(*[
            execute_task(code, i+1) 
            for i, code in enumerate(tasks)
        ])
    finally:
        await km.shutdown_kernel()

# asyncio.run(execute_concurrent())