In [None]:
!pip install -q apache_beam
!pip install -q annoy

In [None]:
import os
import sys
import pickle
from collections import namedtuple
from datetime import datetime
import numpy as np
import apache_beam as beam
from apache_beam.transforms import util
import tensorflow as tf
import tensorflow_hub as hub
import annoy
from sklearn.random_projection import gaussian_random_matrix
import pandas as pd

In [None]:
# Download the dataset in tab-separated form and save it as raw.tsv.
!wget 'https://dataverse.harvard.edu/api/access/datafile/3450625?format=tab&gbrecs=true' -O raw.tsv
# Report the number of lines in the downloaded file.
!wc -l raw.tsv

--2020-12-19 19:01:17--  https://dataverse.harvard.edu/api/access/datafile/3450625?format=tab&gbrecs=true
Resolving dataverse.harvard.edu (dataverse.harvard.edu)... 206.191.184.198
Connecting to dataverse.harvard.edu (dataverse.harvard.edu)|206.191.184.198|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 57600231 (55M) [text/tab-separated-values]
Saving to: ‘raw.tsv’


2020-12-19 19:01:20 (28.7 MB/s) - ‘raw.tsv’ saved [57600231/57600231]

1103664 raw.tsv


In [None]:
# Load the downloaded tab-separated file and preview its first rows.
raw_df = pd.read_csv('raw.tsv', sep="\t")
raw_df.head()

Unnamed: 0,publish_date,headline_text
0,20030219,aba decides against community broadcasting lic...
1,20030219,act fire witnesses must be aware of defamation
2,20030219,a g calls for infrastructure protection summit
3,20030219,air nz staff in aust strike for pay rise
4,20030219,air nz strike to affect australian travellers


In [None]:
!rm -r corpus
!mkdir corpus

with open('corpus/text.txt', 'w') as out_file:
  with open('raw.tsv', 'r') as in_file:
    for line in in_file:
      headline = line.split('\t')[1].strip().strip('"')
      out_file.write(headline+"\n")

In [None]:
# Read the corpus back for a quick look; the first line of text.txt is
# consumed as the column header.
text_df = pd.read_csv('corpus/text.txt', sep="\t")
text_df.head()

Unnamed: 0,headline_text
0,aba decides against community broadcasting lic...
1,act fire witnesses must be aware of defamation
2,a g calls for infrastructure protection summit
3,air nz staff in aust strike for pay rise
4,air nz strike to affect australian travellers


## Generate Embeddings

##### 1) Embedding extraction method 

In [None]:
# Module-level cache for the loaded TF-Hub module; filled on first use.
embed_fn = None

def generate_embeddings(text, module_url, random_projection_matrix=None):
  """Embeds `text` with the TF-Hub module and optionally projects the result.

  Args:
    text: input passed straight to the hub module (the pipeline passes a
      batch of sentences).
    module_url: location of the TF-Hub module; only used when the module has
      not been loaded into the global `embed_fn` yet.
    random_projection_matrix: optional matrix the embeddings are multiplied
      with (`embedding.dot(matrix)`); skipped when None.

  Returns:
    A `(text, embedding)` tuple with `text` unchanged.
  """
  # Beam may call this from worker processes where the module has not been
  # loaded yet, so it is loaded lazily and kept in the global for later calls.
  global embed_fn
  if embed_fn is None:
    embed_fn = hub.load(module_url)

  vectors = embed_fn(text).numpy()
  if random_projection_matrix is None:
    return text, vectors
  return text, vectors.dot(random_projection_matrix)

##### 2) Function for converting to TF record
For large data sets, a binary storage format is advantageous because it reduces file size. This can improve file load and import times, as well as the overall performance of the model that consumes the data. TensorFlow provides a native binary format for this purpose (TFRecord). <br>
TFRecord is explained in detail in this blog post: [ Tensorflow Records? What they are and how to use them ](https://medium.com/mostly-ai/tensorflow-records-what-they-are-and-how-to-use-them-c46bc4bbb564)

In [None]:
def to_tf_example(entries):
  """Turns one `(texts, embeddings)` batch into serialized `tf.train.Example`s.

  Args:
    entries: a pair `(text_list, embedding_list)`; item i of each belongs
      together. Each embedding must support `.tolist()`.

  Returns:
    A list with one serialized Example per text. Each Example has a bytes
    feature 'text' (UTF-8 encoded) and a float feature 'embedding'.
  """
  text_list, embedding_list = entries

  serialized = []
  for position, text in enumerate(text_list):
    vector = embedding_list[position]

    text_feature = tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[text.encode('utf-8')]))
    embedding_feature = tf.train.Feature(
        float_list=tf.train.FloatList(value=vector.tolist()))

    proto = tf.train.Example(
        features=tf.train.Features(
            feature={'text': text_feature, 'embedding': embedding_feature}))

    # deterministic=True is passed so serialization is requested in
    # deterministic mode.
    serialized.append(proto.SerializeToString(deterministic=True))

  return serialized

