In [70]:
import requests
from bs4 import BeautifulSoup
import logging
import random
import multiprocessing
import math
import pandas as pd
import glob
import os
import sys
from time import perf_counter
from logging import info, error, warning, debug
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
import time

# Send every record of level INFO and above to stdout so it shows up in the
# notebook cell output.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    format='%(levelname)s 🕑%(asctime)s %(name)s %(message)s',
)

# Replace the standard level names with emoji-prefixed labels padded to one width.
for _level, _label in (
    (logging.DEBUG,   "🤓DEBUG  "),
    (logging.INFO,    "🧠INFO   "),
    (logging.WARNING, "🤒WARNING"),
    (logging.ERROR,   "💣ERROR  "),
):
    logging.addLevelName(_level, _label)

info(f"Core count: {multiprocessing.cpu_count()}.")

🧠INFO    🕑01/08/2022 07:10:31 AM root Core count: 8.


In [71]:
# Endpoint queried by gtranslate(); the text to translate is sent as the `q`
# query parameter.
base_url = "https://translate.google.com/m"

# User-Agent header values; get_random_useragent() picks one of them at random
# for every request made by gtranslate().
user_agents = [
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:95.0) Gecko/20100101 Firefox/95.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36"
]


def get_random_useragent():
    """
    Pick one entry of the module-level user_agents list at random.
    @return: a user agent string taken from user_agents.
    """
    last_position = len(user_agents) - 1
    return user_agents[random.randint(0, last_position)]


def gtranslate(text):
    """
    This function translates a text by requesting the page at base_url and
    scraping the translation out of the returned HTML.

    The text is sent as the `q` query parameter together with a randomly
    chosen user agent. When the server answers with status 429 the function
    sleeps for 30 minutes and then retries; it keeps doing so for as long as
    429 is returned.

    @param: text: the text to translate.
    @return: the translated text, or None when the server answers with a
             non-200 status other than 429, or when no result element is
             found in the page.
    """
    # Retry in a loop instead of recursing, so that a long run of 429 answers
    # does not keep adding stack frames.
    while True:
        user_agent = get_random_useragent()
        # NOTE(review): no timeout is passed, so a stalled connection blocks
        # the calling thread indefinitely. Adding one would make this call
        # raise requests.exceptions.Timeout, which no caller handles yet.
        response = requests.get(base_url,
                                params={"q": text},
                                headers={"user-agent": user_agent})
        if response.status_code != 429:
            break

        error(f"Too many requests {response.status_code}.")
        info("Script encountered status 429: too many requests. Waiting for 30 minutes to resume")
        time.sleep(1800)
        info("Script resumes after 30 minutes")

    if response.status_code != 200:
        error(f"Received status: {response.status_code}.")
        return None

    soup = BeautifulSoup(response.text, "html.parser")

    # The translation is looked up in a div with class "t0" first and in a div
    # with class "result-container" as a fallback.
    element = soup.find("div", {"class": "t0"})
    if not element:
        element = soup.find("div", {"class": "result-container"})

    if not element:
        return None
    return element.get_text()


def batch_gtranslate(texts):
    """
    Translate a list of texts one after another with gtranslate and log
    (at debug level) how long the whole batch took.

    @param: texts: list of texts that need to be translated
    @return: list with one translation per input text, in input order.
    """
    started_at = perf_counter()
    translations = [gtranslate(item) for item in texts]
    duration = perf_counter() - started_at

    debug(f"batch_translate: text_count: {len(texts)}, took: {duration:.2f} seconds.")
    return translations

def batch_gtranslate_df(df):
    """
    Translate the 'full_text' column of a dataframe with gtranslate.

    @param: df: dataframe that contains a 'full_text' column.
    @return: the Series produced by applying gtranslate to every value of
             that column.
    """
    def _translate(txt):
        return gtranslate(txt)

    source_column = df["full_text"]
    return source_column.apply(_translate)


