In [None]:
import os
import pandas as pd
import numpy as np
import string
import re
import matplotlib.pyplot as plt
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize

# Fetch every NLTK resource this notebook relies on so a fresh kernel can
# run all cells without a LookupError.
nltk.download('stopwords')                       # stopwords.words('english')
nltk.download('wordnet')                         # WordNetLemmatizer
nltk.download('punkt_tab')                       # nltk.word_tokenize
nltk.download('averaged_perceptron_tagger_eng')  # nltk.pos_tag

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger_eng.zip.


True

In [None]:
# Mount Google Drive (required every time when running on Colab)
try:
    from google.colab import drive
except ImportError:
    # Not on Colab: there is nothing to mount. The path cell falls back to a
    # local project root, so the notebook can keep running.
    drive = None

if drive is not None:
    drive.mount('/content/drive')

ValueError: mount failed

In [None]:
# Define and check the paths
# PROJECT_ROOT assumes the shared Milestone II folder is in your root google drive
PROJECT_ROOT = '/content/drive/MyDrive/SIADS 692 Milestone II/Milestone II - NLP Cryptic Crossword Clues' # Nathan's Drive

if not os.path.exists(PROJECT_ROOT):
    PROJECT_ROOT = os.path.abspath("..")  # fallback for local runs

# Derive the sub-directories only after PROJECT_ROOT is final; otherwise the
# local fallback would leave them pointing at the (missing) Drive folder.
DATA_DIR = f"{PROJECT_ROOT}/data"
NOTEBOOK_DIR = f"{PROJECT_ROOT}/notebooks"

In [None]:
# Load the six raw CSV exports from DATA_DIR, one DataFrame per file.
(
    df_clues,
    df_indicators,
    df_ind_by_clue,
    df_ind_consolidated,
    df_charades,
    df_charades_by_clue,
) = (
    pd.read_csv(f'{DATA_DIR}/{csv_name}')
    for csv_name in (
        'clues_raw.csv',
        'indicators_raw.csv',
        'indicators_by_clue_raw.csv',
        'indicators_consolidated_raw.csv',
        'charades_raw.csv',
        'charades_by_clue_raw.csv',
    )
)

In [None]:
# Instead of a string with redundant indices, extract only the clue_ids in
# brackets to create a list of integers
def _parse_clue_ids(raw):
    """Turn a raw ``clue_ids`` cell into a list of ints.

    Strings are scanned for bracketed integers such as ``[123]``. A value that
    is already a list is returned unchanged, so re-running this cell is safe.
    Anything else (e.g. a missing value) yields an empty list.
    """
    if isinstance(raw, list):
        return raw
    if not isinstance(raw, str):
        return []
    return [int(match) for match in re.findall(r"\[(\d+)\]", raw)]


df_indicators["clue_ids"] = df_indicators["clue_ids"].apply(_parse_clue_ids)

# Include a new column to keep track of how many clues have this indicator
df_indicators["num_clues"] = df_indicators["clue_ids"].apply(len)

In [None]:
# Map each wordplay type (one column of df_ind_consolidated) to its list of
# indicators, obtained by splitting the column's first cell on newlines.
ind_by_wordplay_dict = {
  column: df_ind_consolidated[column].iloc[0].split('\n')
  for column in df_ind_consolidated.columns
}

# Report how many indicators each wordplay type has
for wordplay, members in ind_by_wordplay_dict.items():
  print(f"{wordplay}: {len(members)}")

In [None]:
# Dimensions and column labels of the indicators frame.
(df_indicators.shape, df_indicators.columns)

In [None]:
# Number of rows per wordplay type, smallest count first.
df_indicators['wordplay'].value_counts().sort_values()

In [None]:
# Array of indicators that appear more than once in df, same as more than one wordplay type?
# `counts` holds, for every row, how many rows share that row's indicator.
counts = df_indicators.groupby('indicator')['indicator'].transform('count')
df_indicators[counts > 1].indicator.unique()

