In [None]:
from __future__ import print_function
from shutil import copyfile
import os, subprocess, pipes, re, json, glob
from collections import Counter
from tqdm import tqdm
from fireworks.fw_config import SORT_FWS
from fireworks.core.launchpad import LaunchPad
from fireworks.core.fworker import FWorker
from boltons.tbutils import ParsedException
# Garden directory on the remote host; used by try_garden() as an alternative
# parent directory for launch directories that were not found.
REMOTE_GARDEN = '/global/projecta/projectdirs/matgen/garden/'
# Local directory that receives the launchdirs_*.txt lists and the rsync'ed log files.
LOCAL_GARDEN = '/Users/patrick/Downloads/mp_prod_garden/' # ADJUST ME
# Text file with one launch directory per line; launchdirs_exist() copies it to the remote host.
LAUNCHDIRS = os.path.join(LOCAL_GARDEN, 'launchdirs.txt')
# don't forget to set up ssh tunnel

In [None]:
# LaunchPad and FWorker are both configured from YAML files in the working directory.
# `lpad` is the database handle used by every query in this notebook.
lpad = LaunchPad.from_file('my_launchpad.yaml')
fworker = FWorker.from_file('my_fworker.yaml')

In [None]:
def print_categories(cats):
    """Print the ten largest categories and the number of members they hold.

    `cats` maps a category key to the list of members in that category. One
    line is printed per category (size, members, key; tab-separated), ordered
    by decreasing size, followed by the summed size of the printed categories.
    """
    sizes = Counter({key: len(members) for key, members in cats.items()})
    ranked = sizes.most_common(10)
    for key, size in ranked:
        print(size, '\t', cats[key], '\t', key)
    # only the categories printed above contribute to the total
    print(sum(size for _, size in ranked))

In [None]:
# State names that the firework and workflow queries below loop over.
states = ["RUNNING", "WAITING", "FIZZLED", "READY", "COMPLETED", "RESERVED", "ARCHIVED", "DEFUSED", "PAUSED"]

### fizzled workflows and corresponding list of fireworks

#### fw_ids for user-submitted workflows

In [None]:
# Select the "Add to SNL database" fireworks of one submission batch.
# The commented-out variants select other batches by their SNL remark.
user_query = {"spec.task_type": "Add to SNL database", "spec.snl.about.remarks": "MP user submission"}
# user_query = {"spec.task_type": "Add to SNL database", "spec.snl.about.remarks": "new ICSD batch"}
# user_query = {"spec.task_type": "Add to SNL database", "spec.snl.about.remarks": "Pauling file"}
# distinct fw_ids of all matching fireworks; used below to select workflows via their `nodes`
fw_ids_user = lpad.fireworks.find(user_query, {'fw_id': 1, '_id': 0}).distinct('fw_id')
print(len(fw_ids_user), 'user-submitted workflows')

#### prioritized user-submitted "Add to SNL" tasks to get duplicate checking done

In [None]:
# "Add to SNL database" fireworks whose remarks contain "MP user submission"
# and none of the listed bulk-batch remarks.
priority_user_query = {
    "spec.task_type": "Add to SNL database",
    "spec.snl.about.remarks": {
        "$in": ["MP user submission"],
        "$nin": ["new ICSD batch", "Pauling file", "Heusler ABC2 phases"],
    },
}
priority_user_fws = {}
for state in states:
    # finished fireworks are not of interest here
    if state in ('COMPLETED', 'ARCHIVED'):
        continue
    state_query = {'state': state}
    state_query.update(priority_user_query)
    cursor = lpad.fireworks.find(state_query)
    priority_user_fws[state] = cursor
    nr_fws = cursor.count()
    if nr_fws <= 0:
        continue
    print('{} {} user-submitted Add-to-SNL tasks'.format(nr_fws, state))
print('DONE')

#### percentage of workflows in each state

