In [10]:
!pip install requests tqdm pandas scapy matplotlib urllib3




In [None]:

"""
mawi_daily_pcap_stats.py (Jupyter Notebook / tshark Fast Version)
────────────────────────────────────────────────────────
【高速版】tshark を利用してMAWI DITLのpcapファイルを高速に処理するスクリプト。
15分単位のpcap.gzをダウンロードし、1時間ごとにプロトコル別、
ユニークIP/ポート数の統計情報を集計し、CSVに保存後、可視化します。

【主なフロー】
1) 指定日付範囲 (UTC日付) をループ。
2) 1日あたり96個の15分pcapをダウンロード・展開。
3) ★★★ tshark を使ってpcapから必要な情報を高速に抽出 ★★★
4) 処理後、pcapファイルとgzファイルを削除 (CLEANUP_FILES=True の場合のみ)。
5) 1日分のデータをpandasで1時間単位に集計し、<YYYY-MM-DD>.csv に保存 (既存ファイルは上書き)。
6) 全CSVを連結して、4段構成のグラフを描画。

【重要】
このスクリプトを実行するには、tshark (Wireshark) が
お使いの環境にインストールされている必要があります。
"""

from __future__ import annotations

import gzip
import io
import shutil
import socket
import subprocess
import tempfile
from collections import defaultdict
from datetime import datetime, date, time, timedelta, timezone
from pathlib import Path
from typing import Dict, Set

import matplotlib.pyplot as plt
import pandas as pd
import requests
from matplotlib import dates as mdates
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3.util.retry import Retry


# Fixed UTC+09:00 zone used to render timestamps for display.
JST = timezone(offset=timedelta(hours=9), name='JST')
# Alias for the stdlib UTC singleton; pcap segment times are computed in UTC.
UTC = timezone.utc


def build_session(retries: int = 3, backoff: float = 1.0) -> requests.Session:
    """Return a ``requests.Session`` that retries transient server errors.

    Both ``http://`` and ``https://`` URLs go through one shared adapter whose
    urllib3 ``Retry`` policy retries up to ``retries`` times, with exponential
    back-off factor ``backoff``, on HTTP 500/502/503/504 responses.
    """
    session = requests.Session()
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            backoff_factor=backoff,
            status_forcelist=(500, 502, 503, 504),
        )
    )
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session



def _gunzip_to(gz_path: Path, pcap_path: Path) -> bool:
    """Decompress ``gz_path`` into ``pcap_path``; return True on success.

    Output is written to a ``.part`` file and renamed only once decompression
    has finished. An interrupted or failed run therefore never leaves a
    truncated ``pcap_path`` that a later call would treat as a complete cache hit.
    """
    part_path = pcap_path.with_name(pcap_path.name + ".part")
    try:
        with gzip.open(gz_path, "rb") as fin, open(part_path, "wb") as fout:
            shutil.copyfileobj(fin, fout)
        part_path.replace(pcap_path)
        return True
    except Exception as e:  # truncated/corrupt archive, disk errors, ...
        tqdm.write(f"[ERROR] Gunzip failed for {gz_path.name}: {e}")
        part_path.unlink(missing_ok=True)
        return False


