In [1]:
from operator import add
import re
from collections import OrderedDict
from operator import itemgetter 
import itertools
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.251:7077") \
        .appName("Lecture1_Example5_common_crawl")\
        .config("spark.executor.cores",4)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()


# RDD API 
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/06 16:39:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

# (*/*) - out of memory›
# ~6.4mins for 39496 files. (...00000/)  (takes 1 minute with 40 partitions)
# ~5 secs for 10 files (...00000/0*) 
# ~20 secs for 11110 files (...00000/1*) 


rdd = spark_context.newAPIHadoopFile(
    "hdfs://192.168.2.251:9000/data/crawl/CC-MAIN-20230921073711-20230921103711-00000.warc.wet",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": "WARC/1.0"}
)\
.cache() # Keep this RDD in memory!

rdd.count()
# Only one job (previous .cache() did not trigger a job)

                                                                                

37543

In [4]:
rdd.take(3)
# [(line_number, partition)]

[(0, ''),
 (8,
  '\r\nWARC-Type: warcinfo\r\nWARC-Date: 2023-10-05T05:57:19Z\r\nWARC-Filename: CC-MAIN-20230921073711-20230921103711-00000.warc.wet.gz\r\nWARC-Record-ID: <urn:uuid:f89d0e85-fbff-49dc-b801-b5c18e3c6b6f>\r\nContent-Type: application/warc-fields\r\nContent-Length: 382\r\n\r\nSoftware-Info: ia-web-commons.1.1.10-SNAPSHOT-20230912101454\r\nExtracted-Date: Thu, 05 Oct 2023 05:57:19 GMT\r\nrobots: checked via crawler-commons 1.5-SNAPSHOT (https://github.com/crawler-commons/crawler-commons)\r\nisPartOf: CC-MAIN-2023-40\r\noperator: Common Crawl Admin (info@commoncrawl.org)\r\ndescription: Wide crawl of the web for September/October 2023\r\npublisher: Common Crawl\r\n\r\n\r\n\r\n'),
 (657,
  '\r\nWARC-Type: conversion\r\nWARC-Target-URI: http://0-50.ru/news/incident/2013-04-06/id_31240.html\r\nWARC-Date: 2023-09-21T09:40:08Z\r\nWARC-Record-ID: <urn:uuid:c2c722d3-682b-42be-b58b-7e95e7e74da7>\r\nWARC-Refers-To: <urn:uuid:60c59970-5c55-4336-805c-a84c0dae5be1>\r\nWARC-Block-Digest: 

In [5]:
rdd.getNumPartitions()

3

In [6]:
print(spark_context.uiWebUrl)

http://de1-spark-host-180:4040


In [7]:
## Example #1 - Filter by Top_level Domain and compute most common words ##

# Try .ac.uk, .ru, .se, .com
p = re.compile("WARC-Target-URI: \S+\.ac.uk", re.IGNORECASE)


# Note: .partition(..) returns a 3-tuple: the string before the separator (index 0), 
# the separotor (index 1), and the part of the string afterwards (index 2) -- which is the part we want.
all_words = rdd\
    .filter(lambda doc: bool(p.search(doc[1])))\
    .map(lambda web_text: web_text[1].partition("\r\n\r\n")[2])\
    .flatMap(lambda t: t.split(" "))\
    .flatMap(lambda w: w.split("\n"))\



all_words_and_count = all_words.map(lambda w: w.strip())\
    .map(lambda w: (w,1))


word_counts = all_words_and_count.reduceByKey(add)

print(word_counts.takeOrdered(60, key=lambda x: -x[1]))





[('and', 2053), ('the', 1526), ('of', 1353), ('to', 945), ('in', 645), ('a', 579), ('for', 576), ('with', 410), ('The', 350), ('on', 322), ('et', 318), ('al.', 310), ('Research', 271), ('is', 258), ('-', 230), ('at', 228), ('University', 227), ('', 213), ('Astronomy', 211), ('by', 208), ('&', 200), ('A.', 186), ('Planck', 184), ('article', 177), ('number:', 170), ('from', 168), ('your', 165), ('that', 162), ('Astrophysics', 159), ('R.', 158), ('P.', 156), ('you', 154), ('our', 153), ('are', 151), ('Ade,', 148), ('or', 146), ('us', 137), ('as', 133), ('pp.', 132), ('this', 131), ('International', 131), ('(BSc)', 130), ('be', 128), ('Centre', 124), ('(MSc)', 122), ('(BA)', 121), ('Study', 121), ('results.', 115), ('Presented', 112), ('have', 109), ('data', 107), ('we', 105), ('Search', 105), ('About', 102), ('will', 99), ('Our', 98), ('an', 97), ('A', 96), ('research', 92), ('not', 91)]


                                                                                

In [8]:
## Example #2 - Group by TLD and compute most common words for each ##

ex = "WARC-Type: conversion\
WARC-Target-URI: http://news.bbc.co.uk/2/hi/africa/3414345.stm\
WARC-Date: 2014-08-02T09:52:13Z"

p = re.compile("WARC-Target-URI: \S+\.([a-zA-Z]{2,3})/", re.IGNORECASE)
# print(p.search(ex).group(1))
# uk

def get_tld(content):
    match = p.search(content)
    if match is not None:
        return match.group(1)
    else:
        return None

# discard the line number
# partition() -- python function -- split on the first occurance, returns (before,split,after)
# filter out those with no TLD

    
words_by_tld_rdd = rdd\
    .map(lambda filename_content: filename_content[1])\
    .map(lambda content: (get_tld(content), content.partition("\r\n\r\n")[2]))\
    .filter(lambda tld_content: tld_content[0] is not None)\
    .flatMapValues(lambda words: words.split(" "))\
    .flatMapValues(lambda words: words.split("\n"))\
    .mapValues(lambda word: word.strip())
    #.take(10)

# print(words_by_tld_rdd.take(10))

tlds = words_by_tld_rdd.countByKey()
#print(tlds)

tlds = OrderedDict(sorted(tlds.items(), key = itemgetter(1), reverse = True))
# print(tlds)  

top_tlds = dict(itertools.islice(tlds.items(), 10))

# print(top_tlds)

print("Results:")

for tld in top_tlds:
    print(tld)
    top_words_for_tld = words_by_tld_rdd\
        .filter(lambda tld_word: tld_word[0] == tld)\
        .values()\
        .map(lambda w: (w,1))\
        .reduceByKey(add)\
        .takeOrdered(20, key=lambda x: -x[1])
    print(top_words_for_tld)

                                                                                

Results:
com


                                                                                

[('the', 220335), ('to', 188078), ('and', 175143), ('of', 139934), ('a', 133113), ('', 124568), ('de', 110352), ('-', 109227), ('in', 109175), ('for', 82326), ('&', 71260), ('is', 63088), ('on', 51796), ('|', 49527), ('with', 48613), ('you', 48457), ('your', 47019), ('The', 46503), ('by', 39249), ('that', 37446)]
org


                                                                                

[('the', 42521), ('', 41085), ('and', 31152), ('of', 29529), ('to', 27789), ('a', 19857), ('in', 18975), ('-', 13746), ('de', 12908), ('for', 12193), ('is', 9813), ('on', 8417), ('The', 7875), ('with', 6864), ('by', 6838), ('that', 6796), ('&', 6434), ('you', 5747), ('are', 5361), ('la', 5225)]
ru


                                                                                

[('', 46708), ('и', 39355), ('в', 29330), ('для', 19241), ('на', 19007), ('с', 13740), ('не', 9292), ('-', 8772), ('по', 8159), ('В', 6751), ('от', 6090), ('что', 4909), ('из', 4886), ('—', 4754), ('к', 4535), ('или', 4085), ('1', 3973), ('0', 3796), ('Поковка', 3576), ('–', 3536)]
de


                                                                                

[('', 60844), ('und', 22366), ('der', 14737), ('die', 13638), ('in', 11411), ('von', 10303), ('für', 9200), ('-', 8430), ('zu', 7638), ('Sie', 7068), ('&', 6482), ('mit', 6295), ('den', 5754), ('auf', 5267), ('im', 4671), ('ist', 4430), ('the', 4275), ('oder', 4257), ('–', 4005), ('€', 3912)]
net


                                                                                

[('the', 16736), ('and', 12402), ('to', 11561), ('a', 10194), ('of', 8711), ('-', 7827), ('de', 7653), ('in', 6991), ('', 6385), ('is', 4989), ('for', 4669), ('The', 4610), ('on', 4198), ('with', 3907), ('&', 3205), ('you', 2983), ('I', 2935), ('by', 2916), ('Tax', 2678), ('your', 2644)]
it


                                                                                

[('di', 20021), ('e', 12151), ('in', 6523), ('per', 6508), ('a', 5435), ('il', 5433), ('la', 5387), ('del', 4287), ('-', 4242), ('duplicato', 3995), ('che', 3655), ('un', 3103), ('', 3097), ('i', 3054), ('con', 3039), ('della', 2758), ('è', 2718), ('da', 2697), ('al', 2677), ('le', 2585)]
fr


                                                                                

[('de', 28078), ('et', 10725), ('la', 9572), ('à', 8735), ('des', 7441), ('les', 6579), ('le', 6510), ('en', 5452), (':', 5337), ('-', 5068), ('du', 4899), ('pour', 4526), ('sur', 3747), ('un', 3706), ('dans', 2735), ('une', 2733), ('vous', 2648), ('au', 2625), ('est', 2517), ('Le', 2504)]
hu


                                                                                

[('a', 28686), ('és', 18324), ('az', 14159), ('-', 14008), ('keresőoptimalizálás', 10352), ('A', 8692), ('keresőoptimalizálás,', 7702), ('havidíjas', 7145), ('weboldal', 6387), ('hogy', 6327), ('Az', 6155), ('Budapest', 4858), ('Weboldal', 4328), ('készítés', 4276), ('Google', 3726), ('online', 3506), ('belső', 3446), ('átmérő', 3379), ('bérelhető', 3168), ('Komplex', 3040)]
pl


                                                                                

[('i', 12245), ('w', 9541), ('do', 8561), ('na', 6515), ('z', 6337), ('-', 4496), ('się', 3714), ('to', 2600), ('–', 2573), ('o', 2102), ('', 2086), ('nie', 1962), ('jest', 1938), ('dla', 1768), ('zł', 1682), ('W', 1502), ('(1)', 1339), ('the', 1301), ('od', 1260), ('a', 1258)]
es




[('de', 32364), ('la', 11097), ('en', 10436), ('y', 9830), ('el', 7870), ('a', 6682), ('que', 6635), ('los', 5639), ('del', 5437), ('para', 4348), ('las', 3938), ('se', 3421), ('con', 3086), ('por', 2987), ('o', 2961), ('un', 2250), ('una', 2063), ('al', 1907), ('su', 1720), ('-', 1636)]


                                                                                

In [8]:
spark_session.stop()