<a href="https://colab.research.google.com/github/marco-luzzara/boardgame-complexity-predictor/blob/master/src/extract_features.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from IPython.display import clear_output
import os
# Any non-empty value of the WORKING_LOCALLY environment variable selects the
# local file layout; when it is unset or empty the Google Colab layout is used.
WORKING_LOCALLY = bool(os.getenv('WORKING_LOCALLY'))

if not WORKING_LOCALLY:
    # Colab: the datasets live on the mounted Google Drive
    from google.colab import drive
    drive.mount('/content/drive')
    DATASET_FILE_PATH = '/content/drive/My Drive/Projects/IRBoardGameComplexity/dataset.csv'
    CLEANED_DATASET_FILE_PATH = '/content/drive/My Drive/Projects/IRBoardGameComplexity/cleaned_dataset.csv'
    PROCESSED_DATASET_FILE_PATH = '/content/drive/My Drive/Projects/IRBoardGameComplexity/processed_dataset.csv'
    # hide the output produced by the mount
    clear_output(wait=False)
else:
    # local run: the datasets are read from the relative `data` folder
    DATASET_FILE_PATH = 'data/dataset.csv'
    CLEANED_DATASET_FILE_PATH = 'data/cleaned_dataset.csv'
    PROCESSED_DATASET_FILE_PATH = 'data/processed_dataset.csv'

In [2]:
import spacy
from spacy import displacy



In [3]:
import logging

# Notebook-wide logger with one stream handler and an INFO threshold.
logger = logging.getLogger('bgg_predict')
logger.setLevel(logging.INFO)
# remove the handlers attached to the 'bgg_predict' logger before adding the new one
logger.handlers.clear()

formatter = logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

# DEBUG is below the configured INFO level, so this call prints nothing
logger.debug('test')

In [4]:
import pandas as pd
import ast

def get_df_with_docs(file_path: str, nrows=None, skiprows=1) -> pd.DataFrame:
    '''Read a slice of the dataset CSV into a dataframe.

    The header (file line 0) is always kept. File lines 1 .. `skiprows` - 1 are
    skipped, then at most `nrows` records are read (all of them when `nrows` is None).
    The `family` column is parsed from its string form with `ast.literal_eval`.'''
    skipped_lines = range(1, skiprows)
    return pd.read_csv(file_path,
                       skiprows=skipped_lines,
                       nrows=nrows,
                       converters={'family': ast.literal_eval})

def get_document_by_line(file_path: str, line: int) -> str:
    '''Return the `rulebook` text of the record found on `line` of the CSV.

    Lines are counted from 1 and include the header, so the first record is on line 2.'''
    # skipping starts from line 1 so that the header row is kept,
    # see https://stackoverflow.com/a/27325729/5587393
    single_record = get_df_with_docs(file_path, nrows=1, skiprows=line - 1)
    return single_record['rulebook'].iloc[0]

def get_document_by_id(file_path: str, id: int) -> str:
    '''Return the `rulebook` text of the first record whose `id` column equals `id`.

    The CSV is scanned in chunks so that the whole dataset is never loaded at once.

    Raises:
        KeyError: when no record with the requested id exists in the file.'''
    with pd.read_csv(file_path, chunksize=50,
                     converters={ 'family': ast.literal_eval }) as reader:
        for chunk in reader:
            matching_rulebooks = chunk.loc[chunk['id'] == id, 'rulebook']
            if not matching_rulebooks.empty:
                return matching_rulebooks.iloc[0]

    # the previous implementation leaked a bare StopIteration from next(reader) here
    raise KeyError(f'no board game with id {id} found in {file_path}')

# sanity check: looking up id 2310 and reading line 40 of the cleaned dataset
# must return the same rulebook
assert get_document_by_id(CLEANED_DATASET_FILE_PATH, 2310) == get_document_by_line(CLEANED_DATASET_FILE_PATH, 40)

## Cleaning and Preprocessing

In this part, data are cleaned and processed using coreference resolution. This means that all the pronouns and references to other objects in the sentence are resolved. The next 2 cells should be run only when you want to preprocess data, which takes a lot of time.

In [None]:
# When not working locally, install the packages and models used by the
# preprocessing cells: spacy-transformers, coreferee 1.3.x with its English
# model, and the spaCy models en_core_web_lg and en_core_web_trf.
if not WORKING_LOCALLY:
    !pip install spacy-transformers
    !python3 -m pip install coreferee==1.3.*
    !python3 -m coreferee install en
    !python -m spacy download en_core_web_lg
    !python -m spacy download en_core_web_trf
    # hide the installation logs
    clear_output(wait=False)

In [None]:
import re

# e-mail addresses: a local part with optional dot-separated pieces, '@', and a
# domain containing at least one dot
regex_mail = re.compile(r'\w+(?:\.\w+)*?@\w+(?:\.\w+)+')
# links starting with http(s)://, ftp://, file:// or www
# modified from https://stackoverflow.com/a/163684/5587393
regex_link = re.compile(r'(?:\b(?:(?:https?|ftp|file)://|www))[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#%=~_|]')
# in a sentence there must be at least 4 words of at least 2 letters each
regex_at_least_4_words_in_sentence = re.compile(r"^(?=.*?(?:[,:;()'\"]?[a-zA-Z']{2,}[,:;()'\"]?(?: |-|$)(?:[^a-zA-Z]*?|[a-zA-Z]? ?)){4,})")
# a string like "first.Second" could be misinterpreted by the tokenizer as a single token
# with the regex it becomes "first. Second"
regex_distance_between_period_and_following_word = re.compile(r'\.(?!\s|$)')
# compress consecutive whitespaces
regex_multiple_spaces = re.compile(r'\s{2,}')
# interrupted words usually have a "- " at the end before the new line, 'inter- rupted' -> 'interrupted'
# NOTE: must be applied after whitespace compression
regex_interrupted_word = re.compile(r'([a-zA-Z])- ')
# runs of 2+ characters that are not '.', alphanumerics, whitespace or parentheses
# (optionally followed by spaces and a number), or a number enclosed by the same
# symbol on both sides: this removes page numbers such as "-12-"
regex_consecutive_meaningless_chars = re.compile(r'[^\.a-zA-Z0-9\s()]{2,} *(?:\d+)?|(?P<prepage>[^a-zA-Z\s\d\.])\d+(?P=prepage)')
# remove paragraph ids, '1.2.3' -> ''
regex_dot_separated_digits = re.compile(r'(?:\d+\.)+\d+')
# remove meaningless chars after sentence start, '. (- start' -> '. start'
regex_clean_start = re.compile(r'\.(\s?)[^a-zA-Z\s]+')
# recover missing apostrophes: a word followed by a detached "t" or "s", 'can t' -> "can't"
regex_missing_apices = re.compile(r"\b([a-zA-Z]+) (t|s)\b")

