### View spark session

In [1]:
spark

<pyspark.sql.session.SparkSession at 0x7f041c01f1d0>

# WordCount Program

In [3]:
# Create an RDD for data stored in HDFS. Change the path to where you have data.
poems = sc.textFile("/user/mapr/poems")

In [4]:
# Find number of records in poems RDD
poems.count()

8

In [5]:
# Find first record in the RDD
poems.first()

'Warm summer sun,'

In [6]:
poems.take(5)

['Warm summer sun,',
 'Shine kindly here,',
 'Warm southern wind,',
 'Blow softly here.',
 'Green sod above,']

In [7]:
poems.collect()

['Warm summer sun,',
 'Shine kindly here,',
 'Warm southern wind,',
 'Blow softly here.',
 'Green sod above,',
 'Lie light, lie light.',
 'Good night, dear heart,',
 'Good night, good night.']

In [8]:
poems.flatMap(lambda line: str.lower(line).split(r" "))\
.map(lambda word: (word, 1))\
.groupByKey()\
.mapValues(lambda values: sum(values))\
.collect()

[('here.', 1),
 ('good', 3),
 ('lie', 2),
 ('green', 1),
 ('light,', 1),
 ('above,', 1),
 ('here,', 1),
 ('summer', 1),
 ('night.', 1),
 ('night,', 2),
 ('shine', 1),
 ('blow', 1),
 ('wind,', 1),
 ('southern', 1),
 ('sun,', 1),
 ('kindly', 1),
 ('dear', 1),
 ('heart,', 1),
 ('softly', 1),
 ('warm', 2),
 ('sod', 1),
 ('light.', 1)]

# Exercise: 
Find out average trading volume per stock in 2016 using stocks.csv

In [14]:
stocks = sc.textFile("/user/mapr/stocks")

In [15]:
stocks.count()

1857093

In [17]:
stocks.cache().count()

1857093

In [18]:
for line in stocks.take(10):
    print(line)

date,open,high,low,close,volume,adjclose,symbol
2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG


In [19]:
stocks2016 = stocks.filter(lambda line: line.startswith("2016"))
for line in stocks2016.take(10):
    print(line)

2016-01-04,46.119999,46.130001,45.360001,45.799999,3472200.0,44.870315,XLNX
2016-01-04,50.650002,50.889999,50.23,50.880001,1590300.0,50.053021,ES
2016-01-04,4.44,4.97,4.4,4.95,3.84761E7,4.95,CHK
2016-01-04,19.41,19.66,19.309999,19.52,2592700.0,19.121168,NI
2016-01-04,168.789993,169.5,166.5,168.529999,413800.0,167.208406,SNA
2016-01-04,86.279999,87.269997,85.550003,87.18,2657300.0,85.403348,LYB
2016-01-04,17.559999,17.639999,17.379999,17.620001,5887800.0,17.325726,WU
2016-01-04,26.719999,26.76,26.299999,26.59,1.19141E7,26.446528,FOXA
2016-01-04,67.029999,69.269997,64.639999,68.769997,4249800.0,67.626307,WYNN
2016-01-04,55.950001,57.200001,55.25,56.900002,1119900.0,56.144751,R


In [22]:
stocks_paired = stocks2016\
.map(lambda line:line.split(","))\
.map(lambda tokens: (tokens[7], float(tokens[5])))\

stocks_paired.first()

('XLNX', 3472200.0)

In [27]:
avg_vol_by_stock = stocks_paired\
.groupByKey()\
.mapValues(lambda values: sum(values)/ len(values))\
.sortBy(lambda pair: pair[1], False)

for pair in avg_vol_by_stock.take(10):
    print(pair)

('BAC', 109953689.74358974)
('FCX', 47979558.333333336)
('CHK', 41622735.256410256)
('AAPL', 40944183.974358976)
('GE', 37751663.461538464)
('F', 37432197.43589743)
('PFE', 35777183.974358976)
('MSFT', 34194448.07692308)
('FB', 28902566.025641024)
('MU', 27260807.692307692)


In [29]:
avg_vol_by_stock.saveAsTextFile("/user/mapr/stocks_avg")

