In [1]:
# https://stackoverflow.com/questions/34621093/persist-elastic-search-data-in-docker-container
# TODOs: New KB upload with smaller text window
# TODOs: Common Crawl index upload
# TODOs: Good Logging
# TODOs: Better Keyphrase Extraction - Fast KeyBert

In [2]:
### NLTK IMPORT, OVERRIDING SSL CERTIFICATES ###
# import nltk
# import ssl
#
# try:
#     _create_unverified_https_context = ssl._create_unverified_context
# except AttributeError:
#     pass
# else:
#     ssl._create_default_https_context = _create_unverified_https_context
#
# nltk.download()

In [3]:
### CRAWL LOCAL FILE-SYSTEM ###
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KNOWLEDGE_BASE")

knowledge_path = "/Users/joshua.sheppard/wiki_extract_II"

def iter_filesys(path):
    if os.path.isfile(path):
        yield path

    elif os.path.isdir(path):
        for dir, _, filenames in os.walk(path):
                for f in filenames:
                    if not f.endswith('.DS_Store'):
                        yield os.path.join(dir, f)

    else:
        raise RuntimeError("Invalid path %s" % path)

kw_files = iter_filesys(knowledge_path)

kw_sample = []
for i in kw_files:
    kw_sample.append(i)

# print(kw_sample[:-3])
print(len(kw_sample))

17039


In [4]:
from multiprocessing.pool import ThreadPool as Pool
from tqdm.notebook import tqdm

import json
import more_itertools
import re
import spacy

nlp = spacy.load("en_core_web_sm")
files = [f for f in kw_sample]

def get_contents_(filename):
    """Parse the contents of a file. Each line is a JSON encoded document."""
    documents = []

    with open(filename) as f:
        for line in f:
            doc = json.loads(line)

            if doc["text"] == "": continue
            if not doc: continue

            documents.append((doc['id'], doc["title"], doc["text"]))

    return documents

def generate_data():
        for file in files:
                #yield get_contents_2(file)
                docs = get_contents_(file)
                for doc in docs:
                    yield(doc)

# Custom Regex Sentenzier (Faster Compute)
def sentensizer(doc):
    sents = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', doc)
    return sents

