# Spark Exercise Solutions

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark

In [None]:
cd ../spark/

In [None]:
from ds101_spark_tools import use_spark_context

## Exercise: Pi Approximation as MapReduce

Original code:

In [None]:
import random

def random_point(x):
    """Draw one random point as an ``(x, y)`` tuple.

    Both coordinates come from ``random.random()``. The argument ``x`` is
    ignored; it only exists so the function can be passed to ``map``.
    """
    first = random.random()
    second = random.random()
    return (first, second)

def inside(p):
    """Return True if the point ``p`` lies strictly inside the unit circle.

    ``p`` must unpack into exactly two coordinates.
    """
    px, py = p
    squared_distance = px * px + py * py
    return squared_distance < 1

In [None]:
# Number of random points drawn for the Monte Carlo estimate of pi.
num_samples = 100_000

In [None]:
# Monte Carlo estimate of pi: sample points, keep those inside the unit
# circle and scale the hit ratio by 4.
sc = pyspark.SparkContext(appName="Pi")
try:
    count = sc.range(num_samples).map(random_point).filter(inside).count()
    pi = 4 * count / num_samples
    print(pi)
finally:
    # Stop the context even if the job fails; a context that is left running
    # prevents the following cells from creating a new SparkContext.
    sc.stop()

Map-Reduce reimplementation:

In [None]:
def inside_counter(p):
    """Return 1 if the point ``p`` lies strictly inside the unit circle, else 0.

    Returning a number instead of a boolean lets the hits be summed up in a
    reduce step.
    """
    px, py = p
    return 1 if px * px + py * py < 1 else 0

In [None]:
sc = pyspark.SparkContext(appName="PiMapReduced")
try:
    # map: sample a point and turn it into a 0/1 hit indicator;
    # reduce: add up the indicators to get the number of hits.
    count = sc.range(num_samples).map(random_point).map(inside_counter).reduce(lambda a, b: a + b)
    pi = 4 * count / num_samples
    print(pi)
finally:
    # Stop the context even if the job fails; a context that is left running
    # prevents the following cells from creating a new SparkContext.
    sc.stop()

## Exercise: Pi Approximation as Batch Job

In [None]:
%%file scripts/pi_approximation_job_solution.py

from contextlib import contextmanager
from pyspark import SparkContext, SparkConf
import random

# Application name passed to the SparkConf of this job.
SPARK_APP_NAME = "pi"

def random_point(x):
    """Return a random ``(x, y)`` point; the argument is ignored.

    Each coordinate is an independent ``random.random()`` draw.
    """
    x_coord = random.random()
    y_coord = random.random()
    return x_coord, y_coord

def inside(p):
    """Check whether the 2D point ``p`` is strictly inside the unit circle."""
    x_coord, y_coord = p
    radius_squared = x_coord * x_coord + y_coord * y_coord
    return radius_squared < 1

def pi_approximation(spark_context, num_samples):
    """Approximate pi via the Monte Carlo method.

    Draws ``num_samples`` random points on the cluster, counts how many of
    them fall inside the unit circle and returns four times the hit ratio.
    """
    points = spark_context.range(num_samples).map(random_point)
    hits = points.filter(inside).count()
    return 4 * hits / num_samples

@contextmanager
def use_spark_context(sparkAppName):
    """Yield a SparkContext named ``sparkAppName`` and stop it on exit.

    The context is stopped in a ``finally`` clause, so it is released even
    when the body of the ``with`` statement raises.
    """
    spark_context = SparkContext(conf=SparkConf().setAppName(sparkAppName))
    try:
        yield spark_context
    finally:
        spark_context.stop()

if __name__ == "__main__":
    # Only run the job when the file is executed as a script (e.g. through
    # spark-submit), not when it is imported.
    with use_spark_context(SPARK_APP_NAME) as spark_context:
        num_samples = 100000000
        pi = pi_approximation(spark_context, num_samples)
        # Blank lines make the result easier to spot in the Spark log output.
        print()
        print("RESULT: pi is approximately ", pi)
        print()


## Exercises: Wrangling the Zipcode DataFrame

In [None]:
from contextlib import contextmanager

