# In [None]:
import json
import os
import re
import traceback
import warnings
import wave
from contextlib import closing
from datetime import datetime
from typing import Tuple

import pandas as pd
import psycopg2
import requests
import toloka.client as toloka

# audio and video processing
import moviepy.editor as mp
from pydub import AudioSegment as am

warnings.simplefilter(action='ignore', category=FutureWarning)

# Credentials.
# libpq connection string passed to psycopg2.connect(); replace the YOUR_*
# placeholders with real values. The user name key is `user`.
DB_data = '''
    host = YOUR_HOST
    port = YOUR_PORT
    sslmode=require
    dbname = YOUR_DBNAME
    user = YOUR_USER
    password = YOUR_PASSWORD
    target_session_attrs=read-write
'''

# Toloka OAuth token. It is read from the environment so the secret does not
# have to be pasted into the notebook; the placeholder is the fallback.
OAUTH_TOKEN = os.environ.get('TOLOKA_OAUTH_TOKEN', 'YOUR_TOKEN')
# Alias kept for code that still uses the original lower-case spelling.
OAUTH_token = OAUTH_TOKEN
HEADERS = {"Authorization": "OAuth %s" % OAUTH_TOKEN, "Content-Type": "application/JSON"}
toloka_client = toloka.TolokaClient(OAUTH_TOKEN, 'PRODUCTION')

# Today's date as D.M.YYYY (no zero padding); used for the download folder
# name and for the download/send dates written to the database.
e = datetime.now()
date = '%s.%s.%s' % (e.day, e.month, e.year)

account = 'trainingdata.pro'

# Processing sets: one row per assignment in work1.xlsx / Sheet1.
# Rows 0 and 1 of 'reject_message_russian' hold the reject message body
# (with a '{reject_reasons}' placeholder) and the message topic.
working_excel = pd.read_excel('work1.xlsx', sheet_name='Sheet1')
reject_message_russian = working_excel.loc[0, 'reject_message_russian']
reject_message_russian_topic = working_excel.loc[1, 'reject_message_russian']

# Maps each Excel review column to the Toloka output field that holds the
# corresponding recording. Columns "-1"/"-2" are the two takes of one phrase;
# "impr" columns are the improvisation recordings.
audios_order_dict = {
    # child request and the video
    "audio_1-1": "requests_VA_child_1",
    "video_1-2": "video",
    # scripted phrases, two takes each
    "audio_2-1": "record_marvin_2020_1",
    "audio_2-2": "record_marvin_2020_2",
    "audio_3-1": "record_antimarvin_1",
    "audio_3-2": "record_antimarvin_2",
    "audio_4-1": "record_weather_1",
    "audio_4-2": "record_weather_2",
    "audio_5-1": "record_time_1",
    "audio_5-2": "record_time_2",
    "audio_6-1": "record_manage_1",
    "audio_6-2": "record_manage_2",
    # improvisations, two takes each
    "audio_impr_7-1": "record_improv_1_1",
    "audio_impr_7-2": "record_improv_1_2",
    "audio_impr_8-1": "record_improv_2_1",
    "audio_impr_8-2": "record_improv_2_2",
    "audio_impr_9-1": "record_improv_3_1",
    "audio_impr_9-2": "record_improv_3_2",
}

# SQL query templates for public.sets.
# NOTE(review): these templates are filled with Python %-formatting, so values
# are spliced into the SQL text unescaped: a quote in any value breaks the
# statement. Prefer psycopg2 parameter binding (cursor.execute(sql, params)).

# Column lists inserted as the first %s of the INSERT templates.
download_db_columns = ('(assignment_id, worker_id, project_id,'
                       ' toloka_submit_date, download_date, gender,'
                       ' age, status, assignment_link)')

send_db_columns = ('(assignment_id, worker_id, project_id,'
                   ' toloka_submit_date, download_date, gender,'
                   ' age, status, send_date, assignment_link)')