def clean_from_short_sentences(text: str) -> str:
    '''Split `text` on '.', keep only the pieces accepted by
    `regex_at_least_4_words_in_sentence` and join them back with '.'.'''
    kept_sentences = []
    for candidate in text.split('.'):
        if regex_at_least_4_words_in_sentence.match(candidate) is None:
            continue
        kept_sentences.append(candidate)
    return '.'.join(kept_sentences)

def clean_text(text: str) -> str:
    '''Apply every cleaning step to `text`, in a fixed order, and return the result.'''
    # 1) plain removals; they all come before the whitespace compression so that
    #    the gaps they leave behind are squeezed by regex_multiple_spaces
    for removal_pattern in (regex_mail,
                            regex_link,
                            regex_dot_separated_digits,
                            regex_consecutive_meaningless_chars):
        text = removal_pattern.sub('', text)
    text = regex_clean_start.sub(r'.\1', text)

    # 2) whitespace compression, then the fixes that rely on single spaces
    text = regex_multiple_spaces.sub(' ', text)
    text = regex_interrupted_word.sub(r'\1', text)
    text = regex_missing_apices.sub(r"\1'\2", text)

    # 3) sentence-level filtering and final spacing after periods
    text = clean_from_short_sentences(text)
    return regex_distance_between_period_and_following_word.sub('. ', text)

# smoke test for clean_text: the input contains an e-mail, a page number, an
# interrupted word, repeated spaces, a too-short sentence, a missing apostrophe and a link
test_text = 'this is a test (me@gmail.it) -12- that wi-  ll be   cleaned. with 2 5 6 not valid. two sentences can t be good http://or.not.'
cleaned_text = clean_text(test_text)
print(cleaned_text)
assert cleaned_text == 'this is a test () that will be cleaned. two sentences can\'t be good '

this is a test () that will be cleaned. two sentences can't be good 


In [None]:
from typing import List
import pandas as pd
import coreferee

def remove_columns_prefix(df: pd.DataFrame) -> None:
    '''Rename, in place, every column of `df` to the part of its name that follows
    the last dot (e.g. 'info.id' -> 'id'); names without a dot are left unchanged.'''
    def _after_last_dot(column_name):
        return column_name.rsplit('.', 1)[-1]

    df.rename(columns=_after_last_dot, inplace=True)

def _get_new_token_from_resolve(token: spacy.tokens.Token, 
                                chains: coreferee.data_model.ChainHolder) -> str:
    '''Return the text that replaces `token` once coreferences are resolved.

    When `chains.resolve(token)` returns None the token text (with its trailing
    whitespace) is returned unchanged. Otherwise the texts of the resolved tokens
    are returned, each followed by a space and joined with 'and '.

    The return annotation is `str`: the function has always returned text, not a Token.'''
    resolved_tokens = chains.resolve(token)
    if resolved_tokens is None:
        return token.text_with_ws

    # the extra spaces introduced here are compressed later by the caller
    return 'and '.join(res_token.text_with_ws + ' ' for res_token in resolved_tokens)

def _process_doc_for_coref(doc: spacy.tokens.Doc) -> str:
    '''Rebuild the text of `doc`, replacing each token with the output of
    `_get_new_token_from_resolve` computed against the doc coreference chains.'''
    chains = doc._.coref_chains
    return ''.join(_get_new_token_from_resolve(token, chains) for token in doc)

# pipeline shared by every call of preprocess_texts; loaded lazily on first use
_COREF_PIPELINE = None

def _get_coref_pipeline():
    '''Return the `en_core_web_trf` pipeline extended with `coreferee`, loading it only once.'''
    global _COREF_PIPELINE
    if _COREF_PIPELINE is None:
        nlp = spacy.load('en_core_web_trf')
        nlp.add_pipe("coreferee")
        _COREF_PIPELINE = nlp
    return _COREF_PIPELINE

def preprocess_texts(texts: List[str]) -> List[str]:
    '''Clean every text with `clean_text`, resolve its coreferences and return
    the resulting texts, in the same order, with repeated whitespace compressed.

    The spaCy pipeline is cached between calls: this function is invoked once per
    dataset chunk, and reloading the transformer model every time is expensive.'''
    nlp = _get_coref_pipeline()

    cleaned_texts = [clean_text(text) for text in texts]
    docs = nlp.pipe(cleaned_texts)

    return [regex_multiple_spaces.sub(' ', _process_doc_for_coref(doc)) for doc in docs]
        
# run the whole preprocessing on the rulebook found on line 103 of the raw dataset
text = get_document_by_line(DATASET_FILE_PATH, 103)
# hand-written alternative input, useful to check the coreference resolution quickly:
# text = '''Although he was very busy with his work, the magical Peter had had enough of it. 
#     He and his wife decided they needed a holiday. 
#     this couple travelled to Spain because it loves the country very much.'''
preprocess_texts([text])

In [None]:
import pandas as pd
from typing import Generator, Tuple
import ast

# safety switch: must be set to True by hand to (re)run the expensive cleaning below
START_CLEANING = False
# number of dataset rows read and preprocessed at a time
CHUNK_SIZE = 20
# number of lines of the raw dataset file; the file is closed as soon as it is counted
with open(DATASET_FILE_PATH) as dataset_file:
    DATASET_ROWS = sum(1 for _ in dataset_file)

assert START_CLEANING == True, 'set START_CLEANING to True to start preprocessing again' # make sure you do not start preprocessing again

def clean_data_row(row, docs_dict: Generator[Tuple[int, str], None, None]):
    '''Replace the `rulebook` of `row` with the next preprocessed text of `docs_dict`.

    `docs_dict` yields `(id, rulebook)` pairs; the id of the consumed pair must be
    equal to `row['id']`, otherwise an AssertionError is raised. The same `row`
    object is modified and returned.'''
    doc_id, preprocessed_rulebook = next(docs_dict)
    assert doc_id == row['id']
    row['rulebook'] = preprocessed_rulebook
    return row

# `drive` is imported only when not working locally, so every Drive call is guarded
if not WORKING_LOCALLY:
    drive.mount('/content/drive')

