In [None]:
import requests
from bs4 import BeautifulSoup

# URL for good first listicle
url = 'https://www.themuse.com/advice/interview-questions-and-answers'

# Always pass a timeout: without one, requests waits indefinitely for a server
# that never answers, which would hang the notebook.
r = requests.get(url, timeout=30)

if r.ok:  # Status code is below 400 (redirects are followed automatically)
  soup = BeautifulSoup(r.text, 'html.parser')  # parse the response body
  # Extract the interview questions based on the markup of the page.
  # find() returns None when nothing matches, so check each step instead of
  # chaining the calls and failing with an opaque AttributeError.
  article = soup.find('div', 'article-content')
  question_list = article.find('ul') if article is not None else None
  if question_list is None:
    raise Exception('Could not find the question list; the page layout may have changed')
  list_items = question_list.find_all('li')
  questions = [li.text for li in list_items]
  print(*questions[:3], sep='\n')
  print('...')
else:
  # We shouldn't try to process the web page if the request failed
  raise Exception(f'Request returned status {r.status_code}: {r.reason}')

In [None]:
%%bash
if [ ! -d "./interview-bot-source/" ]
then
  # The easiest way to get the data is just to clone the repository,
  # even though we won't actually be using git for anything
  git clone -q --branch "part-one" "https://github.com/maxTarlov/interview-bot-source.git"
  echo "Cloned 'interview-bot-source' @part-one"
else  # If the repository has already been cloned, check out the correct branch/tag and update it
  # Stop if the directory cannot be entered, so the git commands below never
  # run against whatever repository happens to contain the notebook.
  cd interview-bot-source/ || exit 1
  # Check out the ref first, then pull that same ref explicitly. A bare
  # `git pull <url>` fetches the remote's default HEAD and merges it into
  # whatever is currently checked out, which is not necessarily "part-one".
  if git checkout "part-one" \
     && git pull --ff-only "https://github.com/maxTarlov/interview-bot-source.git" "part-one"
  then
    echo "Updated 'interview-bot-source'"
  else
    # Do not claim success when the update failed (e.g. no network access)
    echo "WARNING: could not update 'interview-bot-source'; using the existing checkout" >&2
  fi
fi

In [None]:
import numpy as np
import pandas as pd

# Load the tab-separated question dataset from the cloned repository
df = pd.read_csv('./interview-bot-source/data/questions.tsv', delimiter='\t')
df.head()  # preview the first few rows

In [None]:
import string
# Create a translation table which deletes every ASCII punctuation character
translation_table = str.maketrans('', '', string.punctuation)


def normalize(x: str) -> str:
  """
  Return x with the characters in string.punctuation removed and the rest
  converted to lower case.
  """
  return x.translate(translation_table).lower()

# Store a punctuation-free, lower-cased copy of every question
df['normalized_question'] = df['Question'].map(normalize)

# 10 most frequent interview questions:
(
  df.groupby('normalized_question', as_index=False)
    .size()
    .nlargest(10, 'size')
)

In [None]:
# Install the SentenceTransformers package
!pip install -q sentence-transformers

# Download the MiniLM model checkpoint and cache it for later
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='models')

In [None]:
# Get an embedding for each question in the dataset. The column is converted
# to a plain list first: encode() is documented to take a string or a list of
# strings, and a list does not depend on how the Series happens to be indexed.
question_embeddings = model.encode(df['Question'].tolist())
question_embeddings  # a 1305x384 matrix (number of questions by number of dimensions)

In [None]:
from sklearn.cluster import KMeans

num_classes = 128

# Fit k-means with a pinned random state (more on that below)
km = KMeans(n_clusters=num_classes, random_state=42).fit(question_embeddings)
# Assign every question to a cluster and record the assignment in df
df['km_class'] = km.predict(question_embeddings)

# How many questions are in each cluster?
class_size = df.groupby('km_class', as_index=False).size()
class_size