def para_gtranslate(texts, min_batch_size=6, batch_count=None):
    """
    This function batch translates texts in parallel.

    The texts are cut into consecutive batches and every batch is handed to
    batch_gtranslate on its own worker thread. The translations are returned
    in the same order as the input texts.

    @param: texts: sequence of texts to translate (a list or anything that
                   supports len() and slicing).
    @param: min_batch_size: smallest number of texts given to one thread.
                            Default is 6.
    @param: batch_count: number of threads that may be used. When None, twice
                         the cpu count is used.
    @return: list of translated texts; an empty list for empty input.
    """
    cpu_count = multiprocessing.cpu_count()
    thread_count = cpu_count*2 if batch_count is None else batch_count
    info(f"Running parallel gtranslate with cpu count: {cpu_count}, and threads: {thread_count} available.")

    # len() rather than truthiness: callers also pass numpy arrays, whose
    # truth value is ambiguous.
    text_count = len(texts)
    if text_count == 0:
        # Nothing to translate. Without this guard the batch count would be 0
        # and ThreadPoolExecutor(0) raises ValueError.
        return []

    if text_count/thread_count <= min_batch_size:
        # Few texts per available thread: use the minimum batch size and
        # spawn only as many threads as needed.
        # Example: 32 threads and 14 texts -> three batches of 6, 6 and 2.
        batch_size = min_batch_size
    else:
        # More texts than the threads can take at the minimum batch size:
        # spread them evenly.
        # Example: 256 texts and 32 threads -> 8 texts per thread.
        batch_size = math.ceil(text_count/thread_count)

    # Slicing by step never yields an empty trailing batch.
    batches = [texts[start:start+batch_size]
               for start in range(0, text_count, batch_size)]
    batch_count = len(batches)

    info(f"Start translation with batch_count: {batch_count}, batch_size: {batch_size}.")
    # Leaving the with-block waits for all submitted batches to finish.
    with ThreadPoolExecutor(batch_count) as executor:
        futures = [executor.submit(batch_gtranslate, batch) for batch in batches]

    result = []
    for future in futures:
        result.extend(future.result())

    return result


def translate_column(df, column_name):
    """
    Translate one column of a hydrated data-set with para_gtranslate.

    @param: df: this is a pandas dataframe; it is modified in place.
    @param: column_name: name of the column inside the dataframe that needs to be translated.
    @return: the same dataframe, now containing a new column called 'processed_texts'.

    NOTE(review): ratelimit_gtranslate and translate_file name their output
    column 'processed_text' (singular); confirm which spelling is intended.
    """
    source_texts = df[column_name].tolist()
    translations = pd.Series(para_gtranslate(source_texts))

    # Assign the raw values so the column is filled by position, not by index.
    df["processed_texts"] = translations.values
    return df


def translate_dataset(dirpath, overwrite_cache=True):
    """
    This function translates a dataset of hydrated tweets.
    A dataset is considered a directory containing .csv files.
    Each file is read into a pandas dataframe, translated with
    ratelimit_gtranslate and then written to a directory called 'processed'.
    The processed directory is a sibling of the hydrated directory and is
    created when it does not exist yet.

    @param: dirpath: The path to the dataset. The dataset must be a directory.
    @param: overwrite_cache: When True (the default) every file is translated
                             again and an existing processed file is
                             overwritten. When False a file that already
                             exists in the processed directory is skipped.
    @return: True once all files have been handled.
    @raise: FileNotFoundError: when dirpath does not exist or is not a directory.
    """
    # isdir() is False for missing paths as well as for regular files, so both
    # cases are rejected before anything is created on disk.
    if not os.path.isdir(dirpath):
        raise FileNotFoundError(f"Path '{dirpath}' does not exist or is not a directory.")

    # normpath drops a trailing separator; without it dirname("a/hydrated/")
    # is "a/hydrated" and 'processed' would end up inside the dataset itself.
    parentdir = os.path.dirname(os.path.normpath(dirpath))
    # create a path to directory that is a sibling of 'dirpath' variable.
    processeddir = os.path.join(parentdir, "processed")
    # create a directory called 'processed' for the processed dataset if it does not yet exist.
    if not os.path.exists(processeddir):
        os.mkdir(processeddir)
        info(f"Created directory: {processeddir}.")

    # Sorted so that files are handled in the same order on every run.
    filenames = sorted(file for file in os.listdir(dirpath) if file.endswith(".csv"))
    for file in filenames:
        filepath = os.path.join(dirpath, file)
        processed_filepath = os.path.join(processeddir, file)
        if not overwrite_cache and os.path.exists(processed_filepath):
            info(f"File already exists in: {processed_filepath}; skipping.")
            continue

        df = read_hydrated_csv(filepath)
        df = ratelimit_gtranslate(df,
                                  rate_limit=5000,
                                  lowerbound_wait=1800,
                                  upperbound_wait=2700)
        df.to_csv(processed_filepath)
        info(f"Created new processed file in: {processed_filepath}.")

    return True


