In [19]:
from os import environ

from pyspark.sql import SparkSession

# Python interpreter PySpark uses for its worker processes: the project's
# virtualenv (presumably it provides fastwarc, cld3, bleach and tqdm used
# below — confirm).
environ["PYSPARK_PYTHON"] = "/mnt/ceph/storage/data-in-progress/data-research/web-search/web-archive-query-log/venv/bin/python"
# Connect to the YARN cluster with a fixed number of executors.
session = (
    SparkSession.builder
    .master("yarn")
    .appName("web-archive-query-log-corpus")
    .config("spark.executor.instances", 5)
    .getOrCreate()
)

In [20]:
# Low-level SparkContext handle; the RDD pipelines at the end of the notebook
# are built from it via `sc.parallelize`.
sc = session.sparkContext
sc

In [21]:
# Set to True to yield only a random sample of at most 10 archived URL IDs per
# input file (see `process_relative_path_archived_ids`), e.g. for test runs.
SAMPLE_CORPUS = False

In [22]:
from pathlib import Path

# Root of the web-archive-query-log data on the shared Ceph mount. All file
# locations written into the output records are stored relative to it.
global_data_dir = Path("/mnt/ceph/storage/data-in-progress/data-research/web-search/web-archive-query-log/")
global_data_dir

PosixPath('/mnt/ceph/storage/data-in-progress/data-research/web-search/web-archive-query-log')

In [23]:
# Input data of the "focused" subset. Each pipeline stage ("archived-urls",
# "archived-query-urls", "archived-raw-serps", ...) is a subdirectory of it.
data_dir = global_data_dir / "focused"
data_dir

PosixPath('/mnt/ceph/storage/data-in-progress/data-research/web-search/web-archive-query-log/focused')

In [24]:
# One entry per archived-URLs file: its path relative to `archived-urls/`
# with the ".jsonl.gz" extension cut off ("<service>/<subdir>/<name>").
# The same relative path locates the matching files of every other stage.
relative_paths = [
    archived_urls_file
    .relative_to(data_dir / "archived-urls")
    .with_name(archived_urls_file.name[: -len(".jsonl.gz")])
    for archived_urls_file in data_dir.glob("archived-urls/*/*/*.jsonl.gz")
]
len(relative_paths)

62303

In [25]:
from uuid import uuid5
from uuid import NAMESPACE_URL
from tqdm.auto import tqdm
from json import loads
from gzip import GzipFile


def index_jsonl(path: Path, base_type: str) -> dict:
    """Map record IDs to their location in a gzipped JSONL file.

    The file read is ``data_dir / base_type / path`` with a ``.jsonl.gz``
    suffix. A record's ID is ``uuid5(NAMESPACE_URL, "<timestamp>:<url>")``.

    :param path: Relative path as listed in ``relative_paths``.
    :param base_type: Pipeline stage subdirectory, e.g. ``"archived-urls"``.
    :return: ``{record_id: (jsonl_path, offset)}`` where ``offset`` is the
        uncompressed byte offset of the record's line (usable with
        ``GzipFile.seek``). Empty if the file does not exist or cannot be
        read or parsed; this is best effort, so one bad record discards the
        whole file.
    """
    # NOTE(review): with_suffix() replaces an existing suffix, so a file name
    # containing a dot would be mangled — confirm names never contain one.
    jsonl_path = data_dir / base_type / path.with_suffix(".jsonl.gz")
    if not jsonl_path.exists():
        return {}
    offset = 0
    index = {}
    try:
        with GzipFile(jsonl_path, "r") as gzip_file:
            for line in tqdm(gzip_file, desc="Index JSONL"):
                try:
                    record = loads(line)
                except ValueError:  # JSONDecodeError and UnicodeDecodeError.
                    print(f"Could not index {line} at {path}.")
                    return {}  # TODO: Maybe just skip the one faulty record.
                record_id = uuid5(
                    NAMESPACE_URL,
                    f"{record['timestamp']}:{record['url']}",
                )
                index[record_id] = (jsonl_path, offset)
                # Position after this line is where the next record starts.
                offset = gzip_file.tell()
        return index
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit propagate.
        print(f"Could not read JSONL file at {path}.")
        return {}

