# WEB SCRAPE PYSPARK DOCUMENTATION

----------------------------
#### GIST OF CHANGES DONE IN THIS NOTEBOOK
----------------------------
-   Pyspark documentation for Version 3.5.0 was used for this code
-   The code initially webscrapes the hyper links available in the landing page for pyspark.sql functions and collects the category and function names along with the hyper links
-   In the next step, the code scrapes the individual hyperlinks and using desired html tags, extracts the code snippets and the corresponding code descriptions

NOTE: At the time of executing this code the latest URL was pointing to version 3.5.0
-   LATEST URL: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/
-   BASE URL  : https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/

----------------------------
<br>
<br>

In [1]:
# INSTALL NECESSARY PACKAGES
!pip install bs4 html5lib

Collecting bs4
  Downloading bs4-0.0.2-py2.py3-none-any.whl (1.2 kB)
Installing collected packages: bs4
Successfully installed bs4-0.0.2


In [2]:
# IMPORT NECESSARY PACKAGES
import re
import pandas as pd
import requests
import html5lib
from rich.progress import track
from multiprocessing import Pool
from bs4 import BeautifulSoup, SoupStrainer


In [3]:
# DISPLAY FUNCTIONS
def fn_display_header(msg):
  print('-' * 80)
  print(' ' * 10, msg)
  print('-' * 80)

def fn_display_message(msg):
  print(msg)

In [5]:
from scipy.special import tandg
# DEFINE BASE URL TO START THE WEB SCRAPING
# NOTE: At the time of executing this code the latest URL was pointing to version 3.5.0
#       LATEST URL: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/
#       BASE URL  : https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/


base_url = "https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/"
index_url = base_url + "index.html"
pd.set_option('display.max_colwidth', None)

excluded_categories = ['Core Classes', 'Spark Session']
sublink_categories = ['Functions']

response = requests.get(index_url)
soup = BeautifulSoup(response.text, 'html.parser', parse_only=SoupStrainer(['div', 'li', 'a']))  # Use parse_only to reduce the list of tags to be searched

# -- Find Parent elements and sub-elements to obtain category and link
links= []
find_tags = soup.find('div', class_='section', id='spark-sql').find_all('li', class_='toctree-l1')
for tag_l1 in track(find_tags, description="Processing..."):
  category = tag_l1.find('a').get_text()
  for tag_l2 in tag_l1.find_all('li'):
    function = tag_l2.get_text()
    link = base_url + tag_l2.find('a').get('href')
    if category in sublink_categories:                 # -- Find sublinks for relevant categories and update the parent link
      parent_link = link.split('#')[0]
      section_id = link.split('#')[1]

      response_sublink = requests.get(parent_link).text
      soup_sublink = BeautifulSoup(response_sublink, 'html.parser', parse_only=SoupStrainer(['div', 'li', 'a']))
      find_sublink_tags = soup_sublink.find('div', class_='section', id=section_id).find_all('a', class_='reference internal')

      new_category = function
      for sublink_tag in find_sublink_tags:
        link = base_url + sublink_tag.get('href')
        function = link.split('#')[1]
        if category not in excluded_categories:
          links.append({'Category': new_category, 'Function': function, 'Link': link})
    elif category not in excluded_categories:
      links.append({'Category': category, 'Function': function, 'Link': link})

#print(f" -- {link}")
#print(f" -- {find_sublink_tags}")


fn_display_header("Display Initial Pyspark Links/Categories to Scrape")
fn_display_message(f" - Link Count: {len(links)}")
fn_display_message("-" * 80)

rows_to_print = 50

for item in links[:rows_to_print]:
  print(item)


Output()

--------------------------------------------------------------------------------
           Display Initial Pyspark Links/Categories to Scrape
--------------------------------------------------------------------------------
 - Link Count: 712