In [None]:
# Known submission groups:
#   118151 = {Ti,Zr,Hf}-Zn-N piezoelectricity study -> ALL COMPLETED 2017-01-24
#   114781 = Kitchaev Workflows
#   115780 = Heusler ABC2 phases
submission_group_id = 114781
query = {'nodes': {'$in': fw_ids_user}}
if user_query["spec.snl.about.remarks"] == "MP user submission":
    print('FYI: only looking at workflows with submission_group_id', submission_group_id)
    query['metadata.submission_group_id'] = submission_group_id
wflows = {}
# kept as float so the fraction below is a true division on Python 2 as well
total_wflows = float(lpad.workflows.find(query).count())
wflows_projection = {'fw_states': 1, 'parent_links': 1, 'links': 1, 'nodes': 1, '_id': 0, 'state': 1}
for state in states:
    state_query = dict(state=state)
    state_query.update(query)
    matching_wflows = list(lpad.workflows.find(state_query, wflows_projection))
    wflows[state] = matching_wflows
    nr_wflows = len(matching_wflows)
    if nr_wflows > 0:
        wflows_fraction = nr_wflows / total_wflows
        print('{} {} workflows ({:.1f}%)'.format(nr_wflows, state, wflows_fraction*100.))
print(int(total_wflows), 'workflows in total')

#### list of first fizzled fw_id in each workflow

In [None]:
def find_root_node(wflow):
    """Return the first node of `wflow` that has no entry in its parent links.

    `wflow['nodes']` is scanned in order (its first element is not necessarily
    the root); `wflow['parent_links']` is keyed by the node id as a string.
    Returns None when every node has a parent entry.
    """
    with_parents = wflow['parent_links']
    orphans = (node for node in wflow['nodes'] if str(node) not in with_parents)
    return next(orphans, None)

In [None]:
state = 'FIZZLED' # workflow state
rerun_fws = []
fw_ids_state = {}
# firework states (as cached on the workflow document) that end the descent
check_states = [state] if state != 'RUNNING' else ['READY', 'RESERVED']
for wflow in wflows[state]:
    root_fw_id = find_root_node(wflow)
    # Descend the links depth-first until a firework in one of `check_states`
    # is found. The last non-waiting child is explored first; if that branch
    # ends in a leaf without a hit, the search backtracks to the remaining
    # non-waiting siblings instead of looping forever on the leaf.
    pending = [root_fw_id]
    visited = set()
    while pending:
        fw_id = pending.pop()
        if fw_id in visited:
            continue
        visited.add(fw_id)
        current_state = wflow['fw_states'][str(fw_id)]
        if current_state == 'RUNNING':
            print(fw_id, 'is RUNNING -> probably need to do `lpad rerun_fws -i {}`'.format(fw_id))
            break
        if current_state in check_states:
            # one lookup provides both the task type and the firework's own state
            fw_doc = lpad.fireworks.find_one(
                {'fw_id': fw_id}, {'spec.task_type': 1, 'state': 1, '_id': 0}
            )
            task_type = fw_doc['spec']['task_type']
            fw_ids_state.setdefault(task_type, []).append(int(fw_id))
            # the state stored on the firework can differ from the one cached on the workflow
            alt_state = fw_doc['state']
            if alt_state == 'RESERVED':
                rerun_fws.append(str(fw_id))
            break
        # only follow children that are not waiting
        children = wflow['links'][str(fw_id)]
        pending.extend(
            child for child in children
            if wflow['fw_states'][str(child)] != 'WAITING'
        )
    else:
        # every reachable non-waiting firework was visited without a hit
        print('no', '/'.join(check_states), 'firework found below root', root_fw_id)
if rerun_fws:
    print('lpad rerun_fws -i', ' '.join(rerun_fws))
for k,v in fw_ids_state.items():
    #if 'GGA' not in k: continue
    print(k, v)
    for fw_id in v:
        launches = lpad.launches.find({'fw_id': fw_id}, {'launch_dir': 1})
        for launch in launches:
            # launch directories containing 'oasis' are not listed
            if 'oasis' not in launch['launch_dir']:
                print('\t', fw_id, launch['launch_dir'])


#### list of incomplete fireworks in RUNNING workflows for fworker query

