In [None]:
### Initialize a new Spark Context and read in the domain graph as an RDD.

# Import required modules
from pyspark.sql import SparkSession

# Build the SparkSession for this notebook (an already-running one is reused)
spark = SparkSession.builder.getOrCreate()

# The SparkContext is the entry point to the RDD API used in the next cells
sc = spark.sparkContext

# Load the domain graph file as an RDD of raw text lines
common_crawl_domain_counts = sc.textFile('./crawl/cc-main-limited-domains.csv')

# Peek at the first ten raw lines
common_crawl_domain_counts.take(10)

In [None]:
### Apply fmt_domain_graph_entry over common_crawl_domain_counts and save the result
### as a new RDD named formatted_host_counts.

def fmt_domain_graph_entry(entry):
    """
    Parse one raw Common Crawl domain graph line into typed fields.

    `entry` must contain exactly four tab-separated values. The first and
    last values are converted to integers; the two middle values are kept
    as strings.

    Returns a tuple `(site_id, domain, tld, num_subdomains)`.
    Raises ValueError if the line does not have exactly four fields or if
    either numeric field cannot be converted to an integer.
    """
    fields = entry.split('\t')

    # Unpacking enforces the four-field layout
    raw_id, domain, tld, raw_count = fields
    return int(raw_id), domain, tld, int(raw_count)

# Map every raw line through `fmt_domain_graph_entry` to build the parsed RDD
formatted_host_counts = common_crawl_domain_counts.map(fmt_domain_graph_entry)

# Preview the first ten parsed entries
formatted_host_counts.take(10)

In [None]:
### Apply extract_subdomain_counts over common_crawl_domain_counts and save the result
### as a new RDD named host_counts.

def extract_subdomain_counts(entry):
    """
    Return the subdomain count of one raw Common Crawl domain graph line.

    `entry` must contain exactly four tab-separated values; only the last
    one is used, converted to an integer.

    Raises ValueError if the line does not have exactly four fields or if
    the last field cannot be converted to an integer.
    """
    # Unpack all four fields so a malformed line still fails loudly;
    # the underscore-prefixed names are intentionally unused.
    _site_id, _domain, _tld, raw_count = entry.split('\t')
    return int(raw_count)


# Map every raw line to its integer subdomain count
host_counts = common_crawl_domain_counts.map(extract_subdomain_counts)

# Preview the first ten counts
host_counts.take(10)

In [None]:
### Using host_counts, calculate the total number of subdomains across all domains
### in the dataset, save the result to a variable named total_host_counts.


# Collapse the RDD into one number by pairwise addition: the lambda receives
# a running total and the next count, and returns their sum
total_host_counts = host_counts.reduce(lambda running_total, count: running_total + count)

# Show the grand total of subdomains
total_host_counts

In [None]:
### Stop the current SparkSession and sparkContext before moving on to analyze the data with SparkSQL

# Shut down the SparkSession. `stop()` also stops the underlying SparkContext,
# so `sc` and the RDDs built from it above can no longer be used after this cell.
spark.stop()

In [None]:
### Create a new SparkSession and assign it to a variable named spark.

from pyspark.sql import SparkSession

# Start a fresh SparkSession for the DataFrame / SQL part of the notebook
spark = SparkSession.builder.getOrCreate()

In [None]:
### Read ./crawl/cc-main-limited-domains.csv into a new Spark DataFrame named common_crawl.

# Load the tab-delimited file into a DataFrame, letting Spark infer column types
common_crawl = (
    spark.read
    .option('delimiter', '\t')
    .option('inferSchema', 'True')
    .csv('./crawl/cc-main-limited-domains.csv')
)

# Show the first five rows without truncating long values
common_crawl.show(5, truncate=False)

In [None]:
### Rename the DataFrame's columns to the following: site_id, domain, top_level_domain, num_subdomains

