In [None]:
# Before starting, log in to SQL as the postgres user and run this in the sm database:
#  CREATE SCHEMA graphql;
#  CREATE EXTENSION "uuid-ossp";
#  ALTER SCHEMA graphql OWNER TO sm;

import json
import os.path
import pandas as pd
import numpy as np
from collections import namedtuple
from sm.engine.db import DB
from sm.engine.es_export import ESExporter, init_es_conn
from sm.engine.util import SMConfig
from sm.engine.dataset import Dataset
from elasticsearch.client import IndicesClient, IngestClient
SMConfig.set_path('../conf/config.json')
sm_config = SMConfig.get_conf()
db = DB(sm_config['db'])
# ESExporter is only needed for es.update_ds, which is still disabled (see the PR #51 TODO in update_dataset)
# es = ESExporter(db)
# Elasticsearch connection, index name and ingest client are required by remove_fields_from_elasticsearch();
# leaving them commented out makes that cell fail with NameError on a fresh kernel.
es_conn = init_es_conn(sm_config['elasticsearch'])
es_index = sm_config['elasticsearch']['index']
ingest = IngestClient(es_conn)

In [None]:
# CSVs holding the two manual-cleaning passes (submitters first, then group memberships)
submitter_data_file = "r1_0_20180917_submitter_data.csv"
group_membership_file = "r1_0_20180917_group_membership.csv"

# Columns identifying one submitter/PI combination. The submitter CSV stores each of them
# twice: as found in the metadata ("current_*") and as cleaned by hand ("new_*").
group_keys = ['institution', 'email', 'name', 'pi_email', 'pi_name']
dirty_group_keys = list(map('current_{}'.format, group_keys))
clean_group_keys = list(map('new_{}'.format, group_keys))

# Hashable record of one submitter-data combination, fields in group_keys order
SubmitterInfo = namedtuple('SubmitterInfo', group_keys)

In [None]:
# Find all institutions, submitters and PIs (now referred to as "submitter data")
def get_src_datasets(conn=None):
    """Collect the submitter data (institution, submitter, PI) of every dataset.

    Reads ``id, is_public, metadata`` from the ``dataset`` table and flattens
    ``metadata['Submitted_By']`` into one dict per dataset. Datasets without a
    ``Submitted_By.Submitter`` entry are skipped.

    Args:
        conn: object with a ``select(sql, params)`` method returning
            ``(id, is_public, metadata)`` rows; defaults to the notebook-level ``db``.

    Returns:
        list of dicts with keys ``id``, ``is_public``, ``institution``, ``email``,
        ``name``, ``pi_email`` and ``pi_name``. Emails are lower-cased; missing or
        null values become ``''``.
    """
    if conn is None:
        conn = db

    def as_text(value):
        # Metadata values may be absent or JSON null; treat both as an empty string
        return '' if value is None else str(value)

    def normalize_name(person):
        # Prefer the single "Name" field, otherwise join "First_Name" and "Surname"
        if person.get('Name'):
            return person['Name']
        return (as_text(person.get('First_Name')).strip() + ' ' + as_text(person.get('Surname')).strip()).strip()

    datasets = []
    for ds_id, is_public, metadata in conn.select("select id, is_public, metadata from dataset", None):
        submitted_by = (metadata or {}).get('Submitted_By')
        if not (submitted_by and submitted_by.get('Submitter')):
            continue
        submitter = submitted_by['Submitter']
        # `or {}` rather than a .get default: the key can be present with a null value
        # (update_dataset writes None for datasets without a PI).
        pi = submitted_by.get('Principal_Investigator') or {}
        datasets.append({
            "id": ds_id,
            "is_public": is_public,
            "institution": as_text(submitted_by.get('Institution')),
            "email": as_text(submitter.get('Email')).lower(),
            "name": normalize_name(submitter),
            "pi_email": as_text(pi.get('Email')).lower(),
            "pi_name": normalize_name(pi),
        })
    return datasets

# Snapshot of the submitter data as currently stored in dataset metadata (re-read after the update cell)
src_datasets = get_src_datasets()