In [None]:
# group the fw_ids of all not-yet-completed fireworks in RUNNING workflows by their state
fw_ids_incomplete = {}
for wflow in wflows['RUNNING']:
    for fw_id, fw_state in wflow['fw_states'].items():
        if fw_state == 'COMPLETED':
            continue
        # fw_states is keyed by the fw_id as a string
        fw_ids_incomplete.setdefault(fw_state, []).append(int(fw_id))
print(fw_ids_incomplete)

In [None]:
# collect the fw_ids of every workflow that contains one of the listed fireworks
seed_fw_ids = [1370872,1566138,1566120,1566104,1566099,1567504,1567491,1563287,1652717]
nodes = []
for wflow_doc in lpad.workflows.find({'nodes': {'$in': seed_fw_ids}}, {'_id': 0, 'nodes': 1}):
    nodes.extend(wflow_doc['nodes'])
print(nodes)

#### list of first fireworks for fizzled workflows

In [None]:
# fw_ids_state maps task_type -> list of fw_ids, so the lists have to be
# flattened before they can be used as the value list of `$in`.
first_fw_ids = [fw_id for fw_ids in fw_ids_state.values() for fw_id in fw_ids]
query = {'fw_id': {'$in': first_fw_ids}}
projection = {'fw_id': 1, 'launches': 1, '_id': 0}
fws = list(lpad.fireworks.find(query, projection))
# sanity check: exactly one firework is expected per workflow in `state`
assert(len(fws) == len(wflows[state]))

### launch directories

In [None]:
# fws_info: fw_id -> {'launch_dir': ..., 'duplicates': [fw_ids sharing that launch_dir]}
fws_info = {}
# launch_dir -> fw_id of the first firework seen with it; avoids rescanning
# fws_info for every firework
launch_dir_owner = {}
for fw in tqdm(fws):
    # only the most recent launch of each firework is considered
    launch_id = fw['launches'][-1]
    launch = lpad.launches.find_one({'launch_id': launch_id}, {'launch_dir': 1, '_id': 0})
    launch_dir = launch['launch_dir']
    if launch_dir in launch_dir_owner:
        # record this firework as a duplicate of the one that owns the launch_dir
        owner_fw_id = launch_dir_owner[launch_dir]
        fws_info[owner_fw_id].setdefault('duplicates', []).append(fw['fw_id'])
        continue
    launch_dir_owner[launch_dir] = fw['fw_id']
    fws_info[fw['fw_id']] = {'launch_dir': launch_dir}

In [None]:
# total number of fireworks whose launch_dir is already owned by another firework
# (.values() works on both Python 2 and 3, unlike .iteritems())
nr_duplicates = sum(len(fw_info.get('duplicates', [])) for fw_info in fws_info.values())
print(nr_duplicates, '/', len(fws), 'workflows have duplicate launch_dirs =>',
      len(fws)-nr_duplicates, 'unique launch_dirs')

### write text file with list of remote files for rsync

#### function to check existence of a list of files/directories; also generate list of existing output files in existing launch dirs

In [None]:
def launchdirs_exist(hostname):
    """Check on `hostname` which directories listed in LAUNCHDIRS exist.

    The list is copied to the remote home directory, all local *.txt files in
    LOCAL_GARDEN are removed, and a remote shell loop sorts the entries into
    launchdirs_exist.txt / launchdirs_not_exist.txt and collects the *.out and
    *.error files of existing directories in launchdirs_exist_outfiles.txt.
    The result files are copied back to LOCAL_GARDEN, removed remotely, and
    the number of distinct lines per *exist.txt file is printed.

    `hostname` is a host alias from ~/.ssh/config.
    """
    subprocess.call(['scp', LAUNCHDIRS, hostname+':~/'])
    # start from a clean slate locally (this also removes LAUNCHDIRS itself)
    stale_lists = glob.glob(os.path.join(LOCAL_GARDEN, '*.txt'))
    for stale in stale_lists:
        os.remove(stale)
    remote_check = (
        'for i in `cat ~/launchdirs.txt`; do '
        'if [ -d "$i" ]; then '
        'echo $i >> ~/launchdirs_exist.txt; '
        'compgen -G "$i/*.out" >> ~/launchdirs_exist_outfiles.txt; '
        'compgen -G "$i/*.error" >> ~/launchdirs_exist_outfiles.txt; '
        'else echo $i >> ~/launchdirs_not_exist.txt; fi; '
        'done'
    )
    subprocess.call(['ssh', '-q', hostname, remote_check])
    subprocess.call(['scp', hostname+':~/launchdirs_*.txt', LOCAL_GARDEN])
    subprocess.call(['ssh', '-q', hostname, 'rm ~/launchdirs*.txt'])
    total_counts = 0
    result_pattern = os.path.join(LOCAL_GARDEN, 'launchdirs_*exist.txt')
    for fstr in glob.glob(result_pattern):
        with open(fstr, 'r') as f:
            # count distinct directories only
            nr_unique = len(set(line.strip() for line in f))
        total_counts += nr_unique
        print(nr_unique, '\t', os.path.basename(fstr))
    print('=', total_counts)

