<a href="https://colab.research.google.com/github/parus-cristatus/tolokapizza/blob/main/tolokaapi/project_transfer_between_accounts_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Refactored script to transfer indicated project with regular pools and trainings between production accounts**

1. The script doesn't transfer uploaded tasks or task suites.

2. Transferred skill and training mappings are saved to .csv files to persist the information and prevent duplicates. Skills are account-wide, while trainings are project-wide.

3. Global skills are transferred with the same IDs.

4. Regular pool types (Exam, Rehab, Other and Training) can't be transferred via the API, so they must be rechecked manually.

**Important:**
- Trainings must have a smart mixing configuration set on them. Otherwise the script will report that everything is OK, but they will not be transferred. This can happen if you create a training in the UI and don't upload any tasks.
- It's more reliable to transfer projects one by one, check mapping files and check results in UI.



In [None]:
import requests
import json
import csv
import os
from typing import Generator, Tuple, List, Set, Dict, Callable

In [None]:
# Base URL of the production API; request paths are appended directly to it,
# so it must end with a slash.
PROD_ENDPOINT = 'https://toloka.dev/api/v1/'
TOKEN_FROM = '12345' # API token to interact with the account you need to transfer a project from
TOKEN_TO = '67890' # API token to interact with the account you need to transfer a project to
# Scheme placed before the token in the Authorization header of every request
TOKEN_PREFIX = 'ApiKey'

In [None]:
# Current pool json body keys that contain skills for recursive walk.
# The values found under these keys are treated as skill IDs to be mapped
# from the source account to the target account.
POOL_FIELDS_WITH_SKILLS = ['key', 'answer_weight_skill_id', 'skill_id']

In [None]:
def get_call(endpoint: str, call: str, token: str, params: dict=None) -> dict:
    """Send a GET request and return the decoded JSON body.

    Arguments:
        endpoint: base API URL (sandbox or prod), ending with a slash
        call: path appended to the endpoint, e.g. 'projects'
        token: toloka token (sandbox or prod)
        params: optional query-string parameters

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx status.
        requests.exceptions.RequestException: the request itself failed.

    Examples:
        >>> endpoint_from = 'https://toloka.dev/api/v1/'
        >>> token_from = 'KpC_UWW4h17E...'
        >>> get_call(endpoint_from, 'projects', token_from, {'limit': 100})
    """
    headers = {
        "Authorization": TOKEN_PREFIX + ' ' + token
    }
    response = requests.get(url=f"{endpoint}{call}", headers=headers, params=params)
    # Surface HTTP errors as exceptions so callers' RequestException handlers
    # see them instead of receiving an error payload that looks like data.
    response.raise_for_status()
    return response.json()

In [None]:
def post_call(endpoint: str, call: str, token: str, data: str) -> dict:
    """Send a POST request with a JSON body and return the decoded JSON response.

    Arguments:
        endpoint: base API URL (sandbox or prod), ending with a slash
        call: path appended to the endpoint, e.g. 'projects'
        token: toloka token (sandbox or prod)
        data: already-serialized JSON string to send in the body of the request

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx status.
        requests.exceptions.RequestException: the request itself failed.

    Examples:
        >>> endpoint_to = 'https://sandbox.toloka.dev/api/v1/'
        >>> token_to = 'HQGC_xPF6W...'
        >>> post_call(endpoint_to, 'projects', token_to, json.dumps(project_from))
    """
    headers={
        "Authorization": TOKEN_PREFIX + ' ' + token,
        "Content-Type": "application/JSON"
    }
    response = requests.post(f"{endpoint}{call}", data=data, headers=headers)
    # Surface HTTP errors as exceptions so callers' RequestException handlers
    # see them instead of receiving an error payload that looks like data.
    response.raise_for_status()
    return response.json()