# the column layout is the same for every chunk
column_names = ['rulebook', 'info.id', 'info.name', 'info.averageweight', 'info.playingtime', 'info.family']
for skip_rows in range(1, DATASET_ROWS, CHUNK_SIZE):
    is_first_chunk = skip_rows == 1
    # ast.literal_eval converts the family column string into a python array
    df_dataset = pd.read_csv(DATASET_FILE_PATH, converters={ 'info.family': ast.literal_eval },
                            names=column_names, header=None,
                            nrows=CHUNK_SIZE, skiprows=skip_rows)
    remove_columns_prefix(df_dataset)
    logger.info(f"processing boardgames from {df_dataset.loc[0, 'id']} to {df_dataset.loc[df_dataset.index[-1], 'id']}")
    # pairs (id, preprocessed rulebook), consumed row by row in clean_data_row
    docs_dict = zip(df_dataset['id'].values, preprocess_texts(df_dataset['rulebook'].values))

    df_cleaned_dataset = df_dataset.apply(lambda x: clean_data_row(x, docs_dict),
                                        axis='columns')

    # the first chunk creates the file and writes the header, the others are appended
    df_cleaned_dataset.to_csv(CLEANED_DATASET_FILE_PATH, 
                            header=is_first_chunk, index=False, 
                            mode='w' if is_first_chunk else 'a')

if not WORKING_LOCALLY:
    # unmount to flush the written file to Drive, then mount again for the next cells
    drive.flush_and_unmount()
    drive.mount('/content/drive')
df_cleaned_dataset.head()

## Luck metrics
These metrics are retrieved using rule-based matching and dependency matching. Luck is one of the criteria that determine the board game (bg) weight. In this case, the sources of luck considered are:

- Dice rolling
- Drawing
- Shuffling
- Words like *random* or *randomly*

In [5]:
from spacy import displacy
from collections import namedtuple
from spacy.matcher import Matcher, DependencyMatcher

LuckMetrics = namedtuple('LuckMetrics', ['dice_based', 'drawing_based', 'shuffling_based', 'random_based'])

def get_luck_metrics(doc: spacy.tokens.Doc) -> LuckMetrics:
    '''Count the matches of each luck-related pattern found in `doc`.

    - dice_based: a verb with lemma use/throw/roll that directly governs either a
      noun with lemma die/dice (nsubj, dobj, nsubjpass or compound) or a digit
      token used as direct object
    - drawing_based: the verb "draw" directly governing the noun "card"
      (dobj, nsubjpass or compound)
    - shuffling_based: tokens whose lemma is "shuffle"
    - random_based: tokens whose lemma is "random" or "randomly"'''
    vocab = doc.vocab

    def rolling_verb_node():
        # a new dict for each pattern, so that patterns never share objects
        return {
            "RIGHT_ID": "rolling",
            "RIGHT_ATTRS": {"LEMMA": { "IN": ["use", "throw", "roll"]}, "POS": "VERB"}
        }

    # ---------- dice rolling ----------
    dice_target_node = {
        "LEFT_ID": "rolling",
        "REL_OP": ">",
        "RIGHT_ID": "dice_or_die",
        "RIGHT_ATTRS": {
            "LEMMA": { "IN": ["die", "dice"]},
            "POS": "NOUN",
            "DEP": { "IN": ['nsubj', 'dobj', 'nsubjpass', 'compound'] }
        }
    }
    rolled_number_node = {
        "LEFT_ID": "rolling",
        "REL_OP": ">",
        "RIGHT_ID": "number",
        "RIGHT_ATTRS": {
            "IS_DIGIT": True,
            "DEP": { "IN": ['dobj'] }
        }
    }
    dice_matcher = DependencyMatcher(vocab)
    dice_matcher.add("diceroll", [
        [rolling_verb_node(), dice_target_node],
        [rolling_verb_node(), rolled_number_node]
    ])

    # ---------- card drawing ----------
    drawing_matcher = DependencyMatcher(vocab)
    drawing_matcher.add("drawing", [
        [
            {
                "RIGHT_ID": "drawing",
                "RIGHT_ATTRS": {"LEMMA": "draw", "POS": "VERB"}
            },
            {
                "LEFT_ID": "drawing",
                "REL_OP": ">",
                "RIGHT_ID": "card",
                "RIGHT_ATTRS": {
                    "LEMMA": "card",
                    "POS": "NOUN",
                    "DEP": { "IN": ['dobj', 'nsubjpass', 'compound'] }
                }
            }
        ]
    ])

    # ---------- shuffle ----------
    shuffle_matcher = Matcher(vocab)
    shuffle_matcher.add("shuffle", [[{"LEMMA": "shuffle"}]])

    # ---------- random ----------
    random_matcher = Matcher(vocab)
    random_matcher.add("random", [[{"LEMMA": { "IN": ["random", "randomly"]}}]])

    dice_count = len(dice_matcher(doc))
    drawing_count = len(drawing_matcher(doc))
    shuffling_count = len(shuffle_matcher(doc))
    random_count = len(random_matcher(doc))

    return LuckMetrics(dice_based=dice_count,
                       drawing_based=drawing_count,
                       shuffling_based=shuffling_count,
                       random_based=random_count)

# compute the luck metrics of the cleaned rulebook found on line 130
text = get_document_by_line(CLEANED_DATASET_FILE_PATH, 130)

nlp = spacy.load('en_core_web_sm')
doc = nlp(text)
# number of tokens and number of characters of the document
print(len(doc), len(doc.text))
print(get_luck_metrics(doc))

2607 12624
LuckMetrics(dice_based=12, drawing_based=4, shuffling_based=3, random_based=0)


## Amount of choices metrics
These metrics are retrieved using rule-based matching and dependency matching. The amount of choices every player has is one of the criteria that determine the board game (bg) weight. In this case, I am considering:

- *can/could/may/decide/...*, with some exceptions:
    - negatives are not considered choices, like *cannot draw* or *don't choose*
    - *can* + *choose* and similar ones increase the *amount of choices* metrics by 1
- *choice/option*, except when there is a leading *no*.

In [6]:
from spacy import displacy
from collections import namedtuple
from spacy.matcher import Matcher, DependencyMatcher

ChoiceMetrics = namedtuple('ChoiceMetrics', ['can_based', 'choice_based'])