--------------------------------------------------------------------------------
{'Category': 'Configuration', 'Function': 'pyspark.sql.conf.RuntimeConfig', 'Link': 'https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html'}
{'Category': 'Input/Output', 'Function': 'pyspark.sql.DataFrameReader.csv', 'Link': 'https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html'}
{'Category': 'Input/Output', 'Function': 'pyspark.sql.DataFrameReader.format', 'Link': 'https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html'}
{'Category': 'Input/Output', 'Function': 'pyspark.sql.DataFrameReader.jdbc', 'Li

In [6]:
# PRINT DISTINCT CATEGORIES
fn_display_header("Display Initial Links Info")

relevant_category_list_1 = set([item['Category'] for item in links])
fn_display_message(f" - Unique Categories: {relevant_category_list_1}")



--------------------------------------------------------------------------------
           Display Initial Links Info
--------------------------------------------------------------------------------
 - Unique Categories: {'Aggregate Functions', 'Call Functions', 'Configuration', 'UDF', 'Window', 'Datetime Functions', 'Partition Transformation Functions', 'Input/Output', 'Math Functions', 'DataFrame', 'Misc Functions', 'Catalog', 'Protobuf', 'String Functions', 'UDTF', 'Row', 'Predicate Functions', 'Data Types', 'Grouping', 'Observation', 'Window Functions', 'Normal Functions', 'Xml Functions', 'Collection Functions', 'Column', 'Bitwise Functions', 'Avro', 'Sort Functions'}


In [8]:
# SCRAPE INDIVIDUAL LINK CONTENT  - PASS 1
# Reminder to handle nulls - returntype

function_dataset_list_1 = []
#relevant_category_list_1 = ['Functions']     # Override to test one specific category

relevant_links1 = [item for item in links if item["Category"] in relevant_category_list_1]

trim_text = lambda text : text.replace('\n', '').strip() if text else ''    # Remove newlines and trims
keep_only_alphanum = lambda text : re.sub('[^a-zA-Z0-9. ]', ' ', text) if text else '' # Remove special characters
remove_special_char = lambda text : "".join(char for char in text if ord(char)<128)   # Remove special characters

with Pool(processes=2) as pool:
  for item in track(relevant_links1, description="Processing..."):
    category = item["Category"]

    response = requests.get(item["Link"])
    soup = BeautifulSoup(
        response.text,
        'lxml',                                         # lxml is faster than 'html.parser'
        parse_only=SoupStrainer(['h1', 'dd', 'p']))     # Use parse_only to reduce the list of tags to be searched

    feature = item["Function"]

    feature_desc = ''
    find_desc_texts = soup.find('dd').find_all('p');

    for text in find_desc_texts:
      if text.find('span', class_='versionmodified'):
        break
      else:
        feature_desc = feature_desc + ' ' + text.get_text()

    feature_desc = trim_text(feature_desc)
    feature_desc = keep_only_alphanum(feature_desc)
    feature_desc = remove_special_char(feature_desc)
    function_dataset_list_1.append({'Category': category, 'function': feature, 'feature_desc': feature_desc})

# -- Create a dataframe of the initial dataset
df_function_pass_1 = pd.DataFrame(function_dataset_list_1)

fn_display_header(f"PASS 1: Get and refine primary function description. \nRelevant Category List: {relevant_category_list_1}")
fn_display_message(f" - function_dataset_list_1 Count: {len(function_dataset_list_1)}")
fn_display_message("-" * 80)
df_function_pass_1.head().style.set_properties(**{'text-align': 'left'})

Output()

--------------------------------------------------------------------------------
           PASS 1: Get and refine primary function description. 
Relevant Category List: {'Aggregate Functions', 'Call Functions', 'Configuration', 'UDF', 'Window', 'Datetime Functions', 'Partition Transformation Functions', 'Input/Output', 'Math Functions', 'DataFrame', 'Misc Functions', 'Catalog', 'Protobuf', 'String Functions', 'UDTF', 'Row', 'Predicate Functions', 'Data Types', 'Grouping', 'Observation', 'Window Functions', 'Normal Functions', 'Xml Functions', 'Collection Functions', 'Column', 'Bitwise Functions', 'Avro', 'Sort Functions'}
--------------------------------------------------------------------------------
 - function_dataset_list_1 Count: 712
--------------------------------------------------------------------------------


Unnamed: 0,Category,function,feature_desc
0,Configuration,pyspark.sql.conf.RuntimeConfig,User facing configuration API accessible through SparkSession.conf. Options set here are automatically propagated to the Hadoop configuration during I O.
1,Input/Output,pyspark.sql.DataFrameReader.csv,Loads a CSV file and returns the result as a DataFrame. This function will go through the input once to determine the input schema ifinferSchema is enabled. To avoid going through the entire data once disableinferSchema option or specify the schema explicitly using schema.
2,Input/Output,pyspark.sql.DataFrameReader.format,Specifies the input data source format.
3,Input/Output,pyspark.sql.DataFrameReader.jdbc,Construct a DataFrame representing the database table named tableaccessible via JDBC URL url and connection properties. Partitions of the table will be retrieved in parallel if either column orpredicates is specified. lowerBound upperBound and numPartitionsis needed when column is specified. If both column and predicates are specified column will be used.
4,Input/Output,pyspark.sql.DataFrameReader.json,Loads JSON files and returns the results as a DataFrame. JSON Lines newline delimited JSON is supported by default.For JSON one record per file set the multiLine parameter to true. If the schema parameter is not specified this function goesthrough the input once to determine the input schema.


In [9]:
# FOR DEBUGGING
# for item in function_dataset_list_1:
#  print(item)


In [10]:
# SCRAPE INDIVIDUAL LINK CONTENT  - PASS 2
# Reminder to handle nulls - returntype

function_dataset_list_2 = []
code_dataset_list_2 = []
#relevant_category_list_1 = ['DataFrame']     # Uncomment to test one specific category. Post testing Comment and rerun the notebook

relevant_links1 = [item for item in links if item["Category"] in relevant_category_list_1]

with Pool(processes=2) as pool:
  for item in track(relevant_links1, description="Processing..."):
    category = item["Category"]

    response = requests.get(item["Link"])
    soup = BeautifulSoup(
        response.text,
        'lxml',                                         # lxml is faster than 'html.parser'
        parse_only=SoupStrainer(['pre', 'p']))     # Use parse_only to reduce the list of tags to be searched

    feature = item["Function"]

    example_text = ''
    find_example_texts = soup.find_all(['pre', 'p']);

    for text in find_example_texts:
      text_mod = text.get_text()
      if not text_mod.startswith(('>', '.', ' ', '+', '|')) and not text_mod.strip() == '':
        text_mod = '# ' + text_mod
      text_mod = text_mod.replace(' >>>', '<br>>>>')
      example_text = example_text + ' ' + text_mod.replace('\n', '<br>')
      #print(f'---->{text_mod}')

    example_text = re.sub(r'^.*?Examples', '', example_text)
    example_text = re.sub(r'# previous.*$', '', example_text)
    example_text = example_text.replace(' >>>', '<br>>>>')
    function_dataset_list_2.append({'Category': category, 'function': feature, 'example_text': example_text, 'link': item["Link"]})


# -- Create a dataframe of the dataset with example text
df_function_pass_2 = pd.DataFrame(function_dataset_list_2)

fn_display_header(f"PASS 2: Get and refine primary function description. \nRelevant Category List: {relevant_category_list_1}")
fn_display_message(f" - function_dataset_list_2 Count: {len(function_dataset_list_2)}")
fn_display_message("-" * 80)
df_function_pass_2.head().style.set_properties(**{'text-align': 'left'})

Output()

--------------------------------------------------------------------------------
           PASS 2: Get and refine primary function description. 
Relevant Category List: {'Aggregate Functions', 'Call Functions', 'Configuration', 'UDF', 'Window', 'Datetime Functions', 'Partition Transformation Functions', 'Input/Output', 'Math Functions', 'DataFrame', 'Misc Functions', 'Catalog', 'Protobuf', 'String Functions', 'UDTF', 'Row', 'Predicate Functions', 'Data Types', 'Grouping', 'Observation', 'Window Functions', 'Normal Functions', 'Xml Functions', 'Collection Functions', 'Column', 'Bitwise Functions', 'Avro', 'Sort Functions'}
--------------------------------------------------------------------------------
 - function_dataset_list_2 Count: 712
--------------------------------------------------------------------------------


Unnamed: 0,Category,function,example_text,link
0,Configuration,pyspark.sql.conf.RuntimeConfig,"# User-facing configuration API, accessible through SparkSession.conf. # Options set here are automatically propagated to the Hadoop configuration during I/O. # Changed in version 3.4.0: Supports Spark Connect. # Methods # get(key[,Â default]) # Returns the value of Spark runtime configuration property for the given key, assuming it is set. # isModifiable(key) # Indicates whether the configuration property with the given key is modifiable in the current session. # set(key,Â value) # Sets the given Spark runtime configuration property. # unset(key) # Resets the configuration property for the given key.",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html
1,Input/Output,pyspark.sql.DataFrameReader.csv,"# Write a DataFrame into a CSV file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a CSV file ... df = spark.createDataFrame([{""age"": 100, ""name"": ""Hyukjin Kwon""}]) ... df.write.mode(""overwrite"").format(""csv"").save(d) ... ... # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'. ... spark.read.csv(d, schema=df.schema, nullValue=""Hyukjin Kwon"").show() +---+----+ |age|name| +---+----+ |100|NULL| +---+----+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html
2,Input/Output,pyspark.sql.DataFrameReader.format,">>> spark.read.format('json') <...readwriter.DataFrameReader object ...>  # Write a DataFrame into a JSON file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{""age"": 100, ""name"": ""Hyukjin Kwon""}] ... ).write.mode(""overwrite"").format(""json"").save(d) ... ... # Read the JSON file as a DataFrame. ... spark.read.format('json').load(d).show() +---+------------+ |age| name| +---+------------+ |100|Hyukjin Kwon| +---+------------+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html
3,Input/Output,pyspark.sql.DataFrameReader.jdbc,"# Construct a DataFrame representing the database table named table accessible via JDBC URL url and connection properties. # Partitions of the table will be retrieved in parallel if either column or predicates is specified. lowerBound, upperBound and numPartitions is needed when column is specified. # If both column and predicates are specified, column will be used. # New in version 1.4.0. # Changed in version 3.4.0: Supports Spark Connect. # the name of the table # alias of partitionColumn option. Refer to partitionColumn in Data Source Option for the version you use. # a list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame # a dictionary of JDBC database connection arguments. Normally at least properties âuserâ and âpasswordâ with their corresponding values. For example { âuserâ : âSYSTEMâ, âpasswordâ : âmypasswordâ } # For the extra options, refer to Data Source Option for the version you use. # Notes # Donât create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.jdbc.html
4,Input/Output,pyspark.sql.DataFrameReader.json,"# Write a DataFrame into a JSON file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{""age"": 100, ""name"": ""Hyukjin Kwon""}] ... ).write.mode(""overwrite"").format(""json"").save(d) ... ... # Read the JSON file as a DataFrame. ... spark.read.json(d).show() +---+------------+ |age| name| +---+------------+ |100|Hyukjin Kwon| +---+------------+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html


In [11]:
# MERGE SCRAPED DATA and SAVE TO CSV

df_pyspark_raw = pd.merge(df_function_pass_1, df_function_pass_2, on=['Category', 'function'])

df_pyspark_raw.head().style.set_properties(**{'text-align': 'left'})

Unnamed: 0,Category,function,feature_desc,example_text,link
0,Configuration,pyspark.sql.conf.RuntimeConfig,User facing configuration API accessible through SparkSession.conf. Options set here are automatically propagated to the Hadoop configuration during I O.,"# User-facing configuration API, accessible through SparkSession.conf. # Options set here are automatically propagated to the Hadoop configuration during I/O. # Changed in version 3.4.0: Supports Spark Connect. # Methods # get(key[,Â default]) # Returns the value of Spark runtime configuration property for the given key, assuming it is set. # isModifiable(key) # Indicates whether the configuration property with the given key is modifiable in the current session. # set(key,Â value) # Sets the given Spark runtime configuration property. # unset(key) # Resets the configuration property for the given key.",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html
1,Input/Output,pyspark.sql.DataFrameReader.csv,Loads a CSV file and returns the result as a DataFrame. This function will go through the input once to determine the input schema ifinferSchema is enabled. To avoid going through the entire data once disableinferSchema option or specify the schema explicitly using schema.,"# Write a DataFrame into a CSV file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a CSV file ... df = spark.createDataFrame([{""age"": 100, ""name"": ""Hyukjin Kwon""}]) ... df.write.mode(""overwrite"").format(""csv"").save(d) ... ... # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'. ... spark.read.csv(d, schema=df.schema, nullValue=""Hyukjin Kwon"").show() +---+----+ |age|name| +---+----+ |100|NULL| +---+----+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html
2,Input/Output,pyspark.sql.DataFrameReader.format,Specifies the input data source format.,">>> spark.read.format('json') <...readwriter.DataFrameReader object ...>  # Write a DataFrame into a JSON file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{""age"": 100, ""name"": ""Hyukjin Kwon""}] ... ).write.mode(""overwrite"").format(""json"").save(d) ... ... # Read the JSON file as a DataFrame. ... spark.read.format('json').load(d).show() +---+------------+ |age| name| +---+------------+ |100|Hyukjin Kwon| +---+------------+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html
3,Input/Output,pyspark.sql.DataFrameReader.jdbc,Construct a DataFrame representing the database table named tableaccessible via JDBC URL url and connection properties. Partitions of the table will be retrieved in parallel if either column orpredicates is specified. lowerBound upperBound and numPartitionsis needed when column is specified. If both column and predicates are specified column will be used.,"# Construct a DataFrame representing the database table named table accessible via JDBC URL url and connection properties. # Partitions of the table will be retrieved in parallel if either column or predicates is specified. lowerBound, upperBound and numPartitions is needed when column is specified. # If both column and predicates are specified, column will be used. # New in version 1.4.0. # Changed in version 3.4.0: Supports Spark Connect. # the name of the table # alias of partitionColumn option. Refer to partitionColumn in Data Source Option for the version you use. # a list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame # a dictionary of JDBC database connection arguments. Normally at least properties âuserâ and âpasswordâ with their corresponding values. For example { âuserâ : âSYSTEMâ, âpasswordâ : âmypasswordâ } # For the extra options, refer to Data Source Option for the version you use. # Notes # Donât create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.jdbc.html
4,Input/Output,pyspark.sql.DataFrameReader.json,Loads JSON files and returns the results as a DataFrame. JSON Lines newline delimited JSON is supported by default.For JSON one record per file set the multiLine parameter to true. If the schema parameter is not specified this function goesthrough the input once to determine the input schema.,"# Write a DataFrame into a JSON file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{""age"": 100, ""name"": ""Hyukjin Kwon""}] ... ).write.mode(""overwrite"").format(""json"").save(d) ... ... # Read the JSON file as a DataFrame. ... spark.read.json(d).show() +---+------------+ |age| name| +---+------------+ |100|Hyukjin Kwon| +---+------------+",https://spark.apache.org/docs/3.5.0/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html


In [12]:
# WRITE FINAL SCRAPED RAW DATASET TO CSV AND DOWNLOAD

DOWNLOAD_FLAG = 'N'  # Set to Y to download

if DOWNLOAD_FLAG == 'Y':
  df_pyspark_raw.to_csv('ETL_P1_get_raw_web_data_v1.csv')

  from google.colab import files
  files.download('ETL_P1_get_raw_web_data_v1.csv')