##### 3) Pipeline

In [None]:
def run_hub2emb(args):
  '''Runs the embedding generation pipeline.

  `args` is a dict. All of its entries are forwarded to Beam's
  PipelineOptions, and the keys `runner`, `data_dir`, `batch_size`,
  `module_url`, `random_projection_matrix` and `output_dir` are read here.
  The pipeline reads text lines, groups them into fixed-size batches, embeds
  each batch, converts the results to tf.Example records and writes them as
  `<output_dir>/emb*.tfrecords`.
  '''

  pipeline_options = beam.options.pipeline_options.PipelineOptions(**args)
  # Attribute-style view of the same settings.
  Settings = namedtuple("options", args.keys())
  settings = Settings(*args.values())

  with beam.Pipeline(settings.runner, options=pipeline_options) as pipeline:
    sentences = pipeline | 'Read sentences from files' >> beam.io.ReadFromText(
        file_pattern=settings.data_dir)
    # min == max, so BatchElements is asked for batches of exactly batch_size.
    batches = sentences | 'Batch elements' >> util.BatchElements(
        min_batch_size=settings.batch_size,
        max_batch_size=settings.batch_size)
    embedded = batches | 'Generate embeddings' >> beam.Map(
        generate_embeddings,
        settings.module_url,
        settings.random_projection_matrix)
    examples = embedded | 'Encode to tf example' >> beam.FlatMap(to_tf_example)
    _ = examples | 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
        file_path_prefix='{}/emb'.format(settings.output_dir),
        file_name_suffix='.tfrecords')

##### 4) Generating Random Projection Weight Matrix 
<p>Reducing the dimensionality of the embedding vectors with a random projection shortens the time needed to build and query the Annoy index.</p>



In [None]:
def generate_random_projection_weights(original_dim, projected_dim,
                                       random_state=None):
  """Creates a Gaussian random projection matrix and pickles it to disk.

  The entries are drawn i.i.d. from N(0, 1 / projected_dim), the distribution
  scikit-learn documents for Gaussian random projection. The matrix is drawn
  with NumPy directly so the function does not depend on scikit-learn's
  `gaussian_random_matrix` helper, which was deprecated and later removed
  from the public API.

  Args:
    original_dim: dimensionality of the vectors to be projected.
    projected_dim: dimensionality after projection.
    random_state: optional seed. When given, the same matrix is produced on
      every call; when None, NumPy's global random state is used.

  Returns:
    A NumPy array of shape (original_dim, projected_dim), so that
    `embedding.dot(matrix)` yields `projected_dim` values.

  Side effects:
    Writes the matrix to the file 'random_projection_matrix' in the current
    working directory (the similarity-matching step reloads it from there).
  """
  rng = np.random if random_state is None else np.random.RandomState(random_state)
  # Drawn as (projected_dim, original_dim) and transposed so that, for a given
  # seed, the values line up with a components-by-features layout.
  random_projection_matrix = rng.normal(
      loc=0.0,
      scale=1.0 / np.sqrt(projected_dim),
      size=(projected_dim, original_dim)).T
  print("A Gaussian random weight matrix was creates with shape of {}".format(random_projection_matrix.shape))
  print('Storing random projection matrix to disk...')
  with open('random_projection_matrix', 'wb') as handle:
    pickle.dump(random_projection_matrix,
                handle, protocol=pickle.HIGHEST_PROTOCOL)

  return random_projection_matrix

##### 5) Run Pipeline

In [None]:
import tempfile

# TF-Hub module used to embed the headlines.
module_url = "https://tfhub.dev/google/nnlm-en-dim128/2"
# Target dimensionality after random projection; a falsy value (None or 0)
# skips creating a projection matrix below.
projected_dim = 64

# The TFRecord output goes to a freshly created temporary directory.
output_dir = tempfile.mkdtemp()
# Embed a single empty string just to read the module's output width.
original_dim = hub.load(module_url)(['']).shape[1]
random_projection_matrix = None