def get_choices_amount_metrics(doc: spacy.tokens.Doc) -> ChoiceMetrics:
    '''Count the expressions of `doc` that give a choice to the player.

    - can_based: auxiliaries can/could/may, excluding those attached to a verb
      that has a not/only/never modifier and those attached to a decision verb
      (decide/select/choose/opt), which is counted in `choice_based` instead
    - choice_based: decision verbs without a not/only/never modifier, plus the
      nouns choice/option that do not have the determiner "no"'''
    vocab = doc.vocab

    # each helper returns a new dict, so that patterns never share objects
    def modal_attrs():
        return {"LEMMA": { "IN": ["can", "could", "may"]}, "POS": "AUX"}

    def decision_verb_attrs():
        return {"LEMMA": { "IN": ["decide", "select", "choose", "opt"]}, "POS": "VERB"}

    def choice_noun_attrs():
        return {"LEMMA": { "IN": ["choice", "option"]}, "POS": "NOUN"}

    def limiting_modifier_attrs():
        return {"LEMMA": { "IN": ["not", "only", "never"]}, "DEP": { "IN": ["advmod", "neg"] }}

    def single_token_hits(key, attrs):
        # second element of every Matcher result (the start of the match)
        matcher = Matcher(vocab)
        matcher.add(key, [[attrs]])
        return { match[1] for match in matcher(doc) }

    def anchor_hits(key, patterns):
        # index of the token matched by the first node of every dependency pattern
        matcher = DependencyMatcher(vocab)
        matcher.add(key, patterns)
        return { match[1][0] for match in matcher(doc) }

    # --------------  can/could/may --------------
    modals = single_token_hits('can_could_may', modal_attrs())
    excluded_modals = anchor_hits('can_could_may_exceptions', [
        [
            # ❌ can not/only/never verb
            {"RIGHT_ID": "can_could_may", "RIGHT_ATTRS": modal_attrs()},
            {
                "LEFT_ID": "can_could_may",
                "REL_OP": "<",
                "RIGHT_ID": "generic_verb",
                "RIGHT_ATTRS": {"POS": { "IN": ["AUX", "VERB"] }}
            },
            {
                "LEFT_ID": "generic_verb",
                "REL_OP": ">",
                "RIGHT_ID": "neg_or_only",
                "RIGHT_ATTRS": limiting_modifier_attrs()
            }
        ],
        [
            # ❌ can + choose are counted as 1
            {"RIGHT_ID": "can_could_may", "RIGHT_ATTRS": modal_attrs()},
            {
                "LEFT_ID": "can_could_may",
                "REL_OP": "<",
                "RIGHT_ID": "decision_verb",
                "RIGHT_ATTRS": decision_verb_attrs()
            }
        ]
    ])

    # --------------  choose/select/... --------------
    decision_verbs = single_token_hits('choose_select', decision_verb_attrs())
    excluded_decision_verbs = anchor_hits('choose_exceptions', [
        [
            # ❌ not/only/never choose
            {"RIGHT_ID": "decision_verb", "RIGHT_ATTRS": decision_verb_attrs()},
            {
                "LEFT_ID": "decision_verb",
                "REL_OP": ">",
                "RIGHT_ID": "negation",
                "RIGHT_ATTRS": limiting_modifier_attrs()
            }
        ]
    ])

    # -------------- choice and option --------------
    choice_nouns = single_token_hits('choice_option', choice_noun_attrs())
    excluded_choice_nouns = anchor_hits('choice_option_exceptions', [
        [
            # ❌ no choice / no option
            {"RIGHT_ID": "choice", "RIGHT_ATTRS": choice_noun_attrs()},
            {
                "LEFT_ID": "choice",
                "REL_OP": ">",
                "RIGHT_ID": "prefix_no",
                "RIGHT_ATTRS": {
                    "LEMMA": "no",
                    "POS": "DET",
                    "DEP": "det"
                }
            }
        ]
    ])

    # ---------------- results -----------------
    can_based = len(modals - excluded_modals)
    choice_based = len(decision_verbs - excluded_decision_verbs) + \
                   len(choice_nouns - excluded_choice_nouns)

    return ChoiceMetrics(can_based, choice_based)

# compute the choice metrics of the cleaned rulebook found on line 120
text = get_document_by_line(CLEANED_DATASET_FILE_PATH, 120)
# hand-written alternative input covering the exceptions handled by the patterns:
# text = '''you can only take this because it can be outrageous. 
#     you can't take it. you can not also choose. you can never be sure of the result. 
#     you can decide the next thing, or you choose the target. another choice is to win. 
#     but there is no right option.'''

nlp = spacy.load('en_core_web_sm')
doc = nlp(text)
# number of tokens and number of characters of the document
print(len(doc), len(doc.text))
print(get_choices_amount_metrics(doc))

# uncomment to display the dependency tree of the document
# displacy.render(doc, style='dep', jupyter=True)

4276 19478
ChoiceMetrics(can_based=22, choice_based=8)


## Utils
Some useful methods, like:
- `find_n_most_common_nouns()`: returns the nouns that appear in the largest number of documents of the dataset
- `displacy.render()`: shows the token dependencies

In [None]:
# inspect how spaCy analyses a single sentence
text = '''In a round of play, each player gets one turn.'''

nlp = spacy.load('en_core_web_sm')
doc = nlp(text.lower())
# lemma, head, dependency label and part of speech of every token
print([(token.lemma_, token.head, token.dep_, token.pos_) for token in doc])
# number of tokens and number of characters of the document
print(len(doc), len(doc.text))

# draw the dependency tree in the notebook
displacy.render(doc, style='dep', jupyter=True)

[('in', gets, 'prep', 'ADP'), ('a', round, 'det', 'DET'), ('round', in, 'pobj', 'NOUN'), ('of', round, 'prep', 'ADP'), ('play', of, 'pobj', 'NOUN'), (',', gets, 'punct', 'PUNCT'), ('each', player, 'det', 'DET'), ('player', gets, 'nsubj', 'NOUN'), ('get', gets, 'ROOT', 'VERB'), ('one', turn, 'nummod', 'NUM'), ('turn', gets, 'dobj', 'NOUN'), ('.', gets, 'punct', 'PUNCT')]
12 46


In [None]:
import itertools
from collections import Counter
from typing import List, Tuple, Iterator

def find_n_most_common_nouns(n, docs: Iterator[spacy.tokens.Doc]) -> List[Tuple[str, int]]:
    '''Return the `n` nouns found in the largest number of `docs`.

    Every noun returned by `find_most_common_nouns` is counted once per document,
    so the number paired with each noun is the amount of documents containing it.'''
    documents_per_noun = Counter()
    for doc in docs:
        nouns_in_doc = set(find_most_common_nouns(doc).keys())
        documents_per_noun.update(nouns_in_doc)
    return documents_per_noun.most_common(n)
    

nlp = spacy.load('en_core_web_sm')
# read 100 rulebooks of the cleaned dataset (nrows=100, skiprows=200)
df_dataset = get_df_with_docs(CLEANED_DATASET_FILE_PATH, 100, 200)
docs = nlp.pipe(df_dataset['rulebook'].values)

# the 80 nouns shared by the largest number of those rulebooks
find_n_most_common_nouns(80, docs)