In [26]:
def index_jsonl_snippets(path: Path, base_type: str) -> dict:
    """Map search result snippet IDs to their location in a gzipped JSONL file.

    The file read is ``data_dir / base_type / path`` with a ``.jsonl.gz``
    suffix; each line is a record with a ``"results"`` list. A snippet's ID is
    ``uuid5(NAMESPACE_URL, "<rank>:<timestamp>:<url>")``.

    :param path: Relative path as listed in ``relative_paths``.
    :param base_type: Pipeline stage subdirectory, e.g. ``"archived-parsed-serps"``.
    :return: ``{snippet_id: (jsonl_path, offset, snippet_index)}`` where
        ``offset`` is the uncompressed byte offset of the record's line and
        ``snippet_index`` the snippet's position in its ``"results"`` list.
        Empty if the file does not exist or cannot be read or parsed; this is
        best effort, so one bad record discards the whole file.
    """
    # NOTE(review): with_suffix() replaces an existing suffix, so a file name
    # containing a dot would be mangled — confirm names never contain one.
    jsonl_path = data_dir / base_type / path.with_suffix(".jsonl.gz")
    if not jsonl_path.exists():
        return {}
    offset = 0
    index = {}
    try:
        with GzipFile(jsonl_path, "r") as gzip_file:
            for line in tqdm(gzip_file, desc="Index JSONL snippets"):
                try:
                    record = loads(line)
                except ValueError:
                    # ValueError covers JSONDecodeError and UnicodeDecodeError
                    # without needing a separate import.
                    print(f"Could not index {line} at {path}.")
                    return {}  # TODO: Maybe just skip the one faulty record.
                for snippet_index, snippet in enumerate(record["results"]):
                    record_id = uuid5(
                        NAMESPACE_URL,
                        f"{snippet['rank']}:{snippet['timestamp']}:{snippet['url']}",
                    )
                    index[record_id] = (jsonl_path, offset, snippet_index)
                # Position after this line is where the next record starts.
                offset = gzip_file.tell()
        return index
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit propagate.
        print(f"Could not read JSONL file at {path}.")
        return {}

In [27]:
from fastwarc import FileStream, WarcRecordType, WarcRecord, ArchiveIterator


def index_warc(path: Path, base_type: str) -> dict:
    """Map record IDs to response records in the WARC files of a directory.

    Every file in ``data_dir / base_type / path`` whose name does not start
    with ``"."`` is read as a WARC file. A response record's ID is
    ``uuid5(NAMESPACE_URL, "<timestamp>:<url>")``, built from its
    JSON-encoded ``Archived-URL`` header.

    :param path: Relative path as listed in ``relative_paths``.
    :param base_type: Pipeline stage subdirectory, e.g. ``"archived-raw-serps"``.
    :return: ``{record_id: (warc_file_path, record.stream_pos)}``. Empty if
        the directory does not exist, or if any file cannot be read or any
        header cannot be parsed; this is best effort, so one bad file discards
        the whole directory.
    """
    warc_path = data_dir / base_type / path
    if not warc_path.exists():
        return {}
    index = {}
    for warc_child_path in warc_path.iterdir():
        if warc_child_path.name.startswith("."):
            continue
        stream = None
        try:
            stream = FileStream(str(warc_child_path.absolute()))
            records = ArchiveIterator(
                stream,
                record_types=WarcRecordType.response,
                parse_http=False,
            )
            for record in tqdm(records, desc="Index WARC"):
                record: WarcRecord
                offset = record.stream_pos
                record_url_header = record.headers["Archived-URL"]
                try:
                    record_url = loads(record_url_header)
                except ValueError:  # Covers JSONDecodeError without an import.
                    print(f"Could not index {record_url_header} at {path}.")
                    return {}  # TODO: Maybe just skip the one faulty record.
                record_id = uuid5(
                    NAMESPACE_URL,
                    f"{record_url['timestamp']}:{record_url['url']}",
                )
                index[record_id] = (warc_child_path, offset)
        except Exception:
            # Best effort, but let KeyboardInterrupt/SystemExit propagate.
            print(f"Could not read WARC file at {warc_child_path}.")
            return {}  # TODO: Maybe just skip the one faulty file.
        finally:
            # Release the file handle whether indexing succeeded or not.
            if stream is not None:
                stream.close()
    return index

