# Parallel parsing of XML files with Dask and ElementTree

We have two XML files (but there might as well be a million), and we want to parse them in parallel with Dask. We use ElementTree to parse the XML.

1. Dask Bags: https://dask.readthedocs.io/en/latest/bag.html
2. ElementTree: https://docs.python.org/3/library/xml.etree.elementtree.html

This borrows heavily from the Dask example at https://examples.dask.org/bag.html
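To try the notebook without real data, the sketch below builds input in the layout the code expects: a `folder_*` directory holding a zip archive of XML files whose root has `country` children, each with a `name` attribute and a `rank` child (the shape of the `country_data.xml` example in the ElementTree documentation). The names `folder_demo`, `sample.zip`, `xml/a.xml`, `xml/b.xml`, the helper `write_sample_zip` and the country data are all made up for illustration.

```python
import os
import xml.etree.ElementTree as ET
import zipfile

# Hypothetical sample documents, shaped like the ElementTree docs example
SAMPLES = {
    "xml/a.xml": ('<data>'
                  '<country name="Liechtenstein"><rank>1</rank><year>2008</year></country>'
                  '<country name="Singapore"><rank>4</rank><year>2011</year></country>'
                  '</data>'),
    "xml/b.xml": ('<data>'
                  '<country name="Panama"><rank>68</rank><year>2011</year></country>'
                  '</data>'),
}


def write_sample_zip(folder="folder_demo", zip_name="sample.zip"):
    """Zip the sample documents into folder/zip_name (hypothetical helper)."""
    os.makedirs(folder, exist_ok=True)
    zip_path = os.path.join(folder, zip_name)
    with zipfile.ZipFile(zip_path, "w") as zf:
        for member, text in SAMPLES.items():
            zf.writestr(member, text)
    return zip_path


sample_zip = write_sample_zip()

# Sanity check: every member parses and has country children
with zipfile.ZipFile(sample_zip) as zf:
    names = {m: [c.get("name") for c in ET.fromstring(zf.read(m)).findall("country")]
             for m in zf.namelist()}
print(names)
# {'xml/a.xml': ['Liechtenstein', 'Singapore'], 'xml/b.xml': ['Panama']}
```

The XML files sit in an `xml/` subdirectory inside the archive, so once extracted to `./tmp` they match the `'/**/*.xml'` pattern used in the loop.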

```python
# Imports
import csv
import glob
import shutil
import xml.etree.ElementTree as ET
import zipfile

import dask.bag as db
from dask.distributed import Client
```

Setting up a local Dask cluster with four worker processes, each running a single thread:

```python
client = Client(n_workers=4, threads_per_worker=1)
client
```

The cell warned that port 8787 was already in use (perhaps by a cluster that was already running), so the diagnostics dashboard was hosted on a random port instead. The client summary:

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:34041 | Workers: 4 |
| Dashboard: http://127.0.0.1:45419/status | Cores: 4 |
| | Memory: 2.10 GB |


We need to treat each file as one object and process it separately. That rules out `db.read_text`, which turns every line of a file into its own element. Instead we define a function that parses a file and returns its root element, and call it on a list of XML files.
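The difference is easy to see on a small file: `db.read_text` produces one bag element per line, while `db.from_sequence` over a list of file names produces one element per file. The file below is a made-up example, and the synchronous scheduler is set only so the sketch runs without a cluster.

```python
import os
import tempfile

import dask
import dask.bag as db

# Run in-process for this small demo (drop this when a distributed Client is active)
dask.config.set(scheduler="synchronous")

# A hypothetical XML file written over three lines
path = os.path.join(tempfile.mkdtemp(), "one.xml")
with open(path, "w", encoding="utf-8") as f:
    f.write('<data>\n'
            '  <country name="Panama"><rank>68</rank></country>\n'
            '</data>\n')

# read_text: one element per line, so no element holds a complete document
n_lines = db.read_text(path).count().compute()
# from_sequence over file names: one element per file
n_files = db.from_sequence([path]).count().compute()
print(n_lines, n_files)  # 3 1
```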

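```python
def parse_xmlfile(fname):
    # Parse one file and return its root element
    root = ET.parse(fname).getroot()
    # Record the source file on the root and on each country element,
    # so every row extracted later knows where it came from
    root.attrib['file'] = fname
    for e in root.findall('country'):
        e.attrib['file'] = fname
    return root
```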
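A quick check of `parse_xmlfile` on one throwaway file. The file name and contents are hypothetical, and the function is copied from the cell above so the sketch runs on its own. The `file` attribute is only added to the in-memory tree; the file on disk is left unchanged.

```python
import os
import tempfile
import xml.etree.ElementTree as ET


def parse_xmlfile(fname):
    # Same logic as the notebook function: tag the root and each country with the file name
    root = ET.parse(fname).getroot()
    root.attrib['file'] = fname
    for e in root.findall('country'):
        e.attrib['file'] = fname
    return root


# Hypothetical single-file input
path = os.path.join(tempfile.mkdtemp(), "a.xml")
with open(path, "w", encoding="utf-8") as f:
    f.write('<data><country name="Singapore"><rank>4</rank></country></data>')

root = parse_xmlfile(path)
print(root.get('file') == path)                                  # True
print([c.get('file') == path for c in root.findall('country')])  # [True]
```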

Next we find the `country` elements inside each root; in the loop below this is the bag `elm`. It has as many elements as there are files (the same count as `b`), but each element is a list holding several `country` elements.
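`findall('country')` only matches direct children of the element it is called on. If the real files nest `country` deeper, ElementTree's limited XPath support finds it at any depth with `'.//country'`. The nested document below is made up to show the difference; `parse_xmlfile` tags countries with the same `findall('country')` call, so it would need the same change.

```python
import xml.etree.ElementTree as ET

# Hypothetical document where one country sits one level deeper
root = ET.fromstring(
    '<data>'
    '<country name="A"><rank>1</rank></country>'
    '<region><country name="B"><rank>2</rank></country></region>'
    '</data>'
)

direct = [c.get('name') for c in root.findall('country')]       # direct children only
anywhere = [c.get('name') for c in root.findall('.//country')]  # any depth
print(direct)    # ['A']
print(anywhere)  # ['A', 'B']
```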

Then we define a function that extracts the fields we want from the `country` elements and returns a list of JSON-like records (dicts). Since each element of `elm` is a list of `country` elements, the function takes a whole list and builds one dict per country, as shown below.

```python
def make_rows(cntrs):
    # One dict (future table row) per country element
    return [
        {'name': x.get('name'),
         'rank': x.find('rank').text,
         'file': x.get('file')}
        for x in cntrs
    ]
```
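Applied to a hand-made root, `make_rows` returns one dict per country. The data is hypothetical, with the `file` attributes written in directly to mimic what `parse_xmlfile` adds, and the function is copied from above so the sketch is self-contained.

```python
import xml.etree.ElementTree as ET


def make_rows(cntrs):
    # One dict (future table row) per country element
    return [
        {'name': x.get('name'),
         'rank': x.find('rank').text,
         'file': x.get('file')}
        for x in cntrs
    ]


root = ET.fromstring(
    '<data>'
    '<country name="Liechtenstein" file="a.xml"><rank>1</rank></country>'
    '<country name="Singapore" file="a.xml"><rank>4</rank></country>'
    '</data>'
)
rows = make_rows(root.findall('country'))
print(rows)
# [{'name': 'Liechtenstein', 'rank': '1', 'file': 'a.xml'},
#  {'name': 'Singapore', 'rank': '4', 'file': 'a.xml'}]
```

Note that `.text` is always a string, so `rank` comes out as `'1'`, not `1`, and `csv.QUOTE_NONNUMERIC` in the loop will quote it; wrap it in `int(...)` if a numeric column is wanted. Also, `x.find('rank').text` raises `AttributeError` when a country has no `rank` child, whereas `x.findtext('rank')` returns `None` in that case.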

Finally we call `make_rows` on the elements, flatten the result to merge the per-file lists (one per file) into a single sequence of rows, convert it to a Dask DataFrame and write it to CSV. Writing is what actually computes the result, since Dask is lazy.

## Loop

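```python
# Handle one zip archive at a time: extract it, parse its XML files in
# parallel, write the rows to CSV, then delete the extracted files.
directory_to_extract_to = './tmp'

for file in glob.glob('folder_*/*.zip'):
    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

    # recursive=True lets '**' match any depth, including the top level;
    # without it, '**' behaves like '*' and matches exactly one subdirectory
    files = glob.glob(directory_to_extract_to + '/**/*.xml', recursive=True)

    b = db.from_sequence(files).map(parse_xmlfile)     # one root per file
    elm = b.map(lambda root: root.findall('country'))  # one list of countries per file
    ddf = elm.map(make_rows).flatten().to_dataframe()  # one row per country

    # to_csv triggers the computation; a path without '*' is treated as a
    # directory, with one .part file per partition written inside it
    ddf.to_csv('extracts/' + "_".join(file.split('.')[0:-1]), index=False,
               sep='|', quoting=csv.QUOTE_NONNUMERIC)

    shutil.rmtree(directory_to_extract_to)
```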
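A different approach, not the one used in this notebook: read the XML members directly from the archives with `zipfile.ZipFile.open`, so nothing is extracted to `./tmp` and all archives go into a single Dask graph. The sketch assumes XML members end in `.xml` and that the workers can read the zip paths (true on a local cluster). The helpers `xml_members`, `parse_member` and `zips_to_rows` are hypothetical, and the `file` column here holds `archive:member` rather than an extracted path.

```python
import glob
import os
import tempfile
import xml.etree.ElementTree as ET
import zipfile

import dask
import dask.bag as db


def xml_members(zip_path):
    """Return (zip_path, member) pairs for the .xml members of one archive."""
    with zipfile.ZipFile(zip_path) as zf:
        return [(zip_path, name) for name in zf.namelist() if name.endswith('.xml')]


def parse_member(pair):
    """Parse one XML member straight from its archive and return row dicts."""
    zip_path, name = pair
    with zipfile.ZipFile(zip_path) as zf, zf.open(name) as fh:
        root = ET.parse(fh).getroot()
    source = zip_path + ':' + name  # hypothetical label for the 'file' column
    # findtext returns None for a missing rank instead of raising
    return [{'name': c.get('name'), 'rank': c.findtext('rank'), 'file': source}
            for c in root.findall('country')]


def zips_to_rows(pattern):
    """Lazy bag with one row dict per country in every archive matching pattern."""
    zips = sorted(glob.glob(pattern))
    return (db.from_sequence(zips)
            .map(xml_members).flatten()     # one element per XML member
            .map(parse_member).flatten())   # one element per country


# Demo with a throwaway archive; run in-process (drop this with a distributed Client)
dask.config.set(scheduler="synchronous")
demo_dir = tempfile.mkdtemp()
demo_zip = os.path.join(demo_dir, "folder_1.zip")
with zipfile.ZipFile(demo_zip, "w") as zf:
    zf.writestr("x/a.xml", '<data><country name="A"><rank>1</rank></country>'
                           '<country name="B"><rank>2</rank></country></data>')
    zf.writestr("b.xml", '<data><country name="C"><rank>3</rank></country></data>')
    zf.writestr("notes.txt", "not XML, skipped")

rows = zips_to_rows(os.path.join(demo_dir, "*.zip")).compute()
print(rows)
```

With the notebook's client running (and the `dask.config.set` line removed), something like `zips_to_rows('folder_*/*.zip').to_dataframe().to_csv('extracts/rows-*.csv', index=False, sep='|', quoting=csv.QUOTE_NONNUMERIC)` would write all rows in one pass. The trade-off is that it produces one set of output files for all archives instead of one per archive.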