In [1]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from pyspark.rdd import RDD

from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql.types import *

import numpy as np
import pandas as pd
from bokeh.plotting import figure, show, ColumnDataSource
from bokeh.models import HoverTool
from bokeh.io import output_notebook

output_notebook()

In [2]:
# Spark configuration: master URL "local" and application name "Playground".
conf = SparkConf().setMaster("local").setAppName("Playground")
# Build the SparkSession from that configuration, or pick up an already
# existing session (getOrCreate).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/14 10:52:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
filepath = "../data/FL_insurance_sample.csv"

In [5]:
sc.parallelize([1,2,3,4,5]).collect()

def rdd_from_list(sc, n):
    """Distribute the integers 0 .. n - 1 through ``sc.parallelize``.

    Args:
        sc: Object exposing ``parallelize`` (the SparkContext in this notebook).
        n: Exclusive upper bound handed to ``range``.

    Returns:
        The result of ``sc.parallelize(range(n))``.
    """
    numbers = range(n)
    return sc.parallelize(numbers)

In [6]:
print(rdd_from_list(sc, 10).collect())

[Stage 1:>                                                          (0 + 1) / 1]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


                                                                                

In [7]:
result_rdd = rdd_from_list(sc, 3)
assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [0, 1, 2]

In [8]:
def load_file_to_rdd(sc, path):
    """Open the text file at ``path`` through ``sc.textFile``.

    Args:
        sc: Object exposing ``textFile`` (the SparkContext in this notebook).
        path: Location of the file, forwarded unchanged.

    Returns:
        The result of ``sc.textFile(path)``.
    """
    lines = sc.textFile(path)
    return lines

In [9]:
result_rdd = load_file_to_rdd(sc, filepath)

assert isinstance(result_rdd, RDD)

In [10]:
def create_dataframe(spark, rdd, schema):
    """Build a DataFrame from ``rdd`` using ``schema``.

    Args:
        spark: Object exposing ``createDataFrame`` (the SparkSession here).
        rdd: Row data forwarded as the first argument.
        schema: Schema forwarded as the ``schema`` argument.

    Returns:
        The result of ``spark.createDataFrame``.
    """
    dataframe = spark.createDataFrame(rdd, schema=schema)
    return dataframe
    

In [11]:
rdd = spark.sparkContext.parallelize([('1', 'a'), ('2', 'b'), ('3', 'c'), ('4', 'd'), ('5', 'e'), ('6', 'f')])
schema = StructType([StructField('id', StringType(), True), StructField('letter', StringType(), True)])

result_df = create_dataframe(spark, rdd, schema)

In [12]:
assert result_df.schema == schema
assert result_df.rdd.collect() == rdd.collect()

In [13]:
genders_rdd = spark.sparkContext.parallelize([('1', 'M'), ('2', 'F'), ('3', 'F'), ('4', 'M'), ('5', 'F'), ('6', 'M')])
grades_rdd = spark.sparkContext.parallelize([('1', 1.0), ('2', 2.0), ('3', 3.0), ('4', 4.0), ('5', 5.0), ('6', 6.0)])

genders_schema = StructType([StructField('ID', StringType(), True), StructField('gender', StringType(), True)])
grades_schema = StructType([StructField('ID', StringType(), True), StructField('grade', FloatType(), True)])

genders_df = create_dataframe(spark, genders_rdd, genders_schema)
grades_df = create_dataframe(spark, grades_rdd, grades_schema)

genders_df.createOrReplaceTempView('genders')
grades_df.createOrReplaceTempView('grades')

In [14]:
genders_df.filter(genders_df['ID'] > 2)

DataFrame[ID: string, gender: string]

In [15]:
genders_df[genders_df['ID'] > 2].show()

+---+------+
| ID|gender|
+---+------+
|  3|     F|
|  4|     M|
|  5|     F|
|  6|     M|
+---+------+



In [16]:
spark.sql('SELECT * FROM genders WHERE ID > 2').show()

+---+------+
| ID|gender|
+---+------+
|  3|     F|
|  4|     M|
|  5|     F|
|  6|     M|
+---+------+



In [17]:
genders_df[genders_df['ID'] > 2].show()

+---+------+
| ID|gender|
+---+------+
|  3|     F|
|  4|     M|
|  5|     F|
|  6|     M|
+---+------+



In [18]:
sc.parallelize([[1, 3], [2, 9]]).map(lambda row: row[0]).collect()

[1, 2]