In [None]:
def get_values_from_dict(keys: list, data: dict) -> Generator[str, None, None]:
    """Recursively yield the values stored under ``keys`` in nested dicts/lists.

    Two shapes are recognised:
      * a dict entry whose key is in ``keys`` and whose value is a digit-only
        string (``{'skill_id': '123'}``) yields that string;
      * a dict inside a list whose ``'key'`` entry is in ``keys``
        (``[{'key': 'skill_id', 'value': '123'}]``) yields its ``'value'``.

    Values that are not strings and list items that are not dicts are skipped
    instead of raising ``AttributeError``.

    Arguments:
        keys: list keys by which looking for values
        data: dict (or list) from which get the values

    Examples:
        >>> data = {'filter': {'or': [{'category': 'skill', 'key': '29716'}]}}
        >>> list(get_values_from_dict(['key', 'skill_id'], data))
        ['29716']
    """
    if isinstance(data, dict):
        for k, v in data.items():
            # isinstance guard: the value may be a number, None or a container
            if k in keys and isinstance(v, str) and v.isdigit():
                yield v
            if isinstance(v, (dict, list)):
                yield from get_values_from_dict(keys, v)
    elif isinstance(data, list):
        for item in data:
            # Lists may also hold plain scalars, which have no .get()
            if isinstance(item, dict) and item.get('key') in keys:
                yield item.get('value')
            # Scalars fall through both isinstance branches and yield nothing
            yield from get_values_from_dict(keys, item)

In [None]:
def deep_upd_dict(data: dict, values: dict) -> dict:
    """Build a copy of ``data`` in which every leaf found in ``values`` is replaced.

    Dicts and lists are walked recursively; dict keys are kept as they are and
    only leaf values are looked up in ``values``. The input is not modified.

    Arguments:
        data: nested structure in which to update the values
        values: mapping of old leaf value -> new leaf value

    Examples:
        >>> data = {"filter": {'or': [{'category': 'skill', 'key': '29716', 'operator': 'NE'}]}}
        >>> values = {'29716': '11659'}
        >>> deep_upd_dict(data, values)
    """
    if isinstance(data, list):
        return [deep_upd_dict(element, values) for element in data]
    if not isinstance(data, dict):
        # Leaf: swap it when a replacement is known, otherwise keep it
        return values.get(data, data)
    updated = {}
    for name, child in data.items():
        updated[name] = deep_upd_dict(child, values)
    return updated

In [None]:
def get_pools(prj_id: str, status: str, endpoint: str, token: str) -> list:
    """Fetch the pools of a project that are in the given status.

    Arguments:
        prj_id: project id
        status: pool status (OPEN, CLOSED, ARCHIVED); upper-cased before sending
        endpoint: endpoint (sandbox or prod)
        token: toloka token (sandbox or prod)

    Returns:
        The 'items' entry of the 'pools' response.
    """
    query = {'project_id': prj_id}
    query['status'] = status.upper()
    response = get_call(endpoint, 'pools', token, query)
    return response['items']

In [None]:
def get_skill(endpoint: str, skill_id: str, token: str) -> dict:
    """Fetch a single skill by its id.

    Arguments:
        endpoint: endpoint (sandbox or prod)
        skill_id: skill id
        token: toloka token (sandbox or prod)

    Returns:
        The JSON body returned for 'skills/<skill_id>'.
    """
    skill_path = 'skills/{}'.format(skill_id)
    return get_call(endpoint, skill_path, token)

In [None]:
def create_skill(data: dict, endpoint: str, token: str) -> dict:
    """Create a skill from a skill description and return the API response.

    Arguments:
        data: skill description as a dict; it is serialized to JSON here,
            so it must not be pre-serialized
        endpoint: endpoint (sandbox or prod)
        token: toloka token (sandbox or prod) of the account to create the skill in

    Returns:
        The JSON body returned by the POST to 'skills'.
    """
    return post_call(endpoint, 'skills', token, data=json.dumps(data))

In [None]:
def read_existing_mappings(file_path: str):
    """Load a from_id -> to_id mapping from a CSV file.

    The file is expected to have 'from_id' and 'to_id' columns. When the same
    'from_id' appears more than once, the last row wins. A missing file gives
    an empty mapping.

    Arguments:
        file_path: path to the mapping CSV

    Returns:
        dict mapping each 'from_id' to its 'to_id'.
    """
    if not os.path.isfile(file_path):
        return {}
    with open(file_path, 'r', newline='') as handle:
        return {record['from_id']: record['to_id'] for record in csv.DictReader(handle)}

In [None]:
def write_mappings_to_csv(file_path: str, new_val: dict):
    """Append from_id -> to_id pairs to a CSV file.

    The 'from_id,to_id' header is written only when the file did not exist
    before this call; otherwise rows are appended after the existing content.

    Arguments:
        file_path: path to the mapping CSV
        new_val: mapping of source id -> target id to append
    """
    # Must be checked before open(): append mode creates the file
    needs_header = not os.path.isfile(file_path)
    with open(file_path, 'a', newline='') as handle:
        out = csv.DictWriter(handle, fieldnames=['from_id', 'to_id'])
        if needs_header:
            out.writeheader()
        out.writerows(
            {'from_id': source, 'to_id': target}
            for source, target in new_val.items()
        )

