<a href="https://colab.research.google.com/github/liweitj47/classic_match/blob/main/guwen_match.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install datasets transformers



In [2]:
import os
from google.colab import drive

# Mount Google Drive under /content/drive/.
drive.mount('/content/drive/')

# Set the current working folder. The path lives inside Google Drive, so it must
# start with the mount point chosen above.
os.chdir("/content/drive/MyDrive/")

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [3]:
# book_sid_map: {file name: {sentence id: stripped line}} for every loaded book.
# target_text / target_name / target_sid are parallel lists with one entry per line
# read from the target books.
book_sid_map = {}
target_text = []
target_name = []
target_sid = []
for name in ["./corpus/classics/short_data/周易.txt", "./corpus/classics/short_data/周礼.txt","./corpus/classics/short_data/论语.txt","./corpus/classics/short_data/尚书.txt","./corpus/classics/short_data/诗经.txt", "./corpus/classics/short_data/孟子.txt"]:
  sentences = book_sid_map.setdefault(name, {})
  # Read as UTF-8 explicitly (the files hold Chinese text, so do not depend on the
  # platform default) and close the handle deterministically.
  with open(name, encoding="utf-8") as book_file:
    # The sentence id is the zero-based line number within the book.
    for sid, line in enumerate(book_file):
      sentence = line.strip()
      target_text.append(sentence)
      target_name.append(name)
      target_sid.append(sid)
      sentences[sid] = sentence
assert len(target_text) == len(target_name) == len(target_sid)

In [4]:
# eval_text / eval_name / eval_sid are parallel lists with one entry per line read
# from the evaluation files; the same lines are also registered in book_sid_map
# under the bare file name.
eval_text = []
eval_name = []
eval_sid = []
for name in ["Hao.txt", "Yi.txt"]:
  sentences = book_sid_map.setdefault(name, {})
  # Read as UTF-8 explicitly (do not depend on the platform default encoding)
  # and close the handle deterministically.
  with open(os.path.join("./corpus/classics/short_data", name), encoding="utf-8") as book_file:
    # The sentence id is the zero-based line number within the file.
    for sid, line in enumerate(book_file):
      sentence = line.strip()
      sentences[sid] = sentence
      eval_text.append(sentence)
      eval_name.append(name)
      eval_sid.append(sid)
assert len(eval_text) == len(eval_name) == len(eval_sid)

In [5]:
from transformers import AutoTokenizer, AutoModel
# Checkpoint shared by the tokenizer and the encoder.
# Alternative local copies used previously: /content/drive/MyDrive/guwenbert-base
# (tokenizer) and /content/drive/MyDrive/DigitalHumanities (model).
MODEL_NAME = "ethanyt/guwenbert-base"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)

Downloading:   0%|          | 0.00/519 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/93.5k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/418M [00:00<?, ?B/s]

