多数のDaskワーカとコネクションを開く必要があるため，ファイルディスクリプタ数の上限を上げておく．

In [None]:
!ulimit -n 4096

Daskクラスタを起動する．設定の詳しい内容については https://keichi.dev/post/naist-cluster-dask/ を参照のこと．
新しいクラスタでは，設定を適宜修正する必要があると思われる．

In [None]:
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster

cluster = SGECluster(cores=24,
                     processes=24,
                     memory="250GB",
                     queue="grid_short.q",
                     interface="ib0",
                     scheduler_options={"interface": "bond1"},
                     local_directory="/var/tmp",
                     job_extra=["-pe smp 24"],
                     walltime="04:00:00")

cluster.scale(jobs=20)

client = Client(cluster)

2011年から2016年までのGH Archiveのイベントデータを読み込み，パースした後，Dask Bagとして保持する．
パースできないJSONがごく一部混じっているたため，パースが失敗した際はスキップする．

In [None]:
import dask.bag as db
import json

lines = db.concat([db.read_text(f"/work/ichikawa/gharchive/{year}/*.json.gz") for year in range(2011, 2016)])

def parse_json(s):
    try:
        return json.loads(s)
    except:
        return {"type": "ParseError"}

events = lines.map(parse_json)

1. SEART (https://seart-ghs.si.usi.ch/) からダンプしたPythonリポジトリの一覧をDataFrameとして開き，各種条件によってリポジトリを絞り込む．
2. 先に読み込んだGH Archiveのイベントの中から対象リポジトリに関するものだけを抽出する．
3. 抽出したイベントを再度JSON形式にシリアライズし，ファイルとして保存する．

In [None]:
import dask.dataframe as dd

repos = dd.read_csv("python_repos.csv", assume_missing=True)
query = "not isFork and commits >= 100 and contributors >= 2 and releases >= 1 and lastCommit >= '2021-05-01'"
target_repos = repos.query(query).sort_values(by=["stargazers"], ascending=False).compute()

def is_target_repo(event):
    if "repo" not in event:
        return False
    if "name" not in event["repo"]:
        return False
    
    return event["repo"]["name"] in target_repos

filtered_events = events.filter(is_target_repo)
filtered_events.map(json.dumps).to_textfiles("events.*.json.gz")

In [None]:
%time events.map(lambda e: e["type"]).frequencies(sort=True).compute()

issueに関するイベントとpull requestに関するイベントを抽出する．

In [None]:
issues_events = events.filter(lambda r: r["type"] == "IssuesEvent")
pull_request_events = events.filter(lambda r: r["type"] == "PullRequestEvent")

各イベントから重要な属性を取り出し，DataFrameに変換した後，Apache Parquet形式で保存する．

In [None]:
issues_events.map(lambda e: {
    "action": e["payload"]["action"],
    "actor": e["actor"]["login"],
    "repo": e["repo"]["name"],
    "number": e["payload"]["issue"]["number"],
    "title": e["payload"]["issue"]["title"],
    "body": e["payload"]["issue"]["body"],
    "created_at": e["payload"]["issue"]["created_at"],
    "updated_at": e["payload"]["issue"]["updated_at"],
    "closed_at": e["payload"]["issue"]["closed_at"],
}).to_dataframe().to_parquet("./issues", overwrite=True)

In [None]:
pull_request_events.map(lambda e: {
    "action": e["payload"]["action"],
    "actor": e["actor"]["login"],
    "repo": e["repo"]["name"],
    "number": e["payload"]["pull_request"]["number"],
    "title": e["payload"]["pull_request"]["title"],
    "body": e["payload"]["pull_request"]["body"],
    "created_at": e["payload"]["pull_request"]["created_at"],
    "updated_at": e["payload"]["pull_request"]["updated_at"],
    "closed_at": e["payload"]["pull_request"]["closed_at"],
    "merged_at": e["payload"]["pull_request"]["merged_at"],
}).to_dataframe().to_parquet("./pull_requests", overwrite=True)

Parquetファイルを読み込んでみる．

In [None]:
import dask.dataframe as dd

df = dd.read_parquet("pull_requests").persist()

df.head(10)

In [None]:
df["repo"].value_counts().head(20)