# coding: utf-8
import os
import sys
import subprocess
import numpy as np
from gensim.models import LdaMulticore, LsiModel, HdpModel, TfidfModel, word2vec
from gensim.matutils import corpus2dense
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from gensim.models.wrappers.dtmmodel import DtmModel  # requires gensim < 4.0 (wrappers were removed in 4.x)
from utils import transcorp2matrix


# SET OF FUNCTIONS TO CREATE EMBEDDINGS (Step 1) #
# Create STM embeddings by calling an R script.
def create_stm_encoding(vector_size, datafile, language):
    subprocess.call("Rscript ./external/stm.R {} {} {}".format(datafile, vector_size, language), shell=True)
    x = np.loadtxt('./external/raw_embeddings/tmp_{}_EMBEDDING_{}.csv'.format('STM', vector_size))
    return x, None


# Create CTM embeddings by calling an R script.
def create_ctm_encoding(vector_size, datafile, language):
    subprocess.call("Rscript ./external/ctm.R {} {} {}".format(datafile, vector_size, language), shell=True)
    x = np.loadtxt('./external/raw_embeddings/tmp_{}_EMBEDDING_{}.csv'.format('CTM', vector_size))
    return x, None


# Load PTM embeddings, which the user must pre-compute and place at the expected path.
def load_ptm_encoding(vector_size):
    filename = './external/raw_embeddings/tmp_{}_EMBEDDING_{}.csv'.format('PTM', vector_size)
    try:
        x = np.loadtxt(filename)
    except OSError:
        print('No such file: {}'.format(filename))
        sys.exit(1)
    return x, None


# Create Doc2Vec embeddings using gensim.
def create_d2v_encoding(corpus, vector_size):
    d2v_corpus = [TaggedDocument(doc, [i]) for i, doc in enumerate(corpus)]
    mod = Doc2Vec(d2v_corpus, vector_size=vector_size, window=5, min_count=2, workers=3)
    return np.array([mod.docvecs[i] for i in range(len(mod.docvecs))]), mod


# Create Pool embeddings by mean-pooling word2vec vectors over each document.
def create_pool_encoding(corpus, vector_size):
    mod = word2vec.Word2Vec(corpus, size=vector_size, window=5, min_count=1, workers=3, sg=0)
    return np.array([mod.wv[c].mean(0) if len(c) > 0 else np.zeros(vector_size) for c in corpus]), mod


# Create BoREP embeddings using a random projection of word embeddings.
def create_borep_encoding(corpus, vector_size, dim=200):
    w2v = word2vec.Word2Vec(corpus, size=dim, window=5, min_count=1, workers=3, sg=0)
    w = np.random.uniform(-1 / np.sqrt(dim), 1 / np.sqrt(dim), (vector_size, dim))
    res = np.vstack([np.apply_along_axis(lambda x: w.dot(x), 1, w2v.wv[c]).mean(0) if len(c) > 0
                     else np.zeros(vector_size) for c in corpus])
    return res, None


# Create LDA embedding using gensim's multicore implementation. Change 'workers' to suit your specs.
def create_lda_encoding(corpus, vector_size, dictionary):
    bow_corpus = [dictionary.doc2bow(x) for x in corpus]
    mod = LdaMulticore(bow_corpus, num_topics=vector_size, workers=3)
    transcorp = mod[bow_corpus]
    return transcorp2matrix(transcorp, bow_corpus, vector_size), mod


# Create LSA embedding using gensim.
def create_lsa_encoding(corpus, vector_size, dictionary):
    bow_corpus = [dictionary.doc2bow(x) for x in corpus]
    mod = LsiModel(bow_corpus, num_topics=vector_size)
    transcorp = mod[bow_corpus]
    return transcorp2matrix(transcorp, bow_corpus, vector_size), mod


# Create HDP embedding using gensim.
def create_hdp_encoding(corpus, vector_size, dictionary):
    bow_corpus = [dictionary.doc2bow(x) for x in corpus]
    mod = HdpModel(bow_corpus, id2word=dictionary)
    vector_size = mod.get_topics().shape[0]  # HDP infers the number of topics itself
    transcorp = mod[bow_corpus]
    return transcorp2matrix(transcorp, bow_corpus, vector_size), mod


# Create Bag-of-Words embedding weighted by TF-IDF.
def create_bow_encoding(corpus, vector_size, dictionary):
    dictionary.filter_extremes(keep_n=vector_size)
    bow_corpus = [dictionary.doc2bow(x) for x in corpus]
    mod = TfidfModel(bow_corpus, dictionary=dictionary)
    corpus_tfidf = mod[bow_corpus]
    return corpus2dense(corpus_tfidf, num_terms=vector_size).T, mod


# Create DTM embedding using Blei's original binary.
def create_dtm_encoding(corpus, vector_size, dictionary, slices):
    path = './external/dtm_bin/'
    link = 'https://github.com/magsilva/dtm/tree/master/bin'
    content = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and 'dtm' in f]
    if len(content) != 1:
        print("Please place the appropriate binary file (and only this one) from {} into '{}'.".format(link, path))
        sys.exit(1)
    mod_path = path + content[0]
    dictionary.filter_extremes(keep_n=5000)
    bow_corpus = [dictionary.doc2bow(x) for x in corpus]
    # 'slices' lists the number of documents per time slice, in corpus order.
    mod = DtmModel(mod_path, corpus=bow_corpus, id2word=dictionary, time_slices=slices, num_topics=vector_size)
    return mod.gamma_, mod


# Create sentence embeddings with the sentence_transformers package (here, a BERT model).
def create_bert_encoding(corpus, vector_size):  # vector_size is ignored: the dimension is fixed by the pre-trained model
    from sentence_transformers import SentenceTransformer
    mod = SentenceTransformer('bert-base-nli-mean-tokens')
    sentences = [' '.join(c) for c in corpus]
    sentence_embeddings = mod.encode(sentences)
    return sentence_embeddings, mod


# Main function to centralize the calls for embedding construction.
def construct_corpus(corpus, dictionary, method='BOW', vector_size=200, datafile=None, slices=None, language='english'):
    if method == 'DOC2VEC':
        x, mod = create_d2v_encoding(corpus, vector_size)
    elif method == 'POOL':
        x, mod = create_pool_encoding(corpus, vector_size)
    elif method == 'BOREP':
        x, mod = create_borep_encoding(corpus, vector_size, dim=200)
    elif method == 'LSA':
        x, mod = create_lsa_encoding(corpus, vector_size, dictionary)
    elif method == 'LDA':
        x, mod = create_lda_encoding(corpus, vector_size, dictionary)
    elif method == 'HDP':
        print("HDP infers the number of topics itself, so the parameter K (vector_size) is ignored.")
        x, mod = create_hdp_encoding(corpus, vector_size, dictionary)
    elif method == 'DTM':
        x, mod = create_dtm_encoding(corpus, vector_size, dictionary, slices)
    elif method == 'STM':
        print("STM is going to run an R subprocess to construct the embedding...")
        x, mod = create_stm_encoding(vector_size, datafile, language)
    elif method == 'CTM':
        print("CTM is going to run an R subprocess to construct the embedding...")
        x, mod = create_ctm_encoding(vector_size, datafile, language)
    elif method == 'PTM':
        print("PTM loads embeddings pre-computed with https://github.com/qiang2100/STTM")
        x, mod = load_ptm_encoding(vector_size)
    elif method == 'BERT':
        x, mod = create_bert_encoding(corpus, vector_size)
    else:
        x, mod = create_bow_encoding(corpus, vector_size, dictionary)
    return x, mod
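

# Minimal usage sketch (illustrative, not part of the original pipeline): the toy
# documents and variable names below ('docs', 'dictionary') are hypothetical, and
# gensim < 4.0 is assumed, as required by the imports above.
if __name__ == '__main__':
    from gensim.corpora import Dictionary
    docs = [['economy', 'growth', 'policy'],
            ['election', 'vote', 'policy'],
            ['growth', 'vote', 'economy']]
    dictionary = Dictionary(docs)
    x, mod = construct_corpus(docs, dictionary, method='DOC2VEC', vector_size=10)
    print(x.shape)  # (n_documents, vector_size) -> (3, 10)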