In [None]:
# Dump submitter data to file for manual cleaning
def dump_submitters_for_manual_cleaning(filename, src_datasets):
    """Write one CSV row per distinct submitter-data combination for manual cleaning.

    Datasets are grouped by all of ``group_keys``. Each row holds the number of
    datasets in the group, the smallest and largest dataset id, the original values
    in the ``current_*`` columns and an editable copy of them in the ``new_*`` columns.

    Args:
        filename: output CSV path.
        src_datasets: list of dicts as returned by ``get_src_datasets``.
    """
    groups = (pd.DataFrame(src_datasets)
              .sort_values(group_keys)
              .groupby(group_keys))

    rows = []
    for groupkey, group in groups:
        rows.append({
            # len(group) is the number of datasets; DataFrame.size would be rows * columns
            'datasets': len(group),
            'earliest': group['id'].min(),
            'latest': group['id'].max(),
            **dict(zip(dirty_group_keys, groupkey)),
            **dict(zip(clean_group_keys, groupkey)),
        })

    # Explicit column list fixes the output order of the CSV
    columns = ['datasets', 'earliest', 'latest', *dirty_group_keys, *clean_group_keys]
    pd.DataFrame(rows, columns=columns).to_csv(filename, index=False)

# Only dump when the file does not exist yet, so an already hand-edited CSV is never overwritten
if not os.path.isfile(submitter_data_file):
    dump_submitters_for_manual_cleaning(submitter_data_file, src_datasets)
# Now manually edit the CSV, leaving the "current_" columns as-is and cleaning the values in the "new_" columns

In [None]:
# Load the cleaned CSV, unpack it and validate that it is consistent
    
def read_cleaned_submitters(submitter_file):
    """Load the hand-cleaned submitter CSV and derive the lookup structures.

    Args:
        submitter_file: CSV written by ``dump_submitters_for_manual_cleaning`` and
            then edited by hand.

    Returns:
        ``(dirty_data, clean_data, dirty_to_clean, groups_to_add, users_to_add)``:

        * ``dirty_data`` / ``clean_data``: DataFrames of the ``current_*`` / ``new_*``
          columns, renamed to the plain ``group_keys`` names;
        * ``dirty_to_clean``: original ``SubmitterInfo`` -> cleaned ``SubmitterInfo``;
        * ``groups_to_add``: set of non-empty cleaned institutions;
        * ``users_to_add``: set of ``(email, name)`` for every submitter plus every
          PI that has both an email and a name.
    """
    # Read every cell as text with NA parsing disabled, so values such as "N/A" or "NA"
    # round-trip unchanged and still match the keys built from src_datasets.
    combined_cleaned_data = pd.read_csv(submitter_file, dtype=str, keep_default_na=False).fillna('')
    dirty_data = combined_cleaned_data.loc[:, dirty_group_keys].rename(columns=dict(zip(dirty_group_keys, group_keys)))
    clean_data = combined_cleaned_data.loc[:, clean_group_keys].rename(columns=dict(zip(clean_group_keys, group_keys)))

    groups_to_add = set(institution for institution in clean_data['institution'] if institution)
    submitters_to_add = set(zip(clean_data['email'], clean_data['name']))
    pis_to_add = set((email, name) for email, name in zip(clean_data['pi_email'], clean_data['pi_name'])
                     if email and name)
    users_to_add = submitters_to_add | pis_to_add

    dirty_to_clean = dict(zip(
        (SubmitterInfo(*row) for row in dirty_data.itertuples(index=False, name=None)),
        (SubmitterInfo(*row) for row in clean_data.itertuples(index=False, name=None))))

    return dirty_data, clean_data, dirty_to_clean, groups_to_add, users_to_add

