<a href="https://colab.research.google.com/github/nirmalhq/avro-test/blob/master/kwp_colab_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install python-dotenv

from dotenv import load_dotenv

load_dotenv()



'engineering-infrastructure@seerinteractive.com'

In [4]:
destination_table = input("Enter the destination table name: ")

while True:
    if len(destination_table) <= 255 and destination_table.isidentifier():
        break

    destination_table = input("The table name you entered is not valid. Please enter a valid name: ")
    

Enter the destination table name: nirmal_temp_table


In [5]:
import csv

errors = []

with open("keywords.csv", encoding="utf-8") as file:
    reader = csv.reader(file, delimiter=",")

    for index, row in enumerate(reader):
        if not (row and row[0] and len(row[0]) < 80 and len(row[0].split()) < 10):
            errors.append(index)

if errors:
    print("The following rows in file have failed to pass the validation. Please correct them and upload again...")
    print(", ".join(map(str, errors)))

    raise SystemExit("Stop right there!")

if index >= 750000:
    print("The file contains more than 750K keywords...")

    raise SystemExit("Stop right there!")


In [6]:
locations = []
with open("locations.csv") as file:
    reader = csv.reader(file, delimiter=",")

    for index, row in enumerate(reader):
        # location validation logic should go here
        locations.append(row[0])
  
if index >= 50:
    print("The location file contains more than 50 locations..")

    raise SystemExit("Stop right there!")

In [7]:
!pip install emoji

import emoji
import re

def clean_search_term(txt):
    cleaned_txt = re.sub(r"\s+", " ", txt.strip())
    cleaned_txt = emoji.replace_emoji(txt, replace="")
    cleaned_txt = re.sub("[^A-Za-z0-9]+", "", txt)
    
    return cleaned_txt



In [8]:
def fetch_keywords_as_batches_of_100000():
    file = open("keywords.csv", encoding="utf-8")
    reader = csv.reader(file, delimiter=",")

    batch_of_rows = []
    for row in reader:
        batch_of_rows.append(clean_search_term(row[0]))
        
        if len(batch_of_rows) == 100000:
            yield batch_of_rows
            batch_of_rows = []
    
    if batch_of_rows:
        yield batch_of_rows

    file.close()

In [10]:
!pip install aiohttp

import os
import aiohttp
import asyncio
import base64

base64_bytes = base64.b64encode(
    ("%s:%s" % (os.environ["USER_NAME"], os.environ["PASSWORD"])).encode("ascii")
).decode("ascii")
headers = {
    "Authorization": "Basic %s" % base64_bytes,
    "Content-Encoding": "gzip",
}
api_endpoint_url = "https://api.dataforseo.com/v3/keywords_data/google_ads/search_volume/task_post"
pingback_url = "https://us-central1-supernova-app-qa.cloudfunctions.net/msv_postback?id=$id&tag=$tag"

async def invoke_api(session, url, payload):
    async with session.post(url, data=payload, headers=headers, timeout=10) as resp:
        response = resp.json()
        if response["status_code"] == 20000:
            return 0
        else:
            return 100000


async def api_caller(tasks):
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        for loc in locations:
            post_data = dict()
            for index in range(0, len(tasks), 1000):
                  post_data[len(post_data)] = {
                      "location_code": loc, 
                      "keywords": tasks[index:index + 1000],
                      "tag": destination_table,
                      "pingback_url": pingback_url
                  }
            post_tasks.append(asyncio.ensure_future(invoke_api(session, api_endpoint_url, post_data)))

        responses = await asyncio.gather(*post_tasks)
        
    return sum(responses)




In [11]:
import concurrent.futures
import queue

def send_tasks_to_dataforseo(tasks, mp_queue, index):
    loop = asyncio.get_running_loop()
    
    tsk = loop.create_task(api_caller(tasks))
    tsk.add_done_callback(lambda t: print(f"Failed task's count={tsk.result()}"))

    mp_queue.put(index)

In [12]:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=50)
active_futures = {}
mp_queue = queue.Queue()

fetch_keywords_as_batches_of_100000_func_generator = fetch_keywords_as_batches_of_100000()
for index, batch_of_rows in enumerate(fetch_keywords_as_batches_of_100000_func_generator):
    future = executor.submit(send_tasks_to_dataforseo, batch_of_rows, mp_queue, index)
    active_futures[index] = future

    if index >= 49:
      break

while True:
    if not active_futures:
        break
    
    future_index = mp_queue.get(timeout=5)
    del active_futures[future_index]
    try:
        batch_of_rows = next(fetch_keywords_as_batches_of_100000_func_generator)
    except StopIteration:
        pass
    else:
        future = executor.submit(send_tasks_to_dataforseo, batch_of_rows, mp_queue, future_index)
        active_futures[future_index] = future
    finally:
        mp_queue.task_done()

executor.shutdown(wait=False)

Empty: ignored