def read_hydrated_csv(filename):
    """
    Load a hydrated tweet CSV into a dataframe.

    Only the columns 'id', 'full_text' and 'created_at' are read. 'id' is
    read as int64 and becomes the index; 'created_at' is parsed as a date.

    @param: filename: path of the CSV file to read.
    @return: the resulting dataframe.
    """
    read_options = {
        "index_col": "id",
        "usecols": ["id", "full_text", "created_at"],
        "dtype": {"id": "int64"},
        "parse_dates": ["created_at"],
    }
    return pd.read_csv(filename, **read_options)


def ratelimit_gtranslate(df,
                         rate_limit=20000,
                         lowerbound_wait=900,
                         upperbound_wait=2700):
    """
    Rate limited translate function. Translates a hydrated Dataframe with a
    certain rate limit in mind. The rows are translated in batches of
    rate_limit rows; once rate_limit rows have been sent the script sleeps
    for a random time between lowerbound_wait and upperbound_wait seconds
    before continuing.

    The input dataframe is not modified.

    @param: df: Dataframe of a hydrated dataset; it must contain the
                column 'full_text'.
    @param: rate_limit: the amount of rows that are processed before the wait
                        time kicks-in.
    @param: lowerbound_wait: minimum wait time in seconds;
                             default lowerbound_wait=900.
    @param: upperbound_wait: maximum wait time in seconds;
                             default upperbound_wait=2700.
    @return: Dataframe with the rows of df and a new column named
             'processed_text'. For an empty df an empty dataframe with that
             column is returned.
    """
    translated_batches = []
    request_count = 0

    for start in range(0, len(df), rate_limit):
        # Copy the slice so that adding a column neither touches df nor
        # triggers pandas' SettingWithCopyWarning.
        batch = df.iloc[start:start+rate_limit].copy()
        batch["processed_text"] = para_gtranslate(batch["full_text"].values)
        translated_batches.append(batch)

        request_count = request_count+len(batch)

        if request_count >= rate_limit:
            wait_time = random.randint(lowerbound_wait, upperbound_wait)
            info(f"""Rate limit exceeded, current request count is
            {request_count}; Sleep for {wait_time} seconds.""")
            time.sleep(wait_time)
            info(f"Script resumes after {wait_time} seconds of sleep.")
            request_count = 0

    if not translated_batches:
        # Empty input: still hand back a dataframe (not None) so callers can
        # write it out like any other result.
        result_df = df.copy()
        result_df["processed_text"] = pd.Series(index=result_df.index, dtype="object")
        return result_df

    # One concat at the end; DataFrame.append no longer exists in pandas 2.
    return pd.concat(translated_batches)

In [3]:
def test_gtranslate():
    """Smoke test: run gtranslate on one Dutch sentence; the result is not checked."""
    dutch_sentence = "Liever te dik in de kist dan een feestje gemist."
    gtranslate(dutch_sentence)


def test_batch_gtranslate():
    """Smoke test: run batch_gtranslate on three short texts; the result is not checked."""
    samples = []
    samples.append("Liever te dik in de kist dan een feestje gemist.")
    samples.append("Hallo welt")
    samples.append("Buenos dias")
    batch_gtranslate(samples)


def test_para_gtranslate():
    """Smoke test: run para_gtranslate with a minimum batch size of 2 and print the translations."""
    samples = []
    samples.append("Liever te dik in de kist dan een feestje gemist.")
    samples.append("Hallo welt")
    samples.append("Buenos dias")
    translations = para_gtranslate(samples, min_batch_size=2)
    print(translations)

In [82]:
def type_check(correct_type):
    """
    Decorator factory that guards the first argument of a function by type.

    The decorated function is only called when its first positional argument
    is an instance of correct_type. Otherwise "Bad Type" is printed and None
    is returned. Any further positional or keyword arguments are passed
    through unchanged.

    @param: correct_type: a type (or tuple of types) as accepted by isinstance.
    @return: a decorator that wraps a function with the type check.
    """
    # Local import so this cell does not depend on the notebook's import cell.
    import functools

    def check(old_function):
        # wraps() keeps the name and docstring of the decorated function.
        @functools.wraps(old_function)
        def new_function(arg, *args, **kwargs):
            if isinstance(arg, correct_type):
                return old_function(arg, *args, **kwargs)
            print("Bad Type")
            return None
        return new_function
    return check


        