In [None]:
# Fit a second model with no fixed random state, for comparison
secondary_km = KMeans(n_clusters=num_classes, random_state=None).fit(question_embeddings)

# Cluster assignment of every question, then the number of questions per cluster
secondary_classes = pd.Series(secondary_km.predict(question_embeddings))
secondary_class_size = secondary_classes.groupby(secondary_classes).size()

In [None]:
import matplotlib.pyplot as plt

# Two stacked panels, one per k-means run
fig, (ax1, ax2) = plt.subplots(nrows=2, constrained_layout=True)
plt.rcParams['axes.titlesize'] = 16

# Both histograms share the same x-range so they can be compared directly
max_class_size = max(class_size['size'].max(), secondary_class_size.max())

xaxis_label = 'no. questions per class'
yaxis_label = 'no. classes'

# Histogram 1: the run with the fixed random state
ax1.hist(class_size['size'], bins=8, range=(0, max_class_size))
ax1.set(title='Class size distribution w/ random_state=42',
        xlabel=xaxis_label, ylabel=yaxis_label)

# Histogram 2: the run without a fixed random state
ax2.hist(secondary_class_size, bins=8, range=(0, max_class_size))
ax2.set(title='Class size distribution w/ random_state=None',
        xlabel=xaxis_label, ylabel=yaxis_label)

plt.show()

In [None]:
from typing import Optional, Iterator
from textwrap import fill  # this function makes long strings more readable

def get_sample_questions(class_index: int, sample_size: int=8, 
                         df: pd.DataFrame=df) -> "pd.Series[str]":
  """
  Return a Series of randomly sampled questions where
    df['km_class'] == class_index
  If number of matching records is less than or equal to sample size, return 
  all matching records (unsampled, in their original order).
  """
  # Count the matches in the frame that was actually passed in, rather than
  # looking the size up in the global class_size table: that table describes
  # the global df only, and its row labels equal the class ids only when every
  # class has at least one question.
  matching = df.loc[df['km_class'] == class_index, 'Question']
  if len(matching) <= sample_size:
    return matching
  return matching.sample(sample_size)

def display_sample(class_index: int, sample_size: int=8, 
                   df: pd.DataFrame=df) -> None:
  """
  Print random sample of questions where df['km_class'] == class_index along 
  with other useful information: the class index, the number of questions in 
  the class and the number of distinct questions among them.
  """
  print(f'k-means class: {class_index}')
  all_questions = df[df['km_class']==class_index]['Question']
  print(f'Number of questions: {all_questions.size}')
  print(f'Unique questions: {np.unique(all_questions).size}')
  print('-'*70)
  # Forward df so the sample is drawn from the same frame as the counts above
  samples = [fill(s) for s in get_sample_questions(class_index, sample_size, df)]
  print(*samples, sep='\n\n')

def get_class_iterator(starting_class: Optional[int]=None, 
                       sort_by_size: bool=False, ascending: bool=False, 
                       df: pd.DataFrame=df) -> Iterator[int]:
  """
  Return an iterator over the k-means class indexes found in df['km_class'].

  starting_class: if not None, iteration begins at this class (useful when 
    you know where you left off last).
  sort_by_size: if True, order the classes by their number of questions 
    instead of by class index.
  ascending: sort direction used when sort_by_size is True (smallest class 
    first if True, largest first otherwise).
  """
  sizes = df.groupby('km_class').size()
  ordered = sizes.sort_values(ascending=ascending) if sort_by_size else sizes
  # Label-based slice: keep everything from starting_class to the end
  remaining = ordered.loc[starting_class:]
  return iter(remaining.index)

# Set starting_class=7 to start from class 7, for example
# Set sort_by_size=True to page through classes from largest to smallest
iterator = get_class_iterator(
  starting_class=None, sort_by_size=False, ascending=False)

