In [2]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local").setAppName("Spark-Lab")
sc = SparkContext(conf=conf)

### Question 1

Use the `fakefriends.csv` data to figure out the Average Number of Friends by Age. 

In [3]:
fakefriends = sc.textFile("data/fakefriends.csv")

In [8]:
def parse_friends(friend):
    friend_lis = friend.split(',')
    age = friend_lis[2]
    num_friends = friend_lis[3]
    return age, int(num_friends)

In [11]:
ff_mapper = fakefriends.map(parse_friends).mapValues(lambda x: (x, 1))
ff_reducer = ff_mapper.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) 

In [16]:
ff_mapper.top(3)

[(u'69', (491, 1)), (u'69', (470, 1)), (u'69', (431, 1))]

In [15]:
ff_reducer.top(3)

[(u'69', (2352, 10)), (u'68', (2696, 10)), (u'67', (3434, 16))]

In [14]:
ff_reducer.mapValues(lambda x: x[0] / x[1]).collect()

[(u'56', 306),
 (u'54', 278),
 (u'28', 209),
 (u'29', 215),
 (u'60', 202),
 (u'61', 256),
 (u'62', 220),
 (u'63', 384),
 (u'64', 281),
 (u'49', 184),
 (u'66', 276),
 (u'67', 214),
 (u'68', 269),
 (u'69', 235),
 (u'52', 340),
 (u'24', 233),
 (u'25', 197),
 (u'26', 242),
 (u'27', 228),
 (u'20', 165),
 (u'21', 350),
 (u'22', 206),
 (u'23', 246),
 (u'46', 223),
 (u'47', 233),
 (u'44', 282),
 (u'45', 309),
 (u'42', 303),
 (u'43', 230),
 (u'40', 250),
 (u'41', 268),
 (u'35', 211),
 (u'36', 246),
 (u'31', 267),
 (u'32', 207),
 (u'39', 169),
 (u'38', 193),
 (u'59', 220),
 (u'58', 116),
 (u'55', 295),
 (u'18', 343),
 (u'57', 258),
 (u'30', 235),
 (u'51', 302),
 (u'50', 254),
 (u'53', 222),
 (u'34', 245),
 (u'19', 213),
 (u'37', 249),
 (u'48', 281),
 (u'33', 325),
 (u'65', 298)]

### Question 2

Use the `1800.csv` to find the minimum and maximum temperatures by location

In [40]:
temps = sc.textFile("data/1800.csv")
temps.top(3)

[u'ITE00100554,18001231,TMIN,25,,,E,',
 u'ITE00100554,18001231,TMAX,50,,,E,',
 u'ITE00100554,18001230,TMIN,31,,,E,']

In [42]:
temps_split = temps.map(lambda x: x.split(','))
temps_tmin = temps_split.filter(lambda x: x[2] == 'TMIN').map(lambda x: (x[0], int(x[3])))

In [46]:
min_station_temps = temps_tmin.reduceByKey(lambda x, y: min(x, y))

In [47]:
min_station_temps.collect()

[(u'ITE00100554', -148), (u'EZE00100082', -135)]

### Question 3
Use the `Book.txt` file to build a word counter.

### Question 4

From the `ml-100/u.data`, find the most popular movies. 


In [51]:
def loaMovieNames():
    movieNames = {}
    with open('data/ml-100k/u.item') as f:
        for line in f:
            fields = line.split('|')
            movieNames[fields[0]] = fields[1]
    return movieNames


movie_names = sc.broadcast(loaMovieNames())

In [64]:
movies = sc.textFile('data/ml-100k/u.data')
movies_mapper = movies.map(lambda x: x.split('\t')[1:3]).map(lambda x: (x[0], (int(x[1]), 1)))

In [79]:
movies_reducer = movies_mapper.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# note that in order to extract from broadcasted variable you need to use the value method
extract_movie_pop = lambda deets: (movie_names.value[deets[0]], deets[1][0] / deets[1][1])
movie_popularity = movies_reducer.map(extract_movie_pop)

