In [1]:
#Managing Imports
from time import time
import numpy as np
import operator
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as func

In [2]:
#This notebook comes with a pre-configured sparkContext called sc
#Evaluating the bare name shows its repr, confirming the context exists before use
sc

<pyspark.context.SparkContext at 0x7f1084022f50>

In [3]:
#RDDs
dummy_list = range(1000)
#this is a list
print type(dummy_list)
rddlist = sc.parallelize(dummy_list)
#this is a RDD
print type(rddlist)

<type 'list'>
<class 'pyspark.rdd.RDD'>


In [4]:
#More RDDs: build RDDs from files instead of an in-memory list
file_path = "data/sequence.txt"
#file_rdd is the RDD reused by the timing and counting cells further down
file_rdd = sc.textFile(file_path)
#the JSON file is opened through textFile here, i.e. as plain text rather than parsed records
json_rdd = sc.textFile("data/people.json")
#uncomment to inspect the collected content and its Python type:
# print json_rdd.collect()
# print type(json_rdd.collect())
#Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

In [5]:
start_time = time()
filtered_rdd = file_rdd.filter(lambda x: len(x) < 2)
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

Time taken (in seconds) = 0.00355696678162


In [6]:
start_time = time()
filtered_data = file_rdd.filter(lambda x: len(x) < 2).collect()
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

Time taken (in seconds) = 5.95632791519


In [7]:
start_time = time()
mapped_rdd = file_rdd.map(lambda x: 2*x)
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

Time taken (in seconds) = 0.00350093841553


In [8]:
start_time = time()
mapped_data = file_rdd.map(lambda x: 2*x).collect()
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

Time taken (in seconds) = 2.28691196442


In [9]:
print type(filtered_rdd)
print type(filtered_data)
print type(mapped_rdd)
print type(mapped_data)
print "Conclusion: RDDs are lazy. They do nothing unless there is an action is called."

<class 'pyspark.rdd.PipelinedRDD'>
<type 'list'>
<class 'pyspark.rdd.PipelinedRDD'>
<type 'list'>
Conclusion: RDDs are lazy. They do nothing unless there is an action is called.


In [10]:
start_time = time()
print len(file_rdd.map(lambda x: 2*x).filter(lambda x: len(x)>1).take(10))
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

10
Time taken (in seconds) = 0.129802942276


In [11]:
start_time = time()
print len(file_rdd.map(lambda x: 2*x).filter(lambda x: len(x)>1).take(100000))
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

100000
Time taken (in seconds) = 0.553807973862


In [12]:
start_time = time()
print len(file_rdd.map(lambda x: 2*x).filter(lambda x: len(x)>1).collect())
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

1000000
Time taken (in seconds) = 2.25160479546


In [13]:
#Takeaway from the three timings above: the less the action asks for, the less time it took
print "Lazy is better."

Lazy is better.


In [14]:
start_time = time()
print "We want to count the number of 1, 2, ... digit numbers."
file_path = "data/sequence.txt"
file_rdd = sc.textFile(file_path) 
mapped_rdd = file_rdd.map(lambda a: (len(a), 1))
count_rdd = mapped_rdd.reduceByKey(lambda a, b: a+b).sortByKey()
print count_rdd.collect()
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

We want to count the number of 1, 2, ... digit numbers.
[(1, 9), (2, 90), (3, 900), (4, 9000), (5, 90000), (6, 900000), (7, 1)]
Time taken (in seconds) = 1.79352688789


In [15]:
#Driver-side counterpart of the previous cell, for timing comparison:
#collect() brings every line back as a local list and the mapping runs in plain Python.
start_time = time()
print "We want to count the number of 1, 2, ... digit numbers."
mapped_data = np.asarray(map(lambda a: (len(a), 1), file_rdd.collect()))
#NOTE(review): the counting step is not reproduced locally -- the disabled lines
#below still operate on the Spark RDD -- so the time covers collect + map + asarray only.
# count_map = mapped_rdd.reduceByKey(lambda a, b: a+b).sortByKey().collect()
# print count_map
end_time = time()
print "Time taken (in seconds) = " + str(end_time - start_time)

