In [2]:
import pyspark as ps
import random

spark = (ps.sql.SparkSession.builder 
      .master("local[4]") 
      .appName("testing") 
      .getOrCreate()
      )
sc = spark.sparkContext  # for the pre-2.0 sparkContext

In [3]:
spark = ps.sql.SparkSession.builder \
        .appName("rdd test") \
        .getOrCreate()

random.seed(1)

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

count = spark.sparkContext.parallelize(range(0, 10000000)).map(sample) \
             .reduce(lambda a, b: a + b)

print("Pi is (very) roughly {}".format(4.0 * count / 10000000))

Pi is (very) roughly 3.140704


In [4]:
# sc.parallelize: create an RDD from a python array/list
# creating an adhoc list
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42]]

# reading the array/list using SparkContext
rdd = sc.parallelize(data_array)

# to output the content in python [irl, use with great care]
rdd.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

In [7]:
%ls /home/asus/DSI_Lectures/spark/natalie_hunt/data/toy_data.pkl

part-00000  part-00001  _SUCCESS


In [30]:
# sc.pickeFile() : from a HDFS pickle file
# The import will give you an rdd composed of whatever table was stored into that file.
dir_link = '/home/asus/DSI_Lectures/spark/natalie_hunt/'
# reading the file using SparkContext
rdd = sc.pickleFile('/home/asus/DSI_Lectures/spark/natalie_hunt/data/toy_data.pkl')

# to output the content in python [irl, use with great care]
rdd.collect()

['emilie,23', 'yunjin,42', 'matthew,4', 'jorge,8', 'josh,15', 'evangeline,16']

In [17]:
import os
ACCESS_KEY = os.environ['AWSAccessKeyId']
SECRET_KEY = os.environ['AWSSecretKey']

In [25]:
link = 's3a://mortar-example-data/airline-data'

In [26]:
rdd = sc.textFile(link)

In [28]:
# find out how many partitions there are...
rdd.getNumPartitions()

Py4JJavaError: An error occurred while calling o272.partitions.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	... 33 more


In [31]:
# displaying the content of the file in stdout
with open(dir_link+'data/sales.txt', 'r') as fin:
    print(fin.read())

#ID    Date           Store   State  Product    Amount
101    11/13/2014     100     WA     331        300.00
104    11/18/2014     700     OR     329        450.00
102    11/15/2014     203     CA     321        200.00
106    11/19/2014     202     CA     331        330.00
103    11/17/2014     101     WA     373        750.00
105    11/19/2014     202     CA     321        200.00



In [33]:
rdd1 = sc.textFile(dir_link+'data/sales.txt')
rdd1.collect()

['#ID    Date           Store   State  Product    Amount',
 '101    11/13/2014     100     WA     331        300.00',
 '104    11/18/2014     700     OR     329        450.00',
 '102    11/15/2014     203     CA     321        200.00',
 '106    11/19/2014     202     CA     331        330.00',
 '103    11/17/2014     101     WA     373        750.00',
 '105    11/19/2014     202     CA     321        200.00']

In [35]:
# applies split() to each row
rdd2 = rdd1.map(lambda rowstr : rowstr.split())
rdd2.collect()

[['#ID', 'Date', 'Store', 'State', 'Product', 'Amount'],
 ['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00'],
 ['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['103', '11/17/2014', '101', 'WA', '373', '750.00'],
 ['105', '11/19/2014', '202', 'CA', '321', '200.00']]

In [36]:
rdd3 = rdd2.filter(lambda row: not row[0].startswith('#'))
rdd3.collect()

[['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00'],
 ['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['103', '11/17/2014', '101', 'WA', '373', '750.00'],
 ['105', '11/19/2014', '202', 'CA', '321', '200.00']]

In [37]:
def casting_function(row):
    id, date, store, state, product, amount = row
    return((int(id), date, int(store), state, int(product), float(amount)))


In [39]:
rdd4 = rdd3.map(casting_function)
rdd4.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [41]:
# cononcial way to write that in python

rdd_sales = sc.textFile(dir_link + 'data/sales.txt')\
            .map(lambda rowstr : rowstr.split())\
            .filter(lambda row: not row[0].startswith('#'))\
            .map(casting_function)
rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [42]:
# creating an adhoc list
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42]]
rdd_names = sc.parallelize(data_array)
rdd_names.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

### 1.3.2. Mapping

#### `.map(func)` : applying a function on every row

In [43]:
# applying a lambda function to an rdd
rddout = rdd_names.map(lambda x: len(x[0]))

# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
after: [7, 5, 4, 10, 6, 6]