Some weights of the model checkpoint at ethanyt/guwenbert-base were not used when initializing RobertaModel: ['lm_head.layer_norm.weight', 'lm_head.decoder.bias', 'lm_head.dense.weight', 'lm_head.layer_norm.bias', 'lm_head.dense.bias', 'lm_head.bias', 'lm_head.decoder.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [6]:
# Sanity check: show the tokenizer's padding, unknown and cls token ids, in that order.
special_token_ids = (tokenizer.pad_token_id, tokenizer.unk_token_id, tokenizer.cls_token_id)
print(*special_token_ids)

1 3 0


In [7]:
import torch

class Dataset(torch.utils.data.Dataset):
  """Map-style dataset over three parallel sequences.

  Item ``idx`` is the tuple ``(text[idx], book_name[idx], sid[idx])``; the
  length is the length of ``text``.
  """

  def __init__(self, text, book_name, sid):
    super().__init__()
    self.text, self.book_name, self.sid = text, book_name, sid

  def __len__(self):
    return len(self.text)

  def __getitem__(self, idx):
    columns = (self.text, self.book_name, self.sid)
    return tuple(column[idx] for column in columns)

  

class MyCollator:
  """Collate ``(text, book_name, sid)`` samples into one tokenized batch.

  Args:
    tokenizer: callable used to encode the list of texts; it is invoked with
      ``max_length``, ``truncation=True``, ``padding=True`` and
      ``return_tensors='pt'``.
    max_length: truncation length passed to the tokenizer. Defaults to 32,
      the value that was previously hard-coded.
  """

  def __init__(self, tokenizer, max_length=32):
    self.tokenizer = tokenizer
    self.max_length = max_length

  def __call__(self, data):
    """Return the tokenizer output with ``book_name`` and ``sid`` lists attached."""
    # Transpose the list of samples into three parallel lists.
    text, book_name, sid = [list(s) for s in zip(*data)]
    encodings = self.tokenizer(text, max_length=self.max_length, truncation=True, padding=True, return_tensors='pt')
    # Carry the provenance of every row alongside the encoded tensors.
    encodings['book_name'] = book_name
    encodings['sid'] = sid
    return encodings

In [8]:
from torch.utils.data import DataLoader

# Batch size shared by the loaders built in this cell.
batch_size = 32
my_collator = MyCollator(tokenizer)

# Target corpus loader; shuffling is explicitly disabled.
target_dataset = Dataset(target_text, target_name, target_sid)
target_loader = DataLoader(
    dataset=target_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=my_collator,
)

# Evaluation loader, built with the same collator and batch size.
eval_dataset = Dataset(eval_text, eval_name, eval_sid)
eval_loader = DataLoader(
    dataset=eval_dataset,
    batch_size=batch_size,
    collate_fn=my_collator,
)

In [9]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

model.eval()
# target_rep: {book_name: {sid: L2-normalised vector (numpy) of the first token}}.
target_rep = {}
# Inference only: without no_grad every forward pass would build an autograd
# graph that is never used, wasting memory on each batch.
with torch.no_grad():
  for batch in target_loader:
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    outputs = model(input_ids, attention_mask=attention_mask)
    # Use the hidden state at position 0 as the sentence representation.
    pooled_outputs = outputs.last_hidden_state[:,0]
    # L2-normalise each row so a dot product between rows is a cosine similarity.
    norm = torch.sqrt((pooled_outputs*pooled_outputs).sum(1, keepdim=True))
    pooled_outputs = (pooled_outputs/norm).cpu().numpy()
    for output, book_name, sid in zip(pooled_outputs, batch['book_name'], batch['sid']):
      target_rep.setdefault(book_name, {})[sid] = output


In [10]:
import numpy as np

class MemoryDataset(torch.utils.data.Dataset):
  """Flatten a nested ``{book_name: {sid: vector}}`` mapping into indexable records.

  Item ``idx`` is the tuple ``(vector, book_name, sid)`` of the ``idx``-th
  record, in the iteration order of the nested mapping.
  """

  def __init__(self, rep):
    super().__init__()
    self.data = [
      {"vector": vector, "book_name": book_name, "sid": sid}
      for book_name, vectors_by_sid in rep.items()
      for sid, vector in vectors_by_sid.items()
    ]

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    record = self.data[idx]
    return record["vector"], record["book_name"], record["sid"]

def my_collate_fn(data):
  """Collate ``(vector, book_name, sid)`` samples into a batch dictionary.

  The vectors are combined into one array and wrapped as a torch tensor under
  ``"vector"``; ``"book_name"`` and ``"sid"`` are returned as parallel lists.
  """
  vectors, book_names, sids = (list(column) for column in zip(*data))
  stacked = np.array(vectors)
  matrix = torch.from_numpy(stacked)
  # One tensor row per sample is expected.
  assert len(matrix) == len(book_names), (len(matrix), len(book_names), len(sids))
  batch = {}
  batch["vector"] = matrix
  batch["book_name"] = book_names
  batch["sid"] = sids
  return batch

# Wrap the encoded target vectors so they can be streamed in batches.
memory_dataset = MemoryDataset(target_rep)
memory_loader = DataLoader(
    dataset=memory_dataset,
    batch_size=batch_size,
    collate_fn=my_collate_fn,
)

In [11]:
class RetrieveSample:
    """One retrieval candidate: the target book, its sentence id and the match score.

    Ordering is deliberately reversed: an instance compares as "less than"
    another when its score is higher, so ascending sorts and min-heaps put the
    highest-scoring sample first.
    """

    def __init__(self, book_name, sid, score):
        self.book_name, self.sid, self.score = book_name, sid, score

    def __lt__(self, other):
        # Reversed on purpose: a higher score sorts first.
        return other.score < self.score

In [12]:
from collections import namedtuple
import heapq

# Number of best-scoring target sentences kept for every evaluation sentence.
TOP_K = 5

# eval_rep: {eval book: {eval sid: list of RetrieveSample, best score first}}.
eval_rep = {"Hao.txt":{}, "Yi.txt":{}}
# Inference only: without no_grad every forward pass would build an autograd
# graph that is never used.
with torch.no_grad():
    for batch in eval_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        # Hidden state at position 0, L2-normalised so the dot product with
        # the stored target vectors is a cosine similarity.
        pooled_outputs = outputs.last_hidden_state[:,0]
        norm = torch.sqrt((pooled_outputs*pooled_outputs).sum(1, keepdim=True))
        pooled_outputs = pooled_outputs/norm
        for memory_batch in memory_loader:
            target_vec = memory_batch["vector"].to(device)
            scores = torch.sigmoid(torch.matmul(pooled_outputs, target_vec.transpose(0, 1)))
            # Only the TOP_K best columns of each row can reach the final
            # top-K, so select them on the tensor instead of creating one
            # Python object per (eval, target) pair.
            k = min(TOP_K, scores.size(1))
            top_scores, top_idx = torch.topk(scores, k, dim=1)
            top_scores = top_scores.cpu().tolist()
            top_idx = top_idx.cpu().tolist()
            for eval_book_name, eval_sid, row_scores, row_idx in zip(batch['book_name'], batch['sid'], top_scores, top_idx):
                candidates = eval_rep[eval_book_name].get(eval_sid, [])
                candidates.extend(
                    RetrieveSample(memory_batch['book_name'][j], memory_batch['sid'][j], score)
                    for score, j in zip(row_scores, row_idx)
                )
                # Merge with the running result and keep the true TOP_K, sorted
                # best first. Truncating a heap's backing list with [:5] does
                # not do this: only its first element is guaranteed to be the
                # best. The explicit key keeps the selection independent of
                # how RetrieveSample defines ordering.
                eval_rep[eval_book_name][eval_sid] = heapq.nlargest(TOP_K, candidates, key=lambda sample: sample.score)
        del batch

In [13]:
import json

# Best matches scoring below this value are discarded. The scores are sigmoid
# outputs, so 0.5 corresponds to a raw (pre-sigmoid) similarity of 0.
MATCH_THRESHOLD = 0.5

# affiliation: {eval book: {target book: number of eval sentences whose best match is in it}}.
# best_match: {eval book: list of {"Source": eval sentence, "Target": matched sentence}}.
affiliation = {"Hao.txt":{}, "Yi.txt":{}}
best_match = {"Hao.txt":[], "Yi.txt":[]}
for book_name in eval_rep:
  for sid in eval_rep[book_name]:
    # Index 0 holds the best-scoring candidate for this sentence.
    top_hit = eval_rep[book_name][sid][0]
    if top_hit.score < MATCH_THRESHOLD:
        continue
    other_book = top_hit.book_name
    counts = affiliation[book_name]
    counts[other_book] = counts.get(other_book, 0) + 1
    best_match[book_name].append({"Source":book_sid_map[book_name][sid], "Target":book_sid_map[other_book][top_hit.sid]})

for book_name in affiliation:
    counts = affiliation[book_name]
    if not counts:
        # No sentence of this book reached the threshold; indexing the empty
        # ranking would raise IndexError.
        print("no match with score >= %s was found for %s"%(MATCH_THRESHOLD, book_name))
        continue
    # max() returns the first maximal item, matching a stable descending sort.
    top_item = max(counts.items(), key=lambda k: k[1])
    print("the best matched book for %s is %s"%(book_name, top_item))

# ensure_ascii=False writes the Chinese text verbatim, so fix the file encoding
# explicitly and close the handle deterministically.
with open('best_match.json', 'w', encoding='utf-8') as out_file:
    json.dump(best_match, out_file, ensure_ascii=False, indent=4)

the best matched book for Hao.txt is ('./corpus/classics/short_data/孟子.txt', 1053)
the best matched book for Yi.txt is ('./corpus/classics/short_data/孟子.txt', 6771)


In [14]:
# Hand the match file (best_match.json in the current working directory) to the
# Colab file-download helper.
from google.colab import files
files.download('best_match.json')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>