We want to count the number of 1, 2, ... digit numbers.
Time taken (in seconds) = 7.90541195869


In [16]:
print "Lets add all the numbers. We have two ways of doing that."
print "Approach 1: update a counter variable"
#Global defined in the notebook (driver) process; Approach 1 tries to accumulate into it.
counter = 0
def increment_counter(x):
    """Add x to the global `counter` of whichever process executes this call."""
    global counter
    counter+=x
#Lines are text, so convert each one to an integer before summing.
mapped_rdd = file_rdd.map(lambda a: int(a))
#foreach() applies the function to every element. The recorded result below is 0:
#the driver's `counter` is never updated -- presumably because the increments happen
#in the worker processes' own copies of the global (see Spark docs on closures).
mapped_rdd.foreach(increment_counter)
print "Sum using first approach: ", counter
print "Approach 2: use reduce operation"
#reduce() combines the elements with operator.add and returns the total to the driver.
print "Sum using second appraoch: ", mapped_rdd.reduce(operator.add)
print "Which one is correct?"

Lets add all the numbers. We have two ways of doing that.
Approach 1: update a counter variable
Sum using first approach:  0
Approach 2: use reduce operation
Sum using second appraoch:  500000500000
Which one is correct?


In [17]:
# An accumulator is the supported way to do what the global counter attempted:
# each element is added to it inside foreach(), and the driver reads the total
# through .value afterwards.
accum = sc.accumulator(0)
mapped_rdd.foreach(lambda x: accum.add(x))
# Build one string before printing: in this Python 2 notebook
# print("label", value) prints a tuple, e.g. ('Actual sum: ', 500000500000).
print("Actual sum: %d" % accum.value)

('Actual sum: ', 500000500000)


In [18]:
print "So far we talked about big data. What about structured big data?"
#Wrap the existing SparkContext in a SQLContext, used below to read JSON and run SQL queries
sqlContext = SQLContext(sc)
#Bare expression: display the repr to confirm the context was created
sqlContext

So far we talked about big data. What about structured big data?


<pyspark.sql.context.SQLContext at 0x7f10669611d0>

In [19]:
#dataframes: load the JSON file as a DataFrame (this time parsed into columns,
#unlike the sc.textFile read of the same file earlier)
df = sqlContext.read.json("data/people.json")
#show() prints the first rows as a text table
df.show()
print "A DataFrame is the structured version of RDD. This is the familiar relational view of the data."

+--------------------+----------+------+---+---------------+---------+
|               email|first_name|gender| id|     ip_address|last_name|
+--------------------+----------+------+---+---------------+---------+
|mross0@woothemes.com|      Mark|  Male|  1|  193.74.224.53|     Ross|
|pbarnes1@state.tx.us|  Patricia|Female|  2|  118.198.82.58|   Barnes|
|ghenry2@geocities.jp|   Gregory|  Male|  3| 11.202.213.200|    Henry|
|mfernandez3@tmall...|      Mark|  Male|  4|   47.154.217.0|Fernandez|
| jbaker4@auda.org.au|  Jennifer|Female|  5|   39.74.105.41|    Baker|
|agilbert5@reddit.com|       Ann|Female|  6|199.219.100.148|  Gilbert|
|hriley6@yolasite.com|    Howard|  Male|  7|   10.82.75.192|    Riley|
|    sfisher7@bbb.org|    Samuel|  Male|  8|  167.52.36.254|   Fisher|
|jmccoy8@examiner.com|    Jeremy|  Male|  9|  121.43.78.116|    Mccoy|
|     jlopez9@ask.com|      Judy|Female| 10| 148.247.157.84|    Lopez|
|rcollinsa@linkedi...|    Robert|  Male| 11|  211.141.37.30|  Collins|
|cfull