In [30]:
!hadoop fs -ls /user/mapr/stocks_avg

Found 3 items
-rwxr-xr-x   1 mapr mapr          0 2017-08-08 04:14 /user/mapr/stocks_avg/_SUCCESS
-rwxr-xr-x   1 mapr mapr       8712 2017-08-08 04:14 /user/mapr/stocks_avg/part-00000
-rwxr-xr-x   1 mapr mapr       4976 2017-08-08 04:14 /user/mapr/stocks_avg/part-00001


In [31]:
!hadoop fs -cat /user/mapr/stocks_avg/part* | head

('BAC', 109953689.74358974)
('FCX', 47979558.333333336)
('CHK', 41622735.256410256)
('AAPL', 40944183.974358976)
('GE', 37751663.461538464)
('F', 37432197.43589743)
('PFE', 35777183.974358976)
('MSFT', 34194448.07692308)
('FB', 28902566.025641024)
('MU', 27260807.692307692)
cat: Unable to write to output stream.
cat: Unable to write to output stream.


# MovieLens Join of Data Using RDD

In [41]:
data_home = "/user/mapr/movielens"

In [42]:
movies = sc\
.textFile(data_home + "/movies")\
.filter(lambda line: not line.startswith("movieId"))\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[0], tokens[1]))

for line in movies.take(10):
    print(line)

('1', 'Toy Story (1995)')
('2', 'Jumanji (1995)')
('3', 'Grumpier Old Men (1995)')
('4', 'Waiting to Exhale (1995)')
('5', 'Father of the Bride Part II (1995)')
('6', 'Heat (1995)')
('7', 'Sabrina (1995)')
('8', 'Tom and Huck (1995)')
('9', 'Sudden Death (1995)')
('10', 'GoldenEye (1995)')


In [44]:
ratings = sc\
.textFile(data_home + "/ratings")\
.filter(lambda line: not line.startswith("userId"))\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[1], float(tokens[2])))

for line in ratings.take(10):
    print(line)

('16', 4.0)
('24', 1.5)
('32', 4.0)
('47', 4.0)
('50', 4.0)
('110', 4.0)
('150', 3.0)
('161', 4.0)
('165', 3.0)
('204', 0.5)


In [45]:
ratings_avg = ratings.groupByKey().mapValues(lambda values: sum(values) / len(values))

for p in ratings_avg.take(10):
    print(p)

('2250', 2.1666666666666665)
('5951', 4.5)
('6558', 2.6666666666666665)
('66798', 3.1)
('110501', 4.3)
('4833', 1.0)
('2265', 1.5)
('27704', 2.0)
('3665', 1.0)
('4652', 2.0)


In [46]:
for p in ratings_avg.join(movies).take(10):
    print(p)

('53318', (4.0, 'Cashback (2006)'))
('3793', (3.5576923076923075, 'X-Men (2000)'))
('5460', (2.125, '"Powerpuff Girls'))
('25796', (3.0, '"Ghoul'))
('142074', (3.0, 'Knock Knock (2015)'))
('4986', (0.5, 'Silent Rage (1982)'))
('87660', (4.5, 'Too Big to Fail (2011)'))
('26489', (1.5, 'Strange Invaders (1983)'))
('27704', (2.0, 'Battle Royale 2: Requiem (Batoru rowaiaru II: Chinkonka) (2003)'))
('59026', (4.0, '99 francs (2007)'))


In [48]:
movie_avg_title = ratings_avg.join(movies).map(lambda p: (p[1][1], p[1][0]))
for p in movie_avg_title.take(10):
    print(p)

('Cashback (2006)', 4.0)
('X-Men (2000)', 3.5576923076923075)
('"Powerpuff Girls', 2.125)
('"Ghoul', 3.0)
('Knock Knock (2015)', 3.0)
('Silent Rage (1982)', 0.5)
('Too Big to Fail (2011)', 4.5)
('Strange Invaders (1983)', 1.5)
('Battle Royale 2: Requiem (Batoru rowaiaru II: Chinkonka) (2003)', 2.0)
('99 francs (2007)', 4.0)
