# Notes While Processing Common Crawl Data with PySpark

Notes, helpful links, or code snippets

# Useful Links

Example Repo: https://github.com/commoncrawl/cc-pyspark

Common Crawl Format Example: https://gist.github.com/Smerity/e750f0ef0ab9aa366558#file-bbc-warc

Point the environment variable SPARK_HOME to your Spark installation

In [None]:
$ export SPARK_HOME="/Users/lxu213/spark/"

Submit an example job to Spark

In [None]:
$ $SPARK_HOME/bin/spark-submit ./server_count.py \
    --num_output_partitions 1 --log_level WARN \
    ./input/test_warc.txt servernames
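
As a rough idea of what a job like this computes, here is a plain-Python sketch of a server-name count, written as a map step that emits `(key, 1)` pairs and a reduce step that sums them. It assumes, from the script name and the `servernames` output name only, that the job tallies HTTP `Server` header values. `extract_server`, `count_servers`, `FALLBACK`, and the sample headers are hypothetical, and this is not the code from the cc-pyspark repo.

```python
from collections import Counter

# Hypothetical label for responses that carry no Server header.
FALLBACK = "(no server in HTTP header)"


def extract_server(http_headers):
    """Map step: turn one response's headers into a (server_name, 1) pair."""
    for name, value in http_headers.items():
        # HTTP header names are case-insensitive.
        if name.lower() == "server":
            return (value.strip(), 1)
    return (FALLBACK, 1)


def count_servers(records):
    """Reduce step: sum the 1s per server name."""
    counts = Counter()
    for headers in records:
        server, n = extract_server(headers)
        counts[server] += n
    return counts


# Made-up headers standing in for WARC response records.
sample_records = [
    {"Server": "nginx", "Content-Type": "text/html"},
    {"server": "Apache"},
    {"Content-Type": "text/plain"},
    {"Server": "nginx"},
]

server_counts = count_servers(sample_records)
for server, n in server_counts.most_common():
    print(server, n)
```

In Spark the same shape would be spread over partitions, with the per-key summing done by the cluster rather than by a single `Counter`.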

ReadWARC: Assuming that you have the AWS command line tools installed, you can list the contents of a crawl using:

In [None]:
$ aws s3 ls s3://commoncrawl/crawl-data/CC-MAIN-2014-10/ --recursive | head -6

Copy one WARC file from a segment to the local directory using:

In [None]:
$ aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2014-10/segments/1394023864559/warc/CC-MAIN-20140305125104-00002-ip-10-183-142-35.ec2.internal.warc.gz .

Configure EMR to run a PySpark job using Python 3

In [None]:
https://aws.amazon.com/premiumsupport/knowledge-center/emr-pyspark-python-3x/

PySpark Cheat Sheet: Spark in Python 

In [None]:
https://www.datacamp.com/community/blog/pyspark-cheat-sheet-python

PySpark Tutorial-Learn to use Apache Spark with Python

In [None]:
https://www.dezyre.com/apache-spark-tutorial/pyspark-tutorial

Apache Spark: Python Programming Guide

In [None]:
https://spark.apache.org/docs/0.9.0/python-programming-guide.html

Resilient Distributed Datasets (RDDs) are Apache Spark’s core data abstraction, and the way they are designed and implemented accounts for much of Spark’s speed. More about RDDs below:

1. RDDs are read-only, partitioned data stores, which are distributed across many machines (typically on a cluster)
2. RDDs can be used within Spark through PySpark, Spark SQL, or Spark’s Scala API. Data that is ingested, or that already exists on disk on the Linux file system or on the Hadoop Distributed File System (HDFS), can be loaded and converted to a distributed dataset.
3. RDDs work better as an abstraction for distributed data processing mainly because they avoid some of the issues of MapReduce, the older data-processing paradigm that Spark is increasingly replacing. Chiefly, these are:
    - Replication: Replicating data across different parts of a cluster is a feature of HDFS that lets data be stored in a fault-tolerant manner. Spark’s RDDs instead address fault tolerance with a lineage graph: a lost partition is recomputed from the transformations that produced it. The name (resilient, as opposed to replicated) reflects this difference in how Spark implements its core functionality.
    - Serialization: Serialization slows MapReduce down in operations like shuffling and sorting.
    - Disk IO: Writing files to disk and reading them back is one of the most expensive operations, and this kind of disk input-output hurts the performance of “big compute” jobs (as opposed to “big data”, which refers to storing and handling large data sets). MapReduce performs disk IO at every stage of a map or reduce step. Spark is primarily an in-memory processing engine that depends on cheap access to RAM (which differs from the “commodity hardware” argument made for Hadoop), and it can cache and persist RDDs so that they are reused rather than recomputed. Spark avoids much of the disk IO because its scheduler and optimiser allow fine-grained control over scheduling and resilient processing.
    - Optimisation and Lazy Evaluation: These are mentioned together because lazy evaluation (a la Scala) lets a sequence of transformations be declared on RDDs without spending compute time on them until an action requests a result. Spark represents these transformations as a Directed Acyclic Graph (DAG), which its scheduler splits into stages; for DataFrame and Spark SQL queries, the Catalyst optimizer additionally optimises the query plan. Spark’s standalone cluster manager can handle scheduling by itself in conjunction with a file system, but Spark also integrates with existing resource managers on Hadoop clusters (such as YARN).
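
The lineage and lazy-evaluation points above can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not how Spark is implemented: `ToyRDD` and its `describe` method are hypothetical names, and only `map`, `filter`, and `collect` are chosen to mirror the names of real RDD methods. Transformations just record a step in the lineage; nothing runs until the `collect` action, and calling `collect` again replays the lineage from the source, which is the same mechanism a lineage graph uses to rebuild lost data.

```python
class ToyRDD:
    """Toy stand-in for an RDD: records transformations, computes only on an action."""

    def __init__(self, source, lineage=()):
        self._source = source            # callable returning the base data
        self._lineage = tuple(lineage)   # ordered (step_name, function) pairs

    def map(self, fn):
        # Transformation: return a new ToyRDD with one more recorded step.
        return ToyRDD(self._source, self._lineage + (("map", fn),))

    def filter(self, pred):
        # Transformation: also lazy, nothing is evaluated here.
        return ToyRDD(self._source, self._lineage + (("filter", pred),))

    def describe(self):
        # Human-readable view of the recorded lineage.
        return " -> ".join(["source"] + [name for name, _ in self._lineage])

    def collect(self):
        # Action: replay every recorded step over the source data.
        data = iter(self._source())
        for name, fn in self._lineage:
            data = map(fn, data) if name == "map" else filter(fn, data)
        return list(data)


calls = []  # records when square() actually runs


def square(x):
    calls.append(x)
    return x * x


numbers = ToyRDD(lambda: range(5))
evens = numbers.map(square).filter(lambda v: v % 2 == 0)

print(evens.describe())  # source -> map -> filter
print(len(calls))        # 0, because square() has not run yet
print(evens.collect())   # [0, 4, 16]
```

Each transformation returns a new object instead of mutating the old one, which matches the read-only nature of RDDs described in point 1.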

Running Questions:
1. Point 3 above says RDDs avoid some of the issues of MapReduce. Is MapReduce also a strategy that can be used within Spark?


# Reading WARC Records

A key feature of the warcio library is the ability to iterate over a stream of WARC records using the ArchiveIterator.

It includes the following features:

- Reading a WARC/ARC stream
- On the fly ARC to WARC record conversion
- Decompressing and de-chunking HTTP payload content stored in WARC/ARC files

For example, the following prints the URL for each WARC response record:

from warcio.archiveiterator import ArchiveIterator

with open('path/to/file', 'rb') as stream:
    for record in ArchiveIterator(stream):
        if record.rec_type == 'response':
            print(record.rec_headers.get_header('WARC-Target-URI'))

The stream object can be a file on disk or a remote network stream. The ArchiveIterator reads the WARC content in a single pass. Each record is represented by an ArcWarcRecord object, which contains the format (ARC or WARC), the record type, the record headers, the HTTP headers (if any), and a raw stream for reading the payload.

class ArcWarcRecord(object):
    def __init__(self, *args):
        (self.format, self.rec_type, self.rec_headers, self.raw_stream,
         self.http_headers, self.content_type, self.length) = args
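
To make those record fields more concrete, here is a sketch that parses a single uncompressed WARC record by hand using only the standard library. It is a simplification for illustration and not how warcio works internally: `parse_warc_record`, `http_body`, and `sample` are made-up names and data, and real crawl files are gzip-compressed and hold many records, which is what ArchiveIterator handles. A record is a version line, a block of `Name: value` headers, a blank line, and then exactly `Content-Length` bytes of content.

```python
import io


def parse_warc_record(stream):
    """Read one uncompressed WARC record from a binary stream.

    Returns (version, headers, block), where block is the raw record content.
    """
    version = stream.readline().strip().decode("utf-8")
    if not version.startswith("WARC/"):
        raise ValueError("not a WARC record: %r" % version)

    headers = {}
    while True:
        line = stream.readline()
        if line in (b"\r\n", b"\n", b""):
            break  # a blank line ends the header block
        name, _, value = line.decode("utf-8").partition(":")
        headers[name.strip()] = value.strip()

    # Content-Length gives the exact size of the content block in bytes.
    block = stream.read(int(headers["Content-Length"]))
    return version, headers, block


# A made-up response record; the content block is a raw HTTP response.
http_body = b"HTTP/1.1 200 OK\r\nServer: nginx\r\n\r\nhello"
sample = (
    b"WARC/1.0\r\n"
    b"WARC-Type: response\r\n"
    b"WARC-Target-URI: http://example.com/\r\n"
    b"Content-Length: " + str(len(http_body)).encode("ascii") + b"\r\n"
    b"\r\n" + http_body + b"\r\n\r\n"
)

version, headers, block = parse_warc_record(io.BytesIO(sample))
print(version)                     # WARC/1.0
print(headers["WARC-Type"])        # response
print(headers["WARC-Target-URI"])  # http://example.com/
```

Roughly speaking, the `WARC-Type` header here plays the role of `rec_type`, the header dictionary that of `rec_headers`, and the content block is what `raw_stream` would let you read.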