This notebook provides an environment for testing changes to the HDFS path parser we use to classify and track the contents of HDFS.

To see how things will change, we can compare what we get against the current production results: http://intranet.wa.bl.uk/ukwa-reports/reports/hdfs/

First we need to get an up-to-date copy of the full listing of files on HDFS. These are generated daily and placed on HDFS.

In [24]:
!ls -lh /mnt/gluster/ingest/task-state/hdfs/current/

total 2.0G
-rw-r--r--. 1 root root 525M Jul 11 01:12 current-hdfs-all-files-list.csv
-rw-r--r--. 1 root root  37M Jul 11 01:48 current-hdfs-all-files-list.csv.gz
-rw-r--r--. 1 root root 525M Jul 10 01:12 current-hdfs-all-files-list.csv.old
-rw-r--r--. 1 root root 928M Jul 11 01:44 current-hdfs-parsed-paths.csv


In [25]:
# This cell is tagged `parameters` (use View > Cell Toolbar > Tags to modify)
file_list_csv = '/mnt/gluster/ingest/task-state/hdfs/current/current-hdfs-all-files-list.csv'
parsed_files_csv = 'test/hdfs-parsed-paths.csv'

Now we can run the path parser on it:

In [27]:
import os
import sys
import re
import csv
import enum
import datetime
import logging

# Set up a logger to give some feedback:
logger = logging.getLogger()
    
# Expected headers for the raw HDFS file list CSV, see ListAllFilesOnHDFSToLocalFile
file_list_headers = ['permissions', 'number_of_replicas', 'userid', 'groupid', 'filesize', 'modified_at', 'filename']