def fetch_and_unzip_segment(
    target_dt_utc: datetime,
    dest_dir: Path,
    session: requests.Session | None = None,
) -> Path | None:
    """Make sure the 15-minute MAWI DITL segment starting at ``target_dt_utc`` exists as a .pcap.

    Files live in ``dest_dir/<YYYYMMDD>/`` as ``<YYYYMMDDHHMM>.pcap.gz`` (download)
    and ``<YYYYMMDDHHMM>.pcap`` (decompressed). Cached files are reused: first an
    existing .pcap, then an existing .pcap.gz. If neither exists, the archive
    is downloaded.

    Args:
        target_dt_utc: segment start time (used for the URL and file names).
        dest_dir: root download/cache directory.
        session: optional HTTP session to reuse across calls. When omitted, a
            session from ``build_session()`` is created for this call and closed
            afterwards.

    Returns:
        Path of the decompressed pcap. Returns None if the server answers 404,
        the download fails, or the archive cannot be decompressed.
    """
    year = target_dt_utc.year
    stamp = target_dt_utc.strftime("%Y%m%d%H%M")
    url = f"https://mawi.wide.ad.jp/mawi/ditl/ditl{year}/{stamp}.pcap.gz"

    day_dir = dest_dir / f"{target_dt_utc:%Y%m%d}"
    day_dir.mkdir(parents=True, exist_ok=True)

    gz_path = day_dir / f"{stamp}.pcap.gz"
    # Name the decompressed file explicitly: ``gz_path.with_suffix(".pcap")``
    # replaces only the final ".gz" and would produce "<stamp>.pcap.pcap".
    pcap_path = day_dir / f"{stamp}.pcap"
    legacy_pcap_path = day_dir / f"{stamp}.pcap.pcap"

    if pcap_path.exists():
        return pcap_path
    if legacy_pcap_path.exists():
        # Reuse a file that was decompressed earlier under the doubled suffix.
        legacy_pcap_path.replace(pcap_path)
        return pcap_path

    if gz_path.exists():
        tqdm.write(f"Unzipping existing {gz_path.name}...")
        if _gunzip_to(gz_path, pcap_path):
            return pcap_path
        # A cached archive that fails to decompress is most likely a partial
        # download. Discard it and fetch it again rather than failing forever.
        gz_path.unlink(missing_ok=True)

    own_session = session is None
    sess = build_session() if own_session else session
    # Download into a ".part" file so an interrupted transfer is never mistaken
    # for a complete cached archive on the next run.
    part_path = gz_path.with_name(gz_path.name + ".part")
    try:
        with sess.get(url, stream=True, timeout=300) as resp:
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            total = int(resp.headers.get("Content-Length", 0))
            with open(part_path, "wb") as fh, tqdm(
                total=total, unit="B", unit_scale=True, desc=f"DL {gz_path.name}", leave=False
            ) as bar:
                for chunk in resp.iter_content(1 << 20):
                    fh.write(chunk)
                    bar.update(len(chunk))
        part_path.replace(gz_path)
    except requests.exceptions.RequestException as e:
        tqdm.write(f"[ERROR] Download failed for {url}: {e}")
        part_path.unlink(missing_ok=True)
        return None
    finally:
        if own_session:
            sess.close()

    if _gunzip_to(gz_path, pcap_path):
        return pcap_path
    gz_path.unlink(missing_ok=True)
    return None


def get_stats_df_from_pcap_with_tshark(pcap_path: Path) -> pd.DataFrame | None:
    """Extract per-packet fields from ``pcap_path`` with tshark and return them as a DataFrame.

    Columns: time_epoch, src_ip, dst_ip, protocol_name, protocol_num,
    tcp_src_port, tcp_dst_port, udp_src_port, udp_dst_port, length.

    tshark's CSV output is streamed directly into ``pandas.read_csv``. It is not
    first captured as one Python string plus a StringIO copy, which for large
    traces would hold the whole text dump in memory twice.

    Returns:
        The DataFrame. It is empty (with the columns above) when tshark printed
        no rows and exited successfully. Returns None when tshark is not
        installed, or when it exits non-zero without producing any rows. If
        tshark exits non-zero after emitting rows (e.g. a capture that is cut
        short mid-packet), the rows read so far are kept and a warning is logged.
    """
    # (tshark field, DataFrame column) pairs, in output order.
    fields = [
        ("frame.time_epoch", "time_epoch"),        # timestamp (epoch seconds)
        ("ip.src", "src_ip"),                      # IPv4 source address
        ("ip.dst", "dst_ip"),                      # IPv4 destination address
        ("_ws.col.Protocol", "protocol_name"),     # tshark "Protocol" column text
        ("ip.proto", "protocol_num"),              # IP protocol number
        ("tcp.srcport", "tcp_src_port"),
        ("tcp.dstport", "tcp_dst_port"),
        ("udp.srcport", "udp_src_port"),
        ("udp.dstport", "udp_dst_port"),
        ("frame.len", "length"),                   # frame length in bytes
    ]
    command = [
        "tshark", "-n", "-r", str(pcap_path), "-T", "fields",
        "-E", "separator=,", "-E", "quote=d",
        # Print only the first occurrence of repeated fields (e.g. the outer IP
        # header when an ICMP error quotes an inner one). Otherwise tshark joins
        # them as "1,6", which makes pandas read the whole column as strings.
        "-E", "occurrence=f",
    ]
    for field, _ in fields:
        command += ["-e", field]
    col_names = [column for _, column in fields]

    # stderr goes to a temporary file rather than a pipe. An unread stderr pipe
    # can fill up and block tshark while stdout is still being consumed.
    with tempfile.TemporaryFile() as stderr_file:
        try:
            proc = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                encoding="utf-8",
                errors="replace",
            )
        except FileNotFoundError:
            tqdm.write("[FATAL ERROR] `tshark` command not found. Please install Wireshark/tshark and ensure it's in your system's PATH.")
            return None

        try:
            df = pd.read_csv(proc.stdout, header=None, names=col_names, low_memory=False)
        except pd.errors.EmptyDataError:
            # No output at all (e.g. a capture without packets, or tshark failed early).
            df = pd.DataFrame(columns=col_names)
        finally:
            # Closing our end makes tshark exit if we stopped reading early.
            proc.stdout.close()
            returncode = proc.wait()

        if returncode != 0:
            stderr_file.seek(0)
            stderr_text = stderr_file.read().decode("utf-8", errors="replace").strip()
            if df.empty:
                tqdm.write(f"[ERROR] tshark failed to process {pcap_path.name}: {stderr_text}")
                return None
            tqdm.write(
                f"[WARN] tshark exited with code {returncode} on {pcap_path.name}; "
                f"keeping {len(df)} rows read before the error: {stderr_text}"
            )

    return df