## Spacy Extension
This extension makes the sentence number retrieval much faster. With this extension, after the parsing step of the pipeline, each token receives a `sentence_id` extension attribute. Without it, every access to `token.sent` visits the sentences of the doc until it finds the one whose boundaries contain the token position.

In [7]:
from spacy.language import Language

def _component_assign_sentence_id(doc: spacy.tokens.Doc) -> spacy.tokens.Doc:
    '''Store in `token._.sentence_id` the 0-based index of the sentence of each token.

    The counter starts from -1 and is incremented on every token flagged as
    sentence start. The same `doc` is returned.'''
    spacy.tokens.Token.set_extension('sentence_id', default=None, force=True)
    sentence_index = -1
    for tok in doc:
        sentence_index += 1 if tok.is_sent_start else 0
        tok._.sentence_id = sentence_index

    return doc

# this extension exists because accessing the `sent` attribute every time means
# looping through the doc.sents until the token is found
factory_id = 'assign_sentence_id'
# the component is registered only if a factory with this name does not exist
# yet, so the cell can be executed more than once
if not Language.has_factory(factory_id):
    @Language.component(factory_id)
    def assign_sentence_id(doc: spacy.tokens.Doc) -> spacy.tokens.Doc:
        '''Pipeline component that delegates to `_component_assign_sentence_id`.'''
        return _component_assign_sentence_id(doc)

# check the component on three short sentences
text = """this is the first sentence. this is the second one. the third one is here."""

nlp = spacy.load('en_core_web_sm')
# sentence ids are assigned right after the parser
nlp.add_pipe('assign_sentence_id', after='parser')
doc = nlp(text)

# print every token with the id of its sentence
for token in doc:
    print(token, token._.sentence_id)

this 0
is 0
the 0
first 0
sentence 0
. 0
this 1
is 1
the 1
second 1
one 1
. 1
the 2
third 2
one 2
is 2
here 2
. 2


## Rulebook features
I search for entities in the text using rules based on part of speech and frequency, instead of keyword extraction algorithms like `YAKE` or `TextRank`.

In [13]:
from collections import Counter, defaultdict
from typing import List, Set, Dict, Tuple
from string import punctuation

# a noun is accepted as unigram only if at least two of its occurrences are at
# most this many sentences apart
MAX_DISTANCE_TO_BE_CONSIDERED_UNIGRAM = 4
# minimum number of occurrences required for a noun to be accepted as unigram
MIN_TOKEN_TO_BE_CONSIDERED_UNIGRAM = 4
# minimum number of times a compound must precede a unigram to be kept as its bigram
MIN_TOKEN_TO_BE_CONSIDERED_BIGRAM = 3
# words never accepted as unigrams or bigrams: a hand-picked list of generic
# terms plus the stop words of the spaCy `en_core_web_sm` model
IGNORED_WORDS = {
        'amount', 'beginning', 'board', 'book', 'bottom', 'case', 'choice', 
        'clarification', 'clockwise', 'condition', 'cost', 'design', 'difference', 'effect', 
        'end', 'example', 'face', 'front', 'game', 'left', 'middle', 'note', 'number', 
        'opponent', 'option', 'order', 'overview', 'page', 'play',
        'purpose', 'reference', 'result', 'right', 'rule', 'rulebook', 
        'section', 'set', 'setup', 'side', 'summary', 'start', 'step', 'table', 'thing',
        'type', 'tie', 'time', 'top', 'total', 'use', 'value', 'version', 'way'
        }.union(spacy.load('en_core_web_sm').Defaults.stop_words)

def find_most_common_nouns(doc: spacy.tokens.Doc) -> Dict[str, List[spacy.tokens.Token]]:
    '''Group the relevant nouns of `doc` by lower-cased lemma.

    A token is kept when it is at least 3 characters long, is tagged NOUN or PROPN
    and is a subject, passive subject, direct object or object of a preposition.
    Every lemma is mapped to the list of its occurrences, in document order.'''
    noun_tags = {'NOUN', 'PROPN'}
    accepted_deps = {'nsubj', 'dobj', 'nsubjpass', 'pobj'}
    occurrences_by_lemma = defaultdict(list)

    for token in doc:
        if len(token) < 3:
            continue
        if token.pos_ not in noun_tags:
            continue
        if token.dep_ not in accepted_deps:
            continue
        occurrences_by_lemma[token.lemma_.lower()].append(token)

    return occurrences_by_lemma

def _is_token_part_of_bigram(token: spacy.tokens.Token, 
                             unigram_token: spacy.tokens.Token) -> bool:
    '''Tell whether `token` forms a bigram with `unigram_token`: it must be a
    NOUN/PROPN compound whose head is `unigram_token` and whose text neither
    starts nor ends with a punctuation character.'''
    if token.dep_ != 'compound' or token.pos_ not in {'NOUN', 'PROPN'}:
        return False

    punctuation_chars = tuple(punctuation)
    if token.text.endswith(punctuation_chars) or token.text.startswith(punctuation_chars):
        return False

    return token.head.i == unigram_token.i

def find_most_relevant_ngram(doc: spacy.tokens.Doc,
                             unigrams: Dict[str, List[spacy.tokens.Token]]) \
                             -> Dict[str, Set[str]]:
    '''Find, for each unigram, the compounds that frequently precede it.

    The token right before every occurrence of a unigram is a bigram candidate
    when `_is_token_part_of_bigram` accepts it and its lemma is neither an ignored
    word nor a unigram itself. A candidate is kept when it is found at least
    `MIN_TOKEN_TO_BE_CONSIDERED_BIGRAM` times before the same unigram.

    Returns a defaultdict mapping each unigram that has at least one candidate to
    the (possibly empty) set of kept bigram lemmas.'''
    excluded_bigrams = IGNORED_WORDS.union(unigrams.keys())
    bigram_associated_dict = defaultdict(Counter)
    for name, tokens in unigrams.items():
        for token in tokens:
            # the first token of the doc has no predecessor (doc[-1] is the last token)
            if token.i == 0:
                continue

            possible_bigram = doc[token.i - 1]
            # the lemma is lower-cased before the exclusion test too, because the
            # excluded words and the counter keys are all lower-case
            bigram_lemma = possible_bigram.lemma_.lower()
            if _is_token_part_of_bigram(possible_bigram, token) and \
                bigram_lemma not in excluded_bigrams:
                bigram_associated_dict[name][bigram_lemma] += 1

    return defaultdict(set, { 
        unigram: set(bigram for bigram, counter in bigrams.items() 
                     if counter >= MIN_TOKEN_TO_BE_CONSIDERED_BIGRAM)
        for unigram, bigrams in bigram_associated_dict.items() 
    })

