### Install Spark

#### Download Spark from: https://spark.apache.org/downloads.html

### Spark Context

In [62]:
# NOTE(review): `sc` is never assigned in the visible cells — presumably the
# PySpark shell/kernel injects the SparkContext at startup; confirm.
sc

<pyspark.context.SparkContext at 0x24b4d50>

In [63]:
# Version string reported by the running SparkContext.
sc.version

u'1.5.2'

## RDD
### Resilient Distributed Datasets
Fault-tolerant collection of elements that can be operated on in parallel.

There are two ways to create an RDD:
* *parallelizing* an existing collection
* referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

### Create an RDD - *Parallelize* existing collection

In [64]:
from numpy import random as RND

In [65]:
# Draw 10,000,000 integers uniformly from 1..100 (the upper bound 101 is exclusive).
# A dedicated, seeded RandomState makes the sample identical on every
# Restart & Run All, without touching NumPy's global random state.
SEED = 42
r = RND.RandomState(SEED).randint(1, 101, 10000000)

In [66]:
# Peek at the first 100 draws instead of displaying the whole array.
r[:100]

array([ 20,   8,  45,  34,  54,  21,  31,  19,  12,   3,  46,  73,   3,
        13,  55,  39,  72,  84,  84,  17,  36,  89,  58,  13,  13,  14,
        65,  68,  92,   6,  53,  85,  85,   7,  60,  30,  45,  40,  54,
        69,  71,  10,  47,  17,  77,   7,  31,  10,  35,  99,  60,   4,
        55,  55,  64,  36,  36,  21,  46,  16,   1,  48,  12,  84,  47,
        71,  64,  34,   9,  30,  24,  17,  31,  49,  99,  58,  41,  51,
        94,  44,  62,   8,  10,  87,  36,  12,  37,  78,  65,  14,  68,
        95,  76,  25,  94,   9,  94,  29,  95, 100])

In [67]:
# Turn the local NumPy array into an RDD (no partition count given here).
rdd = sc.parallelize(r)
# Bare name as last expression: display the RDD's repr.
rdd

ParallelCollectionRDD[108] at parallelize at PythonRDD.scala:423

In [69]:
# Mean of all elements of the RDD.
rdd.mean()

50.502913700000057

In [70]:
# Summary statistics of the RDD: count, mean, stdev, max and min.
rdd.stats()

(count: 10000000, mean: 50.5029137, stdev: 28.8576812393, max: 100.0, min: 1.0)

### Operations on RDD
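* Transformations - lazy operations that only describe a new RDD; nothing is computed yet
* Actions - operations that trigger the computation and return a result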

### Apply transformations

In [72]:
# Pair every drawn value with a 1 so the ones can be summed per value.
rdd_map = rdd.map(lambda value: (value, 1))

In [73]:
# Add up the ones for each key, giving (value, occurrences) pairs.
rdd_reduce = rdd_map.reduceByKey(lambda left, right: left + right)

In [74]:
# Print the chain of parent RDDs recorded for rdd_reduce.
# print(...) with a single argument prints the same text on Python 2 and,
# unlike the print statement, is also valid syntax on Python 3.
print(rdd_reduce.toDebugString())

(32) PythonRDD[116] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[115] at mapPartitions at PythonRDD.scala:374 []
 |   ShuffledRDD[114] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(32) PairwiseRDD[113] at reduceByKey at <ipython-input-73-0e75050cf259>:1 []
    |   PythonRDD[112] at reduceByKey at <ipython-input-73-0e75050cf259>:1 []
    |   ParallelCollectionRDD[108] at parallelize at PythonRDD.scala:423 []


### Apply actions

In [75]:
# Return every (value, count) pair as a local list; small here, since the
# values are drawn from 1..100.
rdd_reduce.collect()

[(32, 100254),
 (64, 99947),
 (96, 99913),
 (1, 100164),
 (97, 99861),
 (65, 99927),
 (33, 100109),
 (2, 99936),
 (98, 99840),
 (66, 100165),
 (34, 99676),
 (99, 99960),
 (35, 100188),
 (67, 100201),
 (3, 99528),
 (68, 99892),
 (36, 100650),
 (4, 99642),
 (100, 99821),
 (5, 100380),
 (69, 99082),
 (37, 99450),
 (70, 100521),
 (38, 100483),
 (6, 99992),
 (71, 99830),
 (39, 99620),
 (7, 99781),
 (8, 99188),
 (40, 99933),
 (72, 100785),
 (9, 99984),
 (73, 100183),
 (41, 100009),
 (10, 99565),
 (74, 99850),
 (42, 100504),
 (43, 100074),
 (75, 100588),
 (11, 99968),
 (76, 99974),
 (44, 100525),
 (12, 99960),
 (13, 99948),
 (77, 100088),
 (45, 100007),
 (78, 99395),
 (46, 99917),
 (14, 100271),
 (79, 99867),
 (47, 99614),
 (15, 99797),
 (16, 99573),
 (48, 100340),
 (80, 100648),
 (17, 100131),
 (81, 100164),
 (49, 100298),
 (18, 99995),
 (82, 99426),
 (50, 99876),
 (51, 100148),
 (83, 99715),
 (19, 100166),
 (84, 99617),
 (52, 100283),
 (20, 100332),
 (21, 99974),
 (85, 100218),
 (53, 99592)

### Persist

In [76]:
# Mark the reduced RDD as cached so later cells can reuse it.
rdd_reduce.cache()

# Alternative left disabled by the author; StorageLevel is not imported in
# the visible cells, so it would need an import before being enabled.
#rdd_reduce.persist(StorageLevel.MEMORY_ONLY)

PythonRDD[116] at RDD at PythonRDD.scala:43

In [77]:
# First 10 (value, count) pairs.
rdd_reduce.take(10)

[(32, 100254),
 (64, 99947),
 (96, 99913),
 (1, 100164),
 (97, 99861),
 (65, 99927),
 (33, 100109),
 (2, 99936),
 (98, 99840),
 (66, 100165)]

In [19]:
# First (value, count) pair only.
# NOTE(review): the output recorded for this cell (execution count 19) does
# not match the take(10) output recorded earlier — the cells were run against
# different random samples; re-run top to bottom.
rdd_reduce.first()

(32, 100126)

In [20]:
# Number of distinct keys after the reduce.
rdd_reduce.count()

100

In [21]:
# Write the (value, count) pairs out as text under the path 'output.txt'.
# NOTE(review): Spark normally creates a directory of part files at this path
# and refuses to overwrite an existing one — confirm before re-running.
rdd_reduce.saveAsTextFile('output.txt')

### Sort by key

In [22]:
# Order the (value, count) pairs by key, ascending.
rdd_reduce_sort_key = rdd_reduce.sortByKey()

In [23]:
# Ten smallest keys with their counts.
rdd_reduce_sort_key.take(10)

[(1, 99537),
 (2, 99630),
 (3, 99752),
 (4, 100570),
 (5, 100091),
 (6, 100480),
 (7, 100319),
 (8, 99984),
 (9, 99868),
 (10, 100249)]

In [24]:
# Same sort in descending key order; rebinds rdd_reduce_sort_key.
rdd_reduce_sort_key = rdd_reduce.sortByKey(ascending=False)

In [25]:
# Ten largest keys with their counts.
rdd_reduce_sort_key.take(10)

[(100, 99722),
 (99, 100400),
 (98, 100321),
 (97, 100017),
 (96, 99793),
 (95, 100169),
 (94, 100604),
 (93, 100026),
 (92, 100176),
 (91, 100241)]

### Sort by value

In [26]:
# Order the (value, count) pairs by their count, largest count first.
rdd_reduce_sort_value = rdd_reduce.sortBy(lambda pair: pair[1], ascending=False)

In [27]:
# Ten most frequent values.
rdd_reduce_sort_value.take(10)

[(54, 100874),
 (75, 100611),
 (94, 100604),
 (4, 100570),
 (19, 100510),
 (85, 100496),
 (6, 100480),
 (17, 100478),
 (80, 100406),
 (99, 100400)]

In [28]:
# Print the chain of parent RDDs recorded for the value-sorted RDD.
# print(...) with a single argument prints the same text on Python 2 and,
# unlike the print statement, is also valid syntax on Python 3.
print(rdd_reduce_sort_value.toDebugString())

(32) PythonRDD[42] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[39] at mapPartitions at PythonRDD.scala:374 []
 |   ShuffledRDD[38] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(32) PairwiseRDD[37] at sortBy at <ipython-input-26-4199a7551756>:1 []
    |   PythonRDD[36] at sortBy at <ipython-input-26-4199a7551756>:1 []
    |   PythonRDD[10] at RDD at PythonRDD.scala:43 []
    |       CachedPartitions: 32; MemorySize: 6.6 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |   MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:374 []
    |   ShuffledRDD[8] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(32) PairwiseRDD[7] at reduceByKey at <ipython-input-14-0e75050cf259>:1 []
       |   PythonRDD[6] at reduceByKey at <ipython-input-14-0e75050cf259>:1 []
       |   ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:423 []


### Custom partition size

In [29]:
# Rebuild the RDD from the same array, this time asking for 10 partitions.
rdd = sc.parallelize(r,10)

In [30]:
# Pair each value with a 1, then sum the ones per key; the trailing 5 is the
# partition count requested for the reduced RDD.
rdd_map = rdd.map(lambda value: (value, 1))
rdd_reduce = rdd_map.reduceByKey(lambda left, right: left + right, 5)

In [31]:
# Print the chain of parent RDDs to see the custom partition counts.
# print(...) with a single argument prints the same text on Python 2 and,
# unlike the print statement, is also valid syntax on Python 3.
print(rdd_reduce.toDebugString())

(5) PythonRDD[48] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[47] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[46] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(10) PairwiseRDD[45] at reduceByKey at <ipython-input-30-cd3349b29452>:2 []
    |   PythonRDD[44] at reduceByKey at <ipython-input-30-cd3349b29452>:2 []
    |   ParallelCollectionRDD[43] at parallelize at PythonRDD.scala:423 []


In [32]:
# First 10 (value, count) pairs of the re-partitioned result.
rdd_reduce.take(10)

[(65, 99789),
 (35, 99855),
 (100, 99722),
 (5, 100091),
 (70, 99619),
 (40, 99692),
 (10, 100249),
 (75, 100611),
 (45, 99914),
 (15, 99848)]

## Without Spark

In [78]:
# Pure-Python baseline: count how often each lower-cased, space-separated
# token occurs in the first tab-separated column (the job title) of the file.
with open('/home/haroon/projects/SHOnet/production_jobs_0303_US.tsv') as f:
    word_count = {}

    for line in f:
        title = line.strip().split('\t')[0].lower()
        for token in title.split(' '):
            # dict.get supplies 0 for unseen tokens, so no KeyError handling is needed.
            word_count[token] = word_count.get(token, 0) + 1

# Ten most frequent tokens. .items() (instead of the Python-2-only
# .iteritems()) gives the same result on Python 2 and also works on Python 3.
sorted(word_count.items(), key=lambda x: x[1], reverse=True)[:10]

[('-', 961245),
 ('/', 301399),
 ('manager', 198220),
 ('job', 144905),
 ('nurse', 137965),
 ('and', 124298),
 ('in', 121744),
 ('assistant', 110689),
 ('sales', 103689),
 ('engineer', 102173)]

## With Spark

In [79]:
# Load the same TSV as an RDD of lines, requesting at least 32 partitions.
rdd = sc.textFile('/home/haroon/projects/SHOnet/production_jobs_0303_US.tsv', minPartitions=32)

In [80]:
# Look at two raw lines: a title and a code separated by a tab.
rdd.take(2)

[u'Financial Consultant - Westlake, TX - Future Opportunities\t41-3031.02',
 u'Emergency Room RN Travel Nursing Job\t29-1141.00']

In [81]:
# Keep only the first tab-separated column (the job title) of every line.
rdd_title = rdd.map(lambda line: line.split('\t')[0])

In [82]:
# First five job titles.
rdd_title.take(5)

[u'Financial Consultant - Westlake, TX - Future Opportunities',
 u'Emergency Room RN Travel Nursing Job',
 u'Hiring Experienced Class A-CDL Driving Professionals for Dedicated Regional Route',
 u'Account Clerk I',
 u'Resident Care Associate']

### Job Title word count

In [83]:
# Split each title on single spaces, pair every token with a 1, then sum the
# ones per token to get (token, occurrences) pairs.
wc = (
    rdd_title
    .flatMap(lambda title: title.split(' '))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [84]:
# Print the chain of parent RDDs recorded for the word-count RDD.
# print(...) with a single argument prints the same text on Python 2 and,
# unlike the print statement, is also valid syntax on Python 3.
print(wc.toDebugString())

(32) PythonRDD[127] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[126] at mapPartitions at PythonRDD.scala:374 []
 |   ShuffledRDD[125] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(32) PairwiseRDD[124] at reduceByKey at <ipython-input-83-f179d6b771b5>:1 []
    |   PythonRDD[123] at reduceByKey at <ipython-input-83-f179d6b771b5>:1 []
    |   MapPartitionsRDD[120] at textFile at NativeMethodAccessorImpl.java:-2 []
    |   /home/haroon/projects/SHOnet/production_jobs_0303_US.tsv HadoopRDD[119] at textFile at NativeMethodAccessorImpl.java:-2 []


In [87]:
# Mark the word counts as cached so later cells can reuse them, then pull
# 1000 (token, count) pairs back for inspection.
# NOTE(review): 1000 rows is a lot of notebook output; a smaller sample would
# read better.
wc.cache()
wc.take(1000)

[(u'Executive-Laboratory', 1),
 (u'', 22),
 (u'(101803)', 1),
 (u'Co-Founder/CEO', 1),
 (u'(Enh12)', 1),
 (u'(171620)', 1),
 (u'101,', 10),
 (u'(49614674)', 1),
 (u'I-494', 1),
 (u'SPECIALIST-PHYSICIAN', 1),
 (u'Grower/Logistics', 1),
 (u'PFM)', 1),
 (u'Management/New', 1),
 (u'72707', 1),
 (u'(01154176)', 1),
 (u'WA(12', 1),
 (u'800-1200', 3),
 (u'CPU/System', 2),
 (u'LASP', 1),
 (u'0343', 8),
 (u'Sr.Front-end', 1),
 (u'218056', 1),
 (u'TIM00363', 1),
 (u'(44696268)', 1),
 (u'Disptach)', 1),
 (u'YUMA', 7),
 (u'O39', 1),
 (u'(41462168749-623)', 1),
 (u'(102792)', 1),
 (u'010612', 5),
 (u'Selector/Forklift', 3),
 (u'Physicall', 2),
 (u'(44286489)', 1),
 (u'Center-No', 1),
 (u'Specialist.FullTime.7009Hwy6.', 1),
 (u'OFFLN', 1),
 (u'(601231)', 8),
 (u'ASST-IMAGING', 1),
 (u'(TS-308)', 2),
 (u'Control-Spendid', 1),
 (u'TriHealth', 4),
 (u'/Hire', 1),
 (u'Developer/SSRS', 1),
 (u'Consultant"', 2),
 (u'Salomon', 5),
 (u'Investment/Fund', 3),
 (u'Sq.ft.', 1),
 (u'Coolville', 1),
 (u'Scientist

### Sort by value

In [88]:
# Order the (token, count) pairs by count, most frequent token first.
wc_sort_value = wc.sortBy(lambda pair: pair[1], ascending=False)

In [89]:
# The 100 most frequent tokens in job titles.
wc_sort_value.take(100)

[(u'-', 961245),
 (u'/', 301399),
 (u'Manager', 190621),
 (u'Job', 133603),
 (u'Nurse', 131539),
 (u'Assistant', 105133),
 (u'in', 105031),
 (u'and', 102460),
 (u'&', 98848),
 (u'Engineer', 98506),
 (u'Sales', 97157),
 (u'Senior', 82771),
 (u'Physician', 80488),
 (u'Specialist', 79270),
 (u'RN', 74578),
 (u'Analyst', 71129),
 (u'Registered', 68703),
 (u'Service', 68492),
 (u'Time', 67202),
 (u'Care', 63465),
 (u'Associate', 62310),
 (u'of', 58753),
 (u'Technician', 57708),
 (u'\u2013', 56491),
 (u'for', 55906),
 (u'Director', 48079),
 (u'Full', 47684),
 (u'Developer', 47449),
 (u'Medical', 46857),
 (u'Therapist', 46295),
 (u'Shift', 46215),
 (u'Services', 44742),
 (u'School', 44578),
 (u'Management', 41938),
 (u'Business', 41764),
 (u'Center', 41211),
 (u'Coordinator', 40805),
 (u'Medicine', 39992),
 (u'Health', 39699),
 (u'Lead', 38750),
 (u'Part', 38381),
 (u'Security', 37334),
 (u'Support', 37308),
 (u'Driver', 36736),
 (u'Project', 36735),
 (u'II', 35444),
 (u'Permanent', 34828),
 

In [90]:
# Same textFile call, but reading from an hdfs:// URL on localhost:9000,
# again requesting at least 32 partitions.
rdd_hdfs = sc.textFile('hdfs://localhost:9000/user/haroon/clean_jobs_feed.tsv', minPartitions=32)

In [91]:
# Word count over the HDFS feed, written as one named step per stage.
# Lower-case every line so counting is case-insensitive.
lowered_lines = rdd_hdfs.map(lambda line: line.lower())
# Split on any run of whitespace.
hdfs_tokens = lowered_lines.flatMap(lambda line: line.split())
# Pair each token with a 1 and sum the ones per token.
hdfs_token_ones = hdfs_tokens.map(lambda token: (token, 1))
hdfs_token_totals = hdfs_token_ones.reduceByKey(lambda left, right: left + right)
# Most frequent token first.
word_count1 = hdfs_token_totals.sortBy(lambda pair: pair[1], ascending=False)

In [92]:
# Five most frequent tokens in the HDFS feed.
word_count1.take(5)

[(u'gt', 12479230),
 (u'lt', 12432637),
 (u'experience', 5530139),
 (u'amp', 4796602),
 (u'work', 4514705)]

## Spark streaming
### Run a netcat server

```
nc -lk 9999
```

### Run network_wordcount.py with hostname and port
```
./spark/bin/spark-submit spark/examples/src/main/python/streaming/network_wordcount.py localhost 9999
```

## Spark SQL

In [93]:
# SQLContext is used below for createDataFrame() and sql().
from pyspark.sql import SQLContext
# NOTE(review): wildcard import; the visible cells only use StructType,
# StructField and StringType from it.
from pyspark.sql.types import *

# Bind a SQL context to the existing SparkContext.
sqlContext = SQLContext(sc)

In [94]:
# Load the raw TSV and turn every line into a tuple of its tab-separated fields.
lines = sc.textFile("/home/haroon/projects/SHOnet/production_jobs_0303_US.tsv")
rdd = lines.map(lambda line: tuple(line.split('\t')))

In [95]:
# Column names, space-separated; each one becomes a nullable string column.
schemaString = "title onet_code"
fields = []
for field_name in schemaString.split():
    fields.append(StructField(field_name, StringType(), True))
schema = StructType(fields)


In [96]:
# Display the schema that was just built.
schema

StructType(List(StructField(title,StringType,true),StructField(onet_code,StringType,true)))

In [97]:
# Apply the schema to the RDD.
schemaJob = sqlContext.createDataFrame(rdd, schema)

# Register the DataFrame as a table.
schemaJob.registerTempTable("job")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT title FROM job")

# The results of SQL queries are RDDs and support all the normal RDD operations.
titles = results.map(lambda p: "Title: " + p.title)
for title in titles.take(10):
  print(title)

Title: Financial Consultant - Westlake, TX - Future Opportunities
Title: Emergency Room RN Travel Nursing Job
Title: Hiring Experienced Class A-CDL Driving Professionals for Dedicated Regional Route
Title: Account Clerk I
Title: Resident Care Associate
Title: Software Engineer II (657331)
Title: Supervisor, PIA Employer Shared Responsibility
Title: Travel RN ICU, SICU - Surgical Intensive Care Units
Title: Scala Engineer - Remote Position
Title: Retail Sales Consultant


In [98]:
# Count the distinct titles per onet_code, largest count first.
onet_query = "SELECT onet_code, count(distinct title) as onet_count FROM job GROUP BY onet_code ORDER BY count(distinct title) DESC"
onet_counts = sqlContext.sql(onet_query)

# Show the ten codes with the most distinct titles.
top_onet_rows = onet_counts.take(10)
for onet_count in top_onet_rows:
    print(onet_count)

Row(onet_code=u'29-1141.00', onet_count=177664)
Row(onet_code=u'15-1132.00', onet_count=70368)
Row(onet_code=u'53-3032.00', onet_count=45570)
Row(onet_code=u'41-3099.00', onet_count=43043)
Row(onet_code=u'41-2031.00', onet_count=39134)
Row(onet_code=u'29-1069.00', onet_count=36375)
Row(onet_code=u'15-1199.08', onet_count=35725)
Row(onet_code=u'43-4051.00', onet_count=33986)
Row(onet_code=u'15-1199.09', onet_count=32961)
Row(onet_code=u'29-1127.00', onet_count=32022)