#### `.flatMap(func)` : applying a function on every row and flattening the resulting lists

In [None]:
# applying a lambda function to an rdd (because why not)
rddout = rdd_names.flatMap(lambda row : [row[1], row[1]+2, row[1]+len(row[0])])

# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

### 1.3.3. Row reduction

#### `.filter(func)`: filters an RDD using a function that returns boolean values

In [46]:
# filtering an rdd
rddout = rdd_sales.filter(lambda row: (row[3] == 'CA'))

# print out the original rdd
print("before: {}".format(rdd_sales.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [(101, '11/13/2014', 100, 'WA', 331, 300.0), (104, '11/18/2014', 700, 'OR', 329, 450.0), (102, '11/15/2014', 203, 'CA', 321, 200.0), (106, '11/19/2014', 202, 'CA', 331, 330.0), (103, '11/17/2014', 101, 'WA', 373, 750.0), (105, '11/19/2014', 202, 'CA', 321, 200.0)]
after: [(102, '11/15/2014', 203, 'CA', 321, 200.0), (106, '11/19/2014', 202, 'CA', 331, 330.0), (105, '11/19/2014', 202, 'CA', 321, 200.0)]


#### `.sample(withReplacement, fraction, seed)`: sampling an RDD !!

In [47]:
# sampling an rdd
rddout = rdd_sales.sample(True, 0.4)

# print out the original rdd
print("before: {}".format(rdd_sales.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [(101, '11/13/2014', 100, 'WA', 331, 300.0), (104, '11/18/2014', 700, 'OR', 329, 450.0), (102, '11/15/2014', 203, 'CA', 321, 200.0), (106, '11/19/2014', 202, 'CA', 331, 330.0), (103, '11/17/2014', 101, 'WA', 373, 750.0), (105, '11/19/2014', 202, 'CA', 321, 200.0)]
after: [(104, '11/18/2014', 700, 'OR', 329, 450.0), (102, '11/15/2014', 203, 'CA', 321, 200.0)]


#### `.distinct()`: obtaining distinct rows

In [49]:
# obtaining distinct values of the "state" column of rdd_sales
rddout = rdd_sales.map(lambda row: row[3])\
                    .distinct()

# print out the original rdd
print("before: {}".format(rdd_sales.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [(101, '11/13/2014', 100, 'WA', 331, 300.0), (104, '11/18/2014', 700, 'OR', 329, 450.0), (102, '11/15/2014', 203, 'CA', 321, 200.0), (106, '11/19/2014', 202, 'CA', 331, 330.0), (103, '11/17/2014', 101, 'WA', 373, 750.0), (105, '11/19/2014', 202, 'CA', 321, 200.0)]
after: ['CA', 'WA', 'OR']


### 1.3.4. Methods with a `<k,v>` paradigm

#### `.values()`: returns the values of a RDD made of `<k,v>` pairs

In [50]:
# grabing values of rdd_names
rddout = rdd_names.values()

# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))


before: [['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
after: [4, 8, 15, 16, 23, 42]


#### `.keys()`: returns the keys of a RDD made of `<k,v>` pairs

In [51]:
# grab the keys of rdd_names
rddout = rdd_names.keys()

# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
after: ['matthew', 'jorge', 'josh', 'evangeline', 'emilie', 'yunjin']


#### `rddA.join(rddB)`: join another RDD

In [53]:
rdd_salesperstate = rdd_sales.map(lambda row: (row[3],row[5]))

rdd_salesperstate.collect()

[('WA', 300.0),
 ('OR', 450.0),
 ('CA', 200.0),
 ('CA', 330.0),
 ('WA', 750.0),
 ('CA', 200.0)]

In [54]:
# creating an adhoc list of managers for each state
data_array = [['CA', 'matthew'],
              ['OR', 'jorge'],
              ['WA','matthew'],
              ['TX', 'emilie']]

# reading the array/list using SparkContext
rdd_managers = sc.parallelize(data_array)

# to output the content in python [irl, use with great care]
rdd_salesperstate.join(rdd_managers).collect()

# combines in the first column which is the key for both arrays

[('CA', (200.0, 'matthew')),
 ('CA', (330.0, 'matthew')),
 ('CA', (200.0, 'matthew')),
 ('OR', (450.0, 'jorge')),
 ('WA', (300.0, 'matthew')),
 ('WA', (750.0, 'matthew'))]

#### `.reduceByKey(func)`: reduce `v`s by their `k` by applying func (what ?)

The `func` here needs to be associative and commutative... can you guess why ?

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

Similarly to combiner for MapReduce framework

In [56]:
# creating an adhoc list
data_array = [['CA', 1],
              ['WA', 1],
              ['CA', 2],
              ['OR', 1],
              ['CA', 5],
              ['OR', 1]]
# reading the array/list using SparkContext
rdd = sc.parallelize(data_array)

# to output the content in python[irl, use with great care]
rdd.collect()

[['CA', 1], ['WA', 1], ['CA', 2], ['OR', 1], ['CA', 5], ['OR', 1]]

In [57]:
rdd.reduceByKey(lambda v1,v2 :v1+v2).collect()

[('CA', 8), ('WA', 1), ('OR', 2)]

In [59]:
def mean(args):
    key,iterator = args
    total = 0.0; count = 0
    for x in iterator:
        total += x; count += 1
    return total / count

rdd.groupByKey()\
    .map(mean)\
    .collect()

[2.6666666666666665, 1.0, 1.0]

### Task

You want to obtain an RDD of the states sorted by their decreasing cumulated sales.

What transformations do you need to apply ?

If you had to draw a workflow of the transformations to apply ?

In [97]:
rddout = rdd_sales.map(lambda x: (x[3],x[5]))\
        # .reduceByKey(lambda amount1, amount2 : amount1 + amount2)\
        # .sortBy(lambda state_amount:state_amount[1],ascending=False)
rddout.collect()

[('WA', 300.0),
 ('OR', 450.0),
 ('CA', 200.0),
 ('CA', 330.0),
 ('WA', 750.0),
 ('CA', 200.0)]

### Task
You want to create a table of unique words and their occurences.

What transformations do you need to apply ?

If you had to draw a workflow of the transformations to apply ?

### Code

In [67]:
# displaying the content of the file in stdout
with open(dir_link + 'data/input.txt', 'r') as fin:
    print(fin.read())

# reading the file using SparkContext
rdd_text = sc.textFile(dir_link + 'data/input.txt')

hello world
another line
yet another line
yet another another line



In [82]:
rdd_txt = sc.textFile(dir_link +'data/input.txt')\
            .flatMap(lambda rowstr : rowstr.split())\
            .map(lambda word: (word,1))\
            .reduceByKey(lambda v1, v2: v1+v2)

rdd_txt.collect()

[('world', 1), ('line', 3), ('yet', 2), ('hello', 1), ('another', 4)]

### Task

Now, design a pipeline that would :
1. filter out headers
2. split each line based on comma
3. keep only fields for Date (col 0) and Close (col 4)
4. order by Close in descending order

### Code

In [85]:
rdd_aapl_raw = sc.textFile(dir_link+'data/aapl.csv')

print("lines in file: {}".format(rdd_aapl_raw.count()))

rdd_aapl_raw.take(5)

lines in file: 254


['Date,Open,High,Low,Close,Volume,Adj Close',
 '2016-10-25,117.949997,118.360001,117.309998,118.25,39190300,118.25',
 '2016-10-24,117.099998,117.739998,117.00,117.650002,23538700,117.650002',
 '2016-10-21,116.809998,116.910004,116.279999,116.599998,23192700,116.599998',
 '2016-10-20,116.860001,117.379997,116.330002,117.059998,24125800,117.059998']

In [141]:
rdd_csv = sc.textFile(dir_link+'data/aapl.csv')\
            .map(lambda line: line.split(","))\
            .filter(lambda row : not row[0].startswith('Date'))\
            .map(lambda fields: (float(fields[4]),fields[0]))\
            .sortBy(lambda row :row[0], ascending=False)

rdd_csv.collect()

AttributeError: 'NoneType' object has no attribute 'sc'

# 3. Caching / Persistency

- The RDD does no work until an action is called. And then when an action is called it figures out the answer and then throws away all the data.
- If you have an RDD that you are going to reuse in your computation you can use cache() to make Spark cache the RDD.
- This is especially useful if you have to run the same computation over and over again on one RDD: one use case ? oh I don't know maybe... **MACHINE LEARNING !!!**

## 3.1. Caching

Consider the following job...

In [138]:
import random
num_count = 500*1000
num_list = [random.random() for i in range(num_count)]
rdd1 = sc.parallelize(num_list)
rdd2 = rdd1.sortBy(lambda num: num)

In [139]:
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 10.2 ms, sys: 582 µs, total: 10.8 ms
Wall time: 618 ms
CPU times: user 10.8 ms, sys: 0 ns, total: 10.8 ms
Wall time: 330 ms
CPU times: user 10.3 ms, sys: 0 ns, total: 10.3 ms
Wall time: 373 ms


500000

In [140]:
sc.stop()
spark.stop()