In [1]:
import random
random.seed(42)

In [2]:
import asyncio
import re
import csv
import pyppeteer as ptr
import pandas as pd
import nltk
from typing import Optional, TypeVar
from typing import Callable
from glob import glob
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
import ssl

try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    pass
else:
    ssl._create_default_https_context = _create_unverified_https_context

In [5]:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/benjaminmacdonald/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/benjaminmacdonald/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/benjaminmacdonald/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [6]:
PosTag = tuple[str, str]
PosTagList = list[PosTag]
StemWord = str
StemWordList = list[StemWord]

In [7]:
def get_name_and_extension(file_path: str) -> tuple[str, str]:
    regex = re.compile(r"(.*)/(.*)\.(.*)")
    return regex.match(file_path).group(2,3)

In [8]:
## Scraper.
async def download_html(browser: ptr.browser.Browser, page: ptr.page.Page, url: str, selector: str) -> Optional[str]:
    await page.goto(url, waitUntil="load", timeout=0)
    content = await page.querySelector(selector)

    html = ''
    if content:
        html = await page.evaluate('(element) => element.textContent', content)
        
    return html

def write_to_resource_target(target_path: str, file_name: str, content: StemWordList, extension="txt") -> None:
    with open(f"{target_path}/{file_name}.{extension}", "w") as file:
        file.write("\n".join(content))


async def get_training_data_from_folder(source_path: str, target_path: str, force=False) -> None:
    browser = await ptr.launch(headless=True)
    page = await browser.newPage()
    
    glob_pattern = "/**/*"
    source_files = glob(source_path + glob_pattern, recursive=True)
    target_files = glob(target_path + glob_pattern, recursive=True)
    target_file_names = [get_name_and_extension(file_path)[0] for file_path in target_files]
    
    for file_path in source_files:
        [file_name, file_extension] = get_name_and_extension(file_path)
        
        if not file_name in target_file_names or force:
            result = await get_training_data(browser, page, file_path)
            write_to_resource_target(target_path, file_name, result)        
                
    await browser.close()

async def get_training_data(browser: ptr.browser.Browser, page: ptr.page.Page, file_path: str) -> StemWordList:
    df = pd.read_csv(file_path)
    df.rename(columns=lambda x: x.strip(), inplace=True)

    words = []
    if isinstance(df, pd.DataFrame):
        for _, row in df.iterrows():
            print(row)
            result = await download_html(browser, page, row["link"], row["selector"])
            words.extend(clean_up_words(tokenize(result)))
    
    return words

In [9]:
## Cleaners.
T = TypeVar("T")
R = TypeVar("R")

TestValueStrCallable = Callable[[T], str]
TestValueBoolCallable = Callable[[T], bool]
ValueTestFnCallable = Callable[T, TestValueStrCallable]
FilterCallable = Callable[ValueTestFnCallable, bool]
MapCallable = Callable[ValueTestFnCallable, str]

def tokenize(data: str) -> PosTagList:
    tokenized_words = nltk.word_tokenize(data)
    mutated_words = nltk.pos_tag(tokenized_words)
    return mutated_words

def filter_words(x: T, test_value: TestValueStrCallable, *fns: FilterCallable) -> bool:
    if fns:
        if fns[0](x, test_value):
            return filter_words(x, test_value, *fns[1:])
        else:
            return False
    
    return True
    
def filter_by_punctuation(x: T, test_value: TestValueStrCallable = lambda t: t[0]) -> bool:
    return x[0] != x[1]

def filter_by_stop_word(x: T, test_value: TestValueStrCallable = lambda t: t[0]) -> bool:
    return test_value(x) not in stopwords.words("english")

def regex_filter(regex: str):
    def filter_by_regex(x: T, test_value: TestValueStrCallable = lambda t: t[0]):
        rgx = re.compile(regex)
        return rgx.match(test_value(x))
    return filter_by_regex

filter_by_alphabet = regex_filter(r"^([a-zA-Z]|')+$")
filter_by_apostrophe = regex_filter(r"^[^']*$")

def map_by_stem_words(x: PosTag, test_value: TestValueStrCallable = lambda t: t[0], ps=PorterStemmer()) -> StemWord:
    return ps.stem(test_value(x), True)

def map_words(x: T, test_value: TestValueStrCallable, *fns: MapCallable) -> StemWord:
    if fns:
        return map_words(fns[0](test_value(x)), test_value, *fns[1:])

    return x
            
def clean_up_words(words: PosTagList) -> StemWordList:
    filtered_words = list(
        filter(
        lambda x: filter_words(x, lambda x: x[0], filter_by_punctuation, filter_by_stop_word, filter_by_alphabet),
        words
        )
    )

    stem_words = list(map(lambda x: map_words(x, lambda x: x, map_by_stem_words), filtered_words))
    
    return list(
        filter(
            lambda x: filter_words(x, lambda x: x, filter_by_apostrophe), 
            stem_words)
    )

In [10]:
# type: ignore
await (get_training_data_from_folder("../resources/sources", "../resources/targets"))

In [11]:
# Parsers
def get_cleaned_data_from_file(input_file_path: str) -> Optional[list[str]]:
    with open(input_file_path, "r") as file:
        return [word.strip("\n") for word in list(file.readlines())]