# IP protocol numbers that get their own bucket. Anything else, including
# packets whose ip.proto field is empty (presumably non-IPv4 such as IPv6), is "other".
_PROTOCOL_BY_NUMBER = {1: "icmp", 6: "tcp", 17: "udp"}
_PROTOCOLS = ("tcp", "udp", "icmp", "other")
_PORT_COLUMNS = ("tcp_src_port", "tcp_dst_port", "udp_src_port", "udp_dst_port")
# Output column -> raw columns whose distinct values per hour are counted.
# Ports are counted separately for TCP and UDP, and the two counts are added.
_UNIQUE_SOURCES = {
    "unique_src_ips": ("src_ip",),
    "unique_dst_ips": ("dst_ip",),
    "unique_src_ports": ("tcp_src_port", "udp_src_port"),
    "unique_dst_ports": ("tcp_dst_port", "udp_dst_port"),
}
_CSV_COLUMNS = [
    "timestamp_jst", "total_bytes", "total_packets",
    "tcp_bytes", "tcp_packets", "udp_bytes", "udp_packets",
    "icmp_bytes", "icmp_packets", "other_bytes", "other_packets",
    "unique_src_ips", "unique_dst_ips", "unique_src_ports", "unique_dst_ports",
]


def _first_field_value(col: pd.Series) -> pd.Series:
    """Keep only the first value of comma-joined multi-occurrence tshark fields.

    When tshark prints every occurrence of a repeated field it joins the values
    with commas (e.g. "1,6"), and pandas then reads the column as strings.
    Non-object (already numeric) columns are returned unchanged.
    """
    if col.dtype != object:
        return col
    has_comma = col.str.contains(",", regex=False, na=False)
    if not has_comma.any():
        return col
    return col.mask(has_comma, col[has_comma].str.split(",", n=1).str[0])


def _prepare_segment(raw: pd.DataFrame) -> pd.DataFrame:
    """Normalise one segment's raw tshark frame and add ``hour_utc`` and ``protocol`` columns."""
    seg = raw.assign(
        time_epoch=pd.to_numeric(raw["time_epoch"], errors="coerce"),
        length=pd.to_numeric(raw["length"], errors="coerce"),
    ).dropna(subset=["time_epoch", "length"])

    # Vectorised classification. A row-wise apply is very slow on millions of
    # packets, and it silently fails when protocol_num was read as strings.
    proto_num = pd.to_numeric(_first_field_value(seg["protocol_num"]), errors="coerce")
    protocol = pd.Series("other", index=seg.index)
    for number, name in _PROTOCOL_BY_NUMBER.items():
        protocol = protocol.mask(proto_num == number, name)

    return seg.assign(
        hour_utc=pd.to_datetime(seg["time_epoch"], unit="s", utc=True).dt.floor("h"),
        protocol=protocol,
        src_ip=_first_field_value(seg["src_ip"]),
        dst_ip=_first_field_value(seg["dst_ip"]),
        **{col: pd.to_numeric(_first_field_value(seg[col]), errors="coerce") for col in _PORT_COLUMNS},
    )