# 9 slots: columns, assignment_id, worker_id, project_id, toloka_submit_date,
# download_date, gender, age, assignment_link (status fixed to DOWNLOADED).
query_download_insert = ''' INSERT INTO public.sets %s
                    VALUES ('%s','%s','%s','%s','%s','%s','%s','DOWNLOADED','%s') '''

# 8 slots: worker_id, project_id, toloka_submit_date, download_date, gender,
# age, assignment_link, assignment_id.
query_download_update = ''' UPDATE public.sets SET worker_id='%s',
                                 project_id='%s', toloka_submit_date='%s',
                                  download_date='%s', gender='%s',
                                   age='%s', status='DOWNLOADED',
                                    assignment_link='%s' WHERE assignment_id ='%s' '''

# 10 slots: columns, assignment_id, worker_id, project_id, toloka_submit_date,
# download_date, gender, age, send_date, assignment_link (status INWORK).
# Plain string: the former f-prefix had no placeholders and did nothing.
query_send_insert = ''' INSERT INTO public.sets %s
                                VALUES ('%s','%s','%s','%s',
                                '%s','%s','%s','INWORK', '%s', '%s') '''

# 8 slots: worker_id, project_id, toloka_submit_date, gender, age,
# assignment_link, send_date, assignment_id.
query_send_update = ''' UPDATE public.sets SET worker_id='%s',
                         project_id='%s', toloka_submit_date='%s',
                          gender='%s',age='%s', status = 'INWORK',
                           assignment_link='%s',
                            send_date = '%s' WHERE assignment_id ='%s' '''

# Snapshot of every set already stored in public.sets, taken once at start-up;
# the functions below consult it to detect duplicate sets and workers.
_SETS_COLUMNS = ['assignment_id', 'worker_id', 'project_id', 'toloka_submit_date',
                 'download_date', 'gender', 'age',
                 'status', 'send_date', 'assignment_link']

with closing(psycopg2.connect(DB_data)) as conn, closing(conn.cursor()) as cursor:
    cursor.execute('''SELECT assignment_id, worker_id, project_id, toloka_submit_date,
                                              download_date, gender, age,
                                              status, send_date, assignment_link FROM public.sets''')
    _sets_rows = cursor.fetchall()

all_sets_in_db_df = pd.DataFrame(_sets_rows, columns=_SETS_COLUMNS)

# Start each run with an empty errors.tsv; error_writer appends to it.
with open('errors.tsv', 'w', encoding='utf-8'):
    pass

#write to file functions
def error_writer(request: str) -> None:
    """Append one record to errors.tsv (UTF-8).

    A trailing newline is added when the record lacks one. Some callers
    include it and some do not; without this, records would run together
    on one line.
    """
    if not request.endswith('\n'):
        request += '\n'
    with open('errors.tsv', 'a', encoding='utf-8') as file:
        file.write(request)


def need_manual_writer(request: str) -> None:
    """Append *request* verbatim to need_manual.tsv (UTF-8)."""
    target_path = 'need_manual.tsv'
    with open(target_path, mode='a', encoding='utf-8') as handle:
        handle.write(request)

