# In [None]:
import re
import os
import json
import requests
from datetime import datetime
import warnings
from contextlib import closing
import traceback
import hashlib

import pandas as pd
import toloka.client as toloka
import psycopg2

#audio and video processing
from pydub import AudioSegment as am
import moviepy.editor as mp
import wave

# Silence pandas/numpy FutureWarning noise for the whole run.
warnings.simplefilter(action='ignore', category=FutureWarning)

# Name of the person running the validation; db_update() stores it in the
# validator_name column and the __main__ guard inspects it before running.
validator_name = 'NAME'

#credentials
# Connection string handed as-is to psycopg2.connect().
# NOTE(review): the values look like placeholders to be filled in per environment - confirm.
DB_data = '''
    host=host
    port=port
    sslmode=require
    dbname=dbname
    user=user
    password=password
    target_session_attrs=read-write
'''

# The same OAuth token is used by the toloka-kit client and by the raw
# `requests` calls (skill_give, message_send) through HEADERS.
OAUTH_TOKEN = 'OAUTH_TOKEN1'
HEADERS = {"Authorization": "OAuth %s" % OAUTH_TOKEN, "Content-Type": "application/JSON"}
toloka_client = toloka.TolokaClient(OAUTH_TOKEN, 'PRODUCTION')

# Run date formatted as day.month.year without zero padding; used both for the
# download folder name and for the date columns written to the database.
e = datetime.now()
date = '%s.%s.%s' % (e.day, e.month, e.year)

#create dataframe with processing sets
working_excel = pd.read_excel('working_excel.xlsx', sheet_name='Sheet1')
# Rows 0 and 1 of the 'reject_message_russian' column hold the message body
# template and the message topic respectively.
reject_message_russian = working_excel.loc[0, 'reject_message_russian']
reject_message_russian_topic = working_excel.loc[1, 'reject_message_russian']
# Rebuilt by reject_reasons_for_html_maker() for every rejected set and
# serialized into the reject_reasons column by db_update().
audio_and_reject_reason_dict_for_db = {}

#dict to get toloka-output names from column names
# Keys are worksheet column names, values are the Toloka output field names
# looked up in solution.output_values.
audios_order_dict = {"audio_1-1": "record_name_1", "video_1-2": "record_name_2",
                     "audio_2-1": "record_name_3",
                     "audio_2-2": "record_name_4", "audio_3-1": "record_name_5",
                     "audio_3-2": "record_name_6", "audio_4-1": "record_name_7",
                     "audio_4-2": "record_name_8", "audio_5-1": "record_name_9",
                     "audio_5-2": "record_name_10", "audio_6-1": "record_name_11", "audio_6-2": "record_name_12",
                     "audio_impr_7-1": "record_name_13", "audio_impr_7-2": "record_name_14",
                     "audio_impr_8-1": "record_name_15", "audio_impr_8-2": "record_name_16",
                     "audio_impr_9-1": "record_name_17", "audio_impr_9-2": "record_name_18",
                     }

#SQL queries
# All templates below are filled with the % operator inside db_update().
# INSERT templates take the column list as their first (unquoted) %s followed
# by one quoted '%s' per value; the status / toloka_status columns are
# hard-coded literals inside each template. UPDATE templates end with the
# assignment_id used in the WHERE clause, so it must be the LAST argument.

# Column list for query_download_insert (18 columns, status fixed to DOWNLOADED).
download_db_columns = '(assignment_id, worker_id, project_id,' \
                      ' toloka_submit_date, download_date, gender,' \
                      ' age, status, assignment_link, pool_id, validator_name, task_duration, category, worker_device, worker_country, worker_language, hashes, toloka_status)'

# Column list for query_send_insert (19 columns, adds send_date, status fixed to INWORK).
send_db_columns = '(assignment_id, worker_id, project_id,' \
                  ' toloka_submit_date, download_date, gender,' \
                  ' age, status, send_date, assignment_link, pool_id, validator_name, task_duration, category, worker_device, worker_country, worker_language, hashes,toloka_status)'