def _aggregate_day(volume_parts: list, distinct_parts: dict, day: date) -> pd.DataFrame:
    """Combine one day's per-segment partial results into the CSV table (one row per UTC hour)."""
    volume = pd.concat(volume_parts).groupby(level=["hour_utc", "protocol"]).sum()
    bytes_by_proto = volume["sum"].unstack("protocol", fill_value=0)
    packets_by_proto = volume["count"].unstack("protocol", fill_value=0)

    hourly = pd.DataFrame(index=bytes_by_proto.index)
    hourly["total_bytes"] = bytes_by_proto.sum(axis=1)
    hourly["total_packets"] = packets_by_proto.sum(axis=1)
    for proto in _PROTOCOLS:
        hourly[f"{proto}_bytes"] = bytes_by_proto[proto] if proto in bytes_by_proto else 0
        hourly[f"{proto}_packets"] = packets_by_proto[proto] if proto in packets_by_proto else 0

    for out_col, source_cols in _UNIQUE_SOURCES.items():
        counts = pd.Series(0, index=hourly.index)
        for col in source_cols:
            distinct = pd.concat(distinct_parts[col]).drop_duplicates()
            if not distinct.empty:
                counts = counts.add(distinct.groupby("hour_utc").size(), fill_value=0)
        hourly[out_col] = counts

    # Packets stamped just after midnight at the end of the 23:45 segment would
    # otherwise produce a partial row for the next day's 00:00 hour.
    day_start = pd.Timestamp(day.isoformat(), tz="UTC")
    in_day = (hourly.index >= day_start) & (hourly.index < day_start + pd.Timedelta(days=1))
    table = hourly[in_day].astype("int64").rename_axis("hour_utc").reset_index()
    table["timestamp_jst"] = table["hour_utc"].dt.tz_convert(JST).dt.strftime("%Y-%m-%d %H:%M:%S")
    return table[_CSV_COLUMNS]


def process_date_range(start: date, end: date, csv_dir: Path, pcap_dir: Path, cleanup_pcap: bool):
    """Aggregate MAWI traffic for each UTC day in ``[start, end]`` into hourly CSV rows.

    For each day, every 15-minute segment is fetched via ``fetch_and_unzip_segment``
    and read with tshark. It is then reduced right away to per-(hour, protocol)
    byte/packet sums plus the distinct (hour, IP) and (hour, port) pairs. Only
    these partial results are kept, so a whole day of per-packet rows is never
    held in memory at once. They are combined at the end of the day and written
    to ``csv_dir/<YYYY-MM-DD>.csv``, overwriting any existing file.

    NOTE: only IPv4 fields (ip.src/ip.dst/ip.proto) are extracted. Packets
    without them count toward total and "other" traffic but not toward the
    unique-IP columns.

    Args:
        start, end: inclusive range of UTC dates.
        csv_dir: output directory for the per-day CSVs (created if missing).
        pcap_dir: download/cache directory passed to ``fetch_and_unzip_segment``.
        cleanup_pcap: delete each segment's .pcap and .pcap.gz once tshark has read it.
    """
    csv_dir.mkdir(parents=True, exist_ok=True)
    now_utc = datetime.now(UTC)

    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    for current_d in tqdm(days, desc="Daily processing"):
        day_prefix = f"[{current_d:%Y-%m-%d}]"

        volume_parts: list[pd.DataFrame] = []
        distinct_parts: dict[str, list[pd.DataFrame]] = defaultdict(list)
        packet_count = 0

        total_segments = 24 * 4  # 15-minute segments per day
        segment_iter = tqdm(range(total_segments), desc=f"Segments for {current_d:%Y-%m-%d}", leave=False)
        for i in segment_iter:
            h, m = divmod(i * 15, 60)
            target_dt_utc = datetime.combine(current_d, time(h, m), tzinfo=UTC)
            if target_dt_utc >= now_utc:
                continue  # segment lies in the future

            pcap_path = fetch_and_unzip_segment(target_dt_utc, pcap_dir)
            if pcap_path is None:
                continue

            raw_df = get_stats_df_from_pcap_with_tshark(pcap_path)

            if cleanup_pcap:
                pcap_path.unlink(missing_ok=True)
                # Derive the archive name from the segment stamp instead of
                # ``pcap_path.with_suffix(".pcap.gz")``. That call swaps only the
                # last suffix and misses the download when the pcap is named
                # "<stamp>.pcap.pcap".
                (pcap_path.parent / f"{target_dt_utc:%Y%m%d%H%M}.pcap.gz").unlink(missing_ok=True)

            if raw_df is None or raw_df.empty:
                continue
            packet_count += len(raw_df)
            segment = _prepare_segment(raw_df)
            del raw_df  # release the per-packet frame before the next segment

            volume_parts.append(
                segment.groupby(["hour_utc", "protocol"])["length"].agg(["sum", "count"])
            )
            for source_cols in _UNIQUE_SOURCES.values():
                for col in source_cols:
                    distinct_parts[col].append(segment[["hour_utc", col]].dropna().drop_duplicates())
            del segment

        if not any(len(part) for part in volume_parts):
            tqdm.write(f"{day_prefix} No data found for this day. Skipping CSV write.")
            continue

        tqdm.write(f"{day_prefix} Aggregating {packet_count} total packets...")
        daily_table = _aggregate_day(volume_parts, distinct_parts, current_d)

        csv_path = csv_dir / f"{current_d:%Y-%m-%d}.csv"
        daily_table.to_csv(csv_path, index=False)
        tqdm.write(f"{day_prefix} Write complete.")