class HdfsPathParser(object):
    """
    This class takes a HDFS file path and determines what, if any, crawl it belongs to, etc.
    """

    @staticmethod
    def field_names():
        """This returns the extended set of field names that this class derives from the basic listing."""
        return ['recognised', 'collection', 'stream','job', 'layout', 'kind', 'permissions', 'number_of_replicas', 'user_id', 'group_id', 'file_size', 'modified_at', 'timestamp', 'file_path', 'file_name', 'file_ext']

    def __init__(self, item):
        """
        Given a string containing the absolute HDFS file path, parse it to work our what kind of thing it is.

        Determines crawl job, launch, kind of file, etc.

        For WCT-era selective content, the job is the Target ID and the launch is the Instance ID.

        :param file_path:
        """

        # Perform basic processing:
        # ------------------------------------------------
        # To be captured later
        self.recognised = False
        self.collection = None
        self.stream = None
        self.layout = None
        self.job = None
        self.kind = 'unknown'
        # From the item listing:
        self.permissions = item['permissions']
        self.number_of_replicas = item['number_of_replicas']
        self.user_id = item['userid']
        self.group_id = item['groupid']
        self.file_size = item['filesize']
        self.modified_at = item['modified_at']
        self.file_path = item['filename']
        # Derived:
        self.file_name = os.path.basename(self.file_path)
        first_dot_at = self.file_name.find('.')
        if first_dot_at != -1:
            self.file_ext = self.file_name[first_dot_at:]
        else:
            self.file_ext = None
        self.timestamp_datetime = datetime.datetime.strptime(item['modified_at'], "%Y-%m-%dT%H:%M:%S")
        self.timestamp = self.timestamp_datetime.isoformat()
        self.launch_datetime = None

        # Look for different filename patterns:
        # ------------------------------------------------
        self.analyse_file_path()

        # Now Add data based on file kind and file name...
        # ------------------------------------------------

        # Distinguish 'bad' crawl files, e.g. warc.gz.open files that are down as warcs
        if self.kind == 'warcs':
            if not self.file_name.endswith(".warc.gz"):
                # The older selective crawls allowed CDX files alongside the WARCs:
                if self.collection == 'selective' and self.file_name.endswith(".warc.cdx"):
                    self.kind = 'cdx'
                else:
                    self.kind = 'warcs-invalid'
            else:
                # Attempt to parse file timestamp out of filename,
                # Store ISO formatted date in self.timestamp, datetime object in self.timestamp_datetime
                mwarc = re.search('^.*-([12][0-9]{16})-.*\.warc\.gz$', self.file_name)
                if mwarc:
                    self.timestamp_datetime = datetime.datetime.strptime(mwarc.group(1), "%Y%m%d%H%M%S%f")
                    self.timestamp = self.timestamp_datetime.isoformat()
                else:
                    if self.stream and self.launch_datetime:
                        # fall back on launch datetime:
                        self.timestamp_datetime = self.launch_datetime
                        self.timestamp = self.timestamp_datetime.isoformat()

        # Distinguish crawl logs from other logs...
        if self.kind == 'logs':
            if self.file_name.startswith("crawl.log"):
                self.kind = 'crawl-logs'

    def analyse_file_path(self):
        """
        This function analyses the file path to classify the item.
        """
        
        #
        # Selective era layout /data/<target-id>/<instance-id>/<kind>
        #
        if re.search('^/data/', self.file_path ):
            self.layout = 'wct'
            self.collection = 'selective'
            self.stream = CrawlStream.selective
            mby  = re.search('^/data/([0-9]+)/([0-9]+)/(DLX/|Logs/|WARCS/|)([^\/]+)$', self.file_path)
            if mby:
                self.recognised = True
                # In this case the job is the Target ID and the launch is the Instance ID:
                (self.job, self.launch, self.kind, self.file_name) = mby.groups()
                self.kind = self.kind.lower().strip('/')
                if self.kind == '':
                    self.kind = 'unknown'
                self.launch_datetime = None
                
        # 
        # First NPLD era file layout /heritrix/output/(warcs|viral|logs)/<job>...
        #
        elif re.search('^/heritrix/output/(warcs|viral|logs)/.*', self.file_path ):
            self.layout = 'npld-2013'
            self.collection = 'npld'
            # Original domain-crawl layout: kind/job (need to look for this first)
            mdc  = re.search('^/heritrix/output/(warcs|viral|logs)/(dc|crawl)[0-3]\-([0-9]{8}|[0-9]{14})/([^\/]+)$', self.file_path)
            # original frequent crawl layout: kind/job/launch-id
            mfc  = re.search('^/heritrix/output/(warcs|viral|logs)/([a-z\-0-9]+)[-/]([0-9]{12,14})/([^\/]+)$', self.file_path)
            if mdc:
                self.recognised = True
                self.stream = CrawlStream.domain
                (self.kind, self.job, self.launch, self.file_name) = mdc.groups()
                self.job = 'dc%s' % self.launch[0:4] # Overriding old job name.
                # Cope with variation in folder naming - all DC crawlers run as a single launch on the same day:
                if len(self.launch) > 8:
                    self.launch = self.launch[0:8]
                self.launch_datetime = datetime.datetime.strptime(self.launch, "%Y%m%d")
            elif mfc:
                self.recognised = True
                self.stream = CrawlStream.frequent
                (self.kind, self.job, self.launch, self.file_name) = mfc.groups()
                self.launch_datetime = datetime.datetime.strptime(self.launch, "%Y%m%d%H%M%S")

        # 
        # Second NPLD era file layout /heritrix/output/<job>/<launch>(warcs|viral|logs)/...
        #
        elif re.search('^/heritrix/output/(dc2.+|frequent.*)/.*', self.file_path ):
            self.layout = 'npld-2018'
            self.collection = 'npld'
            # 2019 frequent-crawl layout: job/launch-id/kind (same as DC now?
            mfc2 = re.search('^/heritrix/output/([a-z\-0-9]+)/([0-9]{12,14})[^/]*/(warcs|viral|logs)/([^\/]+)$', self.file_path)
            if mfc2:
                self.recognised = True
                (self.job, self.launch, self.kind, self.file_name) = mfc2.groups()
                # Recognise domain crawls:
                if self.job.startswith('dc2'):
                    self.stream = CrawlStream.domain
                else:
                    self.stream = CrawlStream.frequent
                # Parse a launch datetime
                self.launch_datetime = datetime.datetime.strptime(self.launch, "%Y%m%d%H%M%S")
                
        # 
        # Files stored but intended for deletion.
        #
        elif self.file_path.startswith('/_to_be_deleted/'):
            self.recognised = True
            self.kind = 'to-be-deleted'
            self.file_name = os.path.basename(self.file_path)
            
        #
        # If un-matched, default to classifying by top-level folder.
        #
        else:
            self.collection = self.file_path.split(os.path.sep)[1]
            self.file_name = os.path.basename(self.file_path)
        

    def to_dict(self):
        d = dict()
        for f in self.field_names():
            d[f] = str(getattr(self,f,""))
        return d