@contextmanager
def use_spark_session(appName):
    """Yield a SparkSession named ``appName`` and stop it on exit.

    Prints a short message when the session starts and after it has been
    stopped. The session is stopped even if the ``with`` body raises.
    """
    builder = pyspark.sql.SparkSession.builder.appName(appName)
    spark_session = builder.getOrCreate()
    try:
        print("starting ", appName)
        yield spark_session
    finally:
        spark_session.stop()
        print("stopping ", appName)

a) Output a table of the total population of each state in descending order

In [None]:
from pyspark.sql.functions import col

with use_spark_session("State population size") as spark:
    df = spark.read.json("../.assets/data/zipcodes/zips.json")
    # Sum only the population column explicitly instead of relying on
    # sum() picking up every numeric column of the data set.
    state_totals = df.groupby("state") \
        .sum("pop") \
        .sort(col("sum(pop)").desc())
    # show() prints only 20 rows by default; pass the row count so that
    # every state appears in the table.
    state_totals.show(state_totals.count())

b) Show the zip code areas north of the 49th parallel with more than 1000 inhabitants.

In [None]:
from pyspark.sql.functions import col

with use_spark_session("49th parallel") as spark:
    df = spark.read.json("../.assets/data/zipcodes/zips.json")
    # The `loc` array of the zips data set is [longitude, latitude], so the
    # latitude is the second element (NOTE: verify against the data file).
    df.withColumn("lon", df["loc"][0]) \
        .withColumn("lat", df["loc"][1]) \
        .where(col("lat") > 49) \
        .where(col("pop") > 1000) \
        .show()

## Exercise: Assembling the Full Pipeline

In [None]:
# Path to the Titanic CSV file read by the pipeline cell below.
data_path = "../.assets/data/titanic/titanic.csv"

In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

# Explicit schema for the Titanic CSV; column order must match the file.
schema = StructType([
    StructField('PassengerId', StringType()),
    StructField('Survived', IntegerType()),
    StructField('Pclass', IntegerType()),
    StructField('Name', StringType()),
    StructField('Sex', StringType()),
    # The usual Titanic data contains fractional ages (e.g. 0.42, 28.5).
    # Read as integers these values cannot be parsed and turn into nulls,
    # so the column is declared as double.
    StructField('Age', DoubleType()),
    StructField('SibSp', IntegerType()),
    StructField('Parch', IntegerType()),
    StructField('Ticket', StringType()),
    StructField('Fare', DoubleType()),
    StructField('Cabin', StringType()),
    StructField('Embarked', StringType()),
])


In [None]:
from pyspark.ml import Transformer, Estimator, Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
class NaDropper(Transformer):
    """
    Drops rows that have a missing (null/NaN) value in one of the given columns

    cols: list of column names to check; ``None`` checks all columns
    """

    def __init__(self, cols=None):
        # The base class constructor creates the uid and the parameter maps
        # that every pipeline stage needs, individually for each instance.
        super().__init__()
        self.cols = cols

    def _transform(self, data):
        """ Return `data` without the rows that have missing values in `self.cols`"""
        return data.dropna(subset=self.cols)

    def __repr__(self):
        """ Show a proper string representation when printing the pipeline stage"""
        return str(type(self))


In [None]:
class Rename_label(Transformer):
    """
    Renames a target column to `label`

    col: name of the column that is renamed
    """

    def __init__(self, col=None):
        # The base class constructor creates the uid and the parameter maps
        # that every pipeline stage needs, individually for each instance.
        super().__init__()
        self.col = col

    def _transform(self, data):
        """ Return `data` with the column `self.col` renamed to `label`"""
        return data.withColumnRenamed(self.col, "label")

    def __repr__(self):
        """ Show a proper string representation when printing the pipeline stage"""
        return str(type(self))