#### start by using launchdirs on NERSC

In [None]:
# write one launch directory per line, then check their existence on the remote host
# (.values() works on both Python 2 and 3, unlike .iteritems())
with open(LAUNCHDIRS, 'w') as f:
    for fw_info in fws_info.values():
        print(fw_info['launch_dir'].strip(), file=f)
launchdirs_exist('mendel-matcomp')

#### try NERSC & XSEDE gardens as alternative path for non-existing launchdirs

for XSEDE: rsync to Mendel from

- /oasis/projects/nsf/csd436/phuck/garden
- /oasis/scratch/comet/phuck/temp_project

`rsync -avz block_* mendel:/global/projecta/projectdirs/matgen/garden/`  
[could also do `try_garden` if direct ssh access to comet is enabled]

In [None]:
def get_dest_blocks(s):
    """Split a path into [parent directory, remainder starting at block_/launcher_].

    The stripped path is cut at '/block_' when that separator occurs exactly
    once; otherwise it is cut at the first '/launcher_'. A path containing
    neither separator raises IndexError.
    """
    path = s.strip()
    prefix = 'block_'
    pieces = path.split('/' + prefix)
    if len(pieces) != 2:
        prefix = 'launcher_'
        pieces = path.split('/' + prefix)
    return [pieces[0], prefix + pieces[1]]

In [None]:
def try_garden(garden, hostname):
    """Retry launch directories that were not found, this time inside `garden`.

    LAUNCHDIRS is rebuilt from the directories already known to exist (when
    launchdirs_exist.txt is present) plus, for every entry of
    launchdirs_not_exist.txt, its block_/launcher_ part joined onto `garden`.
    The new list is then checked again via launchdirs_exist(hostname).

    `hostname` is a host alias from ~/.ssh/config.
    """
    found_list = os.path.join(LOCAL_GARDEN, 'launchdirs_exist.txt')
    missing_list = os.path.join(LOCAL_GARDEN, 'launchdirs_not_exist.txt')
    if os.path.exists(found_list):
        copyfile(found_list, LAUNCHDIRS)
    with open(LAUNCHDIRS, 'a') as f1, open(missing_list, 'r') as f2:
        for line in f2.readlines():
            block = get_dest_blocks(line)[1]
            print(os.path.join(garden, block), file=f1)
    launchdirs_exist(hostname)
    #!head -1 {LOCAL_GARDEN}launchdirs_not_exist.txt

In [None]:
# look for the launch directories that were not found inside REMOTE_GARDEN on the same host
try_garden(REMOTE_GARDEN, 'mendel-matcomp')

#### rsync log output to local garden

In [None]:
# group the remote output files by parent directory: dest -> [block/launcher-relative file paths]
dest_blocks = {}
with open(os.path.join(LOCAL_GARDEN, 'launchdirs_exist_outfiles.txt'), 'r') as f:
    for line in f:
        dest, block_file = get_dest_blocks(line)
        dest_blocks.setdefault(dest, []).append(block_file)
# .items() works on both Python 2 and 3, unlike .iteritems()
for k, v in dest_blocks.items():
    print('\t', len(v), '\t', k)