def validate_cleaned_submitters(src_datasets, clean_data, dirty_to_clean, users_to_add):
    """Print problems found in the hand-cleaned submitter data (nothing is raised).

    Checks that:
    * each submitter/PI email in ``users_to_add`` maps to a single name;
    * every cleaned row has an institution or a PI (email and name);
    * every dataset in ``src_datasets`` has an entry in ``dirty_to_clean``
      (each missing combination is reported once).
    """
    users_df = pd.DataFrame(list(users_to_add), columns=["email", "name"])
    # Group by the column name, not a one-element list, so `email` is a scalar rather than a 1-tuple
    for email, names in users_df.groupby("email")["name"]:
        if len(names) > 1:
            print(f'Submitter/PI email mapped to multiple names: {email} -> {", ".join(names)}')

    for idx, row in clean_data.iterrows():
        if not row['institution'] and not (row['pi_email'] and row['pi_name']):
            print(f'Row {idx+1} has no institution or PI')

    reported = set()
    for ds in src_datasets:
        dirty_key = SubmitterInfo(*(ds[k] for k in group_keys))
        if dirty_key not in dirty_to_clean and dirty_key not in reported:
            reported.add(dirty_key)
            print(f'Missing cleaned data for {dirty_key}')
            
dirty_data, clean_data, dirty_to_clean, groups_to_add, users_to_add = read_cleaned_submitters(submitter_data_file)
# Validation only prints problems and does not raise, so check the cell output before continuing
validate_cleaned_submitters(src_datasets, clean_data, dirty_to_clean, users_to_add)
# Fix validation errors & rerun if necessary

In [None]:
# Insert new groups & users (existing ones are left in place; existing users get their name updated)
for group in groups_to_add:
    if not db.select_one("SELECT 1 FROM graphql.group WHERE name = %s", [group]):
        db.alter("INSERT INTO graphql.group (name, short_name) VALUES (%s, %s)", [group, group])

for email, name in users_to_add:
    if not db.select_one("SELECT 1 FROM graphql.user WHERE LOWER(email) = LOWER(%s)", [email]):
        # Each new user gets a fresh, unverified credentials row
        db.alter("""
            WITH cred AS (INSERT INTO graphql.credentials ("emailVerified") VALUES (FALSE) RETURNING id)
            INSERT INTO graphql.user (email, name, role, credentials_id)
            SELECT %s, %s, 'user', cred.id
            FROM cred;
        """, [email, name])
    else:
        # Match case-insensitively like the existence check above; an exact-case match would
        # silently update nothing when the stored email differs only in letter case.
        db.alter('UPDATE graphql.user SET name = %s WHERE LOWER(email) = LOWER(%s)', [name, email])

new_group_name_to_id = dict(db.select("SELECT name, id FROM graphql.group"))
new_user_email_to_id = dict(db.select("SELECT email, id FROM graphql.user"))

In [None]:
# Update datasets
def update_dataset(ds, dirty_to_clean, new_group_name_to_id, new_user_email_to_id):
    """Apply the cleaned submitter data to a single dataset.

    Creates or updates the dataset's row in ``graphql.dataset`` (user_id, group_id)
    and rewrites ``metadata['Submitted_By']`` in the ``dataset`` table with the
    cleaned institution, submitter and PI.

    Args:
        ds: submitter-data dict for the dataset, as returned by ``get_src_datasets``.
        dirty_to_clean: original ``SubmitterInfo`` -> cleaned ``SubmitterInfo``.
        new_group_name_to_id: group name -> graphql.group id.
        new_user_email_to_id: user email -> graphql.user id.
    """
    ds_id = ds['id']
    dirty_key = SubmitterInfo(*[ds[k] for k in group_keys])
    cleaned = dirty_to_clean[dirty_key]
    user_id = new_user_email_to_id[cleaned.email]
    # Datasets without an institution are not attached to any group
    group_id = (new_group_name_to_id[cleaned.institution] or None) if cleaned.institution else None
    (metadata,) = db.select_one('SELECT metadata FROM dataset WHERE id = %s', [ds_id])

    already_linked = db.select_one('SELECT 1 FROM graphql.dataset WHERE id = %s', [ds_id])
    if already_linked:
        db.alter("UPDATE graphql.dataset SET user_id = %s, group_id = %s WHERE id = %s", [user_id, group_id, ds_id])
    else:
        db.alter("INSERT INTO graphql.dataset (id, user_id, group_id) VALUES (%s, %s, %s)", [ds_id, user_id, group_id])

    # No PI email -> the PI entry is stored as null
    principal_investigator = None
    if cleaned.pi_email:
        principal_investigator = {"Email": cleaned.pi_email, "Name": cleaned.pi_name}

    metadata['Submitted_By'] = {
        # TODO: Remove Institution & Submitter once code is migrated
        'Institution': cleaned.institution,
        'Submitter': {"Email": cleaned.email, "Name": cleaned.name},
        'Principal_Investigator': principal_investigator,
    }
    db.alter('UPDATE dataset SET metadata = %s WHERE id = %s', [json.dumps(metadata), ds_id])

    # TODO: merge PR #51 so that this works:
    # es.update_ds(ds_id, ['submitter_id', 'group_id', 'metadata'])
    