In [None]:
# Make sure training pool has mixer_config parameter set! If the training is not
# created in the target account, handle_trainings raises instead of recording
# an empty mapping.

def handle_trainings(pool_to_transfer: dict):
    """Re-point the pool's training requirement at a training in the target project.

    When the pool has a ``quality_control.training_requirement``, the linked
    training is looked up in ``trainings_map_project_<PROJECT_ID>.csv``. If it
    was transferred before, the stored target id is reused. Otherwise the
    training is read from the source account, created under ``project_to`` in
    the target account, and the new mapping is appended to the CSV.
    ``pool_to_transfer`` is modified in place.

    Arguments:
        pool_to_transfer: pool body that is being prepared for the target account

    Raises:
        requests.exceptions.RequestException: the training could not be read or
            created. The exception is raised (not just printed) so the caller
            does not post a pool that still points at the source training.
    """
    requirement = (pool_to_transfer.get('quality_control') or {}).get('training_requirement')
    if not requirement:
        return

    trainings_file_path = f'trainings_map_project_{PROJECT_ID}.csv'
    training_from_id = requirement['training_pool_id']
    existing_training_mappings = read_existing_mappings(trainings_file_path)
    if training_from_id in existing_training_mappings:
        requirement['training_pool_id'] = existing_training_mappings[training_from_id]
        return

    # Messages use the training id: the caller may already have removed the
    # pool's own 'id' key before calling this function.
    try:
        training_from = get_call(PROD_ENDPOINT, f"trainings/{training_from_id}", TOKEN_FROM)
    except requests.exceptions.RequestException as e:
        print(f"Failed to get training {training_from_id} : {e}")
        raise
    if 'id' not in training_from:
        # An error payload came back instead of a training
        raise requests.exceptions.RequestException(
            f"Failed to get training {training_from_id} : {training_from}"
        )
    training_from.pop('id')
    training_from['project_id'] = project_to['id']

    try:
        training_to = post_call(PROD_ENDPOINT, 'trainings', TOKEN_TO, data=json.dumps(training_from))
    except requests.exceptions.RequestException as e:
        print(f"Failed to create training {training_from_id} : {e}")
        raise
    training_to_id = training_to.get('id')
    if not training_to_id:
        # Without an id nothing was created; do not persist an empty mapping
        raise requests.exceptions.RequestException(
            f"Failed to create training {training_from_id} : {training_to}"
        )

    print(f"Training {training_from_id} was successfully transfered")
    requirement['training_pool_id'] = training_to_id
    write_mappings_to_csv(trainings_file_path, {training_from_id: training_to_id})

In [None]:
def fetch_new_skill_mappings(skill_ids: set, existing_skill_mappings: dict, create_skill: callable):
    """Resolve source skill ids to target-account skill ids, creating skills as needed.

    For every id in ``skill_ids``:
      * an id already present in ``existing_skill_mappings`` reuses that mapping;
      * otherwise the skill is fetched from the source account; global skills
        are left unmapped (they keep the same id), other skills are created in
        the target account via ``create_skill`` and mapped to the new id.

    A skill that cannot be fetched or created is reported and skipped, so the
    mappings obtained for the other skills are still returned. A skipped skill
    has no entry in the result.

    Arguments:
        skill_ids: source skill ids found in the pool
        existing_skill_mappings: mappings persisted by earlier runs
        create_skill: callable(skill, endpoint, token) returning the created skill

    Returns:
        dict of source skill id -> target skill id.
    """
    new_val = {}
    for i in skill_ids:
        if i in existing_skill_mappings:
            new_val[i] = existing_skill_mappings[i]
            continue

        try:
            skill = get_skill(PROD_ENDPOINT, i, TOKEN_FROM)
        except requests.exceptions.RequestException as e:
            # Report with the requested id; `skill` is not available here
            print(f"Failed to get skill {i}: {e}")
            continue
        if 'id' not in skill:
            # An error payload came back instead of a skill
            print(f"Failed to get skill {i}: {skill}")
            continue
        if skill.get("global"):
            continue

        try:
            created = create_skill(skill, PROD_ENDPOINT, TOKEN_TO)
        except requests.exceptions.RequestException as e:
            print(f"Failed to create skill for {i} : {e}")
            continue
        new_id = created.get('id')
        if not new_id:
            print(f"Failed to create skill for {i} : {created}")
            continue
        new_val[i] = new_id
        print(f"Skill {i} was successfully transfered")
    return new_val

