# Amazon S3 Client design

In [164]:
from pandas import DataFrame
from csv import reader
from codecs import getreader
import boto3
import json


def aws_credentials_from_file(path, filename):
    """
    Get the AWS S3 Id and Secret credentials, from a json file in the format:
        {"AWS_ID": "AWS_SECRET"}

    Only the first key/value pair found in the file is used.

    :param path: directory that contains the credentials file
    :param filename: name of the JSON credentials file inside `path`
    :return: tuple (aws_id, aws_secret)
    :raises ValueError: if the JSON file contains no key/value pair
    """
    with open(f"{path}/{filename}", 'r') as f:
        key_dict = json.load(f)

    # Fail loudly here instead of implicitly returning None, which would only
    # surface later as an opaque tuple-unpacking TypeError in the caller.
    if not key_dict:
        raise ValueError(f"No AWS credentials found in {path}/{filename}")

    aws_id, aws_secret = next(iter(key_dict.items()))
    return aws_id, aws_secret


def get_s3(aws_id, aws_secret, region='us-east-1'):
    """
    Build a boto3 resource handle for the S3 service.

    :param aws_id: AWS access key id
    :param aws_secret: AWS secret access key
    :param region: AWS region name (defaults to 'us-east-1')
    :return: whatever boto3.resource returns for the 's3' service
    """
    resource_kwargs = {
        "service_name": 's3',
        "region_name": region,
        "aws_access_key_id": aws_id,
        "aws_secret_access_key": aws_secret,
    }
    return boto3.resource(**resource_kwargs)


def labeled_sentences_from_json(sents_json):
    """
    Extract the sentence map of the first document stored in `sents_json`.

    The returned dict is a new (shallow) copy in the format:
        {sentence_id: {"text": "sentence text", "labels": []}}
    Everything else in the document entry (e.g. its metadata) is left out.
    """
    documents = list(sents_json.values())
    first_document = documents[0]
    return dict(first_document["sentences"].items())