@type_check(int)
def times2(num):
    """Return num multiplied by two; type_check(int) rejects non-int arguments."""
    doubled = num * 2
    return doubled

# Demo: calling the type-checked function with a str instead of an int prints
# "Bad Type" (see the recorded output of this cell).
num = "2"

times2(num)


Bad Type


In [75]:
def translate_file(input_path, chunksize=1000, limit=-1, n=8):
    """
    Translate one hydrated CSV file chunk by chunk and append the result to
    the file with the same name in the sibling 'processed' directory.

    The input is expected at <dataset>/<hydrated dir>/<file>.csv; the output
    goes to <dataset>/processed/<file>.csv. The function is resumable: rows
    whose id is already present in the output file are skipped.

    @param: input_path: path of the hydrated CSV file; it must contain the
                        columns 'id', 'full_text' and 'created_at'.
    @param: chunksize: number of rows read from the input file at a time.
    @param: limit: stop once at least this many rows have been translated in
                   this call; -1 (the default) means no limit.
    @param: n: number of worker threads used per chunk.
    """
    info(f"translate_file: params:[input_path={input_path}, chunksize={chunksize}, limit={limit}, n={n}].")

    hydrated_path = os.path.dirname(input_path)
    dataset_path = os.path.dirname(hydrated_path)
    processed_path = os.path.join(dataset_path, "processed")
    result_path = os.path.join(processed_path, os.path.basename(input_path))

    count = 0
    batch_size = math.ceil(chunksize/n)

    # Ids that were written by an earlier run; None when there is no output yet.
    done_index = None
    if os.path.exists(result_path):
        info(f"translate_file: found {result_path}!")
        done_index = pd.read_csv(result_path,
                                 index_col="id",
                                 usecols=["id", "full_text", "created_at"]).index

    for df in pd.read_csv(input_path,
                          chunksize=chunksize,
                          index_col="id",
                          usecols=["id", "full_text", "created_at"]):
        if done_index is not None:
            df = df[~df.index.isin(done_index)]
            if len(df) == 0:
                info(f"translate_file: no diff in current chunk; files: [{input_path} {result_path}; skipping!]")
                continue
            else:
                info(f"translate_file: diff size is {len(df)} for current chunk; filenames: [{input_path} {result_path}!]")

        # Work on a copy so that adding the result column does not write into
        # a filtered view of the chunk.
        df = df.copy()

        batches = [df.iloc[i:i+batch_size] for i in range(0, len(df), batch_size)]
        # Leaving the with-block waits for all submitted batches to finish.
        with ThreadPoolExecutor(n) as executor:
            futures = [executor.submit(batch_gtranslate_df, batch) for batch in batches]

        translated_parts = [future.result() for future in futures]
        if translated_parts:
            # Series.append no longer exists in pandas 2; concat once instead.
            result = pd.concat(translated_parts)
        else:
            result = pd.Series(index=df.index, dtype="object")

        # The assignment aligns the translations with the chunk by id.
        df["processed_text"] = result

        # The 'processed' directory may not exist yet on the first write.
        os.makedirs(processed_path, exist_ok=True)
        if not os.path.exists(result_path):
            df.to_csv(result_path)
            info(f"translate_file:📝created new file at {result_path}.")
        else:
            df.to_csv(result_path, mode="a", header=False)
            info(f"translate_file:📝appended {len(df)} rows to {result_path}.")
        count = count + len(df)
        if limit != -1 and count >= limit:
            info(f"translate_file: limit reached, limit={limit} and count={count};👋exiting!")
            break
    

def translate_dataset2(path):
    """
    Run translate_file on every .csv file directly inside a directory.

    Each file is translated with limit=2000 and n=4.

    @param: path: directory that holds the hydrated .csv files.
    @return: True once all files have been handled.
    """
    pattern = os.path.join(path, "*.csv")
    csv_paths = glob.glob(pattern)
    info(f"translate_dataset2: translating files: {csv_paths}, from {path}.")

    for csv_path in csv_paths:
        translate_file(csv_path, limit=2000, n=4)

    return True

