# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose configuration.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()


23/04/08 14:25:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


More configuration options for the SparkSession object in standalone mode can be set with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).
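As a minimal sketch of chaining several settings through the **config** method, the helper below applies a dictionary of options to a builder. The names `apply_confs`, `extra_confs` and `build_session` are hypothetical, and the values shown for `spark.executor.cores` and `spark.cores.max` are placeholders that would need to match the resources of the actual workers.

```python
def apply_confs(builder, confs):
    """Chain builder.config(key, value) for every entry in confs."""
    for key, value in confs.items():
        builder = builder.config(key, value)
    return builder


# Hypothetical values; size them to the workers in your own cluster.
extra_confs = {
    "spark.executor.cores": "1",
    "spark.cores.max": "2",
}


def build_session(app_name="pyspark-notebook", master="spark://spark-master:7077"):
    """Create (or reuse) a session with the extra settings applied."""
    # Imported lazily so apply_confs can be used without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master(master)
    return apply_confs(builder, extra_confs).getOrCreate()
```

Calling `build_session()` would return a session equivalent to the one created above, with the two extra options set. Keeping the options in a dictionary makes it easy to see the full configuration in one place.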

In [2]:
sc = spark.sparkContext

In [3]:
websites = sc.textFile("/data/websites.csv")

In [18]:
websites.collect()

['website',
 'vimeo.com',
 'springer.com',
 'youtube.com',
 'marriott.com',
 'nytimes.com',
 'microsoft.com',
 'washingtonpost.com',
 'bloomberg.com',
 'bbc.com',
 'thestartmagazine.com']

In [15]:
import requests

In [None]:
# Fetch a website's HTML and return it as a (url, html) tuple
def capture_html(url):
    # Imported inside the function so the import also runs on the Spark executors
    import requests
    # The timeout keeps one unresponsive site from stalling the whole job
    response = requests.get('https://www.' + url, timeout=30)
    return (url, response.text)

# Filter out the CSV header row
websites_filtered = websites.filter(lambda x: x != 'website')

# Apply the function to each URL in the filtered RDD using the map function
website_data = websites_filtered.map(capture_html)

# Convert the RDD to a DataFrame and save it as CSV part files under /data/file
df = website_data.toDF(["url", "html"])
df.write.format("csv").save("/data/file")
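
In the cell above, a request that raises (for example on a connection error) makes the corresponding Spark task fail. A possible variant, sketched here under assumptions, records a failed site as `(url, None)` instead. The name `capture_html_safe` and its `fetch` parameter are hypothetical; `fetch` exists only so the function can be exercised without network access.

```python
def capture_html_safe(url, fetch=None, timeout=10):
    """Return (url, html), or (url, None) when the request fails."""
    if fetch is None:
        # Imported here so the import also happens on the Spark executors.
        import requests

        def fetch(full_url):
            response = requests.get(full_url, timeout=timeout)
            response.raise_for_status()
            return response.text

    try:
        return (url, fetch("https://www." + url))
    except Exception:
        # Any network or HTTP error is recorded as a missing page.
        return (url, None)
```

It could be passed to `websites_filtered.map(...)` in place of `capture_html`; rows whose second field is `None` could then be filtered out before the conversion to a DataFrame.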

In [40]:
def crawl_website(website):
    import os
    import time

    import requests
    from pyvirtualdisplay import Display
    from selenium import webdriver
    from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
    from webdriver_manager.chrome import ChromeDriverManager

    # virtual display
    display = Display(visible=0, size=(800, 600))
    display.start()

    # extension filepath
    ext_file = "/usr/bin/spark-3.0.0-bin-hadoop3.2/data/extension"

    opt = webdriver.ChromeOptions()
    # devtools necessary for complete network stack capture
    opt.add_argument("--auto-open-devtools-for-tabs")
    # loads the unpacked extension (Chrome switches need the leading "--")
    opt.add_argument("--load-extension=" + ext_file)
    # important for linux
    opt.add_argument("--no-sandbox")
    opt.add_argument("--disable-dev-shm-usage")

    dc = DesiredCapabilities.CHROME
    dc["goog:loggingPrefs"] = {"browser": "ALL"}

    # exist_ok avoids a failure when the cell is re-run for the same website
    os.makedirs("/usr/bin/spark-3.0.0-bin-hadoop3.2/data/server/output/" + website, exist_ok=True)
    driver = webdriver.Chrome(
        ChromeDriverManager().install(), options=opt, desired_capabilities=dc
    )
    requests.post(
        url="http://localhost:3000/complete", data={"website": website}
    )
    driver.get("https://www." + website)
    # wait for the page to finish loading
    time.sleep(5)

    # release the browser and the virtual display
    driver.quit()
    display.stop()

In [41]:
# Filter out the CSV header row
websites_filtered = websites.filter(lambda x: x != 'website')

# Apply the function to each URL; map is lazy, so the crawl only runs on collect()
website_data = websites_filtered.map(crawl_website)

In [42]:
website_data.collect()

[None, None, None, None, None, None, None, None, None, None]
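
The list of `None` values above appears because `crawl_website` has no `return` statement, so `map` yields `None` for every site and the result says nothing about which crawls succeeded. One way to get a more informative result, sketched here with a hypothetical `with_status` wrapper, is to return a `(website, status)` tuple per site.

```python
def with_status(task):
    """Wrap task so it returns (argument, status) instead of None."""

    def wrapper(website):
        try:
            task(website)
            return (website, "ok")
        except Exception as exc:
            # Report the exception type rather than failing the whole job.
            return (website, "error: " + type(exc).__name__)

    return wrapper
```

With this wrapper, `websites_filtered.map(with_status(crawl_website)).collect()` would return one tuple per website rather than a list of `None`.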