In [1]:
import json
from pathlib import Path
import dateutil.parser
from collections import defaultdict
import pwd
# !PYTHONUSERBASE="$(pwd)/.ipython" pip install --user git+https://github.com/tqdm/tqdm.git@master#egg=tqdm
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
import pandas as pd

def load_entry(path):
    """Parse one module-usage JSON log file into a flat record.

    Parameters
    ----------
    path : str or os.PathLike
        Path to a single JSON log file.

    Returns
    -------
    dict or None
        A dict with keys ``timestamp`` (parsed with ``dateutil``),
        ``environment`` (the directory two levels above the logged
        ``sys.executable``), ``hostname``, ``modules`` (the set returned by
        ``extract_modules``) and ``user_id`` (the numeric uid of the file's
        owner). Returns ``None`` when the file cannot be stat-ed, opened, or
        decoded as JSON, so callers can skip unreadable logs.
    """
    path = Path(path)
    try:
        # The uid of the file's owner is used as the user id.
        user_id = path.stat().st_uid
        # JSON text is UTF-8; don't depend on the worker's locale encoding.
        with path.open(encoding="utf-8") as fh:
            raw = json.load(fh)
    except (OSError, ValueError):
        # Best-effort skip of vanished/unreadable files and malformed content
        # (json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses).
        # Unlike a bare `except:`, this lets KeyboardInterrupt propagate.
        return None
    result = {}
    result["timestamp"] = dateutil.parser.parse(raw["timestamp"])
    result["environment"] = Path(raw["sys.executable"]).parent.parent.as_posix()
    result["hostname"] = raw["hostname"]
    result["modules"] = extract_modules(raw["modules"], raw.get("versions", {}))
    result["user_id"] = user_id
    return result

def extract_modules(module_paths, module_versions):
    """Collect the distinct top-level modules recorded in one log.

    Args:
        module_paths: mapping of imported module name -> filesystem path.
            Entries with a falsy path are kept (presumably builtins or
            namespace packages -- TODO confirm against the logger).
        module_versions: mapping of module name -> version; a missing name
            yields the string ``"None"``.

    Returns:
        A set of ``(name, str(version))`` tuples for modules whose name has no
        dot, whose path is empty or contains ``site-packages``, and which are
        not ``sitecustomize``.
    """
    return {
        (mod_name, str(module_versions.get(mod_name)))
        for mod_name, location in module_paths.items()
        if "." not in mod_name  # drop submodules such as "pkg.sub"
        and (not location or "site-packages" in location)  # keep pathless or site-packages modules
        and mod_name != "sitecustomize"
    }

In [2]:
def timestamp_collision(ts, ts_list):
    """Return True if ``ts`` lies strictly within 10 seconds of any entry in ``ts_list``."""
    for other in ts_list:
        gap_seconds = abs((ts - other).total_seconds())
        if gap_seconds < 10.0:
            return True
    return False

def list_to_dataframe(logs_list):
    """Explode parsed log records into one DataFrame row per (record, module).

    Parameters
    ----------
    logs_list : iterable of dict or None
        Records as produced by ``load_entry``. Falsy entries (e.g. ``None`` for
        unreadable files) are skipped.

    Returns
    -------
    pandas.DataFrame
        Columns ``module`` and ``version``, followed by the record's remaining
        fields (everything except ``modules``). A record is dropped when
        ``timestamp_collision`` reports it collides with an earlier kept
        record of the same ``user_id``; the first record in iteration order wins.

    The input records are left unmodified, so calling this twice on the same
    list (e.g. re-running the notebook cell) gives the same result.
    """
    rows = []
    kept_times = defaultdict(list)
    for log in logs_list:
        if not log:
            continue
        user_id, ts = log["user_id"], log["timestamp"]
        if timestamp_collision(ts, kept_times[user_id]):
            continue
        kept_times[user_id].append(ts)
        # Copy the metadata instead of popping "modules": popping mutated the
        # caller's records and made a second call raise KeyError.
        meta = {key: value for key, value in log.items() if key != "modules"}
        for module, version in log["modules"]:
            rows.append({"module": module, "version": version, **meta})
    return pd.DataFrame(rows)

def crawl_logs(top_dir, *patterns, max_workers=8, chunksize=100):
    """Load every log file under ``top_dir`` matching ``patterns`` into a DataFrame.

    Parameters
    ----------
    top_dir : str or os.PathLike
        Root directory of the log tree.
    *patterns : str
        Glob patterns relative to ``top_dir`` (e.g. ``"2020/12/04/*"``).
    max_workers : int, optional
        Number of worker processes used to parse files (default 8).
    chunksize : int, optional
        Number of files sent to a worker per task (default 100).

    Returns
    -------
    pandas.DataFrame
        The result of ``list_to_dataframe`` on the parsed records.
    """
    top_dir = Path(top_dir)
    # Deduplicate matches from overlapping patterns, and sort so the file order
    # -- and therefore which record list_to_dataframe keeps when a user's
    # timestamps collide -- does not depend on directory-listing order.
    files_list = sorted({match for pattern in patterns for match in top_dir.glob(pattern)})
    n_files = len(files_list)
    print(f"Processing {n_files} files...")
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns a lazy iterator with no len(); passing total lets
        # tqdm render a real progress bar and ETA instead of a bare counter.
        results = list(
            tqdm(pool.map(load_entry, files_list, chunksize=chunksize), total=n_files)
        )
    return list_to_dataframe(results)

In [3]:
# Root of the module-usage log tree; this run crawls a single day's directory.
top = '/lus/theta-fs0/logs/pythonlogging/module_usage'
day_pattern = "2020/12/04/*"
df = crawl_logs(top, day_pattern)

Processing 17664 files...


17664it [00:02, 6023.57it/s]


In [4]:
df  # pandas truncates the rendered view to head/tail rows ("..." in the middle)

Unnamed: 0,module,version,timestamp,environment,hostname,user_id
0,pvectorc,,2020-12-04 03:00:33.551531,/soft/datascience/conda/miniconda3/latest,nid02568,33720
1,mpl_toolkits,,2020-12-04 03:00:33.551531,/soft/datascience/conda/miniconda3/latest,nid02568,33720
2,google,,2020-12-04 03:00:33.551531,/soft/datascience/conda/miniconda3/latest,nid02568,33720
3,zipp,,2020-12-04 03:00:33.551531,/soft/datascience/conda/miniconda3/latest,nid02568,33720
4,pyrsistent,,2020-12-04 03:00:33.551531,/soft/datascience/conda/miniconda3/latest,nid02568,33720
...,...,...,...,...,...,...
6642,widget_tweaks,,2020-12-04 20:41:53.343524,/lus/theta-fs0/software/thetagpu/balsam/env2,thetagpusn1,33180
6643,django,,2020-12-04 20:41:53.343524,/lus/theta-fs0/software/thetagpu/balsam/env2,thetagpusn1,33180
6644,jinja2,,2020-12-04 20:41:53.343524,/lus/theta-fs0/software/thetagpu/balsam/env2,thetagpusn1,33180
6645,sqlparse,,2020-12-04 20:41:53.343524,/lus/theta-fs0/software/thetagpu/balsam/env2,thetagpusn1,33180
