# 数据使用 postgres

# 1.0 environment

In [2]:
# environment
import pymongo
import tqdm
import psycopg
import random
from dotenv import load_dotenv
import os

load_dotenv()

# Connection settings come from the environment (.env), like postgres_uri below.
# The Mongo host falls back to the previously hard-coded address when
# `mongo_uri` is not set, so existing setups keep working.
DATABASE = pymongo.MongoClient(
    os.environ.get('mongo_uri', "192.168.1.222")
).temporary_token_similarity

# Target collections that receive the sampled nodes and edges.
collection_node = DATABASE.token_similarity_node_v20240815
collection_edge = DATABASE.token_similarity_edge_v20240815

# Postgres connection string for the `edges` table queried by later cells.
pg_uri = os.environ.get('postgres_uri')

In [3]:
# Find all stored edges between (or within) collections of node ids.
import bisect


def _query_edges_above(cursor, sources, targets, found):
    """Add to ``found`` every edge row from an id in ``sources`` to a larger id in ``targets``."""
    # Sort once so every source takes its strictly-greater candidates with a
    # binary search instead of rescanning the whole target collection.
    ordered_targets = sorted(targets)
    # dict.fromkeys drops repeated ids while keeping their order, so each
    # distinct source id costs at most one query.
    for x in dict.fromkeys(sources):
        higher = ordered_targets[bisect.bisect_right(ordered_targets, x):]
        if not higher:
            continue
        cursor.execute(
            "SELECT source_id,target_id,weight FROM edges WHERE source_id = %s AND target_id = ANY(%s);",
            (x, higher),
        )
        found.update(cursor)


def query_distance(tuple_set_1, tuple_set_2):
    """Return the edges stored in postgres between ids of the two collections.

    For every id ``x`` of one collection, the ``edges`` table is queried for
    rows with ``source_id = x`` and ``target_id`` among the ids of the other
    collection that compare strictly greater than ``x``. When the two
    collections are unequal, the lookup is repeated in the other direction so
    pairs whose smaller id lives in ``tuple_set_2`` are found too. Only
    ``source_id < target_id`` rows are ever requested; presumably the table
    stores each pair that way — TODO confirm.

    Args:
        tuple_set_1: Iterable of node ids (e.g. a set or a token list).
        tuple_set_2: Iterable of node ids; may equal ``tuple_set_1``.

    Returns:
        A set of ``(source_id, target_id, weight)`` rows.

    Raises:
        TypeError: If the ids cannot be ordered against each other. Both
            inputs are printed first to help locate the offending data.
    """
    pg_uri = os.environ.get('postgres_uri')
    ret = set()
    with psycopg.connect(pg_uri) as conn:
        with conn.cursor() as cursor:
            try:
                _query_edges_above(cursor, tuple_set_1, tuple_set_2, ret)
                # For equal collections the single pass already covers every pair.
                if tuple_set_1 != tuple_set_2:
                    _query_edges_above(cursor, tuple_set_2, tuple_set_1, ret)
            except TypeError:
                print(tuple_set_1)
                print(tuple_set_2)
                raise
    return ret

# 1.1 节点子图 加入数据组

In [None]:
# Node set: every id that appears as SID or EID in token_similarity_v20240729.

collection_subject = DATABASE.token_similarity_v20240729
subject_id_set = set()
for subject_doc in collection_subject.find():
    for id_field in ('SID', 'EID'):
        subject_id_set.add(subject_doc[id_field])
print(len(subject_id_set))

In [None]:


# Collect every stored edge among the subject ids, plus one node document
# per distinct endpoint.
edge_list = []
node_list = []
seen_node_ids = set()
for s, t, weight in query_distance(subject_id_set, subject_id_set):
    edge_list.append({
        '_id': f"{s}-{t}",
        "s": s,
        "t": t,
        "weight": weight
    })
    # An id usually ends many edges; add it only once so insert_many does not
    # report a duplicate-key error for every repeat.
    for node_id in (s, t):
        if node_id not in seen_node_ids:
            seen_node_ids.add(node_id)
            node_list.append({'_id': node_id})