class S3Client:
    """
    Client for the project's S3 bucket: loads text files and sentence JSON files,
    stores sentences and assisted-labeling results, and reads auxiliary files.

    Most folder names depend on the language and are computed by
    _update_folder_names(); until a language has been set they are None.
    """

    def __init__(self, creds_filepath, creds_filename, bucket_name, language=None):
        """
        Read the AWS credentials from the given JSON file, create the S3 resource
        and, when `language` is given, compute the language dependent folder names.
        """
        self.aws_id, self.aws_secret = aws_credentials_from_file(creds_filepath, creds_filename)
        self.s3 = get_s3(self.aws_id, self.aws_secret)
        self.bucket_name = bucket_name
        self.metadata_folder = "metadata/"

        # Language dependent DB names
        self.language = None
        self.base_folder = None
        self.raw_files_folder = None
        self.new_text_files_folder = None
        self.processed_text_files_folder = None
        self.new_sentences_folder = None
        self.processed_sentences_folder = None
        self.assisted_labeling_folder = None

        # Extra file names
        self.abbrevs_file = None

        if language is not None:
            self._update_folder_names(language)

    def _update_folder_names(self, language):
        """
        Recompute every language dependent folder/file name when `language`
        differs from the language currently configured on the client.
        """
        if self.language != language:
            # Folders
            self.base_folder = f"{language}_documents"
            self.raw_files_folder = f"{self.base_folder}/raw_pdf"
            self.new_text_files_folder = f"{self.base_folder}/text_files/new"
            self.processed_text_files_folder = f"{self.base_folder}/text_files/processed"
            self.new_sentences_folder = f"{self.base_folder}/sentences"
            # NOTE: processed_sentences_folder is not assigned here and stays None.
            self.assisted_labeling_folder = f"{self.base_folder}/assisted_labeling"

            # Files
            self.abbrevs_file = f"abbreviations/{language}_abbreviations.txt"

            # Remember the active language so the guard above can actually skip
            # the recomputation and self.language reflects the current layout.
            self.language = language

    def move_object(self, obj_name, obj_old_folder, obj_new_folder):
        """
        Move an object from a given S3 folder to another by copying it to the new folder,
        then deleting it from the old one.

        Errors are reported on stdout and not re-raised (best effort). The delete is
        only attempted when the copy did not raise.
        """
        old_key = f"{obj_old_folder}/{obj_name}"
        new_key = f"{obj_new_folder}/{obj_name}"
        try:
            # The destination is the object copy_from() is called on;
            # CopySource names the existing object as "<bucket>/<key>".
            self.s3.Object(self.bucket_name, new_key) \
                .copy_from(CopySource=f"{self.bucket_name}/{old_key}")
            _ = self.s3.Object(self.bucket_name, old_key).delete()
        except Exception as e:
            print(f"Error while moving {obj_name} from {obj_old_folder} to {obj_new_folder}.")
            print(e)

    def load_text_files(self, language):
        """
        Yield a text file id, and the text content of the file itself from the new text files folder
        These should be used for sentence splitting, and then calling the store_sentences() method

        The file id is the object key with the folder prefix and every ".txt" removed.
        Because the prefix has no trailing slash, the id keeps the leading "/".
        Keys ending in "/" are skipped.
        """
        self._update_folder_names(language)
        for obj in self.s3.Bucket(self.bucket_name).objects.all().filter(Prefix=self.new_text_files_folder):
            if not obj.key.endswith("/"):
                file_id = obj.key.replace(self.new_text_files_folder, "").replace(".txt", "")
                text = obj.get()['Body'].read().decode('utf-8')
                yield file_id, text

    def store_sentences(self, sents, file_name, file_uuid, language):
        """
        Store a JSON file containing the metadata and sentences for a given text file in the S3 bucket

        The object is written to "<sentences folder>/<file_uuid>_sents.json" with the layout:
            {file_uuid: {"metadata": {"n_sentences", "file_name", "language"}, "sentences": sents}}
        """
        self._update_folder_names(language)
        sents_json = {file_uuid: {"metadata":
                                      {"n_sentences": len(sents),
                                       "file_name": file_name,
                                       "language": language},
                                  "sentences": sents}}

        self.s3.Object(self.bucket_name, f"{self.new_sentences_folder}/{file_uuid}_sents.json") \
            .put(Body=(json.dumps(sents_json, indent=4)))

    def load_sentences(self, language, init_doc, end_doc):
        """
        Yield (sentence_id, sentence_map) pairs from the sentence JSON files of a language.

        Only objects whose position in the bucket listing satisfies
        init_doc <= position < end_doc are read. The position counts every listed
        key under the sentences folder, including keys ending in "/", which are
        themselves skipped.
        """
        self._update_folder_names(language)
        for i, obj in enumerate(self.s3.Bucket(self.bucket_name).objects.all().filter(Prefix=self.new_sentences_folder)):
            if not obj.key.endswith("/") and init_doc <= i < end_doc:
                sents = labeled_sentences_from_json(json.loads(obj.get()['Body'].read()))
                for sent_id, sent_labels_map in sents.items():
                    yield sent_id, sent_labels_map

    def store_assisted_labeling_csv(self, results_dictionary, queries_dictionary, init_doc, results_limit):
        """
        Write one CSV of query results per query into the assisted labeling folder.

        For the i-th query in `results_dictionary` the first `results_limit` rows of its
        results are written, with columns sentence_id / similarity_score / text, to
            s3://<bucket>/<assisted labeling folder>/query_<queries_dictionary[query]>_<i>_results_<init_doc>.csv

        The folder name comes from the language currently configured on the client;
        this method does not set a language itself.
        """
        path = f"s3://{self.bucket_name}/{self.assisted_labeling_folder}"
        col_headers = ["sentence_id", "similarity_score", "text"]
        for i, (query, results) in enumerate(results_dictionary.items()):
            filename = f"{path}/query_{queries_dictionary[query]}_{i}_results_{init_doc}.csv"
            DataFrame(results, columns=col_headers) \
                .head(results_limit) \
                .to_csv(filename, storage_options={"key": self.aws_id, "secret": self.aws_secret})

    def doc_ids_per_country(self, country):
        """
        Get a list of text document file IDs for a given country from the CSV database in the S3 bucket.
        In the CSV, the file id is the file name without the file extension ("23sd45fg.txt" without the ".txt")

        The id is taken from the fourth column of every row, minus its last four characters.
        NOTE(review): every row is processed, so a header row (if the CSV has one) is included too.
        """
        # metadata_folder already ends with "/"; strip it so the key does not contain
        # an accidental "//" (S3 treats "a//b" and "a/b" as different keys).
        metadata_fname = f"{self.metadata_folder.rstrip('/')}/{country}_metadata.csv"
        obj = self.s3.Object(bucket_name=self.bucket_name, key=metadata_fname)

        doc_ids = []
        for row in reader(getreader("utf-8")(obj.get()['Body'])):
            # Add original file ID without the file format
            doc_ids.append(row[3][:-4])

        return doc_ids

    def get_abbreviations(self, language):
        """
        Gets the set of abbreviations for a given language, from the text file in the S3 bucket

        The file content is split on "\\n", so each line becomes one set element.
        """
        self._update_folder_names(language)
        obj = self.s3.Object(bucket_name=self.bucket_name, key=self.abbrevs_file)
        abbreviations_str = obj.get()['Body'].read().decode('utf-8')
        return set(abbreviations_str.split("\n"))