In [28]:
from typing import Optional


def detect_language(text: str) -> Optional[str]:
    """Guess the language of ``text`` with CLD3.

    Newlines are flattened to spaces before detection. Returns the part of
    CLD3's language code before the first ``"-"`` (e.g. ``"zh"`` for
    ``"zh-Latn"``), or ``None`` when CLD3 returns no prediction or flags it
    as unreliable.
    """
    single_line = " ".join(text.split("\n"))
    # Function-local import, kept as in the original (presumably so cld3 is
    # only needed where this function actually runs).
    from cld3 import get_language

    prediction = get_language(single_line)
    if prediction is None or not prediction.is_reliable:
        return None
    return prediction.language.split("-")[0]

In [29]:
from typing import Optional
from datetime import datetime
from io import TextIOWrapper
from uuid import UUID
from bleach import clean

def process_snippet(
    service: str,
    archived_search_result_snippet_index: dict,
    archived_raw_search_result_index: dict,
    archived_parsed_search_result_index: dict,
    archived_search_result_snippet_id: UUID,
) -> Optional[dict]:
    """Build the output document for one search result snippet.

    The snippet itself is loaded from the location in
    ``archived_search_result_snippet_index``. The raw and parsed search result
    indices only contribute their locations (if present) to the document.

    :param service: Service name, copied into the document.
    :param archived_search_result_snippet_index:
        ``{id: (jsonl_path, offset, result_index)}`` as built by
        ``index_jsonl_snippets``.
    :param archived_raw_search_result_index: ``{id: (warc_path, offset)}``.
    :param archived_parsed_search_result_index: ``{id: (jsonl_path, offset)}``.
    :param archived_search_result_snippet_id: ID of the snippet to process.
    :return: The document dict, or ``None`` if the ID is not in
        ``archived_search_result_snippet_index``.
    """
    from datetime import timezone

    print(f"Process archived search result snippet ID {archived_search_result_snippet_id}.")
    snippet_location = archived_search_result_snippet_index.get(archived_search_result_snippet_id)
    raw_location = archived_raw_search_result_index.get(archived_search_result_snippet_id)
    parsed_location = archived_parsed_search_result_index.get(archived_search_result_snippet_id)

    if snippet_location is None:
        print(f"Could not find archived search result snippet ID {archived_search_result_snippet_id}.")
        return None
    snippet_path, snippet_offset, snippet_result_index = snippet_location
    with GzipFile(snippet_path, "rb") as gzip_file:
        gzip_file.seek(snippet_offset)
        # json.loads decodes bytes itself (detecting UTF-8), so unlike a
        # TextIOWrapper the result does not depend on the locale encoding.
        serp = loads(gzip_file.readline())
    snippet = serp["results"][snippet_result_index]

    # Wayback Machine timestamps are UTC; a naive fromtimestamp() would
    # render the executor's local time instead.
    wayback_timestamp = datetime.fromtimestamp(
        snippet["timestamp"], tz=timezone.utc,
    ).strftime("%Y%m%d%H%M%S")
    wayback_url = f"https://web.archive.org/web/{wayback_timestamp}/{snippet['url']}"
    wayback_raw_url = f"https://web.archive.org/web/{wayback_timestamp}id_/{snippet['url']}"

    # Detect the language on the HTML-stripped title plus snippet text.
    # Tolerate a missing title instead of failing on `None + str`.
    title_and_text = snippet["title"] or ""
    if snippet["snippet"] is not None:
        title_and_text += f" {snippet['snippet']}"
    title_and_text = clean(
        title_and_text,
        tags=[],
        attributes=[],
        protocols=[],
        strip=True,
        strip_comments=True,
    )
    language = detect_language(title_and_text)

    document = {
        "id": str(archived_search_result_snippet_id),
        "url": snippet["url"],
        "timestamp": snippet["timestamp"],
        "wayback_url": wayback_url,
        "wayback_raw_url": wayback_raw_url,
        "snippet_rank": snippet["rank"],
        "snippet_title": snippet["title"],
        "snippet_text": snippet["snippet"],
        "language": language,
        "service": service,
        # Always present: a missing snippet location returned early above.
        "archived_snippet_location": {
            "relative_path": str(snippet_path.relative_to(global_data_dir)),
            "byte_offset": snippet_offset,
            "index": snippet_result_index,
        },
        "archived_raw_search_result_location": {
            "relative_path": str(raw_location[0].relative_to(global_data_dir)),
            "byte_offset": raw_location[1],
        } if raw_location is not None else None,
        "archived_parsed_search_result_location": {
            "relative_path": str(parsed_location[0].relative_to(global_data_dir)),
            "byte_offset": parsed_location[1],
        } if parsed_location is not None else None,
    }
    print(f"Finished processing archived search result snippet ID {archived_search_result_snippet_id}.")
    return document

