# 並行処理と並列処理の基本

このノートブックでは、並行処理と並列処理の基本的な操作とベストプラクティスを学びます。I/Oバウンドな処理には `threading` や `asyncio` による並行処理を、CPUバウンドな処理には `multiprocessing` による並列処理を使い分ける方法を扱います。


In [None]:
# threadingでI/Oバウンドな処理を並行化する

import threading
import time
import requests
from concurrent.futures import ThreadPoolExecutor

# 悪い例（逐次処理）
def fetch_urls_sequential(urls):
    """Fetch each URL one after another and report its HTTP status.

    Args:
        urls: Iterable of URL strings.

    Returns:
        A list with one ``"<url>: <status>"`` or ``"<url>: Error - <exc>"``
        string per URL, in input order.
    """
    outcomes = []
    for target in urls:
        # Any failure (timeout, connection error, ...) is recorded in the
        # output instead of aborting the remaining requests.
        try:
            status = requests.get(target, timeout=5).status_code
        except Exception as exc:
            outcomes.append(f"{target}: Error - {exc}")
        else:
            outcomes.append(f"{target}: {status}")
    return outcomes

# 良い例（並行処理）
def fetch_single_url(url):
    """Fetch one URL and describe the outcome as a string.

    Returns ``"<url>: <status>"`` on success or ``"<url>: Error - <exc>"``
    if the request raised. Used as the worker for ``fetch_urls_parallel``.

    NOTE(review): the asyncio cell later redefines ``fetch_single_url`` as an
    async ``(session, url)`` function; once that cell runs, this synchronous
    version is shadowed.
    """
    try:
        status_code = requests.get(url, timeout=5).status_code
    except Exception as err:
        return f"{url}: Error - {err}"
    return f"{url}: {status_code}"