In [None]:
with use_spark_session("ML Pipeline") as spark:
    data_raw = spark.read.csv(data_path, header=True, schema=schema)

    # Preprocessing stages: drop incomplete rows and index the categorical
    # columns so that they can be used as numeric features.
    # NOTE(review): checking all columns also drops rows where only an
    # unused column (e.g. Cabin) is missing - confirm that this is intended.
    nadrop = NaDropper(cols=data_raw.columns)
    encode1 = StringIndexer(inputCol="Embarked", outputCol="Embarked_encoded")
    encode2 = StringIndexer(inputCol="Sex", outputCol="Sex_encoded")
    # Adds a `label` column holding the values of `Survived`; the custom
    # Rename_label transformer could be used here instead.
    rename = SQLTransformer(statement="SELECT *, Survived as label FROM __THIS__")
    assemble_features = VectorAssembler(inputCols=["Age", "Fare", "Sex_encoded", "Embarked_encoded"],
                                        outputCol="features")
    classifier = DecisionTreeClassifier()

    training_stages = [nadrop, encode1, encode2, rename, assemble_features, classifier]

    # Random train/test split; without a seed the split differs between runs.
    splitRatio = 0.8
    data_training, data_test = data_raw.randomSplit([splitRatio, 1 - splitRatio])

    model = Pipeline(stages=training_stages).fit(data_training)

    predictions = model.transform(data_test)

    predictions[["label", "rawPrediction", "probability", "prediction"]].show(20)

## Exercise: Using SQLTransformer

In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
from pyspark.ml.feature import SQLTransformer

with use_spark_session("Using SQL Transformer") as spark:

    schema = StructType([
        StructField('PassengerId', StringType()),
        StructField('Survived', IntegerType()),
        StructField('Pclass', IntegerType()),
        StructField('Name', StringType()),
        StructField('Sex', StringType()),
        # The usual Titanic data contains fractional ages (e.g. 0.42, 28.5).
        # Read as integers these values cannot be parsed and would show up
        # as additional nulls, so the column is declared as double.
        StructField('Age', DoubleType()),
        StructField('SibSp', IntegerType()),
        StructField('Parch', IntegerType()),
        StructField('Ticket', StringType()),
        StructField('Fare', DoubleType()),
        StructField('Cabin', StringType()),
        StructField('Embarked', StringType()),
    ])
    data_path = "../.assets/data/titanic/titanic.csv"
    data = spark.read.format("csv").option("header", "true").schema(schema).load(data_path)

    # Build "SELECT * FROM __THIS__ WHERE a IS NOT NULL AND b IS NOT NULL ..."
    # so that only rows without any null value are kept.
    not_null_conditions = " AND ".join(["{} IS NOT NULL".format(x) for x in data.columns])
    na_dropper = SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(not_null_conditions)
    )

    # The loop variable is deliberately not called `col`: at notebook level
    # that would overwrite the `col` function imported from
    # pyspark.sql.functions.
    print("BEFORE:")
    for column in data.columns:
        print(column, " : ", data.filter(f"{column} is NULL").count())
    print()

    data_clean = na_dropper.transform(data)

    print("AFTER")
    for column in data_clean.columns:
        print(column, " : ", data_clean.filter(f"{column} is NULL").count())
    print()

## Exercise: Word Count

In [None]:
text_path = "../.assets/data/iliad/iliad.txt"

# Count how often each (lower-cased) word occurs and print the ten most
# frequent ones. Note that `spark` is a SparkContext here.
with use_spark_context("WordCount") as spark:
    text_file = spark.textFile(text_path)
    # Split lines at full stops first, then at single spaces, and drop the
    # empty strings this leaves behind.
    sentences = text_file.flatMap(lambda line: line.split("."))
    words = sentences.flatMap(lambda sentence: sentence.split(" "))
    non_empty_words = words.filter(lambda word: word != "")
    # Classic map/reduce word count, sorted by descending frequency.
    pairs = non_empty_words.map(lambda word: (word.lower(), 1))
    counts = (
        pairs
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda pair: pair[1], ascending=False)
    )
    for kv in counts.take(10):
        print(kv)


Equivalent, but as a script for `spark-submit`:

In [None]:
%%file scripts/spark_wordcount.py

from contextlib import contextmanager
from pyspark import SparkContext, SparkConf
import argparse

@contextmanager
def use_spark_context(appName):
    """Yield a SparkContext named ``appName`` and stop it on exit.

    Prints a short message when the context starts and after it has been
    stopped. The context is stopped even if the ``with`` body raises.
    """
    spark_context = SparkContext(conf=SparkConf().setAppName(appName))
    try:
        print("starting ", appName)
        yield spark_context
    finally:
        spark_context.stop()
        print("stopping ", appName)

        
