In [24]:
import os
import sys
import annoy
import pickle
import tempfile
import re
import numpy as np
import tensorflow as tf
import apache_beam as beam
import tensorflow_hub as hub

from datetime import datetime
from collections import namedtuple
from apache_beam.transforms import util
from sklearn.random_projection import gaussian_random_matrix
import csv

In [25]:
print('TF version: {}'.format(tf.__version__))
print('TF-Hub version: {}'.format(hub.__version__))
print('Apache Beam version: {}'.format(beam.__version__))

TF version: 2.1.0
TF-Hub version: 0.8.0
Apache Beam version: 2.22.0


In [26]:
from tensorflow.keras.models import load_model 
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config = config)



A Million News Headlines dataset contains news headlines published over a period of 15 years sourced from the reputable Australian Broadcasting Corp. (ABC). This news dataset has a summarised historical record of noteworthy events in the globe from early-2003 to end-2017 with a more granular focus on Australia.

Format: Tab-separated two-column data: 1) publication date and 2) headline text. We are only interested in the headline text.

In [27]:
!head raw.tsv

'head' is not recognized as an internal or external command,
operable program or batch file.


In [31]:
# We only keep the headline

with open('corpus/text.txt', 'w') as out_file:
    csvreader =  csv.reader(open('abcnews-date-text.csv', 'r'))
    fields = next(csvreader)
    print(fields)
    for row in csvreader:
        #print(row)
        out_file.write(row[1]+"\n")
    api_lines = open('news_headings.txt', 'r', encoding="utf8").readlines()
    for row in api_lines:
        clean_row = re.sub(r"[^a-zA-Z0-9]+", ' ', row)
        out_file.write(clean_row.lower()+"\n")

['publish_date', 'headline_text']


In [14]:
!tail corpus/text.txt

'tail' is not recognized as an internal or external command,
operable program or batch file.


In [15]:
# Embedding extraction method

embed_fn = None

def generate_embeddings(text, module_url, random_projection_matrix = None):
  # Beam will run this function in different processes that need to
  # import hub and load embed_fn (if not previously loaded)
  global embed_fn
  if embed_fn is None:
    embed_fn = hub.load(module_url)
  embedding = embed_fn(text).numpy()
  if random_projection_matrix is not None:
    embedding = embedding.dot(random_projection_matrix)
  return text, embedding

In [16]:
def to_tf_example(entries):
  examples = []

  text_list, embedding_list = entries
  for i in range(len(text_list)):
    text = text_list[i]
    embedding = embedding_list[i]

    features = {
        'text': tf.train.Feature(
            bytes_list = tf.train.BytesList(value = [text.encode('utf-8')])),
        'embedding': tf.train.Feature(
            float_list = tf.train.FloatList(value = embedding.tolist()))
    }
  
    example = tf.train.Example(
        features = tf.train.Features(
            feature = features)).SerializeToString(deterministic = True)
  
    examples.append(example)
  
  return examples

In [17]:
# Beam Pipeline

def run_hub2emb(args):
  '''Runs the embedding generation pipeline'''

  options = beam.options.pipeline_options.PipelineOptions(**args)
  args = namedtuple("options", args.keys())(*args.values())

  with beam.Pipeline(args.runner, options = options) as pipeline:
    (
        pipeline
        | 'Read sentences from files' >> beam.io.ReadFromText(
            file_pattern = args.data_dir)
        | 'Batch elements' >> util.BatchElements(
            min_batch_size = args.batch_size, max_batch_size = args.batch_size)
        | 'Generate embeddings' >> beam.Map(
            generate_embeddings, args.module_url, args.random_projection_matrix)
        | 'Encode to tf example' >> beam.FlatMap(to_tf_example)
        | 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
            file_path_prefix = '{}/emb'.format(args.output_dir),
            file_name_suffix = '.tfrecords')
    )

Random projection is a simple, yet powerful technique used to reduce the dimensionality of a set of points which lie in Euclidean space. For a theoretical background, see the Johnson-Lindenstrauss lemma.

