In [None]:
%load_ext autoreload
%autoreload 2

In [2]:
from src.data.auto_mturk import get_drive_service, download_forms_urls, create_mturk_client
import pickle
import os.path
from google_auth_oauthlib.flow import InstalledAppFlow
from src.constants import TOKEN_PATH,CREDS_PATH, FORMS_URLS_PATH, AWS_KEYS_PATH
from src.utils import read_access_keys
import io
import shutil
import pandas as pd
import xmltodict
from datetime import datetime
from googleapiclient.http import MediaIoBaseDownload
from datetime import datetime
import time
import pytz

utc=pytz.UTC


## Urls info retrieval

In [3]:
file_id = "1iOe--jfnoSjLkZ506XPqyEqO64krXByn_qLuPF3BaQA"
# (1). Retrieve the urls from the app script

# retrieve gdrive service
service = get_drive_service()


# download the most recent forms_urls
download_forms_urls(FORMS_URLS_PATH,file_id,service)
forms_url = pd.read_csv(FORMS_URLS_PATH,sep=r"\s+",header=None,names=['url'],index_col=0)

Download 100%


In [4]:
forms_url = forms_url.url.to_dict()

## MTurk management

In [5]:
def monitor_worker_tags(qualification_type_id='3OR1BBO28PIVPWZMRDTWE8U6OZXNGN'):
    print("searching..")
    while True:
        time.sleep(1)
        # search for workers already tagged
        exworkers = set()
        qualifs = turk.client.list_workers_with_qualification_type(QualificationTypeId=qualification_type_id,)
        for qualif in qualifs['Qualifications']:
            if qualif['QualificationTypeId'] == qualification_type_id:
                exworkers.add(qualif['WorkerId'])

        # search for new workers
        worker_ids = set()
        for hit in turk.client.list_hits()['HITs']:
            hitid = hit['HITId']
            result = turk.client.list_assignments_for_hit(HITId=hitid,AssignmentStatuses=['Submitted','Approved','Rejected'])
            assignments = result['Assignments']

            for assignment in assignments:
                workerid = assignment['WorkerId']
                worker_ids.add(workerid)

        worker_ids = worker_ids - exworkers
        for workerid in worker_ids:
            print(f"Tagging worker {workerid}")
            turk.client.associate_qualification_with_worker(
            QualificationTypeId=qualification_type_id,
            WorkerId=workerid,
            IntegerValue=1,
            SendNotification=False
        )

In [6]:
# free own worker
def clean_own_worker():
    turk.client.disassociate_qualification_from_worker(
        WorkerId='A29C1XYH77RQYM',
        QualificationTypeId='3OR1BBO28PIVPWZMRDTWE8U6OZXNGN',
        Reason=''
    )

In [7]:
def generate_password(i):
    a = i * 324 + 932
    return str(a)[:3]

In [8]:
def display_answer(worker_results):
    """
    Args:
        answer(dict): the xmldoc
    """
    if worker_results['NumResults'] > 0:
        for assignment in worker_results['Assignments']:
            xml_doc = xmltodict.parse(assignment['Answer'])

            print("Worker's answer was:")
            if type(xml_doc['QuestionFormAnswers']['Answer']) is list:
                # Multiple fields in HIT layout
                for answer_field in xml_doc['QuestionFormAnswers']['Answer']:
                    print("For input field: " + answer_field['QuestionIdentifier'])
                    print("Submitted answer: " + answer_field['FreeText'])
            else:
                # One field found in HIT layout
                print("For input field: " + xml_doc['QuestionFormAnswers']['Answer']['QuestionIdentifier'])
                print("Submitted answer: " + xml_doc['QuestionFormAnswers']['Answer']['FreeText'])
    else:
        print("No results ready yet")

