In [1]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel

In [2]:
# Build (or reuse) a SparkSession and keep a handle on its SparkContext as well,
# because we want both DataFrames and RDDs.
ss = (
    SparkSession.builder
    .appName("DataProc")
    .config("spark.executor.cores", "24")
    .config("spark.driver.memory", "32g")
    .config("spark.executor.memory", "56g")
    .config("spark.executor.memoryOverhead", "32g")
    .getOrCreate()
)
sc = ss.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 17:37:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the WET paths text file into an RDD of path strings.
paths_rdd = sc.textFile("data/wet.paths") 

In [4]:
# Number of paths listed in the paths file.
paths_rdd.count()

                                                                                

100000

In [5]:
# The paths file lists a large number of WET files, so we only work on a small sample of them.
sample_size = 100

# takeSample returns a local list with a fixed number of elements, which lets us be exact
# about the sample size. It is not the most efficient way to sample an RDD, but the path
# strings are small and there are not many of them, so the cost is negligible.
sampled_path_list = paths_rdd.takeSample(withReplacement=False, num=sample_size, seed=0)

# Turn the local list back into an RDD so the downloads can be distributed.
sampled_paths = sc.parallelize(sampled_path_list)

In [6]:
# Check how many paths ended up in the sample (expected: sample_size).
sampled_paths.count()

100

In [7]:
# Prefix that is prepended to every sampled path to turn it into a download URL.
url_head = "https://data.commoncrawl.org/" 

In [8]:
# Prepend the URL prefix to each sampled path, giving an RDD of full download URLs.
urls_rdd = sampled_paths.map(lambda path: url_head + path)

In [9]:
# Look at one full URL to check the prefix was joined correctly.
urls_rdd.take(1)

['https://data.commoncrawl.org/crawl-data/CC-MAIN-2025-47/segments/1762439342611.65/wet/CC-MAIN-20251108144350-20251108174350-00626.warc.wet.gz']

In [10]:
# we use requests to get a http response from the common crawl server
def get_request(url, max_retries=12, timeout=(10, 60)):
    """Fetch ``url`` with a streamed GET, retrying with exponential back-off.

    The function first sleeps for a random sub-second jitter, then issues the
    request. Any attempt that does not end in HTTP 200 (a non-200 status or a
    ``requests`` transport error) is retried up to ``max_retries`` times, waiting
    ``(2**i + jitter) / 10`` seconds before retry ``i`` (``i`` starting at 0).

    With the default of 12 retries the waits add up to (2**12 - 1) / 10 = 409.5
    seconds plus jitter, i.e. a little under 7 minutes of sleeping in the worst
    case, not counting the time spent inside the requests themselves.

    Args:
        url: The URL to download.
        max_retries: Number of retries after the initial attempt.
        timeout: Timeout passed to ``requests.get``; it keeps a stalled
            connection from blocking the caller forever. The default is a
            judgement call, tune as needed.

    Returns:
        The ``requests.Response`` of the first attempt that returned HTTP 200,
        or ``None`` when every attempt failed.
    """
    import random
    import time

    import requests

    # Initial jitter so that callers started at the same moment do not all hit
    # the server simultaneously.
    time.sleep(random.random())

    # Attempt 0 is the initial request; attempts 1..max_retries are the retries.
    for attempt in range(max_retries + 1):
        if attempt > 0:
            # Exponential back-off (0.1s, 0.2s, 0.4s, ...) plus a little jitter.
            time.sleep((2 ** (attempt - 1) + random.random()) / 10)
        try:
            response = requests.get(url, stream=True, timeout=timeout)
        except requests.RequestException:
            # Transport-level failure (connection error, timeout, ...): treat it
            # like a failed attempt and fall through to the next retry.
            continue
        if response.status_code == 200:
            # Request was successful
            return response
        # With stream=True the connection stays checked out until the response
        # is read or closed, so release it before retrying.
        response.close()
    return None

In [11]:
# Fetch every URL. The cache is attached to this mapped RDD, i.e. before the filter below.
fetched_rdd = urls_rdd.map(get_request).persist(StorageLevel.MEMORY_AND_DISK)

# get_request returns None when every attempt failed; filter those out so that
# responses_rdd contains no Nonetypes.
responses_rdd = fetched_rdd.filter(lambda response: response is not None)

In [12]:
# we keep the downloaded (still gzip-compressed) bytes in memory as a file-like object
def get_zipped(response):
    """Wrap the body of ``response`` in an in-memory binary buffer.

    Args:
        response: Object exposing the downloaded bytes as ``response.content``.

    Returns:
        An ``io.BytesIO`` holding those bytes, positioned at the start.
    """
    from io import BytesIO

    body = response.content
    return BytesIO(body)

In [13]:
# Wrap each response body in an in-memory buffer and mark the resulting RDD for caching.
zipped_files = responses_rdd.map(get_zipped)
zipped_files.persist(StorageLevel.MEMORY_AND_DISK)

# The responses are not needed directly any more once they have been wrapped.
responses_rdd.unpersist()

PythonRDD[10] at RDD at PythonRDD.scala:53

In [14]:
# Pull one element back to check that the download and wrapping steps produce a BytesIO buffer.
zipped_files.take(1) 

25/12/07 17:37:28 WARN BlockManager: Task 31 already completed, not releasing lock for rdd_9_0
                                                                                