class CrawlStream(enum.Enum):
    """
    An enumeration of the different crawl streams.
    """

    selective = 1
    """'selective' is permissions-based collection. e.g. Pre-NPLD collections."""

    frequent = 2
    """ 'frequent' covers NPLD crawls of curated sites."""

    domain = 3
    """ 'domain' refers to NPLD domain crawls."""

    def __str__(self):
        return self.name

# Now parse the input and classify the entries:
lines = 0
with open(parsed_files_csv, 'w') as fout:
    # Set up output file:
    writer = csv.DictWriter(fout, fieldnames=HdfsPathParser.field_names())
    writer.writeheader()
    with open(file_list_csv, 'r') as fin:
        reader = csv.DictReader(fin, fieldnames=file_list_headers)
        for item in reader:
            # Skip the first line:
            if item['filesize'] == 'filesize':
                continue
            # Output the enriched version:
            p = HdfsPathParser(item)
            writer.writerow(p.to_dict())
            lines += 1
            if lines%500000 == 0:
                logger.warning("Processed %i lines..." % lines)

logger.warning("Done. Processed a total of %i lines." % lines)


Processed 500000 lines...
Processed 1000000 lines...
Processed 1500000 lines...
Processed 2000000 lines...
Processed 2500000 lines...
Processed 3000000 lines...
Processed 3500000 lines...
Done. Processed a total of 3764529 lines.


In [28]:
import pandas as pd
from data_formatters import humanbytes

# Enable inline plots
%matplotlib inline

# Do not truncate file path display:
pd.set_option('display.max_colwidth', -1)

# Show more rows
pd.options.display.max_rows = 4000

# Load the data:
df = pd.read_csv(parsed_files_csv)

# Interpret modified_at and timestamp as a datetimes:
df.timestamp = pd.to_datetime(df.timestamp)

# Ignore the to-be-deleted data:
df = df.loc[df['kind'] != 'to-be-deleted']

# Show us the data:
df.head()

Unnamed: 0,recognised,collection,stream,job,layout,kind,permissions,number_of_replicas,user_id,group_id,file_size,modified_at,timestamp,file_path,file_name,file_ext
0,False,0_original,,,,unknown,-rw-r--r--,3,hdfs,supergroup,206979,2016-05-24T10:55:00,2016-05-24 10:55:00,/0_original/fc/crawler03/heritrix/output/images/1365759028.jpg,1365759028.jpg,.jpg
1,False,0_original,,,,unknown,-rw-r--r--,3,hdfs,supergroup,259004,2016-05-24T10:55:00,2016-05-24 10:55:00,/0_original/fc/crawler03/heritrix/output/images/1365759053.jpg,1365759053.jpg,.jpg
2,False,0_original,,,,unknown,-rw-r--r--,3,hdfs,supergroup,137950,2016-05-24T10:56:00,2016-05-24 10:56:00,/0_original/fc/crawler03/heritrix/output/images/1365759065.jpg,1365759065.jpg,.jpg
3,False,0_original,,,,unknown,-rw-r--r--,3,hdfs,supergroup,157173,2016-05-24T10:56:00,2016-05-24 10:56:00,/0_original/fc/crawler03/heritrix/output/images/1365759068.jpg,1365759068.jpg,.jpg
4,False,0_original,,,,unknown,-rw-r--r--,3,hdfs,supergroup,121905,2016-05-24T10:56:00,2016-05-24 10:56:00,/0_original/fc/crawler03/heritrix/output/images/1365759074.jpg,1365759074.jpg,.jpg


In [29]:
# Find the largest ten files:
largest = df.nlargest(20, 'file_size')[['file_size','file_path']]
# Report the size as a human-readable number:
largest['file_size'] = largest['file_size'].apply(humanbytes)
# Show the result:
largest

