In [None]:
#Managing Imports
from time import time
import numpy as np
import operator
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as func
from pyspark import SparkContext
import json

If the notebook is run locally, `sc` (the SparkContext) is already pre-configured. When running on Binder, we need to create the SparkContext ourselves.

In [None]:
# Reuse the pre-configured SparkContext `sc` when the kernel provides one;
# otherwise create it and load the input files on the driver.
def _read_lines(path):
    """Return the lines of the text file at `path` with '\\n' stripped from both ends."""
    with open(path) as handle:
        return [line.strip('\n') for line in handle]


try:
    sc
except NameError:
    # No `sc` in this kernel: connect to the cluster master explicitly and
    # build the RDDs from lists read through plain Python file I/O.
    sc = SparkContext(master='spark://master:7077')
    sequence = _read_lines("data/sequence.txt")
    file_rdd = sc.parallelize(sequence)
    json_data = _read_lines("data/people.json")
    json_rdd = sc.parallelize(json_data)
else:
    # `sc` already exists: let Spark read the files itself.
    file_rdd = sc.textFile("data/sequence.txt")
    json_rdd = sc.textFile("data/people.json")
sc

In [None]:
# RDDs: turn a local Python collection into a distributed dataset.
dummy_list = range(1000)
# Local collection living on the driver.
print(type(dummy_list))
# parallelize() distributes the collection and hands back an RDD.
rddlist = sc.parallelize(dummy_list)
print(type(rddlist))

In [None]:
#More RDDs
# collect() is an action: every call launches a Spark job and ships the whole
# dataset to the driver. Run it once and reuse the result for both prints.
json_records = json_rdd.collect()
print(json_records)
print(type(json_records))
#Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

In [None]:
# Time a transformation on its own: filter() only describes the work,
# so no data is read here.
start_time = time()
filtered_rdd = file_rdd.filter(lambda line: len(line) < 2)
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Same filter, but followed by the collect() action, which forces the
# computation and returns the matching lines to the driver.
start_time = time()
filtered_data = (
    file_rdd
    .filter(lambda line: len(line) < 2)
    .collect()
)
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))
print(filtered_data)

In [None]:
# Time a map() transformation on its own. Each element is a text line,
# so doubling it repeats the string.
start_time = time()
mapped_rdd = file_rdd.map(lambda line: line * 2)
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Same map, now materialised on the driver with collect().
start_time = time()
mapped_data = (
    file_rdd
    .map(lambda line: line * 2)
    .collect()
)
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Compare what the transformation-only cells produced (RDD handles) with
# what the cells ending in collect() produced (plain Python lists).
print(type(filtered_rdd))
print(type(filtered_data))
print(type(mapped_rdd))
print(type(mapped_data))
# Message reworded: the original sentence was ungrammatical.
print("Conclusion: RDDs are lazy. They do nothing unless an action is called.")

In [None]:
# take(10) is an action that asks for only the first 10 results of the
# map -> filter pipeline.
start_time = time()
print(len(
    file_rdd
    .map(lambda line: line * 2)
    .filter(lambda doubled: len(doubled) > 1)
    .take(10)
))
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Same pipeline, but asking for up to 100000 results, to compare timing with
# the 10-element take.
start_time = time()
print(len(
    file_rdd
    .map(lambda line: line * 2)
    .filter(lambda doubled: len(doubled) > 1)
    .take(100000)
))
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Same pipeline again, this time collecting every result to the driver.
start_time = time()
print(len(
    file_rdd
    .map(lambda line: line * 2)
    .filter(lambda doubled: len(doubled) > 1)
    .collect()
))
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Takeaway from the three timings above.
print("Lazy is better.")

In [None]:
# Digit-length histogram computed with Spark: emit (length, 1) per line,
# sum the ones per length, then order by length.
start_time = time()
print("We want to count the number of 1, 2, ... digit numbers.")
# file_rdd was already loaded from data/sequence.txt in the setup cell:
# file_path = "data/sequence.txt"
# file_rdd = sc.textFile(file_path)
mapped_rdd = file_rdd.map(lambda line: (len(line), 1))
count_rdd = mapped_rdd.reduceByKey(operator.add).sortByKey()
print(count_rdd.collect())
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
# Driver-side variant for comparison: pull every line to the driver first,
# then build the (length, 1) pairs in plain Python/NumPy.
start_time = time()
print("We want to count the number of 1, 2, ... digit numbers.")
mapped_data = np.asarray([(len(line), 1) for line in file_rdd.collect()])
# count_map = mapped_rdd.reduceByKey(lambda a, b: a+b).sortByKey().collect()
# print count_map
end_time = time()
print("Time taken (in seconds) = " + str(end_time - start_time))

