In [2]:
import os
import string
import sys
from pprint import pprint

spark_home_folder = os.environ['SPARK_HOME']
sys.path.append(spark_home_folder + r'\python')

try:
    from pyspark import SparkContext
    from pyspark import SparkConf

except ImportError as e:
    print("Error: ", e)
    sys.exit(1)
    
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark_wc")
sc = SparkContext(conf=conf)

In [6]:
sc

<pyspark.context.SparkContext at 0x90c2ad6f28>

In [None]:
# flatMapValues(func)

rdd = sc.parallelize([(1,2), (1,3), (2,4)])
print("Original RDD: ")
print(rdd.collect())

rdd2 = rdd.flatMapValues(lambda x: range(x, x+2)).collect()
print("After transformation: ")
print(rdd2)

In [16]:
# Read this file and map each line to a key-value pair
# Each line has ',' separated values. Key nust be the 41st item and value must be the list of all items.

import urllib.request
# f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
# print(raw_data.take(5))

print(raw_data.top(1))

# Creating Key - Value pairs
rdd = raw_data.map(lambda x: (x.split(',')[41], x.split(',')))

# Printing generated KV pair
print("Generated Key-Value pairs:")
print(rdd.top(1))

['9949,udp,other,SF,146,105,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,3,0.01,0.73,0.98,0.00,0.00,0.00,0.00,0.00,normal.']
Generated Key-Value pairs:
[('warezmaster.', ['9', 'tcp', 'ftp_data', 'SF', '0', '5153771', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '12', '12', '1.00', '0.00', '1.00', '0.00', '0.00', '0.00', '0.00', '0.00', 'warezmaster.'])]


In [20]:
# collectAsMap()
# Returns a dictionary of all key value pairs.
# If a key has manu values, the key has its latest value in the dictionary

rdd = sc.parallelize([(1,2), (1,3), (2,4)])
print(rdd.collectAsMap())


{1: 3, 2: 4}


In [25]:
# lookup(key)
# Returns a list of all the values paired with the given key.

rdd = sc.parallelize([(1,2), (1,3), (2,4)])
print(rdd.lookup(1))

[2, 3]


### aggregateByKey(aggregation_variable, combinine_function, merging_function)

aggregateByKey() is used to perform operations on key-value pairs.

This can also be achieved by grouping all values of a key and then applying an aggregation function
But grouping can be expensive as data needs to be shuffled across nodes.
With aggregateByKey() we first perform operations to an get a partial aggregate value in each partition
and then use partial values from each partition to compute the final value.
This is analogous to combine block in Hadoop MapReduce jobs

THe function takes 3 parameters:
    1. `aggregateVariable`: initial value of aggregation variable
    2. `combining function`: 
        This function is used to compute partial aggregation value in each partition
        Takes 2 input parameters.
        a. aggregation variable
        b. value from key-value pair
    3. `merging function`: 
        This function is merges partial aggregate values from each partition
        Takes 2 input parametes
        a. partial value from partition 1
        b. partial value from partition 2

The following example illustrates the usage of aggregateByKey()


In [37]:
# aggregateByKey(aggregation_variable, combinine_function, merging_function)


# Find the number of occurrences of each letter

rdd = sc.parallelize(list("aaaaabbbbbbdfdfsssdfdfdsddfsfefds"))
rdd = rdd.map(lambda x: (x,1))\
        .aggregateByKey(
                        # Initial value of count for each key
                        0,
                        # Combining Function: Combines values in each partition
                        # c is the count of each key. Initialized to 0 in first argument
                        lambda c, x: c + x,
                        # Merging Function: COmbines values across partitions
                        lambda x, y: x + y
                       )
print(rdd.collect())

[('s', 6), ('b', 6), ('a', 5), ('e', 1), ('f', 7), ('d', 8)]


### combineByKey()

combineByKey() is transformation similar to aggregateByKey(). 


combineByKey is more general then aggregateByKey. aggregateByKey is suitable for compute aggregations for keys, example aggregations such as sum, etc. aggregateByKey() is additional computation after map on local partitions to reduce the amount of data sent out to other nodes and driver. 

combineByKey is more general and offers the flexibility to specify any map side combine function. 

Usage:

`combineByKey(createCombiner, mergeValue, mergeCombiner)`

combineByKey() takes 3 other functions as input parameters.
1. `createCombiner`: 
    This is the very first aggregation step for each key. 
    All required variables can be initialized here. 
    Ex: ```python lambda value: (value, 1)```
    For avgerage, we need to maintain sum and count variables (sum, count)
            
            
2. `mergeValue`:
    Given a new value for a key, this function defines how to manipulate the data structure 
    created in createCombiner. Takes 2 input parameters. The first parameter is the combiner data structure and
    second parameter is the value.
    This operation takes place in each partition and partial values are computed.
    Ex: ```python lambda x, val: (x[0] + val, x[1] + 1)```
    x is the combiner data structure, where x[0] is sum and x[1] is count
    val is the new value
   
3. `mergeCombiner`:
    This function defines how to merge combiners. It takes partial combiner values from 2 patitions as inputs. 
    Each input is a obtained from different partitions and merged together.

    Ex: ```python lambda x, y: (x[0] + y[0], x[1] + y[1])```


[Refrence](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

In [41]:
# Average of given list of numbers

rdd = sc.parallelize([1,2,3,4,5,6,7])
combiner_rdd = rdd.map(lambda x: ('key', x))\
                .combineByKey(
                    lambda val: (val, 1),
                    lambda x, val: (x[0] + val, x[1] + 1),
                    lambda x, y: (x[0] + y[0], x[1] + y[1])
                )
    
avg_rdd = combiner_rdd.map(lambda x: ("average", x[1][0]*1.0/x[1][1]))

avg_rdd.collect()

[('average', 4.0)]

### pipe()

pipe(cmd) is a transformation function of RDDs which is used to send each element to script `cmd` as input. `cmd` could be any pre-built functions or user-defined functions.

The `pipe()` is very helpful to interface Spark program with any other program, irrespective of its language. This can be useful to run legacy software, scripts in other languages, thus enabling code reuse. 
Examples: 
1. Executing existing Java code in PySpark script
2. Using legacy [BLAST] code to search protein databases parellely.
    
[BLAST]: http://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastHome

In [12]:
# pipeScript.py

import sys
for line in sys.stdin:
    print("The name entered is:  + {}".format(line.strip()))

In [19]:
# Script to illustrate functionality of pipe()
from pprint import pprint
rdd = sc.parallelize(["Alice", "Bob", "Eve"])
resultRDD = rdd.pipe('pipeScript').collect()
pprint(resultRDD)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func
    return f(iterator)
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
  File "C:\Users\PBSKU\Anaconda3\lib\subprocess.py", line 950, in __init__
    restore_signals, start_new_session)
  File "C:\Users\PBSKU\Anaconda3\lib\subprocess.py", line 1220, in _execute_child
    startupinfo)
FileNotFoundError: [WinError 2] The system cannot find the file specified

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func
    return f(iterator)
  File "C:\Spark\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
  File "C:\Users\PBSKU\Anaconda3\lib\subprocess.py", line 950, in __init__
    restore_signals, start_new_session)
  File "C:\Users\PBSKU\Anaconda3\lib\subprocess.py", line 1220, in _execute_child
    startupinfo)
FileNotFoundError: [WinError 2] The system cannot find the file specified

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [42]:
sc.stop()