Reducing the dimensionality of the embeddings with random projection means less time needed to build and query the ANN index.

In this tutorial we use Gaussian Random Projection from the Scikit-learn library.

In [18]:
def generate_random_projection_weights(original_dim, projected_dim):
  random_projection_matrix = None
  random_projection_matrix = gaussian_random_matrix(
      n_components = projected_dim, n_features = original_dim).T
  print("A Gaussian random weight matrix was creates with shape of {}".format(random_projection_matrix.shape))
  print('Storing random projection matrix to disk...')
  with open('random_projection_matrix', 'wb') as handle:
    pickle.dump(random_projection_matrix, 
                handle, protocol = pickle.HIGHEST_PROTOCOL)
        
  return random_projection_matrix

In [19]:
module_url = 'https://tfhub.dev/google/tf2-preview/nnlm-en-dim128/1' 
projected_dim = 64

In [20]:
output_dir = tempfile.mkdtemp()
original_dim = hub.load(module_url)(['']).shape[1]
random_projection_matrix = None

if projected_dim:
  random_projection_matrix = generate_random_projection_weights(
      original_dim, projected_dim)

A Gaussian random weight matrix was creates with shape of (128, 64)
Storing random projection matrix to disk...




In [21]:
args = {
    'job_name': 'hub2emb-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S')),
    'runner': 'DirectRunner',
    'batch_size': 1024,
    'data_dir': 'corpus/text.txt',
    'output_dir': output_dir,
    'module_url': module_url,
    'random_projection_matrix': random_projection_matrix,
}

print("Pipeline args are set.")
args

Pipeline args are set.