Unnamed: 0,file_size,file_path
1734872,171.97 GB,/heritrix/output/logs/crawl2-20140610125818/scope.log.cp00016-20141104144708.gz
1624639,167.82 GB,/datasets/domain-crawl-logs.zip
1734786,163.90 GB,/heritrix/output/logs/crawl0-20140610125756/scope.log.cp00013-20141104100709.gz
1734826,157.97 GB,/heritrix/output/logs/crawl1-20140610125808/scope.log.cp00013-20141104144329.gz
1734881,118.22 GB,/heritrix/output/logs/crawl2-20140610125818/scope.log.cp00043-20140922110002.gz
1734832,112.95 GB,/heritrix/output/logs/crawl1-20140610125808/scope.log.cp00042-20140922105921.gz
1734886,107.76 GB,/heritrix/output/logs/crawl3-20130412145049/crawl.log.20130606081851
1734729,103.44 GB,/heritrix/output/logs/crawl0-20130412144423/crawl.log
1734827,102.28 GB,/heritrix/output/logs/crawl1-20140610125808/scope.log.cp00022-20141129182502.gz
1734792,102.02 GB,/heritrix/output/logs/crawl1-20130412144637/crawl.log


In [30]:
# Summarise file counts by file extension:
#df.groupby(df.file_ext).file_ext.count().reset_index(name='count')
df.groupby(df.file_ext).file_ext.count().nlargest(20).reset_index(name='count')

Unnamed: 0,file_ext,count
0,.warc.gz,806034
1,.arc.gz,509746
2,.arc.os.cdx.gz,465972
3,.bl.uk.warc.gz,196276
4,.bl.uk.warc.dlx,195737
5,.log,151926
6,.xml,148755
7,,137759
8,.bl.uk.warc.cdx,125331
9,.warc.dlx,96435


In [31]:
# Summarise by collection and year:
df.groupby([df.collection, df.timestamp.dt.year]).file_size.sum().apply(humanbytes).unstack().fillna("")

timestamp,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019
collection,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
0_original,,,,,,,22.36 TB,33.80 GB,,
1_data,,,,,1.14 GB,20.12 GB,162.35 GB,1.72 GB,15.90 GB,319.82 GB
2_backups,,,,,,,,11.21 GB,47.63 GB,16.85 GB
9_processing,,,,,,,165.58 GB,837.87 MB,16.64 GB,11.90 GB
blit,,,,,,,8.22 GB,106.80 GB,5.50 GB,399.98 MB
crawls,,,12.80 GB,,,,,,,
datasets,,,,,308.06 GB,,,200.04 GB,,
heritrix,,,,,3.68 TB,6.07 TB,555.70 GB,593.04 GB,,2.43 TB
ia,,31.27 TB,4.06 TB,,30.04 TB,,,23.16 MB,,
logs,,,,,10.33 GB,967.44 MB,216.11 MB,269.41 MB,468.74 MB,1.74 GB


In [32]:
# Overall summary of material:
bd = df.loc[df.stream != 'None'].groupby([df.collection, df.stream, df.timestamp.dt.year, df.kind]).file_size.sum().apply(humanbytes)
#bd['warcs'] = df.loc[df.kind == 'warcs'].groupby([df.stream, df.timestamp.dt.year]).file_size.transform('sum')
bd.reset_index()
bd.unstack(fill_value='')

Unnamed: 0_level_0,Unnamed: 1_level_0,kind,cdx,crawl-logs,dlx,logs,unknown,viral,warcs,warcs-invalid
collection,stream,timestamp,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
npld,domain,2013,,497.88 GB,,225.81 GB,,4.38 GB,33.90 TB,
npld,domain,2014,,,,1.40 TB,,4.88 GB,62.13 TB,
npld,domain,2015,,235.61 GB,,337.29 GB,,4.57 GB,75.49 TB,
npld,domain,2016,,1.02 TB,,889.37 GB,,8.01 GB,103.80 TB,
npld,domain,2017,,827.63 GB,,650.10 GB,,4.44 GB,76.97 TB,
npld,domain,2018,,1.42 TB,,,,2.08 GB,57.58 TB,
npld,domain,2019,,158.13 GB,,,,341.54 MB,11.10 TB,
npld,frequent,2013,,,,784.34 MB,,99.04 MB,5.25 TB,
npld,frequent,2014,,,,117.39 GB,,34.27 MB,9.69 TB,
npld,frequent,2015,,,,74.77 GB,,271.95 MB,14.37 TB,


Manual Classification Spot Check
-------------------------------------------------

In at attempt to ensure we've understood the data, we can take the first items of each class and take a look at them, to check the classification makes sense.

This Pandas code only looks at data that's been classified under a content stream. It groups the results by collection, stream, layout, year and kind, and then randomly samples a couple of items for inspection.

