# 3. Spark

Spark Programming Guide: <https://spark.apache.org/docs/latest/> (using the Python API is recommended)
Spark API: <https://spark.apache.org/docs/latest/api/python/index.html>


## 3.1 Example Walkthrough
Work through the Spark examples below. Once you have completed them, continue with Exercises 3.2 and 3.3.


### Initialize PySpark

First, we install the pyspark package with pip and then initialize PySpark.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
     |████████████████████████████████| 212.3MB 72kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 21.0MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=ba98f4adcbc9a82253a4c5bbc6acf2cfce1d519a8ac9bfa9dc34e7e33e9d6b77
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
import os, sys

In [None]:
# Initialize PySpark
import pyspark
import pyspark.sql
from pyspark.sql import Row

APP_NAME = "PySpark Lecture"
SPARK_MASTER = "local[1]"  # run Spark locally with a single worker thread

# Set the application name and a local scratch directory
conf = pyspark.SparkConf().setAppName(APP_NAME).set("spark.local.dir", os.path.join(os.getcwd(), "tmp"))
sc = pyspark.SparkContext(master=SPARK_MASTER, conf=conf)

# The builder reuses the SparkContext created above
spark = pyspark.sql.SparkSession.builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")

PySpark initiated...


### Hello, World!

Loading data, mapping it and collecting the records into RAM...

In [None]:
!wget https://raw.githubusercontent.com/scalable-infrastructure/exercise-students-2021/master/data/example.csv

--2021-03-14 10:42:30--  https://raw.githubusercontent.com/scalable-infrastructure/exercise-students-2021/master/data/example.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 189 [text/plain]
Saving to: ‘example.csv.1’


2021-03-14 10:42:30 (15.4 MB/s) - ‘example.csv.1’ saved [189/189]



In [None]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("example.csv")

# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))

# Collect the dataset into local RAM
data.collect()

[['Russell Jurney', 'Relato', 'CEO'],
 ['Florian Liebert', 'Mesosphere', 'CEO'],
 ['Don Brown', 'Rocana', 'CIO'],
 ['Steve Jobs', 'Apple', 'CEO'],
 ['Donald Trump', 'The Trump Organization', 'CEO'],
 ['Russell Jurney', 'Data Syndrome', 'Principal Consultant']]

### Creating Objects from CSV

Using a function with a map operation to create objects (dicts) as records...

In [None]:
# Turn the CSV lines into objects
def csv_to_record(line):
    parts = line.split(",")
    record = {
      "name": parts[0],
      "company": parts[1],
      "title": parts[2]
    }
    return record

# Apply the function to every record
records = csv_lines.map(csv_to_record)

# Inspect the first item in the dataset
records.first()

{'company': 'Relato', 'name': 'Russell Jurney', 'title': 'CEO'}
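
The `csv_to_record` function above splits on every comma, which works for `example.csv` but would break on a field that itself contains a comma inside quotes. A minimal sketch of a more tolerant parser, using Python's standard `csv` module, is shown below. The function name `csv_to_record_quoted` and the sample line are hypothetical and not part of the exercise data.

```python
import csv

def csv_to_record_quoted(line):
    """Parse one CSV line into a dict, honoring quoted fields."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    parts = next(csv.reader([line]))
    return {
        "name": parts[0],
        "company": parts[1],
        "title": parts[2],
    }

# Hypothetical line with a comma inside a quoted field (not from example.csv)
sample = '"Doe, Jane",Example Corp,CTO'
record = csv_to_record_quoted(sample)
print(record)
```

Assuming the session from the cells above, the function could be applied in the same way as before, for example `csv_lines.map(csv_to_record_quoted)`.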

### GroupBy

Using the groupBy operator to count the number of jobs per person...

In [None]:
# Group the records by the name of the person
grouped_records = records.groupBy(lambda x: x["name"])

# Show the first group
grouped_records.first()

# Count the groups
job_counts = grouped_records.map(
  lambda x: {
    "name": x[0],
    "job_count": len(x[1])
  }
)

job_counts.first()

job_counts.collect()

[{'job_count': 2, 'name': 'Russell Jurney'},
 {'job_count': 1, 'name': 'Florian Liebert'},
 {'job_count': 1, 'name': 'Don Brown'},
 {'job_count': 1, 'name': 'Steve Jobs'},
 {'job_count': 1, 'name': 'Donald Trump'}]
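
To make the two steps (group, then count) easier to follow, here is a plain-Python sketch of the same logic without Spark. It uses the six records shown in the `collect()` output earlier; the variable names `rows`, `records_local`, `groups` and `job_counts_local` are illustrative only.

```python
from collections import defaultdict

# The six records from the collect() output of example.csv
rows = [
    ["Russell Jurney", "Relato", "CEO"],
    ["Florian Liebert", "Mesosphere", "CEO"],
    ["Don Brown", "Rocana", "CIO"],
    ["Steve Jobs", "Apple", "CEO"],
    ["Donald Trump", "The Trump Organization", "CEO"],
    ["Russell Jurney", "Data Syndrome", "Principal Consultant"],
]
records_local = [{"name": n, "company": c, "title": t} for n, c, t in rows]

# Step 1 (groupBy): collect every record under its key, here the person's name
groups = defaultdict(list)
for r in records_local:
    groups[r["name"]].append(r)

# Step 2 (map): turn each (key, group) pair into a summary dict
job_counts_local = [
    {"name": name, "job_count": len(group)} for name, group in groups.items()
]
print(job_counts_local)
```

In Spark, `groupBy` gathers all records of a key together before they are counted. When only the count is needed, a common alternative is to map each record to a `(name, 1)` pair and combine the pairs with `reduceByKey`.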

### Map vs FlatMap

Understanding the difference between the `map` and `flatMap` operators...

In [None]:
# Compute a relation of words by line
words_by_line = csv_lines\
  .map(lambda line: line.split(","))

print(words_by_line.collect())

# Compute a relation of words
flattened_words = csv_lines\
  .map(lambda line: line.split(","))\
  .flatMap(lambda x: x)

flattened_words.collect()
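
The difference can also be seen in a plain-Python sketch that mimics both operators on two lines taken from `example.csv`. This is not Spark code: the list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`.

```python
from itertools import chain

# Two lines taken from example.csv
lines = ["Russell Jurney,Relato,CEO", "Don Brown,Rocana,CIO"]

# map: exactly one output element per input line (here, a list of fields)
mapped = [line.split(",") for line in lines]

# flatMap: each input line yields several elements, which are concatenated
flattened = list(chain.from_iterable(line.split(",") for line in lines))

print(mapped)
print(flattened)
```

`mapped` keeps one nested list per line, whereas `flattened` is a single flat list of all fields.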

---
## Further Exercises




In [None]:
!wget https://raw.githubusercontent.com/scalable-infrastructure/exercise-students-2021/master/data/nasa/NASA_access_log_Jul95.gz
!gzip -d NASA_access_log_Jul95.gz

--2021-03-14 10:34:10--  https://raw.githubusercontent.com/scalable-infrastructure/exercise-students-2021/master/data/nasa/NASA_access_log_Jul95.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20676677 (20M) [application/octet-stream]
Saving to: ‘NASA_access_log_Jul95.gz’


2021-03-14 10:34:10 (43.8 MB/s) - ‘NASA_access_log_Jul95.gz’ saved [20676677/20676677]

gzip: NASA_access_log_Jul95 already exists; do you wish to overwrite (y or n)? y


3.2 Implement a wordcount using Spark. How many words are in the file `example.csv`?

3.3 Using the NASA log file, implement a Spark version of the HTTP Response Code Analysis. How many log entries exist per HTTP response code?