In [1]:
from common import Common
from music import Music
from pyspark import *
from pyspark.streaming import *
from pyspark import SparkContext, SparkConf

In [2]:
## Do not run this multiple times
common = Common()
sc = common.get_spark_core()

<SparkContext master=local appName=myapp>
2.4.5


### Read data from a file

In [3]:
rdd = sc.textFile('/home/ec2-user/data/blogtexts')
print(rdd.first())
rdd.take(2)

Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.


['Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.',
 '']

### Read data from a variable

In [4]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print(rdd.first())
rdd.take(2)

1


[1, 2]

### Map Transformation

In [5]:
print ('Map')
def multiply_by_two(x:int) -> int:
    return x*2
    
data = range(0,100)
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(5))

rdd = rdd.map(multiply_by_two)
print ('output rdd: ', rdd.take(5))

Map
input rdd:  [0, 1, 2, 3, 4]
output rdd:  [0, 2, 4, 6, 8]


### Filter Transformation

In [6]:
print ('Filter')
def filter_by_even(x:int) -> int:
    return x%2==0

data = range(0,100)
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(10))

rdd = rdd.filter(filter_by_even)
print ('output rdd: ', rdd.take(10))

Filter
input rdd:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
output rdd:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


### FlatMap Transformation

In [7]:
print ('FlatMap in function')
def flatten(x):
    return x[1].split(',')

data = [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(3))

rdd = rdd.flatMap(flatten)
print ('output rdd: ', rdd.take(6))

