# Extract electricity prices and volume from VENRON data set

# Introduction

In this notebook we use `Fonduer` to extract relations from the `VENRON` dataset.  
This code is a modified version of the original Fonduer hardware [tutorial](https://github.com/HazyResearch/fonduer-tutorials/tree/master/hardware).  
It follows the `Fonduer` pipeline outlined in the [paper](https://arxiv.org/abs/1703.05028) and its iterative knowledge base construction (KBC) process:

1. KBC Initialization
2. Candidate Generation and Multimodal Featurization
3. Probabilistic Relation Classification
4. Error Analysis and Iterative KBC


## Setup

First we import the relevant libraries and connect to the local database.  
Follow the README instructions to set up the connection to the Postgres DB correctly.

If the database already contains candidates with generated features, they will not be overwritten.  
To re-run the entire pipeline, including initialization, drop the database first.

In [None]:
! dropdb -h postgres --if-exists elec_price_vol
! createdb -h postgres elec_price_vol

In [None]:
# source .venv/bin/activate

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import sys
import logging


In [None]:
PARALLEL = 4  # assuming a quad-core machine
ATTRIBUTE = "elec_price_vol"
DB_USERNAME = 'user'
DB_PASSWORD = 'venron'
conn_string = f'postgresql://{DB_USERNAME}:{DB_PASSWORD}@postgres:5432/{ATTRIBUTE}'
    
docs_path = 'data/gold/html/'
pdf_path = 'data/gold/pdf/'
gold_file = 'data/electricity_gold.csv'
max_docs = 50 # 114


## 1.1 Parsing and Transforming the Input Documents into Unified Data Models

We first initialize a `Meta` object, which manages the connection to the database automatically, and enables us to save intermediate results.

In [None]:
from fonduer import Meta, init_logging

# Configure logging for Fonduer
init_logging(log_dir="logs")

session = Meta.init(conn_string).Session()

In [None]:
from fonduer.parser.preprocessors import HTMLDocPreprocessor
from fonduer.parser.models import Document, Sentence
from fonduer.parser import Parser

has_documents = session.query(Document).count() > 0

if (not has_documents): 
    doc_preprocessor = HTMLDocPreprocessor(docs_path, max_docs=max_docs)
    corpus_parser = Parser(session, structural=True, lingual=True, visual=True, pdf_path=pdf_path)
    %time corpus_parser.apply(doc_preprocessor, parallelism=PARALLEL)
    
print(f"Documents: {session.query(Document).count()}")
print(f"Sentences: {session.query(Sentence).count()}")

## 1.2 Dividing the Corpus into Test and Train

We split the documents 80/10/10 into train/dev/test sets. The split is deterministic rather than random (documents are ordered by name) so that it stays consistent across runs. We refer to the train/dev/test splits as 0/1/2, respectively.
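
As a minimal, self-contained sketch of this deterministic split, the snippet below applies the same index thresholds to plain document names. The helper `split_by_name` and the toy names are hypothetical and not part of `Fonduer`.

```python
def split_by_name(names, train_end=0.8, dev_end=0.9):
    """Split names into train (0), dev (1) and test (2) by sorted position."""
    ordered = sorted(names)  # sorting makes the result independent of input order
    n = len(ordered)
    splits = {0: [], 1: [], 2: []}
    for i, name in enumerate(ordered):
        if i < train_end * n:
            splits[0].append(name)
        elif i < dev_end * n:
            splits[1].append(name)
        else:
            splits[2].append(name)
    return splits


# Toy document names (hypothetical), passed in reverse order on purpose
names = [f"doc_{i:02d}" for i in range(10)]
splits = split_by_name(list(reversed(names)))
print({k: len(v) for k, v in splits.items()})
```

Because the names are sorted before slicing, re-running the cell against the same corpus assigns every document to the same split.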

In [None]:
docs = session.query(Document).order_by(Document.name).all()
ld   = len(docs)

train_docs = set()
dev_docs   = set()
test_docs  = set()
splits = (0.8, 0.9)
data = [(doc.name, doc) for doc in docs]
data.sort(key=lambda x: x[0])
for i, (doc_name, doc) in enumerate(data):
    if i < splits[0] * ld:
        train_docs.add(doc)
    elif i < splits[1] * ld:
        dev_docs.add(doc)
    else:
        test_docs.add(doc)
from pprint import pprint
pprint([x.name for x in train_docs][0:5])
print(f"Number of documents split: {len(docs)}")

# Phase 2: Mention Extraction, Candidate Extraction and Multimodal Featurization

Given the unified data model from Phase 1, `Fonduer` extracts relation
candidates based on user-provided **matchers** and **throttlers**. Then,
`Fonduer` leverages the multimodality information captured in the unified data
model to provide multimodal features for each candidate.

## 2.1 Mention Extraction & Candidate Generation

1. Define mention classes
2. Use matcher functions to define the format of potential mentions
3. Define mention spaces (n-grams)
4. Run mention extraction over all possible n-grams in the document (see the `MentionExtractor` API on [ReadTheDocs](https://fonduer.readthedocs.io/en/stable/user/candidates.html#fonduer.candidates.MentionExtractor))
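
To make the interplay of mention spaces and matchers concrete, here is a plain-Python sketch that does not use `Fonduer`. It enumerates n-grams over a token list and keeps the spans accepted by a dictionary check or by the price regex. It assumes that matching is applied to the whole span and that the dictionary lookup ignores case; the helper names and the three-entry station set are illustrative only.

```python
import re

# Same pattern as the price matcher in this notebook: 1-4 digits plus a decimal part
PRICE_RGX = re.compile(r"\d{1,4}(\.\d{1,5})")
# Small illustrative subset of the station dictionary, lower-cased
STATIONS = {"palo verde", "mid columbia", "np-15"}


def ngrams(tokens, n_max):
    """Yield every contiguous span of 1..n_max tokens as a string."""
    for n in range(1, n_max + 1):
        for start in range(len(tokens) - n + 1):
            yield " ".join(tokens[start:start + n])


def match_stations(tokens, n_max=4):
    """Keep spans found in the station dictionary (case-insensitive)."""
    return [span for span in ngrams(tokens, n_max) if span.lower() in STATIONS]


def match_prices(tokens):
    """Keep single tokens that match the price pattern in full."""
    return [tok for tok in ngrams(tokens, 1) if PRICE_RGX.fullmatch(tok)]


tokens = "Palo Verde on peak 35.25 volume 400".split()
print(match_stations(tokens))  # ['Palo Verde']
print(match_prices(tokens))    # ['35.25']
```

Note that `400` is rejected because the pattern requires a decimal part, which is one reason integer volumes rarely end up as `Price` mentions.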

In [None]:
from fonduer.candidates.models import mention_subclass
from fonduer.candidates.matchers import RegexMatchSpan, DictionaryMatch, LambdaFunctionMatcher, Intersect, Union
from fonduer.candidates import MentionNgrams
from fonduer.candidates import MentionExtractor 
from fonduer.candidates.models import Mention

hasMentions = session.query(Mention).count() > 0

# 1.) Mention subclasses
Station = mention_subclass("Station")
Price = mention_subclass("Price")

### Dictionary of known stations ###
stations_list = [
    "COB",
    "MID COLUMBIA",
    "PALO VERDE",
    "MEAD",
    "NP-15",
    "SP-15",
    "PJM - Western HUB",

    "Cob",
    "Palo",
    "SP 15",
    "NP 15",
    "MidC",
    "MeadMktplace",
    "PJM",

    "Mead/Marketplace",
    "California-Oregon Border",
    "California Northern Zone",
    "California Southern Zone",

    "California-Oregon Border (COB)",
    "California Northern Zone (NP-15)",
    "California Southern Zone (SP-15)",
    "Palo Verde",
    "Mid-Columbia",
    "Mid Columbia",
]

if (not hasMentions):

    # 2.) Matcher functions
    station_matcher = DictionaryMatch(d=stations_list)
    price_matcher = RegexMatchSpan(rgx=r"\d{1,4}(\.\d{1,5})", longest_match_only=False)

    # 3.) Mention spaces (Ngrams)
    station_ngrams = MentionNgrams(n_max=4)
    price_ngrams = MentionNgrams(n_max=1)


    # 4.) Mention extraction
    mention_extractor = MentionExtractor(
        session, [Station, Price], [station_ngrams, price_ngrams], [station_matcher, price_matcher]
    )
    docs = session.query(Document).order_by(Document.name).all()
    mention_extractor.apply(docs, parallelism=PARALLEL)

    
print(f"Total Mentions: {session.query(Mention).count()}")

## 2.2 Candidate Extraction

1. Define Candidate Class
2. Define throttlers to reduce the number of possible candidates
3. Extract candidates (View the API for the `CandidateExtractor` on [ReadTheDocs](https://fonduer.readthedocs.io/en/stable/user/candidates.html#fonduer.candidates.CandidateExtractor).)

In the last step we assign each set of `Candidates` to its split through the `split` argument; recall that we're referring to train/dev/test as splits 0/1/2.
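
Conceptually, a throttler is a predicate over candidate tuples: candidates start out as the cross product of mentions, and only tuples for which the throttler returns `True` are kept. The sketch below illustrates this with a hypothetical `ToyMention` type that carries its column header as a string. It simplifies the header check to a substring test and is not how `Fonduer` represents mentions.

```python
from itertools import product
from typing import NamedTuple


class ToyMention(NamedTuple):
    """Hypothetical stand-in for a mention found in a table cell."""
    text: str
    column_header: str


def header_throttler(candidate):
    """Return False for candidates whose price sits under a non-price header."""
    station, price = candidate
    blocked = ("volume", "date")
    return not any(word in price.column_header.lower() for word in blocked)


stations = [ToyMention("Palo Verde", "Station")]
prices = [
    ToyMention("35.25", "Price"),
    ToyMention("400.0", "Volume"),
    ToyMention("33.10", "Off Peak Price"),
]

all_pairs = list(product(stations, prices))  # every (station, price) combination
kept = [c for c in all_pairs if header_throttler(c)]
print(len(all_pairs), len(kept))  # 3 2
```

Throttling early keeps the candidate set, and therefore featurization and labeling time, manageable.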

In [None]:
from fonduer.utils.data_model_utils import *
import re
from fonduer.candidates import CandidateExtractor
from fonduer.candidates.models import candidate_subclass
from fonduer.utils.visualizer import Visualizer


# 1.) Define Candidate class
StationPrice = candidate_subclass("StationPrice", [Station, Price])

has_candidates = session.query(StationPrice).filter(StationPrice.split == 0).count() > 0

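# 2.) Define throttlers
def any_filter(c):
    """Drop candidates whose price cell sits under a non-price header."""
    (station, price) = c
    # Collect the header n-grams once instead of re-querying for every check
    head_ngrams = set(get_head_ngrams(price))
    if 'volume' in head_ngrams:
        return False
    if 'date' in head_ngrams:
        return False
    if 'non firm' in head_ngrams:
        return False
    return True

any_throttler = any_filter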

# 3.) Candidate extraction
candidate_extractor = CandidateExtractor(session, [StationPrice], throttlers=[any_throttler])

for i, docs in enumerate([train_docs, dev_docs, test_docs]):
    if (not has_candidates):
        candidate_extractor.apply(docs, split=i, parallelism=PARALLEL)
    print(f"Number of Candidates in split={i}: {session.query(StationPrice).filter(StationPrice.split == i).count()}")

train_cands = candidate_extractor.get_candidates(split = 0)
dev_cands = candidate_extractor.get_candidates(split = 1)
test_cands = candidate_extractor.get_candidates(split = 2)


# 4.) Visualize a candidate for error analysis
pprint(train_cands[0][2003])
vis = Visualizer(pdf_path)

# Display a candidate
vis.display_candidates([train_cands[0][2003]])

## 2.3 Multimodal Featurization
Unlike systems that handle only plain unstructured text, `Fonduer` works with richly formatted data and therefore featurizes each candidate with a baseline library of multimodal features. 

### Featurize with `Fonduer`'s optimized Postgres Featurizer
We now annotate the candidates in our training, dev, and test sets with features. The `Featurizer` provided by `Fonduer` allows this to be done in parallel to improve performance.

View the API provided by the `Featurizer` on [ReadTheDocs](https://fonduer.readthedocs.io/en/stable/user/features.html#fonduer.features.Featurizer).

At the end of this phase, `Fonduer` has generated the set of candidates and the feature matrix. Note that Phases 1 and 2 are relatively static and are typically executed only once during the KBC process.
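
In the cell below, `train=True` is passed only for the training split. Assuming this flag controls whether new feature keys are registered, the dev and test matrices reuse the columns learned from the training candidates. The following toy sketch illustrates that idea with plain Python lists; the feature names and helper functions are hypothetical, and this is not `Fonduer`'s implementation.

```python
def build_feature_index(train_features):
    """Assign one column per feature name seen in the training split."""
    keys = sorted({name for feats in train_features for name in feats})
    return {name: col for col, name in enumerate(keys)}


def to_matrix(features, index):
    """Binary candidate-by-feature matrix; names missing from the index are dropped."""
    matrix = []
    for feats in features:
        row = [0] * len(index)
        for name in feats:
            col = index.get(name)
            if col is not None:
                row[col] = 1
        matrix.append(row)
    return matrix


# Hypothetical feature names for two training candidates and one dev candidate
train_feats = [{"HEAD_price", "SAME_ROW"}, {"HEAD_volume"}]
dev_feats = [{"HEAD_price", "UNSEEN_feature"}]

index = build_feature_index(train_feats)
F_train_toy = to_matrix(train_feats, index)
F_dev_toy = to_matrix(dev_feats, index)
print(F_train_toy)  # [[1, 0, 1], [0, 1, 0]]
print(F_dev_toy)    # [[1, 0, 0]]
```

Keeping one shared column layout is what allows a model trained on `F_train` to be applied to `F_dev` and `F_test`.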

In [None]:
from fonduer.features import Featurizer

featurizer = Featurizer(session, [StationPrice])

# Training set
%time featurizer.apply(split=0, train=True, parallelism=PARALLEL)
%time F_train = featurizer.get_feature_matrices(train_cands)
print(F_train[0].shape)

# Dev set
%time featurizer.apply(split=1, parallelism=PARALLEL)
%time F_dev = featurizer.get_feature_matrices(dev_cands)
print(F_dev[0].shape)

# Test set
%time featurizer.apply(split=2, parallelism=PARALLEL)
%time F_test = featurizer.get_feature_matrices(test_cands)
print(F_test[0].shape)

# Phase 3: Probabilistic Relation Classification
In this phase, `Fonduer` applies user-defined **labeling functions** to each candidate. Labeling functions express heuristics, patterns, and [weak supervision](http://hazyresearch.github.io/snorkel/blog/weak_supervision.html) strategies for labeling our data; applying them produces a label matrix that is used by our data programming engine.

1. Load Gold Data

--- 

Iterate over the following steps:

2. Create labeling functions
3. Apply the labeling functions and measure the accuracy of each LF (based on the gold data)
4. Build a generative model by combining the labeling functions
5. Iterate on the labeling functions based on the model's score

---

6. Finally, build a discriminative model and evaluate it on the test set

### 3.1) Loading Gold LF

In [None]:
from fonduer.supervision.models import GoldLabel
from electricity_utils import gold
from fonduer.supervision import Labeler

# 1.) Load the gold data
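# Query the documents directly: `corpus_parser` only exists if parsing ran in this session
docs = session.query(Document).order_by(Document.name).all()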
labeler = Labeler(session, [StationPrice])
%time labeler.apply(docs=docs, lfs=[[gold]], table=GoldLabel, train=True, parallelism=PARALLEL)

### 3.2) Creating Labeling Functions

An LF can return one of three values: `ABSTAIN`, `FALSE`, or `TRUE`.

A library of data model utilities that can be used to write labeling functions is outlined on [Read the
Docs](http://fonduer.readthedocs.io/en/stable/user/data_model_utils.html). 

### 3.3) Applying the Labeling Functions

Next, we need to actually run the LFs over all of our training candidates, producing a set of `Labels` and `LabelKeys` (just the names of the LFs) in the database. Note that this will delete any existing `Labels` and `LabelKeys` for this candidate set.

View the API provided by the `Labeler` on [ReadTheDocs](https://fonduer.readthedocs.io/en/stable/user/supervision.html#fonduer.supervision.Labeler).

We can also view statistics about the resulting label matrix.
* **Coverage** is the fraction of candidates for which the labeling function emits a non-abstain label.
* **Overlap** is the fraction of candidates for which the labeling function and at least one other labeling function both emit a non-abstain label.
* **Conflict** is the fraction of candidates for which the labeling function emits a non-abstain label and another labeling function emits a different non-abstain label.

In addition, because we have already loaded the gold labels, we can view the empirical accuracy of these labeling functions against our gold labels using the `LFAnalysis` class of [Snorkel](https://github.com/snorkel-team/snorkel).
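
The three statistics can be reproduced by hand on a small label matrix. The sketch below is a plain-Python illustration, not Snorkel's implementation; it treats `-1` as abstain, as this notebook does, and the matrix `L_toy` is made up.

```python
ABSTAIN = -1


def lf_stats(L):
    """Per-LF coverage, overlap and conflict for a candidates x LFs label matrix."""
    n_rows, n_lfs = len(L), len(L[0])
    stats = []
    for j in range(n_lfs):
        covered = overlapped = conflicted = 0
        for row in L:
            if row[j] == ABSTAIN:
                continue
            covered += 1
            # Labels emitted by the other LFs for the same candidate
            others = [v for k, v in enumerate(row) if k != j and v != ABSTAIN]
            if others:
                overlapped += 1
            if any(v != row[j] for v in others):
                conflicted += 1
        stats.append({
            "coverage": covered / n_rows,
            "overlap": overlapped / n_rows,
            "conflict": conflicted / n_rows,
        })
    return stats


# Rows are candidates, columns are three hypothetical LFs
L_toy = [
    [1, -1, 1],
    [0, 1, -1],
    [-1, -1, -1],
    [1, 1, 0],
]
for j, s in enumerate(lf_stats(L_toy)):
    print(j, s)
```

For this matrix the first LF has coverage 0.75 and conflict 0.5, while the third has coverage 0.5 and conflict 0.25.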

### 3.4) Build Generative Model

Now, we'll train a model of the LFs to estimate their accuracies. Once the model is trained, we can combine the outputs of the LFs into a single, noise-aware training label set for our extractor. Intuitively, we'll model the LFs by observing how they overlap and conflict with each other. To do so, we use [Snorkel](https://github.com/snorkel-team/snorkel)'s single-task label model.

We then plot a histogram of the marginal probabilities of the `TRUE` class for the training candidates.
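
For intuition about what "combining the outputs of the LFs" produces, the sketch below computes unweighted vote fractions per candidate. This is a deliberately simpler baseline than the label model used in this notebook, which additionally learns a weight for each LF; the function name and toy matrix are illustrative.

```python
ABSTAIN = -1


def vote_fraction_proba(row, cardinality=2):
    """Turn one row of LF outputs into class probabilities by counting votes."""
    votes = [v for v in row if v != ABSTAIN]
    if not votes:
        # No LF fired: fall back to a uniform distribution
        return [1.0 / cardinality] * cardinality
    counts = [votes.count(label) for label in range(cardinality)]
    return [c / len(votes) for c in counts]


L_toy = [
    [1, -1, 1],    # two TRUE votes
    [0, 1, -1],    # one FALSE, one TRUE
    [-1, -1, -1],  # all LFs abstain
    [1, 1, 0],     # two TRUE, one FALSE
]
toy_marginals = [vote_fraction_proba(row) for row in L_toy]
for row, probs in zip(L_toy, toy_marginals):
    print(row, probs)
```

Rows where the LFs tie or all abstain end up with uniform probabilities, so they carry no training signal for the downstream model.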

In [None]:
from fonduer.utils.data_model_utils import *
from snorkel.labeling import labeling_function

from snorkel.labeling import LFAnalysis
from snorkel.labeling.model import LabelModel

import matplotlib.pyplot as plt
import re

ABSTAIN = -1
FALSE = 0
TRUE = 1

def dict_without_station(station):
    return [v for v in stations_list if not v == station]

# 2.) Create labeling functions 
@labeling_function()
def LF_price_head(c):
    return TRUE if 'price' in get_head_ngrams(c.price) else ABSTAIN

# @labeling_function()
# def LF_on_peak_head(c):
#     return TRUE if 'on peak' in get_head_ngrams(c.price) else ABSTAIN

@labeling_function()
def LF_peak_head(c):
    return TRUE if 'peak' in get_head_ngrams(c.price) else ABSTAIN

# @labeling_function()
# def LF_off_peak_head(c):
#     return FALSE if 'off peak' in get_head_ngrams(c.price) else ABSTAIN

# @labeling_function()
# def LF_firm_head(c):
#     return TRUE if 'firm' in get_head_ngrams(c.price) else ABSTAIN

@labeling_function()
def LF_dollar_to_left(c):
    return TRUE if '$' in get_left_ngrams(c.price, window=2) else ABSTAIN

# @labeling_function()
# def LF_same_table(c):
#     return TRUE if same_table(c) else ABSTAIN

# @labeling_function()
# def LF_other_station_table(c):
#     return FALSE if overlap(
#         dict_without_station(c.station), 
#         list(get_aligned_ngrams(c.price))
#     ) else ABSTAIN

station_price_lfs = [
     LF_price_head,
#     LF_on_peak_head,
     LF_peak_head,
#     LF_off_peak_head,
#     LF_firm_head,
     LF_dollar_to_left,  
#     LF_other_station_table,
]

# 3.) Apply the LFs on the training set
labeler = Labeler(session, [StationPrice])
%time labeler.apply(split=0, lfs=[station_price_lfs], train=True, clear=True, parallelism=PARALLEL)
%time L_train = labeler.get_label_matrices(train_cands)
print(f"Label matrix shape (candidates x LFs): {L_train[0].shape}")

# 4.) Evaluate their accuracy
L_gold_train = labeler.get_gold_labels(train_cands, annotator='gold')
# Sort LFs for LFAnalysis because LFAnalysis does not sort LFs,
# while columns of L_train are sorted alphabetically already.
sorted_lfs = sorted(station_price_lfs, key=lambda lf: lf.name)
LFAnalysis(L=L_train[0], lfs=sorted_lfs).lf_summary(Y=L_gold_train[0].reshape(-1))

# 5.) Build generative model
gen_model = LabelModel(cardinality=2)
%time gen_model.fit(L_train[0], n_epochs=500, log_freq=100)

train_marginals = gen_model.predict_proba(L_train[0])
plt.hist(train_marginals[:, TRUE], bins=20)
plt.show()

# Apply on dev-set
labeler.apply(split=1, lfs=[station_price_lfs], clear=True, parallelism=PARALLEL)
%time L_dev = labeler.get_label_matrices(dev_cands)

L_gold_dev = labeler.get_gold_labels(dev_cands, annotator='gold')
LFAnalysis(L=L_dev[0], lfs=sorted_lfs).lf_summary(Y=L_gold_dev[0].reshape(-1))

In [None]:
LFAnalysis(L=L_dev[0], lfs=sorted_lfs).lf_summary(Y=L_gold_dev[0].reshape(-1))

## Training the Discriminative Model 

Fonduer uses the machine learning framework [Emmental](https://github.com/SenWu/emmental) to support all model training.

In [None]:
import emmental

# Setup training config
config = {
    "meta_config": {"verbose": True},
    "model_config": {"model_path": None, "device": 0, "dataparallel": False},
    "learner_config": {
        "n_epochs": 50,
        "optimizer_config": {"lr": 0.001, "l2": 0.0},
        "task_scheduler": "round_robin",
    },
    "logging_config": {
        "evaluation_freq": 1,
        "counter_unit": "epoch",
        "checkpointing": False,
        "checkpointer_config": {
            "checkpoint_metric": {f"{ATTRIBUTE}/{ATTRIBUTE}/train/loss": "min"},
            "checkpoint_freq": 1,
            "checkpoint_runway": 2,
            "clear_intermediate_checkpoints": True,
            "clear_all_checkpoints": True,
        },
    },
}

emmental.init(Meta.log_path)
emmental.Meta.update_config(config=config)

In [None]:
# Collect word counter from training data
from fonduer.learning.utils import collect_word_counter

word_counter = collect_word_counter(train_cands)

In [None]:
# Generate the word embedding module for the LSTM model
# (for logistic regression we still generate it, since FonduerDataset requires the word2id dict)
from emmental.modules.embedding_module import EmbeddingModule

arity = 2

# Generate special tokens
specials = []
for i in range(arity):
    specials += [f"~~[[{i}", f"{i}]]~~"]

emb_layer = EmbeddingModule(
    word_counter=word_counter, word_dim=300, specials=specials
)

In [None]:
# Generate dataloader for training set
from emmental.data import EmmentalDataLoader
from fonduer.learning.dataset import FonduerDataset
import numpy as np

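# Filter out uninformative samples: candidates whose marginals are (near-)uniform,
# i.e. where the label model could not decide either way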
diffs = train_marginals.max(axis=1) - train_marginals.min(axis=1)
train_idxs = np.where(diffs > 1e-6)[0]

train_dataloader = EmmentalDataLoader(
    task_to_label_dict={ATTRIBUTE: "labels"},
    dataset=FonduerDataset(
        ATTRIBUTE,
        train_cands[0],
        F_train[0],
        emb_layer.word2id,
        train_marginals,
        train_idxs,
    ),
    split="train",
    batch_size=100,
    shuffle=True,
)


In [None]:
from emmental.model import EmmentalModel
from fonduer.learning.task import create_task
from emmental.learner import EmmentalLearner

tasks = create_task(
    ATTRIBUTE, 2, F_train[0].shape[1], 2, emb_layer, model="LogisticRegression"
)

model = EmmentalModel(name=f"{ATTRIBUTE}_task")

for task in tasks:
    model.add_task(task)

emmental_learner = EmmentalLearner()
emmental_learner.learn(model, [train_dataloader])

## Evaluating on the Test Set 

In [None]:
# Generate dataloader for test data
test_dataloader = EmmentalDataLoader(
    task_to_label_dict={ATTRIBUTE: "labels"},
    dataset=FonduerDataset(
        ATTRIBUTE, test_cands[0], F_test[0], emb_layer.word2id, 2
    ),
    split="test",
    batch_size=100,
    shuffle=False,
)

In [None]:
from electricity_utils import entity_level_f1 


test_preds = model.predict(test_dataloader, return_preds=True)
# Keep candidates whose predicted probability for the TRUE class exceeds 0.6
positive = np.where(np.array(test_preds["probs"][ATTRIBUTE])[:, TRUE] > 0.6)
true_pred = [test_cands[0][i] for i in positive[0]]
%time (TP, FP, FN) = entity_level_f1(true_pred, gold_file, ATTRIBUTE, test_docs)
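
The cell above unpacks true positives, false positives and false negatives but does not turn them into scores. A minimal sketch for doing so from plain counts follows; `precision_recall_f1` is a hypothetical helper and the counts are made up for illustration.

```python
def precision_recall_f1(tp, fp, fn):
    """Compute precision, recall and F1 from counts, guarding against empty sets."""
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1


# Toy counts, not results from this notebook
p, r, f1 = precision_recall_f1(tp=8, fp=2, fn=4)
print(f"precision={p:.3f} recall={r:.3f} f1={f1:.3f}")
```

If `TP`, `FP` and `FN` are collections, as the unpacking suggests, the call would be `precision_recall_f1(len(TP), len(FP), len(FN))`.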