## Spark for Data Engineering

In [1]:
import os
import findspark
findspark.init()

import pyspark
from pyspark.sql import SQLContext

os.chdir('../airflow-dag')


# Create Spark Context
spark = pyspark.SparkContext(
    appName='DataFrame-Kafka',
    master='spark://localhost:7077')

# Create a SQLContext
sqlcontext = SQLContext(spark)

### Reading a CSV

 * No headers
 * Do not infer schema

In [2]:
df = sqlcontext.read.csv('data.csv')
df.show(5)

+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
|            _c0|_c1|                 _c2|             _c3|        _c4|  _c5|        _c6|        _c7|
+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
|           name|age|              street|            city|      state|  zip|        lng|        lat|
|Joshua Thornton|934|   42634 Justin Pike|       Tammyview|   Missouri|84425|-174.886963|-82.9911035|
|     Diane Wong| 95|1080 Mandy Burgs ...|      Wrightfurt|    Florida|10248|  70.297174| 70.8837635|
|    Sara Jensen|112|    013 Justin Ferry|Port Christopher|Mississippi|61398| -59.932277|-43.0450955|
|  Steven Taylor|125|883 Charles Alley...|       Smithfort|      Texas|07902| -86.878794|   0.991363|
+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
only showing top 5 rows



In [3]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



* With headers
* Infering Schema

In [4]:
# Read CSV - w/ headers and infering schema
df = sqlcontext.read.csv('data.csv',
                         header=True,
                         inferSchema=True)
df.show(5)

+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
|           name|age|              street|            city|      state|  zip|        lng|        lat|
+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
|Joshua Thornton|934|   42634 Justin Pike|       Tammyview|   Missouri|84425|-174.886963|-82.9911035|
|     Diane Wong| 95|1080 Mandy Burgs ...|      Wrightfurt|    Florida|10248|  70.297174| 70.8837635|
|    Sara Jensen|112|    013 Justin Ferry|Port Christopher|Mississippi|61398| -59.932277|-43.0450955|
|  Steven Taylor|125|883 Charles Alley...|       Smithfort|      Texas| 7902| -86.878794|   0.991363|
|Rebecca Elliott|865|0029 Young Extens...|        Fredland|   Oklahoma|69278|-153.239569| 70.9311545|
+---------------+---+--------------------+----------------+-----------+-----+-----------+-----------+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)



### Data Manipulation

* Select a column by its name

In [6]:
df.select('name').show()

+-----------------+
|             name|
+-----------------+
|  Joshua Thornton|
|       Diane Wong|
|      Sara Jensen|
|    Steven Taylor|
|  Rebecca Elliott|
|  Thomas Robinson|
|  Michael Barrett|
|     Denise Ramos|
|       Megan Pace|
|  Sandy Blackwell|
|      Robin Avery|
|     Lisa Andrade|
|   Zachary Hughes|
|     Stacy Bailey|
|   Crystal Benson|
|       Jay Turner|
|Amanda Cunningham|
|    Jorge Johnson|
|  Christine Jones|
|      Steve Estes|
+-----------------+
only showing top 20 rows



* Filtering columns

In [7]:
df.filter('age < 40').select(['name', 'age', 'state']).show()

+-----------------+---+--------------+
|             name|age|         state|
+-----------------+---+--------------+
|    Robert Palmer|  9|      Nebraska|
|  Kathryn Acevedo|  3|South Carolina|
|      Erica Brown|  3|        Nevada|
|       Mary Baker| 22|South Carolina|
|     Erik English| 22|      Arkansas|
|William Poole DDS| 36|    Washington|
|  Katherine Smith| 28|        Hawaii|
|     Jamie Fisher| 12|       Indiana|
|    Marilyn Gross| 25|       Wyoming|
|     Michael Shaw| 17|       Wyoming|
|  Colleen Cochran| 20| West Virginia|
|     Lori Rodgers| 36|     Minnesota|
|      Erin Chavez|  5|      Missouri|
|    Matthew Davis| 20|        Alaska|
|      Samuel Pham| 31|   Connecticut|
|    Thomas Nguyen| 24|         Maine|
| Tiffany Peterson| 36|North Carolina|
|  Scott Mcpherson| 38|North Carolina|
|     Joshua Jones| 39| New Hampshire|
|    Richard Smith|  4|       Vermont|
+-----------------+---+--------------+
only showing top 20 rows



