In [1]:
// 1. Installation

// import libraries
import org.archive.archivespark._
import org.archive.archivespark.functions._
import org.archive.archivespark.specific.warc._

// Input locations inside the container; both paths are meant to be mounted with `docker run -v`.
// cdxPath is a glob matching every CDX index file; warcPath is the directory holding the archive
// files those indices refer to.
val cdxPath = "/data/arc_cdx/*.cdx"
val warcPath = "/data/warc"

import org.apache.spark.sql.{Row, SparkSession}

// A separate session derived from the notebook's `spark`; later cells use it to build DataFrames.
// The explicit `()` avoids auto-application of an empty-parameter-list method, which is
// deprecated in Scala 2.13 and rejected by Scala 3.
val session = spark.newSession()

// Describe every capture listed in the CDX files together with its archive record.
// NOTE(review): presumably an RDD; records are evaluated by later actions such as r.count().
val r = ArchiveSpark.load(WarcSpec.fromFiles(cdxPath, warcPath))

In [2]:
// 2.2 Count all captures in `r` (a Spark action: every record of `r` is evaluated).
// TODO(review): the original note read "need r.mimetype not started by 1995"; its intent is
// unclear (perhaps a planned per-MIME-type breakdown, or excluding 1995 captures) - confirm.
r.count()

72235608

In [6]:
// Keep only captures whose timestamp begins with the year 1995.
// Assumes `timestamp` is a string that starts with the 4-digit year (CDX style) - confirm.
// The lambda parameter is named `capture` so it no longer shadows the RDD `r` being filtered.
val r1995 = r.filter(capture => capture.timestamp.startsWith("1995"))

In [7]:
// Number of captures kept by the 1995 timestamp filter (a Spark action over `r1995`).
r1995.count()

284

In [4]:
// 3. Build the basic DataFrame: one row per capture holding its original URL, digest,
// HTTP status and MIME type.
val m = r.map { rec =>
  (rec.originalUrl, rec.digest, rec.status, rec.mime)
}

// Name the tuple columns and cache the result, since every later cell starts from `df`.
val df = session
  .createDataFrame(m)
  .toDF("originalUrl", "digest", "status", "mime")
  .cache()

In [5]:
// Preview the first 20 rows of `df`; long cell values are truncated (Spark's show() defaults).
df.show(20)

+--------------------+--------------------+------+----+
|         originalUrl|              digest|status|mime|
+--------------------+--------------------+------+----+
|http://www.stopkl...|PMUYSQN6O7BISA2A4...|   200|  im|
|http://www.stopkl...|A7INTAJ2IYZI2YCV7...|   200|  im|
|http://www.stopkl...|A7INTAJ2IYZI2YCV7...|   200|  im|
|http://www.stopkl...|A7INTAJ2IYZI2YCV7...|   200|  im|
|http://www.stopkl...|A7INTAJ2IYZI2YCV7...|   200|  im|
|http://www.stopkl...|7H3SNEXZ5GTSJXCBL...|   200|  im|
|http://www.stopkl...|VHX3XTMT47JUSZA3I...|   200|  im|
|http://www.stopkl...|VHX3XTMT47JUSZA3I...|   200|  im|
|http://www.stopkl...|73XLK7LCKIRK24HK7...|   200|  im|
|http://www.stopkl...|73XLK7LCKIRK24HK7...|   200|  im|
|http://www.stopkl...|3R63VZAJO22ILAFRV...|   200|  im|
|http://www.stopkl...|W2X7RI5SYM7MFKUO7...|   200|  im|
|http://www.stopkl...|EICU253YO7TPFAGTQ...|   200|  im|
|http://www.stopkl...|INNKIN2JHJ3YYQPJP...|   200|  im|
|http://www.stopkl...|INNKIN2JHJ3YYQPJP...|   20

In [8]:
// 5. Mean and median number of versions (captures) per single URL
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions._

// Captures per URL: the count of non-null `digest` values in each originalUrl group, sorted from
// the most- to the least-captured URL. Cached because the following cells reuse it.
val mDu = df
  .groupBy("originalUrl")
  .agg(count(col("digest")).alias("digests"))
  .sort(col("digests").desc)
  .cache()

In [9]:
// Show the 20 most-captured URLs (mDu is sorted descending) without truncating the URL column.
mDu.show(20, false)

                                                                                +-------------------------------------------------------------------+-------+
|originalUrl                                                        |digests|
+-------------------------------------------------------------------+-------+
|http://www.rzeczpospolita.pl:80/gifs/rek1.gif                      |164076 |
|http://www.rzeczpospolita.pl:80/gifs/subheadm.gif                  |109050 |
|http://www.rzeczpospolita.pl:80/gifs/tlo.gif                       |69606  |
|http://www.rzeczpospolita.pl:80/gifs/tlopastyll.gif                |63126  |
|http://www.rzeczpospolita.pl:80/gifs/head.gif                      |61952  |
|http://img.wp.pl:80/pixel.gif                                      |49326  |
|http://www.wp.pl:80/robots.txt                                     |35122  |
|http://www.rzeczpospolita.pl:80/gifs/rek2.gif                      |30536  |
|http://of.pl:80/robots.txt                                 

In [10]:
// 5.1 Mean number of captures per URL, computed as a single global aggregate (column avg(digests)).
mDu.groupBy().agg(avg(col("digests"))).show()

+-----------------+
|     avg(digests)|
+-----------------+
|3.787504512485132|
+-----------------+



In [11]:
// 5.2 Median number of captures per URL, using Spark SQL's exact `percentile` aggregate.
// Not cached: the one-row result is displayed once and never referenced again, so caching it
// would only register a throwaway plan in the session's cache.
mDu.agg(expr("percentile(digests, 0.5)").as("median")).show()

                                                                                +------+
|median|
+------+
|   2.0|
+------+



In [12]:

// Number of distinct `digest` values per URL, i.e. how many different contents were captured
// for it. Cached because both the mean and the median below are computed from it.
val vBu = df
  .groupBy("originalUrl")
  .agg(countDistinct(col("digest")).alias("distDigests"))
  .cache()


In [14]:
// Mean number of distinct digests per URL, as a single global aggregate named avgDigests.
vBu.groupBy().agg(avg(col("distDigests")).alias("avgDigests")).show()


+------------------+
|        avgDigests|
+------------------+
|1.2547012033555849|
+------------------+



In [15]:
// Median number of distinct digests per URL (exact `percentile` aggregate).
// Not cached: the one-row result is displayed once and never reused.
vBu.agg(expr("percentile(distDigests, 0.5)").as("medDigests")).show()

+----------+
|medDigests|
+----------+
|       1.0|
+----------+