In [80]:
sorted(movie_popularity.collect(), key=lambda x: x[1], reverse=True)

[('Santa with Muscles (1996)', 5),
 ('Great Day in Harlem, A (1994)', 5),
 ('They Made Me a Criminal (1939)', 5),
 ("Someone Else's America (1995)", 5),
 ('Saint of Fort Washington, The (1993)', 5),
 ('Aiqing wansui (1994)', 5),
 ('Entertaining Angels: The Dorothy Day Story (1996)', 5),
 ('Prefontaine (1997)', 5),
 ('Star Kid (1997)', 5),
 ('Marlene Dietrich: Shadow and Light (1996) ', 5),
 ('Good Will Hunting (1997)', 4),
 ('Sense and Sensibility (1995)', 4),
 ('Fargo (1996)', 4),
 ('Silence of the Lambs, The (1991)', 4),
 ('Terminator 2: Judgment Day (1991)', 4),
 ('C\xe9r\xe9monie, La (1995)', 4),
 ('Nightwatch (1997)', 4),
 ('Nico Icon (1995)', 4),
 ('Letter From Death Row, A (1998)', 4),
 ('Once Were Warriors (1994)', 4),
 ('Lawrence of Arabia (1962)', 4),
 ('Third Man, The (1949)', 4),
 ('Boot, Das (1981)', 4),
 ('Close Shave, A (1995)', 4),
 ('Ran (1985)', 4),
 ('12 Angry Men (1957)', 4),
 ('Raiders of the Lost Ark (1981)', 4),
 ('Princess Bride, The (1987)', 4),
 ('Empire Strik

### Question 5

Use the Marvel Superheroes dataset to find the most popular superhero based on co-occurrences with other superheroes. 

In [101]:
marvel_graph = sc.textFile("data/Marvel-Graph.txt")
marvel_names = sc.textFile("data/Marvel-Names.txt")
marvel_names_dict = dict(marvel_names.map(lambda x: x.split(' ', 1)).collect())
marvel_names_dict_broad = sc.broadcast(marvel_names_dict)

In [108]:
superheroes_occs = marvel_graph.map(lambda x: tuple(x.split(' ', 1)))
superheroes_occs = superheroes_occs.mapValues(lambda cos: len(cos.split()))

In [109]:
super_names = superheroes_occs.map(lambda x: (marvel_names_dict_broad.value[x[0]], x[1]))

In [110]:
sorted(super_names.collect(), key=lambda x: x[1], reverse=True)

[(u'"CAPTAIN MARVEL II/MO"', 501),
 (u'"ANT-MAN/DR. HENRY J."', 501),
 (u'"ANT-MAN/DR. HENRY J."', 501),
 (u'"DR. DOOM/VICTOR VON "', 501),
 (u'"HUMAN TORCH/JOHNNY S"', 501),
 (u'"HUMAN TORCH/JOHNNY S"', 501),
 (u'"ROGUE /"', 501),
 (u'"CAPTAIN AMERICA"', 501),
 (u'"CAPTAIN AMERICA"', 501),
 (u'"CAPTAIN AMERICA"', 501),
 (u'"BINARY/CAROL DANVERS"', 501),
 (u'"JONES, RICHARD MILHO"', 501),
 (u'"NIGHTCRAWLER/KURT WA"', 501),
 (u'"QUICKSILVER/PIETRO M"', 501),
 (u'"SHADOWCAT/KATHERINE "', 501),
 (u'"FURY, COL. NICHOLAS "', 501),
 (u'"DAREDEVIL/MATT MURDO"', 501),
 (u'"WONDER MAN/SIMON WIL"', 501),
 (u'"WONDER MAN/SIMON WIL"', 501),
 (u'"MAGNETO/MAGNUS/ERIC "', 501),
 (u'"PSYLOCKE/ELISABETH B"', 501),
 (u'"NOVA/RICHARD RIDER"', 501),
 (u'"HERCULES [GREEK GOD]"', 501),
 (u'"HERCULES [GREEK GOD]"', 501),
 (u'"WASP/JANET VAN DYNE "', 501),
 (u'"WASP/JANET VAN DYNE "', 501),
 (u'"STORM/ORORO MUNROE S"', 501),
 (u'"STORM/ORORO MUNROE S"', 501),
 (u'"BLACK PANTHER/T\'CHAL"', 501),
 (u'"WATSON-PA

### Question 6 

Let's download Bay Area Bike Share's trip data. 

In [113]:
! wget https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip


--2017-03-23 16:29:05--  https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip
Resolving s3.amazonaws.com... 54.231.80.235
Connecting to s3.amazonaws.com|54.231.80.235|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84402747 (80M) [application/zip]
Saving to: ‘babs_open_data_year_1.zip’


2017-03-23 16:32:05 (465 KB/s) - ‘babs_open_data_year_1.zip’ saved [84402747/84402747]



In [None]:
! ls

In [114]:
! unzip babs_open_data_year_1.zip 

Archive:  babs_open_data_year_1.zip
   creating: 201402_babs_open_data/
  inflating: 201402_babs_open_data/201402_station_data.csv  
  inflating: 201402_babs_open_data/201402_status_data.csv  
  inflating: 201402_babs_open_data/201402_trip_data.csv  
  inflating: 201402_babs_open_data/201402_weather_data.csv  
  inflating: 201402_babs_open_data/README.txt  
   creating: 201408_babs_open_data/
  inflating: 201408_babs_open_data/201408_station_data.csv  
  inflating: 201408_babs_open_data/201408_status_data.csv  
  inflating: 201408_babs_open_data/201408_trip_data.csv  
  inflating: 201408_babs_open_data/201408_weather_data.csv  
  inflating: 201408_babs_open_data/README.txt  


In [115]:
! rm -f *.zip


In [116]:
! ls


[34m201402_babs_open_data[m[m      [34mmetastore_db[m[m
[34m201408_babs_open_data[m[m      obama.txt
Intro-to-Spark.ipynb       proj_gutenberg_alic.txt
Intro_to_Big_Data.ipynb    reducer.py
Intro_to_Spark_I_Lab.ipynb reducer.py~
[34mdata[m[m                       word_counter.py
derby.log                  word_counter.py~
mapper.py


In [117]:
# size of the downloaded data
! du -h


 60K	./.ipynb_checkpoints
610M	./201402_babs_open_data
643M	./201408_babs_open_data
 15M	./data/ml-100k
 18M	./data
1.0M	./metastore_db/log
2.9M	./metastore_db/seg0
  0B	./metastore_db/tmp
3.9M	./metastore_db
1.2G	.


In [118]:
trips = sc.textFile("201408_babs_open_data/201408_trip_data.csv")

> **Check:** What kind of object is `trips`?

### Exercise 1: Split CSV Lines

In Spark, we can build complex pipelines that only get executed when we ask to collect them.



In other words, we can define the pipeline with all its steps, and only when we call collect will the data flow through it. In order to get familiar with this new workflow, we will start with small steps to build our pipeline.

First step:
- Apply a map to trips that splits each line at commas and save that to a an RDD

**Hint:** if you want to check that you're doing things right, you can collect the result and display the first few lines.

In [122]:
trips_split = trips.map(lambda line: line.split(','))

### Exercise 2: filter for Caltrain station¶

In Spark we can also create filters using the `filter` method. Let's select station number 70 by filtering on the 5th column, we will do all the following analysis just on this station, which corresponds to the most popular starting point. Save this to a variable called `station_70.`

In [134]:
station_70 = trips_split.filter(lambda x: x[4] == '70')

### Exercise 3: trips by day - hour (mapper)
Let's analyse the trips by the hour. We can do this by performing a map reduce job in Spark. First we will need to emit tuples with a count of 1 for each (date, hour) key, and then we will sum the counts by key.

- Emit tuple of ((date, hour), 1), applying a map to `station_70` that extracts the relevant data from each line

In [143]:
stattion_70_date_hour = station_70.map(lambda station: station[2].split(" ")).map(lambda x: ((x[0], x[1].split(":")[0]), 1))

In [144]:
stattion_70_date_hour.collect()

[((u'8/31/2014', u'20'), 1),
 ((u'8/31/2014', u'19'), 1),
 ((u'8/31/2014', u'14'), 1),
 ((u'8/31/2014', u'14'), 1),
 ((u'8/31/2014', u'14'), 1),
 ((u'8/31/2014', u'12'), 1),
 ((u'8/31/2014', u'11'), 1),
 ((u'8/31/2014', u'11'), 1),
 ((u'8/31/2014', u'11'), 1),
 ((u'8/31/2014', u'11'), 1),
 ((u'8/31/2014', u'9'), 1),
 ((u'8/31/2014', u'9'), 1),
 ((u'8/30/2014', u'23'), 1),
 ((u'8/30/2014', u'20'), 1),
 ((u'8/30/2014', u'17'), 1),
 ((u'8/30/2014', u'17'), 1),
 ((u'8/30/2014', u'17'), 1),
 ((u'8/30/2014', u'17'), 1),
 ((u'8/30/2014', u'16'), 1),
 ((u'8/30/2014', u'14'), 1),
 ((u'8/30/2014', u'13'), 1),
 ((u'8/30/2014', u'13'), 1),
 ((u'8/30/2014', u'13'), 1),
 ((u'8/30/2014', u'12'), 1),
 ((u'8/30/2014', u'12'), 1),
 ((u'8/30/2014', u'12'), 1),
 ((u'8/30/2014', u'11'), 1),
 ((u'8/30/2014', u'11'), 1),
 ((u'8/30/2014', u'11'), 1),
 ((u'8/30/2014', u'11'), 1),
 ((u'8/30/2014', u'10'), 1),
 ((u'8/30/2014', u'9'), 1),
 ((u'8/30/2014', u'6'), 1),
 ((u'8/29/2014', u'22'), 1),
 ((u'8/29/2014', u

### Exercise 4:  trips by day - hour (reducer)

Use the `reduceByKey` method to obtain the number of trips per (day, hour)

In [147]:
hour_hour_status = stattion_70_date_hour.reduceByKey(lambda x, y: x + y)

### Exercise 5: trips by hour (mapper)

Let's further group the trips by hour. We'll do this with a second Map Reduce job.
First we will discard the day and emit tuples of (hour, count). You can achieve this with a map

In [167]:
hour_count = hour_hour_status.map(lambda x: (x[0][1], x[1]))

### Exercise 6: trips by hour (reducer)
Then let's calculate the average number of trips by hour using the `combineByKey` method.

You can find a suggestion on how to do it [here](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/).

In [168]:
hour_count_avg = hour_count.combineByKey(lambda value: (value, 1),
                                         lambda x, value: (x[0] + value, x[1] + 1),
                                         lambda x, y: (x[0] + y[0], x[1] + y[1]))
                        

In [180]:
hour_count_avg.map(lambda x: (x[0], x[1][0] / x[1][1])).collect()

[(u'20', 1),
 (u'21', 1),
 (u'22', 1),
 (u'23', 1),
 (u'1', 2),
 (u'0', 1),
 (u'3', 1),
 (u'2', 1),
 (u'5', 1),
 (u'4', 1),
 (u'7', 16),
 (u'6', 4),
 (u'9', 10),
 (u'8', 23),
 (u'11', 2),
 (u'10', 3),
 (u'13', 1),
 (u'12', 1),
 (u'15', 1),
 (u'14', 1),
 (u'17', 6),
 (u'16', 2),
 (u'19', 4),
 (u'18', 6)]

### Exercise 7: collect
We can finally collect our result and sort them. 