def _is_token_an_unigram(token_info: Tuple[str, List[spacy.tokens.Token]]) -> bool:
    '''Tell whether a noun is relevant enough to be considered a unigram.

    `token_info` is a `(lemma, occurrences)` pair. The noun is accepted when:
    - the lemma is not in `IGNORED_WORDS`
    - it occurs at least `MIN_TOKEN_TO_BE_CONSIDERED_UNIGRAM` times
    - at least one occurrence is a subject, passive subject or direct object
    - two of its occurrences are at most `MAX_DISTANCE_TO_BE_CONSIDERED_UNIGRAM`
      sentences apart'''
    token, occurrences = token_info
    if token in IGNORED_WORDS:
        return False
    if len(occurrences) < MIN_TOKEN_TO_BE_CONSIDERED_UNIGRAM:
        return False
    if not any(token_occurrence.dep_ in {'nsubj', 'nsubjpass', 'dobj'}
               for token_occurrence in occurrences):
        return False

    # sentence ids are sorted only for the nouns that passed the cheaper checks
    sentence_ids = sorted(occ._.sentence_id for occ in occurrences)
    # get the minimum distance between sentence ids. A token must not be completely sparse.
    # `default` covers a single occurrence, which has no distance at all
    min_distance = min((following - previous
                        for previous, following in zip(sentence_ids[:-1], sentence_ids[1:])),
                       default=None)
    return min_distance is not None and min_distance <= MAX_DISTANCE_TO_BE_CONSIDERED_UNIGRAM

def find_most_relevant_unigrams(doc: spacy.tokens.Doc) \
                                -> Dict[str, List[spacy.tokens.Token]]:
    '''Return the nouns of `doc` accepted by `_is_token_an_unigram`, each mapped
    to the list of its occurrences.'''
    return {
        lemma: occurrences
        for lemma, occurrences in find_most_common_nouns(doc).items()
        if _is_token_an_unigram((lemma, occurrences))
    }

nlp = spacy.load('en_core_web_sm')
# the unigram selection reads token._.sentence_id, set by this component
nlp.add_pipe('assign_sentence_id', after='parser')
text = get_document_by_line(CLEANED_DATASET_FILE_PATH, 190)
doc = nlp(text.lower())
unigrams = find_most_relevant_unigrams(doc)
# show the unigrams, how many they are and the bigrams associated with them
display(unigrams.keys())
display(len(unigrams))
ngrams = find_most_relevant_ngram(doc, unigrams)
display(ngrams)

dict_keys(['rounder', 'mountain', 'hill', 'trial', 'sprinter', 'rider', 'energy', 'peloton', 'movement', 'space', 'puncture', 'slipstream', 'team', 'sprint', 'cobblestone', 'race', 'ability', 'tpp', 'card'])

19

defaultdict(set,
            {'energy': {'turn'},
             'team': {'racing'},
             'race': set(),
             'ability': set(),
             'card': set()})

Find the verbs associated to each entity. If an entity has many verbs "associated", it means the user can choose many ways to interact with it. 

In [16]:
from spacy.matcher import DependencyMatcher
from typing import Dict

def find_actions_count_for_unigrams(doc: spacy.tokens.Doc, 
                                    unigrams: Dict[str, List[spacy.tokens.Token]]) -> Dict[str, int]:
    '''Map every unigram to the number of distinct verb lemmas that are the head
    of at least one of its occurrences. `doc` is not used.'''
    actions_by_unigram = {}
    for unigram in unigrams:
        governing_verbs = {occurrence.head.lemma_
                           for occurrence in unigrams[unigram]
                           if occurrence.head.pos_ == 'VERB'}
        actions_by_unigram[unigram] = len(governing_verbs)
    return actions_by_unigram

def get_actions_score(doc: spacy.tokens.Doc, 
                      unigrams: Dict[str, List[spacy.tokens.Token]]) -> float:
    '''Return the average number of distinct verbs associated with a unigram,
    as computed by `find_actions_count_for_unigrams`.

    Returns 0.0 when `unigrams` is empty, instead of dividing by zero.'''
    if not unigrams:
        return 0.0

    actions_counts = find_actions_count_for_unigrams(doc, unigrams)
    return sum(actions_counts.values()) / len(unigrams)

nlp = spacy.load('en_core_web_sm')
# the unigram selection reads token._.sentence_id, set by this component
nlp.add_pipe('assign_sentence_id', after='parser')
text = get_document_by_line(CLEANED_DATASET_FILE_PATH, 191)
doc = nlp(text.lower())
unigrams = find_most_relevant_unigrams(doc)
# number of distinct verbs per unigram, then their average
actions_counts = find_actions_count_for_unigrams(doc, unigrams)
print(actions_counts)
get_actions_score(doc, unigrams)

{'scenario': 4, 'mansion': 4, 'card': 35, 'change': 4, 'action': 17, 'keeper': 36, 'threat': 8, 'player': 14, 'deck': 1, 'event': 3, 'investigator': 45, 'objective': 3, 'point': 7, 'item': 4, 'death': 2, 'monster': 12, 'room': 15, 'fire': 6, 'darkness': 7, 'zombie': 9, 'damage': 6, 'token': 6, 'space': 8, 'figure': 8, 'turn': 7, 'stun': 2, 'attribute': 2, 'boy': 17, 'marker': 3, 'spell': 2, 'requirement': 2, 'horror': 9, 'place': 5, 'sample': 4, 'strike': 1, 'lynch': 5, 'man': 6, 'door': 5, 'weapon': 10, 'test': 5, 'puzzle': 3, 'monastery': 1, 'voice': 7, 'head': 1, 'cultist': 8, 'leader': 2, 'abomination': 9, 'relative': 3, 'altar': 1, 'ground': 2, 'tile': 3, 'classroom': 1, 'light': 4, 'witch': 6, 'health': 3, 'creature': 2, 'hound': 4, 'jacket': 3, 'helena': 2, 'skull': 4, 'police': 3, 'world': 2, 'sanctum': 3, 'sanity': 3, 'line': 1, 'girl': 3, 'injury': 3}


6.507462686567164

In [17]:
from collections import namedtuple
from typing import List, Set, Dict
import numpy as np
from itertools import product

# result of get_rulebook_metrics: number of entities, interaction score,
# entities variance and actions score of a rulebook
RulebookMetrics = namedtuple("RulebookMetrics", ['entities_count', 
                                                 'interaction_score', 
                                                 'entities_variance',
                                                 'actions_score'])