# Apply the cleaned data to every dataset (graphql.dataset row + metadata['Submitted_By'])
for ds in src_datasets:
    update_dataset(ds, dirty_to_clean, new_group_name_to_id, new_user_email_to_id)

# Update src_datasets with cleaned data, as it's used for statistics in the membership queries.
# NOTE(review): update_dataset stores Principal_Investigator as None for datasets without a PI,
# so get_src_datasets must tolerate a null PI entry here.
src_datasets = get_src_datasets()

In [None]:
# Remove fields from elasticsearch (NOTE: The mappings remain in Elasticsearch until the index is recreated, because ES no longer supports deleting mappings)
def remove_fields_from_elasticsearch():
    """Strip the legacy ``Submitted_By`` sub-fields from every document in ``es_index``.

    Registers a temporary ingest pipeline of ``remove`` processors (missing fields are
    ignored), runs every document of the index through it with update_by_query and
    deletes the pipeline again, also when the update fails.
    """
    fields_to_remove = [
        'ds_meta.Submitted_By.Institution',
        'ds_meta.Submitted_By.Submitter.Email',
        'ds_meta.Submitted_By.Submitter.First_Name',
        'ds_meta.Submitted_By.Submitter.Surname',
        'ds_meta.Submitted_By.Submitter',
        'ds_meta.Submitted_By.Principal_Investigator.First_Name',
        'ds_meta.Submitted_By.Principal_Investigator.Surname'
    ]
    pipeline_id = 'remove_old_fields'
    ingest.put_pipeline(
        id=pipeline_id,
        body={'processors':
              [{'remove': {'field': field, 'ignore_failure': True}} for field in fields_to_remove]
              })

    try:
        # An empty `query` object is malformed (newer ES versions reject it); match_all selects every doc.
        # pipeline / wait_for_completion are passed as keyword arguments so the client serializes the
        # boolean as `true`; values in a raw `params` dict are sent through unescaped.
        es_conn.update_by_query(
            index=es_index,
            body={'query': {'match_all': {}}},
            pipeline=pipeline_id,
            wait_for_completion=True)
    finally:
        # Never leave the temporary pipeline registered
        ingest.delete_pipeline(id=pipeline_id)

remove_fields_from_elasticsearch()

In [None]:
# Dump calculated group memberships for manual checking
def dump_group_membership_for_manual_cleaning(filename, clean_data, datasets=None):
    """Write a CSV of proposed (group, user, role) memberships for manual review.

    Every distinct (institution, email, name) combination taken from the submitter
    columns and from the PI columns of ``clean_data`` becomes one row, provided both
    institution and email are non-empty. For each row, counts are taken over the
    datasets of that institution: datasets submitted by the user, datasets with the
    user as PI, and datasets with the user as PI for a different submitter. Users
    who are PI of at least one dataset are proposed as PRINCIPAL_INVESTIGATOR,
    everyone else as MEMBER.

    Args:
        filename: output CSV path.
        clean_data: DataFrame with institution/email/name/pi_email/pi_name columns.
        datasets: list of submitter-data dicts (as returned by ``get_src_datasets``);
            defaults to the notebook-level ``src_datasets``.
    """
    if datasets is None:
        datasets = src_datasets
    # Explicit columns keep the lookups below working even for an empty dataset list
    ds_df = pd.DataFrame(list(datasets), columns=['institution', 'email', 'pi_email'])
    # Compare emails case-insensitively: get_src_datasets lower-cases them, the hand-edited CSV may not
    ds_email = ds_df['email'].str.lower()
    ds_pi_email = ds_df['pi_email'].str.lower()

    candidates = (
        list(clean_data[['institution', 'email', 'name']].itertuples(index=False, name=None)) +
        list(clean_data[['institution', 'pi_email', 'pi_name']].itertuples(index=False, name=None)))

    columns = ['group', 'email', 'name', 'datasets submitted', 'datasets as PI', 'datasets as PI to someone else', 'role']
    users_in_groups = []
    # dict.fromkeys de-duplicates while keeping first-seen order
    for inst, email, name in dict.fromkeys(candidates):
        if not (inst and email):
            continue
        email_lc = str(email).lower()
        in_inst = ds_df['institution'] == inst
        is_submitter = ds_email == email_lc
        is_pi = ds_pi_email == email_lc
        datasets_as_submitter = int((in_inst & is_submitter).sum())
        datasets_as_pi = int((in_inst & is_pi).sum())
        datasets_as_pi_to_someone_else = int((in_inst & is_pi & ~is_submitter).sum())
        role = 'PRINCIPAL_INVESTIGATOR' if datasets_as_pi > 0 else 'MEMBER'
        users_in_groups.append([inst, email, name, datasets_as_submitter, datasets_as_pi, datasets_as_pi_to_someone_else, role])

    df = pd.DataFrame(users_in_groups, columns=columns).sort_values(columns)
    df.to_csv(filename, index=False)
    
    