In [76]:
# Point at the hydrated CSV files of the Lopez1 data set and translate each of
# them; translate_file writes the results to the sibling 'processed' directory.
dataset_path = os.path.join("data-sets", "Lopez1")
hydrated_path = os.path.join(dataset_path, "hydrated")

translate_dataset2(hydrated_path)

🧠INFO    🕑01/08/2022 07:12:49 AM root translate_dataset2: translating files: ['data-sets/Lopez1/hydrated/output2020_05.csv', 'data-sets/Lopez1/hydrated/output2020_03.csv'], from data-sets/Lopez1/hydrated.
🧠INFO    🕑01/08/2022 07:12:49 AM root translate_file: params:[input_path=data-sets/Lopez1/hydrated/output2020_05.csv, chunksize=1000, limit=2000, n=4].
🧠INFO    🕑01/08/2022 07:12:49 AM root translate_file: found data-sets/Lopez1/processed/output2020_05.csv!
🧠INFO    🕑01/08/2022 07:12:49 AM root translate_file: no diff in current chunk; files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠INFO    🕑01/08/2022 07:12:49 AM root translate_file: no diff in current chunk; files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠INFO    🕑01/08/2022 07:12:49 AM root translate_file: no diff in current chunk; files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/

True

In [15]:
# Translate a single file, stopping once at least 5000 rows were translated.
# NOTE(review): the recorded output below logs a `filename=` parameter and
# data-sets/Lopez1 paths, so it appears to come from an earlier version of
# translate_file — re-run to confirm.
translate_file("output2020_05.csv", limit=5000)

🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: params:[filename=output2020_05.csv, chunksize=1000, limit=5000, n=8].
🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: found data-sets/Lopez1/processed/output2020_05.csv!
🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: no diff in current chunk for files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: no diff in current chunk for files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: no diff in current chunk for files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠INFO    🕑01/07/2022 03:55:29 PM root translate_file: no diff in current chunk for files: [data-sets/Lopez1/hydrated/output2020_05.csv data-sets/Lopez1/processed/output2020_05.csv; skipping!]
🧠I

In [14]:
# Number of rows currently stored in the processed output file.
# NOTE(review): `processed_path` is only assigned as a local inside
# translate_file in the visible cells; this cell seems to rely on leftover
# kernel state — confirm it survives Restart & Run All.
len(pd.read_csv(processed_path + "/output2020_05.csv"))

12000

In [137]:
# Scratch check: which of the first 400 rows (df) have an id that is missing
# from the result file (df2)? df3 holds those rows.
# NOTE(review): `processed_path` and `result_filename` are not defined at
# module level in the visible cells. The first path is also built without a
# separator before the file name, unlike the "/output2020_05.csv" form used in
# the cell above — confirm both before re-running.
df = pd.read_csv(processed_path + "output2020_05.csv", 
                      nrows=400,
                      index_col="id",
                      usecols=["id", "full_text", "created_at"])
df2 = pd.read_csv(result_filename,  index_col="id",
                      usecols=["id", "full_text", "processed_text", "created_at"])

df3 = df[~df.index.isin(df2.index)]
df3

df2

Unnamed: 0_level_0,created_at,full_text,processed_text
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1234036905316159488,Sun Mar 01 08:45:00 +0000 2020,44% denkt dat de Corona-virus epidemie erger z...,44% think the Corona virus epidemic will get w...
1234383118334922752,Mon Mar 02 07:40:43 +0000 2020,Gondstofbelasting wordt de norm. Ook voor blon...,Gand dust load is becoming the norm. Also for ...
1234831817817063424,Tue Mar 03 13:23:42 +0000 2020,Aantal met coronavirus besmette personen in Ne...,Number of people infected with coronavirus in ...
1235077563334012928,Wed Mar 04 05:40:12 +0000 2020,Vietnam carrier Vietjet to halt flights to S. ...,Vietnam carrier Vietjet to halt flights to S. ...
1235492331970793472,Thu Mar 05 09:08:20 +0000 2020,RT @haP65: @NikaDragomira Fife's idiocy is now...,RT @haP65: @NikaDragomira Fife's idiocy is now...
...,...,...,...
1236676134680625152,Sun Mar 08 15:32:21 +0000 2020,seokjin paying for yoongis birthday dinner mak...,seokjin paying for yoongis birthday dinner mak...
1236735959175237632,Sun Mar 08 19:30:04 +0000 2020,En als alle IC bedden bezet zijn kan de rest v...,"And if all IC beds are occupied, the rest of t..."
1236759061544214528,Sun Mar 08 21:01:52 +0000 2020,@FD_Nieuws Hopelijk hebben ze alle panelen al ...,@FD_Nieuws Hopefully they have already purchas...
1236997826959347712,Mon Mar 09 12:50:38 +0000 2020,Leraar middelbare school besmet. Tien andere l...,High school teacher infected. Ten other teache...


