## Analyze Common Crawl Data with PySpark

In [2]:
# Import required modules
from pyspark.sql import SparkSession

# Create (or reuse an existing) SparkSession for this notebook
spark = SparkSession.builder.getOrCreate()

# Keep a handle on the session's SparkContext for the RDD API used below
sc = spark.sparkContext

In [3]:
# Load the tab-separated domain graph file as an RDD of raw text lines
common_crawl_domain_counts = sc.textFile(
    './crawl/cc-main-limited-domains.csv'
)

# Peek at the first 10 raw lines
common_crawl_domain_counts.take(10)

['367855\t172-in-addr\tarpa\t1',
 '367856\taddr\tarpa\t1',
 '367857\tamphic\tarpa\t1',
 '367858\tbeta\tarpa\t1',
 '367859\tcallic\tarpa\t1',
 '367860\tch\tarpa\t1',
 '367861\td\tarpa\t1',
 '367862\thome\tarpa\t7',
 '367863\tiana\tarpa\t1',
 '367907\tlocal\tarpa\t1']

In [4]:
def fmt_domain_graph_entry(entry):
    """
    Parse one raw Common Crawl domain graph line into a typed tuple.

    The line holds four tab-separated fields. Returns the tuple
    (site_id, domain, tld, num_subdomains), with site_id and num_subdomains
    converted to int and domain/tld left as strings.

    Raises ValueError if the line does not split into exactly four fields,
    or if site_id / num_subdomains is not an integer.
    """
    fields = entry.split('\t')
    site_id, domain, tld, num_subdomains = fields
    return (int(site_id), domain, tld, int(num_subdomains))

In [5]:
# Parse every raw line into a typed (site_id, domain, tld, num_subdomains) tuple
formatted_host_counts = common_crawl_domain_counts.map(
    fmt_domain_graph_entry
)

# Show a sample of the parsed records
formatted_host_counts.take(10)

[(367855, '172-in-addr', 'arpa', 1),
 (367856, 'addr', 'arpa', 1),
 (367857, 'amphic', 'arpa', 1),
 (367858, 'beta', 'arpa', 1),
 (367859, 'callic', 'arpa', 1),
 (367860, 'ch', 'arpa', 1),
 (367861, 'd', 'arpa', 1),
 (367862, 'home', 'arpa', 7),
 (367863, 'iana', 'arpa', 1),
 (367907, 'local', 'arpa', 1)]

In [6]:
def extract_subdomain_counts(entry):
    """
    Return only the subdomain count from one raw Common Crawl domain graph line.

    The line must contain exactly four tab-separated fields
    (site_id, domain, tld, num_subdomains). Only the last one is kept,
    converted to int. Raises ValueError on a different field count or a
    non-integer count.
    """
    # The leading fields are unpacked only to enforce the four-field shape.
    _site_id, _domain, _tld, num_subdomains = entry.split('\t')
    return int(num_subdomains)


# Map every raw line to just its integer subdomain count
host_counts = common_crawl_domain_counts.map(
    extract_subdomain_counts
)

# Peek at the first 10 counts
host_counts.take(10)

