In [26]:
import os
import json
import datetime
from dotenv import load_dotenv
from time import sleep

# Load variables from a local .env file into the process environment; the
# POSTGRES_* settings read from os.environ later in this notebook rely on it.
load_dotenv()

True

In [33]:
# establish a connection to the PostgreSQL database
import psycopg2 as pg

# Map each connect() keyword to the environment variable that supplies it.
# Lookups happen in the listed order, so a missing variable raises KeyError
# before any connection attempt is made.
conn = pg.connect(
    **{
        keyword: os.environ[variable]
        for keyword, variable in (
            ("dbname", "POSTGRES_DB"),
            ("user", "POSTGRES_USER"),
            ("password", "POSTGRES_PASSWORD"),
            ("host", "POSTGRES_HOST"),
        )
    }
)

In [4]:
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable


# Destination of every message published from this notebook.
project_id, topic_id = "vu-game-ontology", "extract-entity"

# One publisher client is created here and reused by the publishing cells;
# topic_path is the identifier handed to publisher.publish().
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)


def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    """Build a done-callback that reports the outcome of publishing ``data``.

    Args:
        publish_future: Not used by this factory; kept so existing call sites
            keep working. The future is passed to the returned callback when
            it is invoked.
        data: The message payload, used only to label failure messages.

    Returns:
        A callable suitable for ``Future.add_done_callback``. On success it
        prints the value of ``future.result()``; on a timeout or any other
        publish error it prints a message naming ``data``.
    """

    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait up to 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")
        except Exception as exc:
            # result() re-raises whatever made the publish fail. Without this
            # branch the error would escape the callback and the payload that
            # failed would never be reported.
            print(f"Publishing {data} failed: {exc!r}")

    return callback





In [48]:

# Start from an empty result so a failed query cannot leave an `object_ids`
# from an earlier run behind for the publishing cells to resend.
object_ids = []

try:
    # The context manager closes the cursor on exit. It also removes the
    # `finally: cursor.close()` that failed on an unbound (or stale) `cursor`
    # whenever conn.cursor() itself raised.
    with conn.cursor() as cursor:
        cursor.execute("SELECT object_id FROM fandom_pages WHERE id < 2000 and object_id not in (SELECT object_id FROM fandom_entities)")
        object_ids = cursor.fetchall()
except Exception as e:
    # Roll back so the connection is not left in an aborted transaction, then
    # report the error; `object_ids` stays empty in this case.
    conn.rollback()
    print(e)



In [49]:
# Number of rows currently held in object_ids.
len(object_ids)

1000

In [None]:
# Publish only the first ten rows, attaching the error-handling callback to
# each message so its outcome is printed.
publish_futures = []

for row in object_ids[:10]:
    data = row[0]
    payload = data.encode("utf-8")
    # publish() returns a future immediately; the callback fires once the
    # future has resolved.
    publish_future = publisher.publish(topic_path, payload)
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Do not move on until every message has either been published or failed.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

In [50]:
# Keep every publish future. Discarding them hid publish failures and let the
# cell finish before the client had actually sent the messages.
pending_futures = []

for i, obj in enumerate(object_ids):
    # Throttle: print progress and pause 20 seconds before each group of 100.
    if i % 100 == 0:
        print(i)
        sleep(20)

    data = obj[0]
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    pending_futures.append(publish_future)

# Wait for all outstanding publishes, then surface any failures.
futures.wait(pending_futures, return_when=futures.ALL_COMPLETED)
failed_count = sum(1 for done in pending_futures if done.exception() is not None)
if failed_count:
    print(f"{failed_count} of {len(pending_futures)} messages failed to publish.")
    


0
100
200
300
400
500
600
700
800
900


In [55]:
for n in range(1, 14):
    # Reset per iteration: if the query fails, nothing from the previous
    # iteration is re-published.
    object_ids = []

    try:
        # The context manager closes the cursor even on error, and avoids
        # touching an unbound `cursor` when conn.cursor() itself fails.
        with conn.cursor() as cursor:
            # The bound is passed as a query parameter, not formatted into the SQL.
            # NOTE(review): the filter is cumulative (id < n*1000), while the
            # progress message below describes a 1000-wide window. Confirm
            # whether a lower bound on id is intended.
            cursor.execute(
                "SELECT object_id FROM fandom_pages WHERE id < %s and object_id not in (SELECT object_id FROM fandom_entities)",
                (n * 1000,),
            )
            object_ids = cursor.fetchall()
    except Exception as e:
        # Roll back so the connection stays usable for the next iteration.
        conn.rollback()
        print(e)

    print(f"Processing next {len(object_ids)} objects from {n-1}000 to {n}000")

    # Keep the futures so failures are visible and the batch is known to be
    # sent before the next one starts.
    pending_futures = []

    for i, obj in enumerate(object_ids):
        # Throttle: print progress and pause 15 seconds before each group of 50.
        if i % 50 == 0:
            print(i)
            sleep(15)

        data = obj[0]
        # When you publish a message, the client returns a future.
        publish_future = publisher.publish(topic_path, data.encode("utf-8"))
        pending_futures.append(publish_future)

    futures.wait(pending_futures, return_when=futures.ALL_COMPLETED)
    failed_count = sum(1 for done in pending_futures if done.exception() is not None)
    if failed_count:
        print(f"{failed_count} of {len(pending_futures)} messages failed to publish.")

    # Extra pause after a large batch before querying the next range.
    if len(object_ids) > 100:
        sleep(20)



Processing next 0 objects from 0000 to 1000
Processing next 0 objects from 1000 to 2000
Processing next 0 objects from 2000 to 3000
Processing next 0 objects from 3000 to 4000
Processing next 0 objects from 4000 to 5000
Processing next 0 objects from 5000 to 6000
Processing next 0 objects from 6000 to 7000
Processing next 670 objects from 7000 to 8000
0
75
150


KeyboardInterrupt: 