In [None]:
#@title Run this cell to see a sample of each class
#@markdown Every time you run this cell, it will print a sample of the questions
#@markdown from one of the classes. Each subsequent time you run this 
#@markdown cell, it will sample the next class. To start from a specific class, 
#@markdown edit the `starting_class` argument in the last line of the previous 
#@markdown cell.

# Advance to the next class; None signals that the iterator is exhausted
class_index = next(iterator, None)
if class_index is None:
  print('Reached the end of the classes, starting again from 0...')
  print('-'*70)
  iterator = iter(range(num_classes))
  class_index = next(iterator)

display_sample(class_index)

In [None]:
import json
# JSON text is UTF-8, so decode it explicitly instead of relying on the
# platform's default encoding
with open('./interview-bot-source/data/answers.json', encoding='utf-8') as f:
  question_answer_mappings = json.load(f)  # a dict, {<question>: <answer>}

# The fallback answer is stored under a special key; remove it so that it is
# not treated as a question
fallback_answer = question_answer_mappings.pop('[fallback]')

# pre-answered questions we will compare user submitted questions to
golden_questions = tuple(question_answer_mappings.keys())
# we will need the embeddings to compute similarity to user questions
golden_question_encodings = model.encode(golden_questions)
# answers[i] is the answer to golden_questions[i]
answers = tuple(question_answer_mappings.values())

In [None]:
from typing import Callable, Tuple
import sentence_transformers
from sentence_transformers.util import cos_sim
import logging

# Threshold for this notebook's logger; 'DEBUG' lets every record through
LOGGING_LEVEL = 'DEBUG'
# Logger used by the matching functions and the chatbot UI
logger = logging.getLogger(__name__)
# NOTE(review): no handler is attached here, so whether INFO/DEBUG records are
# actually displayed depends on the handlers the host environment has put on
# the root logger - confirm in the target environment.
logger.setLevel(LOGGING_LEVEL)

def score_similarities(question: str, encoder: Callable=model.encode, 
    golden_question_embeddings: 'Tensor'=golden_question_encodings) -> 'Tensor':
  """
  Embed question with encoder and return the cosine similarity between that 
  embedding and each row of golden_question_embeddings, as a single vector 
  with one score per golden question.
  """
  query_vector = encoder(question)
  # Sanity check: the query and the golden questions must share one dimensionality
  assert query_vector.shape[0] == golden_question_embeddings.shape[1]
  # cos_sim returns a matrix; its first (only) row belongs to the query
  similarity_matrix = cos_sim(query_vector, golden_question_embeddings)
  return similarity_matrix[0]

def get_best_match(question: str, answers: Tuple=answers, 
    fallback_answer: str=fallback_answer, threshold=0.55, 
    encoder: Callable=model.encode, 
    golden_question_embeddings: 'Tensor'=golden_question_encodings) -> str:
  """
  Determine the best pre-written answer to the question.

  The question is scored against every golden question embedding and the 
  highest-scoring entry is selected. If its score is greater than threshold, 
  the answer at the same position in answers is returned; otherwise 
  fallback_answer is returned.

  The question text used in log messages is read from the module-level 
  golden_questions tuple, which must line up index-for-index with answers.
  """
  similarity_scores = score_similarities(question, encoder, 
    golden_question_embeddings)
  best_match_index = int(similarity_scores.argmax())
  confidence = float(similarity_scores[best_match_index])
  assert confidence == max(similarity_scores)
  logger.info('Confidence: %s', confidence)

  # Questions and answers are paired by position
  assert len(golden_questions) == len(answers)

  if confidence > threshold:
    logger.info('Matched question: %s', golden_questions[best_match_index])
    return answers[best_match_index]

  # Not confident enough: log what would have been chosen, then fall back
  logger.info('confidence < threshold, using fallback')
  logger.debug('Next best question: %s', golden_questions[best_match_index])
  logger.debug('Next best answer: %s', answers[best_match_index])
  return fallback_answer