In [None]:
def handle_skills(pool_to_transfer: dict):
    """Map every skill referenced by the pool to a skill in the target account.

    Skill ids are collected from the pool body using POOL_FIELDS_WITH_SKILLS,
    resolved against 'skills_map.csv' (creating missing skills in the target
    account), and newly created mappings are appended to that file.

    Arguments:
        pool_to_transfer: pool body that is being prepared for the target account

    Returns:
        dict of source skill id -> target skill id for the skills of this pool,
        including mappings that were already stored in the file.
    """
    skill_ids = set(get_values_from_dict(POOL_FIELDS_WITH_SKILLS, pool_to_transfer))
    skills_file_path = 'skills_map.csv'
    existing_skill_mappings = read_existing_mappings(skills_file_path)
    new_val = fetch_new_skill_mappings(skill_ids, existing_skill_mappings, create_skill)
    # Persist only mappings the file does not have yet; appending everything
    # would duplicate rows each time a pool reuses an already-mapped skill.
    created_now = {
        from_id: to_id
        for from_id, to_id in new_val.items()
        if from_id not in existing_skill_mappings
    }
    if created_now:
        write_mappings_to_csv(skills_file_path, created_now)
    return new_val

In [None]:
def prepare_pool_for_transfer(pool_id: str) -> dict:
    """Build the body of a pool ready to be created in the target project.

    The pool is read from the source account, its 'id' is removed, its
    'project_id' is set to ``project_to['id']``, its training requirement is
    re-pointed via ``handle_trainings`` and every mapped skill id is replaced
    with the target-account id.

    Arguments:
        pool_id: id of the pool in the source account

    Returns:
        The updated pool body.

    Raises:
        requests.exceptions.RequestException: the pool could not be read. The
            exception is raised (not just printed) so the caller can report the
            failure instead of continuing without pool data.
    """
    try:
        pool_to_transfer = get_call(PROD_ENDPOINT, f"pools/{pool_id}", TOKEN_FROM)
    except requests.exceptions.RequestException as e:
        print(f"Failed to get pool {pool_id} : {e}")
        raise
    if 'id' not in pool_to_transfer:
        # An error payload came back instead of a pool
        raise requests.exceptions.RequestException(
            f"Failed to get pool {pool_id} : {pool_to_transfer}"
        )
    pool_to_transfer.pop('id')
    pool_to_transfer['project_id'] = project_to['id']

    # Trainings first: this rewrites the training id inside the pool in place
    handle_trainings(pool_to_transfer)

    upd_pool = deep_upd_dict(pool_to_transfer, handle_skills(pool_to_transfer))

    return upd_pool

In [None]:
# Indicate project ID and regular pools IDs to transfer

# ID of the project in the source account; also used in the name of the
# trainings mapping file (trainings_map_project_<PROJECT_ID>.csv).
PROJECT_ID = '151522'
# IDs of the pools to transfer; each one is processed separately.
POOLS = ['42026971', '42026974', '42040746']

In [None]:
# Transfer project
# Reads the project from the source account and creates it in the target
# account. `project_to` is used later as the parent of transferred pools and
# trainings, so stop here if either step did not return a project.

project_from = get_call(PROD_ENDPOINT, f"projects/{PROJECT_ID}", TOKEN_FROM)
if 'id' not in project_from:
    raise RuntimeError(f"Failed to get project {PROJECT_ID} : {project_from}")
project_to = post_call(PROD_ENDPOINT, 'projects', TOKEN_TO, data=json.dumps(project_from))
if 'id' not in project_to:
    raise RuntimeError(f"Failed to create project from {PROJECT_ID} : {project_to}")
print(f"Project {project_from['id']} was successfully transfered")

In [None]:
# Transfer pools
# Each pool is prepared and created independently, so one failure does not
# stop the remaining pools. Success is reported only when the response
# carries the id of the created pool.

for pool in POOLS:
    try:
        pool_to = post_call(PROD_ENDPOINT, 'pools', TOKEN_TO, data=json.dumps(prepare_pool_for_transfer(pool)))
    except requests.exceptions.RequestException as e:
        print(f"Failed to create pool for {pool} : {e}")
        continue
    if pool_to.get('id'):
        print(f"Pool {pool} was successfully transfered")
    else:
        # The API answered, but not with a created pool
        print(f"Failed to create pool for {pool} : {pool_to}")

Made with 😮‍💨 by Sigma