In [60]:
# Scratch: build an empty dataframe with the processed-file columns and use
# 'id' as its index. Note that this rebinds the module-level name `df`.
df = pd.DataFrame(None, columns=["id", "full_text", "created_at", "processed_text"])
df = df.set_index("id")
df



Unnamed: 0_level_0,full_text,created_at,processed_text
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1


In [95]:
# Scratch: translate the first 20 rows sequentially with gtranslate; `se` is
# the resulting Series, indexed by tweet id like df2.
# NOTE(review): `filename` is only assigned as a local inside translate_file in
# the visible cells; this cell seems to rely on leftover kernel state.
df2 = pd.read_csv(filename, 
                      nrows=20,
                      index_col="id",
                      usecols=["id", "full_text", "created_at"])


#pd.Series([df2["full_text"].apply(lambda txt: gtranslate(txt))], index=df2.index, name="processed_text")
se = df2["full_text"].apply(lambda txt: gtranslate(txt))

In [96]:
# Display the translated texts computed in the previous cell.
se

id
1234036905316159488    44% think the Corona virus epidemic will get w...
1234383118334922752    Gand dust load is becoming the norm. Also for ...
1234831817817063424    Number of people infected with coronavirus in ...
1235077563334012928    Vietnam carrier Vietjet to halt flights to S. ...
1235492331970793472    RT @haP65: @NikaDragomira Fife's idiocy is now...
1235597189759545344    So the inoculation tactics in the bible belt s...
1235690260316794880    And so the LIBERALS, who previously humiliated...
1235869971378581504    Corona seems to have two variants https://t.co...
1236096698570637312    Happy Netherlands is rising fast! 'NO PANIC' s...
1236615839551545344    @Paradisbeer @Rzuid70 Being able to spread a c...
1236916084554567680    Today I advised my students to wash hands afte...
1237107373245956096    I finally know what I'm suffering from....\n\n...
1237292284158390272    What is the advice with regard to the 'high-fi...
1237790238895988736                             

In [97]:
# Display the untranslated source rows for comparison with `se`.
df2

Unnamed: 0_level_0,created_at,full_text
id,Unnamed: 1_level_1,Unnamed: 2_level_1
1234036905316159488,Sun Mar 01 08:45:00 +0000 2020,44% denkt dat de Corona-virus epidemie erger z...
1234383118334922752,Mon Mar 02 07:40:43 +0000 2020,Gondstofbelasting wordt de norm. Ook voor blon...
1234831817817063424,Tue Mar 03 13:23:42 +0000 2020,Aantal met coronavirus besmette personen in Ne...
1235077563334012928,Wed Mar 04 05:40:12 +0000 2020,Vietnam carrier Vietjet to halt flights to S. ...
1235492331970793472,Thu Mar 05 09:08:20 +0000 2020,RT @haP65: @NikaDragomira Fife's idiocy is now...
1235597189759545344,Thu Mar 05 16:05:00 +0000 2020,De inentings tactiek in de bijbelbelt werkt du...
1235690260316794880,Thu Mar 05 22:14:50 +0000 2020,"En zo krijgen de LIBERALEN, die voordien CD&am..."
1235869971378581504,Fri Mar 06 10:08:57 +0000 2020,Corona lijkt twee varianten te hebben https://...
1236096698570637312,Sat Mar 07 01:09:53 +0000 2020,Gelukkig Nederland stijgt met stip! ‘NO PANIC’...
1236615839551545344,Sun Mar 08 11:32:46 +0000 2020,@Paradijsbeer @Rzuid70 Het kunnen verspreiden ...
