In [30]:
import json
import os

import numpy as np
import psycopg
from dotenv import load_dotenv
from tqdm import tqdm

## Load ad posts and the ids of their previous posts

In [31]:
# Each row holds an ad post id, a second id-like value (meaning not shown in
# this notebook -- TODO confirm), and an array with the ids of the posts that
# preceded the ad post.
# NOTE(review): the array has dtype=object, so allow_pickle=True is needed to
# load it, but unpickling can execute arbitrary code -- only load this file
# from a trusted source.
ad_prev_posts = np.load(
    "/home/deniskirbaba/Documents/influai-data/embeddings/ad_prev_posts.npy", allow_pickle=True
)
ad_prev_posts[:3]

array([[4006259, 1000183937,
        array([4006269, 4006268, 4006267, 4006266, 4006265, 4006264, 4006263,
               4006262, 4006261, 4006260])                                   ],
       [8352825, 1000627591,
        array([8352835, 8352834, 8352833, 8352832, 8352831, 8352830, 8352829,
               8352828, 8352827, 8352826])                                   ],
       [7074797, 1000683515,
        array([7074807, 7074806, 7074805, 7074804, 7074803, 7074802, 7074801,
               7074800, 7074799, 7074798])                                   ]],
      dtype=object)

In [32]:
# Report how many ad posts were loaded and how many previous-post ids they
# reference in total (the third field of every row).
print(
    f"Number of ad posts: {len(ad_prev_posts)}\n"
    f"Total number of prev posts: {sum(len(prev_ids) for _, _, prev_ids in ad_prev_posts)}"
)

Number of ad posts: 177373
Total number of prev posts: 1766026


## Set up the DB connection and the fetching function

In [33]:
# Load environment variables from a .env file so the DB_* settings read via
# os.getenv() in the connection cell are available.
load_dotenv()

True

In [34]:
# Open the PostgreSQL connection; every connection keyword is read from the
# matching DB_* environment variable.
try:
    conn = psycopg.connect(
        **{
            kwarg: os.getenv(env_var)
            for kwarg, env_var in (
                ("host", "DB_IP"),
                ("dbname", "DB_NAME"),
                ("user", "DB_USER"),
                ("password", "DB_PASSWORD"),
                ("port", "DB_PORT"),
            )
        }
    )
    print("Connection established.")
except Exception as err:
    # Surface the failure in the cell output, then let it propagate.
    print(f"Error connecting to the database: {err}")
    raise

Connection established.


In [35]:
# Configuration used by save() and fetch_data().
SCHEME = "parse"  # database schema that contains TABLE
TABLE = "posts_content"  # table the post texts are selected from
COLUMNS = ["raw_text"]  # columns selected for every previous post
CHUNK_SIZE = 10_000  # number of ad posts accumulated before each JSON dump
OUTPUT_DIR = "/home/deniskirbaba/Documents/influai-data/embeddings/ad_prev_texts"  # where the chunk files are written

In [36]:
def save(ad_prev_texts, ad_posts_processed):
    """Dump one chunk of fetched texts to a JSON file in ``OUTPUT_DIR``.

    Args:
        ad_prev_texts: JSON-serialisable dict to write (ad post id -> fetched data).
        ad_posts_processed: running count of processed ad posts; used as the
            suffix of the file name (``ad_prev_texts_<count>.json``).
    """
    # Make sure the target directory exists so the first dump does not fail.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    path = os.path.join(OUTPUT_DIR, f"ad_prev_texts_{ad_posts_processed}.json")
    # ensure_ascii=False writes raw non-ASCII characters, so the encoding must be
    # pinned to UTF-8 instead of depending on the platform/locale default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(ad_prev_texts, f, ensure_ascii=False)

In [37]:
def fetch_data(conn, ad_prev_posts):
    """Fetch the texts of the posts preceding every ad post and save them in chunks.

    For each ``(ad_post_id, _, ids)`` entry the ``COLUMNS`` of the rows of
    ``SCHEME.TABLE`` whose ``id`` is in ``ids`` are fetched and stored as
    ``ad_prev_texts[ad_post_id] = [ids, rows]``. ``rows[k]`` always belongs to
    ``ids[k]``; it is ``None`` when the table has no row for that id. Entries
    with no ids are stored as ``[]``.

    Every ``CHUNK_SIZE`` processed ad posts the accumulated dict is written via
    ``save`` and reset; a final partial chunk is written at the end.

    Args:
        conn: open psycopg connection. It is closed when this function exits,
            whether it succeeds, fails or is interrupted.
        ad_prev_posts: iterable of ``(ad_post_id, <unused>, ids)`` triples.

    Raises:
        Exception: any error raised while fetching or saving is printed and re-raised.
    """
    try:
        # Schema, table and columns come from module-level constants. The ids are
        # sent as a bound array parameter rather than spliced into the SQL text.
        # `id` is selected too, so every row can be matched to its requested id.
        query = f"SELECT id, {', '.join(COLUMNS)} FROM {SCHEME}.{TABLE} WHERE id = ANY(%s)"

        with conn.cursor(name="streaming_cursor") as cursor:
            ad_prev_texts = {}
            ad_posts_processed = 0

            for ad_post_id, _, ids in tqdm(ad_prev_posts):
                if len(ids) != 0:
                    # Plain Python ints: adaptable by psycopg and JSON-serialisable.
                    id_list = [int(i) for i in ids]
                    cursor.execute(query, (id_list,))

                    # The database returns rows in arbitrary order and skips ids
                    # that do not exist, so index them by id and realign to id_list.
                    rows_by_id = {row[0]: list(row[1:]) for row in cursor}
                    rows = [rows_by_id.get(post_id) for post_id in id_list]
                    ad_prev_texts[ad_post_id] = [id_list, rows]
                else:
                    ad_prev_texts[ad_post_id] = []

                ad_posts_processed += 1

                # Flush periodically to bound memory use and keep partial results.
                if ad_posts_processed % CHUNK_SIZE == 0:
                    save(ad_prev_texts, ad_posts_processed)
                    ad_prev_texts = {}

            # Write whatever is left after the last full chunk.
            if ad_prev_texts:
                save(ad_prev_texts, ad_posts_processed)

    except Exception as e:
        print(f"Error during data fetching: {e}")
        raise
    finally:
        conn.close()
        print("Connection closed.")

In [38]:
# Long-running: one query is issued per ad post. fetch_data closes `conn` when
# it exits (also on interruption), so re-run the connection cell before
# calling it again.
fetch_data(conn, ad_prev_posts)

 12%|█▏        | 20938/177373 [43:35<5:25:39,  8.01it/s] 


Connection closed.


KeyboardInterrupt: 