In [30]:
def _read_jsonl_record(location: tuple) -> dict:
    """Decode the JSONL record stored at ``location``.

    :param location: ``(jsonl_gz_path, offset, ...)`` as stored by
        ``index_jsonl``; ``offset`` is the uncompressed byte offset of the
        record's line.
    """
    with GzipFile(location[0], "rb") as gzip_file:
        gzip_file.seek(location[1])
        # json.loads decodes bytes itself (detecting UTF-8), so unlike a
        # TextIOWrapper the result does not depend on the locale encoding.
        return loads(gzip_file.readline())


def process_url(
    service: str,
    archived_urls_index: dict,
    archived_query_urls_index: dict,
    archived_raw_serps_index: dict,
    archived_parsed_serps_index: dict,
    archived_search_result_snippet_index: dict,
    archived_raw_search_result_index: dict,
    archived_parsed_search_result_index: dict,
    archived_url_id: UUID,
) -> Optional[tuple]:
    """Build the query record and its snippet documents for one archived URL.

    The archived URL, query URL and parsed SERP records are loaded from the
    locations in their indices. The raw SERP index only contributes its
    location. Each result of the parsed SERP is turned into a document by
    ``process_snippet``.

    :return: ``(query, documents)``, or ``None`` if ``archived_url_id`` is
        not in ``archived_urls_index``.

        - ``query`` holds the URL/SERP metadata plus ``"results"``: the list
          returned by ``process_snippet`` per SERP result (entries may be
          ``None``), or ``None`` without a parsed SERP.
        - ``documents`` holds the non-``None`` snippet documents, each with
          the query metadata (without ``"results"``) under ``"query"``.
    """
    from datetime import timezone

    print(f"Process archived URL ID {archived_url_id}.")

    archived_url_location = archived_urls_index.get(archived_url_id)
    archived_query_url_location = archived_query_urls_index.get(archived_url_id)
    archived_raw_serp_location = archived_raw_serps_index.get(archived_url_id)
    archived_parsed_serp_location = archived_parsed_serps_index.get(archived_url_id)

    if archived_url_location is None:
        print(f"Could not find archived URL ID {archived_url_id}.")
        return None
    archived_url = _read_jsonl_record(archived_url_location)
    archived_query_url = (
        _read_jsonl_record(archived_query_url_location)
        if archived_query_url_location is not None else None
    )
    archived_parsed_serp = (
        _read_jsonl_record(archived_parsed_serp_location)
        if archived_parsed_serp_location is not None else None
    )

    # Wayback Machine timestamps are UTC; a naive fromtimestamp() would
    # render the executor's local time instead.
    wayback_timestamp = datetime.fromtimestamp(
        archived_url["timestamp"], tz=timezone.utc,
    ).strftime("%Y%m%d%H%M%S")
    wayback_url = f"https://web.archive.org/web/{wayback_timestamp}/{archived_url['url']}"
    wayback_raw_url = f"https://web.archive.org/web/{wayback_timestamp}id_/{archived_url['url']}"
    language = detect_language(archived_query_url["query"]) \
        if archived_query_url is not None else None

    # One entry per SERP result; process_snippet yields None for snippets
    # missing from the snippet index.
    partial_documents = [
        process_snippet(
            service,
            archived_search_result_snippet_index,
            archived_raw_search_result_index,
            archived_parsed_search_result_index,
            uuid5(
                NAMESPACE_URL,
                f"{snippet['rank']}:{snippet['timestamp']}:{snippet['url']}",
            ),
        )
        for snippet in archived_parsed_serp["results"]
    ] if archived_parsed_serp is not None else None

    partial_query = {
        "id": str(archived_url_id),
        "url": archived_url["url"],
        "timestamp": archived_url["timestamp"],
        "wayback_url": wayback_url,
        "wayback_raw_url": wayback_raw_url,
        "url_query": archived_query_url["query"] if archived_query_url is not None else None,
        "url_page": archived_query_url["page"] if archived_query_url is not None else None,
        "url_offset": archived_query_url["offset"] if archived_query_url is not None else None,
        "serp_query": archived_parsed_serp["interpreted_query"] if archived_parsed_serp is not None else None,
        "language": language,
        "service": service,
        "archived_url_location": {
            "relative_path": str(archived_url_location[0].relative_to(global_data_dir)),
            "byte_offset": archived_url_location[1],
        },
        "archived_query_url_location": {
            "relative_path": str(archived_query_url_location[0].relative_to(global_data_dir)),
            "byte_offset": archived_query_url_location[1],
        } if archived_query_url_location is not None else None,
        "archived_raw_serp_location": {
            "relative_path": str(archived_raw_serp_location[0].relative_to(global_data_dir)),
            "byte_offset": archived_raw_serp_location[1],
        } if archived_raw_serp_location is not None else None,
        "archived_parsed_serp_location": {
            "relative_path": str(archived_parsed_serp_location[0].relative_to(global_data_dir)),
            "byte_offset": archived_parsed_serp_location[1],
        } if archived_parsed_serp_location is not None else None,
    }

    query = {
        **partial_query,
        "results": partial_documents,
    }
    # Skip missing snippets: unpacking `**None` would raise TypeError.
    documents = [
        {
            **partial_document,
            "query": partial_query,
        }
        for partial_document in partial_documents or []
        if partial_document is not None
    ]
    print(f"Finished processing archived URL ID {archived_url_id}.")
    return query, documents