def _finish_panel(ax, ylabel: str, title: str) -> None:
    """Apply the y-label, legend, title and grid shared by every panel."""
    ax.set_ylabel(ylabel)
    ax.legend(loc="upper left")
    ax.set_title(title)
    ax.grid(True)


def _stacked_protocol_panel(ax, df, family, scale, protocols, colors, ylabel, title) -> None:
    """Draw the ``<protocol>_<family>`` columns of ``df``, divided by ``scale``, as stacked areas."""
    scaled = df[[f"{p}_{family}" for p in protocols]].div(scale)
    ax.stackplot(
        df.index,
        scaled.T,
        labels=[p.upper() for p in protocols],
        colors=[colors[p] for p in protocols],
    )
    _finish_panel(ax, ylabel, title)


def plot_stats(csv_dir: Path, start_date: date, end_date: date):
    """Load every CSV in ``csv_dir`` and draw and save a 4-panel hourly traffic figure.

    The panels show stacked bytes (GB) and packets (millions) per protocol,
    followed by unique source/destination IP counts and port counts. Rows with
    a repeated timestamp keep the one from the last file in sorted order.

    All CSVs in the directory are plotted. ``start_date`` and ``end_date`` only
    name the output file, ``csv_dir.parent / mawi_stats_tshark_<start>-<end>.png``.
    Returns None. Load errors and empty data are reported with ``print``.
    """
    all_files = sorted(csv_dir.glob("*.csv"))
    if not all_files:
        print("[INFO] No CSV files found; nothing to plot.")
        return

    try:
        df = pd.concat((pd.read_csv(f, parse_dates=["timestamp_jst"]) for f in all_files), ignore_index=True)
        # CSV timestamps are naive JST wall-clock strings; attach the zone.
        df["timestamp"] = df["timestamp_jst"].dt.tz_localize(JST)
        df = df.dropna(subset=["timestamp"]).drop_duplicates("timestamp", keep="last").set_index("timestamp").sort_index()
    except Exception as e:
        print(f"[ERROR] Failed to load or process CSVs: {e}")
        return

    if df.empty:
        print("[INFO] No data available to plot.")
        return

    protocols = ["tcp", "udp", "icmp", "other"]
    colors = {"tcp": "#1f77b4", "udp": "#2ca02c", "icmp": "#d62728", "other": "#9467bd"}
    for col in (f"{p}_{family}" for family in ("bytes", "packets") for p in protocols):
        if col not in df.columns:
            df[col] = 0

    # Enter the style context before the figure is created. This styles these
    # axes, and the previous rcParams are restored afterwards instead of the
    # style being changed globally for later plots in the kernel.
    with plt.style.context("seaborn-v0_8-whitegrid"):
        fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(16, 24), sharex=True)

        _stacked_protocol_panel(ax1, df, "bytes", 1e9, protocols, colors,
                                "Traffic Volume (GB)", "Hourly Traffic Volume by Protocol")
        _stacked_protocol_panel(ax2, df, "packets", 1e6, protocols, colors,
                                "Packets (Millions)", "Hourly Packet Count by Protocol")

        ax3.plot(df.index, df["unique_src_ips"], label="Unique Source IPs", color="cyan")
        ax3.plot(df.index, df["unique_dst_ips"], label="Unique Destination IPs", color="magenta", linestyle='--')
        _finish_panel(ax3, "Unique IP Count", "Hourly Unique IP Address Count")

        ax4.plot(df.index, df["unique_src_ports"], label="Unique Source Ports", color="orange")
        ax4.plot(df.index, df["unique_dst_ports"], label="Unique Destination Ports", color="purple", linestyle='--')
        _finish_panel(ax4, "Unique Port Count", "Hourly Unique Port Number Count")

        ax4.set_xlabel("Timestamp (JST)")
        ax4.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d\n%H:%M", tz=JST))
        fig.autofmt_xdate(rotation=0, ha="center")

        title_start = df.index.min().date()
        title_end = df.index.max().date()
        fig.suptitle(f"MAWI Hourly Traffic Analysis ({title_start:%Y-%m-%d} to {title_end:%Y-%m-%d})", fontsize=16)
        fig.tight_layout(rect=[0, 0, 1, 0.96])

        plot_filename = csv_dir.parent / f"mawi_stats_tshark_{start_date:%Y%m%d}-{end_date:%Y%m%d}.png"
        fig.savefig(plot_filename)
        print(f"\nPlot saved to {plot_filename.resolve()}")
        plt.show()