In [None]:
# Every row whose indicator is exactly 'abnormal'.
df_indicators.loc[df_indicators['indicator'] == 'abnormal']

In [None]:
# Every row whose indicator is exactly 'caught'.
df_indicators.loc[df_indicators['indicator'] == 'caught']

In [None]:
# 888037 clues in the ind_by_clue df.
# 5730 of these have more than one indicator associated with them.
# 'to squeeze' seems like an indicator that is >1 word, but could be cleaned and retained.
# NOTE(review): the constant 8 is presumably the number of indicator columns
# in df_ind_by_clue - confirm against the CSV schema.
df_ind_by_clue['ind_count'] = 8 - df_ind_by_clue.isna().sum(axis='columns')
print(df_ind_by_clue.loc[df_ind_by_clue['ind_count'] > 1].shape)
df_ind_by_clue.sort_values(by='ind_count', ascending=False)

In [None]:
# Inspect a single row by integer position (iloc is positional, not label-based).
df_indicators.iloc[15732]

In [None]:
# The indicator to extract could be simply "reflected".
# Clue text for clue_id 50741, as a NumPy array.
df_clues.loc[df_clues['clue_id'] == 50741, 'clue'].values

In [None]:
# 9097 rows with indicators >1 word count.
df_indicators['ind_wc'] = df_indicators['indicator'].apply(lambda x: len(str(x).split()))
# .copy() gives df_long_ind its own data, so adding a column below does not
# trigger pandas' SettingWithCopyWarning on a slice of df_indicators.
df_long_ind = df_indicators[df_indicators['ind_wc'] > 1].copy()

# Try removing stopwords, maybe better to retain all, allow membership with multiple clusters
stop = stopwords.words('english')
stop_set = set(stop)  # constant-time membership instead of scanning the list per word
df_long_ind['indicator_wo_stop'] = df_long_ind['indicator'].apply(
    lambda x: ' '.join(word for word in str(x).split() if word not in stop_set)
)

In [None]:
# Display the rows whose indicator has more than one word.
df_long_ind

In [None]:
# Clue text for clue_id 412172, as a NumPy array.
df_clues.loc[df_clues['clue_id'] == 412172, 'clue'].values

In [None]:
# Full row(s) for clue_id 412172.
df_clues.loc[df_clues['clue_id'] == 412172]

In [None]:
# List the wordplay types used as keys of the indicator dictionary.
ind_by_wordplay_dict.keys()

In [None]:
# Option 1: unique set of lemmatized indicator

def ind_list(in_dict, key):
  """Return the sorted, unique lemmas of all words in ``in_dict[key]``.

  Args:
    in_dict: mapping from wordplay type to an iterable of indicator strings.
    key: wordplay type to process.

  Returns:
    Alphabetically sorted list of distinct lemmas (WordNetLemmatizer with its
    default part of speech). An empty indicator list gives an empty list.

  Note: this name is redefined by later cells in the notebook; whichever
  definition was executed last is the one in effect.
  """
  # split words if indicator wc>1 and keep only unique words of this wordplay
  # type. A set comprehension flattens in one pass and also copes with an
  # empty indicator list.
  unique_inds = {word for indicator in in_dict[key] for word in str(indicator).split()}
  lemmatizer = WordNetLemmatizer()
  return sorted({lemmatizer.lemmatize(word) for word in unique_inds})

# Union of the per-wordplay word lists, de-duplicated and sorted.
out = sorted(
  set().union(*(ind_list(ind_by_wordplay_dict, key) for key in ind_by_wordplay_dict))
)
print(len(out))
out

In [None]:
# Option 2: unique set of lemmatized indicator words,
# enhanced with POS tagging lemmatization.

# Translate a Penn Treebank tag into the matching WordNet POS constant
def get_wordnet_pos(tag):
    """Return the WordNet POS constant for a Penn Treebank ``tag``.

    Only the tag's first letter matters: J -> adjective, V -> verb,
    N -> noun, R -> adverb. Any other tag falls back to noun.
    """
    first_letter_to_pos = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }
    # Default to noun if no clear mapping
    return first_letter_to_pos.get(tag[:1], wordnet.NOUN)


