# INGEST INFOMEDIA DATA

The data to ingest must have been prepared beforehand with the notebook "01_Prepare_Infomedia_Data.ipynb" of this series: this notebook reads that notebook's output file, "INFOMEDIA DEDUPLICATED.csv", as its source.

This notebook assumes that you have an Elasticsearch (ES) instance installed and reachable from this machine.

The data ingested in ES is intended to be used with the datascapes from this repository: https://github.com/jacomyma/add-datascapes

*Authors: Snorre Ralund, Mathieu Jacomy, Anders Munk.*

## How to use
1. Edit the settings below
2. Run all the cells from the menu: Cell > Run all. **It can take a long time.**
3. It's over when the last cell outputs "Done."

In [None]:
settings = {}

# SOURCE FILE

# It should already be named like this by "01_Prepare_Infomedia_Data.ipynb"
settings['infomedia_deduplicated_source'] = 'INFOMEDIA DEDUPLICATED.csv'


# ELASTIC SEARCH (ES) CONFIG

# URL of the ES node where to ingest the data, in the form scheme://host:port
# (e.g. 'http://10.0.0.5:9200'). Do not put credentials in the URL:
# set them in the user/password settings below.
settings['es_URL'] = 'http://localhost:9200'

# ES user
settings['es_auth_user'] = 'username'

# ES password
settings['es_auth_password'] = 'password'

# Index name
settings['index_name'] = 'infomedia-test'


# OTHER SETTINGS

# Do you want to delete and re-create the index?
# WARNING: True deletes every document already stored in the index.
# If set to False, it just ingests more data (documents whose "duid" is
# already in the index are skipped).
settings['reset_all'] = True

# How many documents to send to ES in a single bulk request
settings['batch_size'] = 1000

**The rest of the script does not require your intervention, except for checking that no errors occur.**

You are welcome to change it, though! That is why it is commented.


In [None]:
# Install necessary stuff
!conda install elasticsearch -y
#!conda install elasticsearch_dsl -y

In [None]:
# Index settings (mappings, essentially)
# You do not need to change that


def _text_field(**extra_subfields):
    """Return a mapping for a "standard"-analysed text field.

    The field also gets an "ngrams" sub-field analysed with
    "ngram_analyzer" (declared in es_index_settings), placed after any
    extra sub-fields given as keyword arguments.
    """
    subfields = dict(extra_subfields)
    subfields["ngrams"] = {"type": "text", "analyzer": "ngram_analyzer"}
    return {"type": "text", "analyzer": "standard", "fields": subfields}


# Edge n-grams of 2 to 15 characters over lowercased standard tokens.
es_index_settings = dict(
    index=dict(number_of_replicas=2),
    analysis=dict(
        filter=dict(
            ngram_filter=dict(type="edge_ngram", min_gram=2, max_gram=15),
        ),
        analyzer=dict(
            ngram_analyzer=dict(
                type="custom",
                tokenizer="standard",
                filter=["lowercase", "ngram_filter"],
            ),
        ),
    ),
)

es_index_mappings = {
    "properties": dict(
        duid={"type": "keyword"},
        publishdate={"type": "date"},
        sourcename={"type": "keyword"},
        year={"type": "integer"},
        heading=_text_field(keyword={"type": "keyword"}),
        full_text=_text_field(),
    )
}

In [None]:
# Load Infomedia raw data
# You can check what the data look like below.
import numpy as np
import pandas as pd

source_path = settings['infomedia_deduplicated_source']
raw_df = pd.read_csv(source_path)
# Missing cells are read as NaN; turn them into None so they are later
# serialized as JSON null.
df = raw_df.replace({np.nan: None})
df

In [None]:
from elasticsearch import Elasticsearch

# Client used by the following cells; connection details come from `settings`.
es_client = Elasticsearch(
    settings['es_URL'],
    basic_auth=(settings['es_auth_user'], settings['es_auth_password']),
)

if settings['reset_all']:
    # Drop the index (no error if it does not exist) so it is re-created empty below.
    es_client.indices.delete(index=settings['index_name'], ignore_unavailable=True)

# Create the index whenever it is missing, not only on reset. Otherwise a
# first run with reset_all=False would let the bulk requests auto-create it
# without the settings/mappings above, and the search in the next cell
# would fail on the missing index.
if not es_client.indices.exists(index=settings['index_name']):
    es_client.indices.create(index=settings['index_name'], settings=es_index_settings, mappings=es_index_mappings)

In [None]:
# Retrieve documents already indexed, so that they are not sent again

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

# Use the connection configured in `settings`; never hard-code host or
# credentials in the notebook.
es_client = Elasticsearch(
    settings['es_URL'],
    basic_auth=(settings['es_auth_user'], settings['es_auth_password']),
)

# Match every document, but fetch only its "duid" field.
s = Search(using=es_client, index=settings['index_name']) \
    .query("match_all") \
    .source(["duid"])

# scan() pages through all hits with the scroll API; execute() would only
# return the first page of results.
indexed_documents = {hit['duid'] for hit in s.params(scroll="20m").scan()}

print("{} documents already in the index.".format(len(indexed_documents)))

In [None]:
# Identify documents to index: every CSV row whose "duid" is not in the index yet

to_index = [
    row
    for _, row in df.iterrows()
    if row['duid'] not in indexed_documents
]
print("{} documents to index yet.".format(len(to_index)))

In [None]:
# You may want to monitor what are the remaining documents.
# If so, set the condition to True just below:
if False:
    for row in to_index:
        # Format rather than concatenate: missing values were replaced by None
        # when loading the CSV, and `str + None` raises TypeError.
        print("{} {}".format(row['duid'], row["heading"]))

In [None]:
import json

while to_index:

    # Build a batch of up to batch_size rows, taken from the end of the list
    batch = []
    while to_index and len(batch) < settings['batch_size']:
        batch.append(to_index.pop())

    # Serialize the batch for the bulk API: one action line followed by one
    # document line per row. "_id" is the duid, so re-sending a document
    # overwrites it instead of duplicating it.
    actions = []
    for row in batch:
        action = {"index": {"_index": settings['index_name'], "_id": row["duid"]}}
        year = row["year"]
        doc = {
            "duid": row["duid"],
            "publishdate": row["publishdate"],
            "sourcename": row["sourcename"],
            # Missing values were replaced by None when loading; int(None)
            # would abort the whole ingestion.
            "year": int(year) if year is not None else None,
            "heading": row["heading"],
            # Must be "full_text" to match the index mapping (and its "ngrams"
            # sub-field); any other key is only dynamically mapped.
            "full_text": row["clean_text"],
        }
        actions.append(json.dumps(action))
        actions.append(json.dumps(doc))
    # The bulk body is newline-delimited JSON and must end with a newline.
    result = es_client.bulk(body="\n".join(actions) + "\n", request_timeout=100)

    # Report per-document failures; the batch as a whole is not retried.
    # Failed documents are absent from the index, so a rerun will pick them up.
    if result['errors']:
        for item in result['items']:
            operation = item.get('index', {})
            if '_id' not in operation or 'error' not in operation:
                continue
            error = operation['error']
            if 'type' in error and 'reason' in error:
                print("Insertion error for item {}: {}. Reason: {}".format(operation['_id'], error['type'], error['reason']))
            else:
                print("Insertion error for item {}: {}".format(operation['_id'], error))
    print("{} documents to index.".format(len(to_index)))
print("Done.")