[<_io.BytesIO at 0x150e6c3c7c70>]

In [15]:
# decompression also happens entirely in memory
def unzip(zipped):
    """Decompress a gzip stream and return its full contents.

    Args:
        zipped: Binary file-like object containing gzip-compressed data.

    Returns:
        The decompressed data as ``bytes``.
    """
    from gzip import GzipFile

    with GzipFile(fileobj=zipped) as gz_stream:
        payload = gz_stream.read()
    return payload

In [16]:
# Decompress every buffer and mark the resulting RDD of raw bytes for caching.
unzipped_rdd = zipped_files.map(unzip)
unzipped_rdd.persist(StorageLevel.MEMORY_AND_DISK)

# The compressed buffers are not needed directly any more.
zipped_files.unpersist()

PythonRDD[9] at RDD at PythonRDD.scala:53

In [17]:
# take(1) hands back a Python list (see the output), so this reports the container type,
# not the type of the elements stored in the RDD.
type(unzipped_rdd.take(1))

25/12/07 17:37:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
25/12/07 17:37:38 WARN BlockManager: Task 32 already completed, not releasing lock for rdd_12_0
                                                                                

list

In [18]:
# Decode the decompressed bytes as UTF-8 so that we can work with regular strings.
str_rdd = unzipped_rdd.map(lambda raw_bytes: raw_bytes.decode("utf-8"))

# Release the raw bytes and mark the decoded strings for caching instead.
unzipped_rdd.unpersist()
str_rdd.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[14] at RDD at PythonRDD.scala:53

In [19]:
# Count the decoded documents; the result shows how many downloads made it through the pipeline.
str_rdd.count()

                                                                                

100

In [20]:

#

In [21]:
#str_rdd.is_cached
# Drop the cached copy of str_rdd.
# NOTE(review): later cells still call take() and flatMap() on str_rdd — confirm that
# recomputing it at that point is acceptable.
str_rdd.unpersist()

PythonRDD[14] at RDD at PythonRDD.scala:53

In [None]:
# Print the first 10000 characters of one WET file to see what the plain text looks like.
# This string RDD still needs processing before analysis: strip away the descriptive
# metadata and keep just the text, ideally also removing line breaks and the like.
# After that the text goes into language detection, LDA, and so on.
first_document = str_rdd.take(1)[0]
print(first_document[:10000])

In [None]:
# Display the same snippet as a raw string (escape sequences visible) to decide where to split it.
str_rdd.take(1)[0][:10000] 

In [23]:
def _chunks_without_file_metadata(document):
    """Split one WET document on "\\n\\r\\n" and drop its first three chunks.

    The first three chunks are essentially per-file WET metadata; only the
    per-site data that follows is of interest.
    """
    return document.split("\n\r\n")[3:]


# Every double line break shows up as "\n\r\n", so that is the separator we split on.
split_rdd = str_rdd.flatMap(_chunks_without_file_metadata)
split_rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Show the storage level to confirm the persist setting.
split_rdd.getStorageLevel()

StorageLevel(True, True, False, False, 1)

In [24]:
# Bring the first 12 chunks back as a local list for inspection.
list_of_chunks = split_rdd.take(12)

25/12/07 17:39:06 WARN BlockManager: Task 57 already completed, not releasing lock for rdd_17_0
                                                                                

In [25]:
# Inspect the first chunk; per the output it is a WARC header block for one crawled page.
list_of_chunks[0] 

'WARC/1.0\r\nWARC-Type: conversion\r\nWARC-Target-URI: http://05xpp.com/\r\nWARC-Date: 2025-11-08T15:53:36Z\r\nWARC-Record-ID: <urn:uuid:2f166469-bf5a-46d3-89b9-9f64e5fa927f>\r\nWARC-Refers-To: <urn:uuid:722e0d41-d6d3-47cd-9a16-cf6f214439ee>\r\nWARC-Block-Digest: sha1:BFJ2KPUO7F24BQMLD52CYBHJUTAZFFY3\r\nWARC-Identified-Content-Language: zho\r\nContent-Type: text/plain\r\nContent-Length: 7952\r'

In [26]:
# Presumably the chunks alternate between a header block and a text block, so half the
# chunk count approximates the number of crawled pages — TODO confirm.
chunk_total = split_rdd.count()
print(chunk_total // 2)



2266125


                                                                                

In [None]:
# Map every element of str_rdd to the name of its Python type, as a string.
type_rdd = str_rdd.map(lambda element: str(type(element)))

In [None]:
# Show the type name of the first element of str_rdd.
type_rdd.take(1)

In [None]:
# Split every document into its "\n\r\n"-separated chunks, giving one list per document.
# Each element of str_rdd is already a whole decoded string, so it is split directly;
# indexing it with [0] first would split only its first character.
# NOTE(review): this rebinds split_rdd, replacing the flatMap-based RDD defined in an
# earlier cell — confirm that is intended.
split_rdd = str_rdd.map(lambda x: x.split("\n\r\n"))

In [35]:
# Split the first document into its "\n\r\n"-separated chunks as a plain Python list.
listed = str_rdd.take(1)[0].split("\n\r\n") 

                                                                                

In [3]:
# Stop the Spark session created at the top of the notebook.
ss.stop()