# ── Run configuration ──────────────────────────────────────────
# Inclusive range of UTC dates to process (one CSV per day).
START_DATE = date(2025, 4, 9)
END_DATE   = date(2025, 4, 9)

# Download/cache directory for segment files, and output directory for per-day CSVs.
PCAP_DIR = Path("./data/MAWI_ditl/pcap")
CSV_DIR  = Path("./data/MAWI_ditl/csv")

# Passed to process_date_range as cleanup_pcap (remove segment files after processing).
CLEANUP_FILES = True


# ── Run: aggregate each day into a CSV, then plot every CSV ────
if START_DATE <= END_DATE:
    print("Configuration:")
    print(f"  - Date Range: {START_DATE} to {END_DATE}")
    print(f"  - PCAP Dir:   {PCAP_DIR.resolve()}")
    print(f"  - CSV Dir:    {CSV_DIR.resolve()}")
    print(f"  - Cleanup Files: {CLEANUP_FILES}\n")

    process_date_range(
        start=START_DATE,
        end=END_DATE,
        csv_dir=CSV_DIR,
        pcap_dir=PCAP_DIR,
        cleanup_pcap=CLEANUP_FILES,
    )

    print("\nPlotting results...")
    plot_stats(CSV_DIR, START_DATE, END_DATE)
else:
    print("[ERROR] Start date cannot be after end date.")



Configuration:
  - Date Range: 2025-04-09 to 2025-04-09
  - PCAP Dir:   /Users/saitosatoru/workspace/GDELT_EDA/data/MAWI_ditl/pcap
  - CSV Dir:    /Users/saitosatoru/workspace/GDELT_EDA/data/MAWI_ditl/csv
  - Cleanup Files: True



Daily processing:   0%|          | 0/1 [00:00<?, ?it/s]
Daily processing:   0%|          | 0/1 [00:00<?, ?it/s]        

Unzipping existing 202504090000.pcap.gz...


In [33]:
!pip install ipywidgets

Collecting ipywidgets
  Downloading ipywidgets-8.1.7-py3-none-any.whl.metadata (2.4 kB)
Collecting widgetsnbextension~=4.0.14 (from ipywidgets)
  Downloading widgetsnbextension-4.0.14-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyterlab_widgets~=3.0.15 (from ipywidgets)
  Downloading jupyterlab_widgets-3.0.15-py3-none-any.whl.metadata (20 kB)
Downloading ipywidgets-8.1.7-py3-none-any.whl (139 kB)
Downloading jupyterlab_widgets-3.0.15-py3-none-any.whl (216 kB)
Downloading widgetsnbextension-4.0.14-py3-none-any.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m30.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: widgetsnbextension, jupyterlab_widgets, ipywidgets
Successfully installed ipywidgets-8.1.7 jupyterlab_widgets-3.0.15 widgetsnbextension-4.0.14