In [22]:
class Turker():
    def __init__(self,hitlayout,lifetimeinsec,groundtruth,production=False):
        """
        Args:
            hittypeid (str): hittypeid of the template to use
            hitlayout (str): hitlayout of the template to use
            lifetimeinsec (int): lifetime in seconds
        """
        self.production = production
        # retrieval of the access keys
        aws_access_key_id,aws_secret_access_key = read_access_keys(AWS_KEYS_PATH)
        # creation of an self.client client
        self.client = create_mturk_client(aws_access_key_id,aws_secret_access_key,production)
        self.hitlayout = hitlayout
        self.lifetimeinsec = lifetimeinsec
        self.url = "https://workersandbox.mturk.com/mturk/preview?groupId="# if production else "https://worker.mturk.com/mturk/preview?groupId="
        self.groundtruth = groundtruth
        
        
    def get_url(self,hit_id):
        return self.url + hit_id

    def list_hits(self):
        hits = self.client.list_hits()['HITs']
        if len(hits) == 0: 
            print("No Hits available")
        else:
            expiration = hits[0]['Expiration'].replace(tzinfo=utc)
            if expiration < datetime.now().replace(tzinfo=utc):
                print("EXPIRED")
            else:
                print(f"Expiration:{expiration}")
        
            for i,hit in enumerate(hits):
                hitid = hit['HITId']
                print(f"({i+1}): Hit:{hitid} Status: {hit['HITStatus']}")
                comp = turk.client.list_assignments_for_hit(HITId=hitid, AssignmentStatuses=['Submitted','Approved','Rejected'])['NumResults']

                maxo = hit["MaxAssignments"]
                print(f'Completed tasks: {comp}/{maxo}')
                print(f"URL: {self.get_url(hit['HITGroupId'])}")

    def list_results(self,hit_id):
        worker_results = turk.client.list_assignments_for_hit(HITId=hit_id, AssignmentStatuses=['Submitted','Approved','Rejected'])
        if worker_results['NumResults'] > 0:
            for assignment in worker_results['Assignments']:
                answer = self.__get_answer(assignment['Answer'])
                print(f"WorkerId: {assignment['WorkerId']} Answer:{answer} Status: {assignment['AssignmentStatus']}")
        else:
            print("No results ready yet")

    def list_results_df(self):
        df = []
        hits = self.client.list_hits()['HITs']
        for i,hit in enumerate(hits):
            hit_id = hit['HITId']
            worker_results = turk.client.list_assignments_for_hit(HITId=hit_id, AssignmentStatuses=['Submitted','Approved','Rejected'])
            for assignment in worker_results['Assignments']:
                answer = self.__get_answer(assignment['Answer'])
                password = generate_password(self.form_hit[hit_id])
                df.append({'WorkerId':assignment['WorkerId'],
                           'HITId':hit_id,
                           'Answer':answer,
                           'Code':password,
                           'Status':assignment['AssignmentStatus']
                          })
        return pd.DataFrame(df)

    def __get_answer(self,answer):
        xml_doc = xmltodict.parse(answer)
        return xml_doc['QuestionFormAnswers']['Answer']['FreeText']
        

    def __approve_all_assignments(self,hit_id,correct_hits_exclusively=False,force=False):
        
        """
        
        Args:
            correct_hits (Bool): whether to correct correct hits exclusively
        """
        if not force:
            form_idx = self.form_hit[hit_id]
            password = generate_password(form_idx)
        assignments = self.client.list_assignments_for_hit(HITId=hit_id,AssignmentStatuses=['Submitted'])
        assignments = assignments['Assignments']
        for assignment in assignments:
            ass_id = assignment['AssignmentId']
            if correct_hits_exclusively:
                answer = self.__get_answer(assignment['Answer'])
                if answer == password:
                    print(f"Approving assignment {ass_id}")
                    self.client.approve_assignment(AssignmentId=ass_id)
                else:
                    print(f"Rejecting assignment {ass_id}: given answer: {answer} correct password: {password}")
                    self.client.reject_assignment(AssignmentId=ass_id,
                                                  RequesterFeedback='Invalid confirmation key')
            else:
                print(f"Approving assignment {ass_id}")
                self.client.approve_assignment(AssignmentId=ass_id)

    def approve_all_hits(self):
        hits = self.client.list_reviewable_hits()['HITs']
        for hit in hits:
            self.__approve_all_assignments(hit['HITId'],force=True)        
    
    def approve_correct_hits(self):
        hits = self.client.list_reviewable_hits()['HITs']
        for hit in hits:
            self.__approve_all_assignments(hit['HITId'],correct_hits_exclusively=True)

    def delete_all_hits(self):
        hits = self.client.list_hits()['HITs']
        for hit in hits:
            self.delete_hit(hit['HITId'])

    def delete_hit(self,hit_id):
        try:
            self.client.delete_hit(HITId=hit_id)
            print(f"Deleting hit {hit_id}")
        except:
            print(f"Hit {hit_id} in Unassignable mode")

    def stop_all_hits(self):
        hits = self.client.list_hits()['HITs']
        for hit in hits:
            self.stop_hit(hit['HITId'])

    def stop_hit(self,hit_id):
        status= self.client.get_hit(HITId=hit_id)['HIT']['HITStatus']
        # If HIT is active then set it to expire immediately
        if status=='Assignable' or status=='Unassignable':
            response = self.client.update_expiration_for_hit(
                HITId=hit_id,
                ExpireAt=datetime(2015, 1, 1)
            )
            print(f"Stop hit {hit_id}")

    def approve_delete_all_hits(self):
        self.approve_all_hits()
        self.delete_all_hits()

    def create_forms_hits(self,forms_url,hittypeid=None,hitlayout=None):
        """
        Args:
            forms_url(dict): mapping between forms index and their respective url        
        """
        hitlayout = self.hitlayout if hitlayout is None else hitlayout
        form_hit = {}
        for idx,url in forms_url.items():
            print(f"Creating hit for form {idx}")
            
            myhit = self.client.create_hit(
                        HITLayoutId=hitlayout,
                        MaxAssignments=1,

                        HITLayoutParameters = [{'Name':'url',
                                   'Value':url}],
                        LifetimeInSeconds = self.lifetimeinsec,
                        AutoApprovalDelayInSeconds=600,
                        AssignmentDurationInSeconds=600,
                        Reward='0.01',
                        Title=f'Emojis Descriptions n {idx}',
                        Keywords='emojis, description, sentiment, emotions',
                        Description='Describe emojis by a single accurate word',
                        QualificationRequirements=[
                            {
                                'QualificationTypeId': '3OR1BBO28PIVPWZMRDTWE8U6OZXNGN',
                                'Comparator': 'DoesNotExist',
                                'ActionsGuarded': 'DiscoverPreviewAndAccept'
                            }
                        # TODO: add location and hit percentage
                        ]
            )
            form_hit[myhit['HIT']['HITId']] = idx
        self.form_hit = form_hit