In [None]:
# one rsync call per remote parent directory, driven by a temporary --files-from list
for dest, block_files in dest_blocks.items():
    tmpfile = os.path.join(LOCAL_GARDEN, 'tmp.txt')
    with open(tmpfile, 'w') as f:
        f.writelines(block_file + '\n' for block_file in block_files)
    rsync_cmd = [
        'rsync', '-av', '--files-from='+LOCAL_GARDEN+'/tmp.txt', 'mendel-matcomp:'+dest+'/', LOCAL_GARDEN
    ]
    subprocess.call(rsync_cmd)
    os.remove(tmpfile)
    print('done syncing', dest)

### analyze log output of fizzled workflows

#### save actual remote and local output directories

In [None]:
# For every remote directory that exists, find the firework whose original
# launch_dir has the same block_/launcher_ part and record where its output
# lives remotely and locally.
with open(os.path.join(LOCAL_GARDEN, 'launchdirs_exist.txt'), 'r') as f:
    for line in tqdm(f.readlines()):
        remote_dir = line.strip()
        dest, block = get_dest_blocks(line)
        # .values() works on both Python 2 and 3, unlike .iteritems()
        for fw_info in fws_info.values():
            if get_dest_blocks(fw_info['launch_dir'])[1] == block:
                fw_info['remote_dir'] = remote_dir
                fw_info['local_dir'] = os.path.join(LOCAL_GARDEN, block)
                break
        else:
            # no firework matched this directory
            raise ValueError(block, 'not found')
                
# with open(os.path.join(LOCAL_GARDEN, 'launchdirs_not_exist.txt'), 'r') as f:
#     for line in tqdm(f.readlines()):
#         dest, block = get_dest_blocks(line)
#         for fw_id, fw_info in fws_info.iteritems():
#             if get_dest_blocks(fw_info['launch_dir'])[1] == block:
#                 fw_info['remote_dir'] = None
#                 break

In [None]:
# fireworks without a 'remote_dir' entry are those whose launch directory was not found
launchdirs_not_exist_count = sum(
    1 for fw_info in fws_info.values() if 'remote_dir' not in fw_info
)
print('check:', launchdirs_not_exist_count, 'launch_dirs not found')

#### scan for error messages

In [None]:
def get_file_path(extension, dirlist):
    """Return the first path in `dirlist` whose file extension equals `extension`.

    `extension` includes the leading dot (e.g. '.out'). A file named exactly
    'vasp.out' is never returned. Returns None when nothing matches.
    """
    for candidate in dirlist:
        filename = os.path.basename(candidate)
        if filename != 'vasp.out' and os.path.splitext(filename)[1] == extension:
            return candidate
    return None

In [None]:
def scan_errors_warnings(f):
    """Return the first line of file object `f` mentioning 'error:' or 'warning:'.

    Matching is case-insensitive; the line is returned stripped of surrounding
    whitespace. Returns None when no line matches.
    """
    markers = ('error:', 'warning:')
    for raw_line in f.readlines():
        stripped = raw_line.strip()
        lowered = stripped.lower()
        if any(marker in lowered for marker in markers):
            return stripped
    return None