def ind_list(in_dict, key):
  """Return the sorted, unique POS-aware lemmas of all words in ``in_dict[key]``.

  Args:
    in_dict: mapping from wordplay type to an iterable of indicator strings.
    key: wordplay type to process.

  Returns:
    Alphabetically sorted list of distinct lemmas. Each token is lemmatized
    with the WordNet POS derived from its Penn Treebank tag
    (see ``get_wordnet_pos``). An empty indicator list gives an empty list.

  Note: this name is also defined by other cells in the notebook; whichever
  definition was executed last is the one in effect.
  """
  # split words if indicator wc>1 and keep only unique words of this wordplay
  # type. The words are sorted because the tagger takes neighbouring tokens
  # into account and plain set order of strings can change between kernel
  # sessions, which would make the tags (and lemmas) irreproducible.
  unique_inds = sorted({word for indicator in in_dict[key] for word in str(indicator).split()})
  # convert list of unique indicators to one string.
  unique_inds = ' '.join(unique_inds)

  # Tokenize the text
  tokens = nltk.word_tokenize(unique_inds)
  # Perform POS tagging (NLTK uses Penn Treebank tags)
  pos_tags = nltk.pos_tag(tokens)

  lemmatizer = WordNetLemmatizer()
  return sorted({lemmatizer.lemmatize(word, get_wordnet_pos(tag)) for word, tag in pos_tags})

# Merge the per-wordplay results into one sorted list without duplicates.
out = sorted(
  set().union(*(ind_list(ind_by_wordplay_dict, key) for key in ind_by_wordplay_dict))
)
print(len(out))
out

In [None]:
# Option 3: unique set of stemmed indicator words

def ind_list(in_dict, key):
  """Return the sorted, unique Porter stems of all words in ``in_dict[key]``.

  Args:
    in_dict: mapping from wordplay type to an iterable of indicator strings.
    key: wordplay type to process.

  Returns:
    Alphabetically sorted list of distinct stems. An empty indicator list
    gives an empty list.

  Note: this name is also defined by earlier cells in the notebook; whichever
  definition was executed last is the one in effect.
  """
  # split words if indicator wc>1 and keep only unique words of this wordplay
  # type. A set comprehension flattens in one pass and also copes with an
  # empty indicator list.
  unique_inds = {word for indicator in in_dict[key] for word in str(indicator).split()}
  ps = PorterStemmer()
  return sorted({ps.stem(word) for word in unique_inds})

# Combine every wordplay type's stems into a single sorted, unique list.
out = sorted(
  set().union(*(ind_list(ind_by_wordplay_dict, key) for key in ind_by_wordplay_dict))
)
print(len(out))
out

In [None]:
# For each wordplay type print: its name, how many indicators it has, and how
# many of those contain at least one punctuation character.
pattern = f"[{re.escape(string.punctuation)}]+"
for key, words in ind_by_wordplay_dict.items():
  print(key, len(words), sum(1 for w in words if re.search(pattern, w)))

In [None]:
# Inspect the list of indicators stored under the 'anagram' key.
ind_by_wordplay_dict['anagram']

In [None]:
words = ind_by_wordplay_dict['anagram']
# Translation table that turns every punctuation character into a single
# space (the characters are replaced, not removed).
translator = str.maketrans(dict.fromkeys(string.punctuation, ' '))
# Run each indicator through the table
new_words = [indicator.translate(translator) for indicator in words]

new_words

- Looked into the anagram indicators more: do they act on the definition part of the clue, or does the anagram indicator itself contain the actual letters to use in the answer?
- Use a counter, so that more frequent clue indicators are given more confidence?
- The '/' characters in these indicators are strange; what is the actual text in the CCC that matches each such indicator?