In [20]:
#No schema was supplied when reading the JSON; printSchema() shows the column names and types Spark derived
df.printSchema()
print "DF can infer schema on its own. We can always override the inferred schema."

root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- last_name: string (nullable = true)

DF can infer schema on its own. We can always override the inferred schema.


In [21]:
# Demonstration of a failure: file_rdd holds bare strings, and toDF() cannot
# infer a schema from them. The error is the point of this cell, so catch it
# and print it instead of letting the exception abort a full "Run All";
# the next cell shows the working version.
try:
    rdd_df = file_rdd.toDF()
except TypeError as err:
    print("TypeError: %s" % (err,))

TypeError: Can not infer schema for type: <type 'unicode'>

In [22]:
# Wrapping every line in a Row gives toDF() a record structure to work with;
# the single unnamed field shows up as column _1 of type string.
rdd_df = file_rdd.map(lambda line: Row(line)).toDF()
rdd_df.show()
rdd_df.printSchema()

+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+---+
only showing top 20 rows

root
 |-- _1: string (nullable = true)



In [23]:
#Disabled variant of the previous cell: converts each line to int before wrapping
#it in a Row -- presumably so the inferred column is numeric instead of string
#(TODO confirm by running, or delete this cell).
# rdd_df = file_rdd.map(lambda a: Row(int(a))).toDF()
# rdd_df.show()
# rdd_df.printSchema()

In [24]:
#Re-display the people DataFrame as a reference point for the queries that follow
df.show()

+--------------------+----------+------+---+---------------+---------+
|               email|first_name|gender| id|     ip_address|last_name|
+--------------------+----------+------+---+---------------+---------+
|mross0@woothemes.com|      Mark|  Male|  1|  193.74.224.53|     Ross|
|pbarnes1@state.tx.us|  Patricia|Female|  2|  118.198.82.58|   Barnes|
|ghenry2@geocities.jp|   Gregory|  Male|  3| 11.202.213.200|    Henry|
|mfernandez3@tmall...|      Mark|  Male|  4|   47.154.217.0|Fernandez|
| jbaker4@auda.org.au|  Jennifer|Female|  5|   39.74.105.41|    Baker|
|agilbert5@reddit.com|       Ann|Female|  6|199.219.100.148|  Gilbert|
|hriley6@yolasite.com|    Howard|  Male|  7|   10.82.75.192|    Riley|
|    sfisher7@bbb.org|    Samuel|  Male|  8|  167.52.36.254|   Fisher|
|jmccoy8@examiner.com|    Jeremy|  Male|  9|  121.43.78.116|    Mccoy|
|     jlopez9@ask.com|      Judy|Female| 10| 148.247.157.84|    Lopez|
|rcollinsa@linkedi...|    Robert|  Male| 11|  211.141.37.30|  Collins|
|cfull

In [25]:
#Section banner: the next cells use the DataFrame API, the SQL variant comes after
print "DF can be queried in two ways: API and sql queries"
print "First method: API"

DF can be queried in two ways: API and sql queries
First method: API


In [26]:
# Projection: keep only the first name and gender columns.
df.select('first_name', 'gender').show()

+----------+------+
|first_name|gender|
+----------+------+
|      Mark|  Male|
|  Patricia|Female|
|   Gregory|  Male|
|      Mark|  Male|
|  Jennifer|Female|
|       Ann|Female|
|    Howard|  Male|
|    Samuel|  Male|
|    Jeremy|  Male|
|      Judy|Female|
|    Robert|  Male|
| Christina|Female|
|       Joe|  Male|
|     Jesse|  Male|
|      Sara|Female|
|     Terry|  Male|
|     Kelly|Female|
|    Nicole|Female|
|    Daniel|  Male|
|  Benjamin|  Male|
+----------+------+
only showing top 20 rows



In [27]:
# Aggregation: number of rows per gender value.
df.groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|Female|  479|
|  Male|  521|
+------+-----+



