In [None]:
from src.utils import *
from src.dbmongo import DbMongo, get_db
from collections import defaultdict, Counter
from loguru import logger
import csv
# from tqdm.notebook import tqdm
from tqdm.auto import tqdm

In [None]:
# load config.json and open the Mongo wrapper (`db`) that every following cell reads from / writes to
config = parse_config("config.json")
db = get_db(config)

In [None]:
# refresh each entity's `len` field before it is used for filtering below (logic lives in db.update_entity_len)
db.update_entity_len()

In [None]:
# manual fixes: relabel the entities listed under each config.json key (via db.update_entities_to)
for _config_key, _forced_label in (
    ("actually_people", "PER"),
    ("actually_orgs", "ORG"),
    ("actually_locs", "LOC"),
    ("actually_misc", "MISC"),
):
    db.update_entities_to(config[_config_key], _forced_label)

# Process data and generate the final collections

In [None]:
# 1 start from scratch: drop both "final" collections, which the cells below repopulate
for _final_collection in ("final_entities", "final_news"):
    db[_final_collection].drop()

In [None]:
# 2 find duplicates and update
def update_dups(labels=("PER", "ORG", "LOC", "MISC")):
    """Recompute the `duplicate_of` links of the `entities` collection.

    Every existing `duplicate_of` field is cleared first so stale links from a
    previous run cannot survive, then `db.update_duplicate_entities` is run
    once per label.

    :param labels: entity labels to de-duplicate; defaults to the four labels
        used throughout this notebook.
    """
    # Collection.update(..., multi=True) was deprecated in PyMongo 3 and removed
    # in PyMongo 4; update_many is the equivalent multi-document update.
    db["entities"].update_many({}, {"$unset": {"duplicate_of": ""}})
    for label in labels:
        db.update_duplicate_entities(label=label)
# NOTE(review): the call is disabled, so the `duplicate_of` links left by a previous run are used
# by the cells below; uncomment to recompute them.
#update_dups()

In [None]:
# de-duplicated ids from config["ignore_people"]; the entity queries below exclude them whatever their label
ignore_entities = [*set(config["ignore_people"])]

In [None]:
def valid_entity(e):
    """Heuristic filter that rejects entity ids that look like extraction noise.

    ``e["_id"]`` is a hyphen-separated slug. It is rejected (``False``) when it:
    is at most one character long; starts with one of a fixed set of prefixes
    ("a-", "o-", "f-", "dr-", "ao-", "foto-"); ends with one of a fixed set of
    short trailing tokens ("-dos", "-de", "-cm", ...); contains a URL/platform
    word ("http", "www", "youtube", ...); or consists only of single-character
    parts (e.g. "a-b-c"). Otherwise it returns ``True``.
    """
    slug = e["_id"]
    if len(slug) <= 1:
        return False
    if slug.startswith(("a-", "o-", "f-", "dr-", "ao-", "foto-")):
        return False
    if slug.endswith(("-dos", "-lhe", "-lho", "-lha", "-de", "-da", "-do", "-dn", "-cm", "-jn", "-in")):
        return False
    banned_words = ("soundcloud", "itunes", "android", "http", "www", "youtube", "instagram")
    if any(word in slug for word in banned_words):
        return False
    # prevent a-b-c: every hyphen-separated part being a single character
    return not all(len(part) == 1 for part in slug.split("-"))

In [None]:
def valid_entity_size_per_label(e):
    """Check that an entity has enough mentions (`e["len"]`) for its label.

    Thresholds:
      * PER  - the id must contain a "-" (multi-word name) and len >= 10
      * ORG  - len >= max(50, 110 - 30 * number_of_words); 80 for one word, 50 for two or more
      * LOC  - len >= 50
      * MISC - len >= 100

    :returns: a bool; ``False`` for any other label (previously the function fell
        off the end and returned ``None``).
    """
    label = e["label"]
    if label == "PER":
        # short-circuits, so `len` is only read for hyphenated ids
        return "-" in e["_id"] and e["len"] >= 10
    if label == "ORG":
        n_words = len(e["_id"].split("-"))
        return e["len"] >= max(50, 110 - 30 * n_words)
    if label == "LOC":
        return e["len"] >= 50
    if label == "MISC":
        return e["len"] >= 100
    return False

In [None]:
def must_include(e):
    """Return True when the entity's id is listed in config["must_include"].

    Such entities skip the `valid_entity` / `valid_entity_size_per_label` checks
    in the population loop below.
    """
    whitelist = config["must_include"]
    return e["_id"] in whitelist

In [None]:
# 3 copy every non-duplicate entity that passes the filters into final_entities
# (an earlier version of the query also required "_id": {"$regex": ".+-.+"})
# NOTE(review): the Mongo query itself already requires len >= 10 and excludes ignore_entities,
# so must_include cannot rescue an entity that fails either of those two conditions.
candidate_query = {"duplicate_of": {"$exists": False}, "len": {"$gte": 10}, "_id": {"$not": {"$in": ignore_entities}}}
for entity in db["entities"].find(candidate_query, no_cursor_timeout=True):
    keep = must_include(entity) or (valid_entity(entity) and valid_entity_size_per_label(entity))
    if keep:
        # search_text starts as the entity's own text; duplicates' texts are appended later
        entity["search_text"] = entity["text"]
        db._upsert_one("final_entities", entity)

In [None]:
# peek at (up to 10) duplicate_of roots that are a single word, i.e. contain no "-" (like "zapatero")
# if these are insignificant -> ignore
single_word_roots = {"duplicate_of": {"$exists": True, "$regex": "^[^-]+$", "$not": {"$in": ignore_entities}}}
root_cursor = db["entities"].find(single_word_roots, {"duplicate_of": True}, no_cursor_timeout=True).limit(10)
for doc in root_cursor:
    print(doc["duplicate_of"])

In [None]:
# fold every duplicate into its root final entity: append the duplicate's text to the
# root's search_text and merge the two news lists
duplicate_cursor = db["entities"].find(
    {"duplicate_of": {"$exists": True}},
    {"duplicate_of": True, "news": True, "text": True},
    no_cursor_timeout=True,
)
for dup in duplicate_cursor:
    root = db["final_entities"].find_one({"_id": dup["duplicate_of"]})
    if root is None:
        continue  # the root was not copied into final_entities, nothing to merge into
    # done client-side because mongo did not allow a one-op update for this
    merged_news = set(root["news"])
    merged_news.update(dup["news"])
    root["news"] = list(merged_news)
    root["search_text"] += "\n%s" % dup["text"]
    root["len"] = len(root["news"])
    db._upsert_one("final_entities", root, upsert=False)

In [None]:
# copy every processed news piece into final_news with empty per-label entity lists
# SHOULD RUN rnd/fix_news_texts beforehand
news_query = {"processed": True, "processed_entities": True, "valid": {"$exists": False}}
news_projection = dict.fromkeys(["title", "original", "text", "timestamp", "url", "image", "website"], True)
for news_doc in db["news"].find(news_query, news_projection, no_cursor_timeout=True):
    news_doc["entities"] = {entity_label: [] for entity_label in ("PER", "ORG", "LOC", "MISC")}
    db._upsert_one("final_news", news_doc)

In [None]:
# iterate all final_entities and add an {_id, text} reference to every news piece they appear in
from pymongo import UpdateOne
for fe in db["final_entities"].find({}, no_cursor_timeout=True):
    news_ids = fe.get("news") or []
    if not news_ids:
        # bulk_write raises InvalidOperation when given an empty list of requests
        continue
    entity_ref = {"_id": fe["_id"], "text": fe["text"]}
    # upsert=True creates a stub without "text" for news ids missing from final_news;
    # the "remove any news_piece without text" cell further down deletes those stubs
    db["final_news"].bulk_write([
        UpdateOne({"_id": news_id}, {"$addToSet": {"entities.%s" % fe["label"]: entity_ref}}, upsert=True)
        for news_id in news_ids
    ], ordered=False)

In [None]:
# ids of news pieces deleted from final_news by the next cells; they are later stripped from final_entities
removed_news = []

In [None]:
# remove any news piece that has neither PER nor ORG entities (LOC/MISC alone are not enough)
filters = {
    "entities.PER": {"$size": 0},
    "entities.ORG": {"$size": 0},
    # "entities.LOC": {"$size": 0},
    # "entities.MISC": {"$size": 0}
}
# only the ids are needed; projecting avoids pulling every article's text
removed_news += [n["_id"] for n in db["final_news"].find(filters, {"_id": True})]
# Collection.remove was deprecated in PyMongo 3 and removed in PyMongo 4
db["final_news"].delete_many(filters)

In [None]:
# remove any news piece without "text": these are stubs created when entities were upserted into a
# news id that final_news did not contain (e.g. a bug in the collection process, or cells run several times)
filters = {"text": {"$exists": False}}
# only the ids are needed; projecting avoids pulling whole documents
removed_news += [n["_id"] for n in db["final_news"].find(filters, {"_id": True})]
# Collection.remove was deprecated in PyMongo 3 and removed in PyMongo 4
db["final_news"].delete_many(filters)

In [None]:
# convert to a set for fast membership checks while pruning final_entities
removed_news = {*removed_news}

In [None]:
# make sure every news id listed in final_entities still exists in final_news; otherwise drop it from
# the entity's `news` list and keep `len` (the news count) in sync
final_entities = list(db["final_entities"].find({}, no_cursor_timeout=True))
for fe in tqdm(final_entities):  # tqdm closes the bar when the loop ends
    original = fe["news"]
    # dedupe (as the previous set-based version did) while preserving the original order
    actual = [news_id for news_id in dict.fromkeys(original) if news_id not in removed_news]
    if len(actual) < len(original):  # some of the news have been deleted above
        fe["news"] = actual
        # mirrors root["len"] = len(root["news"]) in the duplicate-merge cell; without it a stale
        # len would be exported to i_entities.csv
        fe["len"] = len(actual)
        db._upsert_one("final_entities", fe)

### Generate Neo4j code

In [None]:
# https://neo4j.com/blog/bulk-data-import-neo4j-3-0/
# https://neo4j.com/blog/cypher-write-fast-furious/
# https://medium.com/neo4j/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher-73c7f693c8cc
# https://neo4j.com/developer/guide-import-csv/

## Import command
```
CREATE INDEX ON :PER(_id);
CREATE INDEX ON :ORG(_id);
CREATE INDEX ON :LOC(_id);
CREATE INDEX ON :MISC(_id);
CREATE INDEX ON :NEWS(_id);
```
---


Delete every node and relationship before a fresh import: `MATCH (n) DETACH DELETE n`

List the existing indexes (Neo4j Browser command): `:schema`

In [None]:
# iterate every entity of one label and export it for LOAD CSV
def generate_csv(csv_name, label, import_dir="../neo4j/import"):
    """Export the final entities with ``label`` to ``<import_dir>/<csv_name>.csv``.

    Each CSV row holds the entity ``_id``, its ``text`` and the comma-joined
    ``_id``s of the news pieces it appears in.

    :param csv_name: file name (without extension); the Cypher refers to it as
        ``file:///<csv_name>.csv``, so ``import_dir`` is presumably Neo4j's import directory.
    :param label: entity label ("PER", "ORG", "LOC" or "MISC"), also used as the node label.
    :param import_dir: directory the CSV is written to.
    :returns: tuple ``(with_news, without_news)`` of Cypher statements. The first also
        creates the NEWS nodes and the ``liga`` relationships; the second only the entity nodes.
    """
    fields = ['_id', 'text', "news"]
    with open("%s/%s.csv" % (import_dir, csv_name), "w", encoding="utf-8", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=fields, delimiter=",")
        writer.writeheader()
        for fe in db["final_entities"].find({"label": label}, {k: True for k in fields}, no_cursor_timeout=True):
            # news ids are packed into one comma-separated cell and split again in Cypher
            fe["news"] = ",".join(fe.get("news") or [])
            writer.writerow(fe)
    load_nodes = """
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:///%s.csv' AS row
MERGE (e:%s {_id: row._id, text: row.text})""" % (csv_name, label)
    # An entity with no news yields an empty cell; split('', ',') is [''], which would MERGE a
    # phantom NEWS {_id: ''} node, so empty ids are filtered out before UNWIND.
    link_news = """
WITH row, e
UNWIND [news_id IN split(row.news, ',') WHERE news_id <> ''] AS news_piece
MERGE (n:NEWS {_id: news_piece})
MERGE (e)-[:liga]-(n);"""
    return load_nodes + link_news, load_nodes + ";"  # (with news, without news)

In [None]:
entities = [("people", "PER"), ("orgs", "ORG"), ("locations", "LOC"), ("misc", "MISC")]
# one CSV per label; each call returns its (with_news, without_news) Cypher snippets
queries = [generate_csv(filename, label) for filename, label in entities]
with_news = "".join(pair[0] for pair in queries)
without_news = "".join(pair[1] for pair in queries)
separator = "-" * 50
print("WITHOUT NEWS:\n\n", without_news)
print("\n", separator, "\n")
print("WITH NEWS:\n\n", with_news)

In [None]:
# export every final news piece (_id, title) for the NEWS LOAD CSV statement below
news_fields = ["_id", "title"]
with open("../neo4j/import/news.csv", "w", encoding="utf-8", newline="") as out:
    news_writer = csv.DictWriter(out, fieldnames=news_fields, delimiter=",")
    news_writer.writeheader()
    news_projection = dict.fromkeys(news_fields, True)
    news_writer.writerows(db["final_news"].find({}, news_projection, no_cursor_timeout=True))

```
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:///news.csv' AS row
MERGE (n:NEWS {_id: row._id, title: row.title});
```

In [None]:
# # for each news piece connecting two entities -> create a relation between those entities 
# # or
# # for each entity, iterate all entites and intersect news, create new relationship with given Weight
# def output_connections_with_news():
#     with open("../neo4j/import/connections_with_news.csv", "w", encoding="utf-8", newline="") as out:
#         writer = csv.DictWriter(out, fieldnames=["_id1", "_id2", "weight", "news"], delimiter=",")
#         writer.writeheader()
#         seen = set()
#         pbar = tqdm(total= db["final_entities"].count_documents({}))
#         for e1 in db["final_entities"].find({}, {"news": True}, no_cursor_timeout=True):
#             seen.add(e1["_id"])
#             for e2 in db["final_entities"].find({}, {"news": True}, no_cursor_timeout=True):
#                 if e2["_id"] in seen: continue
#                 common_news = list(set(e1["news"]) & set(e2["news"]))
#                 if len(common_news):
#                     writer.writerow({"_id1": e1["_id"], "_id2": e2["_id"], "weight": len(common_news), "news": ",".join(common_news)})
#             pbar.update()

```
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:///connections_with_news.csv' AS row
MERGE (e1 {_id: row._id1})
MERGE (e2 {_id: row._id2})
WITH row, e1, e2
MERGE (e1)-[:rel{weight: toInteger(row.weight), news: split(row.news, ',')}]-(e2);
```

In [None]:
# Build the entity co-occurrence graph: two final entities are linked when they appear
# together in at least `min_common` news pieces; the edge weight is how many they share.
def output_connections_without_news(min_common=1):
    """Write weighted entity pairs to ``../neo4j/import/connections_<min_common>.csv``.

    Columns are ``_id1, _id2, weight``, where ``weight`` is the number of distinct
    news ids shared by the two entities. Rows come out in the same order as a
    pairwise scan over ``(i, j)`` with ``i < j`` of the ``final_entities`` cursor.

    Instead of intersecting the news lists of every entity pair (O(n^2) set
    intersections, each rebuilding both sets), an inverted index news -> entities
    is built, so only pairs that actually co-occur in some news piece are counted.

    :param min_common: minimum number of shared news pieces for a pair to be written.
        Values <= 0 write every pair, including pairs with weight 0.
    """
    with open("../neo4j/import/connections_%d.csv" % min_common, "w", encoding="utf-8", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=["_id1", "_id2", "weight"], delimiter=",")
        writer.writeheader()
        entities = list(db["final_entities"].find({}, {"news": True}, no_cursor_timeout=True))  # loading to RAM

        # inverted index: news id -> positions (ascending) of the entities that mention it
        entities_by_news = defaultdict(list)
        for i, entity in enumerate(entities):
            for news_id in set(entity["news"]):
                entities_by_news[news_id].append(i)

        # shared[(i, j)] == len(set(news_i) & set(news_j)) for every pair i < j that shares news
        shared = Counter()
        for positions in tqdm(entities_by_news.values(), total=len(entities_by_news)):
            for k, i in enumerate(positions):
                for j in positions[k + 1:]:
                    shared[(i, j)] += 1

        if min_common > 0:
            # sorting the (i, j) keys reproduces the pairwise-scan output order
            pairs = sorted(pair for pair, count in shared.items() if count >= min_common)
        else:
            # every pair qualifies, including those that share no news at all
            n = len(entities)
            pairs = ((i, j) for i in range(n) for j in range(i + 1, n))

        for i, j in pairs:
            writer.writerow({"_id1": entities[i]["_id"], "_id2": entities[j]["_id"], "weight": shared[(i, j)]})

```
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:///connections_1.csv' AS row
MERGE (e1 {_id: row._id1})
MERGE (e2 {_id: row._id2})
WITH row, e1, e2
MERGE (e1)-[:rel{weight: toInteger(row.weight)}]-(e2);
```

In [None]:
# writes ../neo4j/import/connections_1.csv (entity pairs sharing >= 1 news piece); the neo4j-admin
# conversion cell below reads that exact file name
output_connections_without_news(1)

---

# Below is the import code for neo4j-admin import

### CSV entities
```
_id:ID,text,len:int,:LABEL

```

In [None]:
# neo4j-admin import node file: typed header written by hand, then one row per final entity
entity_fields = ['_id', 'text', "len", "label"]
with open("../neo4j/import/i_entities.csv", "w", encoding="utf-8", newline="") as out:
    out.write("_id:ID,text,len:int,:LABEL\n")
    entity_writer = csv.DictWriter(out, fieldnames=entity_fields, delimiter=",")
    for fe in db["final_entities"].find({}, dict.fromkeys(entity_fields, True), no_cursor_timeout=True):
        # newlines in the text are flattened to spaces
        flat_text = fe["text"].replace("\n", " ").strip()
        fe["text"] = flat_text
        entity_writer.writerow({"_id": fe["_id"], "text": flat_text, "len": fe["len"], "label": fe["label"]})

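### CSV relationships
Header written by the cell below. There is no `:TYPE` column; the relationship type `rel` is given on the command line via `--relationships=rel=...`.
```
:START_ID,:END_ID,weight:int

```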

In [None]:
# convert connections_1.csv (written by output_connections_without_news(1)) into the
# relationship file expected by neo4j-admin import: same rows, typed header
with open("../neo4j/import/i_connections.csv", "w", encoding="utf-8", newline="") as out:
    writer = csv.writer(out, delimiter=",")
    out.write(":START_ID,:END_ID,weight:int\n")
    # the csv module requires newline="" on files given to csv.reader
    with open("../neo4j/import/connections_1.csv", "r", encoding="utf-8", newline="") as processed:
        reader = csv.reader(processed, delimiter=",")
        next(reader, None)  # skip headers (tolerates an empty file)
        # _id1, _id2, weight map 1:1 onto :START_ID, :END_ID, weight; blank lines are skipped
        writer.writerows(row[:3] for row in reader if row)

```bash
# run from / (the directory that contains import/):
neo4j-admin import --id-type=STRING --nodes=import/i_entities.csv --relationships=rel=import/i_connections.csv

# or, when the CSV files are in the current directory:
neo4j-admin import --id-type=STRING --nodes=i_entities.csv --relationships=rel=i_connections.csv
```

### Deploy neo4j
* create a new local version of the database from the newly generated data
* copy neo4j/data/databases and neo4j/data/transactions into the container volume
* restart the Docker container