In [59]:
import sys
import logging
from pathlib import Path
from contextlib import contextmanager

from collections import defaultdict, deque, namedtuple
# from functools import partial

import multiprocessing

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

import ipywidgets as widgets
from ipywidgets import Layout, Box
from IPython.display import display

import humanfriendly
import pandas as pd

from libratom.utils.pff import PffArchive

### Set up spaCy

In [42]:
import spacy
nlp = spacy.load("en_core_web_sm")

### Log settings

In [43]:
# https://ipywidgets.readthedocs.io/en/stable/examples/Output%20Widget.html#Integrating-output-widgets-with-the-logging-module
class OutputWidgetHandler(logging.Handler):
    """Logging handler that collects formatted records in an output widget."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Widget the records are written to; shown on demand via show_logs()
        self.out = widgets.Output(
            layout={'display': 'flex', 'border': '1px solid lightgray'}
        )

    def emit(self, record):
        """Format `record` and place it in front of the existing outputs."""
        entry = dict(
            name='stdout',
            output_type='stream',
            text=f'{self.format(record)}\n',
        )
        # Prepending keeps the newest record first
        self.out.outputs = (entry, *self.out.outputs)

    def show_logs(self):
        """Display the widget holding the logs."""
        display(self.out)

    def clear_logs(self):
        """Remove everything currently held by the widget."""
        self.out.clear_output()

# Widget-backed handler using the stock logging line format
handler = OutputWidgetHandler()
handler.setFormatter(logging.Formatter(fmt=logging.BASIC_FORMAT))

# Notebook logger: INFO and above, delivered to the handler above
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)

### Location of PST files

In [44]:
# Edit as appropriate: directory that is searched for PST files.
# Alternative locations are kept below, commented out.
# CACHED_ENRON_DATA_DIR = Path("/tmp/libratom/test_data/RevisedEDRMv1_Complete")
CACHED_ENRON_DATA_DIR = Path("/tmp/libratom/test_data") / "RevisedEDRMv1_Complete" / "andy_zipper"
# CACHED_ENRON_DATA_DIR = Path("/tmp/libratom/test_data/RevisedEDRMv1_Complete/jason_wolfe")

### Session variables

In [45]:
# Collect every .pst file found under CACHED_ENRON_DATA_DIR (recursive glob).
# Materialized as a list so the number of files is known up front.
files = list(CACHED_ENRON_DATA_DIR.glob('**/*.pst'))

# Overall report: named counters, each starting at 0 on first access
report = defaultdict(int)

# Entities to display; the deque never holds more than 10 items
ents_sample = deque(maxlen=10)

### Database setup

In [70]:
# Engine for the SQLite database file ner.db
engine = create_engine('sqlite:///ner.db')
# Session factory bound to that engine; call Session() to obtain a session
Session = sessionmaker(bind=engine)

# connection = engine.connect()


In [71]:
# Declarative base class that the ORM models below derive from
Base = declarative_base()

class Entity(Base):
    """ORM model for one extracted entity, stored in the 'entities' table."""
    __tablename__ = 'entities'
    
    id = Column(Integer, primary_key=True)  # primary key
    text = Column(String)  # entity text (ent.text)
    label_ = Column(String)  # entity label (ent.label_)
    filename = Column(String)  # name of the PST file the message came from

# Create the tables declared on Base in the database behind `engine`
Base.metadata.create_all(engine)

In [72]:
Entity.__table__

Table('entities', MetaData(bind=None), Column('id', Integer(), table=<entities>, primary_key=True, nullable=False), Column('text', String(), table=<entities>), Column('label_', String(), table=<entities>), Column('filename', String(), table=<entities>), schema=None)

### Rendering

In [73]:
# Layouts

# Box for the report: half-width flex column with its children centered
report_box_layout = Layout(
    display='flex',
    width='50%',
    margin='0px 0px 4px 0px',
    flex_flow='column',
    border='1px solid lightblue',
    justify_content='center',
    align_items='center'
)

# Box for the entities sample: same width, margin and border as the report box
entities_box_layout = Layout(
    width='50%',
    margin='0px 0px 4px 0px',
    border='1px solid lightblue',
)

### Utility functions

In [74]:
def update_report(out, data):
    """Refreshes the report output widget

    Renders `data` as a single-row DataFrame (row label 'Total') inside `out`,
    with the 'Size' entry formatted as a human-readable size.

    Args:
        out: output widget to draw into. It must provide
            clear_output(wait=True) and work as a context manager.
        data: mapping of report field names to values. The 'Size' entry is
            passed to humanfriendly.format_size; it is treated as 0 when
            absent.
    """

    # wait=True delays the clearing until new output is available
    out.clear_output(wait=True)

    df_data = {key: [value] for key, value in data.items()}
    # Use .get() rather than indexing: data['Size'] would insert the key into
    # a defaultdict as a side effect, and raise KeyError on a plain dict.
    df_data['Size'] = [humanfriendly.format_size(data.get('Size', 0))]

    with out:
        display(pd.DataFrame(df_data, index=['Total']))


def update_entities(out, data):
    """Redraws the entities sample inside the given output widget.

    Args:
        out: output widget to draw into. It must provide
            clear_output(wait=True) and work as a context manager.
        data: iterable of objects exposing `text` and `label_` attributes.
    """
    header_lines = (
        'Sample of entities found',
        '------------------------',
    )

    out.clear_output(wait=True)

    with out:
        for line in header_lines:
            print(line)

        # One "<text>: <label>" line per entity, text stripped of whitespace
        for entity in data:
            print(f'{entity.text.strip()}: {entity.label_}')

In [75]:
def process_file(file):
    """Summarizes one PST file as a single line of text.

    Args:
        file: the PST file to open with PffArchive.

    Returns:
        A string made of the current process, the file and either the number
        of messages in the archive or, if anything went wrong, the error text.
    """
    try:
        with PffArchive(file) as archive:
            message_count = len(list(archive.messages()))
            outcome = f'{message_count} messages'
    except Exception as exc:
        # Errors are reported in the returned line instead of being raised
        outcome = f'Error: {exc}'

    return f'{multiprocessing.current_process()}: {file}, {outcome}'
        
# def process_message(message):
#     try:
#         # Extract entities from the message
#         doc = nlp(archive.format_message(message))
#         entities = doc.ents
# #         report['Entities'] += len(entities)

#         # Show up to 10 entities
# #         ents_sample.extendleft(entities)


#         print(multiprocessing.current_process())


#         # Refresh report widget every 10 messages
# #         if not report['Messages'] % 10:
# #             update_report(report_out, report)
# #             update_entities(ents_out, ents_sample)

#     except Exception as exc:
#         # Log error and move on to the next message
#         report['Errors'] += 1
#         logger.exception(exc)

##### Message generator

In [76]:
def get_messages(files):
    """Yields (file name, formatted message) pairs for every message in `files`.

    Args:
        files: iterable of PST file paths; each must expose a `name` attribute.

    Yields:
        Tuples of the file's name and the output of the archive's
        format_message() for one message.

    Failures are logged with logger.exception and skipped: a failing message
    moves on to the next message, a failing file to the next file.
    """
    for path in files:
        try:
            with PffArchive(path) as archive:
                for msg in archive.messages():
                    try:
                        item = (path.name, archive.format_message(msg))
                        yield item
                    except Exception as message_error:
                        # Skip this message only
                        logger.exception(message_error)
        except Exception as file_error:
            # Skip the rest of this file
            logger.exception(file_error)

In [77]:
# Rename Entity once we have namespacing
# EntityResult = namedtuple('EntityResult',['text','label_','filename'])

def process_message(filename: str, message: str):
    """Extracts the named entities of one message.

    Only basic types are returned, to avoid serialization issues when the
    result is passed between processes.

    Args:
        filename: name of the file the message belongs to.
        message: message text handed to the `nlp` pipeline.

    Returns:
        (entities, None) on success, where entities is a list of dicts with
        the keys 'text', 'label_' and 'filename'; (None, error text) if
        anything raised.
    """
    try:
        found = []
        for span in nlp(message).ents:
            found.append({'text': span.text, 'label_': span.label_, 'filename': filename})
    except Exception as err:
        return None, str(err)

    return found, None

In [78]:
@contextmanager
def open_db_session():
    """Context manager providing a database session for one unit of work.

    Yields a new Session(). The session is committed when the body finishes
    normally; on any exception (including one raised by the commit itself) it
    is rolled back and the exception re-raised. The session is closed in all
    cases.
    """
    db_session = Session()
    try:
        yield db_session
        # Reached only when the with-body completed without raising
        db_session.commit()
    except BaseException:
        db_session.rollback()
        raise
    finally:
        db_session.close()

### Initialize progress and report widgets

In [86]:
handler.clear_logs()

In [87]:
# Progress bar for number of files processed: starts at 0 and is full
# once its value reaches len(files)
progress = widgets.IntProgress(
    value=0,
    min=0,
    max=len(files),
    step=1,
    description='Completed:',
    bar_style='',
    orientation='horizontal'
)

# Container for the report (the widget passed as `out` to update_report)
report_out = widgets.Output()

# Container for the entities sample (the widget passed as `out` to update_entities)
ents_out = widgets.Output()

### Message based entities extraction

In [88]:
%%time

# Can't pickle lambdas
def job(args):
    """Pool worker entry point: unpacks one item and calls process_message.

    Args:
        args: a (filename, message) pair as produced by get_messages().

    Returns:
        Whatever process_message returns: (entities, None) on success or
        (None, error text) on failure.
    """
    return process_message(*args)

if __name__ == '__main__':
    # One transaction for the whole run: open_db_session() commits when the
    # block ends normally and rolls back if anything raises.
    with multiprocessing.Pool() as pool, open_db_session() as session:
        for entities, exc in pool.imap(job, get_messages(files), chunksize=100):
            # A failed message comes back as (None, error text). Test
            # `entities` rather than `exc`: the error text can be an empty
            # string, and iterating over None would raise TypeError, abort
            # the loop and roll back every entity added so far.
            if entities is None:
                logger.error(exc)
                continue

            for entity in entities:
                session.add(Entity(**entity))
                

# len(res)

CPU times: user 1min, sys: 1.81 s, total: 1min 2s
Wall time: 3min 25s


### File based entities extraction

In [None]:
# %%time

# if __name__ == '__main__':
#     with multiprocessing.Pool() as pool:
#         for res in pool.map(process_file, files):
#             print(res)


In [None]:
# %%time

# handler.clear_logs()

# # Start displaying results
# display(Box(children=[report_out, progress], layout=report_box_layout))
# display(Box(children=[ents_out], layout=entities_box_layout))

# # Iterate over files
# for pst_file in files:
#     # Update report
#     report['Files'] += 1    
#     report['Size'] += pst_file.stat().st_size
    
#     try:
#         # Iterate over messages
#         with PffArchive(pst_file) as archive:
#             for message in archive.messages():
#                 # Update report
#                 report['Messages'] += 1
                
#                 try:
#                     # Extract entities from the message
#                     doc = nlp(archive.format_message(message))
#                     entities = doc.ents
#                     report['Entities'] += len(entities)
                    
#                     # Show up to 10 entities
#                     ents_sample.extendleft(entities)

#                     # Refresh report widget every 10 messages
#                     if not report['Messages'] % 10:
#                         update_report(report_out, report)
#                         update_entities(ents_out, ents_sample)

#                 except Exception as exc:
#                     # Log error and move on to the next message
#                     report['Errors'] += 1
#                     logger.exception(exc)

#     except Exception as exc:
#         # Log error and move on to the next file
#         report['Errors'] += 1
#         logger.exception(exc)
    
#     # Update progress bar
#     progress.value += 1
    
#     # Refresh report widget
#     update_report(report_out, report)

In [4]:
# Print out errors, if any 
handler.show_logs()

Output(layout=Layout(border='1px solid lightgray', display='flex'))

In [82]:
session.query(Entity).count()

362475

In [85]:
# Print text and label of the first 100 stored entities.
# NOTE(review): `session` is the name bound by the `with` statement in the
# extraction cell, which open_db_session() has already closed by now; this
# cell therefore only runs after that one. Consider opening a fresh session.
for ent in session.query(Entity)[:100]:
    print(ent.text, ent.label_)

12 Jan 2001 DATE
05:18:00 TIME
12 Jan 2001 DATE
05:18:00 TIME
MIME-Version: 1.0
 LAW
Content-Type ORG
Carl Carter PERSON
Carl Carter PERSON
Andy Zipper PERSON
Andy Zipper PERSON
Louise PERSON
Louise Kitchen PERSON
Justin Rostant PERSON
Trade Counts ORG
Category GPE
January 11, 2001
 DATE
Body-Type ORG
EML ORG
PST ORG
NSF ORG
ZL Technologies, ORG
ZL Technologies, ORG
11 Jan 2001 DATE
05:11:00 TIME
11 Jan 2001 DATE
05:11:00 TIME
ESBKYGKRH02BR3TIKC211PTTGIYWCWVQB@zlsvr22 NORP
MIME-Version ORG
1.0 CARDINAL
Content-Type ORG
Justin Rostant PERSON
Andy Zipper PERSON
Andy Zipper PERSON
Louise PERSON
Louise Kitchen PERSON
Trade Counts ORG
Category GPE
January 10, 2001
 DATE
Body-Type ORG
EML ORG
PST ORG
NSF ORG
ZL Technologies, ORG
ZL Technologies, ORG
17 Jan 2001 DATE
05:02:00 TIME
17 Jan 2001 DATE
05:02:00 TIME
MIME-Version LAW
1.0 CARDINAL
Content-Type ORG
Justin Rostant PERSON
Andy Zipper PERSON
Andy Zipper PERSON
Louise PERSON
Louise Kitchen PERSON
Trade Counts ORG
Category GPE
January 16,