# Rename the columns with `withColumnRenamed()`. The CSV was read without a
# header, so the incoming columns are named `_c0` .. `_c3`.
common_crawl = (
    common_crawl
    .withColumnRenamed('_c0', 'site_id')
    .withColumnRenamed('_c1', 'domain')
    .withColumnRenamed('_c2', 'top_level_domain')
    .withColumnRenamed('_c3', 'num_subdomains')
)

# Display the first few rows of the DataFrame and the new schema
common_crawl.show(5)
common_crawl.printSchema()

In [None]:
### Save the common_crawl DataFrame as parquet files in a directory called ./results/common_crawl/.

# Persist `common_crawl` as parquet files, replacing any earlier output
# in the target directory
common_crawl.write.parquet('./results/common_crawl', mode='overwrite')

In [None]:
### Read ./results/common_crawl/ into a new DataFrame to confirm our DataFrame was saved properly.

# Load the parquet directory written above back into a DataFrame
common_crawl_domains = spark.read.parquet('./results/common_crawl/')

# Confirm the round trip: first five rows, then column names and types
common_crawl_domains.show(5, truncate=False)
common_crawl_domains.printSchema()


In [None]:
### Create a local temporary view from common_crawl_domains

# Register the DataFrame as the temporary view `crawl` in this `SparkSession`
# so the SQL queries in the following cells can refer to it by name.
# If a view called `crawl` already exists, it is replaced.
common_crawl_domains.createOrReplaceTempView('crawl')

In [None]:
### Calculate the total number of domains for each top-level domain in the dataset.

# Aggregate the DataFrame using DataFrame methods:
# one row per top-level domain with the number of domain rows in it,
# largest groups first
(
    common_crawl_domains
    .groupby('top_level_domain')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

# Aggregate the DataFrame using SQL.
# COUNT(*) counts every row in the group, which is what `.count()` does above;
# COUNT(num_subdomains) would leave out rows whose num_subdomains is NULL and
# so could disagree with the DataFrame result.
spark.sql(
"""
SELECT top_level_domain, COUNT(*) AS domain_count
FROM crawl
GROUP BY top_level_domain
ORDER BY domain_count DESC
"""
).show(10, truncate=False)

In [None]:
### Calculate the total number of subdomains for each top-level domain in the dataset.

# DataFrame version: total subdomains per top-level domain, largest first.
# `sum('num_subdomains')` names its output column 'sum(num_subdomains)',
# which is the name the sort refers to.
(
    common_crawl_domains
    .groupby('top_level_domain')
    .sum('num_subdomains')
    .orderBy('sum(num_subdomains)', ascending=False)
    .show(10, truncate=False)
)

# SQL version of the same aggregation, run against the `crawl` view
spark.sql(
"""
SELECT top_level_domain, SUM(num_subdomains)
FROM crawl
GROUP BY top_level_domain
ORDER BY SUM(num_subdomains) DESC
"""
).show(10, truncate=False)

In [None]:
### How many sub-domains does nps.gov have? Filter the dataset to that website's entry,
### display the columns top_level_domain, domain, and num_subdomains in your result.

# DataFrame version: keep only the rows for nps.gov, then project the
# three columns of interest
(
    common_crawl_domains
    .filter(common_crawl_domains.domain == 'nps')
    .filter(common_crawl_domains.top_level_domain == 'gov')
    .select(['top_level_domain', 'domain', 'num_subdomains'])
    .show(5, truncate=False)
)

# SQL version of the same lookup, run against the `crawl` view
spark.sql(
"""
SELECT top_level_domain, domain, num_subdomains
FROM crawl
WHERE domain = 'nps' and top_level_domain = 'gov'
"""
).show(10, truncate=False)


In [None]:
### Close the SparkSession and underlying sparkContext

# Final cleanup: stop the notebook's `SparkSession`, which also stops its
# underlying `sparkContext`. No Spark operations can run after this cell.
spark.stop()