if projected_dim:
  random_projection_matrix = generate_random_projection_weights(
      original_dim, projected_dim)

# Settings consumed by run_hub2emb; the whole dict is also forwarded to
# Beam's PipelineOptions there.
args = {
    'job_name': 'hub2emb-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S')),
    'runner': 'DirectRunner',
    'batch_size': 1024,
    'data_dir': 'corpus/*.txt',
    'output_dir': output_dir,
    'module_url': module_url,
    'random_projection_matrix': random_projection_matrix,
}

print("Pipeline args are set.")
# Displaying `args` also prints the projection matrix it contains.
args

A Gaussian random weight matrix was creates with shape of (128, 64)
Storing random projection matrix to disk...
Pipeline args are set.




{'batch_size': 1024,
 'data_dir': 'corpus/*.txt',
 'job_name': 'hub2emb-201219-190642',
 'module_url': 'https://tfhub.dev/google/nnlm-en-dim128/2',
 'output_dir': '/tmp/tmpv84yhfyl',
 'random_projection_matrix': array([[ 0.12178921, -0.05828368,  0.1996912 , ..., -0.15360454,
         -0.0705459 , -0.10283104],
        [ 0.00821986, -0.08220833,  0.10074971, ...,  0.22865047,
         -0.06353593,  0.12686375],
        [-0.07050029,  0.3016821 ,  0.04599832, ...,  0.06371925,
         -0.20535336, -0.03088271],
        ...,
        [-0.21025577,  0.0665988 ,  0.04275995, ...,  0.06973909,
          0.06158255, -0.01393402],
        [-0.24225089,  0.11765618, -0.04644012, ...,  0.02327763,
          0.11400811, -0.02377637],
        [-0.11452499, -0.28527941, -0.0642996 , ...,  0.05676581,
          0.09466643,  0.03812898]]),
 'runner': 'DirectRunner'}

In [None]:
# Execute the embedding pipeline and report how long it took.
print("Running pipeline...")
%time run_hub2emb(args)
print("Pipeline is done.")

Running pipeline...
CPU times: user 2min 16s, sys: 1min 46s, total: 4min 3s
Wall time: 2min 4s
Pipeline is done.


In [None]:
ls {output_dir}

emb-00000-of-00001.tfrecords


Read a sample record

In [None]:
# The shard listed above, and how many of its records to print.
embed_file = os.path.join(output_dir, 'emb-00000-of-00001.tfrecords')
sample = 5

# Schema of one stored example: a scalar string plus a float vector of
# length `projected_dim`.
feature_description = dict(
    text=tf.io.FixedLenFeature([], tf.string),
    embedding=tf.io.FixedLenFeature([projected_dim], tf.float32),
)

def _parse_example(example):
  """Parses one serialized `tf.Example` according to `feature_description`."""
  parsed = tf.io.parse_single_example(example, feature_description)
  return parsed

dataset = tf.data.TFRecordDataset(embed_file)
for record in dataset.take(sample).map(_parse_example):
  headline_text = record['text'].numpy().decode('utf-8')
  # Only the first ten components are printed to keep the output short.
  leading_values = record['embedding'].numpy()[:10]
  print("{}: {}".format(headline_text, leading_values))

headline_text: [-0.13770862 -0.33973074 -0.00641254  0.06792866 -0.18257555  0.17030762
 -0.25820914 -0.07196762 -0.06983764 -0.05482756]
aba decides against community broadcasting licence: [ 0.11068262 -0.05284186  0.07776139 -0.03697378  0.09400154 -0.02768139
 -0.11715365  0.13583417  0.11048616  0.07729341]
act fire witnesses must be aware of defamation: [ 0.12514837 -0.05470643  0.11934085  0.02903574 -0.20105729  0.10341746
  0.06573067  0.03990437  0.19003376  0.22840866]
a g calls for infrastructure protection summit: [ 0.20514376 -0.08683673  0.07670747  0.1445789  -0.14576158 -0.04367733
 -0.05763638  0.2639629   0.09091558  0.00183326]
air nz staff in aust strike for pay rise: [-0.13528588  0.07863335 -0.13245809  0.0767909  -0.03352788 -0.13530774
 -0.14249204 -0.1614395   0.07354797  0.01717338]


##### 6) Build index