# insert_many rejects an empty list, so skip the write when nothing was found.
# ordered=False keeps inserting past documents that already exist; those
# duplicate-key errors are only printed.
if edge_list:
    try:
        collection_edge.insert_many(edge_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)
if node_list:
    try:
        collection_node.insert_many(node_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)

# 1.1.1 1M edges

In [None]:
# Edge/node documents for the pairs listed in token_similarity_v20240712.

collection_subject = DATABASE.token_similarity_v20240712

# Start from empty lists: a previous cell leaves its own edge_list/node_list
# behind, and appending to them would re-insert all of those documents.
edge_list = []
node_list = []
seen_edge_ids = set()
seen_node_ids = set()

pg_uri = os.environ.get('postgres_uri')

with psycopg.connect(pg_uri) as conn:
    with conn.cursor() as cursor:
        for doc in tqdm.tqdm(collection_subject.find()):
            # Query the pair with the smaller id as source_id.
            if doc['SID'] < doc['EID']:
                sid, eid = doc['SID'], doc['EID']
            else:
                sid, eid = doc['EID'], doc['SID']

            cursor.execute(
                "SELECT source_id,target_id,weight FROM edges WHERE source_id = %s AND target_id = %s;",
                (sid, eid),
            )
            result = cursor.fetchone()
            if result is None:
                continue
            s, t, weight = result

            # Skip pairs already collected (e.g. listed twice in the source
            # collection) so each edge document is inserted once.
            edge_id = f"{s}-{t}"
            if edge_id in seen_edge_ids:
                continue
            seen_edge_ids.add(edge_id)
            edge_list.append({
                '_id': edge_id,
                "s": s,
                "t": t,
                "weight": weight
            })
            for node_id in (s, t):
                if node_id not in seen_node_ids:
                    seen_node_ids.add(node_id)
                    node_list.append({'_id': node_id})

# insert_many rejects an empty list; duplicates of already-stored documents
# are reported by BulkWriteError and only printed.
if edge_list:
    try:
        collection_edge.insert_many(edge_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)
if node_list:
    try:
        collection_node.insert_many(node_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)

# 1.2 均匀数据, 加入实验组

In [None]:

# Indexed range query: suits a table with an index on weight, not a full scan.
def query_gpc_limit_by_weight(weight_gt, weight_lte, fetch_size, conn=None):
    """Yield up to ``fetch_size`` edge rows with ``weight_gt < weight <= weight_lte``.

    Each row is a ``(source_id, target_id, weight)`` tuple from the postgres
    ``edges`` table. Pass an open psycopg connection as ``conn`` to reuse it
    across calls. When it is omitted, a connection to ``pg_uri`` is opened for
    this call and closed when the generator finishes.
    """
    if conn is None:
        with psycopg.connect(pg_uri) as own_conn:
            yield from query_gpc_limit_by_weight(weight_gt, weight_lte, fetch_size, own_conn)
        return
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT source_id,target_id,weight FROM edges WHERE weight > %s AND weight <= %s LIMIT %s;",
            (weight_gt, weight_lte, fetch_size),
        )
        yield from cursor


mutiple = 10000  # number of equal-width weight buckets over (0, 1]
edge_list = []
node_list = []
seen_node_ids = set()
# One connection for all buckets instead of a new connection per query.
with psycopg.connect(pg_uri) as conn:
    for i in tqdm.tqdm(range(mutiple)):
        # Both bounds are derived from the integer bucket index. This way bucket
        # i's upper bound is exactly bucket i+1's lower bound; i/m + 1/m need not
        # equal (i+1)/m in floating point.
        gt = i / mutiple
        lte = (i + 1) / mutiple
        for s, t, weight in query_gpc_limit_by_weight(gt, lte, 100, conn):
            edge_list.append({
                '_id': f"{s}-{t}",
                "s": s,
                "t": t,
                "weight": weight
            })
            for node_id in (s, t):
                if node_id not in seen_node_ids:
                    seen_node_ids.add(node_id)
                    node_list.append({'_id': node_id})

# Shuffle so the edges are not inserted in weight-bucket order.
random.shuffle(edge_list)
# insert_many rejects an empty list; duplicate-key errors are only printed.
if edge_list:
    try:
        collection_edge.insert_many(edge_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)