In [None]:
#@title In-notebook chatbot UI

#@markdown Run this cell to get an interactive widget you can use to ask the bot
#@markdown some questions.

import ipywidgets as widgets
from IPython.display import display, HTML

# Box styling that submit() applies to the output area
layout = widgets.Layout(
    max_width='275px',
    margin='3px 0px 24px 2px',
    padding='6px 12px 6px 12px',
    border='1px solid grey',
)

# Single-line input for the user's question
text_area = widgets.Text(
  disabled=False,
  placeholder='Try "what are your greatest strengths?"',
)

# Button that submits the current text
submit_button = widgets.Button(
  tooltip='Click me',
  button_style='info', # 'success', 'info', 'warning', 'danger' or ''
  disabled=False,
  description='Submit',
)

# Area in which the bot's answer is rendered
output = widgets.Output()


def submit(_):
  """
  Widget callback: read the question from text_area, look up the best 
  matching answer and show it in the output area, replacing any previous 
  answer. The argument supplied by the widget is ignored.
  """
  user_query = text_area.value
  logger.info('User query: '+user_query)

  response = get_best_match(user_query)

  output.clear_output()
  output.layout = layout
  with output:
    # The f-string already interpolates the answer. Calling .format() on the
    # result as well would raise if the answer contained '{' or '}'.
    display(HTML(f"<p>{response}</p>"))

# Render the input box, the button and the answer area, in that order
display(text_area, submit_button, output)

# Pressing Enter in the text box and clicking the button both submit the question
text_area.on_submit(submit)
submit_button.on_click(submit)

In [None]:
%%bash
# make a directory for the cloud function data (no-op if it already exists)
mkdir -p interview-bot-source/cloud-function/data

# copy the cached encoder model into the function data directory.
# The destination is the parent directory: naming the model directory itself
# as the destination would nest a second copy inside it whenever this cell is
# re-run and the directory already exists.
cp -r models/sentence-transformers_all-MiniLM-L6-v2 \
  interview-bot-source/cloud-function/data/

# copy answers.json to the function data directory
cp interview-bot-source/data/answers.json \
  interview-bot-source/cloud-function/data/answers.json

In [None]:
%%writefile interview-bot-source/cloud-function/data/config.json
{
    "model folder": "sentence-transformers_all-MiniLM-L6-v2"
}

In [None]:
# save embeddings to function data directory
import pickle
# Serialize the golden question embeddings and write them to the function data directory
with open('interview-bot-source/cloud-function/data/golden_question_encodings.pickle', 'wb') as f:
  f.write(pickle.dumps(golden_question_encodings))

In [None]:
!pip install -q functions-framework

In [None]:
from functions_framework import create_app
from urllib.parse import quote

main_path = "interview-bot-source/cloud-function/main.py"

# Build an app for the "route_requests" target in main.py and send it one
# question through the app's test client
with create_app("route_requests", main_path).test_client() as client:
  question = 'What are your greatest strengths?'
  # The question is URL-quoted so it can travel in the query string
  response = client.get('/?q=' + quote(question))
  print(f"Status:  {response.status}")
  print(f"Body:  {response.json}")

In [None]:
import sys
# Only attempt Colab authentication when the google.colab module has already
# been loaded, which is taken here as the sign that the notebook runs in Colab
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

# Run `gcloud init` in a shell.
# NOTE(review): presumably this prompts interactively for account/project - confirm.
!gcloud init

In [None]:
%%bash
# Deploy the contents of interview-bot-source/cloud-function as the HTTP
# function "handle-question", with route_requests as its entry point
gcloud functions deploy handle-question \
  --gen2 \
  --runtime=python310 \
  --region=us-central1 \
  --source=interview-bot-source/cloud-function \
  --entry-point=route_requests \
  --trigger-http \
  --allow-unauthenticated \
  --memory=1024MB \
  --max-instances=3