# Training Pipeline Notebook
Defines and deploys an Apache Beam (Dataflow) pipeline that turns OpenAddresses .csv files into labelled NER training and testing data, written out as Parquet files.

In [1]:
import re
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from datetime import timedelta

import random
import google.auth
import pyarrow
import os

In [2]:
#import sys
#!{sys.executable} -m pip install pyspark
#nltk.download('averaged_perceptron_tagger')

### User-controlled variables
Change these variables to match the data to be processed and your GCP setup.

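Input data should always be inside a single subfolder under the input location, with one folder per country inside that subfolder.  Below a country folder, files may optionally be grouped into state or province subfolders.  Each file should represent a single municipality; a file that covers a whole province, state, or country should be named 'provincewide.csv', 'statewide.csv', or 'countrywide.csv' respectively.  The file name (with underscores read as spaces) is used as the city name unless it contains a digit or one of the words 'country', 'province', 'state', or 'wide'.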

INPUT_LOCATION  
&nbsp; &nbsp; +--subfolder  
&nbsp; &nbsp; &nbsp; &nbsp; +--country1  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; +--state1  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; | &nbsp; +--city1.csv  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; | &nbsp; +--city2.csv  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; +--state2  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; | &nbsp; +--file1.csv  
&nbsp; &nbsp; &nbsp; &nbsp; +--country2  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; +--province1  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; +--countrywide.csv  
&nbsp; &nbsp; &nbsp; &nbsp; +--country3  
&nbsp; &nbsp; &nbsp; &nbsp; | &nbsp; +--countrywide.csv  
&nbsp; &nbsp; &nbsp; &nbsp; +--...


In [3]:
# ---- Google Cloud Storage --------------------------------------------------
BUCKET = 'postal-parser-28'                       # GCS bucket ID
REGION = 'europe-west1'                           # Maplequad: europe-west1 or europe-west2
INPUT_LOCATION = "gs://%s/unprocessed-data/**" % BUCKET   # glob over every input object
OUTPUT_LOCATION = "gs://%s/parquet-sample" % BUCKET       # root for the Parquet outputs

# ---- Data randomization probabilities --------------------------------------
# NOTE(review): not passed to AddressFunc by the pipeline below, which relies
# on AddressFunc's own 0.2 values; keep the two in sync.
PROBABILITY_SHUFFLE = 0.2
PROBABILITY_DELETE = 0.2
PROBABILITY_DUPLICATE = 0.2

# ---- Column labels ----------------------------------------------------------
# Labels exactly as they appear in the CSV files (position i maps to HEADER[i]).
# NOTE(review): not referenced by the visible pipeline; InputFormatting/ToDict
# map CSV columns positionally onto HEADER.
FILE_HEADER = [
    'LON', 'LAT', 'NUMBER', 'STREET', 'UNIT', 'CITY',
    'DISTRICT', 'REGION', 'POSTCODE', 'ID', 'HASH',
]
# Labels applied to each column in the generated training data.
HEADER = [
    'lon', 'lat', 'house_number', 'road', 'unit', 'city',
    'state_district', 'state', 'postcode', 'id', 'hash',
]

# ---- Extra pipeline flags ---------------------------------------------------
PIPELINE_OPTION_FLAGS = ["--requirements_file=requirements.txt"]

### GCP Settings
Configures the GCP and Dataflow pipeline options. It is best to change values in the cell above rather than here.

In [4]:
# Output prefixes for the Parquet shards (under OUTPUT_LOCATION) and the
# Dataflow working directory (under BUCKET).
output_training_location = "%s/training/part" % OUTPUT_LOCATION
output_testing_location = "%s/testing/part" % OUTPUT_LOCATION
dataflow_gcs_location = 'gs://%s/dataflow' % BUCKET
# Start from the user-supplied flags defined in the config cell.
options = pipeline_options.PipelineOptions(PIPELINE_OPTION_FLAGS)

# Interactive Beam capture duration (60 s).
# NOTE(review): presumably only matters for interactive/unbounded runs — confirm it is needed.
ib.options.capture_duration = timedelta(seconds=60)
# google.auth.default() returns (credentials, project_id); the credentials go to
# the throwaway `_` and the ambient project id becomes the Dataflow project.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = REGION
# Stage a locally built Beam SDK tarball whose file name is derived from the
# installed Beam version.
# NOTE(review): the absolute path assumes the notebook image ships this build, and
# the literal '0' after the version presumably matches dev-build tarball names — confirm.
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)
# Dataflow staging and temp files live under dataflow_gcs_location.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

### Address Class
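A Beam `DoFn` that represents a single address and converts it from a structured record (field → value) into a free-form address string.  To augment the training data, the field order is randomly chosen, optionally shuffled, and has fields randomly duplicated and deleted for every address.  It also assigns a BIO NER tag (`B-<field>` / `I-<field>`) to each token.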

In [5]:
class AddressFunc(beam.DoFn):
    """Turns a structured address dict into a labelled free-form address.

    For every element a field order is drawn at random (one of three base
    orders, optionally fully shuffled, then with random duplications and
    deletions) to augment the training data.  Fields missing from the
    element are dropped from that order, and the element is emitted as
    ``{'text': str, 'labels': [str], 'tokens': [str]}`` with one BIO tag
    (``B-<field>`` / ``I-<field>``) per token.

    Args:
        probability_shuffle: chance that the chosen base order is shuffled.
        probability_delete: chance, re-checked after every deletion, of
            deleting one more field (at least one field is always kept).
        probability_duplicate: chance, re-checked after every duplication,
            of duplicating one more field (up to MAX_NUM_TAGS entries).

    The defaults (0.2) are the values this DoFn previously hard-coded; pass
    the notebook's PROBABILITY_* constants to tune them.
    """

    # Candidate field orders, in the order a human would write the address.
    # Copied per element because the working order list is mutated.
    _BASE_ORDERS = (
        ('house_number', 'road', 'city', 'city_district', 'state_district',
         'state', 'postcode', 'house', 'level', 'unit', 'po_box', 'country'),
        ('house', 'house_number', 'po_box', 'road', 'city', 'city_district',
         'state_district', 'state', 'postcode', 'level', 'unit', 'country'),
        ('house', 'level', 'unit', 'po_box', 'house_number', 'road', 'city',
         'city_district', 'state_district', 'state', 'postcode', 'country'),
    )

    # Upper bound on the length of the order when duplicating fields.
    MAX_NUM_TAGS = 20

    def __init__(self, probability_shuffle=0.2, probability_delete=0.2,
                 probability_duplicate=0.2):
        super().__init__()
        self.probability_shuffle = probability_shuffle
        self.probability_delete = probability_delete
        self.probability_duplicate = probability_duplicate

    def init(self):
        """Draws a fresh, randomly perturbed field order for the next element."""
        import random

        self._set_order()
        if random.random() < self.probability_shuffle:
            self._randomize_order()
        self._duplicate_tags(self.probability_duplicate)
        self._delete_tags(self.probability_delete)
        self.ordered = False

    def _set_order(self):
        """Picks one of the base orders uniformly at random, as a new list."""
        import random
        index = random.randint(0, len(self._BASE_ORDERS) - 1)
        self.order = list(self._BASE_ORDERS[index])
        self.ordered = False

    def _randomize_order(self):
        """Shuffles the current order in place."""
        import random
        random.shuffle(self.order)
        self.ordered = False

    def _delete_tags(self, _delete_probability):
        """Repeatedly deletes a random field while the coin flip succeeds,
        never leaving fewer than one field."""
        import random
        while random.random() < _delete_probability and len(self.order) > 1:
            del self.order[random.randint(0, len(self.order) - 1)]
        self.ordered = False

    def _duplicate_tags(self, _duplicate_probability):
        """Repeatedly copies a random field to a random position while the
        coin flip succeeds and the order is shorter than MAX_NUM_TAGS."""
        import random
        while random.random() < _duplicate_probability and len(self.order) < self.MAX_NUM_TAGS:
            item_to_be_duplicated = self.order[random.randint(0, len(self.order) - 1)]
            self.order.insert(random.randint(0, len(self.order)), item_to_be_duplicated)
        self.ordered = False

    def _remove_extra_labels(self):
        """Drops fields the current address does not have from the order.

        What remains is the sequence in which the address's values are
        written out (duplicated fields appear more than once).
        """
        self.order = [header for header in self.order if header in self.address_dict]
        self.ordered = True

    def _split_value(self, header):
        """Splits one field's value into non-empty words on spaces/underscores."""
        import re  # local import, like the other DoFns, so workers need no main session
        return [word for word in re.split('[ _]', self.address_dict[header]) if word]

    def _ner_tags(self):
        """Returns one BIO tag per token: B-<field> for a field's first word,
        I-<field> for the rest."""
        tags = []
        for header in self.order:
            word_count = len(self._split_value(header))
            tags.extend(('B-' if i == 0 else 'I-') + header for i in range(word_count))
        return tags

    def _tokenize(self):
        """Returns the words of every field, in order."""
        return [word for header in self.order for word in self._split_value(header)]

    def _to_string(self):
        """Returns the address text as the tokens joined by single spaces.

        Building the text from the tokens (rather than the raw values) keeps it
        aligned with ToNER, which computes character offsets assuming exactly
        one space between consecutive tokens; raw values with trailing or
        repeated spaces would otherwise shift every later offset.
        """
        return ' '.join(self._tokenize())

    def _label(self):
        """Builds the {'text', 'labels', 'tokens'} record for the current address."""
        if not self.ordered:
            self._remove_extra_labels()
        return {'text': self._to_string(), 'labels': self._ner_tags(), 'tokens': self._tokenize()}

    def process(self, element):
        """Emits one labelled address for ``element``.

        Args:
            element: dict mapping field name -> string value (as built by
                ToDict); only fields listed in the base orders are used.
        """
        self.address_dict = element
        self.init()
        return [self._label()]

### Apache Beam PTransform and DoFn classes
Custom transforms and `DoFn`s used by the pipeline defined below.

In [6]:
class FileReader(beam.PTransform):
    """Reads every '.csv' object matching FILE_LOCATION into one PCollection.

    FILE_LOCATION is a GCS glob (e.g. 'gs://bucket/dir/**').  Matching happens
    once, at pipeline-construction time.  Each matching '.csv' file gets its
    own ReadFromText (header line skipped) followed by AddFilenamesFn, and all
    per-file PCollections are flattened into a single PCollection of
    {'filename': [...], 'row': str} dicts.

    Args:
        FILE_LOCATION: GCS glob pattern to match.
        pipeline_options: PipelineOptions used to construct the GCS filesystem.
    """

    def __init__(self, FILE_LOCATION, pipeline_options):
        super().__init__()
        self._FILE_LOCATION = FILE_LOCATION
        self._options = pipeline_options

    def expand(self, pcoll):
        from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

        gcs = GCSFileSystem(self._options)
        # match() returns one MatchResult per pattern; gather all file metadata.
        matched = [metadata
                   for match_result in gcs.match([self._FILE_LOCATION])
                   for metadata in match_result.metadata_list]

        per_file = []
        for i, metadata in enumerate(matched):
            path = metadata.path
            if not path.endswith('.csv'):
                continue
            # Step labels use the index among *all* matched objects, as before,
            # so step names stay stable.
            per_file.append(
                pcoll.pipeline
                | 'Read file {}'.format(i) >> beam.io.ReadFromText(path, skip_header_lines=1)
                | 'Add filename {}'.format(i) >> beam.ParDo(AddFilenamesFn(), path, self._FILE_LOCATION))

        # Giving Flatten the pipeline lets it produce an empty PCollection
        # instead of failing when no '.csv' file matched.
        return per_file | 'Flatten PCollections' >> beam.Flatten(pipeline=pcoll.pipeline)

class AddFilenamesFn(beam.DoFn):
    """Tags a CSV line with the path components of the file it came from.

    Side arguments (passed through beam.ParDo):
        file_path: full path of the source file, e.g. 'gs://b/dir/x/y/city.csv'.
        base_path: the glob used to find the files, e.g. 'gs://b/dir/**'.

    Yields {'filename': [...], 'row': element}.  'filename' holds the path
    components starting at the last directory of base_path before the glob
    (assuming base_path ends in a single glob component such as '**'), with
    the trailing '.csv' removed and underscores replaced by spaces.
    """
    def process(self, element, file_path, base_path):
        # Strip only the extension; replace('.csv', '') would also mangle any
        # directory whose name happens to contain '.csv'.
        if file_path.endswith('.csv'):
            file_path = file_path[:-len('.csv')]
        start = len(base_path.split('/')) - 2
        file_path_list = file_path.replace('_', ' ').split('/')[start:]
        yield {'filename': file_path_list, 'row': element}

In [7]:
class InputFormatting(beam.PTransform):
    """Parses raw CSV lines and turns each one into a field dictionary.

    Args:
        header: field names applied positionally to the CSV columns (see ToDict).
    """
    def __init__(self, header):
        # PTransform defines its own __init__; call it so the transform is
        # fully initialised like any other Beam transform.
        super().__init__()
        self._header = header

    def expand(self, pcoll):
        return (
            pcoll
            | 'Parse CSV' >> beam.ParDo(ParseCSV())
            | 'Build Dictionary' >> beam.ParDo(ToDict(), self._header)
        )
        
class ParseCSV(beam.DoFn):
    """Parses an element's raw text line as a single CSV record.

    Input: {'filename': [...], 'row': str}.  Output: a one-element list with
    the same 'filename' and 'row' replaced by the parsed list of fields, or
    None (no output) if the csv reader yields no record.
    """
    def process(self, element):
        import csv
        reader = csv.reader([element['row']], quotechar='"', delimiter=',',
                            quoting=csv.QUOTE_ALL, skipinitialspace=True)
        fields = next(reader, None)
        if fields is None:
            return None
        return [{'filename': element['filename'], 'row': fields}]
        
class ToDict(beam.DoFn):
    """Builds a {field: value} dict from a parsed CSV row and its file path.

    Args (process):
        element: {'filename': [path components], 'row': [csv fields]}.
        header: field names applied positionally to the row.

    Path components between the first one (presumably the input subfolder)
    and the file itself become 'country' and then 'state' (at most two).
    The file name becomes 'city' unless it contains a digit or one of
    'country', 'province', 'state', 'wide'.  Non-empty CSV fields are then
    added under their header name, overriding path-derived values.
    """
    def process(self, element, header):
        import re
        FILE_NAME_FIELDS = ['country', 'state']
        path_parts = element['filename']
        record = {}
        for field_name, part in zip(FILE_NAME_FIELDS, path_parts[1:-1]):
            record[field_name] = part
        stem = path_parts[-1]
        if not re.match('.*([0-9]|country|province|state|wide).*', stem):
            record['city'] = stem

        #TODO: Have entity headers be based on the header in the file rather than predefined
        for column, value in zip(header, element['row']):
            if value != '':
                record[column] = value
        return [record]

In [8]:
class NERFormatting(beam.PTransform):
    """Formats labelled addresses as NER model input and splits them randomly.

    Applies ToNER, then a 2-way beam.Partition: partition 0 is the training
    split and partition 1 the testing split.

    Args:
        training_fraction: probability that an element lands in the training
            split (default 0.8, the previously hard-coded value).

    Raises:
        ValueError: if training_fraction is outside [0, 1].
    """
    def __init__(self, training_fraction=0.8):
        super().__init__()
        if not 0.0 <= training_fraction <= 1.0:
            raise ValueError(
                'training_fraction must be within [0, 1], got %r' % (training_fraction,))
        self._training_fraction = training_fraction

    def expand(self, pcoll):
        return (
            pcoll
            | 'To NER Inout' >> beam.ParDo(ToNER())
            | 'Partition' >> beam.Partition(self.by_random, 2)
        )

    def by_random(self, element, num_partitions):
        """Partition function: 0 (train) with probability training_fraction, else 1 (test)."""
        import random
        return 0 if random.random() < self._training_fraction else 1

class ToNER(beam.DoFn):
    """Builds the NER annotation record for one labelled address.

    Input: {'text': str, 'tokens': [str, ...], 'labels': [str, ...]} with one
    label per token, as produced by AddressFunc, e.g.
    {'tokens': ['5', 'Main', 'St.'], 'labels': ['B-house_number', 'B-road', 'I-road']}.

    Each token becomes a 'named_entity' annotation whose 'begin'/'end' are
    inclusive character offsets computed as if tokens were separated by
    exactly one space.  Output: [{'text': ..., 'label': [annotation, ...]}].
    """
    def process(self, element):
        tokens = element['tokens']
        labels = element['labels']
        annotations = []
        cursor = 0  # character offset where the next token starts
        for idx, word in enumerate(tokens):
            annotations.append({
                'annotatorType': "named_entity",
                'begin': cursor,
                'end': cursor + len(word) - 1,
                'result': labels[idx],
                'metadata': {'word': word},
                'embeddings': [0.00],
            })
            cursor += len(word) + 1
        return [{'text': element['text'], 'label': annotations}]

### Pipeline Declaration
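Defines the steps the pipeline executes: read the CSV files, parse each row into a field dictionary, build a labelled address, and convert it to NER input split into training and testing sets.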

In [9]:
# Build the graph on an InteractiveRunner pipeline so it can be inspected with
# Interactive Beam (`ib`) in this notebook; the same `p` is later submitted to Dataflow.
p = beam.Pipeline(InteractiveRunner())

# NERFormatting ends in a 2-way beam.Partition, so the result unpacks into the
# training split (partition 0) and the testing split (partition 1).
train, test = (
    p 
    | 'Read files' >> FileReader(INPUT_LOCATION, options)
    | 'Format Input' >> InputFormatting(HEADER)
    | 'Order Adress' >> beam.ParDo(AddressFunc())
    | 'To NER Input' >> NERFormatting()
    )

In [90]:
# Arrow schema shared by the training and testing Parquet outputs.  It mirrors
# the records produced by ToNER: {'text': str, 'label': [annotation, ...]}.
# Defined once so the two outputs cannot drift apart.
ner_parquet_schema = pyarrow.schema(
    [('text', pyarrow.string()),
     ('label', pyarrow.list_(
         pyarrow.struct([
             ('annotatorType', pyarrow.string()),
             ('begin', pyarrow.int32()),
             ('end', pyarrow.int32()),
             ('result', pyarrow.string()),
             ('metadata', pyarrow.struct([('word', pyarrow.string())])),
             ('embeddings', pyarrow.list_(pyarrow.float64()))
         ])))]
)


def write_ner_parquet(pcoll, label, file_path_prefix):
    """Attaches a Parquet sink for NER records to ``pcoll``.

    Args:
        pcoll: PCollection of {'text', 'label'} dicts (one data split).
        label: step name of the write transform in the pipeline graph.
        file_path_prefix: output prefix for the Parquet shards.

    Returns:
        The result of applying WriteToParquet to ``pcoll``.
    """
    return pcoll | label >> beam.io.parquetio.WriteToParquet(
        file_path_prefix=file_path_prefix,
        schema=ner_parquet_schema,
        file_name_suffix='.parquet')


# Outputs training split of data to training parquet
_ = write_ner_parquet(train, 'Write train parquet', output_training_location)

# Outputs testing split of data to testing parquet
_ = write_ner_parquet(test, 'Write test parquet', output_testing_location)

### Run pipeline on Dataflow

In [113]:
# Render the graph of pipeline `p` with Interactive Beam to check its shape before submitting it.
ib.show_graph(p)

In [115]:
# Submit the pipeline graph built above to Dataflow using the GCP options configured earlier.
pipeline_result = DataflowRunner().run_pipeline(p, options=options)

In [18]:
# Show a clickable link to the submitted job in the Dataflow console.
# `display` and `HTML` are imported from the public IPython.display module;
# importing `display` from IPython.core.display is deprecated.
from IPython.display import display, HTML
# NOTE(review): `_job` is a private attribute of the Dataflow result object and
# may change between Beam releases.
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' %
       (pipeline_result._job.location, pipeline_result._job.id, pipeline_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))

### Show PCollection for testing purposes
Runs locally with the InteractiveRunner; don't run this on large datasets.

In [10]:
#ib.show(train)