In [19]:
def op1(sc, matrix):
    """Double the first entry and subtract 3 from the second entry of each row.

    Args:
        sc: Unused; kept for a uniform signature across the exercises.
        matrix: RDD-like object whose elements are indexable rows.

    Returns:
        The mapped RDD, each element a two-item list
        ``[row[0] * 2, row[1] - 3]``.
    """
    def transform(row):
        return [row[0] * 2, row[1] - 3]

    return matrix.map(transform)

In [20]:
matrix = [[1,3], [2,5], [8,9]]
matrix_rdd = sc.parallelize(matrix)
result_rdd = op1(sc, matrix_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [[2, 0], [4, 2], [16, 6]]

In [21]:
result_rdd.collect()

[[2, 0], [4, 2], [16, 6]]

In [22]:
def op2(sc, sentences):
    """Split each sentence on single spaces and flatten the pieces.

    Args:
        sc: Unused; kept for a uniform signature across the exercises.
        sentences: RDD-like object of strings.

    Returns:
        The flat-mapped RDD containing every piece of every sentence.
    """
    def split_on_space(sentence):
        return sentence.split(' ')

    return sentences.flatMap(split_on_space)

In [23]:
sentences_rdd = sc.parallelize(['Hi everybody', 'My name is Fanilo', 'and your name is Antoine everybody'])
result_rdd = op2(sc, sentences_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == ['Hi', 'everybody', 'My', 'name', 'is', 'Fanilo', 'and', 'your', 'name', 'is', 'Antoine', 'everybody']

In [24]:
sc.parallelize(range(20)).filter(lambda num: num > 5).collect()

[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [25]:
def op3(sc, matrix):
    """Keep only the elements whose remainder modulo 2 equals 1.

    Args:
        sc: Unused; kept for a uniform signature across the exercises.
        matrix: RDD-like object of numbers (despite the parameter name).

    Returns:
        The filtered RDD.
    """
    def is_odd(value):
        return value % 2 == 1

    return matrix.filter(is_odd)

In [26]:
numbers = [1,2,3,4,5,6,7,8,9]
numbers_rdd = sc.parallelize(numbers)
result_rdd = op3(sc, numbers_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [1,3,5,7,9]

In [27]:
sc.parallelize(range(5)).reduce(lambda x, y: x+y)

10

In [28]:
def op4(sc, matrix):
    """Return the sum of the squares of the odd elements.

    Args:
        sc: Unused; kept for a uniform signature across the exercises.
        matrix: RDD of numbers (despite the parameter name).

    Returns:
        The sum of ``x * x`` over every element with ``x % 2 == 1``;
        0 when there is no such element.
    """
    odd_squares = matrix.filter(lambda x: x % 2 == 1).map(lambda x: x * x)
    # sum() starts from 0, so an RDD without odd values yields 0 instead of
    # the ValueError that reduce() raises on an empty RDD.
    return odd_squares.sum()

In [29]:
numbers = range(100)
numbers_rdd = sc.parallelize(numbers)
result = op4(sc, numbers_rdd)

assert result == 166650

In [30]:
sc.parallelize(range(10)).map(lambda num: (num % 2, num)).reduceByKey(lambda x,y: x+y).collect()

[(0, 20), (1, 25)]

In [31]:
def wordcount(sc, sentences):
    """
    Given a RDD of sentences, return the wordcount, after splitting sentences per whitespace.

    Sentences are split with ``str.split()`` (no separator), so any run of
    whitespace (repeated spaces, tabs, newlines) separates words and no
    empty-string "word" is ever counted.

    Args:
        sc: Unused; kept for a uniform signature across the exercises.
        sentences: RDD of strings.

    Returns:
        A pair RDD of ``(word, count)``.
    """
    words = sentences.flatMap(lambda sentence: sentence.split())
    word_count = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
    return word_count

In [32]:
sentences_rdd = sc.parallelize(['Hi everybody', 'My name is Fanilo', 'and your name is Antoine everybody'])
result_rdd = wordcount(sc, sentences_rdd)

In [33]:
assert isinstance(result_rdd, RDD)
# The order of pairs coming out of reduceByKey depends on partitioning and
# hashing, so both sides are sorted before comparing.
assert sorted(result_rdd.collect()) == sorted([
    ('Hi', 1),
    ('everybody', 2),
    ('My', 1),
    ('name', 2),
    ('is', 2),
    ('Fanilo', 1),
    ('and', 1),
    ('your', 1),
    ('Antoine', 1)
])

In [34]:
genders_rdd = sc.parallelize([('1', 'M'), ('2', 'M'), ('3', 'F'), ('4', 'F'), ('5', 'F'), ('6', 'M')])
grades_rdd = sc.parallelize([('1', 5), ('2', 12), ('3', 7), ('4', 18), ('5', 9), ('6', 5)])

genders_rdd.join(grades_rdd).collect()

                                                                                

[('1', ('M', 5)),
 ('4', ('F', 18)),
 ('2', ('M', 12)),
 ('3', ('F', 7)),
 ('5', ('F', 9)),
 ('6', ('M', 5))]

In [35]:
def mean_grade_per_gender(sc, genders, grades):
    """
    Compute the mean grade of each gender.

    Both inputs are pair RDDs keyed by studentID: ``genders`` maps an ID to a
    gender and ``grades`` maps an ID to a grade. Every studentID is assumed to
    be present in both RDDs, so a plain inner join is used without checks.

    Returns a pair RDD of ``(gender, mean grade)``.
    """
    # After the join each element is (studentID, (gender, grade)).
    by_student = genders.join(grades)
    gender_grade = by_student.map(lambda record: (record[1][0], record[1][1]))
    grouped = gender_grade.groupByKey()

    def average(entry):
        gender, grade_values = entry
        return (gender, sum(grade_values) / len(grade_values))

    return grouped.map(average)

In [37]:
genders_rdd = sc.parallelize([('1', 'M'), ('2', 'M'), ('3', 'F'), ('4', 'F'), ('5', 'F'), ('6', 'M')])
grades_rdd = sc.parallelize([('1', 5), ('2', 12), ('3', 7), ('4', 18), ('5', 9), ('6', 5)])

result_rdd = mean_grade_per_gender(sc, genders_rdd, grades_rdd)
assert isinstance(result_rdd, RDD)

# The result comes out of a shuffle, so its order is not guaranteed, and the
# means are floats: compare per gender with a small tolerance instead of
# matching an exact ordered list.
collected_means = result_rdd.collect()
expected_means = {'M': 22 / 3, 'F': 34 / 3}
assert sorted(gender for gender, _ in collected_means) == sorted(expected_means)
assert all(abs(mean - expected_means[gender]) < 1e-9 for gender, mean in collected_means)

In [38]:
sc.parallelize(['a', 'b', 'c', 'd']).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

In [42]:
def filter_header(sc, rdd):
    """
    Remove the first line (the header) from the FL insurance RDD.

    Each element is paired with its position via ``zipWithIndex``; the element
    at index 0 is dropped and the index is stripped from the rest.
    """
    indexed = rdd.zipWithIndex()
    without_first = indexed.filter(lambda pair: pair[1] > 0)
    return without_first.map(lambda pair: pair[0])

In [43]:
header = 'policyID,statecode,county,eq_site_limit,hu_site_limit,fl_site_limit,fr_site_limit,tiv_2011,tiv_2012,eq_site_deductible,hu_site_deductible,fl_site_deductible,fr_site_deductible,point_latitude,point_longitude,line,construction,point_granularity'
file = load_file_to_rdd(sc, filepath)
result_rdd = filter_header(sc, file)

assert isinstance(result_rdd, RDD)
assert file.filter(lambda line: line==header).collect()
assert not result_rdd.filter(lambda line: line == header).collect()

In [45]:
file_rdd = filter_header(sc, load_file_to_rdd(sc, filepath))

In [52]:
def get_county(sc, rdd):
    """
    Extract the county of every line of the FL insurance RDD.

    The county is taken as the third comma-separated field. The csv is assumed
    to be correctly formatted, with the right number of elements on every line.
    """
    county_column = 2  # zero-based position of the county field

    def extract_county(line):
        return line.split(',')[county_column]

    return rdd.map(extract_county)

def county_count(sc, rdd):
    """
    Count the occurrences of each county.

    Takes a RDD of county names and returns a pair RDD with the county as key
    and its number of occurrences as value.
    """
    ones = rdd.map(lambda name: (name, 1))
    totals = ones.reduceByKey(lambda left, right: left + right)
    return totals

In [53]:
# CAREFUL: some tests are invisible so don't try to output a dictionary with what looks like the correct answers :)
file_rdd = filter_header(sc, load_file_to_rdd(sc, filepath))
county_rdd = get_county(sc, file_rdd)

result = dict(county_count(sc, county_rdd).collect())
assert result.get('CLAY COUNTY') == 346