In [None]:
from operator import add
import re
from collections import OrderedDict
from operator import itemgetter 
import itertools
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.35:7077") \
        .appName("Lecture1_Example5_common_crawl")\
        .config("spark.executor.cores",4)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()


# RDD API 
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

In [None]:

# (*/*) - out of memory›
# ~6.4mins for 39496 files. (...00000/)  (takes 1 minute with 40 partitions)
# ~5 secs for 10 files (...00000/0*) 
# ~20 secs for 11110 files (...00000/1*) 


rdd = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.35:9000/data/crawl/CC-MAIN-20231128083443-20231128113443-00000.warc.wet',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'WARC/1.0'}
)\
.cache() # Keep this RDD in memory!

rdd.count()
# Only one job (previous .cache() did not trigger a job)

In [None]:
rdd.take(3)
# [(line_number, partition)]

In [None]:
rdd.getNumPartitions()

In [None]:
print(spark_context.uiWebUrl)

In [None]:
## Example #1 - Filter by Top_level Domain and compute most common words ##

# Try .ac.uk, .ru, .se, .com
p = re.compile('WARC-Target-URI: \S+\.ac.uk', re.IGNORECASE)


# Note: .partition(..) returns a 3-tuple: the string before the separator (index 0), 
# the separotor (index 1), and the part of the string afterwards (index 2) -- which is the part we want.
all_words = rdd\
    .filter(lambda doc: bool(p.search(doc[1])))\
    .map(lambda web_text: web_text[1].partition('\r\n\r\n')[2])\
    .flatMap(lambda t: t.split(' '))\
    .flatMap(lambda w: w.split('\n'))\



all_words_and_count = all_words.map(lambda w: w.strip())\
    .map(lambda w: (w,1))


word_counts = all_words_and_count.reduceByKey(add)

print(word_counts.takeOrdered(60, key=lambda x: -x[1]))



In [None]:
## Example #2 - Group by TLD and compute most common words for each ##

ex = "WARC-Type: conversion\
WARC-Target-URI: http://news.bbc.co.uk/2/hi/africa/3414345.stm\
WARC-Date: 2014-08-02T09:52:13Z"

p = re.compile('WARC-Target-URI: \S+\.([a-zA-Z]{2,3})/', re.IGNORECASE)
# print(p.search(ex).group(1))
# uk

def get_tld(content):
    match = p.search(content)
    if match is not None:
        return match.group(1)
    else:
        return None

# discard the line number
# partition() -- python function -- split on the first occurance, returns (before,split,after)
# filter out those with no TLD

    
words_by_tld_rdd = rdd\
    .map(lambda filename_content: filename_content[1])\
    .map(lambda content: (get_tld(content), content.partition('\r\n\r\n')[2]))\
    .filter(lambda tld_content: tld_content[0] is not None)\
    .flatMapValues(lambda words: words.split(' '))\
    .flatMapValues(lambda words: words.split('\n'))\
    .mapValues(lambda word: word.strip())
    #.take(10)

# print(words_by_tld_rdd.take(10))

tlds = words_by_tld_rdd.countByKey()
#print(tlds)

tlds = OrderedDict(sorted(tlds.items(), key = itemgetter(1), reverse = True))
# print(tlds)  

top_tlds = dict(itertools.islice(tlds.items(), 10))

# print(top_tlds)

print("Results:")

for tld in top_tlds:
    print(tld)
    top_words_for_tld = words_by_tld_rdd\
        .filter(lambda tld_word: tld_word[0] == tld)\
        .values()\
        .map(lambda w: (w,1))\
        .reduceByKey(add)\
        .takeOrdered(20, key=lambda x: -x[1])
    print(top_words_for_tld)

In [8]:
spark_session.stop()