def get_interaction_count(unigrams: Dict[str, List[spacy.tokens.Token]]) -> float:
    '''Return twice the sum, over every unordered pair of distinct unigrams, of
    the number of distinct sentences in which at least one of the two occurs.

    NOTE(review): the value of a pair is the size of the *union* of the two
    sentence-id sets; confirm that an intersection (co-occurrence) was not intended.'''
    sentence_ids_per_unigram = [set(token._.sentence_id for token in tokens)
                                for tokens in unigrams.values()]
    size = len(sentence_ids_per_unigram)
    matrix = [[0] * size for _ in range(size)]

    # only the cells above the main diagonal are filled: the matrix would be
    # symmetrical and a component does not interact with itself, by definition
    for row_index in range(size):
        for column_index in range(row_index + 1, size):
            matrix[row_index][column_index] = len(
                sentence_ids_per_unigram[row_index] | sentence_ids_per_unigram[column_index])

    logger.debug(matrix)

    # the graph density of an undirected graph consider an edge as 2
    return 2 * sum(sum(row) for row in matrix)

# TODO: is it useful?
# def get_relevant_sentences_count(unigrams: Dict[str, List[spacy.tokens.Token]]) -> int:
#     return len(set().union(
#         *[set(token._.sentence_id for token in tokens) 
#           for tokens in unigrams.values()]))

def get_entities_variance(doc: spacy.tokens.Doc, 
                          unigrams: Dict[str, List[spacy.tokens.Token]]) -> float:
    '''Measure how much the components interleave in the text; a high value could
    mean that the rules involve many components and are therefore more complex.

    For every component the variance (`np.var`) of the sentence ids of its
    occurrences is weighted by the relative frequency of the component. The
    weighted variances are summed and the total is divided by the number of
    sentences of `doc`.'''
    total_occurrences = sum(len(token_list) for token_list in unigrams.values())

    def weighted_variance(tokens):
        relative_frequency = len(tokens) / total_occurrences
        return relative_frequency * np.var([token._.sentence_id for token in tokens])

    variances_sum = sum(weighted_variance(tokens) for tokens in unigrams.values())
    # the sentence id of the last token is the index of the last sentence
    sentences_count = doc[-1]._.sentence_id + 1
    return variances_sum / sentences_count

def get_rulebook_metrics(doc: spacy.tokens.Doc) -> RulebookMetrics:
    '''Compute the entity-based metrics of a rulebook.

    `doc` must come from a pipeline containing the `assign_sentence_id` component,
    because the helpers read `token._.sentence_id`.

    Returns a `RulebookMetrics` holding the number of entities, the interaction
    score, the entities variance and the actions score. The interaction score is
    0.0 when fewer than two entities are found and the actions score is 0.0 when
    no unigram is found, so that short or unusual rulebooks do not raise a
    ZeroDivisionError.'''
    most_relevant_unigrams = find_most_relevant_unigrams(doc)
    logger.debug(most_relevant_unigrams)
    most_relevant_ngrams = find_most_relevant_ngram(doc, most_relevant_unigrams)
    interactions_count = get_interaction_count(most_relevant_unigrams)
    # entities includes unigrams + their ngrams. `max(0, len(ngrams) - 1)` because
    # if I have 4 types of bigrams, then I need to add 3, given that 1 is included
    # in the unigrams
    most_relevant_entities_count = len(most_relevant_unigrams) + \
        sum(max(0, len(ngrams) - 1) for ngrams in most_relevant_ngrams.values())
    entities_variance = get_entities_variance(doc, most_relevant_unigrams)
    # density of a network; with 0 or 1 entities there is no possible pair
    possible_pairs = most_relevant_entities_count * (most_relevant_entities_count - 1)
    interactions_score = interactions_count / possible_pairs if possible_pairs else 0.0
    # without unigrams there is nothing to average
    actions_score = get_actions_score(doc, most_relevant_unigrams) \
        if most_relevant_unigrams else 0.0

    return RulebookMetrics(most_relevant_entities_count, 
                           interactions_score,
                           entities_variance,
                           actions_score)

# compute the rulebook metrics of the cleaned rulebook found on line 139
text = get_document_by_line(CLEANED_DATASET_FILE_PATH, 139)
nlp = spacy.load('en_core_web_sm')
# the metrics read token._.sentence_id, set by this component
nlp.add_pipe('assign_sentence_id', after='parser')
doc = nlp(text.lower())
components = get_rulebook_metrics(doc)
components

RulebookMetrics(entities_count=21, interaction_score=28.36190476190476, entities_variance=10.731811818779741, actions_score=6.55)

## Processing step

In [18]:
import ast
import os
from collections import namedtuple
from typing import Iterator, Tuple

import pandas as pd
from IPython.display import clear_output

# `drive` is imported only when the notebook does not run locally, so mounting
# must be skipped when WORKING_LOCALLY is set (otherwise `drive` is undefined)
if not WORKING_LOCALLY:
    drive.mount('/content/drive')
# number of csv rows read and processed at each iteration
CHUNK_SIZE = 20
# number of lines of the cleaned dataset file, header included. the file is opened
# in a `with` block so that the handle is closed as soon as the count is done
with open(CLEANED_DATASET_FILE_PATH, encoding='utf-8') as dataset_file:
    DATASET_ROWS = sum(1 for _ in dataset_file)
# features extracted from a single rulebook, one field for each column of the
# processed dataset
RulebookFeatures = namedtuple('RulebookFeatures', ['rulebook_len',
                                                   'dice_luck_metric', 
                                                   'drawing_luck_metric', 
                                                   'shuffling_luck_metric', 
                                                   'random_luck_metric', 
                                                   'can_metric',
                                                   'choices_metric',
                                                   'entities_count',
                                                   'interaction_score',
                                                   'entities_variance',
                                                   'actions_score'])

def get_rules_features(doc: spacy.tokens.Doc) -> RulebookFeatures:
    '''collect all the features of a parsed rulebook in a `RulebookFeatures`.

    `rulebook_len` is the number of characters of the rulebook text. the luck metrics
    and the choices metrics are divided by `len(doc) / 100`, which makes them relative
    to the length of the rulebook. the rulebook metrics (entities count, interaction
    score, entities variance and actions score) are copied as they are.'''
    luck = get_luck_metrics(doc)
    choices = get_choices_amount_metrics(doc)
    entity_metrics = get_rulebook_metrics(doc)
    hundreds_of_tokens = len(doc) / 100

    def per_hundred_tokens(value):
        # same normalization for every length-dependent metric
        return value / hundreds_of_tokens

    return RulebookFeatures(
        rulebook_len=len(doc.text),
        dice_luck_metric=per_hundred_tokens(luck.dice_based),
        drawing_luck_metric=per_hundred_tokens(luck.drawing_based),
        shuffling_luck_metric=per_hundred_tokens(luck.shuffling_based),
        random_luck_metric=per_hundred_tokens(luck.random_based),
        can_metric=per_hundred_tokens(choices.can_based),
        choices_metric=per_hundred_tokens(choices.choice_based),
        entities_count=entity_metrics.entities_count,
        interaction_score=entity_metrics.interaction_score,
        entities_variance=entity_metrics.entities_variance,
        actions_score=entity_metrics.actions_score,
    )

