# Intro to Rdd transformations

In [7]:
rdd_airports = (
                 sc. textFile("/Users/RahulReddy/Desktop/pyspark/data/airport-codes-na.txt")
                .map(lambda element: element.split("\t"))
    )
rdd_airports.count()

527

In [8]:
rdd_depart =  ( 
                sc.textFile("/Users/RahulReddy/Desktop/pyspark/data/departuredelays.csv")
                .map(lambda element: element.split(","))
                )
rdd_depart.count()

1391579

In [9]:
rdd_depart.take(3)

[[u'date', u'delay', u'distance', u'origin', u'destination'],
 [u'01011245', u'6', u'602', u'ABE', u'ATL'],
 [u'01020600', u'-8', u'369', u'ABE', u'DTW']]

In [12]:
rdd_airports.map(lambda line: (line[0],line[1])).take(5)  #select columns using map and column indexes

[(u'City', u'State'),
 (u'Abbotsford', u'BC'),
 (u'Aberdeen', u'SD'),
 (u'Abilene', u'TX'),
 (u'Akron', u'OH')]

In [15]:
# Filter Rows that have state == "WA"
rdd_airports.map(lambda line: (line[0],line[1])).filter(lambda c : c[1] == "WA").take(5)

[(u'Bellingham', u'WA'),
 (u'Moses Lake', u'WA'),
 (u'Pasco', u'WA'),
 (u'Pullman', u'WA'),
 (u'Seattle', u'WA')]

In [18]:
# Filter only second column == "WA", 
# select first two columns within the RDD,
# and flatten out all values
rdd_airports.map(lambda c : (c[0],c[1])).filter(lambda c : c[1] == "WA") .flatMap(lambda x: x).take(4)

[u'Bellingham', u'WA', u'Moses Lake', u'WA']

In [24]:
# Provide the distinct elements for the 
# third column of airports representing
# countries
rdd_airports.map(lambda c : c[2]).distinct().take(5)

[u'Canada', u'USA', u'Country']

In [25]:
rdd_airports.take(5)

[[u'City', u'State', u'Country', u'IATA'],
 [u'Abbotsford', u'BC', u'Canada', u'YXX'],
 [u'Aberdeen', u'SD', u'USA', u'ABR'],
 [u'Abilene', u'TX', u'USA', u'ABI'],
 [u'Akron', u'OH', u'USA', u'CAK']]

In [26]:
rdd_depart.take(5)

[[u'date', u'delay', u'distance', u'origin', u'destination'],
 [u'01011245', u'6', u'602', u'ABE', u'ATL'],
 [u'01020600', u'-8', u'369', u'ABE', u'DTW'],
 [u'01021245', u'-2', u'602', u'ABE', u'ATL'],
 [u'01020605', u'-4', u'602', u'ABE', u'ATL']]

In [28]:
#flights data selecting origin and data 

flt = rdd_depart.map(lambda c : (c[3],c[0]))
flt.take(5)

[(u'origin', u'date'),
 (u'ABE', u'01011245'),
 (u'ABE', u'01020600'),
 (u'ABE', u'01021245'),
 (u'ABE', u'01020605')]

In [30]:
#airports selecting columns for location and state
airport = rdd_airports.map(lambda c : (c[3],c[1]))
airport.take(5)

[(u'IATA', u'State'),
 (u'YXX', u'BC'),
 (u'ABR', u'SD'),
 (u'ABI', u'TX'),
 (u'CAK', u'OH')]

In [32]:
# filter all the distinct rows for the state after the join condition
flt.join(airport).map(lambda c : c[0]).distinct().take(10)

[u'JFK',
 u'GUC',
 u'CIC',
 u'FSD',
 u'DBQ',
 u'MIA',
 u'AVP',
 u'LIH',
 u'MEM',
 u'EKO']

In [33]:
# View each row within RDD + the index 
# i.e. output is in form ([row], idx)
rdd_airports.map(lambda c : (c[0],c[1])).zipWithIndex().take(5)

[((u'City', u'State'), 0),
 ((u'Abbotsford', u'BC'), 1),
 ((u'Aberdeen', u'SD'), 2),
 ((u'Abilene', u'TX'), 3),
 ((u'Akron', u'OH'), 4)]

In [34]:
rdd_airports.map(lambda c : (c[0],c[1])).zipWithIndex().filter(lambda(row,idx): idx > 0).take(5)

[((u'Abbotsford', u'BC'), 1),
 ((u'Aberdeen', u'SD'), 2),
 ((u'Abilene', u'TX'), 3),
 ((u'Akron', u'OH'), 4),
 ((u'Alamosa', u'CO'), 5)]

In [36]:
(
    rdd_airports
.map(lambda c : (c[0],c[1]))
.zipWithIndex()
.filter(lambda(row,idx): idx > 0)
.take(5)
)

[((u'Abbotsford', u'BC'), 1),
 ((u'Aberdeen', u'SD'), 2),
 ((u'Abilene', u'TX'), 3),
 ((u'Akron', u'OH'), 4),
 ((u'Alamosa', u'CO'), 5)]

In [37]:
(
    rdd_airports
.map(lambda c : (c[0],c[1]))
.zipWithIndex()
.filter(lambda(row,idx): idx > 0)
.map(lambda(row,idx): row)
.take(5)
)

[(u'Abbotsford', u'BC'),
 (u'Aberdeen', u'SD'),
 (u'Abilene', u'TX'),
 (u'Akron', u'OH'),
 (u'Alamosa', u'CO')]

In [41]:
v = sc.parallelize([('a',1),('a',2),('a',3),('b',1),('b',2),('b',3)])

In [48]:
v.reduceByKey(lambda x,y: x+y).take(5)
 

[('a', 6), ('b', 6)]

In [85]:
 ( 
        rdd_depart.map(lambda c : (c[3],c[1])
                      )
        .zipWithIndex()
       .filter(lambda (row,idx): idx > 0 #and row[1] == u"delay"
              )
       .map(lambda(row,idx): row)
        #.map(la)
       .reduceByKey(lambda x,y : int(x)+int(y))
        .take(5)
)

[(u'JFK', 387929),
 (u'MIA', 169373),
 (u'LIH', -646),
 (u'LIT', 34489),
 (u'RDM', 3445)]

In [71]:
( rdd_depart.map(lambda c : ( c[1]) )
        .filter(lambda c : c == "delay")
    .distinct().collect()
 )

[u'delay']