{'job_name': 'hub2emb-200702-145021',
 'runner': 'DirectRunner',
 'batch_size': 1024,
 'data_dir': 'corpus/text.txt',
 'output_dir': 'C:\\Users\\Test\\AppData\\Local\\Temp\\tmpb8oqjr3v',
 'module_url': 'https://tfhub.dev/google/tf2-preview/nnlm-en-dim128/1',
 'random_projection_matrix': array([[-2.02602155e-01,  3.63717371e-02, -3.76618353e-01, ...,
          1.36456201e-01, -7.37147406e-02,  1.71278012e-01],
        [ 2.49992116e-01, -3.39484630e-02, -1.55456461e-01, ...,
          6.62979747e-02, -7.45054968e-02,  2.88302319e-01],
        [ 2.82934846e-01, -2.81104923e-01,  1.29482777e-01, ...,
          1.25713983e-01, -6.97562031e-02,  2.43263700e-01],
        ...,
        [ 9.09493914e-02, -2.51202988e-01,  2.43784095e-02, ...,
         -1.20167022e-01,  6.31295869e-02, -6.03459017e-03],
        [-1.29833866e-02, -2.49764118e-02,  5.60526119e-02, ...,
         -9.24629452e-04, -3.30043205e-02,  3.64626798e-03],
        [-6.07713266e-02, -1.66290260e-02, -2.47857951e-04, ...,
     

In [22]:
print("Running pipeline...")
%time run_hub2emb(args)
print("Pipeline is done.")



Running pipeline...
Pipeline is done.


UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe1 in position 49: invalid continuation byte

In [23]:
embed_file = os.path.join(output_dir, 'emb-00000-of-00001.tfrecords')
sample = 5

# Create a description of the features.
feature_description = {
    'text': tf.io.FixedLenFeature([], tf.string),
    'embedding': tf.io.FixedLenFeature([projected_dim], tf.float32)
}

def _parse_example(example):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example, feature_description)

dataset = tf.data.TFRecordDataset(embed_file)
for record in dataset.take(sample).map(_parse_example):
  print("{}: {}".format(record['text'].numpy().decode('utf-8'), record['embedding'].numpy()[:10]))

NotFoundError: NewRandomAccessFile failed to Create/Open: C:\Users\Test\AppData\Local\Temp\tmpb8oqjr3v\emb-00000-of-00001.tfrecords : The system cannot find the file specified.
; No such file or directory

ANNOY (Approximate Nearest Neighbors Oh Yeah) is a C++ library with Python bindings to search for points in space that are close to a given query point. It also creates large read-only file-based data structures that are mapped into memory. It is built and used by Spotify for music recommendations.

In [None]:
def build_index(embedding_files_pattern, index_filename, vector_length, metric = 'angular', num_trees = 100):
  '''Builds an ANNOY index'''

  annoy_index = annoy.AnnoyIndex(vector_length, metric = metric)
  # Mapping between the item and its identifier in the index
  mapping = {}

  embed_files = tf.io.gfile.glob(embedding_files_pattern)
  num_files = len(embed_files)
  print('Found {} embedding file(s).'.format(num_files))

  item_counter = 0
  for i, embed_file in enumerate(embed_files):
    print('Loading embeddings in file {} of {}...'.format(i+1, num_files))
    dataset = tf.data.TFRecordDataset(embed_file)
    for record in dataset.map(_parse_example):
      text = record['text'].numpy().decode("utf-8")
      embedding = record['embedding'].numpy()
      mapping[item_counter] = text
      annoy_index.add_item(item_counter, embedding)
      item_counter += 1
      if item_counter % 100000 == 0:
        print('{} items loaded to the index'.format(item_counter))

  print('A total of {} items added to the index'.format(item_counter))

  print('Building the index with {} trees...'.format(num_trees))
  annoy_index.build(n_trees = num_trees)
  print('Index is successfully built.')
  
  print('Saving index to disk...')
  annoy_index.save(index_filename)
  print('Index is saved to disk.')
  print("Index file size: {} GB".format(
    round(os.path.getsize(index_filename) / float(1024 ** 3), 2)))
  annoy_index.unload()

  print('Saving mapping to disk...')
  with open(index_filename + '.mapping', 'wb') as handle:
    pickle.dump(mapping, handle, protocol = pickle.HIGHEST_PROTOCOL)
  print('Mapping is saved to disk.')
  print("Mapping file size: {} MB".format(
    round(os.path.getsize(index_filename + '.mapping') / float(1024 ** 2), 2)))

In [None]:
embedding_files = "{}/emb-*.tfrecords".format(output_dir)
embedding_dimension = projected_dim
index_filename = "index"

In [None]:
%time build_index(embedding_files, index_filename, embedding_dimension)

### Use the Index for Similarity Matching
Now we can use the ANN index to find news headlines that are semantically close to an input query.

In [None]:
index = annoy.AnnoyIndex(embedding_dimension)
index.load(index_filename, prefault = True)
print('Annoy index is loaded.')
with open(index_filename + '.mapping', 'rb') as handle:
  mapping = pickle.load(handle)
print('Mapping file is loaded.')

In [None]:
# Similarity matching method

def find_similar_items(embedding, num_matches = 5):
  '''Finds similar items to a given embedding in the ANN index'''
  ids = index.get_nns_by_vector(
  embedding, num_matches, search_k = -1, include_distances = False)
  items = [mapping[i] for i in ids]
  return items

In [None]:
# Extract embedding from a given query

print("Loading the TF-Hub module...")
%time embed_fn = hub.load(module_url)
print("TF-Hub module is loaded.")

random_projection_matrix = None
if os.path.exists('random_projection_matrix'):
  print("Loading random projection matrix...")
  with open('random_projection_matrix', 'rb') as handle:
    random_projection_matrix = pickle.load(handle)
  print('random projection matrix is loaded.')

def extract_embeddings(query):
  '''Generates the embedding for the query'''
  query_embedding =  embed_fn([query])[0].numpy()
  if random_projection_matrix is not None:
    query_embedding = query_embedding.dot(random_projection_matrix)
  return query_embedding

In [None]:
extract_embeddings("Hello Machine Learning!")[:10]

In [None]:
# Enter a query to find the most similar items

query = "terrorists in plane" 

print("Generating embedding for the query...")
%time query_embedding = extract_embeddings(query)

print("")
print("Finding relevant items in the index...")
%time items = find_similar_items(query_embedding, 10)

print("")
print("Results:")
print("=========")
for item in items:
    print(item)