In [31]:
from random import sample


def process_relative_path_archived_ids(path: Path):
    """Index every pipeline-stage file for ``path`` and yield one task per archived URL.

    Each yielded tuple is the argument of ``process_archived_id``:
    ``(service, archived_urls_index, archived_query_urls_index,
    archived_raw_serps_index, archived_parsed_serps_index,
    archived_search_result_snippets_index, archived_raw_search_results_index,
    archived_parsed_search_results_index, archived_id)``.

    ``service`` is the first component of ``path``. The IDs are the keys of
    the archived-URLs index. With ``SAMPLE_CORPUS`` set, at most 10 randomly
    chosen IDs are yielded.

    NOTE(review): every task references the same, possibly large, index
    dicts. If the tasks cross a Spark shuffle, each one is presumably
    serialized with full copies of them — confirm the cost.
    """
    print(f"Process relative path indices {path}.")
    service = path.parts[0]
    archived_urls_index = index_jsonl(path, "archived-urls")
    archived_query_urls_index = index_jsonl(path, "archived-query-urls")
    archived_raw_serps_index = index_warc(path, "archived-raw-serps")
    archived_parsed_serps_index = index_jsonl(path, "archived-parsed-serps")
    archived_search_result_snippets_index = index_jsonl_snippets(path, "archived-parsed-serps")
    archived_raw_search_results_index = index_warc(path, "archived-raw-search-results")
    archived_parsed_search_results_index = index_jsonl(path, "archived-parsed-search-results")
    archived_ids = archived_urls_index.keys()
    if SAMPLE_CORPUS:
        # random.sample needs a sequence; dict views are rejected on Python 3.11+.
        archived_ids = sample(list(archived_ids), min(len(archived_ids), 10))
    for archived_id in tqdm(archived_ids, desc="Yield archived IDs."):
        yield (
            service,
            archived_urls_index,
            archived_query_urls_index,
            archived_raw_serps_index,
            archived_parsed_serps_index,
            archived_search_result_snippets_index,
            archived_raw_search_results_index,
            archived_parsed_search_results_index,
            archived_id,
        )
    print(f"Finished processing relative path indices {path}.")