#save set-data to database
def db_update(assignment_id: str,
              worker_id: str,
              project_id: str,
              assignment_data: toloka.Assignment,
              worker_data: dict,
              assignment_link: str,
              check_working_df: pd.DataFrame) -> None:
    """Insert or update the public.sets row for one assignment.

    If 'send' appears anywhere in the set's Excel row, the row is stored with
    status INWORK and today's send date. Otherwise, if '+' appears, it is
    stored with status DOWNLOADED and today's download date. INSERT vs UPDATE
    is decided from the all_sets_in_db_df snapshot taken at start-up.

    Values are passed to psycopg2 as bound parameters instead of being spliced
    into the SQL text. gender/age come from the worker's submitted solution, so
    a quote (or crafted text) in them can no longer break or alter the query.

    Failures are printed and logged to errors.tsv; nothing is raised.
    worker_data is accepted for interface compatibility but is not used.
    """
    toloka_date = assignment_data.created
    toloka_submit_date = '%s.%s.%s' % (toloka_date.day, toloka_date.month, toloka_date.year)

    dublicate_set = all_sets_in_db_df.loc[all_sets_in_db_df['assignment_id'] == assignment_id]
    is_new_set = dublicate_set.empty

    solution = assignment_data.solutions[0]
    gender = solution.output_values['gender']
    age = solution.output_values['age']

    scenario_date = date
    has_send = 'send' in check_working_df.values
    has_plus = '+' in check_working_df.values

    if has_send:
        # update existing set to 'SENT' (INWORK) status
        print('Обновляем для сета статус на "отправлен заказчику"')
        if is_new_set:
            sql = ('INSERT INTO public.sets ' + send_db_columns +
                   " VALUES (%s, %s, %s, %s, %s, %s, %s, 'INWORK', %s, %s)")
            params = (assignment_id, worker_id, project_id, toloka_submit_date,
                      scenario_date, gender, age, scenario_date, assignment_link)
            print('В базе такого сета нет - внесли его с нуля')
        else:
            sql = ("UPDATE public.sets SET worker_id = %s, project_id = %s, "
                   "toloka_submit_date = %s, gender = %s, age = %s, status = 'INWORK', "
                   "assignment_link = %s, send_date = %s WHERE assignment_id = %s")
            params = (worker_id, project_id, toloka_submit_date, gender, age,
                      assignment_link, scenario_date, assignment_id)
            print('В базе уже есть такой сет, обновляем его данные')
    elif has_plus:
        # save a newly downloaded set
        print('Обновляем для сета статус на "скачан"')
        if is_new_set:
            sql = ('INSERT INTO public.sets ' + download_db_columns +
                   " VALUES (%s, %s, %s, %s, %s, %s, %s, 'DOWNLOADED', %s)")
            params = (assignment_id, worker_id, project_id, toloka_submit_date,
                      scenario_date, gender, age, assignment_link)
        else:
            sql = ("UPDATE public.sets SET worker_id = %s, project_id = %s, "
                   "toloka_submit_date = %s, download_date = %s, gender = %s, age = %s, "
                   "status = 'DOWNLOADED', assignment_link = %s WHERE assignment_id = %s")
            params = (worker_id, project_id, toloka_submit_date, scenario_date,
                      gender, age, assignment_link, assignment_id)
            print('В базе уже есть такой сет, обновляем его данные')
    else:
        # Neither marker present: there is nothing to store.
        print('Не удалось сохранить в базу данных')
        error_writer(f'{assignment_id}\tno "+" or "send" marker in row\n')
        return

    # The old templates quoted every value as a string literal; keep sending
    # strings so PostgreSQL coerces them to the column types exactly as before.
    params = tuple(str(value) for value in params)

    try:
        with closing(psycopg2.connect(DB_data)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(sql, params)
                conn.commit()
                print('Обновили данные в базе')
    except Exception as err:
        print('Не удалось сохранить в базу данных')
        error_writer(f'{assignment_id}\t{err}\n')

#get all Assignment data
def get_assignment_data(assignment_id: str) -> toloka.Assignment:
    """Fetch the Toloka assignment object identified by *assignment_id*."""
    return toloka_client.get_assignment(assignment_id=assignment_id)

#get all Pool data
def get_pool_data(pool_id: str) -> toloka.Pool:
    """Fetch the Toloka pool object identified by *pool_id*."""
    return toloka_client.get_pool(pool_id=pool_id)

#get all worker data
def get_worker_data(worker_id: str, timeout: float = 30) -> dict:
    """Return the parsed JSON body of Toloka's requester worker endpoint.

    Args:
        worker_id: Toloka worker (user) id.
        timeout: seconds to wait for the HTTP response; without a timeout a
            stalled connection would block the whole batch indefinitely.

    Returns:
        The decoded JSON response (presumably a dict; an API error body is
        returned as-is, not raised).
    """
    url = 'https://toloka.dev/api/new/requester/workers/' + worker_id
    response = requests.get(url=url, headers=HEADERS, timeout=timeout)
    return response.json()

#select worker language
def language_select() -> Tuple[str, str, str, str]:
    """Return the reject-message settings for the worker's language.

    Only Russian is configured, so every worker gets the same settings.

    Returns:
        A 4-tuple (worker_language, reject_message, reject_topic,
        refusal_reassons_column):
        - the language code 'RU';
        - the reject message template read from work1.xlsx;
        - the reject message topic read from work1.xlsx;
        - the Excel column that holds the reject-reason texts.
    """
    worker_language = 'RU'
    reject_message = reject_message_russian
    reject_topic = reject_message_russian_topic
    refusal_reassons_column = 'refusal_reasons_text_russian'
    return worker_language, reject_message, reject_topic, refusal_reassons_column

#skill giving
def skill_give(assignment_id: str, worker_id: str, project_id: str, timeout: float = 30) -> None:
    """Assign the reject skill (id 61364) to a worker whose set was rejected.

    The skill value depends on the project: 6 for 133568, 9 for 134002 and
    12 for 134003. Any other project id (compared as a string) gets no skill.

    If Toloka does not confirm the skill (no 'created' field, a non-JSON reply,
    or a network error), the assignment is recorded in need_manual.tsv. This
    runs after the set is already rejected, so the follow-up is not lost.

    Args:
        assignment_id: used only for the need_manual.tsv record.
        worker_id: Toloka worker who receives the skill.
        project_id: Toloka project id as a string.
        timeout: seconds to wait for the HTTP response.
    """
    skill_id_reject = 61364
    skill_value_by_project = {'133568': 6, '134002': 9, '134003': 12}

    skill_value = skill_value_by_project.get(project_id)
    if skill_value is None:
        return None

    skill_body = {
        "skill_id": skill_id_reject,
        "user_id": worker_id,
        "value": skill_value,
    }
    url = 'https://toloka.dev/api/v1/user-skills'
    try:
        add_skill = requests.put(url, headers=HEADERS, json=skill_body, timeout=timeout).json()
    except (requests.RequestException, ValueError) as err:
        # ValueError covers a response body that is not valid JSON.
        add_skill = {'error': repr(err)}

    if 'created' in add_skill:
        print('Навык выдан')
    else:
        print('Навык не выдан: ', add_skill)
        need_manual_writer(f"{assignment_id}\tвыдать навык\n")

#main function to send message
def message_send(assignment_data: toloka.Assignment,
                 reject_message: str,
                 reject_topic: str,
                 worker_id: str,
                 refusal_reassons_column: str,
                 check_working_df: pd.DataFrame,
                 timeout: float = 30) -> None:
    """Send the worker a direct Toloka message explaining why the set was rejected.

    Steps:
    - Read every audio column of the set's Excel row that has a value as
      whitespace-separated reject-reason numbers.
    - Key those reasons by the Toloka output name.
    - Render the reasons with reject_reasons_for_html_maker and substitute
      the result for '{reject_reasons}' in reject_message.
    - Post the message.

    If Toloka does not confirm it (no 'created' field, a non-JSON reply, or a
    network error), the assignment is recorded in need_manual.tsv.

    Args:
        timeout: seconds to wait for the HTTP response.
    """
    assignment_id = assignment_data.id

    # dropna(axis=1) keeps only columns that have a value in every matching
    # row; the reasons are read from the first such row. split() (not
    # split(' ')) so repeated or trailing spaces do not yield empty tokens.
    first_row = check_working_df.iloc[0]
    audio_and_reject_reason_dict = {
        audios_order_dict[audio_column_name]: str(first_row[audio_column_name]).split()
        for audio_column_name in check_working_df.dropna(axis=1)
    }
    print(audio_and_reject_reason_dict)

    reject_reasons_for_html = reject_reasons_for_html_maker(audio_and_reject_reason_dict,
                                                            refusal_reassons_column,
                                                            assignment_data)
    reject_message = reject_message.replace('{reject_reasons}', reject_reasons_for_html)
    # NOTE(review): the text is Russian but is sent under the "EN" key;
    # confirm this is what the Toloka account expects.
    message_body = {
        "topic": {
            "EN": reject_topic,
        },
        "text": {
            "EN": reject_message,
        },
        "recipients_select_type": "DIRECT",
        "recipients_ids": [worker_id],
        "answerable": True
    }
    try:
        send_msg = requests.post('https://toloka.dev/api/v1/message-threads/compose', headers=HEADERS,
                                 json=message_body, timeout=timeout).json()
    except (requests.RequestException, ValueError) as err:
        # ValueError covers a response body that is not valid JSON.
        send_msg = {'error': repr(err)}

    if 'created' in send_msg:
        print('Отправили сообщение')
    else:
        print('Сообщение не отправлено: ', send_msg)
        need_manual_writer(f"{assignment_id}\tотправить сообщение\n")

#create html message
def reject_reasons_for_html_maker(audio_and_reject_reason_dict: dict,
                                  refusal_reassons_column: str,
                                  assignment_data: toloka.Assignment) -> str:
    """Build the HTML <li> items that list every reject reason in the message.

    Args:
        audio_and_reject_reason_dict: Toloka output name -> list of reject-reason
            numbers (as strings) for that recording.
        refusal_reassons_column: column of working_excel holding the reason texts.
        assignment_data: the assignment; the first task's input values supply
            the phrase the worker was asked to record.

    Returns:
        The concatenated <li> elements, one per (recording, reason) pair.

    Raises:
        ValueError: if a reason number is missing from 'refusal_reasons_number'.
    """
    # Improvisation recordings have no scripted phrase; describe the task instead.
    improv_phrases = (
        ('record_improv_1', 'попросить включить любую сказку'),
        ('record_improv_2', 'попросить включить любой мультфильм'),
        ('record_improv_3', 'задать любой вопрос'),
    )
    # Compare numerically: depending on the sheet the column may load as int,
    # float or text, and '3', '3.0' and 3 must all match the same reason.
    reason_numbers = pd.to_numeric(working_excel['refusal_reasons_number'], errors='coerce')

    html_items = []
    for key, reject_reasons in audio_and_reject_reason_dict.items():
        if 'record_improv' in key:
            phrase = next((text for marker, text in improv_phrases if marker in key), key)
        else:
            if key == 'video' or key == 'requests_VA_child_1':
                toloka_input_name = 'requests_VA_child'
            else:
                # 'record_<name>_<take>' -> '<name>'
                toloka_input_name = '_'.join(key.split('_')[1:-1])
            phrase = assignment_data.tasks[0].input_values[toloka_input_name]

        for reject_reason in reject_reasons:
            reject_reason = reject_reason.strip()
            if not reject_reason:
                continue
            matches = working_excel.loc[reason_numbers == float(reject_reason), refusal_reassons_column]
            if matches.empty:
                raise ValueError(f'Unknown reject reason {reject_reason!r} for {key}')
            reject_reason_text = matches.values[0]
            reject_reason_for_html = '''
            <li style="margin-top: 0cm; margin-right: 0cm; margin-bottom: 8pt; line-height: normal;
             font-size: 15px; font-family: Calibri, sans-serif; background: white;">
             <strong>
             <span style='font-size:16px;font-family:"Arial",sans-serif;color:#141824;'>''' \
                                     + reject_reason_text + f' (фраза: {phrase})' '''</span></strong></li>'''
            html_items.append(reject_reason_for_html)
    return ''.join(html_items)

#reject_set
def reject_set(assignment_id: str) -> None:
    """Reject the assignment in Toloka with the generic "has errors" public comment."""
    public_comment = 'Есть ошибки'
    toloka_client.reject_assignment(assignment_id=assignment_id, public_comment=public_comment)
    print('Отклонили сет ', assignment_id)

#function to get audio file from video
def audio_from_video_getter(attachment_id: str, filepath: str) -> None:
    """Download a video attachment and extract its audio track to *filepath*.

    The video is saved next to *filepath* with its extension replaced by
    '.mp4'. Only the extension is replaced: the old str.replace('wav', 'mp4')
    also rewrote any 'wav' found in directory or file names.

    Raises:
        ValueError: if the downloaded video has no audio track.
    """
    filepath_video = os.path.splitext(filepath)[0] + '.mp4'
    with open(filepath_video, 'wb') as out_f:
        toloka_client.download_attachment(attachment_id=attachment_id, out=out_f)
    clip = mp.VideoFileClip(filepath_video)
    try:
        if clip.audio is None:
            raise ValueError(f'{filepath_video} has no audio track')
        clip.audio.write_audiofile(filepath)
    finally:
        # Release the reader even if extraction fails.
        clip.close()

#download all audio files
def _toloka_input_name(toloka_output_name: str) -> str:
    """Return the task input field holding the phrase for *toloka_output_name*."""
    if toloka_output_name in ('video', 'requests_VA_child_1'):
        return 'requests_VA_child'
    # 'record_<name>_<take>' -> '<name>'
    return '_'.join(toloka_output_name.split('_')[1:-1])


def _ensure_16k_mono_wav(filepath: str, max_conversions: int = 3) -> None:
    """Make *filepath* a 16 kHz mono WAV, re-encoding it in place when needed.

    Raises:
        RuntimeError: if the file is still not 16 kHz mono after
            *max_conversions* re-encodes. The previous loop would spin forever.
    """
    conversions = 0
    while True:
        # Read the header and close the file before it is possibly rewritten.
        with wave.open(filepath, "rb") as wave_file:
            frame_rate = wave_file.getframerate()
            channels = wave_file.getnchannels()
        if frame_rate == 16000 and channels == 1:
            print(filepath, ' file is OK')
            return
        if conversions >= max_conversions:
            raise RuntimeError(f'{filepath}: still {frame_rate} Hz / {channels} ch '
                               f'after {conversions} conversions')
        print(filepath, 'not 16k')
        sound = am.from_file(filepath, format='wav').set_frame_rate(16000).set_channels(1)
        exported = sound.export(filepath, format='wav')
        # Per pydub's docs, export() returns the output file object; close it
        # so the data is flushed and the handle is not leaked.
        if hasattr(exported, 'close'):
            exported.close()
        print(filepath, ' correct file')
        conversions += 1


def download_audios(assignment_data: toloka.Assignment,
                    check_working_df: pd.DataFrame) -> None:
    """Download the accepted recordings of one set.

    Files go to 'MTS audio sets/<date>/<assignment_id>/':
    - <id>_1.mp4: the submitted video, always saved.
    - <id>_<N>.wav: one per column whose cell in the set's Excel row is '+'.
      N is the number before '-' in the column name. For the video column the
      audio track is extracted. Each file is converted to 16 kHz mono.
    - <id>_<N>.txt: the expected phrase, for non-improvisation recordings only.
    - <id>.csv: the worker's gender and age.

    The set is skipped if its folder already exists. Errors are printed and
    logged to errors.tsv. A folder created by this call is then removed, so the
    set is downloaded again on the next run instead of being skipped forever.
    """
    assignment_id = assignment_data.id
    cur_dir = os.path.join('MTS audio sets', date, assignment_id)
    if os.path.exists(cur_dir):
        print('Папка с таким айди уже есть, второй раз не качаем')
        return

    created_dir = False
    try:
        os.makedirs(cur_dir)
        created_dir = True
        solution = assignment_data.solutions[0]
        # NOTE(review): when the video column is '+', audio_from_video_getter
        # downloads this same <id>_1.mp4 again and writes <id>_1.wav, the same
        # name used by audio_1-1.
        with open(os.path.join(cur_dir, f'{assignment_id}_1.mp4'), 'wb') as out_f:
            toloka_client.download_attachment(attachment_id=solution.output_values['video'], out=out_f)

        first_row = check_working_df.reset_index().loc[0]
        for column_audio_name in check_working_df:
            print(column_audio_name)
            if first_row[column_audio_name] != '+':
                continue
            # 'audio_2-1' -> '2', 'audio_impr_7-1' -> '7'. Both takes of a phrase
            # share a file name, so if both are '+' the later one overwrites.
            name_parts = column_audio_name.split('_')
            number_part = name_parts[2] if 'impr' in column_audio_name else name_parts[1]
            filename = number_part.split('-')[0]
            print('filename: ', filename)
            toloka_output_name = audios_order_dict[column_audio_name]
            print('toloka_output_name: ', toloka_output_name)

            filepath = os.path.join(cur_dir, f'{assignment_id}_{filename}.wav')
            if toloka_output_name == 'video':
                audio_from_video_getter(solution.output_values[toloka_output_name], filepath)
            else:
                with open(filepath, 'wb') as out_f:
                    toloka_client.download_attachment(attachment_id=solution.output_values[toloka_output_name],
                                                      out=out_f)
            # downsampling to 16 kHz mono audio
            _ensure_16k_mono_wav(filepath)

            if 'impr' not in column_audio_name:
                phrase = assignment_data.tasks[0].input_values[_toloka_input_name(toloka_output_name)]
                with open(os.path.join(cur_dir, f'{assignment_id}_{filename}.txt'), 'w',
                          encoding='utf-8') as out_txt_f:
                    out_txt_f.write(phrase)

        with open(os.path.join(cur_dir, f'{assignment_id}.csv'), 'w') as csv_file:
            csv_file.write("gender;age" + "\n")
            csv_file.write(f"{solution.output_values['gender']};{solution.output_values['age']}" + "\n")

    except Exception as err:
        print('Не удалось скачать аудио')
        error_writer(f"{assignment_id}\t{err}\n")
        if created_dir:
            # Drop the half-filled folder; otherwise the exists() check above
            # would skip this set on every later run.
            import shutil
            shutil.rmtree(cur_dir, ignore_errors=True)

# Duplicate check before downloading
def predownload_set_dublicat_checking_and_download(assignment_data: toloka.Assignment,
                                                   worker_id: str) -> bool:
    """Decide whether a set should be downloaded, asking the operator on duplicates.

    The set is downloaded straight away (True) when neither its assignment id
    nor its worker id appears in the all_sets_in_db_df snapshot. Otherwise the
    matching rows are printed and the operator is asked; only the answer '1'
    means download.
    """
    assignment_id = assignment_data.id
    sets_df = all_sets_in_db_df
    same_set_by_id = sets_df.loc[(sets_df['assignment_id'] == assignment_id)]
    same_set_by_worker = sets_df.loc[(sets_df['worker_id'] == worker_id)]

    if same_set_by_id.empty and same_set_by_worker.empty:
        return True

    for title, matches in (('Сет с таким айди уже есть в базе: ', same_set_by_id),
                           ('Сеты от того же воркера: ', same_set_by_worker)):
        if not matches.empty:
            print(title)
            print(matches.to_markdown())

    answer = input('Все равно скачать? \n 1.Да \n 2.Нет')
    return answer == '1'

# Extract the assignment_id from an assignment link
def get_assignment_id_from_link(assignment_link: str) -> str:
    """Extract the assignment id from a Toloka assignment URL.

    The id is the path segment right after 'assignments/'. Any query string,
    fragment or trailing path is dropped, e.g.
    '.../assignments/0001ab--62f?direction=ASC' -> '0001ab--62f'.

    Raises:
        ValueError: if the link has no 'assignments/<id>' part.
    """
    marker = 'assignments/'
    if marker not in assignment_link:
        raise ValueError(f'no {marker!r} segment in assignment link: {assignment_link!r}')
    tail = assignment_link.split(marker)[1]
    assignment_id = tail.split('?')[0].split('#')[0].split('/')[0]
    if not assignment_id:
        raise ValueError(f'empty assignment id in link: {assignment_link!r}')
    return assignment_id


def _is_submitted(status) -> bool:
    """Return True if a Toloka assignment status means SUBMITTED."""
    # Compare the enum value instead of relying only on str(status), whose
    # format depends on the enum implementation and Python version. The
    # original 'Status.SUBMITTED' spelling is still accepted.
    return getattr(status, 'value', status) == 'SUBMITTED' or str(status) == 'Status.SUBMITTED'


def main():
    """Process every assignment listed in the 'assignment_id' column of work1.xlsx.

    Sets whose audio columns contain '+' or 'send' are (optionally) downloaded
    and recorded in the database. Any other set that is still SUBMITTED is
    rejected. Unless one of its cells is 404, the worker is also sent the
    reject reasons and given the reject skill. A failure on one set is logged
    to errors.tsv and the loop moves on to the next set.
    """
    for assignment_cell_in_excel in working_excel['assignment_id'].dropna():
        # The cell holds an id or a full link; str() guards against cells that
        # the Excel reader loaded as numbers.
        assignment_id = str(assignment_cell_in_excel)
        try:
            # Parsed inside the try so one malformed link does not stop the batch.
            if 'http' in assignment_id:
                assignment_id = get_assignment_id_from_link(assignment_id)
            print('Обрабатываем сет: ', assignment_id)

            assignment_data = get_assignment_data(assignment_id)
            pool_data = get_pool_data(pool_id=assignment_data.pool_id)
            project_id = pool_data.project_id
            worker_id = assignment_data.user_id
            worker_data = get_worker_data(worker_id)

            worker_language, reject_message, reject_topic, refusal_reassons_column = language_select()

            check_working_df = working_excel.loc[working_excel['assignment_id'] == assignment_cell_in_excel].loc[:, 'audio_1-1':'audio_impr_9-2']
            cell_values = check_working_df.values

            if '+' in cell_values or 'send' in cell_values:
                assignment_link = f'https://platform.toloka.ai/requester/project/{project_id}/pool/{assignment_data.pool_id}/assignments/{assignment_id}?direction=ASC'
                print(assignment_link)
                if '+' in cell_values:
                    print('Начинаем скачивать сет')
                    if predownload_set_dublicat_checking_and_download(assignment_data, worker_id):
                        download_audios(assignment_data, check_working_df)
                    else:
                        print('Скачивание отменено')
                # NOTE(review): the DB row is written even when the download
                # was cancelled or failed; confirm this is intended.
                db_update(assignment_id, worker_id, project_id, assignment_data, worker_data, assignment_link, check_working_df)

            elif _is_submitted(assignment_data.status):
                print('Начинаем отклонение сета')
                reject_set(assignment_id)
                # A 404 marker (numeric or typed as text) means: reject silently.
                if 404 in cell_values or '404' in cell_values:
                    print('Сообщение не отправляем')
                    print('Навык не выдаем')
                else:
                    message_send(assignment_data, reject_message, reject_topic, worker_id, refusal_reassons_column, check_working_df)
                    skill_give(assignment_id, worker_id, str(project_id))

            else:
                print('У сета уже другой статус: ', assignment_data.status, ', пропускаем')

        except toloka.exceptions.DoesNotExistApiError:
            print(assignment_id, ' - такого сета не найдено')
        except Exception:
            error_writer(f"{assignment_id}\t{traceback.format_exc()}\n")
            print(assignment_id, 'ошибка - прописана в файле')

        # Separator between sets, printed on every path.
        print('-' * 50)


# Run the batch only when executed as a script. Note that the module-level
# setup above (Toloka client, Excel read, DB snapshot, errors.tsv reset) still
# runs on import.
if __name__ == '__main__':
    main()