FlatMap in function
input rdd:  [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
output rdd:  ['1', '2', '3', '5', '6', '7']


In [8]:
print ('FlatMap in lambda')
data = [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(3))

rdd = rdd.flatMap(lambda x: (x[1].split(',')))
print ('output rdd: ', rdd.take(6))

FlatMap in lambda
input rdd:  [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
output rdd:  ['1', '2', '3', '5', '6', '7']


### FlatMapValues Transformation

In [9]:
print ('FlatMapValues in function')
def flatten(x):
    return x.split(' ')

data = [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(3))

rdd = rdd.flatMapValues(flatten)
print ('output rdd: ', rdd.take(6))

FlatMapValues in function
input rdd:  [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
output rdd:  [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]


In [10]:
print ('FlatMapValues in lambda')
data = [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
rdd = sc.parallelize(data)
print ('input rdd: ', rdd.take(3))

rdd = rdd.flatMapValues(lambda x: x.split(','))
print ('output rdd: ', rdd.take(6))

FlatMapValues in lambda
input rdd:  [('A', '1,2,3'), ('B', '5,6,7'), ('C', '8,9,10')]
output rdd:  [('A', '1'), ('A', '2'), ('A', '3'), ('B', '5'), ('B', '6'), ('B', '7')]


### Working with files and classes

In [11]:
file = '/home/ec2-user/data/spotify.csv'
rdd = sc.textFile(file)

header = rdd.first()
print ('header: ', header)

rdd = rdd.filter(lambda x: x != header)
print ('first csv: ', rdd.first())

rdd = rdd.map(lambda x: x.split(',')).map(lambda x: (x[5], Music(x[0], x[1], x[2], x[3], x[4], x[5])))
print ('first class: ', rdd.first())


header:  acousticness,danceability,duration_ms,energy,song_title,artist
first csv:  0.0102,0.833,204600,0.434,Mask Off,Future
first class:  ('Future', <music.Music object at 0x7f6eac639410>)


In [12]:
def print_music(music:Music):
        return ('[acoustic: ' + music.acoustic + '] ' +
               '[dance: ' + music.dance + '] ' +
               '[duration: ' + music.duration + '] ' +
               '[energy: ' + music.energy + '] ' +
               '[title: ' + music.title + '] ' +
               '[artist: ' + music.artist+ ']')
print_music(rdd.first()[1])

'[acoustic: 0.0102] [dance: 0.833] [duration: 204600] [energy: 0.434] [title: Mask Off] [artist: Future]'

### ReduceByKey Transformation

In [13]:
rdd_mapped = rdd.map(lambda x: (x[0], 1))
print ('Mapped First: ', rdd_mapped.first())
print ('Mapped Count: ', rdd_mapped.count())

# count occurrence of "artist"
rdd_reduced = rdd_mapped.reduceByKey(lambda x,y: x+y)
print ('\nReduced First: ', rdd_reduced.first())
print ('Reduced Count: ', rdd_reduced.count())

Mapped First:  ('Future', 1)
Mapped Count:  2017

Reduced First:  ('Future', 8)
Reduced Count:  1369


### Group Transformation

In [14]:
rdd_mapped = rdd.map(lambda x: (x[0], 1))
print ('Mapped First: ', rdd_mapped.first())
print ('Mapped Count: ', rdd_mapped.count())

# count occurrence of "artist"
rdd_grouped = rdd_mapped.groupBy(lambda w: w[0])
print ('\nGrouped First: ', rdd_grouped.first())
print ('Grouped Count: ', rdd_grouped.count())

print ('\nGrouped first details:')
list(rdd_grouped.first()[1])

Mapped First:  ('Future', 1)
Mapped Count:  2017

Grouped First:  ('Future', <pyspark.resultiterable.ResultIterable object at 0x7f6eabbd2c90>)
Grouped Count:  1369

Grouped first details:


[('Future', 1),
 ('Future', 1),
 ('Future', 1),
 ('Future', 1),
 ('Future', 1),
 ('Future', 1),
 ('Future', 1),
 ('Future', 1)]

### Partitions and re-partitions

In [15]:
print ('Current partitions: ', rdd.getNumPartitions())
print ('Record count: ', rdd.count())

rdd = rdd.repartition(3)
print ('\nNew partitions: ', rdd.getNumPartitions())
print ('Record count: ', rdd.count())

Current partitions:  1
Record count:  2017

New partitions:  3
Record count:  2017


### mapPartition Transformation

In [16]:
rdd_mapped = rdd.map(lambda x: (x[0], 1))
print ('Mapped First: ', rdd_mapped.first())
print ('Mapped Count: ', rdd_mapped.count())

def func(iterator):
    yield list(iterator)[0:5]
    
rdd_mp = rdd_mapped.mapPartitions(func)
rdd_mp.collect()

Mapped First:  ('Cajmere', 1)
Mapped Count:  2017


[[('Cajmere', 1),
  ('The Rapture', 1),
  ('Young Thug', 1),
  ('Ty Segall', 1),
  ('Myron & E', 1)],
 [('Future', 1),
  ('Childish Gambino', 1),
  ('Future', 1),
  ('Beach House', 1),
  ('Junior Boys', 1)],
 [('The Avalanches', 1),
  ('Modern Folk', 1),
  ('Erkin Koray', 1),
  ('Lil Yachty', 1),
  ('PNL', 1)]]

### Join Transformation

In [17]:
data = [('brocoli', 6), ('melon', 3), ('banana', 1), ('melon', 4), ('brocoli', 9), ('melon', 15), ('brocoli', 16), ('melon', 13), ('banana', 11)]
label = [('brocoli','veggie'), ('melon','fruit'), ('banana','fruit')]

data_rdd = sc.parallelize(data)
label_rdd = sc.parallelize(label)

print ('Data: ', data_rdd.collect())
print ('\nLabel: ', label_rdd.collect())

Data:  [('brocoli', 6), ('melon', 3), ('banana', 1), ('melon', 4), ('brocoli', 9), ('melon', 15), ('brocoli', 16), ('melon', 13), ('banana', 11)]

Label:  [('brocoli', 'veggie'), ('melon', 'fruit'), ('banana', 'fruit')]


In [18]:
joined = data_rdd.join(label_rdd).map(lambda x: (x[0], x[1][1], x[1][0]))
joined.collect()

[('melon', 'fruit', 3),
 ('melon', 'fruit', 4),
 ('melon', 'fruit', 15),
 ('melon', 'fruit', 13),
 ('banana', 'fruit', 1),
 ('banana', 'fruit', 11),
 ('brocoli', 'veggie', 6),
 ('brocoli', 'veggie', 9),
 ('brocoli', 'veggie', 16)]

### Union Transformation

In [19]:
print ("UNION")
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize(['C', 'D'])
rdd3 = rdd1.union(rdd2)
rdd3.collect()

UNION


[1, 2, 3, 'C', 'D']

### Save file

In [20]:
joined.saveAsTextFile("/home/ec2-user/output/joined")