def fetch_urls_parallel(urls, max_workers=5):
    """Fetch URLs concurrently on a thread pool.

    Args:
        urls: Iterable of URL strings.
        max_workers: Maximum number of worker threads. Defaults to 5, the
            value that was previously hard-coded.

    Returns:
        A list of ``fetch_single_url`` result strings in the same order as
        *urls* (``Executor.map`` yields results in input order).
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Materialise inside the ``with`` so every request finishes before
        # the pool shuts down.
        return list(executor.map(fetch_single_url, urls))

# Example usage
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3"
]

print("=== URL取得の比較 ===")

# Sequential run. time.perf_counter() is the clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()
sequential_results = fetch_urls_sequential(urls)
sequential_time = time.perf_counter() - start_time
print(f"逐次処理時間: {sequential_time:.2f}秒")
print(f"結果: {sequential_results}")

# Concurrent run
start_time = time.perf_counter()
parallel_results = fetch_urls_parallel(urls)
parallel_time = time.perf_counter() - start_time
print(f"\n並行処理時間: {parallel_time:.2f}秒")
print(f"結果: {parallel_results}")
print(f"時間短縮: {sequential_time - parallel_time:.2f}秒")

# ファイル処理の並行化
import os
from pathlib import Path

class FileProcessor:
    """Summarise text files (size and line count), sequentially or on threads.

    Every processed file yields a result dict that is returned and also
    collected in ``self.results``.
    """

    def __init__(self, max_workers=4):
        # Upper bound on concurrent worker threads in process_files_parallel.
        self.max_workers = max_workers
        # Result dicts from the most recent batch.
        self.results = []
        # Guards self.results, which worker threads append to concurrently.
        self.lock = threading.Lock()

    def process_file(self, file_path):
        """Read one file and summarise it.

        Returns a dict with ``file``, ``size`` (characters), ``lines`` and
        ``status='success'``. If any step raises, it returns ``file``,
        ``error`` and ``status='error'`` instead of propagating. The dict is
        also appended to ``self.results``.
        """
        try:
            with open(file_path, 'r') as f:
                content = f.read()

            # Simulate per-file processing cost.
            time.sleep(0.1)

            result = {
                'file': str(file_path),
                'size': len(content),
                # splitlines() does not count the empty piece after a
                # trailing newline, so "a\nb\n" is 2 lines, not 3.
                'lines': len(content.splitlines()),
                'status': 'success'
            }
        except Exception as e:
            # Best-effort batch: record this file's failure and carry on.
            result = {
                'file': str(file_path),
                'error': str(e),
                'status': 'error'
            }

        with self.lock:
            self.results.append(result)
        return result

    def process_files_sequential(self, file_paths):
        """Process *file_paths* one at a time; return results in input order."""
        self.results = []
        for file_path in file_paths:
            self.process_file(file_path)
        return self.results

    def process_files_parallel(self, file_paths):
        """Process *file_paths* on at most ``self.max_workers`` threads.

        Returns the result dicts in input order; they are also stored in
        ``self.results``.
        """
        self.results = []
        # A bounded pool honours max_workers; starting one raw thread per
        # file ignored the setting and scaled without limit.
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            ordered = list(executor.map(self.process_file, file_paths))
        # Workers appended in completion order; replace with input order so
        # the output is deterministic and matches the sequential path.
        self.results = ordered
        return self.results

# Example usage
print("\n=== ファイル処理の並行化 ===")

test_files = []
# try/finally guarantees the test files are removed even if a step fails.
try:
    # Create the test files
    for i in range(5):
        file_path = f"test_file_{i}.txt"
        with open(file_path, 'w') as f:
            f.write(f"Test content for file {i}\n" * 10)
        test_files.append(file_path)

    processor = FileProcessor()

    # Sequential run (perf_counter is the clock intended for elapsed time)
    start_time = time.perf_counter()
    sequential_results = processor.process_files_sequential(test_files)
    sequential_time = time.perf_counter() - start_time
    print(f"逐次処理時間: {sequential_time:.2f}秒")
    print(f"処理されたファイル数: {len(sequential_results)}")

    # Concurrent run
    start_time = time.perf_counter()
    parallel_results = processor.process_files_parallel(test_files)
    parallel_time = time.perf_counter() - start_time
    print(f"\n並行処理時間: {parallel_time:.2f}秒")
    print(f"処理されたファイル数: {len(parallel_results)}")
    print(f"時間短縮: {sequential_time - parallel_time:.2f}秒")
finally:
    # Cleanup
    for file_path in test_files:
        os.remove(file_path)


In [None]:
# multiprocessingでCPUバウンドな処理を並列化する

import multiprocessing
import time
import math
import random

# CPUバウンドな処理
def calculate_fibonacci(n):
    """Return the n-th Fibonacci number using naive double recursion.

    The exponential cost is what makes this a CPU-bound workload for the
    multiprocessing comparison. Inputs ``n <= 1`` are returned unchanged.
    """
    return n if n <= 1 else calculate_fibonacci(n - 1) + calculate_fibonacci(n - 2)

def calculate_prime_numbers(max_num):
    """Return every prime ``p`` with ``2 <= p < max_num``, ascending.

    Each candidate is trial-divided by 2 .. ``int(math.sqrt(candidate))``.
    ``all`` stops at the first zero remainder, i.e. the first divisor found.
    """
    return [
        candidate
        for candidate in range(2, max_num)
        if all(candidate % divisor
               for divisor in range(2, int(math.sqrt(candidate)) + 1))
    ]

# 逐次処理
def process_sequential(numbers):
    """Compute ``calculate_fibonacci`` for each number in turn.

    Returns the results as a list in input order.
    """
    return [calculate_fibonacci(value) for value in numbers]

# 並列処理
def process_parallel(numbers):
    """Compute ``calculate_fibonacci`` for each number on a process pool.

    The pool uses ``multiprocessing.Pool``'s default worker count.
    ``Pool.map`` returns results in input order, and the pool is terminated
    when the ``with`` block exits.

    NOTE(review): worker processes must unpickle ``calculate_fibonacci`` by
    reference. Under the "spawn" start method this can fail for functions
    defined in a notebook's ``__main__``; confirm on the target platform.
    """
    pool = multiprocessing.Pool()
    with pool:
        fib_values = pool.map(calculate_fibonacci, numbers)
    return fib_values

# Example usage
print("=== CPUバウンドな処理の並列化 ===")

# Test data
numbers = [30, 31, 32, 33, 34]

# Sequential run. time.perf_counter() is the clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()
sequential_results = process_sequential(numbers)
sequential_time = time.perf_counter() - start_time
print(f"逐次処理時間: {sequential_time:.2f}秒")
print(f"結果: {sequential_results}")

# Parallel run
start_time = time.perf_counter()
parallel_results = process_parallel(numbers)
parallel_time = time.perf_counter() - start_time
print(f"\n並列処理時間: {parallel_time:.2f}秒")
print(f"結果: {parallel_results}")
print(f"時間短縮: {sequential_time - parallel_time:.2f}秒")
print(f"スピードアップ: {sequential_time / parallel_time:.2f}x")

# 画像処理の並列化
class ImageProcessor:
    """Simulated image processing, run sequentially or on a process pool."""

    def __init__(self):
        # Images processed so far; updated in the calling (parent) process
        # after each batch, since pool workers only mutate their own copies.
        self.processed_count = 0
        # Guards processed_count. Excluded from pickling in __getstate__.
        self.lock = multiprocessing.Lock()

    def __getstate__(self):
        """Return picklable state without the multiprocessing lock.

        ``Pool.map(self.process_single_image, ...)`` pickles the bound method
        and therefore ``self``. Pickling a ``multiprocessing.Lock`` outside
        process start-up raises ``RuntimeError`` (locks may only be shared
        with children through inheritance), which broke the parallel path.
        """
        state = self.__dict__.copy()
        state.pop('lock', None)
        return state

    def __setstate__(self, state):
        """Restore pickled state; the copy gets its own, unshared lock."""
        self.__dict__.update(state)
        self.lock = multiprocessing.Lock()

    def process_single_image(self, image_data):
        """Simulate processing one ``(image_id, width, height)`` tuple.

        Sleeps 0.1 s and returns a dict with ``image_id``,
        ``processed_pixels`` (width * height), a ``brightness`` drawn with
        ``random.uniform(0.5, 1.5)``, a ``contrast`` drawn with
        ``random.uniform(0.8, 1.2)`` and ``status='completed'``.
        """
        image_id, width, height = image_data

        # Simulate processing time
        time.sleep(0.1)

        # Simulate a simple image adjustment
        processed_pixels = width * height
        brightness = random.uniform(0.5, 1.5)
        contrast = random.uniform(0.8, 1.2)

        return {
            'image_id': image_id,
            'processed_pixels': processed_pixels,
            'brightness': brightness,
            'contrast': contrast,
            'status': 'completed'
        }

    def process_images_sequential(self, image_data_list):
        """Process images one by one; return result dicts in input order."""
        results = []
        for image_data in image_data_list:
            result = self.process_single_image(image_data)
            results.append(result)
        self._record_processed(len(results))
        return results

    def process_images_parallel(self, image_data_list):
        """Process images on a process pool; return results in input order.

        NOTE(review): under the "spawn" start method, workers must be able to
        import ``ImageProcessor``; classes defined in a notebook's
        ``__main__`` may not be importable there. Confirm on Windows/macOS.
        """
        with multiprocessing.Pool() as pool:
            results = pool.map(self.process_single_image, image_data_list)
        self._record_processed(len(results))
        return results

    def _record_processed(self, count):
        """Add *count* to processed_count in the calling process."""
        with self.lock:
            self.processed_count += count

# Example usage
print("\n=== 画像処理の並列化 ===")

# Test data: ten simulated 1920x1080 images
image_data_list = [
    (i, 1920, 1080) for i in range(10)
]

processor = ImageProcessor()

# Sequential run. time.perf_counter() is the clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()
sequential_results = processor.process_images_sequential(image_data_list)
sequential_time = time.perf_counter() - start_time
print(f"逐次処理時間: {sequential_time:.2f}秒")
print(f"処理された画像数: {len(sequential_results)}")

# Parallel run
start_time = time.perf_counter()
parallel_results = processor.process_images_parallel(image_data_list)
parallel_time = time.perf_counter() - start_time
print(f"\n並列処理時間: {parallel_time:.2f}秒")
print(f"処理された画像数: {len(parallel_results)}")
print(f"時間短縮: {sequential_time - parallel_time:.2f}秒")
print(f"スピードアップ: {sequential_time / parallel_time:.2f}x")

# データ分析の並列化
import statistics

def analyze_data_chunk(data_chunk):
    """Compute summary statistics for one ``(chunk_id, data)`` pair.

    Args:
        data_chunk: Tuple of a chunk identifier and a non-empty sequence of
            numbers (``statistics.mean`` and ``min`` raise on empty input).

    Returns:
        Dict with ``chunk_id``, ``count``, ``mean``, ``median``, ``std``
        (sample standard deviation, or 0 for a single value), ``min`` and
        ``max``.
    """
    chunk_id, values = data_chunk

    # Simulated analysis cost.
    time.sleep(0.05)

    average = statistics.mean(values)
    middle = statistics.median(values)
    spread = statistics.stdev(values) if len(values) > 1 else 0
    lowest = min(values)
    highest = max(values)

    return {
        'chunk_id': chunk_id,
        'count': len(values),
        'mean': average,
        'median': middle,
        'std': spread,
        'min': lowest,
        'max': highest
    }

def analyze_data_sequential(data_chunks):
    """Run ``analyze_data_chunk`` on each chunk in turn.

    Returns the per-chunk result dicts in input order.
    """
    return [analyze_data_chunk(chunk) for chunk in data_chunks]

def analyze_data_parallel(data_chunks):
    """Run ``analyze_data_chunk`` over *data_chunks* on a process pool.

    ``Pool.map`` returns the per-chunk result dicts in input order, and the
    pool is terminated when the ``with`` block exits.

    NOTE(review): workers must unpickle ``analyze_data_chunk`` by reference.
    Under the "spawn" start method this can fail for functions defined in a
    notebook's ``__main__``; confirm on the target platform.
    """
    pool = multiprocessing.Pool()
    with pool:
        chunk_summaries = pool.map(analyze_data_chunk, data_chunks)
    return chunk_summaries

# Example usage
print("\n=== データ分析の並列化 ===")

# Test data: 20 chunks of 1000 random values each
data_chunks = [
    (i, [random.uniform(0, 100) for _ in range(1000)])
    for i in range(20)
]

# Sequential run. time.perf_counter() is the clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()
sequential_results = analyze_data_sequential(data_chunks)
sequential_time = time.perf_counter() - start_time
print(f"逐次処理時間: {sequential_time:.2f}秒")
print(f"分析されたチャンク数: {len(sequential_results)}")

# Parallel run
start_time = time.perf_counter()
parallel_results = analyze_data_parallel(data_chunks)
parallel_time = time.perf_counter() - start_time
print(f"\n並列処理時間: {parallel_time:.2f}秒")
print(f"分析されたチャンク数: {len(parallel_results)}")
print(f"時間短縮: {sequential_time - parallel_time:.2f}秒")
print(f"スピードアップ: {sequential_time / parallel_time:.2f}x")

# Show the first chunk's results (plain string: the header has no placeholders)
print("\n最初のチャンクの結果:")
result = parallel_results[0]
print(f"  チャンクID: {result['chunk_id']}")
print(f"  データ数: {result['count']}")
print(f"  平均: {result['mean']:.2f}")
print(f"  中央値: {result['median']:.2f}")
print(f"  標準偏差: {result['std']:.2f}")


In [None]:
# asyncioで非同期処理を効率的に管理する

import asyncio
import aiohttp
import time

# 非同期関数の定義
async def fetch_single_url(session, url):
    """Fetch *url* through an open aiohttp session and describe the outcome.

    Returns ``"<url>: <status>"`` on success or ``"<url>: Error - <exc>"``
    if the request raised; exceptions are never propagated.

    NOTE(review): this redefines the synchronous ``fetch_single_url(url)``
    from the threading cell. After this cell runs, ``fetch_urls_parallel``
    from that cell would call this coroutine function instead.
    """
    try:
        # aiohttp documents per-request timeouts as a ClientTimeout object;
        # total=5 keeps the original 5-second overall limit.
        timeout = aiohttp.ClientTimeout(total=5)
        async with session.get(url, timeout=timeout) as response:
            return f"{url}: {response.status}"
    except Exception as e:
        return f"{url}: Error - {e}"

async def fetch_urls_async(urls):
    """Fetch all *urls* concurrently over one shared aiohttp session.

    Returns the per-URL result strings in input order, because
    ``asyncio.gather`` preserves the order of its arguments.
    """
    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(
            *(fetch_single_url(session, address) for address in urls)
        )
    return outcomes

# 使用例
print("=== 非同期処理の例 ===")

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3"
]

# 非同期処理
start_time = time.time()
results = await fetch_urls_async(urls)
async_time = time.time() - start_time
print(f"非同期処理時間: {async_time:.2f}秒")
print(f"結果: {results}")

# ファイル処理の非同期化
import aiofiles

async def process_file_async(file_path):
    """Read one file asynchronously and summarise it.

    Returns a dict with ``file``, ``size`` (characters), ``lines`` and
    ``status='success'``. If any step raises, it returns ``file``, ``error``
    and ``status='error'`` instead of propagating the exception.
    """
    try:
        async with aiofiles.open(file_path, 'r') as f:
            content = await f.read()

        # Simulate processing time; asyncio.sleep yields to the event loop.
        await asyncio.sleep(0.1)

        return {
            'file': str(file_path),
            'size': len(content),
            # splitlines() ignores the empty piece after a trailing newline,
            # so 10 newline-terminated lines count as 10, not 11.
            'lines': len(content.splitlines()),
            'status': 'success'
        }
    except Exception as e:
        # Best-effort: report this file's failure without aborting the batch.
        return {
            'file': str(file_path),
            'error': str(e),
            'status': 'error'
        }

async def process_files_async(file_paths):
    """Process all *file_paths* concurrently.

    Returns the ``process_file_async`` result dicts as a list in input
    order.
    """
    return await asyncio.gather(
        *(process_file_async(path) for path in file_paths)
    )

# 使用例
print("\n=== ファイル処理の非同期化 ===")

# テスト用ファイルを作成
test_files = []
for i in range(5):
    file_path = f"test_file_{i}.txt"
    with open(file_path, 'w') as f:
        f.write(f"Test content for file {i}\n" * 10)
    test_files.append(file_path)

# 非同期処理
start_time = time.time()
results = await process_files_async(test_files)
async_time = time.time() - start_time
print(f"非同期処理時間: {async_time:.2f}秒")
print(f"処理されたファイル数: {len(results)}")

# クリーンアップ
for file_path in test_files:
    os.remove(file_path)

# データベース操作の非同期化
import sqlite3
import aiosqlite

async def insert_user_async(db_path, name, email):
    """Insert one ``(name, email)`` row into ``users`` and commit it.

    Opens its own aiosqlite connection to *db_path*, which is closed when
    the ``async with`` block exits.

    NOTE(review): every call opens a separate connection, so many concurrent
    calls contend for SQLite's write lock; confirm this is acceptable.
    """
    async with aiosqlite.connect(db_path) as conn:
        statement = "INSERT INTO users (name, email) VALUES (?, ?)"
        await conn.execute(statement, (name, email))
        await conn.commit()

async def insert_users_async(db_path, user_data):
    """Insert every ``(name, email)`` pair in *user_data* concurrently.

    Each pair is handed to ``insert_user_async``, and all inserts are
    awaited together with ``asyncio.gather``. Returns ``None``.
    """
    pending = [
        insert_user_async(db_path, user_name, user_email)
        for user_name, user_email in user_data
    ]
    await asyncio.gather(*pending)

# 使用例
print("\n=== データベース操作の非同期化 ===")

# データベースを初期化
async def init_database():
    async with aiosqlite.connect("test_async.db") as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        await db.commit()

# テストデータ
user_data = [
    (f"User{i}", f"user{i}@example.com") for i in range(20)
]

# データベースを初期化
await init_database()

# 非同期処理
start_time = time.time()
await insert_users_async("test_async.db", user_data)
async_time = time.time() - start_time
print(f"非同期処理時間: {async_time:.2f}秒")

# ユーザー数を確認
async with aiosqlite.connect("test_async.db") as db:
    async with db.execute("SELECT COUNT(*) FROM users") as cursor:
        count = await cursor.fetchone()
        print(f"ユーザー数: {count[0]}")

# クリーンアップ
os.remove("test_async.db")