In [32]:
def process_archived_id(task: tuple) -> tuple:
    """Unpack one task from ``process_relative_path_archived_ids`` and run ``process_url``.

    :param task: Exactly nine items: the service, the seven indices and the
        archived URL ID, in the order yielded by
        ``process_relative_path_archived_ids``.
    :return: Whatever ``process_url`` returns.
    """
    (
        service,
        archived_urls_index,
        archived_query_urls_index,
        archived_raw_serps_index,
        archived_parsed_serps_index,
        archived_search_result_snippets_index,
        archived_raw_search_results_index,
        archived_parsed_search_results_index,
        archived_id,
    ) = task
    print(f"Process archived ID {archived_id}.")
    result = process_url(
        service,
        archived_urls_index,
        archived_query_urls_index,
        archived_raw_serps_index,
        archived_parsed_serps_index,
        archived_search_result_snippets_index,
        archived_raw_search_results_index,
        archived_parsed_search_results_index,
        archived_id,
    )
    print(f"Finished processing archived ID {archived_id}.")
    return result

In [33]:
from json import dumps


def process_archived_id_query(query_documents: tuple):
    """Serialize the query half of a ``(query, documents)`` pair.

    :param query_documents: Pair as returned by ``process_url``; the
        documents are ignored.
    :return: ``json.dumps`` of the query dict (a single line of JSON).
    """
    query, _documents = query_documents
    query_id = query["id"]
    print(f"Process archived ID query {query_id}.")
    serialized_query = dumps(query)
    print(f"Finished processing archived ID query {query_id}.")
    return serialized_query

In [34]:
def process_archived_id_documents(query_documents: tuple):
    """Lazily serialize every document of a ``(query, documents)`` pair.

    This is a generator: each document is passed through ``json.dumps`` as it
    is consumed, and the "Finished" message is printed only once all
    documents have been yielded.
    """
    query, documents = query_documents
    query_id = query["id"]
    print(f"Process archived ID documents {query_id}.")
    yield from map(dumps, documents)
    print(f"Finished processing archived ID documents {query_id}.")

In [35]:
# Remove the output of a previous run: Spark refuses to save into an existing
# output directory. Fails harmlessly (see output) when nothing exists yet.
!hdfs dfs -rm -r web-archive-query-log/queries/

rm: `web-archive-query-log/queries/': No such file or directory


In [36]:
from random import shuffle

# Randomize the input order in place, presumably to spread files of the same
# service across partitions. Unseeded, so the order differs between runs.
shuffle(relative_paths)

In [None]:
# Queries: one JSON line per archived URL, written as 100 gzipped part files.
# NOTE(review): the repartition between flatMap and map ships every task
# tuple, each referencing all seven indices of its file, through a shuffle;
# presumably expensive for large files — consider dropping that repartition.
(
    sc.parallelize(relative_paths)
    .repartition(100_000)
    .flatMap(process_relative_path_archived_ids)
    .repartition(200_000)
    .map(process_archived_id)
    .map(process_archived_id_query)
    .repartition(100)
    .saveAsTextFile(
        "web-archive-query-log/queries/",
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
    )
)

In [None]:
# Remove the output of a previous run: Spark refuses to save into an existing
# output directory.
!hdfs dfs -rm -r web-archive-query-log/documents/

In [None]:
# Documents: one JSON line per search result snippet (each embedding its query
# metadata), written as 100 gzipped part files. saveAsTextFile returns None,
# so the result is not bound to a name. This recomputes all indices; the
# queries pipeline above does not cache them.
(
    sc.parallelize(relative_paths)
    .repartition(100_000)
    .flatMap(process_relative_path_archived_ids)
    .repartition(200_000)
    .map(process_archived_id)
    .flatMap(process_archived_id_documents)
    .repartition(100)
    .saveAsTextFile(
        "web-archive-query-log/documents/",
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
    )
)