In [23]:
turk = Turker(hitlayout="3QS25971A2UW6PKAU5WN9771C2JXNI",
              groundtruth="abc",
             lifetimeinsec=600)

In [25]:
turk.create_forms_hits(forms_url)

Creating hit for form 0
Creating hit for form 1
Creating hit for form 2


In [38]:
turk.list_hits()

Expiration:2020-11-21 22:30:34+00:00
(1): Hit:3F6045TU7D92ZWMCLLMZSGS1Q6W99O Status: Reviewable
Completed tasks: 1/1
URL: https://workersandbox.mturk.com/mturk/preview?groupId=31UQ70MW5SOH84612C36XG8P8DCNHH
(2): Hit:3FI30CQHVK4Y56ZBLYXJZCWPO5R6BK Status: Assignable
Completed tasks: 0/1
URL: https://workersandbox.mturk.com/mturk/preview?groupId=32SOQFN7SOQH7KTYO9YJ1HEZ85RPI7
(3): Hit:309D674SHZ6NLN8CVPAMLCOZKR2BCH Status: Reviewable
Completed tasks: 1/1
URL: https://workersandbox.mturk.com/mturk/preview?groupId=3BW6RX62GFQF0UCE0YFLZL7NB2LLEY


In [17]:
clean_own_worker()

