# WARC files metadata management using Apache Atlas

**Web Archive** ([WARC](http://bibnum.bnf.fr/WARC/)) is a file format used by **web crawlers** to store data harvested from the Internet. A WARC file is composed of a **sequence of records**, where each record consists of a text header followed by a content block that typically holds a **web resource** (see the image below).

This tutorial shows you how to use [Apache Atlas](http://atlas.apache.org/) for modelling: 

* WARC files and WARC records
* Web crawling process

![WARC Overview](img/warc-overview.png)
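
To make the record structure concrete, here is a minimal sketch of how the records of an *uncompressed* WARC file could be split into header fields and content blocks. It assumes each header field fits on a single line and that every record declares a `Content-Length`; the names `iter_warc_records` and `sample` are illustrative only, and the sample bytes are hand-written rather than produced by a crawler.

```python
def iter_warc_records(data: bytes):
    """Yield (headers, content) for each record of an uncompressed WARC byte string."""
    pos = 0
    while pos < len(data):
        # The header block ends at the first empty line (CRLF CRLF).
        header_end = data.index(b"\r\n\r\n", pos)
        lines = data[pos:header_end].decode("utf-8").split("\r\n")
        headers = {"version": lines[0]}          # e.g. "WARC/1.0"
        for line in lines[1:]:
            name, _, value = line.partition(":")
            headers[name.strip()] = value.strip()
        # Content-Length is the size of the content block in bytes.
        length = int(headers["Content-Length"])
        start = header_end + 4
        yield headers, data[start:start + length]
        # Each record is terminated by two CRLFs.
        pos = start + length + 4


# Hand-written sample with two records (illustrative, not real crawl output).
sample = (
    b"WARC/1.0\r\n"
    b"WARC-Type: resource\r\n"
    b"WARC-Target-URI: http://example.com/\r\n"
    b"Content-Length: 5\r\n"
    b"\r\n"
    b"hello"
    b"\r\n\r\n"
    b"WARC/1.0\r\n"
    b"WARC-Type: metadata\r\n"
    b"Content-Length: 3\r\n"
    b"\r\n"
    b"abc"
    b"\r\n\r\n"
)

records = list(iter_warc_records(sample))
for headers, content in records:
    print(headers["WARC-Type"], len(content))
```

A production parser would also need to handle compressed files and case-insensitive header names, which this sketch deliberately ignores.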


# WARC metadata model

Atlas models metadata using a [type system](http://atlas.apache.org/#/TypeSystem) (i.e., metadata is modelled by using or extending Atlas types).

The UML class diagram below models WARC files metadata by extending two of Atlas's core types: **fs_path** and **Referenceable**. 

The diagram is interpreted as follows:

* A **WARC_FILE** has a `name` (inherited from **Asset**), a `path` (inherited from **fs_path**), and is composed of a set of `records` collected at a specific date (`crawl_date`).


* A **WARC_RECORD** is located at a specific position in a WARC file (`source_file_offset`) and contains a web resource conforming to a type (`content_type_norm`). For textual resources, a WARC_RECORD also stores the resource language (`content_language`).


* **WARC_FILES** and **WARC_RECORDS** are types of ATLAS_ENTITIES. They are also uniquely identified by a `qualifiedName` (inherited from **Referenceable**).

> *Remarks*:  
> Atlas models relationships among entities using the **RELATIONSHIP** type. For instance, the relationship between **WARC_FILES** and **WARC_RECORDS** is modelled by an extra type extending **RELATIONSHIP** (e.g. **WARC_FILE_RECORDS**) that connects both types. To keep the diagram simple, this relationship is drawn as a UML composition. It appears explicitly in the model implementation below.


![Partial WARC model](img/warc-model-partial.svg)

## Model implementation

The following code uses the [Atlas TypesREST API](http://atlas.apache.org/api/v2/ui/index.html#/TypesREST) for **creating the types** implementing the WARC metadata model. Namely: 

* `warc_file` and `warc_record` entity types
* `warc_file_records` relationship type


In [None]:
types_def = {
    
    "entityDefs" : [
        {
            "name": "warc_file",
            "superTypes": ["fs_path"],
            "attributeDefs": [
                { "name": "crawl_date",  "typeName": "string",  "isOptional": True },
            ],    
        },
        {
            "name": "warc_record",
            "superTypes": ["Referenceable"],
            "attributeDefs": [
                { "name": "content_type_norm",  "typeName": "string",  "isOptional": True },        
                { "name": "content_language",   "typeName": "string",  "isOptional": True },   
                { "name": "source_file_offset", "typeName": "long",    "isOptional": True },  
            ],
        }
    ],
    
    "relationshipDefs": [
        {
            "name": "warc_file_records",    
            "endDef1": {
                "type": "warc_file",
                "isContainer": True,
                "cardinality": "SET",
                "name": "records",        
            },
            "endDef2": {
                "type": "warc_record",
                "name": "warc_file",  

            },
            "relationshipCategory" : "COMPOSITION"
        }
    ]
}

import requests
import json

REQUEST_AUTH   = ('admin', 'admin')
REQUEST_HEADER = { "Content-Type": "application/json", "Accept": "application/json" }

req = requests.post(
    url     = "http://atlas:21000/api/atlas/v2/types/typedefs", 
    auth    = REQUEST_AUTH, 
    headers = REQUEST_HEADER, 
    json    = types_def
)

# pretty print atlas response (avoids shell quoting issues with echo)
print(json.dumps(req.json(), indent=2))

## Inserting samples

With the types implementing the WARC model created, it is possible to insert **WARC_FILE** and **WARC_RECORD** samples (i.e., instances or entities). 

The following code inserts into Atlas: 

* 1x `warc_file` entity representing the metadata of a **fictional WARC file** uniquely identified by `warc_guid`.
* 2x `warc_record` entities representing HTML pages contained in the WARC file identified by `warc_guid`.

Note that this is done in two steps using the [Atlas EntityREST API](http://atlas.apache.org/api/v2/ui/index.html#/EntityREST): the `warc_file` entity is created first, then the records, which reference it through its GUID.
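
Since both records share the same structure, their payloads can be produced by a small helper. The sketch below is one possible way to do it; `make_warc_record` is a hypothetical name, and the GUID string is a placeholder standing in for the value Atlas returns when the `warc_file` entity is created.

```python
def make_warc_record(warc_name, warc_guid, offset, content_type="html", language="en"):
    """Build the JSON payload of a warc_record entity attached to a warc_file."""
    return {
        "typeName": "warc_record",
        "attributes": {
            # Same naming convention as in this tutorial: <file name>::<offset>
            "qualifiedName": "{}::{}".format(warc_name, offset),
            "source_file_offset": offset,
            "content_type_norm": content_type,
            "content_language": language,
            # Reference to the owning file (relationship end "warc_file")
            "warc_file": {"guid": warc_guid},
        },
    }


# "guid-of-sample-warc" is a placeholder, not a real Atlas GUID.
records = [make_warc_record("sample.warc", "guid-of-sample-warc", offset) for offset in (1, 2)]
print(records[0]["attributes"]["qualifiedName"])
```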


In [None]:
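def create_entity(entity):
    # Create the entity in Atlas and return the GUID assigned to it
    req = requests.post(
        url     = "http://atlas:21000/api/atlas/v2/entity", 
        auth    = REQUEST_AUTH, 
        headers = REQUEST_HEADER, 
        json    = { "entity": entity }
    )
    req.raise_for_status()   # fail early on HTTP errors
    res  = req.json()
    # guidAssignments maps the temporary GUID of the request to the GUID assigned by Atlas
    guid = next(iter(res["guidAssignments"].values()))
    return guid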


warc = {
    "typeName":   "warc_file",
    "attributes": {
        "name":          "sample.warc",
        "qualifiedName": "sample.warc",         
        "path":          "/some_path/sample.warc",
        "crawl_date":    "2021-05-27T20:58:55Z",
    }
}

warc_guid = create_entity(warc)
warc_guid

In [None]:
record1 = {
    "typeName":   "warc_record",
    "attributes": {
        "content_type_norm": "html",
        "content_language":  "en",
        "qualifiedName":     "sample.warc::1",
        "source_file_offset": 1,        
        "warc_file": {
            "guid": warc_guid
        }
    },
}

record2 = {
    "typeName":   "warc_record",
    "attributes": {
        "content_type_norm": "html",
        "content_language":  "en",
        "qualifiedName":     "sample.warc::2",
        "source_file_offset": 2,        
        "warc_file": {
            "guid": warc_guid
        }        
    },
}

record1_guid = create_entity(record1)
record2_guid = create_entity(record2)

print(record1_guid)
print(record2_guid)

### TODO

* Open the [Atlas WebUI](http://localhost:21000) and explore the created entities.
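
Besides the WebUI, the created entities can also be looked up programmatically. The sketch below builds the URL of the EntityREST lookup by unique attribute, assuming the same Atlas host as the rest of this tutorial; `unique_attribute_url` and `ATLAS_URL` are illustrative names, and the actual HTTP call is left as a comment because it needs a running Atlas instance.

```python
from urllib.parse import quote, urlencode

ATLAS_URL = "http://atlas:21000/api/atlas/v2"   # same host as in this tutorial


def unique_attribute_url(type_name, qualified_name):
    """URL of the EntityREST lookup of one entity by its qualifiedName."""
    # Keep ':' readable in the query string (attr:qualifiedName=...)
    query = urlencode({"attr:qualifiedName": qualified_name}, safe=":")
    return "{}/entity/uniqueAttribute/type/{}?{}".format(ATLAS_URL, quote(type_name), query)


url = unique_attribute_url("warc_record", "sample.warc::1")
print(url)

# With a running Atlas instance, the entity could then be fetched with:
# requests.get(url, auth=REQUEST_AUTH, headers=REQUEST_HEADER).json()
```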

# Extracting metadata from WARC files

## Creating a WARC file

The following code uses `wget` to crawl http://example.com and produce a WARC file. 

Note that:
* The resources collected by `wget` (i.e., [index.html](http://example.com/index.html)) will be stored in the `example.com` directory.
* The resulting `example.com.warc` is left uncompressed so that its content is easy to explore. 


In [None]:
!wget "http://example.com"  \
    --directory-prefix="example.com"  \
    --warc-file="example.com"         \
    --no-warc-compression             \
    --no-warc-keep-log

### TODO

* How many records compose the [example.com.warc](example.com.warc) file?
* How does the content of [index.html](example.com/index.html) relate to the content of [example.com.warc](example.com.warc)?


## Metadata extraction

[WARC Indexer](https://github.com/ukwa/webarchive-discovery) is a Java application that extracts metadata from WARC files (e.g., `crawl_date`, `content_type_norm`, `content_language`, `source_file_offset`).

The following instruction extracts metadata from `example.com.warc` using WARC Indexer. 

Note that the metadata will: 

* Be stored as XML files in the `out/example.com.warc/` directory.
* Include the textual content (`text`) of WARC records.

In [None]:
!java -jar /jars/warc-indexer.jar --output out --disable_commit  --text  example.com.warc 

### TODO

* Explore the [out/example.com.warc/FILE_1.xml](out/example.com.warc/FILE_1.xml) file produced by WARC Indexer. 
* How many new metadata attributes do you see?


## WARC model update

WARC Indexer can produce **up to 100 different metadata attributes**. The following code updates the **WARC_RECORD** type for storing all these attributes (see [warc_record_type_def.json](warc_record_type_def.json)).
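
The content of `warc_record_type_def.json` is not reproduced in this notebook. As a rough idea of its shape, the sketch below generates an entity definition from lists of attribute names, typing multi-valued attributes as `array<string>`. The helper names and the short attribute lists are assumptions made for illustration; the real file may differ.

```python
def attribute_def(name, type_name="string"):
    """One optional Atlas attribute definition."""
    return {"name": name, "typeName": type_name, "isOptional": True}


def build_warc_record_def(single_valued, multi_valued):
    """Assemble a typedefs payload for warc_record (hypothetical layout)."""
    attribute_defs = [attribute_def("source_file_offset", "long")]
    attribute_defs += [attribute_def(name) for name in single_valued]
    attribute_defs += [attribute_def(name, "array<string>") for name in multi_valued]
    return {
        "entityDefs": [{
            "name": "warc_record",
            "superTypes": ["Referenceable"],
            "attributeDefs": attribute_defs,
        }]
    }


# Only a handful of attributes are listed here to keep the example short.
warc_record_def = build_warc_record_def(
    single_valued=["content_type_norm", "content_language"],
    multi_valued=["links", "keywords"],
)
print([a["name"] for a in warc_record_def["entityDefs"][0]["attributeDefs"]])
```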

In [None]:
with open('warc_record_type_def.json') as json_file:
    warc_record_def = json.load(json_file)

req = requests.put(
    url     = "http://atlas:21000/api/atlas/v2/types/typedefs", 
    auth    = REQUEST_AUTH, 
    headers = REQUEST_HEADER, 
    json    = warc_record_def
)

# pretty print atlas response (avoids shell quoting issues with echo)
print(json.dumps(req.json(), indent=2))

## Metadata preparation and loading

The following code transforms the metadata exported by WARC Indexer into JSON conforming to the WARC_FILE and WARC_RECORD types.
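
The parsing code in this section expects each XML file to contain a `doc` element with one `field` child per value, repeated for multi-valued attributes. The sketch below illustrates that transformation using only the standard library; the sample XML is hand-written from that assumed layout (it is not actual WARC Indexer output), and `fields_by_name` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

# Hand-written sample following the assumed <doc><field name="..."> layout.
SAMPLE_XML = """<doc>
  <field name="source_file_offset">0</field>
  <field name="content_type_norm">html</field>
  <field name="links">http://example.com/a</field>
  <field name="links">http://example.com/b</field>
</doc>"""


def fields_by_name(xml_text, multivalued=("links",)):
    """Group <field> values by their name attribute."""
    obj = {}
    for field in ET.fromstring(xml_text).iter("field"):
        name, value = field.get("name"), field.text or ""
        if name in multivalued:
            # Multi-valued attributes accumulate into a list
            obj.setdefault(name, []).append(value)
        else:
            # Single-valued attributes keep the last value seen
            obj[name] = value
    return obj


atts = fields_by_name(SAMPLE_XML)
print(atts)
```

All values are kept as strings, as in the notebook's own loader.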

In [None]:
import xmltodict
import sys
import glob

MULTIVALUED_ATTRIBUTES = [
    "_text_", "access_terms", "author", "collection", "collections", "comments", "keywords", "license_url", 
    "elements_used", "hashes", "crawl_years", "host_surt", "image_colours", "links_images", "links_domains", 
    "links_hosts", "links_hosts_surts", "links_public_suffixes", "links", "locations", "parse_error", 
    "pdf_pdfa_errors", "postcode_district", "postcode", "server", "generator", "text", "wct_collections", 
    "wct_description", "wct_instance_id", "wct_subjects"
]

def xml_to_dict(xml):
    obj = {}
    xmldict = xmltodict.parse( xml )
    for field in xmldict['doc']['field']:
        att = field['@name']
        val = field['#text']
        if att not in obj:
            obj[att] = [] if att in MULTIVALUED_ATTRIBUTES else None
        if obj[att] is None:
            obj[att] = val
        else:
            obj[att].append(val)
    return obj

In [None]:
import os

warc_name = "example.com.warc"

warc = {
    "typeName":   "warc_file",
    "attributes": {
        "name":          warc_name,
        "qualifiedName": warc_name,         
        "path":          "{}/{}".format( os.getcwd(), warc_name ),
        "crawl_date":    "2021-05-27T20:58:55Z",
    }
}

warc_guid = create_entity(warc)
print("WARC_FILE: " + warc_guid)

for file_path in glob.iglob('out/**/*.xml', recursive=True):
    
    with open(file_path, 'r') as file:
        atts = xml_to_dict( file.read() )
    
    record = {
        "typeName":   "warc_record",
        "attributes": atts
    }
    
    record["attributes"]["qualifiedName"] = "{}::{}".format( warc_name, record["attributes"]["source_file_offset"] )
    record["attributes"]["warc_file"]     = { "guid": warc_guid }
        
    create_entity(record)
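
The loop above sends one request per record. For larger WARC files, the EntityREST API also offers a bulk endpoint (`POST /v2/entity/bulk`) that accepts several entities at once. The following sketch only prepares the request bodies; `chunks`, `bulk_payload` and the batch size are illustrative choices, and the response of a bulk call has to be handled differently from `create_entity`, which returns a single GUID.

```python
def chunks(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def bulk_payload(entities):
    """Wrap entity dicts in the body sent to the bulk endpoint."""
    return {"entities": list(entities)}


# Five dummy records, only to show the batching.
records = [
    {"typeName": "warc_record", "attributes": {"qualifiedName": "example.com.warc::{}".format(i)}}
    for i in range(5)
]
batches = [bulk_payload(batch) for batch in chunks(records, 2)]
print([len(b["entities"]) for b in batches])

# With a running Atlas instance, each batch could be sent with:
# requests.post("http://atlas:21000/api/atlas/v2/entity/bulk",
#               auth=REQUEST_AUTH, headers=REQUEST_HEADER, json=batch)
```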

# Crawling process modelling

Atlas models processes using the **Process** type.

The diagram below extends the original WARC metadata diagram for modelling a **crawling process**. The diagram is interpreted as follows:

* The **CRAWLING_PROCESS** receives a list of `urls` as parameters and `outputs` a set of **WARC_FILEs**. It also stores information about the version of the crawler used for crawling the `urls` (`crawler_version`, implemented below as the `crawler_info` attribute).

> Note that the **CRAWLING_PROCESS** does not receive any `inputs`. This is a modelling choice that avoids representing the input URLs as **DataSets**, which would require, for instance, creating a new type _CRAWLING_INPUT_ as a subtype of DataSet.


![WARC Model](img/warc-model.svg)

The following code implements the **CRAWLING_PROCESS** model and creates a new crawling entity for representing the process that produced the `example.com.warc` file.

In [None]:
types_def = {
    
    "entityDefs" : [
        {
            "name": "crawling_process",
            "superTypes": ["Process"],
            "attributeDefs": [
                { "name": "urls",          "typeName": "array<string>",  "isOptional": False },
                { "name": "crawler_info",  "typeName": "string",         "isOptional": False },                
            ],    
        },
    ],
}

req = requests.post(
    url     = "http://atlas:21000/api/atlas/v2/types/typedefs", 
    auth    = REQUEST_AUTH, 
    headers = REQUEST_HEADER, 
    json    = types_def
)

In [None]:
crawler_info = !wget --version

process = {
    "typeName":   "crawling_process",
    "attributes": {
        "urls":          [ "http://example.com" ],
        "outputs":       [{ "guid": warc_guid  }],
        "crawler_info":  crawler_info[0],
        "name":          "crawling 2021-05-27",
        "qualifiedName": "crawling:2021-05-27T20:58:55Z",
    }
}

create_entity(process)

### TODO

* Use the [Atlas WebUI](http://localhost:21000) to explore the `crawling 2021-05-27` process **lineage graph**.
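
The lineage shown in the WebUI can also be retrieved through the Atlas lineage endpoint (`GET /v2/lineage/{guid}`). The sketch below builds the request URL and extracts the edges from a response; the function names are illustrative, and the response fragment is a hand-written stand-in assuming the `relations` list with `fromEntityId` and `toEntityId` keys, not actual Atlas output.

```python
from urllib.parse import urlencode

ATLAS_URL = "http://atlas:21000/api/atlas/v2"   # same host as in this tutorial


def lineage_url(guid, direction="BOTH", depth=3):
    """URL of the lineage lookup for one entity GUID."""
    query = urlencode({"direction": direction, "depth": depth})
    return "{}/lineage/{}?{}".format(ATLAS_URL, guid, query)


def lineage_edges(lineage):
    """List the (from, to) GUID pairs found in a lineage response."""
    return [(r["fromEntityId"], r["toEntityId"]) for r in lineage.get("relations", [])]


# Illustrative response fragment with placeholder GUIDs.
fake_lineage = {"relations": [{"fromEntityId": "process-guid", "toEntityId": "warc-guid"}]}
edges = lineage_edges(fake_lineage)
print(lineage_url("process-guid"))
print(edges)
```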

# Play around with your own URL

**Step 1.** Set a URL

In [None]:
URL   = "https://projet-lifranum.univ-lyon3.fr/projet"

**Step 2.** Crawl the target URL

In [None]:
from urllib.parse import urlparse

LEVEL = 1        # maximum recursion depth (i.e., crawl depth)
WAIT  = 0.1      # num. seconds to wait between consecutive calls 

domain = urlparse(URL).netloc

!wget {URL}           \
  --delete-after -nd  \
  --recursive         \
  --level={LEVEL}     \
  --wait={WAIT}       \
  --random-wait       \
  --no-parent         \
  --follow-tags=a     \
  --adjust-extension  \
  --warc-file={domain}\
  --no-warc-keep-log

**Step 3.** Extract WARC metadata
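
The file name used in this step follows from the crawl in Step 2: `wget` takes the `--warc-file` value as a prefix and, because compression is not disabled there, writes a `.warc.gz` file. A small sketch of that naming logic, with `warc_file_name` as a hypothetical helper:

```python
from urllib.parse import urlparse


def warc_file_name(url, compressed=True):
    """File name wget writes when --warc-file is set to the URL's host."""
    domain = urlparse(url).netloc   # includes the port when the URL has one
    return domain + (".warc.gz" if compressed else ".warc")


name = warc_file_name("https://projet-lifranum.univ-lyon3.fr/projet")
print(name)
```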

In [None]:
warc_name = domain + ".warc.gz"

!java -jar /jars/warc-indexer.jar --output out --disable_commit  --text  {warc_name}

**Step 4.** Load metadata into Atlas



In [None]:
from datetime import datetime, timezone

warc = {
    "typeName":   "warc_file",
    "attributes": {
        "name":          warc_name,
        "qualifiedName": warc_name,         
        "path":          "{}/{}".format( os.getcwd(), warc_name ),
        # ISO 8601 UTC timestamp, same format as the crawl_date used earlier
        "crawl_date":    datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
}

warc_guid = create_entity(warc)
print("WARC_FILE: " + warc_guid)

for file_path in glob.iglob('out/{}/*.xml'.format(warc_name)):
    
    with open(file_path, 'r') as file:
        atts = xml_to_dict( file.read() )
    
    record = {
        "typeName":   "warc_record",
        "attributes": atts
    }
    
    record["attributes"]["qualifiedName"] = "{}::{}".format( warc_name, record["attributes"]["source_file_offset"] )
    record["attributes"]["warc_file"]     = { "guid": warc_guid }
        
    record_guid = create_entity(record)
    print(' ', "WARC_RECORD: " + record_guid)

    
crawler_info = !wget --version

process = {
    "typeName":   "crawling_process",
    "attributes": {
        "urls":          [ URL ],
        "outputs":       [{ "guid": warc_guid  }],
        "crawler_info":  crawler_info[0],
        "name":          "crawling {}".format(warc['attributes']['crawl_date']),
        "qualifiedName": "crawling:{}".format(warc['attributes']['crawl_date']),
    }
}


crawling_guid = create_entity(process)

print(' ', "CRAWLING_PROCESS: " + crawling_guid)     


**Step 5.** Explore the [Atlas WebUI](http://localhost:21000)