if node_list:
    try:
        collection_node.insert_many(node_list, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        print(bwe.details)

print(len(edge_list))
print(len(node_list))



In [None]:
# 1000*1000

In [None]:

# Full-table scan: suited to an edges table without an index on weight.
import collections
import math


def test(buckets=1000, per_bucket=1000, target=1000 * (1000 - 5)):
    """Sample edges roughly uniformly over weight by streaming the whole table.

    Weights in ``[0, 1)`` are split into ``buckets`` equal-width buckets, and
    at most ``per_bucket`` edges are kept per bucket. Rows whose weight falls
    outside that range are skipped. Scanning stops once ``target`` edges have
    been kept (printing ``'over'``) or when the table is exhausted. The kept
    edges are then inserted, shuffled, into ``collection_edge``, and their
    distinct endpoints into ``collection_node``.

    The default ``target`` is slightly below ``buckets * per_bucket``; the
    reason for the ``- 5`` is not visible here — TODO confirm.
    """
    remaining = target
    edge_count_book = collections.defaultdict(int)
    edge_list = []
    node_list = []
    seen_node_ids = set()

    def save_data():
        # Write whatever was collected. insert_many rejects an empty list, and
        # duplicate-key errors from earlier runs are only printed.
        random.shuffle(edge_list)
        for collection, docs in ((collection_edge, edge_list), (collection_node, node_list)):
            if not docs:
                continue
            try:
                collection.insert_many(docs, ordered=False)
            except pymongo.errors.BulkWriteError as bwe:
                print(bwe.details)

    with psycopg.connect(pg_uri) as conn:
        with conn.cursor() as cursor:
            # stream() yields rows as they arrive instead of loading the table.
            rows = cursor.stream("SELECT source_id,target_id,weight FROM edges;")
            for s, t, weight in tqdm.tqdm(rows):
                # floor (not int truncation) sends negative weights to negative
                # indexes, which the range check below rejects.
                index = math.floor(weight * buckets)
                # `>=` caps every bucket at exactly per_bucket edges.
                if not 0 <= index < buckets or edge_count_book[index] >= per_bucket:
                    continue

                edge_count_book[index] += 1
                remaining -= 1

                edge_list.append({
                    '_id': f"{s}-{t}",
                    "s": s,
                    "t": t,
                    "weight": weight
                })
                for node_id in (s, t):
                    if node_id not in seen_node_ids:
                        seen_node_ids.add(node_id)
                        node_list.append({'_id': node_id})

                if remaining <= 0:
                    print('over')
                    break
    save_data()


test()


# 2. 补全 title, description

In [5]:
from elasticsearch import Elasticsearch

# One Elasticsearch client shared by every lookup. The host can be overridden
# with the `es_uri` env var and defaults to the previous hard-coded address.
ES_CLIENT = Elasticsearch(os.environ.get("es_uri", "http://192.168.1.227:9200"))


def get_plaintext(pageID, max_words=15):
    """Look up a page in the ``en_page`` index and return ``(snippet, title)``.

    The snippet joins the page's plaintext lines with spaces. It stops after
    the first line that brings the count of space-separated words above
    ``max_words``.

    Returns:
        ``(None, None)`` when no document matches ``pageID``. Otherwise
        ``(snippet, title)``, both stripped; ``snippet`` is ``""`` when the
        page has no plaintext.
    """
    response = ES_CLIENT.search(
        index="en_page",
        body={
            "_source": ["title", "id", "plaintext"],
            "query": {
                "match_phrase": {
                    "_id": pageID,
                },
            },
            "size": 1,
        },
    )
    hits = response["hits"]["hits"]
    if not hits:
        # No title is available either; it used to be unbound here.
        return None, None
    source = hits[0]['_source']
    # `or ""` also covers fields that are present but null.
    title = (source.get("title") or "").strip()
    snippet = ""
    for line in (source.get("plaintext") or "").strip().split("\n"):
        snippet += f"{line} "
        if len(snippet.strip().split(" ")) > max_words:
            break
    # Always strip: an empty page must come back as "" (falsy) so the loop
    # below marks it 'miss' instead of storing " ".
    return snippet.strip(), title


for doc in tqdm.tqdm(collection_node.find({'status': {'$ne': 'miss'}, 'title': None})):
    plaintext, title = get_plaintext(doc['_id'])
    if not plaintext:
        print('plaintext miss')
        collection_node.update_one({'_id': doc['_id']}, {'$set': {'status': 'miss'}})
        continue
    collection_node.update_one({'_id': doc['_id']}, {'$set': {'title': title, 'plaintext': plaintext}})


109306it [10:49, 168.17it/s]


# 3. 分词,对 plaintext 提取分词

In [7]:
import sys

sys.path.append("../gpc_demo")
from utils import (
        get_token,
)

for node_doc in tqdm.tqdm(collection_node.find({'status': {'$ne':'miss'},'token':None})):
    # get_token returns a pair; only the first element is used here, and the
    # first field of each of its items is taken as the token (presumably the
    # token text — verify against gpc_demo/utils).
    tokens, _ = get_token(node_doc['plaintext'])
    token_list = [entry[0] for entry in tokens]
    if not token_list:
        # Nothing to store: report the page title and leave `token` unset.
        print(node_doc['title'])
        continue
    collection_node.update_one({'_id': node_doc['_id']}, {'$set': {'token': token_list}})

34it [00:01, 23.64it/s]

2007 Vallelunga Superbike World Championship round
List of doping cases in sport (J)
1978–79 Aberdeen F.C. season
1977–78 Aberdeen F.C. season
1975–76 Aberdeen F.C. season
1972–73 Aberdeen F.C. season
1971–72 Aberdeen F.C. season
1973–74 Aberdeen F.C. season
Büyükdere Avenue
Boston Society of Film Critics Award for Best Cinematography
19th century in poetry
1892 in paleontology
Boston Society of Film Critics Award for Best Use of Music in a Film
Inhospitable
1889 in paleontology
1963 in spaceflight
HLA-Cw7
List of titled noble families in the Kingdom of Hungary
John S. Dugdale
Donald J. Stewart
Coeval
1965 in spaceflight
Secund
1879 in paleontology
Exoteric
Third degree (interrogation)
Ecclesiastical
National Register of Historic Places listings in South and Southwest Portland, Oregon
Webmaster
Rhine–Main–Danube Canal
Caput
List of Latin phrases (P)
Abaxial
Floristics





# 4. 计算 node 中的 token 的联通度

In [10]:
# Load all pending node documents before the slow loop starts; presumably
# this avoids holding a Mongo cursor open for the whole run — TODO confirm.
doc_list = list(tqdm.tqdm(collection_node.find({'token': {'$ne':None},'token_graph':None})))

for node_doc in tqdm.tqdm(doc_list):
    node_tokens = node_doc['token']
    # Edges among the node's own tokens, as (source, target, weight) tuples.
    token_edges = [
        (source, target, weight)
        for source, target, weight in query_distance(node_tokens, node_tokens)
    ]
    collection_node.update_one(
        {'_id': node_doc['_id']},
        {'$set': {'token_graph': token_edges}},
    )

    

416473it [00:06, 64498.43it/s]
100%|██████████| 416473/416473 [11:34:12<00:00, 10.00it/s]  


# 5. 计算 edge 两个 node 之间的 token_graph 之间的边

This step has been transferred to a worker.

In [None]:
# for doc in tqdm.tqdm(collection_edge.find({'graph_link': {'$exists': False}, })):
#     sID = doc['s']
#     tID = doc['t']
#     sdoc = collection_node.find_one({'_id': sID})
#     tdoc = collection_node.find_one({'_id': tID})
#     sToken = set(sdoc.get('token'))
#     tToken = set(tdoc.get('token'))
#     if not sToken or not tToken:
#         print("edge disable", doc)
#         continue
    
#     edges = [(a,b,weight) for a,b,weight in query_distance(sToken-tToken,tToken-sToken)]
#     # print(len(edges))
#     # print(len(sToken-tToken))
#     # print(len(tToken-sToken))
#     # print(edges)
    
    
#     # print(sToken-tToken)
#     # print(tToken-sToken)
#     collection_edge.update_one(
#         {'_id': doc['_id']},
#         {'$set': {'graph_link': edges}}
#     )