[ANNOY](https://github.com/spotify/annoy) is a C++ library with Python bindings to search for points in space that are close to a given query point. It also creates large read-only file-based data structures that are mmapped into memory. It is built and used by Spotify for music recommendations.

This is one way of indexing embeddings for nearest-neighbour search. There are many other libraries available, such as [FAISS](https://ai.facebook.com/tools/faiss/). It is explained in detail in this blog post: [Faiss: A library for efficient similarity search](https://engineering.fb.com/2017/03/29/data-infrastructure/faiss-a-library-for-efficient-similarity-search/)

In [None]:
def build_index(embedding_files_pattern, index_filename, vector_length,
                metric='angular', num_trees=100):
  '''Builds an ANNOY index from TFRecord embedding files and saves it.

  Every record of every file matching `embedding_files_pattern` is parsed
  with the notebook-level `_parse_example`, added to the index under a
  running integer id, and its text is remembered under the same id. The index
  is written to `index_filename` and the id-to-text dict is pickled to
  `index_filename + '.mapping'`. Progress and file sizes are printed.
  '''
  mapping_filename = index_filename + '.mapping'

  ann = annoy.AnnoyIndex(vector_length, metric=metric)
  # Index id -> headline text
  id_to_text = {}

  shard_paths = tf.io.gfile.glob(embedding_files_pattern)
  shard_count = len(shard_paths)
  print('Found {} embedding file(s).'.format(shard_count))

  loaded = 0
  for shard_number, shard_path in enumerate(shard_paths, start=1):
    print('Loading embeddings in file {} of {}...'.format(shard_number, shard_count))
    for record in tf.data.TFRecordDataset(shard_path).map(_parse_example):
      text = record['text'].numpy().decode("utf-8")
      vector = record['embedding'].numpy()
      id_to_text[loaded] = text
      ann.add_item(loaded, vector)
      loaded += 1
      if loaded % 100000 == 0:
        print('{} items loaded to the index'.format(loaded))

  print('A total of {} items added to the index'.format(loaded))

  print('Building the index with {} trees...'.format(num_trees))
  ann.build(n_trees=num_trees)
  print('Index is successfully built.')

  print('Saving index to disk...')
  ann.save(index_filename)
  print('Index is saved to disk.')
  index_gb = os.path.getsize(index_filename) / float(1024 ** 3)
  print("Index file size: {} GB".format(round(index_gb, 2)))
  ann.unload()

  print('Saving mapping to disk...')
  with open(mapping_filename, 'wb') as handle:
    pickle.dump(id_to_text, handle, protocol=pickle.HIGHEST_PROTOCOL)
  print('Mapping is saved to disk.')
  mapping_mb = os.path.getsize(mapping_filename) / float(1024 ** 2)
  print("Mapping file size: {} MB".format(round(mapping_mb, 2)))

In [None]:
embedding_files = "{}/emb-*.tfrecords".format(output_dir)
embedding_dimension = projected_dim
index_filename = "index"

!rm {index_filename}
!rm {index_filename}.mapping

%time build_index(embedding_files, index_filename, embedding_dimension)

rm: cannot remove 'index': No such file or directory
rm: cannot remove 'index.mapping': No such file or directory
Found 1 embedding file(s).
Loading embeddings in file 1 of 1...
100000 items loaded to the index
200000 items loaded to the index
300000 items loaded to the index
400000 items loaded to the index
500000 items loaded to the index
600000 items loaded to the index
700000 items loaded to the index
800000 items loaded to the index
900000 items loaded to the index
1000000 items loaded to the index
1100000 items loaded to the index
A total of 1103664 items added to the index
Building the index with 100 trees...
Index is successfully built.
Saving index to disk...
Index is saved to disk.
Index file size: 1.6 GB
Saving mapping to disk...
Mapping is saved to disk.
Mapping file size: 50.61 MB
CPU times: user 11min 18s, sys: 49.1 s, total: 12min 7s
Wall time: 8min 59s


##### 7) Similarity matching

In [None]:
# Re-open the saved index for querying. The metric is passed explicitly and
# matches build_index's default ('angular'); omitting it makes Annoy emit a
# FutureWarning about the implicit default.
index = annoy.AnnoyIndex(embedding_dimension, metric='angular')
index.load(index_filename, prefault=True)
print('Annoy index is loaded.')
# The id -> text mapping was pickled by build_index next to the index file.
with open(index_filename + '.mapping', 'rb') as handle:
  mapping = pickle.load(handle)
print('Mapping file is loaded.')

  """Entry point for launching an IPython kernel.


Annoy index is loaded.
Mapping file is loaded.


In [None]:
def find_similar_items(embedding, num_matches=5):
  '''Looks up the items closest to `embedding` in the global ANN `index`.

  Asks the index for `num_matches` neighbour ids (without distances,
  search_k=-1) and returns the corresponding texts from the global `mapping`,
  in the order the index returned them.
  '''
  neighbour_ids = index.get_nns_by_vector(
      embedding, num_matches, search_k=-1, include_distances=False)

  matches = []
  for item_id in neighbour_ids:
    matches.append(mapping[item_id])
  return matches

In [None]:
# Load the TF-Hub module (timed) into the global `embed_fn` used by
# extract_embeddings below.
print("Loading the TF-Hub module...")
%time embed_fn = hub.load(module_url)
print("TF-Hub module is loaded.")

# Reload the projection matrix written by generate_random_projection_weights,
# if that file exists; otherwise queries are left unprojected.
# NOTE(review): pickle.load executes arbitrary code from the file - only load
# a matrix file produced by this notebook.
random_projection_matrix = None
if os.path.exists('random_projection_matrix'):
  print("Loading random projection matrix...")
  with open('random_projection_matrix', 'rb') as handle:
    random_projection_matrix = pickle.load(handle)
  print('random projection matrix is loaded.')

def extract_embeddings(query):
  '''Embeds a single query string with the global `embed_fn`.

  The query is wrapped in a one-element list, and the first (only) result is
  converted with `.numpy()`. When the global `random_projection_matrix` is
  set, the vector is multiplied with it before being returned.
  '''
  vector = embed_fn([query])[0].numpy()
  if random_projection_matrix is None:
    return vector
  return vector.dot(random_projection_matrix)

Loading the TF-Hub module...
CPU times: user 1.98 s, sys: 1.39 s, total: 3.36 s
Wall time: 6.3 s
TF-Hub module is loaded.
Loading random projection matrix...
random projection matrix is loaded.


In [None]:
# Quick check: embed one sample query and display the resulting vector.
extract_embeddings("Hello World!")

array([ 1.17241136e-01,  2.62062729e-02, -3.33819925e-02, -5.46151620e-02,
        1.36114855e-02, -1.41464184e-01, -3.60374490e-01,  1.43173049e-01,
       -1.09221070e-01, -2.36616821e-04,  1.51816261e-03, -5.28215226e-02,
       -2.58640195e-02, -1.53519420e-01, -8.97222777e-02,  2.00432325e-01,
        1.03531104e-01, -1.95038122e-01,  4.12229890e-02, -1.23485297e-01,
        5.43044080e-02, -7.06551569e-02, -1.45756273e-01,  9.81731250e-02,
       -2.50568607e-02, -3.80514622e-02,  1.16389707e-01, -5.83222522e-02,
        1.95557139e-02, -1.36746520e-01, -2.43734727e-02, -4.80600038e-02,
        3.82312758e-02, -1.04549729e-01, -1.84006667e-02, -1.94254185e-02,
       -1.25843178e-01, -1.41848847e-01, -1.51613722e-01,  1.26761116e-01,
        1.52873269e-02,  4.76524508e-02,  6.15889394e-02, -5.74508806e-02,
       -7.20210680e-02,  2.26732823e-01, -8.75514801e-02,  1.08207161e-01,
       -1.98334148e-01, -6.40587430e-02,  1.99382623e-01,  6.17804674e-02,
        1.99270363e-03,  

##### 8) Search Results

In [None]:
# Free-text query to look up in the index.
query = "problems in industry"

print("Generating embedding for the query...")
%time query_embedding = extract_embeddings(query)

print("")
print("Finding relevant items in the index...")
# Ask for the 10 nearest headlines.
%time items = find_similar_items(query_embedding, 10)

print("")
print("Results:")
print("=========")
for item in items:
  print(item)

Generating embedding for the query...
CPU times: user 4.57 ms, sys: 0 ns, total: 4.57 ms
Wall time: 3.59 ms

Finding relevant items in the index...
CPU times: user 6.08 ms, sys: 0 ns, total: 6.08 ms
Wall time: 6.27 ms

Results:
hospitals in crisis
taxi industry in turmoil
troubled money markets threaten future of inpex
shipping industry in strife
mareeba council claims success in moving
floods crisis worsens in thailand
perth unit owners in for prolonged market pain amid slow market
areas of concern in greens labor agreement
crisis in ossetia worsens
progress in nursing dispute