if __name__ == "__main__":
    # Command line interface: the path of the text file is mandatory.
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('-t', '--text', help='path to text data', required=True)
    text_path = parser.parse_args().text

    with use_spark_context("WordCount") as spark:
        text_file = spark.textFile(text_path)
        # Split lines at full stops first, then at single spaces, and drop
        # the empty strings this leaves behind.
        sentences = text_file.flatMap(lambda line: line.split("."))
        words = sentences.flatMap(lambda sentence: sentence.split(" "))
        non_empty_words = words.filter(lambda word: word != "")
        # Classic map/reduce word count, sorted by descending frequency.
        pairs = non_empty_words.map(lambda word: (word.lower(), 1))
        counts = (
            pairs
            .reduceByKey(lambda a, b: a + b)
            .sortBy(lambda pair: pair[1], ascending=False)
        )
        # Print the ten most frequent words.
        for kv in counts.take(10):
            print(kv)


## Exercise: Bigram Count

In [None]:
text_path = "../.assets/data/iliad/iliad.txt"

# Count pairs of neighbouring words and print the ten most frequent ones.
# Bigrams are built per line fragment, so pairs that span a line break or a
# full stop are not counted.
with use_spark_context("BigramCount") as spark:
    text_file = spark.textFile(text_path)
    # split() without argument splits at any run of whitespace, so repeated
    # spaces do not produce empty "words" that would end up in bigrams.
    # zip() already yields tuples, which can be used as keys directly.
    bigrams = text_file.flatMap(lambda line: line.split(".")) \
                       .map(lambda sentence: sentence.split()) \
                       .flatMap(lambda xs: zip(xs, xs[1:]))
    counts = bigrams.map(lambda x: (x, 1)) \
            .reduceByKey(lambda x, y: x + y) \
            .sortBy(lambda kv: kv[1], ascending=False)
    for kv in counts.take(10):
        print(kv)

## Exercise: The Greatest Name in British Surreal Comedy

In [None]:
%%file scripts/python_names.txt
Eric Idle
Graham Chapman
John Cleese
Terry Gilliam
Terry Jones
Michael Palin
Monty Python


In [None]:
# Read the names file written above, one name per line, without the
# surrounding whitespace and line breaks.
with open("scripts/python_names.txt", "r") as f:
    names = list(map(str.strip, f))
    print(names)

In [None]:
%%file scripts/spark_greatest_name.py

from contextlib import contextmanager
from pyspark import SparkContext, SparkConf
import argparse

@contextmanager
def use_spark_context(appName):
    """Provide a SparkContext called ``appName`` for a ``with`` block.

    A message is printed when the context starts and after it has been
    stopped; stopping happens in ``finally`` and therefore also on errors.
    """
    configuration = SparkConf().setAppName(appName)
    context = SparkContext(conf=configuration)
    try:
        print("starting ", appName)
        yield context
    finally:
        context.stop()
        print("stopping ", appName)

        
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('-t','--text', help='path to text data', required=True)
    parser.add_argument('-n','--names', help='path to names file, one name per line', required=True)
    args = vars(parser.parse_args())

    text_path = args["text"]
    names_path = args["names"]
    with open(names_path, "r") as names_file:
        # Skip blank lines (e.g. a trailing empty line at the end of the file).
        names = [line.strip() for line in names_file if line.strip()]
    # Each name becomes a tuple of its words; only names made of exactly two
    # words can match a bigram of the text.
    name_bigrams = [tuple(name.split()) for name in names]
    # A frozenset gives constant-time membership tests inside the Spark filter.
    name_lookup = frozenset(name_bigrams)

    print("input names: ", name_bigrams)
    with use_spark_context("The Greatest Name") as spark:
        text_file = spark.textFile(text_path)
        # split() without argument splits at any run of whitespace, so
        # repeated spaces do not produce empty "words" inside bigrams.
        bigrams = text_file.flatMap(lambda line: line.split(".")) \
                           .map(lambda sentence: sentence.split()) \
                           .flatMap(lambda xs: zip(xs, xs[1:]))
        # Filter before counting so that only the bigrams of interest take
        # part in the reduce step; the resulting counts are the same.
        counts = bigrams.filter(lambda bigram: bigram in name_lookup) \
                .map(lambda bigram: (bigram, 1)) \
                .reduceByKey(lambda x, y: x + y) \
                .collect()
        print("Number of mentions for each name:")
        print(counts)

---
_This notebook is licensed under a [Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0)](https://creativecommons.org/licenses/by-nc-sa/4.0/). Copyright © 2018-2025 [Point 8 GmbH](https://point-8.de)_