In [None]:
# Collect error messages for every firework from its locally synced output:
# a traceback (or first error/warning line) from *.error, the first
# error/warning line from *.out, and the last line of vasp.out.
for fw_id, fw_info in tqdm(fws_info.items()):
    fw_info['errors'] = []

    if 'remote_dir' not in fw_info:
        fw_info['errors'].append('remote_dir not found')
        continue
    local_dir = fw_info['local_dir']
    if not os.path.exists(local_dir):
        fw_info['errors'].append('local_dir not found')
        continue
    ls = glob.glob(os.path.join(local_dir, '*'))
    if not ls:
        fw_info['errors'].append('no files found in local_dir')
        continue

    error_file = get_file_path('.error', ls)
    if error_file is not None:
        # look for a traceback in *.error
        with open(error_file, 'r') as f:
            fcontent = f.read()
            # the traceback ends at the next traceback, at an INFO line, or at end of file
            match = (
                re.search('Traceback((.+\n)+)Traceback', fcontent)
                or re.search('Traceback((.+\n)+)INFO', fcontent)
                or re.search('Traceback((.+\n)+)$', fcontent)
            )
            if match:
                fw_info['errors'].append('Traceback'+match.group(1))
            else:
                # f.read() above consumed the file; rewind so the line scan sees its content
                f.seek(0)
                scan = scan_errors_warnings(f)
                if scan:
                    fw_info['errors'].append(scan)

    # look into .out file (may be absent)
    out_file = get_file_path('.out', ls)
    if out_file is not None:
        with open(out_file, 'r') as f:
            scan = scan_errors_warnings(f)
            if scan:
                fw_info['errors'].append(scan)

    # look into vasp.out
    vasp_out = os.path.join(local_dir, 'vasp.out')
    if os.path.exists(vasp_out):
        with open(vasp_out, 'r') as f:
            vasp_out_lines = f.readlines()
        # an empty vasp.out has no last line to report
        if vasp_out_lines:
            vasp_out_tail = vasp_out_lines[-1].strip()
            fw_info['errors'].append(' -- '.join(['vasp.out', vasp_out_tail]))

# FIXME: in non-reservation mode the .out and .error files are one directory up

#### categorize errors

In [None]:
def add_fw_to_category(fw_id, key, cats):
    """Append `fw_id` to the list stored under `key` in `cats`, creating it if needed.

    `cats` is modified in place; nothing is returned.
    """
    cats.setdefault(key, []).append(fw_id)

In [None]:
# Assign every firework to one category, derived from its first recorded error:
# missing directories, a parsed traceback, a handler/action dictionary found
# in the message, or the raw message itself.
categories = {}
# .items() works on both Python 2 and 3, unlike .iteritems()
for fw_id, fw_info in fws_info.items():
    if not fw_info['errors']:
        add_fw_to_category(fw_id, 'no errors parsed', categories)
        continue
    error = fw_info['errors'][0] # only look at first error
    if 'remote_dir' in error or 'local_dir' in error:
        add_fw_to_category(fw_id, error, categories)
    elif error.startswith('Traceback'):
        exc = ParsedException.from_string(error)
        msg = exc.exc_msg[:50]
        match = re.search('errors reached: (.*)', msg)
        if match:
            msg = match.group(1)
        key = ' -- '.join([exc.exc_type, msg])
        lineno = exc.frames[-1]['lineno']
        # key: exception type -- message -- source file#line of the innermost frame
        key = ' -- '.join([key, os.path.basename(exc.source_file) + '#' + str(lineno)])
        add_fw_to_category(fw_id, key, categories)
    else:
        match = re.search('{(.*)}', error) # matches dictionary
        if not match:
            add_fw_to_category(fw_id, error, categories)
            continue
        # turn the Python dict repr into JSON: double quotes, and the handler
        # object repr wrapped in quotes so it parses as a string
        dstr = '{' + match.group(1) + '}'
        dstr = dstr.replace("u'", '"').replace("'", '"')
        dstr = re.sub(r'{"handler": (.*), "errors"', r'{"handler": "\g<1>", "errors"', dstr)
        try:
            d = json.loads(dstr)
        except ValueError:
            # json decoding errors are ValueErrors on Python 2 and 3
            add_fw_to_category(fw_id, 'looks like dict but could not decode', categories)
            continue
        if 'handler' in d and 'errors' in d:
            if '<' in d['handler']:
                # reduce '<custodian.vasp.handlers.X object ...>' to the class name X
                match = re.search(r'custodian\.vasp\.handlers\.(.*) object', d['handler'])
                if match:
                    d['handler'] = match.group(1)
                else:
                    raise ValueError('custodian.vasp.handlers not matched')
            add_fw_to_category(fw_id, d['handler'], categories)
        elif 'action' in d:
            add_fw_to_category(fw_id, 'action', categories)
        else:
            add_fw_to_category(fw_id, 'found dict but not handler or action error', categories)