RequestError: An error occurred (RequestError) when calling the DisassociateQualificationFromWorker operation: This operation can be called with a status of: Granted (1605993562403 s)

In [108]:
monitor_worker_tags()

searching..
Tagging worker A29C1XYH77RQYM


KeyboardInterrupt: 

In [36]:
turk.list_results("309D674SHZ6NLN8CVPAMLCOZKR2BCH")

No results ready yet


In [43]:
turk.list_results_df()

In [40]:
turk.approve_correct_hits()

Approving assignment 3EFVCAY5L4UUDND5S7C05X792HG8J7
Rejecting assignment 34MAJL3QP58I98OEDJU08EAF4YY43K: given answer: cako correct password: 932


In [39]:
turk.delete_all_hits()

Hit 3P7RGTLO6EY0LBF8HOROEKH17IQAKY in Unassignable mode
Deleting hit 32FESTC2NHB2HMHIVGRHQ2GN9C9CUZ
Hit 3UYRNV2KITKNAT7WH3ZQALSABPV8NX in Unassignable mode


In [42]:
# reset the hits
turk.approve_all_hits()
turk.stop_all_hits()
turk.delete_all_hits()

Stop hit 3FI30CQHVK4Y56ZBLYXJZCWPO5R6BK
Deleting hit 3F6045TU7D92ZWMCLLMZSGS1Q6W99O
Deleting hit 3FI30CQHVK4Y56ZBLYXJZCWPO5R6BK
Deleting hit 309D674SHZ6NLN8CVPAMLCOZKR2BCH


## Qualifications

In [187]:
# Create the qualification we need to distinguish workers who already performed an emoji task
turk.client.create_qualification_type(
    Name='emojidone',
    Description="Qualification to recognize workers that already performed an emoji-related task",
    QualificationTypeStatus='Active',
    AutoGranted=True,
    AutoGrantedValue=1
)
# QualificationTypeId='3OR1BBO28PIVPWZMRDTWE8U6OZXNGN

RequestError: An error occurred (RequestError) when calling the CreateQualificationType operation: You have already created a QualificationType with this name. A QualificationType's name must be unique among all of the QualificationTypes created by the same user. (1605981881577 s)

## Retrieving Results

In [58]:
import boto3
import xmltodict

In [79]:
aws_access_key_id, aws_secret_access_key = read_access_keys("../creds/aws.txt")
mturk = boto3.client('mturk',
   aws_access_key_id = aws_access_key_id,
   aws_secret_access_key = aws_secret_access_key,
   region_name='us-east-1',
   endpoint_url = mturk_environment['endpoint']
)
# Use the hit_id previously created
hit_id = myhit['HIT']['HITId']
# We are only publishing this task to one Worker
# So we will get back an array with one item if it has been completed
worker_results = mturk.list_assignments_for_hit(HITId=hit_id, AssignmentStatuses=['Submitted'])

display_answer(worker_results)

In [85]:
def answers_2_dataframe(worker_results):
    df = [{'WorkerID':assignment['WorkerId'],
           'HITId':assignment['HITId'],
           'Answer':xmltodict.parse(assignment['Answer'])} 
          for assignment in worker_results['Assignments']
           ]
    for answer in df:
        xml_doc = answer['Answer']
        
        if type(xml_doc['QuestionFormAnswers']['Answer']) is list:
            # Multiple fields in HIT layout
            for answer_field in xml_doc['QuestionFormAnswers']['Answer']:
                key = answer_field['QuestionIdentifier']
                answer_field['FreeText']
                answer[key] = value
        else:
            # One field found in HIT layout
            key = xml_doc['QuestionFormAnswers']['Answer']['QuestionIdentifier']
            value = xml_doc['QuestionFormAnswers']['Answer']['FreeText']
            answer[key] = value
        del answer['Answer']
    return pd.DataFrame(df)

In [86]:
answers_2_dataframe(worker_results)

Unnamed: 0,WorkerID,HITId,surveycode
0,A29C1XYH77RQYM,3ZFRE2BDQ9Z76JMNSZMCL7GCCBMZXL,EMOJI378910