* Creating a new RDD

In [8]:
u40 = df.filter('age < 40').collect()

In [9]:
u40

[Row(name='Robert Palmer', age=9, street='8789 Johnson Fords Apt. 058', city='Whiteview', state='Nebraska', zip=32864, lng=-68.376955, lat=62.118206),
 Row(name='Kathryn Acevedo', age=3, street='5243 Alexander Flats', city='East David', state='South Carolina', zip=2139, lng=-86.975776, lat=-66.0195175),
 Row(name='Erica Brown', age=3, street='2124 James Shores Suite 730', city='New Cynthia', state='Nevada', zip=94279, lng=139.177468, lat=55.0252475),
 Row(name='Mary Baker', age=22, street='638 Sophia Unions', city='Johnnyton', state='South Carolina', zip=26944, lng=3.895676, lat=37.36735),
 Row(name='Erik English', age=22, street='57772 Maldonado Heights', city='East Crystalburgh', state='Arkansas', zip=7101, lng=-169.489279, lat=0.9026385),
 Row(name='William Poole DDS', age=36, street='76147 Joe Park Suite 487', city='North Tammie', state='Washington', zip=23464, lng=-45.420664, lat=-52.382155),
 Row(name='Katherine Smith', age=28, street='650 Myers Drive Apt. 194', city='Davidmouth'

In [10]:
u40[0]

Row(name='Robert Palmer', age=9, street='8789 Johnson Fords Apt. 058', city='Whiteview', state='Nebraska', zip=32864, lng=-68.376955, lat=62.118206)

In [11]:
u40[0].asDict()

{'name': 'Robert Palmer',
 'age': 9,
 'street': '8789 Johnson Fords Apt. 058',
 'city': 'Whiteview',
 'state': 'Nebraska',
 'zip': 32864,
 'lng': -68.376955,
 'lat': 62.118206}

In [12]:
u40[0].asDict()['name']

'Robert Palmer'

* For Loop

In [13]:
for x in u40:
    print(x.asDict())

{'name': 'Robert Palmer', 'age': 9, 'street': '8789 Johnson Fords Apt. 058', 'city': 'Whiteview', 'state': 'Nebraska', 'zip': 32864, 'lng': -68.376955, 'lat': 62.118206}
{'name': 'Kathryn Acevedo', 'age': 3, 'street': '5243 Alexander Flats', 'city': 'East David', 'state': 'South Carolina', 'zip': 2139, 'lng': -86.975776, 'lat': -66.0195175}
{'name': 'Erica Brown', 'age': 3, 'street': '2124 James Shores Suite 730', 'city': 'New Cynthia', 'state': 'Nevada', 'zip': 94279, 'lng': 139.177468, 'lat': 55.0252475}
{'name': 'Mary Baker', 'age': 22, 'street': '638 Sophia Unions', 'city': 'Johnnyton', 'state': 'South Carolina', 'zip': 26944, 'lng': 3.895676, 'lat': 37.36735}
{'name': 'Erik English', 'age': 22, 'street': '57772 Maldonado Heights', 'city': 'East Crystalburgh', 'state': 'Arkansas', 'zip': 7101, 'lng': -169.489279, 'lat': 0.9026385}
{'name': 'William Poole DDS', 'age': 36, 'street': '76147 Joe Park Suite 487', 'city': 'North Tammie', 'state': 'Washington', 'zip': 23464, 'lng': -45.42

* Creating a View
* Using SQL

In [14]:
df.createOrReplaceTempView('people')
df_over40 = sqlcontext.sql("SELECT * FROM people WHERE age > 40")
df_over40.show()

+-----------------+---+--------------------+-----------------+-----------+-----+-----------+-----------+
|             name|age|              street|             city|      state|  zip|        lng|        lat|
+-----------------+---+--------------------+-----------------+-----------+-----+-----------+-----------+
|  Joshua Thornton|934|   42634 Justin Pike|        Tammyview|   Missouri|84425|-174.886963|-82.9911035|
|       Diane Wong| 95|1080 Mandy Burgs ...|       Wrightfurt|    Florida|10248|  70.297174| 70.8837635|
|      Sara Jensen|112|    013 Justin Ferry| Port Christopher|Mississippi|61398| -59.932277|-43.0450955|
|    Steven Taylor|125|883 Charles Alley...|        Smithfort|      Texas| 7902| -86.878794|   0.991363|
|  Rebecca Elliott|865|0029 Young Extens...|         Fredland|   Oklahoma|69278|-153.239569| 70.9311545|
|  Thomas Robinson|274|25590 Bradley Mis...|        Hoodhaven| Washington|94974|-153.785572| 68.8913855|
|  Michael Barrett|496|57919 Robert Fore...|        Per

* Some Statistics

In [15]:
df_over40.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               970|
|   mean| 519.8855670103093|
| stddev|277.17541899989783|
|    min|                41|
|    max|              1000|
+-------+------------------+



* Grouping the data

In [16]:
df.groupBy('state').count().show()

+-------------+-----+
|        state|count|
+-------------+-----+
|         Utah|   19|
|       Hawaii|   24|
|    Minnesota|   22|
|         Ohio|   16|
|     Arkansas|   19|
|       Oregon|   17|
|        Texas|   25|
| North Dakota|   24|
| Pennsylvania|   21|
|  Connecticut|   20|
|      Vermont|   19|
|     Nebraska|   23|
|       Nevada|   23|
|   Washington|   15|
|     Illinois|   16|
|     Oklahoma|   21|
|     Delaware|   22|
|       Alaska|   27|
|   New Mexico|   17|
|West Virginia|   18|
+-------------+-----+
only showing top 20 rows



* Aggregating the data

In [17]:
df.agg({'age': 'mean'}).show()

+--------+
|avg(age)|
+--------+
| 504.924|
+--------+



* Some built-in functions:

 - Set of values
 - Count Distinct values
 - md5 hash of a string
 - Reverse of a string
 - Provides the soundex of the string field

In [18]:
import pyspark.sql.functions as f

In [19]:
df.select(f.collect_set(df['state'])).collect()

[Row(collect_set(state)=['Michigan', 'Vermont', 'Utah', 'Colorado', 'Ohio', 'Delaware', 'Georgia', 'South Dakota', 'Maine', 'Alabama', 'Wyoming', 'Massachusetts', 'Wisconsin', 'North Carolina', 'Montana', 'Indiana', 'Alaska', 'Texas', 'Oklahoma', 'Illinois', 'South Carolina', 'Nebraska', 'Tennessee', 'New Hampshire', 'Florida', 'Washington', 'New Jersey', 'Connecticut', 'Mississippi', 'Minnesota', 'Arkansas', 'Missouri', 'Nevada', 'West Virginia', 'Iowa', 'Hawaii', 'Arizona', 'Idaho', 'Kentucky', 'Pennsylvania', 'New Mexico', 'Oregon', 'Louisiana', 'Kansas', 'Rhode Island', 'New York', 'California', 'Virginia', 'Maryland', 'North Dakota'])]

In [20]:
df.select(f.countDistinct('state').alias('states')).show()

+------+
|states|
+------+
|    50|
+------+



In [21]:
df.select(f.md5('street').alias('hash')).collect()

[Row(hash='1e6bc37412d55b4227ce2f51d0f21061'),
 Row(hash='9d3cbd4c81923f48a3b595da196a437a'),
 Row(hash='970e52c8f9ce03177bc4f3bae10392c2'),
 Row(hash='700e0b9406d0d79f4af68e1b796fce1a'),
 Row(hash='6b9dbc10bc02314e6774824d6c07fba9'),
 Row(hash='d6764a9457910a109f74796b7c62c13d'),
 Row(hash='8a5b1e0bd8fc1814c15808fb02734250'),
 Row(hash='1c8a275655eb7366c2c324f123e06a15'),
 Row(hash='30731dae6dfa2a9d76edac2ddca0abe1'),
 Row(hash='a2c90863a97bc4dc967ba5a64e2b46de'),
 Row(hash='0a0da017849d7ec2a752aa984eda7c63'),
 Row(hash='4ec962cfb24c8402a7f7d4d2f97d292f'),
 Row(hash='fdc91ced84930c1565d2e3f7954337eb'),
 Row(hash='12db3ba6090172512aaedcfa2c16e255'),
 Row(hash='7218976064d397f2d1776150f1d15958'),
 Row(hash='fc2f17f7557085ad5c6c6092741de41d'),
 Row(hash='06fe5f98796aa2215d608fc3b9e71de8'),
 Row(hash='116f600b8004f2b426c92044ffdb1bdb'),
 Row(hash='af7b7c91dc7315c1447b29cf51a8a9c5'),
 Row(hash='b15cdc9df0296614b5cc1da8a1d1ac01'),
 Row(hash='6e1eb908115cc28bff757843ceb309e4'),
 Row(hash='85

In [22]:
df.select(f.reverse(df.state).alias('state-reverse')).collect()

[Row(state-reverse='iruossiM'),
 Row(state-reverse='adirolF'),
 Row(state-reverse='ippississiM'),
 Row(state-reverse='saxeT'),
 Row(state-reverse='amohalkO'),
 Row(state-reverse='notgnihsaW'),
 Row(state-reverse='nagihciM'),
 Row(state-reverse='erawaleD'),
 Row(state-reverse='erawaleD'),
 Row(state-reverse='ainigriV'),
 Row(state-reverse='anozirA'),
 Row(state-reverse='ohadI'),
 Row(state-reverse='tnomreV'),
 Row(state-reverse='ainrofilaC'),
 Row(state-reverse='ainrofilaC'),
 Row(state-reverse='anatnoM'),
 Row(state-reverse='dnalyraM'),
 Row(state-reverse='anatnoM'),
 Row(state-reverse='iiawaH'),
 Row(state-reverse='tnomreV'),
 Row(state-reverse='yesreJ weN'),
 Row(state-reverse='sttesuhcassaM'),
 Row(state-reverse='aksarbeN'),
 Row(state-reverse='ippississiM'),
 Row(state-reverse='sttesuhcassaM'),
 Row(state-reverse='atosenniM'),
 Row(state-reverse='dnalyraM'),
 Row(state-reverse='sasnaK'),
 Row(state-reverse='sasnaK'),
 Row(state-reverse='dnalsI edohR'),
 Row(state-reverse='iruossiM'

In [23]:
df.select(f.soundex(df.name).alias('soundex')).collect()

[Row(soundex='J236'),
 Row(soundex='D552'),
 Row(soundex='S625'),
 Row(soundex='S315'),
 Row(soundex='R124'),
 Row(soundex='T526'),
 Row(soundex='M241'),
 Row(soundex='D526'),
 Row(soundex='M251'),
 Row(soundex='S531'),
 Row(soundex='R151'),
 Row(soundex='L253'),
 Row(soundex='Z262'),
 Row(soundex='S321'),
 Row(soundex='C623'),
 Row(soundex='J365'),
 Row(soundex='A553'),
 Row(soundex='J622'),
 Row(soundex='C623'),
 Row(soundex='S312'),
 Row(soundex='T524'),
 Row(soundex='L216'),
 Row(soundex='D642'),
 Row(soundex='A425'),
 Row(soundex='T612'),
 Row(soundex='G643'),
 Row(soundex='K365'),
 Row(soundex='D133'),
 Row(soundex='N242'),
 Row(soundex='W451'),
 Row(soundex='K516'),
 Row(soundex='J520'),
 Row(soundex='K656'),
 Row(soundex='A524'),
 Row(soundex='R163'),
 Row(soundex='B655'),
 Row(soundex='G455'),
 Row(soundex='M624'),
 Row(soundex='L616'),
 Row(soundex='T621'),
 Row(soundex='A553'),
 Row(soundex='B116'),
 Row(soundex='D161'),
 Row(soundex='L563'),
 Row(soundex='J513'),
 Row(sound

In [24]:
spark.stop()