# Cloudvane Sample 1 - Basics of RDDs

## Import libraries
We will need pyspark and random, since we generate some values. 
We will also create the spark context. 

NOTE: Spark allows only one active SparkContext at a time, so take care not to create a second one (for example by executing a plain `SparkContext(...)` constructor call twice).

In [11]:
import pyspark
import random

# Reuse the SparkContext that is already running when this cell is executed
# again; calling the SparkContext constructor a second time would fail because
# only one context may be active at a time.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("Cloudvane_S01")
)

## Create Data

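Now, let's create some data. We will use `random.expovariate` with a rate drawn at random from [-0.5, 0.5): a negative rate yields a negative value and a positive rate a positive one, so the data set contains both negative and positive numbers.
We will create 100 entries.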

In [38]:
# Seed the generator so that "Restart & Run All" reproduces the same data set.
random.seed(42)

# The rate handed to expovariate is drawn from [-0.5, 0.5): a negative rate
# produces a negative sample, a positive rate a positive one.
someValues = [random.expovariate(random.random() - 0.5) for _ in range(100)]

someValues

[-0.39052765211055196,
 -0.18288145405368328,
 -0.840247848391749,
 -198.88346281412788,
 -1.9430921485739105,
 16.483623772565192,
 1.0277011272947039,
 -12.388154209455285,
 -1.1270971429287095,
 -4.883935587778448,
 1.5676936495442393,
 2.1311945947828472,
 2.8452657725322634,
 -19.904683801871926,
 9.296825495359155,
 -7.352958838775534,
 9.685279820095236,
 -10.070354070141581,
 11.660280023307187,
 3.366825580226044,
 -77.96153754411752,
 3.712168911008661,
 0.07516359676992539,
 44.903601366655806,
 1.3103360286093775,
 3.628974834769563,
 13.646311998408688,
 11.942105527148861,
 48.789601151863174,
 0.9597972414893609,
 -6.723116376706565,
 -0.13003904919520468,
 -99.68110411872009,
 -1.6200299927834667,
 -3.6071470327649884,
 0.5029789115989489,
 -1.7857219465177903,
 -1.041472611093276,
 -5.8573532472676915,
 -1.1951289556187288,
 -14.396184099101573,
 3.546289243484724,
 -3.7836794738931814,
 -2.319411099922199,
 2.4733593029763923,
 -3.684868547714244,
 -34.37529297628501,

## Let's now switch over to Spark ... 

We parallelize the data to get back an RDD

In [13]:
# Distribute the local Python list through the SparkContext; every later cell
# that works on the generated numbers starts from this RDD.
spark_data = sc.parallelize(someValues)

## Let's do some basic math - sum and count

In [14]:
# Sum of all generated values, computed by Spark.
sp_sum = spark_data.sum()
sp_sum

-230.79359878996544

In [15]:
# Number of elements held by the RDD.
sp_ct = spark_data.count()
sp_ct

100

## But what if we want to get the average? 

We need to create a function for that - and then apply the function

In [16]:
def average(vals):
    """Return the arithmetic mean of the numeric elements of an RDD.

    The running total and the element count are gathered together in one
    ``aggregate`` pass, so a single Spark job is executed instead of the two
    jobs that separate ``sum()`` and ``count()`` actions would trigger.

    Args:
        vals: an RDD of numbers.

    Returns:
        The sum of the elements divided by the number of elements.

    Raises:
        ZeroDivisionError: if ``vals`` contains no elements.
    """
    total, count = vals.aggregate(
        (0, 0),
        # seqOp: fold one element into the (total, count) pair of a partition.
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        # combOp: merge the (total, count) pairs of two partitions.
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    return total / count

In [17]:
# Mean of the generated values, using the average() helper defined earlier.
average(spark_data)

-2.3079359878996546

# Transformations in Spark
## Filter data

In [18]:
# Keep only the strictly positive values and bring them back to the driver.
sp_pos = (
    spark_data
    .filter(lambda value: value > 0.0)
    .collect()
)
sp_pos

[88.34565364262268,
 0.45385649435298375,
 0.46904166519865287,
 0.1877880521690893,
 102.53631695817127,
 0.6364964164480461,
 0.19863253799948713,
 29.46021296980408,
 2.3555510156841004,
 1.052184063595872,
 5.36007533255343,
 0.9888260521694023,
 7.203306232812682,
 1.3970260876189113,
 0.5164789411430335,
 0.6704481697927206,
 3.0602454358552516,
 4.772168508954168,
 3.1424333977160015,
 0.9387951453048047,
 1.6690042303336938,
 1.0831883044415573,
 2.0078206646486176,
 1.538744882998567,
 5.130641901930761,
 7.878382165179673,
 15.681277994516416,
 1.1261064424114673,
 1.5555583794217287,
 13.053021289972232,
 0.4279691734717034,
 2.4104514294762676,
 4.412272263448461,
 1.5142114069484345,
 8.045739845164702,
 40.947409727755925,
 12.937844201015126,
 9.535233760270598,
 3.662073923714872,
 1.334248644614033,
 6.076225954536054]

In [19]:
# Values strictly between 0 and 3: one chained comparison replaces the two
# consecutive filters and selects exactly the same elements.
sp_pos = (
    spark_data
    .filter(lambda value: 0.0 < value < 3.0)
    .collect()
)
sp_pos

[0.45385649435298375,
 0.46904166519865287,
 0.1877880521690893,
 0.6364964164480461,
 0.19863253799948713,
 2.3555510156841004,
 1.052184063595872,
 0.9888260521694023,
 1.3970260876189113,
 0.5164789411430335,
 0.6704481697927206,
 0.9387951453048047,
 1.6690042303336938,
 1.0831883044415573,
 2.0078206646486176,
 1.538744882998567,
 1.1261064424114673,
 1.5555583794217287,
 0.4279691734717034,
 2.4104514294762676,
 1.5142114069484345,
 1.334248644614033]

## Sort Data

In [20]:
# Sort ascending, using each value as its own sort key.
sp_sorted = (
    spark_data
    .sortBy(lambda value: value, ascending=True)
    .collect()
)
sp_sorted

[-98.9855394163018,
 -82.99907639866315,
 -79.26003495162618,
 -57.08402597863696,
 -39.954941495333095,
 -22.665733210312986,
 -16.21422570750955,
 -15.132142246086985,
 -13.469363181335009,
 -11.9685694199877,
 -11.79727270903096,
 -10.640544940363565,
 -10.177456941661683,
 -9.350136396451243,
 -9.15562223823075,
 -8.885087776097684,
 -8.828489723753306,
 -7.121259402310163,
 -5.968796870185603,
 -5.795297889498543,
 -5.759169571034792,
 -5.426576097307699,
 -4.892257593968877,
 -4.870336498868918,
 -4.803714343447072,
 -4.680890685710039,
 -4.237095583451295,
 -4.070230304352671,
 -4.002950779462385,
 -3.7723444233913637,
 -3.7385652813430474,
 -3.7001325937448177,
 -3.476529308122447,
 -3.4607695761471398,
 -3.2630086565965457,
 -3.1834886702976726,
 -3.1731665500044492,
 -2.5811021702883856,
 -2.5128827141064334,
 -2.4868113697586396,
 -2.3181227973720886,
 -2.178959830984491,
 -2.165302513270822,
 -2.159527108924834,
 -1.8400311952384203,
 -1.8049642819371752,
 -1.64447847631652

In [21]:
# Same sort, but in descending order (largest value first).
sp_sorted = (
    spark_data
    .sortBy(lambda value: value, ascending=False)
    .collect()
)
sp_sorted

[102.53631695817127,
 88.34565364262268,
 40.947409727755925,
 29.46021296980408,
 15.681277994516416,
 13.053021289972232,
 12.937844201015126,
 9.535233760270598,
 8.045739845164702,
 7.878382165179673,
 7.203306232812682,
 6.076225954536054,
 5.36007533255343,
 5.130641901930761,
 4.772168508954168,
 4.412272263448461,
 3.662073923714872,
 3.1424333977160015,
 3.0602454358552516,
 2.4104514294762676,
 2.3555510156841004,
 2.0078206646486176,
 1.6690042303336938,
 1.5555583794217287,
 1.538744882998567,
 1.5142114069484345,
 1.3970260876189113,
 1.334248644614033,
 1.1261064424114673,
 1.0831883044415573,
 1.052184063595872,
 0.9888260521694023,
 0.9387951453048047,
 0.6704481697927206,
 0.6364964164480461,
 0.5164789411430335,
 0.46904166519865287,
 0.45385649435298375,
 0.4279691734717034,
 0.19863253799948713,
 0.1877880521690893,
 -0.02543266703814248,
 -0.08194197431146938,
 -0.1879603077805609,
 -0.5292979201735992,
 -0.5482104497704919,
 -0.6996978372561382,
 -0.69997821587222

## Join Data 

In [22]:
# Two key/value RDDs that share only the key "Mark".
ds_one = sc.parallelize([
    ("Mark", 1984),
    ("Lisa", 1985),
])
ds_two = sc.parallelize([
    ("Mark", 2015),
    ("Anastasia", 2017),
])

# join() pairs up the values of keys present in both RDDs.
sorted(
    ds_one
    .join(ds_two)
    .collect()
)

[('Mark', (1984, 2015))]

## Distinct

In [23]:
# The value 1 occurs twice in the input; distinct() removes the duplicate.
ds_distinct = (
    sc.parallelize([1, 2, 3, 4, 1])
    .distinct()
    .collect()
)
ds_distinct

[4, 1, 2, 3]

## Group Data

In [24]:
# "Mark" appears twice so that grouping has something to collect.
ds_set = sc.parallelize([
    ("Mark", 1984),
    ("Lisa", 1985),
    ("Mark", 2015),
])

# Group the values per key and turn each group into a plain list for display.
ds_grp = (
    ds_set
    .groupByKey()
    .mapValues(list)
    .collect()
)
ds_grp

[('Lisa', [1985]), ('Mark', [1984, 2015])]

In [25]:
# Number of values recorded for each key.
(
    ds_set
    .groupByKey()
    .mapValues(len)
    .collect()
)

[('Lisa', 1), ('Mark', 2)]

## Union

In [26]:
# Two RDDs without any record in common.
ds_one = sc.parallelize([
    ("Mark", 1984),
    ("Lisa", 1985),
])
ds_two = sc.parallelize([
    ("Luke", 2015),
    ("Anastasia", 2017),
])

# union() concatenates both RDDs; sorting only makes the display stable.
sorted(
    ds_one
    .union(ds_two)
    .collect()
)

[('Anastasia', 2017), ('Lisa', 1985), ('Luke', 2015), ('Mark', 1984)]

## Intersection

In [27]:
# Only the record ("Mark", 1984) is present in both RDDs.
ds_one = sc.parallelize([
    ("Mark", 1984),
    ("Lisa", 1985),
])
ds_two = sc.parallelize([
    ("Mark", 1984),
    ("Anastasia", 2017),
])

# intersection() keeps the records found in both RDDs.
sorted(
    ds_one
    .intersection(ds_two)
    .collect()
)

[('Mark', 1984)]

## Map Function

In [28]:
import datetime

def doTheTrans(values, current_year=None):
    """Turn a ``(name, birth_year)`` record into ``[name, age, gender]``.

    Args:
        values: indexable record whose item 0 is the name (a string) and whose
            item 1 is the year of birth (a number).
        current_year: year the age is measured against. Defaults to the
            current calendar year; pass a fixed value for reproducible output.

    Returns:
        A list ``[name, age, gender]`` where ``age`` is ``current_year`` minus
        the year of birth and ``gender`` is ``"f"`` when the name ends in "a"
        (a deliberately naive heuristic), otherwise ``"m"``.
    """
    if current_year is None:
        current_year = datetime.datetime.now().year

    name, birth_year = values[0], values[1]
    gender = "f" if name.endswith("a") else "m"

    return [name, current_year - birth_year, gender]

# Sample (name, birth year) records to run through the transformation.
ds_one = sc.parallelize([
    ("Mark", 1984),
    ("Lisa", 1985),
])

# Apply doTheTrans to every record and sort the result for a stable display.
sorted(
    ds_one
    .map(doTheTrans)
    .collect()
)

[['Lisa', 34, 'f'], ['Mark', 35, 'm']]

In [29]:
# The same kind of mapping written inline: (name, current year - birth year + 1).
sorted(
    ds_one
    .map(lambda rec: (rec[0], datetime.datetime.now().year - rec[1] + 1))
    .collect()
)

[('Lisa', 35), ('Mark', 36)]

# Actions in Spark

In [43]:
# Import only what is needed: a wildcard import of `operator` would shadow
# builtins such as `abs` and `pow` for the rest of the notebook.
from operator import mul

# reduce() is an action: it multiplies all values together into one number.
spark_data.reduce(mul)

-3.410529684695683e+52

In [52]:
# first() is an action that returns a single element of the RDD.
ds_one.first()

('Mark', 1984)

In [54]:
# count() is an action that returns the number of elements in the RDD.
ds_one.count()

2

In [55]:
# saveAsTextFile writes the string form of every record, which for a tuple is
# its Python repr (e.g. "('Mark', 1984)"). Join the fields with commas first
# so that the content matches the ".csv" name of the output path.
# NOTE(review): Spark writes a directory of part files at this path and
# normally refuses to overwrite an existing one -- confirm before re-running.
ds_one.map(
    lambda record: ",".join(str(field) for field in record)
).saveAsTextFile("data/dsone.csv")