def normalise_text(passage):
    passage = str(passage)
    re.sub(r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?", "", passage)
    passage.encode("unicode_escape")
    passage.replace('"', '"')
    passage = re.sub("\n", "", passage)
    passage = re.sub('"', "'", passage)
    passage.strip()

    return passage

from nltk.tokenize import sent_tokenize
def sentence_window(article, window=3, step=2): 
    """ Generates a list of sentences of sliding size = window """

    # sents = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', article)
    # Segment Text to Sentences
    sents = sent_tokenize(article)

    if len(sents) == window:
        yield str(sents)

    for window in more_itertools.windowed(sents, n=window, step=2):
        yield window

### KEYWORD EXTRACTION ###
from yake import KeywordExtractor
from summa import keywords
from rake_nltk import Rake

keywords = Rake()
#kw_extractor = KeywordExtractor(lan="en", n=4, top=5)

pool = Pool(8)
def passages(idx, source, len_):
    count = 0
    with tqdm(total=(len_)) as pbar:
        for i in pool.apply(generate_data):
            count += 1
            id, title, article = i

            for window in sentence_window(article):
                passage = " ".join(normalise_text(passage) for passage in window)

                if len(passage) < 50: continue
                else:
                    keywords.extract_keywords_from_text(passage)

                    yield {
                        "_index": idx,
                        "document": {
                            "id": id,
                            "source": source,
                            "title": title,
                            "text": passage,
                            "keyphrase": keywords.get_ranked_phrases()[0:5]
                            #"keyphrase": [i for i in keywords.keywords(passage).split("\n")]
                            }
                        }

                pbar.update()

INFO:summa.preprocessing.cleaner:'pattern' package not found; tag filters are not available for English


In [5]:
### TEST KNOWLEDGE LOADER ###
test = []
trial = 1000
count = 0

# with tqdm(total=trial) as pbar:
for i in passages(idx="testing", source="wikipedia", len_=trial):
    count += 1
    if count > trial:
        break

    test.append(i)

  0%|          | 0/1000 [00:00<?, ?it/s]

In [6]:
test

[{'_index': 'testing',
  'document': {'id': '61837831',
   'source': 'wikipedia',
   'title': 'Hedda Lundh',
   'text': 'Hedda Lundh (1921–2012) was a Danish journalist and schoolteacher who, under the German occupation of Denmark in World War II, was a Danish resistance fighter. Based at the time in Aarhus, she is remembered as a railway saboteur, explosives expert and courier in the resistance movement. Early life.',
   'keyphrase': ['world war ii',
    '1921 – 2012',
    'danish resistance fighter',
    'resistance movement',
    'danish journalist']}},
 {'_index': 'testing',
  'document': {'id': '61837831',
   'source': 'wikipedia',
   'title': 'Hedda Lundh',
   'text': "Early life. Born on 29 September 1921 in Korsør, Hedda Lundh was the daughter of the newspaper editor Theodor Lundh-Jensen (1884–1952) and Alpha Tusnelda Emilie Winckler (1887–1973). The youngest of three sisters, she was brought up in a middle-class home where her father called her his 'boy' as she climbed trees, 

In [7]:
#from elasticsearch import Elasticsearch

### DB CONFIG ###
# PORT = "http://localhost:9200"
# INDEX_WIKI = "wiki_knowledge"
# SOURCE = "wikipedia"

# errors_before_interrupt = 5
# refresh_index_after_insert = False
# max_insert_retries = 3
# yield_ok = False

In [8]:
### INIT DB OBJECT ###
# knowledge_base = Elasticsearch(
#     PORT,
#     retry_on_timeout=True
# )

# knowledge_base

In [9]:
### REFRESH DB ###
#knowledge_base.delete_by_query(index=INDEX_NAME, query={"match_all": {}})

In [10]:
from elasticsearch.helpers import streaming_bulk, parallel_bulk
# from tqdm.notebook import tqdm

### WRITE TO DB ###
# errors_count = 0
# chunk_size = 25000
# counta = len(files)/chunk_size
# successes = 0
#
# # with tqdm(total=(counta)) as pbar:
# for ok, result in parallel_bulk(knowledge_base, passages(idx=INDEX_WIKI, source=SOURCE), chunk_size=chunk_size, request_timeout=60*3):
#     if ok is not True:
#             logging.error('Failed to import data')
#             logging.error(str(result))
#             errors_count += 1
#
#             if errors_count == errors_before_interrupt:
#                 logging.fatal('Too many import errors, exiting with error code')
#                 exit(1)
#
#     successes += ok
#         # pbar.update()

In [65]:
### CONSTRUCT NEWS DATASET ###
from datasets import load_dataset
cc_news = load_dataset("cc_news")
cc_news = cc_news["train"]



  0%|          | 0/1 [00:00<?, ?it/s]

In [73]:
counta = 0
for i in cc_news:
    counta += 1
    print(i)

    if counta > 0:
        break

{'title': 'Daughter Duo is Dancing in The Same Company', 'text': 'There\'s a surprising twist to Regina Willoughby\'s last season with Columbia City Ballet: It\'s also her 18-year-old daughter Melina\'s first season with the company. Regina, 40, will retire from the stage in March, just as her daughter starts her own career as a trainee. But for this one season, they\'re sharing the stage together.\nPerforming Side-By-Side In The Nutcracker\nRegina and Melina are not only dancing in the same Nutcracker this month, they\'re onstage at the same time: Regina is doing Snow Queen, while Melina is in the snow corps, and they\'re both in the Arabian divertissement. "It\'s very surreal to be dancing it together," says Regina. "I don\'t know that I ever thought Melina would take ballet this far."\nLeft: Regina and Melina with another company member post-snow scene in 2003. Right: The pair post-snow scene in 2017 (in the same theater)\nKeep reading at dancemagazine.com.', 'domain': 'www.pointema

In [74]:
test = []
trial = 10000
count = 0

# TODOs: Make Generic Class
def generate_news_content(file=cc_news):
    for i in file:
        yield i

pool = Pool(5)
def news_passages(file, idx, source, len_):
    count = 0
    with tqdm(total=(len_)) as pbar:
        for i in pool.map(generate_news_content, file):
            count += 1
            title = i["title"]
            article = i["text"]
            description = i["description"]

            for window in sentence_window(article):
                passage = " ".join(normalise_text(passage) for passage in window)

                if len(passage) < 50: continue
                else:
                    keywords.extract_keywords_from_text(passage)

                    yield {
                        "_index": idx,
                        "document": {
                            "title": title,
                            "description": description,
                            "source": source,
                            "text": passage,
                            "keyphrase": keywords.get_ranked_phrases()[0:5]
                            }
                        }

                pbar.update()

for i in news_passages(file=cc_news, idx="testing", source="news", len_=trial):
    count += 1
    test.append(i)

    if count > trial:
        break

  0%|          | 0/10000 [00:00<?, ?it/s]

TypeError: 'generator' object is not subscriptable

In [None]:
test

In [19]:
### LOAD USING DB OBJECT ###
from src.utils_ import elastic_db

### DB CONFIG ###
INDEX_CC = "cc_news"
SOURCE_CC = "common_crawl"
PORT = "http://localhost:9200"

cc_knowledge_base = elastic_db(PORT, INDEX_CC)

print(cc_knowledge_base, "INDEX", INDEX_CC, "SOURCE", SOURCE_CC)

cc_news common_crawl


In [None]:
# cc_knowledge_base.bulk_add(
#     index_name=INDEX_CC,
#     source=SOURCE_CC,
#     iterator=passages_,
#     chunk_size=10000,
#     len_= len(cc_news["train"])
# )

  0%|          | 0/708241 [00:00<?, ?it/s]

INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.738s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.770s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.798s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.820s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.588s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.423s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.401s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.621s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.437s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.742s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_

In [21]:
### REFRESH DB ###
# knowledge_base.delete_by_query(index=INDEX_CC, query={"match_all": {}})

In [21]:
from elasticsearch.helpers import streaming_bulk, parallel_bulk
## WRITE TO DB ###
errors_count = 0
chunk_size = 25000
counta = len(cc_news["train"])
successes = 0
errors_before_interrupt = 5

cc_knowledge_base.bulk(index=INDEX_CC, operations=passages_(idx=INDEX_CC, source=SOURCE_CC, len_=counta))

# with tqdm(total=(counta)) as pbar:
#     for ok, result in parallel_bulk(cc_knowledge_base, passages_(idx=INDEX_CC, source=SOURCE_CC, len_=counta), chunk_size=chunk_size, request_timeout=60*3):
#         if ok is not True:
#                 logging.error('Failed to import data')
#                 logging.error(str(result))
#                 errors_count += 1
#
#                 if errors_count == errors_before_interrupt:
#                     logging.fatal('Too many import errors, exiting with error code')
#                     exit(1)
#
#         successes += ok
#         pbar.update()

  0%|          | 0/708241 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [17]:
### NEWSROOM ###

test = []
trial = 10000
count = 0

def get_newsroom_(filename):
    """Parse the contents of a file. Each line is a JSON encoded document."""
    documents = []

    with open(filename) as f:
        for line in f:
            doc = json.loads(line)

            if doc["text"] == "": continue
            if not doc: continue

            documents.append(doc["text"])

    return documents

# def generate_data():
#         for file in files:
#                 #yield get_contents_2(file)
#                 docs = get_contents_(file)
#                 for doc in docs:
#                     yield(doc)

filename = "/Users/joshua.sheppard/newsroom/train.jsonl"
# TODOs: Make Generic Class
def generate_newsroom_data():
    for i in get_newsroom_(filename):
        yield i

def newsroom_passages_(idx, source, len_):
    count = 0
    with tqdm(total=(len_)) as pbar:
        for i in pool.apply(generate_newsroom_data):
            count += 1
            #text = i["text"]
            article = i
            #description = i["description"]

            for window in sentence_window(article):
                passage = " ".join(normalise_text(passage) for passage in window)

                if len(passage) < 50: continue
                else:
                    keywords.extract_keywords_from_text(passage)

                    yield {
                        "_index": idx,
                        "document": {
                            # "title": title,
                            # "description": description,
                            "source": source,
                            "text": passage,
                            "keyphrase": keywords.get_ranked_phrases()[0:5]
                            #"keyphrase": [i for i in keywords.keywords(passage).split("\n")]
                            }
                        }

                pbar.update()

# news_files = [i for i in cc_news_data]
for i in newsroom_passages_(idx="testing", source="newsroom", len_=trial):
    count += 1
    test.append(i)

    if count > trial:
        break

  0%|          | 0/10000 [00:00<?, ?it/s]

In [18]:
newsroom_data = [i for i in generate_newsroom_data()]

In [63]:
len(newsroom_data)

995040

In [20]:
newsroom_data[2000]

'The global telecom industry is scrambling to compete in mobile messaging with the likes of Facebook Inc.-owned WhatsApp and Apple Inc.’s iMessage.\n\nSome mobile carriers are playing catch up, rolling out clones of the popular messaging apps—with mixed results.\n\nOthers aren’t trying or have given up, saying they can’t compete with Silicon Valley and are better off focusing on their core voice and data services. The stakes are...'

In [21]:
test

[{'_index': 'testing',
  'document': {'source': 'newsroom',
   'text': 'HAMBURG, Germany, June 3 \x97 As he left the soccer field after a club match in the eastern German city of Halle on March 25, the Nigerian forward Adebowale Ogungbure was spit upon, jeered with racial remarks and mocked with monkey noises. In rebuke, he placed two fingers under his nose to simulate a Hitler mustache and thrust his arm in a Nazi salute. Marc Zoro, right, an Ivory Coast native, was a target of racial slurs from the home fans in Messina, Italy.',
   'keyphrase': ['nigerian forward adebowale ogungbure',
    'placed two fingers',
    'june 3 \x97',
    'ivory coast native',
    'eastern german city']}},
 {'_index': 'testing',
  'document': {'source': 'newsroom',
   'text': "Marc Zoro, right, an Ivory Coast native, was a target of racial slurs from the home fans in Messina, Italy. Adriano, a star with Inter Milan, tried to persuade him to stay on the field. From now until its conclusion on July 9, Jeff Z

In [23]:
import elasticsearch as es_cli

### LOAD USING DB OBJECT ###
from src.utils_ import elastic_db

### DB CONFIG ###
INDEX_NEWS = "newsroom"
SOURCE_NEWS = "newsroom"
PORT = "http://localhost:9200"

#newsroom_knowledge_base = elastic_db.ElasticDB(elastic_port=PORT, elastic_index=INDEX_NEWS)

# cc_knowledge_base = es_client.indices.create(
#     index="laptops-demo",
#     settings=configurations["settings"],
#     mappings=configurations["mappings"],
# )

from elasticsearch import Elasticsearch
newsroom_knowledge_base = Elasticsearch(PORT)

# newsroom_knowledge_base.indices.create(
#     index=INDEX_NEWS
# )

print(INDEX_NEWS, SOURCE_NEWS)

INFO:elastic_transport.transport:PUT http://localhost:9200/newsroom [status:400 duration:0.015s]


BadRequestError: BadRequestError(400, 'resource_already_exists_exception', 'index [newsroom/je3jecqLR0OXyqYU4Ue2aQ] already exists')

In [None]:
# cc_knowledge_base.bulk_add(
#     index_name=INDEX_CC,
#     source=SOURCE_CC,
#     iterator=passages_,
#     chunk_size=10000,
#     len_= len(cc_news["train"])
# )

  0%|          | 0/708241 [00:00<?, ?it/s]

INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.738s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.770s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.798s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.820s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.588s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.423s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.401s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.621s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.437s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:400 duration:0.742s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_

In [21]:
### REFRESH DB ###
# knowledge_base.delete_by_query(index=INDEX_CC, query={"match_all": {}})

In [24]:
from elasticsearch.helpers import streaming_bulk, parallel_bulk
## WRITE TO DB ###
errors_count = 0
chunk_size = 25000
counta = len(newsroom_data)
successes = 0
errors_before_interrupt = 5

#newsroom_knowledge_base.bulk(index=INDEX_NEWS, operations=passages_(idx=INDEX_CC, source=SOURCE_CC, len_=counta))

with tqdm(total=(counta)) as pbar:
    for ok, result in parallel_bulk(newsroom_knowledge_base, newsroom_passages_(idx=INDEX_NEWS, source=SOURCE_NEWS, len_=counta), chunk_size=chunk_size, request_timeout=60*3):
        if ok is not True:
                logging.error('Failed to import data')
                logging.error(str(result))
                errors_count += 1

                if errors_count == errors_before_interrupt:
                    logging.fatal('Too many import errors, exiting with error code')
                    exit(1)

        successes += ok
        pbar.update()

  0%|          | 0/995040 [00:00<?, ?it/s]

  0%|          | 0/995040 [00:00<?, ?it/s]

INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:8.502s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:13.631s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:16.362s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:17.262s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:8.411s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:10.616s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:7.486s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:7.756s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:24.418s]
INFO:elastic_transport.transport:PUT http://localhost:9200/_bulk [status:200 duration:7.063s]
INFO:elastic_transport.transport:PUT http://localhost:9

In [6]:
#knowledge_path = "/Users/joshua.sheppard/"
import os
news_cc_2 = "/Users/joshua.sheppard/PycharmProjects/cc_download_articles"
news_cc_1 = "/Users/joshua.sheppard/PycharmProjects/news-please/cc_download_articles"

def iter_filesys(path):
    if os.path.isfile(path):
        yield path

    elif os.path.isdir(path):
        for dir, _, filenames in os.walk(path):
                for f in filenames:
                    if not f.endswith('.DS_Store'):
                        yield os.path.join(dir, f)

    else:
        raise RuntimeError("Invalid path %s" % path)

In [7]:
cc_1 = [i for i in iter_filesys(news_cc_1)]
cc_2 = [i for i in iter_filesys(news_cc_2)]

In [8]:
cc_news = cc_1 + cc_2
len(cc_news)

103018

In [36]:
import json

import re
def clean(clean):
    clean = str(clean)
    clean = re.sub(r"\n", "", clean)
    clean = re.sub(r'(?<=[a-z])\'(?=[a-z])', '', clean)
    clean = re.sub('([^a-zA-Z\s.!?])', "", clean)
    clean = re.sub('\s+', ' ', clean)

    clean = re.sub(r"www\S+", "", clean)
    return clean.strip().lower()

def get_contents_(filename):
    """Parse the contents of a file. Each line is a JSON encoded document."""
    documents = []
    with open(filename) as f:
        doc = json.load(f)
        documents.append((clean(doc['description']), clean(doc["maintext"])))

    return documents

def generate_cc_data():
    for file in cc_news:
            docs = get_contents_(file)
            for doc in docs:
                yield(doc)

In [22]:
news_ = [i for i in generate_cc_data()]

In [23]:
news_[100]

('taking a look at how twitter reacted to the news of indianapolis colts qb philip rivers announcing his retirement.',
 'former indianapolis colts quarterback philip rivers called it a career on wednesday as he retired from the nfl after seasons.the former firstround pick from the nfl draft had an illustrious career with a hall of fame resume despite never winning a super bowl. while the colts will be searching for his replacement twitter reacted to the news of his retirement.ill never forget lining up for a play and phil pointing to one of our linebackers and telling him he was lined up wrong based off the blitz we were about to run and being correct about it haha. one of the smartest ive ever played against and a hell of a competitor. httpst.coazxyvdafu jj watt jjwatt january philip rivers career th most passing yards ever th most passing td ever x pro bowler perhaps most impressively straight starts. never missed a game. rivers played through a torn acl in the afc championship. fiel

In [50]:
from multiprocessing.pool import ThreadPool as Pool
from nltk.tokenize import sent_tokenize

def sentence_window(article, window=3, step=2):
    """ Generates a list of sentences of sliding size = window """
    sents = sent_tokenize(article)

    if len(sents) == window:
        yield str(sents)

    for window in more_itertools.windowed(sents, n=window, step=2):
        yield window

pool = Pool(8)
from uuid import uuid4

kw_extractor = KeywordExtractor(lan="en", n=3, top=5)
def passages(idx, source, files):
    count = 0
    with tqdm(total=(len(files))) as pbar:
        for i in pool.apply(generate_cc_data):
            count += 1
            title, article = i
            id = uuid4()

            for window in sentence_window(article):
                passage = " ".join(normalise_text(passage) for passage in window)

                if len(passage) < 50: continue
                else:

                    yield {
                        "_index": idx,
                        "document": {
                            "id": id,
                            "source": source,
                            "title": title,
                            "text": passage,
                            "keyphrase": [i for i in keywords.keywords(passage).split("\n")]
                            }
                        }

                pbar.update()

In [53]:
#cc_news_passages = [i for i in passages(idx=INDEX_CC, source=SOURCE_CC, files=cc_news)]
# sample = 1000
# cc_passages = []
#
# counta = 0
# for i in passages(idx=INDEX_CC, source=SOURCE_CC, files=cc_news):
#     counta += 1
#     cc_passages.append(i)
#
#     if counta > sample:
#         break

  1%|          | 1000/103018 [00:02<03:32, 479.70it/s]


In [42]:
# from comcrawl import IndexClient
#
# client = IndexClient(["2019-11", "2020-50"])
# site = "reddit.com/r/MachineLearning/*"
# client.search(site, threads=2)

#client.download()
# first_page_html = client.results[0]["html"]

In [None]:
### TEST SEARCH ###
test_query = "government emails privacy"
#
# def search(query_, db, index, k=5):
#     results = db.search(
#         index = db.elastic_index,
#         query = {
#             "size": k,
#             "query": {
#                 "match": {
#                     "document.text": query_,
#         }}})
#
#     hits = results["hits"]["hits"]
#     doc_ids = [row['_source']["document"]["id"] for row in hits]
#
#     print(results)
#     return (hits, doc_ids)
#
# test = search(test_query, knowledge_base, INDEX_CC, k=2)[0][0]["_source"]["document"]["text"]
# test

In [41]:
# from multiprocessing.pool import ThreadPool as Pool
# from tqdm import tqdm
# import more_itertools
# import re
#
# def clean(passage):
#     passage = str(passage)
#     passage.encode("unicode_escape")
#     passage.replace('"', '"')
#     passage.strip()
#     passage = re.sub("\n", "", passage)
#     passage = re.sub('"', "'", passage)
#
#     return passage
#
# def sentence_window(article, window=5, step=2):
#     """ Generates a list of sentences of sliding size = window """
#     sents = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', article)
#
#     if len(sents) == window:
#         yield str(sents)
#
#     for window in more_itertools.windowed(sents, n=window, step=2):
#         yield window
#
# pool = Pool(8)
# def passages(idx, source):
#     count = 0
#     # with tqdm(total=len(files)) as pbar:
#     for i in pool.apply(iter_data):
#         count += 1
#         article = i["text"]
#
#         for window in sentence_window(article):
#             passage = " ".join(clean(passage) for passage in window)
#
#             #if len(passage) < 50: continue
#
#             yield {
#                 "_index": idx,
#                 "document": {
#                     "source": source,
#                     "title": i["title"],
#                     "text": passage
#                     }
#                 }
#
#         # pbar.update()
#

In [None]:
# import re
# from elasticsearch import helpers
# from elasticsearch.helpers import streaming_bulk, parallel_bulk
# from tqdm.auto import tqdm
# import spacy

# es = wiki_ev
# errors_count = 0

# # TODOs: Tune chunk size
# chunk_size = 25000
# counta = len(files)//chunk_size
# successes = 0

# # with tqdm(total=counta) as pbar:
# for ok, result in parallel_bulk(es, passages(idx=INDEX_NAME, source=SOURCE), chunk_size=chunk_size, request_timeout=60*3):

# # for ok, result in tqdm(streaming_bulk(es, index=INDEX_NAME, actions=passages(idx=INDEX_NAME, source=SOURCE), 
# #                         chunk_size=chunk_size, request_timeout=60*3, max_retries=3)):
#     if ok is not True:
#             logging.error('Failed to import data')
#             logging.error(str(result))
#             errors_count += 1

#             if errors_count == errors_before_interrupt:
#                 logging.fatal('Too many import errors, exiting with error code')
#                 exit(1)
    
#     successes += ok

In [None]:
# import time
# from elasticsearch import helpers
# from elasticsearch.helpers import streaming_bulk, parallel_bulk
# from tqdm import tqdm

# "https://stackoverflow.com/questions/67522617/elasticsearch-bulk-insert-w-python-socket-timeout-error#:~:text=The%20connection%20to%20elasticsearch%20has,be%20handled%20as%20an%20error."

# "https://github.com/elastic/elasticsearch-py/issues/297"

# def load_data(docs, idx_):
#     for doc in docs:
#         idx, title, text = doc
#         doc_ = {"id": idx, "title": title, "text": text}

#         yield {
#             "_index": idx_,
#             "document": {
#                 "id": idx,
#                 "title": title,
#                 "text": text
#             }
#         }

# # TODOs: Utils, Duration Function as Decorator
# es = wiki_ev
# errors_count = 0

# # TODOs: Increase Chunk Size, with extended Timeout + handeling
# # TODOs: Experiment-Check with Yield OK
# for ok, result in parallel_bulk(es, load_data(wiki_data, "wiki_evidence"), chunk_size=500, request_timeout=60*3):
#     if ok is not True:
#             logging.error('Failed to import data')
#             logging.error(str(result))
#             errors_count += 1

#             if errors_count == errors_before_interrupt:
#                 logging.fatal('Too many import errors, exiting with error code')
#                 exit(1)

In [None]:

# import spacy

# nlp = spacy.load("en_core_web_sm")

# def sentence_window(article, window=3, step=2): 
#     """ Generates a list of sentences of sliding size = window """
    
#     sents = list(nlp(article).sents)
    
#     if len(sents) == window:
#         yield sents

#     for i in range(0, len(sents)):
#         yield(sents[i:i + window])

# def load_data(docs, idx_):
#     """ Generates an evidence document to be inserted into ES Index """
#     for doc in docs:
#         idx, title, text = doc

#         for paragraph in sentence_window(text):
#             yield {
#                 "_index": idx_,
#                 "document": {
#                     "id": idx,
#                     "title": title,
#                     "text": paragraph
#                 }
#             }


In [None]:
# import time
# from elasticsearch import helpers
# from elasticsearch.helpers import streaming_bulk
# from tqdm import tqdm
#
# "https://stackoverflow.com/questions/67522617/elasticsearch-bulk-insert-w-python-socket-timeout-error#:~:text=The%20connection%20to%20elasticsearch%20has,be%20handled%20as%20an%20error."
#
# def load_data(docs, idx_):
#     for doc in docs:
#         idx, title, text = doc
#         doc_ = {"id": idx, "title": title, "text": text}
#
#         yield {
#             "_index": idx_,
#             "document": doc
#         }
#
# # TODOs: Utils, Duration Function as Decorator
# es = wiki_ev
# #helpers.bulk(es, load_data(wiki_data, "wiki_evidence"), raise_on_error=False, chunk_size=500)
# errors_count = 0
# # TODOs: Increase Chunk Size, with extended Timeout + handelling
# for ok, result in streaming_bulk(es, load_data(wiki_data, "wiki_evidence"), chunk_size=500, request_timeout=60*3, yield_ok=yield_ok, refresh=refresh_index_after_insert):
#     if ok is not True:
#             logging.error('Failed to import data')
#             logging.error(str(result))
#             errors_count += 1
#
#             if errors_count == errors_before_interrupt:
#                 logging.fatal('Too many import errors, exiting with error code')
#                 exit(1)

In [None]:
# for article in wiki_data:
#     _id, title, text = article
#     doc = {"id": _id, "title": title, "text": text}
#
#     wiki_ev.add_doc(doc)

In [None]:
# https://github.com/elastic/elasticsearch-py/issues/297

In [None]:
### DOCUMENT IMPORT: FULL-TEXTS ###
# from multiprocessing import Pool
# # from utils import get_contents
# import utils
# from tqdm import tqdm
# import json

# # TODOs: USE A GENERATOR OBJECT
# import spacy
# nlp = spacy.load("en_core_web_sm")
#
# p = Pool(8)
# files = [f for f in kw_sample]
#
# count = 0
# test = []
# with tqdm(total=len(files)) as pbar:
#     for documents in p.map(utils.get_contents_2, files):
#             for doc in documents:
#                 _id, title, text = doc
#
#             # count += 1
#             # doc_ = {"id": _id, "title": title, "text": text}
#
#             #wiki_ev.add_doc(doc)
#             test.append(doc_)
#     pbar.update()

In [None]:
# from elasticsearch import Elasticsearch

# # INIT OBJECT

# # TODOs: Persist a Generator Object
# PORT = "http://localhost:9200"
# INDEX_NAME = "wiki_evidence"
# errors_before_interrupt = 5
# refresh_index_after_insert = False
# max_insert_retries = 3
# yield_ok = False

# wiki_ev = Elasticsearch(
#     PORT,
#     #http_auth=(es_api_user, es_api_password)
#     retry_on_timeout=True,  # should timeout trigger a retry on different node?
# )

# wiki_ev.elastic_index = INDEX_NAME

# wiki_ev

<Elasticsearch(['http://localhost:9200'])>

In [None]:
# ### DOCUMENT IMPORT: SEGMENTED-TEXTS ###

# from multiprocessing import Pool
# # from utils import get_contents
# from tqdm import tqdm

# # p = Pool(8)
# # files = [f for f in kw_sample]
# #
# # count = 0
# # test = []
# # with tqdm(total=len(files)) as pbar:
# #     for documents in p.map(get_contents, files):
# #         for doc in documents:
# #             _id, title, text = doc
# #
# #             count += 1
# #             doc_ = {"id": _id, "title": title, "text": text}
# #
# #             #wiki_ev.add_doc(doc)
# #             test.append(doc)
# #     pbar.update()

In [None]:
# ### QUERY DB ###
# import elastic_db
# # from elastic_db import ElasticDB
# #
# # # Params
# # PORT = "http://localhost:9200"
# # INDEX = "wiki_evidence"
# # DOC = "evidence"
# #
# # # Init Elasticsearch DB
# # wiki_ev_ = ElasticDB(elastic_port=PORT, elastic_index=INDEX, elastic_doc=DOC)
# #
# # results = wiki_ev_.search("exploitation a wider public debate indecency adult")
# # results

In [None]:
### SQLITE LOAD ###
# from multiprocessing import Pool
# import utils
# from tqdm import tqdm
# import sqlite3

# import spacy
# import uuid

# nlp = spacy.load("en_core_web_sm")

# def paragraphs(document):
#     start = 0
#     document = nlp(document)
#     passages = []
#     for token in document:
#         if token.is_space and token.text.count("\n") > 1:
#             yield document[start:token.i]
#             start = token.i
#     yield document[start:]


# def get_contents(filename):
#     """Parse the contents of a file. Each line is a JSON encoded document."""
#     documents = []

#     with open(filename) as f:
#         for line in f:
#             doc = json.loads(line)

#             if doc["text"] == "": continue
#             if not doc: continue

#             passages = [str(i) for i in paragraphs(doc["text"])][0].split("\n")

#             for passage in passages:
#                 if len(passage) < 50:
#                     continue

#                 documents.append((str(uuid.uuid4()).replace('-',''), doc['id'], doc["title"], passage))

#     return documents

# save_path = "../data/wiki_evidence.db"
#
# p = Pool(8)
# files = [f for f in kw_sample]
#
# conn = sqlite3.connect(save_path)
# c = conn.cursor()
#
# documents = "documents"
# c.execute(f"CREATE TABLE documents (id PRIMARY KEY, id_, title, text);")
#
# count = 0
# step = 100
# batches = [files[i:i + step] for i in range(0, len(files), step)]
#
# for i, batch in enumerate(batches):
#     logger.info(f"[.... Batch #{i} .....]")
#     with tqdm(total=len(batch)) as pbar:
#         for document in tqdm(p.imap_unordered(get_contents, files)):
#             count += 1
#             for content in document:
#                 # _id, title, passage = content
#                 c.executemany("INSERT INTO documents VALUES (?,?,?,?)", (content,))
#
#         pbar.update()
#         logger.info(f"[Uploaded {count} documents]")
#
# conn.commit()
# conn.close()

In [None]:
# seq = [0, 1, 2, 3, 4, 5]
# window_size = 3
# step = 2

# # steps = 0, 2, 4 

# for i in range(0, len(seq) - window_size + 1, step):
#     print(i)
#     # print(i + window_size)
#     if i + window_size > len(seq):
#         # print(window_size)
#         window_size = i + window_size - len(seq)
    
#     print(seq[i: i + window_size])