In [165]:
# Build the client for the project bucket with the English folder layout pre-selected
s3_client = S3Client(
    creds_filepath="/Users/dafirebanks/Documents",
    creds_filename="credentials.json",
    bucket_name="wri-nlp-policy",
    language="english",
)

In [171]:
# Preview the first two text files yielded by the client
i = 0
for i, (file_id, text) in enumerate(s3_client.load_text_files("english"), start=1):
    print("File_id:", file_id)
    print("Text:", text[:100])
    print("=======================================")
    if i == 2:
        break

File_id: /0087b716bb5c4b4d8f8496f106c195e6f027e88d
Text: THE WILD LIFE (PROTECTION) AMENDMENT ACT, 2006
(ACT No. 39 OF 2006)
AN
ACT
further to amend the Wild
File_id: /016420ae47f42d91eb7171bd5a62805abe6f9ff5
Text: 137 (72)
PTSTENT 2101-97, TORER 21, 2002
4 (TT)
4 (")
(2) The words and expressions used but not def


In [167]:
# Preview the first four sentences found in the selected document range
i = 0
init_at_doc = 14778
end_at_doc = 16420
sentence_stream = s3_client.load_sentences("english", init_at_doc, end_at_doc)
for i, (sent_id, sent_map) in enumerate(sentence_stream, start=1):
    print("Sent_id:", sent_id)
    print("Text:", sent_map['text'])
    print("=======================================")
    if i == 4:
        break

Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_0
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_1
Text: The purpose of these  notices is to give interested persons an opportunity to participate in  the rule making prior to the adoption of the final rules.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_2
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_3
Text: ----------------------------------------------------------------------- SUMMARY: The United States Department of Agriculture (USDA) is announcing that it has withdrawn certain proposed rules that were either published in the Federal Register more than 3 years ago without subsequent action or determined to no longer be candidates for final action.


# Data loading experiments

Here we compare the performance of generator functions against loading all the data up front and then looping over it, for both text files and JSON sentence files.

In [99]:
############################################# 
# Methods for sentence splitting
#############################################
def load_text_files_for_language(language, s3):
    """
    Eagerly load every new text file of `language` and return {file_id: text}.

    The file id is the object key with the folder prefix and ".txt" removed;
    keys ending in "/" are skipped.
    """
    prefix = f"{language}_documents/text_files/new"
    listing = s3.Bucket(BUCKET_NAME).objects.all().filter(Prefix=prefix)

    loaded = {}
    for entry in listing:
        if entry.key.endswith("/"):
            continue
        doc_id = entry.key.replace(prefix, "").replace(".txt", "")
        loaded[doc_id] = entry.get()['Body'].read().decode('utf-8')
    return loaded
   
def load_text_files_for_language1(language, s3):
    """
    Generator variant: yield (file_id, text) for each new text file of `language`,
    reading one object at a time. Keys ending in "/" are skipped.
    """
    prefix = f"{language}_documents/text_files/new"

    for entry in s3.Bucket(BUCKET_NAME).objects.all().filter(Prefix=prefix):
        if entry.key.endswith("/"):
            continue
        doc_id = entry.key.replace(prefix, "").replace(".txt", "")
        body_bytes = entry.get()['Body'].read()
        yield doc_id, body_bytes.decode('utf-8')
    
    
############################################# 
# Methods for assisted labeling
#############################################
def labeled_sentences_from_dataset(sents_json):
    """Return a new dict holding the "sentences" map of the first document in `sents_json`."""
    first_doc = [*sents_json.values()][0]
    return dict(first_doc["sentences"].items())

def load_sentences_for_language(language, s3, init_doc, end_doc):
    """
    Eagerly load the sentences of every sentence JSON file of `language` whose
    position in the bucket listing satisfies init_doc <= position < end_doc,
    and return them as one dict {sentence_id: sentence_map}.

    Keys ending in "/" are skipped. An empty range yields an empty dict.
    """
    sents = {}
    sents_folder = f"{language}_documents/sentences"

    for i, obj in enumerate(s3.Bucket(BUCKET_NAME).objects.all().filter(Prefix=sents_folder)):

        if not obj.key.endswith("/") and init_doc <= i < end_doc:
            serializedObject = obj.get()['Body'].read()
            # Extract the sentences per document: labeled_sentences_from_dataset()
            # only looks at the first document of the dict it receives, so merging
            # all documents first would silently drop every document but one.
            sents.update(labeled_sentences_from_dataset(json.loads(serializedObject)))

    return sents

def load_sentences_for_language1(language, s3, init_doc, end_doc):
    """
    Generator variant that first collects the sentences of every selected
    document into one dict, then yields its (sentence_id, sentence_map) pairs.
    """
    prefix = f"{language}_documents/sentences"
    collected = {}

    for position, entry in enumerate(s3.Bucket(BUCKET_NAME).objects.all().filter(Prefix=prefix)):
        if entry.key.endswith("/") or not (init_doc <= position < end_doc):
            continue
        document = json.loads(entry.get()['Body'].read())
        collected.update(labeled_sentences_from_dataset(document))

    yield from collected.items()
        
def load_sentences_for_language2(language, s3, init_doc, end_doc):
    """
    Fully lazy generator variant: yield the (sentence_id, sentence_map) pairs of
    each selected document as soon as that document has been read.
    """
    prefix = f"{language}_documents/sentences"

    for position, entry in enumerate(s3.Bucket(BUCKET_NAME).objects.all().filter(Prefix=prefix)):
        if entry.key.endswith("/") or not (init_doc <= position < end_doc):
            continue
        document = json.loads(entry.get()['Body'].read())
        yield from labeled_sentences_from_dataset(document).items()


In [19]:
from time import perf_counter 

#### 1. For loading text files

In [59]:
# Time the eager (dict-returning) text file loader, printing up to 10000 files
start_t = perf_counter()
i = 0
all_text_files = load_text_files_for_language("english", s3_client.s3).items()
for i, (file_id, text) in enumerate(all_text_files, start=1):
    print("File_id:", file_id)
    print("Text:", text[:100])
    print("=======================================")
    if i == 10000:
        break
stop_t = perf_counter()
print("Time taken:", stop_t - start_t)

File_id: /0087b716bb5c4b4d8f8496f106c195e6f027e88d
Text: THE WILD LIFE (PROTECTION) AMENDMENT ACT, 2006
(ACT No. 39 OF 2006)
AN
ACT
further to amend the Wild
File_id: /016420ae47f42d91eb7171bd5a62805abe6f9ff5
Text: 137 (72)
PTSTENT 2101-97, TORER 21, 2002
4 (TT)
4 (")
(2) The words and expressions used but not def
File_id: /017dc3e6fb7b98d975918798afb86462ee89001f
Text: THE TELANGANA PROHIBITION OF COW SLAUGHTER AND
ANIMAL PRESERVATION ACT, 1977.
(ACT NO. 11 OF 1977)
A
File_id: /0180163dc32afed62e39d6c63d637222bc7bdd3f
Text: -1-
GOVERNMENT OF GOA
Department of Animal Husbandry
Directorate of Animal Husbandry & Veterinary Se
File_id: /01c9d2721eaa2fd73f926ae68e27441f355e6023
Text: t
198
THE ARUNACHAL PRADESH WATER
RESOURCES
REGULATORY AUTHORITY ACT, 2006
(ACT NO. 15 OF 2006)
(Rec
File_id: /01e8604f38982ab507d8d687bcfe96ef8e564bd1
Text: MA8BA
Registered No.
A
(uter
lo suld bevionall)
(MOITA
799
sill
und
of
The Assam Gazette
of
not
EXTR
File_id: /025b5dd84bdd834cc79f1db41af5b8a6c621fc4b
T

In [60]:
# Time the generator-based text file loader, printing up to 10000 files
start_t1 = perf_counter()
i = 0
text_file_stream = load_text_files_for_language1("english", s3_client.s3)
for i, (file_id, text) in enumerate(text_file_stream, start=1):
    print("File_id:", file_id)
    print("Text:", text[:100])
    print("=======================================")
    if i == 10000:
        break
stop_t1 = perf_counter()
print("Time taken:", stop_t1 - start_t1)

File_id: /0087b716bb5c4b4d8f8496f106c195e6f027e88d
Text: THE WILD LIFE (PROTECTION) AMENDMENT ACT, 2006
(ACT No. 39 OF 2006)
AN
ACT
further to amend the Wild
File_id: /016420ae47f42d91eb7171bd5a62805abe6f9ff5
Text: 137 (72)
PTSTENT 2101-97, TORER 21, 2002
4 (TT)
4 (")
(2) The words and expressions used but not def
File_id: /017dc3e6fb7b98d975918798afb86462ee89001f
Text: THE TELANGANA PROHIBITION OF COW SLAUGHTER AND
ANIMAL PRESERVATION ACT, 1977.
(ACT NO. 11 OF 1977)
A
File_id: /0180163dc32afed62e39d6c63d637222bc7bdd3f
Text: -1-
GOVERNMENT OF GOA
Department of Animal Husbandry
Directorate of Animal Husbandry & Veterinary Se
File_id: /01c9d2721eaa2fd73f926ae68e27441f355e6023
Text: t
198
THE ARUNACHAL PRADESH WATER
RESOURCES
REGULATORY AUTHORITY ACT, 2006
(ACT NO. 15 OF 2006)
(Rec
File_id: /01e8604f38982ab507d8d687bcfe96ef8e564bd1
Text: MA8BA
Registered No.
A
(uter
lo suld bevionall)
(MOITA
799
sill
und
of
The Assam Gazette
of
not
EXTR
File_id: /025b5dd84bdd834cc79f1db41af5b8a6c621fc4b
T

#### 2. For loading sentences

In [97]:
# Time the eager (dict-returning) sentence loader, printing up to 1000 sentences
start_t = perf_counter()
init_at_doc = 14778
end_at_doc = 16420
i = 0
all_sentences = load_sentences_for_language("english", s3_client.s3, init_at_doc, end_at_doc).items()
for i, (sent_id, sent_map) in enumerate(all_sentences, start=1):
    print("Sent_id:", sent_id)
    print("Text:", sent_map['text'])
    print("=======================================")
    if i == 1000:
        break
stop_t = perf_counter()
print("Time taken:", stop_t - start_t)

Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_0
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_1
Text: The purpose of these  notices is to give interested persons an opportunity to participate in  the rule making prior to the adoption of the final rules.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_2
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_3
Text: ----------------------------------------------------------------------- SUMMARY: The United States Department of Agriculture (USDA) is announcing that it has withdrawn certain proposed rules that were either published in the Federal Register more than 3 years ago without subsequent action or determined to no longer be candidates for final action.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_4
Text: USDA is taking this action to reduce its regulatory backlog and focus its resources on higher priority actions.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_5
Text: The Department's acti

In [98]:
# Time the collect-then-yield sentence generator, printing up to 1000 sentences
start_t1 = perf_counter()
init_at_doc = 14778
end_at_doc = 16420
i = 0
sentence_stream = load_sentences_for_language1("english", s3_client.s3, init_at_doc, end_at_doc)
for i, (sent_id, sent_map) in enumerate(sentence_stream, start=1):
    print("Sent_id:", sent_id)
    print("Text:", sent_map['text'])
    print("=======================================")
    if i == 1000:
        break
stop_t1 = perf_counter()
print("Time taken:", stop_t1 - start_t1)

Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_0
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_1
Text: The purpose of these  notices is to give interested persons an opportunity to participate in  the rule making prior to the adoption of the final rules.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_2
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_3
Text: ----------------------------------------------------------------------- SUMMARY: The United States Department of Agriculture (USDA) is announcing that it has withdrawn certain proposed rules that were either published in the Federal Register more than 3 years ago without subsequent action or determined to no longer be candidates for final action.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_4
Text: USDA is taking this action to reduce its regulatory backlog and focus its resources on higher priority actions.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_5
Text: The Department's acti

In [100]:
# Time the fully lazy sentence generator, printing up to 1000 sentences
start_t2 = perf_counter()
init_at_doc = 14778
end_at_doc = 16420
i = 0
sentence_stream = load_sentences_for_language2("english", s3_client.s3, init_at_doc, end_at_doc)
for i, (sent_id, sent_map) in enumerate(sentence_stream, start=1):
    print("Sent_id:", sent_id)
    print("Text:", sent_map['text'])
    print("=======================================")
    if i == 1000:
        break
stop_t2 = perf_counter()
print("Time taken:", stop_t2 - start_t2)

Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_0
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_1
Text: The purpose of these  notices is to give interested persons an opportunity to participate in  the rule making prior to the adoption of the final rules.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_2
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_3
Text: ----------------------------------------------------------------------- SUMMARY: The United States Department of Agriculture (USDA) is announcing that it has withdrawn certain proposed rules that were either published in the Federal Register more than 3 years ago without subsequent action or determined to no longer be candidates for final action.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_4
Text: USDA is taking this action to reduce its regulatory backlog and focus its resources on higher priority actions.
Sent_id: e616b24e900271b89cde0761f3e4ac92f809b4bc_sent_5
Text: The Department's acti

### Conclusions
- Generators perform better when loading on the order of ~1,000 elements; at around ~10,000 elements loaded at once, the two approaches perform almost equally.