# PySpark and Big Data Project

## Analyzing Common Crawl Data with RDDs

Initialize a new Spark Context and read in the domain graph as an RDD.

In [1]:
# Import required modules
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession.builder.getOrCreate()

# Get SparkContext
sc = spark.sparkContext

# Set the log level to a different value (for example, "INFO")
sc.setLogLevel("INFO")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/04 12:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read Domains CSV File into an RDD
file_path = 'data/cc-main-limited-domains.csv'
common_crawl_domain_counts = sc.textFile(file_path)

# Display first few domains from the RDD
common_crawl_domain_counts.take(5)

['367855\t172-in-addr\tarpa\t1',
 '367856\taddr\tarpa\t1',
 '367857\tamphic\tarpa\t1',
 '367858\tbeta\tarpa\t1',
 '367859\tcallic\tarpa\t1']

Apply `fmt_domain_graph_entry` to `common_crawl_domain_counts` and save the result as a new RDD named `formatted_host_counts`.

In [3]:
def fmt_domain_graph_entry(entry):
    """
    Formats a Common Crawl domain graph entry. Extracts the site_id,
    domain name, top-level domain (tld), and subdomain count as separate items.
    """

    # Split the entry on delimiter ('\t') into site_id, domain, tld, and num_subdomains
    site_id, domain, tld, num_subdomains = entry.split('\t')        
    return int(site_id), domain, tld, int(num_subdomains)

# Apply `fmt_domain_graph_entry` to the raw data RDD
formatted_host_counts = common_crawl_domain_counts.map(fmt_domain_graph_entry)

# Display the first few entries of the new RDD
formatted_host_counts.take(10)

[(367855, '172-in-addr', 'arpa', 1),
 (367856, 'addr', 'arpa', 1),
 (367857, 'amphic', 'arpa', 1),
 (367858, 'beta', 'arpa', 1),
 (367859, 'callic', 'arpa', 1),
 (367860, 'ch', 'arpa', 1),
 (367861, 'd', 'arpa', 1),
 (367862, 'home', 'arpa', 7),
 (367863, 'iana', 'arpa', 1),
 (367907, 'local', 'arpa', 1)]
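
The four-way unpacking in `fmt_domain_graph_entry` raises a `ValueError` as soon as one line has a different number of fields or a non-numeric count. The sketch below shows one possible tolerant variant in plain Python; the name `safe_fmt_domain_graph_entry` and the two malformed sample lines are hypothetical and only illustrate the idea.

```python
def safe_fmt_domain_graph_entry(entry):
    """Return (site_id, domain, tld, num_subdomains), or None if the line is malformed."""
    fields = entry.rstrip("\n").split("\t")
    if len(fields) != 4:
        # Wrong number of tab-separated fields: signal "skip this line"
        return None
    site_id, domain, tld, num_subdomains = fields
    try:
        return int(site_id), domain, tld, int(num_subdomains)
    except ValueError:
        # site_id or num_subdomains was not an integer
        return None


sample_lines = [
    "367855\t172-in-addr\tarpa\t1",   # taken from the notebook output
    "367862\thome\tarpa\t7",          # taken from the notebook output
    "not-a-number\tbroken\tarpa\t1",  # hypothetical malformed line
    "too\tfew\tfields",               # hypothetical malformed line
]

parsed = [safe_fmt_domain_graph_entry(line) for line in sample_lines]
valid = [row for row in parsed if row is not None]
print(valid)
```

On the RDD, the same function could be applied with `common_crawl_domain_counts.map(safe_fmt_domain_graph_entry).filter(lambda row: row is not None)`.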

Apply `extract_subdomain_counts` to `common_crawl_domain_counts` and save the result as a new RDD named `host_counts`.

In [4]:
def extract_subdomain_counts(entry):
    """
    Extract the subdomain count from a Common Crawl domain graph entry.
    """
    
    # Split the entry on delimiter ('\t') into site_id, domain, tld, and num_subdomains
    site_id, domain, tld, num_subdomains = entry.split('\t')
    
    # return ONLY the num_subdomains
    return int(num_subdomains)


# Apply `extract_subdomain_counts` to the raw data RDD
host_counts = common_crawl_domain_counts.map(extract_subdomain_counts)

# Display the first few entries
host_counts.take(10)

[1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

Using `host_counts`, calculate the total number of subdomains across all domains in the dataset and save the result to a variable named `total_host_counts`.

In [5]:
# Reduce the RDD to a single value, the sum of subdomains, with a lambda function as the reduce function
total_host_counts = host_counts.reduce(lambda a, b: a + b)

# Display result count
print(total_host_counts)

595466
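
`reduce` is applied within each partition first and the partial results are then combined, which is why the function passed to it should be associative and commutative. The following plain-Python sketch mimics that behaviour with `functools.reduce` on the ten counts displayed earlier; the split into two lists is only an assumed stand-in for Spark's partitions, not how Spark actually divided the file.

```python
from functools import reduce


def add(a, b):
    """The same operation as the lambda passed to host_counts.reduce()."""
    return a + b


# The first ten subdomain counts shown by host_counts.take(10)
counts = [1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

# Assumed split into two "partitions", purely for illustration
partitions = [counts[:5], counts[5:]]

# Step 1: reduce each partition independently
partial_sums = [reduce(add, part) for part in partitions]

# Step 2: combine the partial results into a single value
total = reduce(add, partial_sums)

print(partial_sums, total)
```

Because addition is associative and commutative, the total does not depend on how the values are split or in which order the partial sums arrive.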



Stop the current `SparkSession` and `sparkContext` before moving on to analyze the data with Spark SQL.

In [6]:
# Stop the sparkContext and the SparkSession
spark.stop()


## Exploring Domain Counts with PySpark DataFrames and SQL

Create a new `SparkSession`, assign it to a variable named `spark`, and read the domain file into a DataFrame.

In [7]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

# Read the target file (tab-delimited, no header row) into a DataFrame
common_crawl = (
    spark.read
    .option('delimiter', '\t')
    .option('inferSchema', True)
    .csv(file_path)
)

# Display the DataFrame to the notebook
common_crawl.show(5, truncate=False)

+------+-----------+----+---+
|_c0   |_c1        |_c2 |_c3|
+------+-----------+----+---+
|367855|172-in-addr|arpa|1  |
|367856|addr       |arpa|1  |
|367857|amphic     |arpa|1  |
|367858|beta       |arpa|1  |
|367859|callic     |arpa|1  |
+------+-----------+----+---+
only showing top 5 rows
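
The `_c0` to `_c3` names in the output above appear because the file has no header row, and `inferSchema` needs an extra pass over the data to guess the types. One alternative, sketched here under the assumption that all four columns fit the types listed, is to pass a DDL-formatted schema string to the reader so names and types are set at read time. `DOMAIN_SCHEMA_DDL` and `read_domains` are hypothetical names introduced for this example.

```python
# Assumed column names and types for the tab-delimited domain file
DOMAIN_SCHEMA_DDL = (
    "site_id INT, domain STRING, top_level_domain STRING, num_subdomains INT"
)


def read_domains(spark, path):
    """Read the domain file with an explicit schema instead of inferSchema.

    `spark` is an existing SparkSession; `path` points at the TSV file.
    """
    return spark.read.csv(path, sep="\t", schema=DOMAIN_SCHEMA_DDL)


# Usage inside the notebook (requires a running SparkSession):
# common_crawl = read_domains(spark, file_path)
# common_crawl.printSchema()
```

With an explicit schema, a value that does not match its declared type is read as null in the default permissive mode, so it is worth checking a few rows after loading.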




Rename the DataFrame's columns to the following: 

- site_id
- domain
- top_level_domain
- num_subdomains

In [8]:
# Rename the DataFrame's columns with `withColumnRenamed()`
common_crawl = common_crawl\
.withColumnRenamed("_c0", "site_id")\
.withColumnRenamed("_c1", "domain")\
.withColumnRenamed("_c2", "top_level_domain")\
.withColumnRenamed("_c3", "num_subdomains")

# Display the first few rows of the renamed DataFrame
common_crawl.show(5, truncate = False)


+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows
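
Chaining one `withColumnRenamed()` call per column works well for a handful of columns. When every column is renamed by position, `DataFrame.toDF()` can do it in a single call. The helper below is a hypothetical sketch (`NEW_NAMES` and `rename_positional_columns` are not part of the original notebook) that also checks the column count before renaming.

```python
# Target names, in the same order as the positional columns _c0.._c3
NEW_NAMES = ["site_id", "domain", "top_level_domain", "num_subdomains"]


def rename_positional_columns(df, new_names=NEW_NAMES):
    """Rename all columns of `df` by position using DataFrame.toDF()."""
    if len(df.columns) != len(new_names):
        raise ValueError(
            f"expected {len(new_names)} columns, found {len(df.columns)}"
        )
    return df.toDF(*new_names)


# Usage inside the notebook (requires the DataFrame read earlier):
# common_crawl = rename_positional_columns(common_crawl)
```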




## Reading and Writing Datasets to Disk

Save the `common_crawl` DataFrame as parquet files in a directory called `./results/common_crawl/`, then read it back and display it.

In [9]:
# Save the `common_crawl` DataFrame to a series of parquet files
common_crawl.write.parquet('./results/common_crawl/', mode="overwrite")

# Read from parquet directory
common_crawl_domains = spark.read.parquet('./results/common_crawl/')

# Display the first few rows of the DataFrame read back from parquet
common_crawl_domains.show(5, truncate=False)

+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows




## Querying Domain Counts with PySpark DataFrames and SQL

Create a local temporary view from `common_crawl_domains`.

In [10]:
# Create a temporary view in the metadata for this `SparkSession`
common_crawl_domains.createOrReplaceTempView("common_crawl_domains")


Calculate the total number of subdomains for each top-level domain in the dataset, using both DataFrame and SQL methods.

In [11]:
# Aggregate the DataFrame using DataFrame methods
common_crawl_domains\
.select(["top_level_domain", "num_subdomains"])\
.groupBy("top_level_domain").sum("num_subdomains")\
.orderBy("sum(num_subdomains)", ascending = False)\
.show(truncate = False)

# Aggregate the DataFrame using SQL
query = """ SELECT top_level_domain, sum(num_subdomains)
FROM common_crawl_domains
GROUP BY top_level_domain 
ORDER BY sum(num_subdomains) DESC
"""

spark.sql(query).show(truncate = False)

+----------------+-------------------+
|top_level_domain|sum(num_subdomains)|
+----------------+-------------------+
|edu             |484438             |
|gov             |85354              |
|travel          |10768              |
|coop            |8683               |
|jobs            |6023               |
|post            |143                |
|map             |40                 |
|arpa            |17                 |
+----------------+-------------------+




+----------------+-------------------+
|top_level_domain|sum(num_subdomains)|
+----------------+-------------------+
|edu             |484438             |
|gov             |85354              |
|travel          |10768              |
|coop            |8683               |
|jobs            |6023               |
|post            |143                |
|map             |40                 |
|arpa            |17                 |
+----------------+-------------------+
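
The result column is named `sum(num_subdomains)` because the aggregate has no alias. A quick way to check the aggregation logic, including an `AS` alias, without starting Spark is to run the same kind of query over a few rows in Python's built-in `sqlite3`. This is a stand-in engine used only for illustration, not Spark SQL; the three rows are copied from outputs elsewhere in this notebook, and the alias `total_subdomains` is a hypothetical name.

```python
import sqlite3

# A few rows taken from the notebook's outputs
rows = [
    (367855, "172-in-addr", "arpa", 1),
    (367862, "home", "arpa", 7),
    (57661852, "nps", "gov", 178),
]

# Same shape as the notebook's query, with an alias for the aggregate
query = """
SELECT top_level_domain, SUM(num_subdomains) AS total_subdomains
FROM common_crawl_domains
GROUP BY top_level_domain
ORDER BY total_subdomains DESC
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE common_crawl_domains ("
    "site_id INTEGER, domain TEXT, top_level_domain TEXT, num_subdomains INTEGER)"
)
conn.executemany("INSERT INTO common_crawl_domains VALUES (?, ?, ?, ?)", rows)
totals = conn.execute(query).fetchall()
conn.close()

print(totals)
```

The same aliased query text can be passed to `spark.sql()` once the temporary view exists.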




Calculate the total number of subdomains for each domain in the dataset, grouping by top-level domain and domain.

In [12]:
# Aggregate the DataFrame using DataFrame methods
common_crawl_domains\
.select(["top_level_domain","domain", "num_subdomains"])\
.groupBy(["top_level_domain", "domain"]).sum("num_subdomains")\
.orderBy("sum(num_subdomains)", ascending = False)\
.show(truncate = False)

# Aggregate the DataFrame using SQL
query = """ SELECT top_level_domain, domain, sum(num_subdomains)
FROM common_crawl_domains
GROUP BY top_level_domain, domain 
ORDER BY sum(num_subdomains) DESC
"""

spark.sql(query).show(truncate = False)


+----------------+--------+-------------------+
|top_level_domain|domain  |sum(num_subdomains)|
+----------------+--------+-------------------+
|edu             |academia|9657               |
|edu             |mit     |7114               |
|edu             |stanford|7015               |
|edu             |harvard |5497               |
|edu             |wisc    |5376               |
|gov             |nasa    |5370               |
|edu             |tamu    |5088               |
|edu             |berkeley|4874               |
|edu             |umn     |4748               |
|edu             |ncsu    |4448               |
|edu             |cuny    |4260               |
|edu             |unc     |4249               |
|edu             |cornell |4235               |
|edu             |ucla    |4217               |
|edu             |ucsd    |4122               |
|edu             |ucdavis |4105               |
|edu             |umich   |4067               |
|edu             |psu     |3840         


+----------------+--------+-------------------+
|top_level_domain|domain  |sum(num_subdomains)|
+----------------+--------+-------------------+
|edu             |academia|9657               |
|edu             |mit     |7114               |
|edu             |stanford|7015               |
|edu             |harvard |5497               |
|edu             |wisc    |5376               |
|gov             |nasa    |5370               |
|edu             |tamu    |5088               |
|edu             |berkeley|4874               |
|edu             |umn     |4748               |
|edu             |ncsu    |4448               |
|edu             |cuny    |4260               |
|edu             |unc     |4249               |
|edu             |cornell |4235               |
|edu             |ucla    |4217               |
|edu             |ucsd    |4122               |
|edu             |ucdavis |4105               |
|edu             |umich   |4067               |
|edu             |psu     |3840         


How many subdomains does `nps.gov` have? Filter the dataset to that website's entry and display the columns `top_level_domain`, `domain`, and `num_subdomains` in your result.

In [13]:
# Filter the DataFrame using DataFrame Methods
common_crawl_domains\
.filter(common_crawl_domains.top_level_domain == "gov")\
.filter(common_crawl_domains.domain == "nps")\
.show(truncate = False)

# Filter the DataFrame using SQL
query = """ SELECT top_level_domain, domain, sum(num_subdomains)
FROM common_crawl_domains
WHERE top_level_domain = 'gov' AND domain = 'nps'
GROUP BY top_level_domain, domain 
ORDER BY sum(num_subdomains) DESC
"""

spark.sql(query).show(truncate = False)


+--------+------+----------------+--------------+
|site_id |domain|top_level_domain|num_subdomains|
+--------+------+----------------+--------------+
|57661852|nps   |gov             |178           |
+--------+------+----------------+--------------+




+----------------+------+-------------------+
|top_level_domain|domain|sum(num_subdomains)|
+----------------+------+-------------------+
|gov             |nps   |178                |
+----------------+------+-------------------+
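
The DataFrame version of this filter returns every column, including `site_id`, while the task asks for three. A minimal sketch of one way to narrow it is shown below: a single `filter()` with a SQL expression string followed by `select()`. `WANTED_COLUMNS`, `NPS_CONDITION`, and `nps_subdomains` are hypothetical names introduced for this example.

```python
# Columns requested in the task description
WANTED_COLUMNS = ["top_level_domain", "domain", "num_subdomains"]

# Both conditions combined into one SQL expression string
NPS_CONDITION = "top_level_domain = 'gov' AND domain = 'nps'"


def nps_subdomains(df):
    """Filter `df` to the nps.gov entry and keep only the requested columns."""
    return df.filter(NPS_CONDITION).select(*WANTED_COLUMNS)


# Usage inside the notebook (requires the DataFrame read from parquet):
# nps_subdomains(common_crawl_domains).show(truncate=False)
```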




Close the `SparkSession` and underlying `sparkContext`.

In [14]:
# Stop the notebook's `SparkSession` and `sparkContext`
spark.stop()