def process_texts(texts: List[str]) -> Iterator[spacy.tokens.Doc]:
    '''parse the rulebooks with spaCy.

    every text is lower-cased and then processed by the `en_core_web_sm` pipeline, where
    the `assign_sentence_id` component is added right after the parser.

    the return value is the result of `nlp.pipe`, not a list of features: the docs are
    produced one at a time while iterating, in the same order of `texts`, and can be
    consumed only once.'''
    # NOTE: the model is loaded at every call; callers processing many chunks pay
    # this cost once per chunk
    nlp = spacy.load('en_core_web_sm')
    nlp.add_pipe('assign_sentence_id', after='parser')
    # a new name is used instead of overwriting the `texts` argument
    lowered_texts = [text.lower() for text in texts]

    return nlp.pipe(lowered_texts)

def create_feature_series(row, docs_dict):
    ''' build the series of features for a row of the dataset.

    despite its name, `docs_dict` is an iterator of (id, doc) pairs: the next pair is
    consumed at every call and its id must be equal to `row.id`, otherwise rows and
    docs are not aligned anymore. the series is indexed by the `RulebookFeatures` fields'''
    doc_id, doc = next(docs_dict)
    assert doc_id == row.id
    features = get_rules_features(doc)
    return pd.Series(features, index=RulebookFeatures._fields)
    
def family_field_eval(family: str):
    ''' convert the string of the `family` field into the python literal it represents.
    an empty result is replaced with `['unspecified']` '''
    families = ast.literal_eval(family)
    if len(families) == 0:
        return ['unspecified']
    return families

# the header line is always skipped while reading, so the column names are given explicitly
column_names = ['rulebook', 'id', 'name', 'averageweight', 'playingtime', 'family']
# the dataframes of the chunks are collected here and concatenated only once after
# the loop: concatenating at every iteration copies all the previous rows each time
chunk_features = []
for skip_rows in range(1, DATASET_ROWS, CHUNK_SIZE):
    # family_field_eval converts the family column string into a python array
    df_dataset = pd.read_csv(CLEANED_DATASET_FILE_PATH, converters={ 'family': family_field_eval },
                            names=column_names, header=None,
                            nrows=CHUNK_SIZE, skiprows=skip_rows)
    logger.info(f"processing boardgames from {df_dataset.loc[0, 'id']} to {df_dataset.loc[df_dataset.index[-1], 'id']}")
    # pairs every parsed doc with the id of its row; create_feature_series consumes
    # one pair for each row and checks that the ids match
    docs_dict = zip(df_dataset['id'].values, process_texts(df_dataset['rulebook'].values))
    df_rulebook_features = df_dataset.apply(lambda x: create_feature_series(x, docs_dict),
                                            axis='columns')

    chunk_features.append(df_dataset[['averageweight', 'playingtime', 'family']]
                          .join(df_rulebook_features))

# every chunk is indexed from 0, so the index is rebuilt to avoid duplicated labels
# in the final dataframe
df_features = pd.concat(chunk_features, ignore_index=True) if chunk_features else pd.DataFrame()

display(len(df_features))

# one-hot encoding "family" field 
# from https://stackoverflow.com/questions/71401193/one-hot-encoding-in-python-for-array-values-in-a-dataframe
# `pop` removes the column in place, then the families of each row are joined
# with '|', which is the default separator of `str.get_dummies`
df_features = pd.concat([df_features, 
                         df_features.pop('family')
                                    .apply('|'.join)
                                    .str.get_dummies()], axis='columns')

df_features.to_csv(PROCESSED_DATASET_FILE_PATH, header=True, index=False, mode='w')    
if not WORKING_LOCALLY:
    # makes sure that the csv is written to Drive before leaving
    drive.flush_and_unmount()

df_features.head()

2023-01-04 11:12:09,775 bgg_predict  INFO     processing boardgames from 10 to 590
INFO:bgg_predict:processing boardgames from 10 to 590


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


2023-01-04 11:12:20,290 bgg_predict  INFO     processing boardgames from 690 to 2350
INFO:bgg_predict:processing boardgames from 690 to 2350
2023-01-04 11:12:51,592 bgg_predict  INFO     processing boardgames from 2780 to 6830
INFO:bgg_predict:processing boardgames from 2780 to 6830
2023-01-04 11:13:17,276 bgg_predict  INFO     processing boardgames from 6860 to 9870
INFO:bgg_predict:processing boardgames from 6860 to 9870
2023-01-04 11:13:37,239 bgg_predict  INFO     processing boardgames from 10140 to 17240
INFO:bgg_predict:processing boardgames from 10140 to 17240
2023-01-04 11:13:59,815 bgg_predict  INFO     processing boardgames from 17970 to 24310
INFO:bgg_predict:processing boardgames from 17970 to 24310
2023-01-04 11:14:16,081 bgg_predict  INFO     processing boardgames from 24480 to 33950
INFO:bgg_predict:processing boardgames from 24480 to 33950
2023-01-04 11:14:50,456 bgg_predict  INFO     processing boardgames from 34010 to 43530
INFO:bgg_predict:processing boardgames from 

380

Unnamed: 0,averageweight,playingtime,rulebook_len,dice_luck_metric,drawing_luck_metric,shuffling_luck_metric,random_luck_metric,can_metric,choices_metric,entities_count,...,actions_score,abstracts,cgs,childrensgames,familygames,partygames,strategygames,thematic,unspecified,wargames
0,2.1579,60,3155.0,0.0,0.0,0.14771,0.0,0.590842,0.295421,5.0,...,8.0,0,0,0,1,0,0,0,0,0
1,3.1452,90,73093.0,0.0,0.013557,0.027113,0.027113,1.3421,0.135566,62.0,...,6.15,0,0,0,0,0,1,0,0,0
2,1.81,90,20155.0,0.0,0.0,0.0,0.0,0.911927,0.143988,39.0,...,5.769231,0,0,0,0,0,0,1,0,0
3,1.4858,30,13866.0,0.0,0.413936,0.137979,0.0,0.551914,0.275957,14.0,...,4.0,0,0,0,1,0,0,0,0,0
4,2.7813,120,3604.0,0.0,0.0,0.0,0.0,0.58309,1.020408,8.0,...,4.0,0,0,0,0,0,1,0,0,0