In [None]:
print("Lets add all the numbers. We have two ways of doing that.")
print("Approach 1: update a counter variable")
# Reset on every run so that re-executing this cell starts from zero.
counter = 0


def increment_counter(x):
    """Add ``x`` to the module-level ``counter``.

    Deliberately naive: foreach() serialises this function and runs it inside
    the executors, so the ``counter`` being incremented there is a copy. The
    driver-side variable printed below is not updated by those tasks.
    """
    global counter
    counter += x


mapped_rdd = file_rdd.map(lambda line: int(line))
mapped_rdd.foreach(increment_counter)
print("Sum using first approach: " + str(counter))
print("Approach 2: use reduce operation")
# reduce() combines the values inside Spark and returns the result to the driver.
print("Sum using second approach: " + str(mapped_rdd.reduce(operator.add)))
print("Which one is correct?")

In [None]:
# An accumulator is Spark's supported way to add up values from inside tasks:
# executors call add(), and the driver reads the total through .value.
accum = sc.accumulator(0)
mapped_rdd.foreach(lambda x: accum.add(x))
# Build one string: with the Python 2 print statement used in this notebook,
# print("...", value) would print a tuple repr instead of the message.
print("Actual sum: " + str(accum.value))

In [None]:
print("So far we talked about big data. What about structured big data?")
# Entry point for DataFrame and SQL functionality, built on the existing SparkContext.
sqlContext = SQLContext(sc)
sqlContext

In [None]:
# DataFrames: load the people records from JSON and preview them.
df = sqlContext.read.json("data/people.json")
df.show()
print("A DataFrame is the structured version of RDD. This is the familiar relational view of the data.")

In [None]:
# Show the column names and types attached to the people DataFrame.
df.printSchema()
print("DF can infer schema on its own. We can always override the inferred schema.")

In [None]:
# Build a DataFrame from the plain-text RDD by wrapping each line in a Row.
# No field name is given, so check printSchema() for the resulting column name.
rdd_df = (
    file_rdd
    .map(lambda line: Row(line))
    .toDF()
)
rdd_df.show()
rdd_df.printSchema()

In [None]:
# Disabled variant of the previous cell: converts each line with int() before
# wrapping it in a Row.
# NOTE(review): presumably meant to show a numeric inferred schema instead of a
# string one -- confirm, then either enable this cell or delete it.
# rdd_df = file_rdd.map(lambda a: Row(int(a))).toDF()
# rdd_df.show()
# rdd_df.printSchema()

In [None]:
# Display the DataFrame loaded from data/people.json again as a reference
# for the query examples in the following cells.
df.show()

In [None]:
# Introduce the two query styles demonstrated in the following cells.
print("DF can be queried in two ways: API and sql queries")
print("First method: API")

In [None]:
# Projection: keep only the first_name and gender columns.
(df
    .select(df.first_name, df.gender)
    .show())

In [None]:
# Aggregation: number of rows per gender value.
(df
    .groupBy(df.gender)
    .count()
    .show())

In [None]:
# Occurrences of each email address, most frequent first.
(df
    .groupBy(df.email)
    .agg(func.count(df.email).alias("count"))
    .orderBy("count", ascending=False)
    .show())

In [None]:
print("Second method: sql")
# Expose the DataFrame to SQL queries under the table name "people".
df.registerTempTable("people")

In [None]:
# Query the registered "people" table with plain SQL.
(sqlContext
    .sql("SELECT * FROM people WHERE gender = 'Female'")
    .show())

In [None]:
print("We can define our own functions as well.")


def _email_domain(address):
    """Return the text between the first and second '@' of ``address``.

    For a normal address this is the domain. Returns None when ``address`` is
    None or contains no '@', so a single null or malformed record becomes a
    null domain instead of raising inside an executor and failing the whole job.
    """
    if address is None:
        return None
    pieces = address.split('@')
    return pieces[1] if len(pieces) > 1 else None


# Wrap the Python function as a Spark SQL UDF that returns a string column.
domain = func.udf(_email_domain, StringType())
# Count records per email domain, most common first. Rows with a null domain
# form their own group; count('domain') only counts non-null values.
(df
    .select(domain(df.email).alias('domain'))
    .groupBy('domain')
    .agg(func.count('domain').alias("count"))
    .orderBy("count", ascending=False)
    .show())