def get_cleaned_data_from_folder(input_path: str) -> dict[str, PosTagList]:
    input_files = glob(input_path + "/**/*", recursive=True)
    
    data = dict()
    for file_path in input_files:
        [file_name, file_extension] = get_name_and_extension(file_path)
        classification_data = get_cleaned_data_from_file(file_path)
        data[file_name] = classification_data
        
    return data


def group_by_tags(pos_tag_list: PosTagList) -> dict[str, list[str]]:
    """
      @unused
    """
    groups = dict()
    
    for value,tag in pos_tag_list:
        if tag in groups:
            groups[tag]
            groups[tag].append(value)
        else:
            groups[tag] = [value]
            
    return groups
    



In [21]:
from nltk.probability import WittenBellProbDist, FreqDist,LaplaceProbDist
import numpy as np



def unpack_dict_list(dict_list: dict[str, StemWordList]):
    print(dict_list.keys())
    print(dict_list.values())
    return [v for k in result for v in result[k]]

def generate_sample(population: StemWordList, label: str, n_unique_words: int, chr_limit = 280, max_itr = 100, word_limit = 56):
    freq_dist = FreqDist(population)
    prob_dist = WittenBellProbDist(freq_dist, n_unique_words)
    
    samples = set()
    chr_count = 0
    
    for _ in range(max_itr):
        generated_v = prob_dist.generate()
        
        if len(generated_v) + chr_count < chr_limit:
            samples.add(generated_v)
            chr_count += 1
            
        if len(samples) >= word_limit:
            break;
            
    return np.array(list([*samples, label])).reshape(-1, 1)

def generate_samples(data: dict[str, StemWordList], n_samples = 100):
    #print(data)
    n_unique_words = len(set(unpack_dict_list(data)))
    
    return np.array([np.array([generate_sample(data[k], k, n_unique_words) for _ in range(n_samples)]) for k in data])

In [22]:
from sklearn.preprocessing import OneHotEncoder,LabelBinarizer
from typing import Union


def get_one_hot_encoder(population: dict[str, StemWordList]):
    population_array = np.array(list(set(unpack_dict_list(population)))).reshape(-1, 1)
    encoder = OneHotEncoder()
    encoder.fit(population_array)
    return encoder


def get_label_encoder(labels: np.ndarray):
    encoder = LabelBinarizer()
    encoder.fit(labels)
    return encoder

def transform_data_by_encoder(data: np.ndarray, encoder: Union[OneHotEncoder, LabelBinarizer]):
    encoded_data = encoder.transform(data)
    
    if not isinstance(encoded_data, np.ndarray):
        encoded_data = encoded_data.toarray()
        
    return encoded_data

def split_into_x_y(samples: np.ndarray):
    x = samples[:,:,:-1]
    y = samples[:,:,-1]
    y = y.reshape(y.shape[0]*y.shape[1], 1)
    return (x,y)
    
def transform_x_y(x: np.ndarray, y: np.ndarray, in_coder: OneHotEncoder, out_coder: LabelBinarizer):   
    encoded_x = np.stack([
        transform_data_by_encoder(x[lbl_idx, smpl_idx], in_coder)
        for lbl_idx in range(x.shape[0])
        for smpl_idx in range(samples.shape[1])
    ], axis=0)
    
#    print(np.where(encoded_x[0][0] == 1))
        
    encoded_y = transform_data_by_encoder(y, out_coder)
    
    return (encoded_x, encoded_y)

def get_x_y():
    result = get_cleaned_data_from_folder("../resources/targets")
    samples = generate_samples(result, n_samples=5)
    x,y = split_into_x_y(samples)
    input_encoder = get_one_hot_encoder(x)
    output_encoder = get_label_encoder(y)
    return transform_x_y(x, y, input_encoder, output_encoder)

get_x_y()
    

{'migraine': ['introduct', 'migrain', 'common', 'episod', 'disord', 'hallmark', 'disabl', 'headach', 'gener', 'associ', 'nausea', 'light', 'sound', 'sensit', 'the', 'acut', 'treatment', 'migrain', 'adult', 'review', 'prevent', 'treatment', 'migrain', 'adult', 'discuss', 'separ', 'see', 'prevent', 'treatment', 'episod', 'migrain', 'adult', 'the', 'pathophysiolog', 'clinic', 'manifest', 'diagnosi', 'migrain', 'also', 'discuss', 'separ', 'see', 'pathophysiolog', 'clinic', 'manifest', 'diagnosi', 'migrain', 'adult', 'approach', 'to', 'treatment', 'the', 'abort', 'symptomat', 'therapi', 'migrain', 'rang', 'use', 'simpl', 'analges', 'nonsteroid', 'drug', 'nsaid', 'acetaminophen', 'triptan', 'antiemet', 'calcitonin', 'peptid', 'cgrp', 'antagonist', 'lasmiditan', 'dihydroergotamin', 'noninvas', 'neuromodul', 'devic', 'typic', 'use', 'patient', 'respond', 'toler', 'drug', 'treatment', 'wish', 'avoid', 'treatment', 'usual', 'effect', 'given', 'earli', 'cours', 'headach', 'larg', 'singl', 'dose',

NameError: name 'result' is not defined

In [None]:
from sklearn.model_selection import train_test_split

