### First class on Spark
- use Apache Spark with the PySpark API
- To use Spark and the DataFrame API, we need a SQLContext, which is created on top of a SparkContext
- Both the SQLContext and the SparkContext are created when Spark is launched: **sc** is the SparkContext and **sqlContext** is the SQLContext

In [2]:
type(sqlContext)

pyspark.sql.context.SQLContext

In [3]:
sc

<pyspark.context.SparkContext at 0x1010377d0>

In [4]:
### sqlContext attributes
dir(sqlContext)

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_inferSchema',
 '_instantiatedContext',
 '_jsc',
 '_jsqlContext',
 '_jvm',
 '_sc',
 '_ssql_ctx',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'dropTempTable',
 'getConf',
 'getOrCreate',
 'newSession',
 'range',
 'read',
 'readStream',
 'registerDataFrameAsTable',
 'registerFunction',
 'setConf',
 'sparkSession',
 'sql',
 'streams',
 'table',
 'tableNames',
 'tables',
 'udf',
 'uncacheTable']

In [1]:
### getting help
# help(sqlContext)

In [6]:
sc.version

u'2.0.1'

### using DataFrames and chaining together transformations and actions

In [8]:
### create a python collection of 10,000 people
from faker import Faker
fake = Faker()
fake.seed(1234)
fake

<faker.generator.Generator at 0x109b93f50>

In [9]:
# each entry looks like this: last_name, first_name, ssn, job, age
from pyspark.sql import Row
def fake_entry():
    name = fake.name().split()
    return (name[1], name[0], fake.ssn(), fake.job(), abs(2016-fake.date_time().year)+1)

In [10]:
# create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
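
The helper above is a generator, so `func` is not called until the result is consumed (in the next cell, by `list(...)`). A minimal sketch of the same helper for Python 3, where `range` replaces `xrange`. The `next_id` function is a hypothetical stand-in for `fake_entry`, used only so the call order is easy to see:

```python
import itertools

def repeat(times, func, *args, **kwargs):
    """Lazily call func(*args, **kwargs) `times` times, yielding each result."""
    for _ in range(times):
        yield func(*args, **kwargs)

# Hypothetical stand-in for fake_entry: returns 0, 1, 2, ... on successive calls.
_ids = itertools.count()

def next_id():
    return next(_ids)

lazy = repeat(3, next_id)   # generator created, next_id has not been called yet
first_peek = next(_ids)     # takes 0 directly, showing repeat has not run so far
values = list(lazy)         # consuming the generator calls next_id three times
```

Here `first_peek` is 0 and `values` is `[1, 2, 3]`, which confirms that the calls only happen once the generator is consumed.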

In [11]:
data = list(repeat(10000, fake_entry))

In [12]:
data[0]

(u'Smith', u'Jessica', u'819-93-5822', 'Personnel officer', 44)

In [13]:
### transform python object into a DataFrame
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))

In [14]:
print 'type of dataDF: {0}'.format(type(dataDF))

type of dataDF: <class 'pyspark.sql.dataframe.DataFrame'>


In [16]:
dataDF.printSchema()

root
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- ssn: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: long (nullable = true)



In [17]:
### register a DataFrame as a named table
sqlContext.registerDataFrameAsTable(dataDF, 'dataframe')

In [18]:
help(dataDF)

Help on DataFrame in module pyspark.sql.dataframe object:

class DataFrame(__builtin__.object)
 |  A distributed collection of data grouped into named columns.
 |  
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SQLContext`::
 |  
 |      people = sqlContext.read.parquet("...")
 |  
 |  Once created, it can be manipulated using the various domain-specific-language
 |  (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 |  
 |  To select a column from the data frame, use the apply method::
 |  
 |      ageCol = people.age
 |  
 |  A more concrete example::
 |  
 |      # To create DataFrame using SQLContext
 |      people = sqlContext.read.parquet("...")
 |      department = sqlContext.read.parquet("...")
 |  
 |      people.filter(people.age > 30).join(department, people.deptId == department.id) \
 |        .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
 |  
 | 

In [19]:
dataDF.rdd.getNumPartitions()

4

In [20]:
### query
### when you use DataFrames or Spark SQL, you are building a query plan. Each transformation you apply to a DataFrame adds some
### information to the query plan
newDF = dataDF.distinct().select('*')
newDF.explain(True)

== Parsed Logical Plan ==
'Project [*]
+- Aggregate [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]
   +- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]

== Analyzed Logical Plan ==
last_name: string, first_name: string, ssn: string, occupation: string, age: bigint
Project [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]
+- Aggregate [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]
   +- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]

== Optimized Logical Plan ==
Aggregate [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]
+- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L]

== Physical Plan ==
*HashAggregate(keys=[last_name#0, first_name#1, ssn#2, occupation#3, age#4L], functions=[], output=[last_name#0, first_name#1, ssn#2
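
To make the "building a query plan" idea concrete, here is a toy sketch in plain Python (no Spark required). `ToyFrame` is a hypothetical class invented for this illustration, not part of PySpark: each transformation only records a step, and nothing runs until an action (`collect`) is called. Real Spark also optimizes the plan before running it, as the optimized plan above shows (the `select('*')` projection is gone), which this toy does not attempt.

```python
class ToyFrame:
    """Toy stand-in for a DataFrame: records transformations, runs them on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (step name, function) pairs

    def _extend(self, name, fn):
        # A transformation returns a new object with one more recorded step.
        return ToyFrame(self._rows, self._plan + ((name, fn),))

    def distinct(self):
        # dict.fromkeys drops repeated rows while keeping first-seen order.
        return self._extend("distinct", lambda rows: list(dict.fromkeys(rows)))

    def filter(self, predicate):
        return self._extend("filter", lambda rows: [r for r in rows if predicate(r)])

    def explain(self):
        # Only lists the recorded step names; nothing is executed.
        return [name for name, _ in self._plan]

    def collect(self):
        # Action: only now are the recorded steps executed, in order.
        rows = list(self._rows)
        for _, fn in self._plan:
            rows = fn(rows)
        return rows

df = ToyFrame([("Joe", 1), ("Joe", 1), ("Anna", 15)])
new_df = df.distinct().filter(lambda r: r[1] > 1)
plan = new_df.explain()     # recorded steps, nothing executed yet
result = new_df.collect()   # the plan runs here
```

`plan` is `['distinct', 'filter']` and `result` is `[('Anna', 15)]`; the original `df` still holds all three rows.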

In [21]:
### using select
subDF = dataDF.select('last_name', 'first_name', 'ssn', 'occupation', (dataDF.age - 1).alias('age'))

In [22]:
### using collect, returns all the results
results = subDF.collect()
print results

[Row(last_name=u'Smith', first_name=u'Jessica', ssn=u'819-93-5822', occupation=u'Personnel officer', age=43), Row(last_name=u'Johnson', first_name=u'Cynthia', ssn=u'710-35-6233', occupation=u'Music tutor', age=40), Row(last_name=u'Smith', first_name=u'Brian', ssn=u'438-96-0646', occupation=u'Leisure centre manager', age=25), Row(last_name=u'Riley', first_name=u'Ann', ssn=u'243-56-6446', occupation=u'Hydrologist', age=30), Row(last_name=u'Chavez', first_name=u'Timothy', ssn=u'477-02-5081', occupation=u'Accountant, chartered certified', age=40), Row(last_name=u'Brown', first_name=u'James', ssn=u'529-17-5573', occupation=u'Chemical engineer', age=2), Row(last_name=u'Thomas', first_name=u'Tina', ssn=u'275-04-2768', occupation=u'Retail manager', age=38), Row(last_name=u'Moore', first_name=u'Troy', ssn=u'750-05-3862', occupation=u'Engineering geologist', age=30), Row(last_name=u'Patton', first_name=u'Karen', ssn=u'424-81-8815', occupation=u'Graphic designer', age=10), Row(last_name=u'Martine

In [23]:
### using show, which displays the first 20 rows by default
subDF.show()

+---------+-----------+-----------+--------------------+---+
|last_name| first_name|        ssn|          occupation|age|
+---------+-----------+-----------+--------------------+---+
|    Smith|    Jessica|819-93-5822|   Personnel officer| 43|
|  Johnson|    Cynthia|710-35-6233|         Music tutor| 40|
|    Smith|      Brian|438-96-0646|Leisure centre ma...| 25|
|    Riley|        Ann|243-56-6446|         Hydrologist| 30|
|   Chavez|    Timothy|477-02-5081|Accountant, chart...| 40|
|    Brown|      James|529-17-5573|   Chemical engineer|  2|
|   Thomas|       Tina|275-04-2768|      Retail manager| 38|
|    Moore|       Troy|750-05-3862|Engineering geolo...| 30|
|   Patton|      Karen|424-81-8815|    Graphic designer| 10|
| Martinez|  Elizabeth|167-44-1195|              Lawyer|  7|
|  Matthew|        Dr.|311-70-8099|Chief Technology ...|  4|
|  Pittman|Christopher|648-79-4467|Armed forces oper...| 28|
|  Johnson|     Dustin|456-09-9328|  Press photographer| 32|
|    Scott|     Andrea|0

In [24]:
subDF.show(n=25, truncate=False)

+---------+-----------+-----------+--------------------------------------------+---+
|last_name|first_name |ssn        |occupation                                  |age|
+---------+-----------+-----------+--------------------------------------------+---+
|Smith    |Jessica    |819-93-5822|Personnel officer                           |43 |
|Johnson  |Cynthia    |710-35-6233|Music tutor                                 |40 |
|Smith    |Brian      |438-96-0646|Leisure centre manager                      |25 |
|Riley    |Ann        |243-56-6446|Hydrologist                                 |30 |
|Chavez   |Timothy    |477-02-5081|Accountant, chartered certified             |40 |
|Brown    |James      |529-17-5573|Chemical engineer                           |2  |
|Thomas   |Tina       |275-04-2768|Retail manager                              |38 |
|Moore    |Troy       |750-05-3862|Engineering geologist                       |30 |
|Patton   |Karen      |424-81-8815|Graphic designer              

In [25]:
### using count
print dataDF.count()
print subDF.count()

10000
10000


In [30]:
### using filter
filteredDF = subDF.filter(subDF.age < 10)
filteredDF.show(10, truncate=False)
filteredDF.count()

+---------+----------+-----------+--------------------------------------------+---+
|last_name|first_name|ssn        |occupation                                  |age|
+---------+----------+-----------+--------------------------------------------+---+
|Brown    |James     |529-17-5573|Chemical engineer                           |2  |
|Martinez |Elizabeth |167-44-1195|Lawyer                                      |7  |
|Matthew  |Dr.       |311-70-8099|Chief Technology Officer                    |4  |
|Hernandez|Norma     |196-43-7367|Conservation officer, nature                |8  |
|Choi     |Shannon   |484-92-4220|Physiotherapist                             |7  |
|Morris   |Geoffrey  |241-76-5892|Lighting technician, broadcasting/film/video|8  |
|Johnson  |Chad      |034-99-9738|Pathologist                                 |0  |
|Robertson|Jessica   |063-76-3052|Press photographer                          |4  |
|Cox      |Erica     |834-71-6427|Higher education careers adviser          

2209

In [39]:
### lambda functions and user-defined functions (UDFs)
### pyspark.sql.functions.udf vs. sqlContext.registerFunction:
### the latter registers a function for use inside a Spark SQL query
### e.g. sqlContext.registerFunction("stringLengthString", lambda x: len(x))
###      sqlContext.sql("SELECT stringLengthString('test')").collect()
import pyspark.sql.functions
from pyspark.sql.types import BooleanType
less_ten = pyspark.sql.functions.udf(lambda s: s < 10, BooleanType())
lambdaDF = subDF.filter(less_ten(subDF.age))
lambdaDF.show(5, truncate=False)
lambdaDF.count()

+---------+----------+-----------+----------------------------+---+
|last_name|first_name|ssn        |occupation                  |age|
+---------+----------+-----------+----------------------------+---+
|Brown    |James     |529-17-5573|Chemical engineer           |2  |
|Martinez |Elizabeth |167-44-1195|Lawyer                      |7  |
|Matthew  |Dr.       |311-70-8099|Chief Technology Officer    |4  |
|Hernandez|Norma     |196-43-7367|Conservation officer, nature|8  |
|Choi     |Shannon   |484-92-4220|Physiotherapist             |7  |
+---------+----------+-----------+----------------------------+---+
only showing top 5 rows



2209

In [41]:
even = pyspark.sql.functions.udf(lambda s:s%2==0, BooleanType())
evenDF = lambdaDF.filter(even(lambdaDF.age))
evenDF.show(5, truncate=False)
evenDF.count()

+---------+----------+-----------+--------------------------------------------+---+
|last_name|first_name|ssn        |occupation                                  |age|
+---------+----------+-----------+--------------------------------------------+---+
|Brown    |James     |529-17-5573|Chemical engineer                           |2  |
|Matthew  |Dr.       |311-70-8099|Chief Technology Officer                    |4  |
|Hernandez|Norma     |196-43-7367|Conservation officer, nature                |8  |
|Morris   |Geoffrey  |241-76-5892|Lighting technician, broadcasting/film/video|8  |
|Johnson  |Chad      |034-99-9738|Pathologist                                 |0  |
+---------+----------+-----------+--------------------------------------------+---+
only showing top 5 rows



1101
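
Chaining the two filters above keeps only rows that pass both predicates, so it should select the same rows as a single filter with both conditions. In PySpark that could be written with column expressions as `subDF.filter((subDF.age < 10) & (subDF.age % 2 == 0))`. A plain-Python sketch of the same logic, using the (last_name, age) pairs from the first rows of the `subDF.show()` output above:

```python
# (last_name, age) pairs copied from the first 13 rows of subDF.show().
rows = [("Smith", 43), ("Johnson", 40), ("Smith", 25), ("Riley", 30),
        ("Chavez", 40), ("Brown", 2), ("Thomas", 38), ("Moore", 30),
        ("Patton", 10), ("Martinez", 7), ("Matthew", 4), ("Pittman", 28),
        ("Johnson", 32)]

def less_ten(age):
    return age < 10

def even(age):
    return age % 2 == 0

# Two filters applied one after the other ...
step1 = [r for r in rows if less_ten(r[1])]
chained = [r for r in step1 if even(r[1])]

# ... select the same rows as one filter with both conditions.
combined = [r for r in rows if less_ten(r[1]) and even(r[1])]
```

`step1` keeps Brown, Martinez and Matthew; both `chained` and `combined` keep Brown and Matthew, matching the first rows of `lambdaDF` and `evenDF`.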

In [42]:
### first() and take()
print "first: {0}\n".format(filteredDF.first())
print "first four of them: {0}\n".format(filteredDF.take(4))

first: Row(last_name=u'Brown', first_name=u'James', ssn=u'529-17-5573', occupation=u'Chemical engineer', age=2)

first four of them: [Row(last_name=u'Brown', first_name=u'James', ssn=u'529-17-5573', occupation=u'Chemical engineer', age=2), Row(last_name=u'Martinez', first_name=u'Elizabeth', ssn=u'167-44-1195', occupation=u'Lawyer', age=7), Row(last_name=u'Matthew', first_name=u'Dr.', ssn=u'311-70-8099', occupation=u'Chief Technology Officer', age=4), Row(last_name=u'Hernandez', first_name=u'Norma', ssn=u'196-43-7367', occupation=u'Conservation officer, nature', age=8)]



In [44]:
### orderBy -- Pandas-style column access (dataDF.age), descending
dataDF.orderBy(dataDF.age.desc()).take(5)

[Row(last_name=u'House', first_name=u'Michelle', ssn=u'043-85-4303', occupation=u'Tourist information centre manager', age=47),
 Row(last_name=u'Henry', first_name=u'Andrea', ssn=u'002-33-5681', occupation=u'Information systems manager', age=47),
 Row(last_name=u'Mendoza', first_name=u'Claire', ssn=u'327-62-6567', occupation=u'Fitness centre manager', age=47),
 Row(last_name=u'Clark', first_name=u'Chad', ssn=u'550-80-0263', occupation=u'Interior and spatial designer', age=47),
 Row(last_name=u'Crane', first_name=u'Curtis', ssn=u'441-91-6404', occupation=u'Call centre manager', age=47)]

In [53]:
### orderBy -- column names as strings, ascending by default
dataDF.orderBy('age', 'first_name').take(5)

[Row(last_name=u'James', first_name=u'Alexandria', ssn=u'438-67-2346', occupation=u'Designer, jewellery', age=1),
 Row(last_name=u'Avila', first_name=u'Alexis', ssn=u'399-44-2866', occupation=u'Customer service manager', age=1),
 Row(last_name=u'Gray', first_name=u'Alicia', ssn=u'155-72-6329', occupation=u'Nurse, mental health', age=1),
 Row(last_name=u'Santana', first_name=u'Alison', ssn=u'405-73-8652', occupation=u'Information systems manager', age=1),
 Row(last_name=u'Stephens', first_name=u'Alyssa', ssn=u'464-59-5082', occupation=u'Immigration officer', age=1)]
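
The two `orderBy` calls differ in direction and in how ties are handled: `orderBy('age', 'first_name')` sorts ascending by age and breaks ties by first name, while `orderBy(dataDF.age.desc())` sorts descending by age only. A plain-Python sketch of the same ordering on the first five people in `dataDF` (ages are the `subDF` ages plus one):

```python
# (first_name, age) for the first five rows of dataDF.
rows = [("Jessica", 44), ("Cynthia", 41), ("Brian", 26), ("Ann", 31), ("Timothy", 41)]

# Like orderBy('age', 'first_name'): ascending by age, ties broken by first_name.
by_age_then_name = sorted(rows, key=lambda r: (r[1], r[0]))

# Like orderBy(dataDF.age.desc()): descending by age only.
# Python's sort is stable; the order of tied rows in Spark should not be relied on.
by_age_desc = sorted(rows, key=lambda r: r[1], reverse=True)

ages_desc = [age for _, age in by_age_desc]
```

`by_age_then_name` starts with Brian (26) and puts Cynthia before Timothy among the two 41-year-olds.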

In [54]:
### distinct and dropDuplicates
print dataDF.count()
print dataDF.distinct().count()

10000
10000


In [55]:
tempDF = sqlContext.createDataFrame([('Joe', 1), ('Joe', 1), ('Anna', 15),('Anna', 12), ('Ravi', 5)], ('name', 'score'))

In [57]:
tempDF.show()

+----+-----+
|name|score|
+----+-----+
| Joe|    1|
| Joe|    1|
|Anna|   15|
|Anna|   12|
|Ravi|    5|
+----+-----+



In [58]:
### distinct() compares entire rows, i.e. the values in every column
tempDF.distinct().show()

+----+-----+
|name|score|
+----+-----+
| Joe|    1|
|Ravi|    5|
|Anna|   12|
|Anna|   15|
+----+-----+



In [59]:
print dataDF.count()
print dataDF.dropDuplicates(['first_name','last_name']).count()

10000
9326
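
The difference between the two counts comes from what is treated as a duplicate: `distinct()` compares whole rows, while `dropDuplicates([...])` compares only the listed columns. A plain-Python sketch of both rules on the `tempDF` rows:

```python
rows = [("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)]

# distinct(): two rows are duplicates only if every column matches.
distinct_rows = set(rows)

# dropDuplicates(['name']): rows are duplicates if the listed column matches.
# One row is kept per name; here it is the first one seen, but which row
# Spark keeps is not something to rely on.
one_per_name = {}
for name, score in rows:
    one_per_name.setdefault(name, (name, score))

n_distinct = len(distinct_rows)
n_by_name = len(one_per_name)
```

`n_distinct` is 4 (only the repeated `('Joe', 1)` row collapses) and `n_by_name` is 3 (one row each for Joe, Anna and Ravi).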


In [61]:
### drop, behaves like the opposite of select()
dataDF.drop('occupation').drop('age').take(3)

[Row(last_name=u'Smith', first_name=u'Jessica', ssn=u'819-93-5822'),
 Row(last_name=u'Johnson', first_name=u'Cynthia', ssn=u'710-35-6233'),
 Row(last_name=u'Smith', first_name=u'Brian', ssn=u'438-96-0646')]

In [63]:
### groupBy
dataDF.groupBy('occupation').count().show(3, truncate=False)

+--------------------------+-----+
|occupation                |count|
+--------------------------+-----+
|Librarian, academic       |20   |
|Designer, ceramics/pottery|16   |
|Engineer, aeronautical    |21   |
+--------------------------+-----+
only showing top 3 rows



In [64]:
dataDF.groupBy('occupation').avg('age').show(3, truncate=False)

+--------------------------+------------------+
|occupation                |avg(age)          |
+--------------------------+------------------+
|Librarian, academic       |19.85             |
|Designer, ceramics/pottery|25.75             |
|Engineer, aeronautical    |18.714285714285715|
+--------------------------+------------------+
only showing top 3 rows



In [66]:
print 'Maximum age: {0}'.format(dataDF.groupBy().max('age').first()[0])

Maximum age: 47
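
For intuition, `groupBy(key)` followed by `count()` or `avg()` collects the rows that share a key and reduces each group to one value, and `groupBy()` with no key treats the whole DataFrame as a single group. A plain-Python sketch on the `tempDF` rows from earlier:

```python
from collections import defaultdict

rows = [("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)]

# Collect the scores that share a name.
groups = defaultdict(list)
for name, score in rows:
    groups[name].append(score)

# Like groupBy('name').count() and groupBy('name').avg('score').
counts = {name: len(scores) for name, scores in groups.items()}
averages = {name: sum(scores) / len(scores) for name, scores in groups.items()}

# Like groupBy().max('score'): one group containing every row.
overall_max = max(score for _, score in rows)
```

Anna ends up with a count of 2 and an average of 13.5, and `overall_max` is 15.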


In [67]:
### sample
dataDF.sample(withReplacement=False, fraction=.1).count()

1047
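
Note that the count is 1047, not exactly 1000: `sample` does not guarantee exactly `fraction` of the rows. My understanding is that, without replacement, each row is kept independently with probability `fraction`. The sketch below simulates that assumption in plain Python; `bernoulli_sample` is a hypothetical helper, not a Spark function:

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (assumed model)."""
    rng = random.Random(seed)
    return [r for r in rows if rng.random() < fraction]

rows = list(range(10000))

# Sample sizes for a few different seeds: close to 1000, but rarely exactly 1000.
sizes = [len(bernoulli_sample(rows, 0.1, seed)) for seed in range(5)]

# The same seed reproduces the same sample.
repeatable = bernoulli_sample(rows, 0.1, 42) == bernoulli_sample(rows, 0.1, 42)
```

In PySpark, `sample` also accepts a `seed` argument, which makes a sample reproducible in the same way.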

In [76]:
### caching
### by default, Spark will automatically evict partitions from memory to make space for new ones
### if you plan to use a DataFrame more than once, you should tell Spark to cache it
### cache() is also a lazy operation: the data is not actually cached until an action (e.g. count/collect) runs
filteredDF.cache()
print filteredDF.is_cached
print filteredDF.count()
print filteredDF.is_cached

True
2209
True


In [75]:
### uncache / unpersist
filteredDF.unpersist()
print filteredDF.is_cached

False
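
The outputs above show `is_cached` already `True` before `count()` runs: `cache()` only marks the DataFrame, and the data is stored when the first action computes it. A toy sketch of that behavior in plain Python; `ToyCachedFrame` is a hypothetical class for illustration and says nothing about how Spark implements caching internally:

```python
class ToyCachedFrame:
    """Toy model of lazy caching: cache() marks, the first action fills the cache."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the rows
        self.is_cached = False    # flag set by cache(), cleared by unpersist()
        self._stored = None       # rows kept after the first action, if cached
        self.compute_calls = 0    # how many times the rows were recomputed

    def cache(self):
        self.is_cached = True     # lazy: nothing is computed here
        return self

    def unpersist(self):
        self.is_cached = False
        self._stored = None
        return self

    def count(self):
        # Action: reuse stored rows if present, otherwise recompute.
        if self.is_cached and self._stored is not None:
            return len(self._stored)
        self.compute_calls += 1
        rows = self._compute()
        if self.is_cached:
            self._stored = rows
        return len(rows)

df = ToyCachedFrame(lambda: [age for age in range(50) if age < 10])
df.cache()
calls_after_cache = df.compute_calls        # cache() alone computed nothing
df.count()
df.count()
calls_after_two_counts = df.compute_calls   # second count reused the stored rows
df.unpersist()
df.count()
calls_after_unpersist = df.compute_calls    # recomputed after unpersist
```

The counters come out as 0, 1 and 2: nothing is computed by `cache()`, the second `count()` reuses the stored rows, and `unpersist()` forces a recomputation.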


In [78]:
'''
-- easy for debugging:
    df2 = df1.transformation1()
    df2.action1()
    ...

-- readability:
    (dataDF.filter(dataDF.age > 20)
    .select(concat(dataDF.first_name, lit(''), dataDF.last_name), dataDF.occupation)
    .show(truncate=False)
    )
'''

'\n-- easy for debugging:\n    df2 = df1.transformation1()\n    df3\n'

In [86]:
from pyspark.sql.functions import concat, lit
(dataDF.filter(dataDF.age > 20)
    .select(concat(dataDF.first_name, lit(', '), dataDF.last_name), dataDF.occupation)
    .show(truncate=False)
)

+---------------------------------+--------------------------------------------+
|concat(first_name, , , last_name)|occupation                                  |
+---------------------------------+--------------------------------------------+
|Jessica, Smith                   |Personnel officer                           |
|Cynthia, Johnson                 |Music tutor                                 |
|Brian, Smith                     |Leisure centre manager                      |
|Ann, Riley                       |Hydrologist                                 |
|Timothy, Chavez                  |Accountant, chartered certified             |
|Tina, Thomas                     |Retail manager                              |
|Troy, Moore                      |Engineering geologist                       |
|Christopher, Pittman             |Armed forces operational officer            |
|Dustin, Johnson                  |Press photographer                          |
|Kathryn, Cardenas          