In [34]:
bd = df.loc[df.stream != 'None'].groupby([df.collection, df.stream, df.layout, df.timestamp.dt.year, df.kind]).apply(lambda x: x.sample(2, replace=True))
bd['file_path'].reset_index()

Unnamed: 0,collection,stream,layout,timestamp,kind,level_5,file_path
0,npld,domain,npld-2013,2013,crawl-logs,1734729,/heritrix/output/logs/crawl0-20130412144423/crawl.log
1,npld,domain,npld-2013,2013,crawl-logs,1734730,/heritrix/output/logs/crawl0-20130412144423/crawl.log.20130423110643
2,npld,domain,npld-2013,2013,logs,1734835,/heritrix/output/logs/crawl2-20130412144921/crawl2-20130412144921.zip
3,npld,domain,npld-2013,2013,logs,1734835,/heritrix/output/logs/crawl2-20130412144921/crawl2-20130412144921.zip
4,npld,domain,npld-2013,2013,viral,1915650,/heritrix/output/viral/crawl0-20130412144423/BL-20130412160848476-00000-23518~crawler02~8443.warc.gz
5,npld,domain,npld-2013,2013,viral,1915854,/heritrix/output/viral/crawl3-20130412145049/BL-20130610215522877-00000-8769~crawler02~8446.warc.gz
6,npld,domain,npld-2013,2013,warcs,1922594,/heritrix/output/warcs/crawl0-20130412144423/BL-20130413005409078-00070-23518~crawler02~8443.warc.gz
7,npld,domain,npld-2013,2013,warcs,1949788,/heritrix/output/warcs/crawl1-20130412144637/BL-20130416121014319-00950-23604~crawler02~8444.warc.gz
8,npld,domain,npld-2013,2014,logs,1734822,/heritrix/output/logs/crawl1-20140610125808/scope.log.cp00004-20140621021729.gz
9,npld,domain,npld-2013,2014,logs,1734877,/heritrix/output/logs/crawl2-20140610125818/scope.log.cp00030-20141208141541.gz


Timestamp Checks
----------------------------

Processing the data downstream, we came across this event.

   duplicate key value (filename,job_name,job_launch)=('crawl.log','dc2019','20190524') violates unique constraint "primary"
   
For more recent crawls, the fine-grained timestamp should not be that short a string.


In [41]:
df.loc[df['job'] == 'dc2019'].head()

Unnamed: 0,recognised,collection,stream,job,layout,kind,permissions,number_of_replicas,user_id,group_id,file_size,modified_at,timestamp,file_path,file_name,file_ext
1689117,True,npld,domain,dc2019,npld-2018,crawl-logs,-rw-r--r--,3,hdfs,supergroup,0,2019-06-19T11:35:00,2019-06-19 11:35:00,/heritrix/output/dc2019/20190524140702/logs/crawl.log,crawl.log,.log
1689118,True,npld,domain,dc2019,npld-2018,crawl-logs,-rw-r--r--,3,hdfs,supergroup,0,2019-06-20T16:44:00,2019-06-20 16:44:00,/heritrix/output/dc2019/20190524140702/logs/crawl.log.cp00001-20190524141211,crawl.log.cp00001-20190524141211,.log.cp00001-20190524141211
1689119,True,npld,domain,dc2019,npld-2018,crawl-logs,-rw-r--r--,3,hdfs,supergroup,860,2019-06-19T11:35:00,2019-06-19 11:35:00,/heritrix/output/dc2019/20190524141331/logs/crawl.log,crawl.log,.log
1689120,True,npld,domain,dc2019,npld-2018,crawl-logs,-rw-r--r--,3,hdfs,supergroup,0,2019-06-20T16:45:00,2019-06-20 16:45:00,/heritrix/output/dc2019/20190524141331/logs/crawl.log.cp00001-20190525141333,crawl.log.cp00001-20190525141333,.log.cp00001-20190525141333
1689121,True,npld,domain,dc2019,npld-2018,crawl-logs,-rw-r--r--,3,hdfs,supergroup,0,2019-06-20T16:45:00,2019-06-20 16:45:00,/heritrix/output/dc2019/20190524141331/logs/crawl.log.cp00002-20190526141333,crawl.log.cp00002-20190526141333,.log.cp00002-20190526141333