print_categories(categories)

### debugging

In [None]:
# spot check: remote directory recorded for a single firework
fws_info[1564191]['remote_dir']

In [None]:
# spot check: priority and state of a single firework
lpad.fireworks.find_one({'fw_id': 1564191}, {'spec._priority': 1, 'state': 1})

In [None]:
# spot check: priority and state of a single firework
lpad.fireworks.find_one({'fw_id': 1285769}, {'spec._priority': 1, 'state': 1})

In [None]:
# spot check: priority and state of a single firework
lpad.fireworks.find_one({'fw_id': 1399045}, {'spec._priority': 1, 'state': 1})

In [None]:
# input handle for the JSON document that is parsed in the following cell
f = open('mpcomplete_kitchaev.json', 'r')

In [None]:
import json
# parse the JSON document from the handle opened earlier and release the handle afterwards
try:
    d = json.load(f)
finally:
    f.close()

In [None]:
def find_last_node(wflow):
    """Return the first key of `wflow['links']` that has no children.

    Raises ValueError when every node in `wflow['links']` has at least one child.
    """
    for node, children in wflow['links'].items():
        if not children:
            return node
    raise ValueError('last node not found!')

In [None]:
from pymongo import MongoClient
import yaml

In [None]:
# the database config lives in the directory named by the DB_LOC environment variable
materials_prod_config_path = os.path.join(os.environ['DB_LOC'], 'materials_db_prod.yaml')
# NOTE(review): yaml.load without an explicit Loader can construct arbitrary
# Python objects; consider yaml.safe_load if the config is plain data -- confirm.
with open(materials_prod_config_path, 'r') as materials_prod_config_file:
    config = yaml.load(materials_prod_config_file)

In [None]:
# connect to the MongoDB server named in the YAML config
conn = MongoClient(config['host'], config['port'], j=False)
db_jp = conn[config['db']]
# credentials are taken from the config file rather than hard-coded here
db_jp.authenticate(config['username'], config['password'])
# number of documents in the materials collection (shown as the cell output)
db_jp.materials.count()

In [None]:
# For every submission with a COMPLETED workflow, walk the workflow from its
# root to a leaf, collect the task ids stored by the launches on the way and
# look up the material that contains one of them.
for cif, info in d.items():
    submission_id = info['submission_id']
    wflow = lpad.workflows.find_one({'metadata.submission_id': submission_id}, wflows_projection)
    if wflow is None:
        # an unknown submission must not abort the remaining lookups
        print('no workflow found for submission_id', submission_id, '-> skipping', cif)
        continue
    if wflow['state'] != 'COMPLETED':
        continue
    fw_id = find_root_node(wflow)
    # NOTE(review): the leading None makes the `$in` query below also match
    # documents without task_ids -- confirm this is intended.
    task_ids = [None]
    while True:
        # only the most recent launch of each firework is considered
        launch_id = lpad.fireworks.find_one({'fw_id': fw_id}, {'launches': 1, '_id': 0})['launches'][-1]
        launch = lpad.launches.find_one(
            {'launch_id': launch_id, 'action.stored_data.task_id': {'$exists': 1}},
            {'action.stored_data.task_id': 1, '_id': 0}
        )
        if launch:
            task_ids.append(launch['action']['stored_data']['task_id'])
        children = wflow['links'][str(fw_id)]
        if not children:
            break
        # follow the last child until a leaf is reached
        fw_id = children[-1]
    mat = db_jp.materials.find_one({'task_ids': {'$in': task_ids}}, {'task_id': 1, 'task_ids': 1, '_id': 0})
    if mat is None:
        print('no material found for task_ids', task_ids, '-> skipping', cif)
        continue
    # fw_id is the leaf firework reached by the walk above
    info['fw_id'] = fw_id
    info['mp_id'] = mat['task_id']
    print(d[cif])
    #break
print('DONE')

In [None]:
# write the annotated submissions; the context manager makes sure the data is
# flushed to disk and the handle is closed
with open('mpcomplete_kitchaev_mpids.json', 'w') as fout:
    json.dump(d, fout)