[1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

In [7]:
# Collapse the counts into one grand total. The lambda is the binary
# function reduce() applies pairwise across the RDD's elements.
total_host_counts = host_counts.reduce(lambda left, right: left + right)

# Display the grand total
total_host_counts

595466

In [8]:
# Stop the sparkContext and the SparkSession used by the RDD section above.
# The DataFrame section below starts its own new SparkSession.
spark.stop()

## Exploring Domain Counts with PySpark DataFrames and SQL

In [9]:
from pyspark.sql import SparkSession

# Start a SparkSession for the DataFrame/SQL part. The previous session was
# stopped at the end of the RDD section, so getOrCreate() builds a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [10]:
# Read the headerless, tab-separated domain file into a DataFrame, letting
# Spark infer column types (columns come back as _c0.._c3)
common_crawl = spark.read.csv(
    "./crawl/cc-main-limited-domains.csv",
    sep='\t',
    inferSchema=True,
)

# Display the first rows of the DataFrame
common_crawl.show()

+--------+--------------+----+---+
|     _c0|           _c1| _c2|_c3|
+--------+--------------+----+---+
|  367855|   172-in-addr|arpa|  1|
|  367856|          addr|arpa|  1|
|  367857|        amphic|arpa|  1|
|  367858|          beta|arpa|  1|
|  367859|        callic|arpa|  1|
|  367860|            ch|arpa|  1|
|  367861|             d|arpa|  1|
|  367862|          home|arpa|  7|
|  367863|          iana|arpa|  1|
|  367907|         local|arpa|  1|
|  367908|           nic|arpa|  1|
|48987160|     1-20media|coop|  1|
|48987161|           134|coop|  1|
|48987162|            19|coop|  4|
|48987163|   1strochdale|coop|  1|
|48987164|          2012|coop|  4|
|48987165|2012intlsummit|coop|  1|
|48987166|      2147mans|coop|  1|
|48987167|   21stcentury|coop|  1|
|48987168|   2decologico|coop|  1|
+--------+--------------+----+---+
only showing top 20 rows



In [11]:
# Replace the positional _cN column names from the headerless CSV read with
# descriptive names, one withColumnRenamed() call per column
common_crawl = (
    common_crawl
    .withColumnRenamed("_c0", "site_id")
    .withColumnRenamed("_c1", "domain")
    .withColumnRenamed("_c2", "top_level_domain")
    .withColumnRenamed("_c3", "num_subdomains")
)

# Display the first rows under the new column names
common_crawl.show()

+--------+--------------+----------------+--------------+
| site_id|        domain|top_level_domain|num_subdomains|
+--------+--------------+----------------+--------------+
|  367855|   172-in-addr|            arpa|             1|
|  367856|          addr|            arpa|             1|
|  367857|        amphic|            arpa|             1|
|  367858|          beta|            arpa|             1|
|  367859|        callic|            arpa|             1|
|  367860|            ch|            arpa|             1|
|  367861|             d|            arpa|             1|
|  367862|          home|            arpa|             7|
|  367863|          iana|            arpa|             1|
|  367907|         local|            arpa|             1|
|  367908|           nic|            arpa|             1|
|48987160|     1-20media|            coop|             1|
|48987161|           134|            coop|             1|
|48987162|            19|            coop|             4|
|48987163|   1

## Reading and Writing Datasets to Disk

In [12]:
# Save the `common_crawl` DataFrame to a directory of parquet files.
# mode("overwrite") replaces output left by a previous run. With the default
# save mode ("errorifexists"), this cell fails whenever the notebook is
# re-executed and ./results/common_crawl/ already exists.
common_crawl.write.mode("overwrite").parquet("./results/common_crawl/")

In [16]:
# Load the parquet output written above back into a DataFrame
common_crawl_domains = (
    spark.read
    .format("parquet")
    .load("./results/common_crawl/")
)

# Show sample rows, then the column names and types read from parquet
common_crawl_domains.show()
common_crawl_domains.printSchema()

+--------+--------------+----------------+--------------+
| site_id|        domain|top_level_domain|num_subdomains|
+--------+--------------+----------------+--------------+
|  367855|   172-in-addr|            arpa|             1|
|  367856|          addr|            arpa|             1|
|  367857|        amphic|            arpa|             1|
|  367858|          beta|            arpa|             1|
|  367859|        callic|            arpa|             1|
|  367860|            ch|            arpa|             1|
|  367861|             d|            arpa|             1|
|  367862|          home|            arpa|             7|
|  367863|          iana|            arpa|             1|
|  367907|         local|            arpa|             1|
|  367908|           nic|            arpa|             1|
|48987160|     1-20media|            coop|             1|
|48987161|           134|            coop|             1|
|48987162|            19|            coop|             4|
|48987163|   1

## Querying Domain Counts with PySpark DataFrames and SQL

In [17]:
# Register the DataFrame as the temporary view `crawl` so that the SQL
# queries in this section can reference it by name
common_crawl_domains.createOrReplaceTempView(name="crawl")

In [34]:
# Total the subdomain counts per top-level domain with the DataFrame API and
# list the largest totals first. sum() names its output column
# "sum(num_subdomains)", which is what the sort refers to.
common_crawl_domains_2 = (
    common_crawl_domains
    .groupBy("top_level_domain")
    .sum("num_subdomains")
    .sort("sum(num_subdomains)", ascending=False)
)
common_crawl_domains_2.show()

+----------------+-------------------+
|top_level_domain|sum(num_subdomains)|
+----------------+-------------------+
|             edu|             484438|
|             gov|              85354|
|          travel|              10768|
|            coop|               8683|
|            jobs|               6023|
|            post|                143|
|             map|                 40|
|            arpa|                 17|
+----------------+-------------------+



In [21]:
# Same per-TLD aggregation as above, expressed in SQL against the `crawl` view
query = (
    " SELECT top_level_domain,SUM(num_subdomains)"
    " FROM crawl"
    " GROUP BY top_level_domain"
    " ORDER BY SUM(num_subdomains) DESC"
)
spark.sql(query).show()

+----------------+-------------------+
|top_level_domain|sum(num_subdomains)|
+----------------+-------------------+
|             edu|             484438|
|             gov|              85354|
|          travel|              10768|
|            coop|               8683|
|            jobs|               6023|
|            post|                143|
|             map|                 40|
|            arpa|                 17|
+----------------+-------------------+



In [43]:
# Look up the subdomain count of nps.gov with the DataFrame API
common_crawl_domains_3 = (
    common_crawl_domains
    .where(
        (common_crawl_domains["domain"] == "nps")
        & (common_crawl_domains["top_level_domain"] == "gov")
    )
    .select("top_level_domain", "num_subdomains")
)
common_crawl_domains_3.show()

+----------------+--------------+
|top_level_domain|num_subdomains|
+----------------+--------------+
|             gov|           178|
+----------------+--------------+



In [44]:
# Same nps.gov lookup as above, expressed in SQL against the `crawl` view
query = (
    " SELECT top_level_domain, num_subdomains\n"
    "FROM crawl\n"
    "WHERE domain = 'nps' AND top_level_domain = 'gov'\n"
)
spark.sql(query).show()

+----------------+--------------+
|top_level_domain|num_subdomains|
+----------------+--------------+
|             gov|           178|
+----------------+--------------+



In [45]:
# Stop the notebook's `SparkSession` and `sparkContext`. This is the final
# cell, so it runs after every query in this section.
spark.stop()