# Only dump when the file does not exist yet, so an already hand-reviewed CSV is never overwritten
if not os.path.isfile(group_membership_file):
    dump_group_membership_for_manual_cleaning(group_membership_file, clean_data)


In [None]:
# Read and validate group memberships
    
def read_cleaned_group_membership(filename, group_name_to_id=None, user_email_to_id=None):
    """Load the hand-reviewed group-membership CSV and keep the usable rows.

    Rows with an unknown group, an unknown user email or a role other than
    PRINCIPAL_INVESTIGATOR / MEMBER / '' are reported (printed, with their CSV line
    number) and skipped. Rows whose role was blanked out during review are skipped
    silently.

    Args:
        filename: CSV written by ``dump_group_membership_for_manual_cleaning`` and
            edited by hand.
        group_name_to_id: group name -> id; defaults to the notebook-level
            ``new_group_name_to_id``.
        user_email_to_id: user email -> id; defaults to the notebook-level
            ``new_user_email_to_id``.

    Returns:
        list of ``(group, email, role)`` tuples.
    """
    if group_name_to_id is None:
        group_name_to_id = new_group_name_to_id
    if user_email_to_id is None:
        user_email_to_id = new_user_email_to_id

    # Read as text with NA parsing disabled so names such as "NA" are not turned into ''
    data = pd.read_csv(filename, dtype=str, keep_default_na=False).fillna('')
    data = data[['group', 'email', 'role']]

    valid_data = []
    # start=2: line 1 of the file is the header, so the first data row is line 2
    for line_no, (group, email, role) in enumerate(data.values, start=2):
        if not group_name_to_id.get(group):
            print(f"Unrecognized group {group} on line {line_no}")
        elif not user_email_to_id.get(email):
            print(f"Unrecognized user {email} on line {line_no}")
        elif role not in ['PRINCIPAL_INVESTIGATOR', 'MEMBER', '']:
            print(f"Unrecognized role {role} on line {line_no}")
        elif role != '':
            valid_data.append((group, email, role))

    return valid_data
            
# Problems are printed, not raised; rows whose role was left empty are dropped
group_membership = read_cleaned_group_membership(group_membership_file)

# Fix data and rerun this cell if necessary

In [None]:
# Create or update one graphql.user_group row per validated (group, email, role) membership
for group, email, role in group_membership:
    group_id = new_group_name_to_id[group]
    user_id = new_user_email_to_id[email]
    if not db.select_one('SELECT 1 FROM graphql.user_group WHERE group_id = %s AND user_id = %s', [group_id, user_id]):
        db.alter("INSERT INTO graphql.user_group (group_id, user_id, role) VALUES (%s, %s, %s)", [group_id, user_id, role])
    else:
        # Membership already exists: only the role is overwritten
        db.alter("UPDATE graphql.user_group SET role = %s WHERE group_id = %s AND user_id = %s", [role, group_id, user_id])