# Column list for query_reject_insert (18 columns, adds reject_date and reject_reasons).
reject_db_columns = '(assignment_id, worker_id, project_id,' \
                  ' toloka_submit_date, gender,' \
                  ' age, status, assignment_link, pool_id, validator_name, task_duration, category, worker_device, worker_country, worker_language, reject_date, reject_reasons, toloka_status)'

# Placeholders: columns, then 6 values before the status literal and 10 after it.
query_reject_insert = ''' INSERT INTO public.sets %s
                    VALUES ('%s','%s','%s','%s','%s','%s','REJECTED_BY_VALIDATOR','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','REJECTED') '''

# 16 placeholders; the last one is the assignment_id of the row to update.
query_reject_update = ''' UPDATE public.sets SET worker_id='%s',
                                 project_id='%s', toloka_submit_date='%s', gender='%s',
                                   age='%s', status='REJECTED_BY_VALIDATOR',
                                    assignment_link='%s', pool_id='%s', validator_name='%s', task_duration='%s', category='%s', worker_device='%s', worker_country='%s', worker_language='%s', reject_date='%s', reject_reasons='%s', toloka_status='REJECTED' WHERE assignment_id ='%s' '''

# Placeholders: columns, then 7 values before the status literal and 9 after it.
query_download_insert = ''' INSERT INTO public.sets %s
                    VALUES ('%s','%s','%s','%s','%s','%s','%s','DOWNLOADED','%s','%s','%s','%s','%s','%s','%s','%s','%s','SUBMITTED') '''

# 16 placeholders; the last one is the assignment_id of the row to update.
query_download_update = ''' UPDATE public.sets SET worker_id='%s',
                                 project_id='%s', toloka_submit_date='%s',
                                  download_date='%s', gender='%s',
                                   age='%s', status='DOWNLOADED',
                                    assignment_link='%s', pool_id='%s', validator_name='%s', task_duration='%s', category='%s', worker_device='%s', worker_country='%s', worker_language='%s', hashes='%s', toloka_status='SUBMITTED' WHERE assignment_id ='%s' '''

# Placeholders: columns, then 7 values before the status literal and 10 after it.
# NOTE(review): the f-prefix has no effect here because the literal contains no braces.
query_send_insert = f''' INSERT INTO public.sets %s
                                VALUES ('%s','%s','%s','%s',
                                '%s','%s','%s','INWORK', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', 'SUBMITTED') '''

# 16 placeholders; send_date comes right after assignment_link and the last
# placeholder is the assignment_id of the row to update.
query_send_update = ''' UPDATE public.sets SET worker_id='%s',
                         project_id='%s', toloka_submit_date='%s',
                          gender='%s',age='%s', status = 'INWORK',
                           assignment_link='%s',
                            send_date = '%s', pool_id='%s', validator_name='%s', task_duration='%s', category='%s', worker_device='%s', worker_country='%s', worker_language='%s', hashes='%s', toloka_status='SUBMITTED' WHERE assignment_id ='%s' '''

# Load every row of public.sets into a dataframe once, at start-up, so that
# duplicates (by assignment_id or worker_id) can be detected without further
# queries. The snapshot is not refreshed after db_update() writes new rows.
with closing(psycopg2.connect(DB_data)) as conn:
    with closing(conn.cursor()) as cursor:
        cursor.execute('''SELECT assignment_id, worker_id, project_id, toloka_submit_date,
                                              download_date, gender, age,
                                              status, send_date, assignment_link, pool_id, validator_name, task_duration, category, worker_device, worker_country, worker_language, hashes, toloka_status FROM public.sets''')
        # Column names are listed in the same order as in the SELECT above.
        all_sets_in_db_df = pd.DataFrame(cursor.fetchall(),
                                         columns=['assignment_id', 'worker_id', 'project_id', 'toloka_submit_date',
                                                  'download_date', 'gender', 'age',
                                                  'status', 'send_date', 'assignment_link', 'pool_id', 'validator_name', 'task_duration', 'category', 'worker_device', 'worker_country', 'worker_language', 'hashes', 'toloka_status'])

# Create (or truncate) errors.tsv so it only contains errors from this run.
# need_manual.tsv is not reset anywhere in the code shown here.
open('errors.tsv', 'w', encoding='utf-8').close()