In [28]:
# Occurrences of each email address, most frequent first.
(df.groupBy("email")
   .agg(func.count("email").alias("count"))
   .orderBy("count", ascending=False)
   .show())

+--------------------+-----+
|               email|count|
+--------------------+-----+
| jbaker4@auda.org.au|    1|
|   jyoung6d@sohu.com|    1|
|  ahall9c@flickr.com|    1|
|croberts9k@webede...|    1|
|   ckingcl@apple.com|    1|
|fcoopernt@google.com|    1|
|janderson5h@oaic....|    1|
|dmatthewsb7@image...|    1|
| jpowellbk@wiley.com|    1|
|arogersel@rediff.com|    1|
| ameyer16@rambler.ru|    1|
|     bsims67@msn.com|    1|
|mjoneslf@wootheme...|    1|
|mfranklinn9@hao12...|    1|
|aandrewspf@redcro...|    1|
| sanderson4r@ask.com|    1|
|  fcoopercq@live.com|    1|
|ladamsgf@hubpages...|    1|
|rbellgv@bluehost.com|    1|
|abrown36@yellowpa...|    1|
+--------------------+-----+
only showing top 20 rows



In [29]:
print "Second method: sql"
#Expose the DataFrame under the table name "people" so SQL text can refer to it
df.registerTempTable("people")

Second method: sql


In [30]:
#Query the "people" temp table registered in the previous cell; the result is shown with show() like any DataFrame
sqlContext.sql("SELECT * FROM people WHERE gender = 'Female'").show()

+--------------------+----------+------+---+---------------+---------+
|               email|first_name|gender| id|     ip_address|last_name|
+--------------------+----------+------+---+---------------+---------+
|pbarnes1@state.tx.us|  Patricia|Female|  2|  118.198.82.58|   Barnes|
| jbaker4@auda.org.au|  Jennifer|Female|  5|   39.74.105.41|    Baker|
|agilbert5@reddit.com|       Ann|Female|  6|199.219.100.148|  Gilbert|
|     jlopez9@ask.com|      Judy|Female| 10| 148.247.157.84|    Lopez|
|cfullerb@comcast.net| Christina|Female| 12|  64.222.238.34|   Fuller|
|swarrene@economis...|      Sara|Female| 15|139.106.252.103|   Warren|
|    kcarrg@prlog.org|     Kelly|Female| 17|   42.81.36.143|     Carr|
|nbellh@soundcloud...|    Nicole|Female| 18|    219.227.1.1|     Bell|
|tgarciak@bizjourn...|   Theresa|Female| 21|243.177.142.241|   Garcia|
|mweaverl@china.co...|  Margaret|Female| 22|    75.44.81.77|   Weaver|
|     wsmithm@mlb.com|     Wanda|Female| 23|   116.184.26.4|    Smith|
|  lpi

In [31]:
print "We can define our own functions as well."
domain = func.udf(lambda s: s.split('@')[1], StringType())
df.select(domain(df.email).alias('domain'))\
.groupBy('domain').agg(func.count('domain').alias("count")).orderBy("count", ascending=False).show()

We can define our own functions as well.
+--------------------+-----+
|              domain|count|
+--------------------+-----+
|         alibaba.com|    8|
|        examiner.com|    7|
|             163.com|    7|
|       woothemes.com|    6|
|             mlb.com|    6|
|      friendfeed.com|    6|
|             fda.gov|    6|
|             free.fr|    6|
|           apple.com|    6|
|     sourceforge.net|    6|
|         cornell.edu|    6|
|            lulu.com|    6|
|             usa.gov|    5|
|        cbslocal.com|    5|
|pagesperso-orange.fr|    5|
|             pbs.org|    5|
|        gravatar.com|    5|
|            ucla.edu|    5|
|             nyu.edu|    5|
|       webeden.co.uk|    5|
+--------------------+-----+
only showing top 20 rows

