# Exploration of WebArchives: Demo (Docker)

## Spark init

Initialize Spark on a [single-node cluster](https://docs.databricks.com/clusters/single-node.html) and configure PySpark with the AUT (Archives Unleashed Toolkit) and GraphFrames libraries.

In [None]:
%run ../scripts/spark-init-docker.ipynb
spark

## Creating Web Archives

Example of [web archiving using WGET](https://wiki.archiveteam.org/index.php/Wget_with_WARC_output) as the web crawler.

In [None]:
%%writefile input.txt
http://www.espinosa-oviedo.com
http://www.vargas-solar.com

Notes (see [WGET manual](https://www.gnu.org/software/wget/manual/wget.html)):

* Add `--recursive` to crawl an entire website (`--level` only takes effect with recursive retrieval).
* Remove `--no-warc-compression` to generate compressed (gzip) WARC files.


In [None]:
LEVEL=1       # maximum recursion depth (i.e., crawl depth); only used with --recursive
WAIT=0.1      # seconds to wait between consecutive requests

IN_FILE       = "input.txt"  # list of URLs to crawl
OUT_DIR       = "WARC"       # folder where crawl results will be stored
OUT_WARC_FILE = "out"        # prefix for WARC files
OUT_LOG_FILE  = "log.txt"    # file containing WGET log

# https://www.gnu.org/software/wget/manual/wget.html
!wget \
  --delete-after -nd \
  --input-file={IN_FILE}  \
  --level={LEVEL}    \
  --no-parent        \
  --wait={WAIT}      \
  --random-wait      \
  --adjust-extension \
  --reject=css,js,xml,rss,php  \
  --user-agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15" \
  --warc-file={OUT_WARC_FILE}    \
  --warc-max-size=300m  \
  --no-warc-keep-log    \
  --no-warc-compression \
  --output-file={OUT_LOG_FILE}
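
The two notes on `--recursive` and `--no-warc-compression` can be made explicit by assembling the command in Python. The sketch below is only an illustration: `build_wget_args` is a hypothetical helper (not part of WGET or this notebook), and it covers a subset of the flags used above.

```python
import shlex

def build_wget_args(in_file, warc_prefix, log_file, level=1, wait=0.1,
                    recursive=False, compress=False):
    """Assemble a wget command line for a WARC crawl (hypothetical helper)."""
    args = [
        "wget",
        "--delete-after", "-nd",
        f"--input-file={in_file}",
        f"--level={level}",
        "--no-parent",
        f"--wait={wait}",
        "--random-wait",
        f"--warc-file={warc_prefix}",
        "--warc-max-size=300m",
        "--no-warc-keep-log",
        f"--output-file={log_file}",
    ]
    if recursive:
        args.append("--recursive")            # follow links, up to --level hops
    if not compress:
        args.append("--no-warc-compression")  # write plain .warc instead of .warc.gz
    return args

cmd = build_wget_args("input.txt", "out", "log.txt", recursive=True)
print(shlex.join(cmd))   # shell-ready string; pass `cmd` itself to subprocess.run
```

Keeping the arguments in a list avoids shell quoting issues if the command is later run with `subprocess.run(cmd)`.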

In [None]:
# Move resulting files to the OUT_DIR folder
!mkdir -p {OUT_DIR} 
!mv *.warc*  *.txt  {OUT_DIR}

## Querying Web Archives

Notes:

* **AUT generates dataframes**. See the [AUT dataframe schemas](https://aut.docs.archivesunleashed.org/docs/dataframe-schemas) and the [Spark SQL guide](https://spark.apache.org/docs/3.0.0/sql-getting-started.html) for more info.
* More examples are available in the [AUT documentation](https://aut.docs.archivesunleashed.org/docs/home). 



In [None]:
from pyspark.sql.functions import desc, col, udf
from pyspark.sql.types import StringType

from aut import *

WARCs_path = "WARC/*.warc*"

### Extract webpage URLs

Spark Dataframe API

In [None]:
WebArchive(sc, sqlContext, WARCs_path) \
  .webpages() \
  .select("url") \
  .show(20, False)  

Spark SQL equivalent

In [None]:
df = WebArchive(sc, sqlContext, WARCs_path).webpages()
df.createOrReplaceTempView("webpages")

sql='''
    SELECT url 
    FROM webpages 
'''

sqlContext.sql(sql).show(20, False)

### Extract Top-Level Domains

Uses a [User Defined Function](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html) (UDF)

In [None]:
%%capture
!pip install tldextract

In [None]:
import tldextract
tldextract.extract('http://forums.news.cnn.com/')    # See https://github.com/john-kurkowski/tldextract
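
`tldextract` relies on the Public Suffix List, which is why it can tell `com` apart from multi-label suffixes such as `co.uk`. For comparison, here is a rough sketch of a naive approach that only keeps the last label of the host name. `naive_suffix` is a hypothetical helper for illustration, not a replacement for `tldextract`.

```python
from urllib.parse import urlparse

def naive_suffix(url):
    """Return the last dot-separated label of the host name, or "" if there is none."""
    host = urlparse(url).hostname or ""
    return host.rsplit(".", 1)[-1] if "." in host else ""

print(naive_suffix("http://forums.news.cnn.com/"))   # com
print(naive_suffix("http://forums.bbc.co.uk/"))      # uk, whereas the public suffix is co.uk
```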

Spark Dataframe API

In [None]:
import tldextract
from pyspark.sql.functions import desc, udf

@udf("string")
def extract_tld(s):
    return tldextract.extract(s).suffix

WebArchive(sc, sqlContext, WARCs_path) \
  .webpages() \
  .select(extract_tld("url").alias("tld")) \
  .groupBy("tld") \
  .count() \
  .sort(desc("count"))\
  .show(10, False)

Spark SQL equivalent

In [None]:
df = WebArchive(sc, sqlContext, WARCs_path).webpages()
df.createOrReplaceTempView("webpages")

sqlContext.udf.register("extract_tld", extract_tld)

sql='''
    SELECT tld, count(tld) AS count
    FROM (
      SELECT extract_tld(url) AS tld 
      FROM webpages 
    )
    GROUP BY tld
    ORDER BY count DESC
'''

sqlContext.sql(sql).show(20, False)

### Count words in web pages

Uses a [User Defined Function](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html) (UDF)

In [None]:
from aut import remove_html, remove_http_header
from pyspark.sql.functions import col, udf

@udf("int")
def word_count(s):
  # Spark passes NULL values as None; count them as 0 words
  return len(s.split()) if s is not None else 0

WebArchive(sc, sqlContext, WARCs_path) \
  .webpages()\
  .withColumn("text", remove_html( remove_http_header("content") ))\
  .withColumn("word_count", word_count("text"))\
  .select("text", "word_count")\
  .show(1, False)

Spark SQL equivalent

In [None]:
df = WebArchive(sc, sqlContext, WARCs_path)\
        .webpages()\
        .withColumn("text", remove_html( remove_http_header("content") ))   # AUT's remove_html & remove_http_header work only with dataframes

df.createOrReplaceTempView("webpages_text")

@udf("int")
def word_count(s):
  # Spark passes NULL values as None; count them as 0 words
  return len(s.split()) if s is not None else 0

sqlContext.udf.register("word_count", word_count)

sql='''
    SELECT text, word_count(text) AS word_count 
    FROM   webpages_text 
'''

sqlContext.sql(sql).show(1, False)
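
The counting logic inside the UDF can be checked in plain Python before running it on a cluster. The sketch below uses a hypothetical `count_words` function and assumes that missing text may reach the UDF as `None`, which is how Spark hands NULL column values to Python UDFs.

```python
def count_words(text):
    """Number of whitespace-separated tokens; 0 for missing (None) text."""
    if text is None:
        return 0
    return len(text.split())

print(count_words("Web archives  are\nfun"))  # 4
print(count_words(""))                        # 0
print(count_words(None))                      # 0
```

`str.split()` without arguments splits on any run of whitespace, so repeated spaces and newlines do not inflate the count.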

### Count links between domains

In [None]:
from aut import extract_domain

edges = WebArchive(sc, sqlContext, WARCs_path) \
  .webgraph()\
  .withColumn("src_domain",  extract_domain("src"))  \
  .withColumn("dest_domain", extract_domain("dest")) \
  .select(["src_domain", "dest_domain"])\
  .groupBy(["src_domain", "dest_domain"])\
  .count()

edges.show(10, False)
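
To see what this aggregation computes, here is a small plain-Python sketch over a few made-up link pairs. The URLs are hypothetical, and `urlparse(...).hostname` is only a stand-in for AUT's `extract_domain`, whose exact output may differ.

```python
from collections import Counter
from urllib.parse import urlparse

# Hypothetical (src, dest) link pairs, standing in for rows of the webgraph dataframe
links = [
    ("http://www.a.org/index.html", "http://b.org/about"),
    ("http://www.a.org/news.html",  "http://b.org/contact"),
    ("http://www.a.org/index.html", "http://c.net/"),
]

def host(url):
    """Host name of a URL, or "" when it cannot be parsed."""
    return urlparse(url).hostname or ""

# Equivalent of groupBy(src_domain, dest_domain).count()
edge_counts = Counter((host(src), host(dest)) for src, dest in links)

for (src_domain, dest_domain), n in edge_counts.most_common():
    print(src_domain, "->", dest_domain, n)
```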

Plot using [NetworkX](http://networkx.org) and [matplotlib](http://matplotlib.org)

In [None]:
import matplotlib.pyplot as plt
import networkx as nx

df = edges.limit(10).toPandas()

G = nx.from_pandas_edgelist(
    df, 
    source="src_domain", 
    target="dest_domain", 
    edge_key="dest_domain", 
    edge_attr="count"
)

pos = nx.planar_layout(G)
options = {
    "node_size": 1000,
    "node_color": "#bc5090",
    "node_shape": "o",
    "alpha": 0.5,
    "linewidths": 4,
    "font_size": 10,
    "font_color": "black",
    "width": 2,
    "edge_color": "grey",
}

plt.figure(figsize=(12, 12))
nx.draw(G, pos, with_labels=True, **options)
labels = {e: G.edges[e]["count"] for e in G.edges}
nx.draw_networkx_edge_labels(G, pos, edge_labels=labels)
plt.show()
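
`nx.planar_layout` raises an exception when the graph is not planar, which can happen once more edges are included. A possible safeguard, sketched here with a hypothetical `safe_layout` helper, is to test planarity first and fall back to a spring layout.

```python
import networkx as nx

def safe_layout(G, seed=42):
    """Use planar_layout when G is planar, otherwise fall back to spring_layout."""
    is_planar, _ = nx.check_planarity(G)
    if is_planar:
        return nx.planar_layout(G)
    return nx.spring_layout(G, seed=seed)

K5 = nx.complete_graph(5)      # complete graph on 5 nodes, which is not planar
layout = safe_layout(K5)
print(len(layout))             # one position per node
```

The returned dictionary can be passed as the `pos` argument of `nx.draw` in the same way as the output of `nx.planar_layout`.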

### Distribution of HTTP Status Codes

In [None]:
codes = WebArchive(sc, sqlContext, WARCs_path) \
  .all()\
  .groupBy('http_status_code')\
  .count()
  
codes.show(20, True)

Plots using [Plotly Express](https://plotly.com/python/plotly-express/)

In [None]:
import plotly.express as px

fig = px.bar(
    codes.toPandas(),
    x='http_status_code', 
    y='count'
)

fig.show()
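
Raw status codes are easier to read when grouped by class (2xx, 3xx, ...) or labelled with their reason phrase. The following sketch uses only the standard library; the sample codes are made up, and `int(code)` is applied so that the helpers accept codes given either as strings or as integers.

```python
from collections import Counter
from http import HTTPStatus

def status_class(code):
    """Map a status code such as 404 to its class label, e.g. "4xx"."""
    return f"{int(code) // 100}xx"

def status_phrase(code):
    """Standard reason phrase for a code, or "Unknown" if it is not registered."""
    try:
        return HTTPStatus(int(code)).phrase
    except ValueError:
        return "Unknown"

# Hypothetical status codes, standing in for the http_status_code column
codes_sample = ["200", "200", "301", "404", "200", "500"]

by_class = Counter(status_class(c) for c in codes_sample)
print(by_class)
print(status_phrase("404"))   # Not Found
```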

### Export graph to Gephi

See [Gephi Graph Viz Platform](http://gephi.org/).

In [None]:
graph = WebArchive(sc, sqlContext, WARCs_path) \
          .webgraph() \
          .groupBy("crawl_date", remove_prefix_www(extract_domain("src")).alias("src_domain"), remove_prefix_www(extract_domain("dest")).alias("dest_domain")) \
          .count() \
          .filter((col("dest_domain").isNotNull()) & (col("dest_domain") !="")) \
          .filter((col("src_domain").isNotNull()) & (col("src_domain") !="")) \
          .orderBy(desc("count")) \
          .collect()

WriteGEXF(graph, "links-for-gephi.gexf")
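
For orientation, a GEXF file is an XML document with a list of nodes and a list of edges. The sketch below builds a minimal GEXF 1.2 document with the standard library from made-up edge rows. It illustrates the file format only and is not AUT's `WriteGEXF` implementation, whose output may contain additional attributes.

```python
import xml.etree.ElementTree as ET

# Hypothetical aggregated edges: (src_domain, dest_domain, count)
rows = [("a.org", "b.org", 2), ("a.org", "c.net", 1)]

gexf = ET.Element("gexf", xmlns="http://www.gexf.net/1.2draft", version="1.2")
graph_el = ET.SubElement(gexf, "graph", mode="static", defaultedgetype="directed")
nodes_el = ET.SubElement(graph_el, "nodes")
edges_el = ET.SubElement(graph_el, "edges")

# One <node> per distinct domain
domains = sorted({d for src, dest, _ in rows for d in (src, dest)})
for d in domains:
    ET.SubElement(nodes_el, "node", id=d, label=d)

# One weighted <edge> per (src, dest) pair
for i, (src, dest, count) in enumerate(rows):
    ET.SubElement(edges_el, "edge", id=str(i), source=src, target=dest,
                  weight=str(count))

xml_text = ET.tostring(gexf, encoding="unicode")
print(xml_text)
```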

### Store results on disk

Save as a `csv` file with a header row

In [None]:
WebArchive(sc, sqlContext, WARCs_path) \
  .webgraph()\
  .limit(10)\
  .write.format('csv').save("webgraph", header='true')

Save as a `parquet` file (the schema, including column names and types, is stored in the file)

In [None]:
WebArchive(sc, sqlContext, WARCs_path) \
  .webgraph()\
  .limit(10)\
  .write.parquet("webgraph.parquet")

Read csv/parquet files

In [None]:
# load parquet files
df = sqlContext.read.parquet("webgraph.parquet")
df.show(2)
df.printSchema()

# load csv files
df = sqlContext.read.option("header", True).csv("webgraph")
df.show(2)
df.printSchema()
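
One practical difference between the two formats: a CSV header only carries column names, so unless a schema is supplied or inferred, values come back as strings, whereas Parquet stores the column types. The standard-library sketch below shows the CSV side of this with a made-up row; the `count` column is hypothetical and only serves to make the type loss visible.

```python
import csv
import io

rows = [{"src": "http://a.org/", "dest": "http://b.org/", "count": 2}]

# Write with a header row, as the Spark CSV writer does with header='true'
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["src", "dest", "count"])
writer.writeheader()
writer.writerows(rows)

# Read it back: the header supplies column names, but every value is a string
buf.seek(0)
loaded = list(csv.DictReader(buf))
print(loaded[0]["count"], type(loaded[0]["count"]).__name__)   # 2 str
```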


## Extras

### Collecting LIFRANUM Web Archives from **Google Storage**

In [None]:
#%%capture
DIR="LIFRANUM"
!mkdir -p $DIR

!gsutil -m cp -r gs://cpe-lyon/LIFRANUM/repo-ecritures-num $DIR

### Accelerating operations

Caching a dataframe in memory speeds up subsequent Spark operations on it.

In [None]:
WARCs_path = "LIFRANUM/repo-ecritures-num/out-00000.warc.gz"

webpages = WebArchive(sc, sqlContext, WARCs_path).webpages()

Without cache:

In [None]:
webpages.count()    # slow: Spark loads the data, filters webpages, computes new columns, etc.

Using cache

In [None]:
webpages.cache().count()    # slow the first time: all previous operations are re-executed to fill the cache

In [None]:
webpages.count()            # second time is faster
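
The behaviour of the three cells above can be mimicked with a toy class. This is an analogy only, under the assumption that it helps to think of a dataframe as a recipe that is re-run on every action. `LazyResult` is hypothetical and does not reflect Spark's internals.

```python
class LazyResult:
    """Toy stand-in for a lazily evaluated dataset with an optional cache."""

    def __init__(self, compute):
        self._compute = compute      # function that rebuilds the data from scratch
        self._cached = None
        self._use_cache = False
        self.recomputations = 0

    def cache(self):
        self._use_cache = True       # only marks the data; nothing is computed yet
        return self

    def count(self):
        if self._use_cache and self._cached is not None:
            return len(self._cached)
        self.recomputations += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return len(data)

pages = LazyResult(lambda: [f"page-{i}" for i in range(1000)])
pages.count()            # recomputes
pages.cache().count()    # recomputes once more and fills the cache
pages.count()            # served from the cache
print(pages.recomputations)   # 2
```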

### Loading image from bytes

[Reading an image from a base64 string](https://dev.to/bl4ckst0n3/image-processing-how-to-read-image-from-string-in-python-pf8)



In [None]:
# Get an image's bytes from a WARC file
res = WebArchive(sc, sqlContext, WARCs_path) \
  .images() \
  .select("bytes")\
  .take(1)

img_base64_string = res[0][0]

In [None]:
import base64
import io
from PIL import Image

# load image from bytes
decoded_string = io.BytesIO( base64.b64decode(img_base64_string) )
Image.open(decoded_string)
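
Before handing the decoded bytes to an image library, it can be useful to check what kind of file they contain. The sketch below guesses the type from the leading "magic" bytes using only the standard library. `sniff_image_type` is a hypothetical helper, and the sample payload is just a PNG signature, not a complete image.

```python
import base64

PNG_MAGIC = b"\x89PNG\r\n\x1a\n"

def sniff_image_type(b64_string):
    """Decode a base64 string and guess the image type from its leading bytes."""
    raw = base64.b64decode(b64_string)
    if raw.startswith(PNG_MAGIC):
        return "png"
    if raw.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    if raw[:6] in (b"GIF87a", b"GIF89a"):
        return "gif"
    return "unknown"

# Hypothetical payload: a PNG signature followed by filler bytes
sample = base64.b64encode(PNG_MAGIC + b"rest-of-file").decode("ascii")
print(sniff_image_type(sample))   # png
```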

### Named Entity Recognition (NER)

Uses [spaCy](https://spacy.io/usage/linguistic-features#named-entities-101)

In [None]:
import spacy
from spacy import displacy

# Recognizes named entities in English text
nlp = spacy.load("en_core_web_sm")
doc = nlp("Apple is looking at buying U.K. startup for $1 billion")

displacy.render(doc, style="ent", jupyter=True)

Named Entity Recognition on a webpage in English

In [None]:
WebArchive(sc, sqlContext, WARCs_path) \
  .webpages() \
  .select("*", remove_html(remove_http_header("content")).alias("text"))\
  .createOrReplaceTempView("webpages")

sql='''
    SELECT language, text
    FROM   webpages
    WHERE  language = 'en' AND text <> ''
'''

res = sqlContext.sql(sql).take(1)   # returns a list of 1 element
txt = res[0][1]                     # "text" attribute from first element

doc = nlp(txt)
displacy.render(doc, style="ent", jupyter=True)  

## LIFRANUM WARC files (backup)

LIFRANUM's WARC files are also stored in Google Drive. Uncomment the lines below if needed.

In [None]:
%%capture
# !pip install -q gdown

#!gdown https://drive.google.com/drive/folders/1xqDsY5KOeK5OMhW39EH37l79Pn-v59B_?usp=sharing -O ./LIFRANUM/autre --folder
#!gdown https://drive.google.com/drive/folders/170j3r23YJBlOpGsKrcZRSs3bqrS03qhi?usp=sharing -O ./LIFRANUM/cartoweb --folder
#!gdown https://drive.google.com/drive/folders/1NLuWLOldfmpwPeAr9Th_HCeH6ZoSw0zr?usp=sharing -O ./LIFRANUM/lifranum-method --folder
#!gdown https://drive.google.com/drive/folders/1wehg3nnCks9iVIvuXMZ5u685ocq__dQe?usp=sharing -O ./LIFRANUM/repo-ecritures-num --folder