#write to file functions
def error_writer(request: str) -> None:
    """Append ``request`` unchanged to ``errors.tsv`` in the working directory.

    The caller supplies the tab separators and the trailing newline.
    """
    errors_file = open('errors.tsv', mode='a', encoding='utf-8')
    try:
        errors_file.write(request)
    finally:
        errors_file.close()


def need_manual_writer(request: str) -> None:
    """Append ``request`` unchanged to ``need_manual.tsv`` in the working directory.

    The file collects actions (skill assignment, message sending) that failed
    and have to be repeated by hand; the caller formats the whole line.
    """
    manual_file = open('need_manual.tsv', mode='a', encoding='utf-8')
    try:
        manual_file.write(request)
    finally:
        manual_file.close()

#save set-data to database
def db_update(assignment_id: str,
              worker_id: str,
              project_id: str,
              assignment_data: toloka.Assignment,
              worker_data: toloka.User,
              assignment_link: str,
              check_working_df: pd.DataFrame,
              hashes: dict,
              scenario: str) -> None:
    """Insert or update the ``public.sets`` row for one assignment.

    Whether a row is inserted or updated is decided from the start-up snapshot
    ``all_sets_in_db_df``. Besides its arguments the function reads the module
    globals ``date``, ``validator_name`` and
    ``audio_and_reject_reason_dict_for_db`` (the latter only matters for the
    ``'reject'`` scenario).

    Args:
        assignment_id: Toloka assignment id (row key).
        worker_id: Toloka id of the worker who submitted the set.
        project_id: Toloka project the pool belongs to.
        assignment_data: assignment object; provides dates, pool id and the
            ``gender`` / ``age`` output values of the first solution.
        worker_data: worker object; provides OS family and phone country.
        assignment_link: requester-UI link stored alongside the row.
        check_working_df: unused, kept for interface compatibility.
        hashes: mapping stored as JSON in the ``hashes`` column.
        scenario: ``'download'``, ``'send'`` or ``'reject'``.

    Database errors are not raised; they are printed and appended to
    ``errors.tsv``. An unknown ``scenario`` is reported the same way.
    """
    toloka_date = assignment_data.created
    toloka_submit_date = '%s.%s.%s' % (toloka_date.day, toloka_date.month, toloka_date.year)
    pool_id = assignment_data.pool_id

    # The snapshot loaded at start-up decides between INSERT and UPDATE.
    is_new_set = all_sets_in_db_df.loc[all_sets_in_db_df['assignment_id'] == assignment_id].empty

    # total_seconds() keeps whole days; timedelta.seconds alone wraps at 24 hours.
    task_duration = str(int((assignment_data.submitted - assignment_data.created).total_seconds()))

    solution = assignment_data.solutions[0]
    gender = solution.output_values['gender']
    age = solution.output_values['age']

    # Age bucket stored in the category column; ages outside 4-12 get ''.
    int_age = int(age)
    category = ''
    if 4 <= int_age <= 6:
        category = '4-6'
    elif 7 <= int_age <= 9:
        category = '7-9'
    elif 10 <= int_age <= 12:
        category = '10-12'

    worker_device = worker_data.attributes.os_family
    worker_country = worker_data.attributes.country_by_phone
    worker_language = 'RU'

    reject_reasons = json.dumps(audio_and_reject_reason_dict_for_db, ensure_ascii=False)
    print('reject_reasons ', reject_reasons)

    hashes = json.dumps(hashes, ensure_ascii=False)
    scenario_date = date

    # Values that appear, in this order, in every template.
    shared = (pool_id, validator_name, task_duration, category,
              worker_device, worker_country, worker_language)

    # For every branch `values` follows the placeholder order of its template.
    # UPDATE templates end with WHERE assignment_id, so assignment_id goes last.
    if scenario == 'download':
        print('Update sets status to "download"')
        if is_new_set:
            template, columns = query_download_insert, download_db_columns
            values = (assignment_id, worker_id, project_id, toloka_submit_date,
                      scenario_date, gender, age, assignment_link) + shared + (hashes,)
        else:
            template, columns = query_download_update, None
            values = (worker_id, project_id, toloka_submit_date, scenario_date,
                      gender, age, assignment_link) + shared + (hashes, assignment_id)
            print('There is already set in DB, update its status')

    elif scenario == 'send':
        print('Update sets status to "send to requester"')
        if is_new_set:
            template, columns = query_send_insert, send_db_columns
            values = (assignment_id, worker_id, project_id, toloka_submit_date,
                      scenario_date, gender, age, scenario_date, assignment_link) + shared + (hashes,)
            print('There is no such set, add new')
        else:
            template, columns = query_send_update, None
            values = (worker_id, project_id, toloka_submit_date, gender, age,
                      assignment_link, scenario_date) + shared + (hashes, assignment_id)
            print('There is already set in DB, update its status')

    elif scenario == 'reject':
        print('Update sets status to "rejected"')
        if is_new_set:
            template, columns = query_reject_insert, reject_db_columns
            values = (assignment_id, worker_id, project_id, toloka_submit_date,
                      gender, age, assignment_link) + shared + (scenario_date, reject_reasons)
            print('There is no such set, add new')
        else:
            template, columns = query_reject_update, None
            values = (worker_id, project_id, toloka_submit_date, gender, age,
                      assignment_link) + shared + (scenario_date, reject_reasons, assignment_id)
            print('There is already set in DB, update its status')

    else:
        # Previously an empty query was sent to the server and failed there.
        print('Some error')
        error_writer(f'{assignment_id}\tunknown scenario: {scenario}\n')
        return

    # INSERT templates start with a bare %s for the column list; fill it first.
    if columns is not None:
        template = template.replace('%s', columns, 1)
    # The templates wrap every value placeholder in quotes ('%s'). Dropping the
    # quotes lets psycopg2 bind and escape the values itself, so quotes inside
    # worker- or worksheet-supplied text can neither break nor alter the query.
    query = template.replace("'%s'", '%s')
    # str() keeps the stored text identical to what %-formatting used to produce.
    params = tuple(str(value) for value in values)

    try:
        with closing(psycopg2.connect(DB_data)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(query, params)
                conn.commit()
                print('Update data')
    except Exception as e:
        print('Some error')
        error_writer(f'{assignment_id}\t{e}\n')

#get all Assignment data
def get_assignment_data(assignment_id: str) -> toloka.Assignment:
    """Fetch the assignment with the given id through the shared Toloka client."""
    return toloka_client.get_assignment(assignment_id=assignment_id)

#get all Pool data
def get_pool_data(pool_id: str) -> toloka.Pool:
    """Fetch the pool with the given id through the shared Toloka client."""
    return toloka_client.get_pool(pool_id=pool_id)

#get all worker data
def get_worker_data(worker_id: str) -> toloka.User:
    """Fetch the worker (user) with the given id through the shared Toloka client."""
    return toloka_client.get_user(worker_id)

#select worker language
def language_select() -> tuple:
    """Return the language-specific settings used when rejecting a set.

    Only Russian is handled, so every value is constant for the run.

    Returns:
        A 4-tuple ``(worker_language, reject_message, reject_topic,
        refusal_reassons_column)``: the language code, the message body
        template and topic read from the worksheet, and the name of the
        worksheet column holding the refusal-reason texts.
    """
    worker_language = 'RU'
    refusal_reassons_column = 'refusal_reasons_text_russian'
    return (worker_language, reject_message_russian,
            reject_message_russian_topic, refusal_reassons_column)

#skill giving
def skill_give(assignment_id: str, worker_id: str, project_id: str) -> None:
    """Assign the reject skill to a worker, with a value chosen by project.

    Projects that are not listed get no skill and the function returns without
    calling the API. When the API answer does not contain ``created`` the
    assignment is recorded in ``need_manual.tsv`` for manual follow-up.
    """
    reject_skill_id = 1
    skill_value_by_project = {
        'project_id_1': 6,
        'project_id_2': 9,
        'project_id_3': 12,
    }
    if project_id not in skill_value_by_project:
        return None

    payload = {
        "skill_id": reject_skill_id,
        "user_id": worker_id,
        "value": skill_value_by_project[project_id],
    }
    response_json = requests.put('https://toloka.dev/api/v1/user-skills',
                                 headers=HEADERS, json=payload).json()

    if 'created' not in response_json:
        print('Skill not added: ', response_json)
        need_manual_writer(f"{assignment_id}\tskill add\n")
        return None
    print('Skill added')

#main function to send message
def message_send(assignment_data: toloka.Assignment,
                 reject_message: str,
                 reject_topic: str,
                 worker_id: str,
                 refusal_reassons_column: str,
                 check_working_df: pd.DataFrame) -> None:
    """Send the worker a message listing why each recording was rejected.

    Every worksheet column of ``check_working_df`` that has no missing value is
    read as a space-separated list of refusal-reason numbers for the matching
    Toloka output field. The rendered HTML list replaces ``{reject_reasons}``
    in ``reject_message`` and the result is posted to the Toloka
    message-threads endpoint. If the answer lacks ``created`` the assignment is
    written to ``need_manual.tsv``.
    """
    # Toloka output name -> list of reason numbers (as strings).
    reasons_by_record = {
        audios_order_dict[column]: str(check_working_df.reset_index()[column][0]).split(' ')
        for column in check_working_df.dropna(axis=1)
    }
    print(reasons_by_record)

    #create html message
    reasons_html = reject_reasons_for_html_maker(reasons_by_record, refusal_reassons_column, assignment_data)
    payload = {
        "topic": {"EN": reject_topic},
        "text": {"EN": reject_message.replace('{reject_reasons}', reasons_html)},
        "recipients_select_type": "DIRECT",
        "recipients_ids": [worker_id],
        "answerable": True
    }
    response_json = requests.post('https://toloka.dev/api/v1/message-threads/compose',
                                  headers=HEADERS, json=payload).json()

    if 'created' not in response_json:
        print('Message not sended: ', response_json)
        need_manual_writer(f"{assignment_data.id}\tmessage send\n")
        return
    print('Message sended')

#create html message
def reject_reasons_for_html_maker(audio_and_reject_reason_dict: dict,
                                  refusal_reassons_column: str,
                                  assignment_data: toloka.Assignment) -> str:
    """Render the refusal reasons of one set as HTML ``<li>`` items.

    As a side effect the module-level ``audio_and_reject_reason_dict_for_db``
    is rebuilt as ``{record name: [reason text, ...]}`` for later storage in
    the database.

    Args:
        audio_and_reject_reason_dict: record name -> iterable of reason numbers.
        refusal_reassons_column: worksheet column holding the reason texts.
        assignment_data: assignment whose first task supplies the phrase that
            had to be recorded.

    Returns:
        The concatenated ``<li>`` items, one per (record, reason) pair.

    Raises:
        IndexError: a reason number is not present in the worksheet.
    """
    global audio_and_reject_reason_dict_for_db
    audio_and_reject_reason_dict_for_db = {}

    # Reason numbers normalised through float ("3" and "3.0" compare equal);
    # computed once instead of once per reason.
    reason_numbers = working_excel['refusal_reasons_number'].apply(float).apply(str)

    html_items = []
    for key, value in audio_and_reject_reason_dict.items():
        # Reset for every record: the improvisation branch below does not cover
        # all record names, and a phrase left over from the previous record (or
        # an unbound name on the first one) must never leak into the message.
        phrase = ''
        if 'record_improv' in key:
            if 'record_name_13' in key:
                phrase = 'попросить включить любую сказку'
            elif 'record_name_14' in key:
                phrase = 'попросить включить любой мультфильм'
            elif 'record_name_16' in key:
                phrase = 'задать любой вопрос'
        else:
            if key == 'video' or key == 'record_name_3':
                toloka_input_name = 'record_name_3'
            else:
                toloka_input_name = '_'.join(key.split('_')[1:-1])
            phrase = assignment_data.tasks[0].input_values[toloka_input_name]

        reject_reason_for_db = []
        for reject_reason in value:
            matching_texts = working_excel.loc[reason_numbers == str(float(reject_reason)), refusal_reassons_column].values
            if len(matching_texts) == 0:
                raise IndexError(f'unknown refusal reason number {reject_reason!r} for {key}')
            reject_reason_text = matching_texts[0]
            reject_reason_for_html = '''
            <li style="margin-top: 0cm; margin-right: 0cm; margin-bottom: 8pt; line-height: normal;
             font-size: 15px; font-family: Calibri, sans-serif; background: white;">
             <strong>
             <span style='font-size:16px;font-family:"Arial",sans-serif;color:#141824;'>''' \
                                     + reject_reason_text + f' (фраза: {phrase})' '''</span></strong></li>'''
            reject_reason_for_db.append(reject_reason_text)
            html_items.append(reject_reason_for_html)
        audio_and_reject_reason_dict_for_db[key] = reject_reason_for_db
    return ''.join(html_items)

#reject_set
def reject_set(assignment_id: str) -> None:
    """Reject the assignment in Toloka with a fixed public comment and log it."""
    toloka_client.reject_assignment(
        public_comment='Есть ошибки',
        assignment_id=assignment_id,
    )
    print('Reject set ', assignment_id)

#function to get audio file from video
def audio_from_video_getter(attachment_id: str, filepath: str) -> None:
    """Download a video attachment and extract its audio track to ``filepath``.

    The video is saved next to ``filepath`` with the extension replaced by
    ``.mp4`` and is left on disk.

    Args:
        attachment_id: Toloka attachment id of the video.
        filepath: destination of the extracted audio (a ``.wav`` path).

    Raises:
        ValueError: the downloaded video has no audio track.
    """
    # Swap only the extension; str.replace('wav', 'mp4') would also rewrite any
    # 'wav' that happens to occur in a directory or file name.
    filepath_video = os.path.splitext(filepath)[0] + '.mp4'
    with open(filepath_video, 'wb') as out_f:
        toloka_client.download_attachment(attachment_id=attachment_id, out=out_f)

    clip = mp.VideoFileClip(filepath_video)
    try:
        if clip.audio is None:
            raise ValueError(f'{filepath_video} has no audio track')
        clip.audio.write_audiofile(filepath)
    finally:
        # Release the reader even when the extraction fails.
        clip.close()

#download all audio files
def _ensure_16k_mono_wav(filepath: str) -> None:
    """Convert the WAV file at ``filepath`` in place to 16 kHz mono if needed."""
    while True:
        # Only the header is needed; the reader is closed before the file may
        # be overwritten below.
        with wave.open(filepath, "rb") as wave_file:
            frame_rate = wave_file.getframerate()
            channels = wave_file.getnchannels()

        if frame_rate == 16000 and channels == 1:
            print(filepath, ' file is OK')
            return

        print(filepath, 'not 16k')
        # Decode the audio only when a conversion is actually required.
        sound = am.from_file(filepath, format='wav')
        sound = sound.set_frame_rate(16000)
        sound = sound.set_channels(1)
        # export() hands back the output file handle; close it so the data is
        # flushed before the header is checked again.
        sound.export(filepath, format='wav').close()
        print(filepath, ' correct file')


def download_audios(assignment_data: toloka.Assignment,
                    check_working_df:pd.DataFrame) -> None:
    """Download the accepted recordings of one assignment to disk.

    Files go to ``MTS audio sets/<date>/<assignment_id>/``. The set's video is
    always saved as ``<assignment_id>_1.mp4``. For every worksheet column
    marked ``'+'`` the matching attachment is saved as
    ``<assignment_id>_<n>.wav`` (``n`` is the phrase number taken from the
    column name), converted to 16 kHz mono, and - except for improvisation
    columns - accompanied by a ``.txt`` file with the phrase text. A small
    ``<assignment_id>.csv`` with gender and age is written last.

    Nothing is downloaded when the target directory already exists. Any error
    is printed and appended to ``errors.tsv`` instead of being raised.
    """
    assignment_id = assignment_data.id
    cur_dir = os.path.join('MTS audio sets', date, assignment_id)
    if os.path.exists(cur_dir):
        print('Such dir already exists')
        return

    try:
        os.makedirs(cur_dir)
        solution = assignment_data.solutions[0]
        with open(os.path.join(cur_dir, f'{assignment_id}_{1}.mp4'), 'wb') as out_f:
            toloka_client.download_attachment(attachment_id=solution.output_values['video'], out=out_f)

        # Row 0 of the re-indexed frame carries the marks for this set.
        marks = check_working_df.reset_index()
        for column_audio_name in check_working_df:
            print(column_audio_name)
            if marks.loc[0, column_audio_name] != '+':
                continue

            # 'audio_3-1' -> '3', 'audio_impr_7-2' -> '7'
            is_improvisation = 'impr' in column_audio_name
            number_part = column_audio_name.split('_')[2 if is_improvisation else 1]
            filename = number_part.split('-')[0]
            print('filename: ', filename)

            toloka_output_name = audios_order_dict[column_audio_name]
            if toloka_output_name == 'video' or toloka_output_name == 'record_name_3':
                toloka_input_name = 'record_name_3'
            else:
                toloka_input_name = '_'.join(toloka_output_name.split('_')[1:-1])
            print('toloka_output_name: ', toloka_output_name)

            filepath = os.path.join(cur_dir, f'{assignment_id}_{filename}.wav')
            if toloka_output_name == 'video':
                audio_from_video_getter(solution.output_values[toloka_output_name], filepath)
            else:
                with open(filepath, 'wb') as out_f:
                    toloka_client.download_attachment(attachment_id=solution.output_values[toloka_output_name], out=out_f)

            # downsampling to 16 kHz audio
            _ensure_16k_mono_wav(filepath)

            if not is_improvisation:
                with open(os.path.join(cur_dir, f'{assignment_id}_{filename}.txt'), 'w', encoding='utf-8') as out_txt_f:
                    out_txt_f.write(assignment_data.tasks[0].input_values[toloka_input_name])

        with open(os.path.join(cur_dir, f'{assignment_id}.csv'), 'w') as csv_file:
            csv_file.write("gender;age"+"\n")
            csv_file.write(f"{solution.output_values['gender']};{solution.output_values['age']}"+"\n")

    except Exception as e:
        print('Error during audio download')
        error_writer(f"{assignment_id}\t{e}\n")

#duplicate check before downloading
def predownload_set_dublicat_checking_and_download(assignment_data: toloka.Assignment,
                                                   worker_id: str) -> bool:
    """Decide whether the set should be downloaded.

    Returns ``True`` straight away when neither the assignment id nor the
    worker id occurs in the start-up database snapshot. Otherwise the matching
    rows are printed and the operator is asked; only the answer ``'1'`` means
    "download".
    """
    assignment_id = assignment_data.id
    known_assignment = assignment_id in all_sets_in_db_df['assignment_id'].unique()
    known_worker = worker_id in all_sets_in_db_df['worker_id'].unique()
    if not (known_assignment or known_worker):
        return True

    same_set_by_id = all_sets_in_db_df.loc[(all_sets_in_db_df['assignment_id'] == assignment_id)]
    if not same_set_by_id.empty:
        print('There is already set with this id: ')
        print(same_set_by_id.to_markdown())

    same_set_by_worker = all_sets_in_db_df.loc[(all_sets_in_db_df['worker_id'] == worker_id)]
    if not same_set_by_worker.empty:
        print('There is already set with this worker: ')
        print(same_set_by_worker.to_markdown())

    return input('Download? \n 1.Yes \n 2.No') == '1'

def get_hashes(assignment_data: toloka.Assignment) -> dict:
    """Return ``{file name: MD5 hex digest}`` for every downloaded file of a set.

    The directory walked is the one download_audios() writes to. An empty dict
    is returned when that directory does not exist (for example when the
    download was skipped or failed).
    """
    assignment_id = assignment_data.id
    # Has to be the same path download_audios() creates; walking a different
    # folder silently yields no files and therefore no hashes.
    set_dir = os.path.join('MTS audio sets', date, assignment_id)
    hashes = {}
    for root, _, files in os.walk(set_dir):
        for file in files:
            with open(os.path.join(root, file), 'rb') as downloaded_file:
                hashes[file] = hashlib.md5(downloaded_file.read()).hexdigest()
    return hashes


#assignment_id from assignment_link maker
def get_assignment_id_from_link(assignment_link: str) -> str:
    """Extract the assignment id from a requester-UI assignment link.

    The id is the text between ``assignments/`` and the query string (if any).
    Raises ``IndexError`` when the link contains no ``assignments/`` segment.
    """
    after_marker = assignment_link.split('assignments/')[1]
    assignment_id, _, _ = after_marker.partition('?')
    return assignment_id


def main():
    """Process every assignment listed in the worksheet.

    A cell may hold a bare assignment id or a requester-UI link; cells that
    contain ``#`` are treated as commented out and skipped. For each remaining
    set the marks in columns ``audio_1-1`` .. ``audio_impr_9-2`` decide the
    action:

    * any ``'+'`` - download the marked recordings (after a duplicate check);
    * any ``'send'`` - store the set with the "send" scenario, otherwise with
      the "download" scenario;
    * neither - reject the set if it is still SUBMITTED; unless a ``404`` mark
      is present, also message the worker, assign the skill and record the
      rejection in the database.

    A missing assignment is reported on stdout; any other error is appended
    with its traceback to ``errors.tsv`` and processing continues.
    """
    for assignment_cell_in_excel in working_excel['assignment_id'].dropna():
        if 'http' in assignment_cell_in_excel:
            assignment_id = get_assignment_id_from_link(assignment_cell_in_excel)
        else:
            assignment_id = assignment_cell_in_excel

        if '#' in assignment_cell_in_excel:
            print('Commented set, skipped ', assignment_id)
            continue

        print('Обрабатываем сет: ', assignment_id)
        try:
            assignment_data = get_assignment_data(assignment_id)
            project_id = get_pool_data(pool_id=assignment_data.pool_id).project_id
            worker_id = assignment_data.user_id
            worker_data = get_worker_data(worker_id)

            worker_language, reject_message, reject_topic, refusal_reassons_column = language_select()

            rows_for_set = working_excel.loc[working_excel['assignment_id'] == assignment_cell_in_excel]
            check_working_df = rows_for_set.loc[:, 'audio_1-1':'audio_impr_9-2']

            assignment_link = f'https://platform.toloka.ai/requester/project/{project_id}/pool/{assignment_data.pool_id}/assignments/{assignment_id}?direction=ASC'
            print(assignment_link)

            marked_for_download = '+' in check_working_df.values
            marked_for_send = 'send' in check_working_df.values

            if marked_for_download or marked_for_send:
                hashes = ''
                if marked_for_download:
                    print('Start sets downloading')
                    if predownload_set_dublicat_checking_and_download(assignment_data, worker_id):
                        download_audios(assignment_data, check_working_df)
                        hashes = get_hashes(assignment_data)
                    else:
                        print('Download canceled')

                # 'send' wins over '+' when both marks are present.
                scenario = 'send' if marked_for_send else 'download'
                db_update(assignment_id, worker_id, project_id, assignment_data, worker_data, assignment_link, check_working_df, hashes, scenario)

            elif str(assignment_data.status) != 'Status.SUBMITTED':
                print('Set has another status: ', assignment_data.status, ', skip')

            else:
                print('Start sets rejection')
                reject_set(assignment_id)
                if 404 in check_working_df.values:
                    print('No message')
                    print('No skill')
                else:
                    message_send(assignment_data, reject_message, reject_topic, worker_id, refusal_reassons_column, check_working_df)
                    skill_give(assignment_id, worker_id, str(project_id))
                    db_update(assignment_id, worker_id, project_id, assignment_data, worker_data, assignment_link, check_working_df, {}, 'reject')

            print('-' * 50)

        except toloka.exceptions.DoesNotExistApiError:
            print(assignment_id, ' - no such set, may be it is on another account')
        except Exception as e:
            error_writer(f"{assignment_id}\t{traceback.format_exc()}\n")
            print(assignment_id, 'error - wtite to file')


if __name__ == '__main__':
    # Refuse to run while validator_name is still a placeholder, because the
    # name is written to every database row. The file's own placeholder is
    # 'NAME'; a value containing a comma is rejected as before.
    if validator_name == 'NAME' or ',' in validator_name:
        print('Default validator name!')
    else:
        main()