# RDD Review
We are reviewing some of the basic techniques for manipulating a RDD dataset.

In [34]:
## Word Count Revisit
### Read in a text file

In [35]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, Row


In [36]:
spark = SparkSession.builder.appName("rdd-app").config("spark.config.option", "value").getOrCreate()
scfg = SparkConf().setAppName('rdd-app')
sc = spark.sparkContext

In [37]:
import string

text_file = '/user/student/shakespeare/tragedy/hamlet.txt'
text = sc.textFile(text_file)

In [38]:
text.collect()

['',
 'The Tragedy of Hamlet, Prince of Denmark',
 'Shakespeare homepage | Hamlet | Entire play',
 'ACT I',
 'SCENE I. Elsinore. A platform before the castle.',
 '',
 '    FRANCISCO at his post. Enter to him BERNARDO ',
 '',
 'BERNARDO',
 '',
 "    Who's there?",
 '',
 'FRANCISCO',
 '',
 '    Nay, answer me: stand, and unfold yourself.',
 '',
 'BERNARDO',
 '',
 '    Long live the king!',
 '',
 'FRANCISCO',
 '',
 '    Bernardo?',
 '',
 'BERNARDO',
 '',
 '    He.',
 '',
 'FRANCISCO',
 '',
 '    You come most carefully upon your hour.',
 '',
 'BERNARDO',
 '',
 "    'Tis now struck twelve; get thee to bed, Francisco.",
 '',
 'FRANCISCO',
 '',
 "    For this relief much thanks: 'tis bitter cold,",
 '    And I am sick at heart.',
 '',
 'BERNARDO',
 '',
 '    Have you had quiet guard?',
 '',
 'FRANCISCO',
 '',
 '    Not a mouse stirring.',
 '',
 'BERNARDO',
 '',
 '    Well, good night.',
 '    If you do meet Horatio and Marcellus,',
 '    The rivals of my watch, bid them make haste.',
 '',
 '

### Supporting functions

In [39]:
def strip_punc(s):
    return s.translate(str.maketrans('', '', string.punctuation)).split(' ')

def search_word_in_line(word):
    count = 1
    for line in text.collect():
        if word in strip_punc(line):
            print('{}. {}'.format(count, line))
        count += 1

### Split a line into tokens separated by space (' ') after removing punctuations

In [40]:
flatmap = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
map = flatmap.map(lambda word: (word, 1))
reduced = map.reduceByKey(lambda a, b: a + b)

In [41]:
reduced.collect()

[('', 19400),
 ('The', 152),
 ('Tragedy', 1),
 ('of', 628),
 ('Prince', 1),
 ('Denmark', 21),
 ('Shakespeare', 1),
 ('Entire', 1),
 ('ACT', 5),
 ('SCENE', 20),
 ('Elsinore', 6),
 ('platform', 5),
 ('before', 17),
 ('FRANCISCO', 9),
 ('at', 75),
 ('his', 279),
 ('BERNARDO', 25),
 ('there', 62),
 ('stand', 13),
 ('unfold', 3),
 ('Long', 2),
 ('live', 15),
 ('king', 66),
 ('carefully', 1),
 ('upon', 40),
 ('now', 74),
 ('twelve', 5),
 ('this', 247),
 ('thanks', 7),
 ('tis', 47),
 ('cold', 6),
 ('And', 262),
 ('am', 51),
 ('heart', 29),
 ('quiet', 5),
 ('guard', 3),
 ('Not', 15),
 ('mouse', 2),
 ('stirring', 1),
 ('good', 82),
 ('night', 36),
 ('do', 127),
 ('meet', 7),
 ('Horatio', 30),
 ('rivals', 1),
 ('watch', 15),
 ('bid', 5),
 ('them', 74),
 ('make', 47),
 ('haste', 11),
 ('think', 46),
 ('hear', 32),
 ('Stand', 2),
 ('HORATIO', 127),
 ('ground', 10),
 ('Dane', 6),
 ('Give', 20),
 ('O', 111),
 ('farewell', 8),
 ('soldier', 2),
 ('relieved', 2),
 ('Say', 5),
 ('What', 90),
 ('is', 313

### Making it into a single statement

In [42]:
counts = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))\
             .map(lambda word: (word, 1))\
             .reduceByKey(lambda a, b: a + b)    

### Run the search

In [43]:
word = "purpose"
for count in reduced.collect():
    # kv = str(count).translate(str.maketrans('', '', string.punctuation)).split(' ')
    kv = strip_punc(str(count))
    if word == kv[0]:
        print('Found \'{}\' occurs \'{}\' times'.format(kv[0], kv[1])) 
        search_word_in_line(word)
        break

Found 'purpose' occurs '11' times
2599.     Why, any thing, but to the purpose. You were sent
2926.     Black as his purpose, did the night resemble
3216.     And drive his purpose on to these delights.
3540.     from the purpose of playing, whose end, both at the
3909.     The passion ending, doth the purpose lose.
4766.     Is but to whet thy almost blunted purpose.
6202.     And, for that purpose, I'll anoint my sword.
6227.     Our purpose may hold there.
6378.     purpose, confess thyself--
7328.     king hold his purpose, I will win for him an I can;
7376.     I am constant to my purpose; they follow the king's


## Manipulating airline performance data

### Creating an RDD with one row.

In [44]:
airport = sc.parallelize([Row(iata="00M",airport="Thigpen ",city="Bay Springs",\
                              state="MS",country="USA",lat=31.95376472,long=-89.23450472)])
print(airport.count())
print(airport.take(3))
print(airport.collect())

1
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]


### Converting an RDD to a Dataframe (DF)

In [45]:

from pyspark.sql.types import Row
from datetime import datetime

airport_df = airport.toDF()
airport_df.show()
airport_df

+--------+-----------+-------+----+-----------+------------+-----+
| airport|       city|country|iata|        lat|        long|state|
+--------+-----------+-------+----+-----------+------------+-----+
|Thigpen |Bay Springs|    USA| 00M|31.95376472|-89.23450472|   MS|
+--------+-----------+-------+----+-----------+------------+-----+



DataFrame[airport: string, city: string, country: string, iata: string, lat: double, long: double, state: string]

### More complex dataset

In [46]:

complex = sc.parallelize([Row(col_float=3.1415,
                              col_string='da pi',
                              col_boolean=True,
                              col_integer=201,
                              col_list=[1,2,3,4])])
complex.collect()

[Row(col_boolean=True, col_float=3.1415, col_integer=201, col_list=[1, 2, 3, 4], col_string='da pi')]

### Converting to DF

In [47]:
complex_df = complex.toDF()
complex_df.show()

+-----------+---------+-----------+------------+----------+
|col_boolean|col_float|col_integer|    col_list|col_string|
+-----------+---------+-----------+------------+----------+
|       true|   3.1415|        201|[1, 2, 3, 4]|     da pi|
+-----------+---------+-----------+------------+----------+



### More complex data type

In [50]:
real_complex = sc.parallelize([
    Row(col_list=[1,2,3], col_dict = {"pi": 3.1415}, col_row = Row(number=3, fraction=1415), col_time=datetime(2019,7,22,5,51,0)),
    Row(col_list=[3,4,5], col_dict = {"sqrt2": 1.4142}, col_row = Row(number=1, fraction=4142), col_time=datetime(2019,7,22,5,54,0)),
    Row(col_list=[6,7,9,10], col_dict = {"sqrt3": 1.73205}, col_row = Row(number=1, fraction=73205), col_time=datetime(2019,7,22,5,55,0))
])
real_complex.collect() # A little bit hard to see

[Row(col_dict={'pi': 3.1415}, col_list=[1, 2, 3], col_row=Row(fraction=1415, number=3), col_time=datetime.datetime(2019, 7, 22, 5, 51)),
 Row(col_dict={'sqrt2': 1.4142}, col_list=[3, 4, 5], col_row=Row(fraction=4142, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 54)),
 Row(col_dict={'sqrt3': 1.73205}, col_list=[6, 7, 9, 10], col_row=Row(fraction=73205, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 55))]

In [51]:
real_complex_df = real_complex.toDF()
real_complex_df.show();

+------------------+-------------+----------+-------------------+
|          col_dict|     col_list|   col_row|           col_time|
+------------------+-------------+----------+-------------------+
|    [pi -> 3.1415]|    [1, 2, 3]| [1415, 3]|2019-07-22 05:51:00|
| [sqrt2 -> 1.4142]|    [3, 4, 5]| [4142, 1]|2019-07-22 05:54:00|
|[sqrt3 -> 1.73205]|[6, 7, 9, 10]|[73205, 1]|2019-07-22 05:55:00|
+------------------+-------------+----------+-------------------+



**It is much easier to view the data structure now**

## Airline Performance data
Loading data from HDFS

In [52]:
data_by_year = '/user/student/airline/1987.csv'
airline_performance = spark.read.option("header", "true").csv(data_by_year)

In [53]:
airline_performance.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1987|   10|        14|        3|    741|       730|    912|       849|           PS|     1451

### Loading airport table

In [54]:

airports_file = '/user/student/airline/airports.csv'
airports = spark.read.option("header", "true").csv(airports_file)
airports.show()

+----+--------------------+------------------+-----+-------+-----------+------------+
|iata|             airport|              city|state|country|        lat|        long|
+----+--------------------+------------------+-----+-------+-----------+------------+
| 00M|            Thigpen |       Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal|        Livingston|   TX|    USA|30.68586111|-95.01792778|
| 00V|         Meadow Lake|  Colorado Springs|   CO|    USA|38.94574889|-104.5698933|
| 01G|        Perry-Warsaw|             Perry|   NY|    USA|42.74134667|-78.05208056|
| 01J|    Hilliard Airpark|          Hilliard|   FL|    USA| 30.6880125|-81.90594389|
| 01M|   Tishomingo County|           Belmont|   MS|    USA|34.49166667|-88.20111111|
| 02A|         Gragg-Wade |           Clanton|   AL|    USA|32.85048667|-86.61145333|
| 02C|             Capitol|        Brookfield|   WI|    USA|   43.08751|-88.17786917|
| 02G|   Columbiana County|    East Liverpool|   OH|  

### Airports is a DF data type

In [55]:
airports

DataFrame[iata: string, airport: string, city: string, state: string, country: string, lat: string, long: string]

In [56]:
airports.count()

3376

In [57]:
airports.collect()

[Row(iata='00M', airport='Thigpen ', city='Bay Springs', state='MS', country='USA', lat='31.95376472', long='-89.23450472'),
 Row(iata='00R', airport='Livingston Municipal', city='Livingston', state='TX', country='USA', lat='30.68586111', long='-95.01792778'),
 Row(iata='00V', airport='Meadow Lake', city='Colorado Springs', state='CO', country='USA', lat='38.94574889', long='-104.5698933'),
 Row(iata='01G', airport='Perry-Warsaw', city='Perry', state='NY', country='USA', lat='42.74134667', long='-78.05208056'),
 Row(iata='01J', airport='Hilliard Airpark', city='Hilliard', state='FL', country='USA', lat='30.6880125', long='-81.90594389'),
 Row(iata='01M', airport='Tishomingo County', city='Belmont', state='MS', country='USA', lat='34.49166667', long='-88.20111111'),
 Row(iata='02A', airport='Gragg-Wade ', city='Clanton', state='AL', country='USA', lat='32.85048667', long='-86.61145333'),
 Row(iata='02C', airport='Capitol', city='Brookfield', state='WI', country='USA', lat='43.08751', lo

In [58]:
airports.take(5)

[Row(iata='00M', airport='Thigpen ', city='Bay Springs', state='MS', country='USA', lat='31.95376472', long='-89.23450472'),
 Row(iata='00R', airport='Livingston Municipal', city='Livingston', state='TX', country='USA', lat='30.68586111', long='-95.01792778'),
 Row(iata='00V', airport='Meadow Lake', city='Colorado Springs', state='CO', country='USA', lat='38.94574889', long='-104.5698933'),
 Row(iata='01G', airport='Perry-Warsaw', city='Perry', state='NY', country='USA', lat='42.74134667', long='-78.05208056'),
 Row(iata='01J', airport='Hilliard Airpark', city='Hilliard', state='FL', country='USA', lat='30.6880125', long='-81.90594389')]

In [59]:
airports.first()

Row(iata='00M', airport='Thigpen ', city='Bay Springs', state='MS', country='USA', lat='31.95376472', long='-89.23450472')

In [60]:
airports.head(5)

[Row(iata='00M', airport='Thigpen ', city='Bay Springs', state='MS', country='USA', lat='31.95376472', long='-89.23450472'),
 Row(iata='00R', airport='Livingston Municipal', city='Livingston', state='TX', country='USA', lat='30.68586111', long='-95.01792778'),
 Row(iata='00V', airport='Meadow Lake', city='Colorado Springs', state='CO', country='USA', lat='38.94574889', long='-104.5698933'),
 Row(iata='01G', airport='Perry-Warsaw', city='Perry', state='NY', country='USA', lat='42.74134667', long='-78.05208056'),
 Row(iata='01J', airport='Hilliard Airpark', city='Hilliard', state='FL', country='USA', lat='30.6880125', long='-81.90594389')]

In [61]:
# Accessing rows
airports.collect()[2]

Row(iata='00V', airport='Meadow Lake', city='Colorado Springs', state='CO', country='USA', lat='38.94574889', long='-104.5698933')

In [62]:
airports.collect()[2]['state'] # use column name.

'CO'

In [63]:
airports.collect()[2][3] # use column index

'CO'

In [64]:
airport_rdd = airports.rdd.map(lambda x: (x.iata, x.airport, x.city, x.state, x.country, x.lat, x.long))
airport_rdd.collect()

[('00M',
  'Thigpen ',
  'Bay Springs',
  'MS',
  'USA',
  '31.95376472',
  '-89.23450472'),
 ('00R',
  'Livingston Municipal',
  'Livingston',
  'TX',
  'USA',
  '30.68586111',
  '-95.01792778'),
 ('00V',
  'Meadow Lake',
  'Colorado Springs',
  'CO',
  'USA',
  '38.94574889',
  '-104.5698933'),
 ('01G', 'Perry-Warsaw', 'Perry', 'NY', 'USA', '42.74134667', '-78.05208056'),
 ('01J',
  'Hilliard Airpark',
  'Hilliard',
  'FL',
  'USA',
  '30.6880125',
  '-81.90594389'),
 ('01M',
  'Tishomingo County',
  'Belmont',
  'MS',
  'USA',
  '34.49166667',
  '-88.20111111'),
 ('02A', 'Gragg-Wade ', 'Clanton', 'AL', 'USA', '32.85048667', '-86.61145333'),
 ('02C', 'Capitol', 'Brookfield', 'WI', 'USA', '43.08751', '-88.17786917'),
 ('02G',
  'Columbiana County',
  'East Liverpool',
  'OH',
  'USA',
  '40.67331278',
  '-80.64140639'),
 ('03D',
  'Memphis Memorial',
  'Memphis',
  'MO',
  'USA',
  '40.44725889',
  '-92.22696056'),
 ('04M',
  'Calhoun County',
  'Pittsboro',
  'MS',
  'USA',
  '33.930

In [65]:
# More selective
airport_rdd = airports.rdd.map(lambda x: (x.iata, x.airport))
airport_rdd.collect()

[('00M', 'Thigpen '),
 ('00R', 'Livingston Municipal'),
 ('00V', 'Meadow Lake'),
 ('01G', 'Perry-Warsaw'),
 ('01J', 'Hilliard Airpark'),
 ('01M', 'Tishomingo County'),
 ('02A', 'Gragg-Wade '),
 ('02C', 'Capitol'),
 ('02G', 'Columbiana County'),
 ('03D', 'Memphis Memorial'),
 ('04M', 'Calhoun County'),
 ('04Y', 'Hawley Municipal'),
 ('05C', 'Griffith-Merrillville '),
 ('05F', 'Gatesville - City/County'),
 ('05U', 'Eureka'),
 ('06A', 'Moton  Municipal'),
 ('06C', 'Schaumburg'),
 ('06D', 'Rolla Municipal'),
 ('06M', 'Eupora Municipal'),
 ('06N', 'Randall '),
 ('06U', 'Jackpot/Hayden '),
 ('07C', 'Dekalb County'),
 ('07F', 'Gladewater Municipal'),
 ('07G', 'Fitch H Beach'),
 ('07K', 'Central City Municipal'),
 ('08A', 'Wetumpka Municipal'),
 ('08D', 'Stanley Municipal'),
 ('08K', 'Harvard State'),
 ('08M', 'Carthage-Leake County'),
 ('09A', 'Butler-Choctaw County'),
 ('09J', 'Jekyll Island'),
 ('09K', 'Sargent Municipal'),
 ('09M', 'Charleston Municipal'),
 ('09W', 'South Capitol Street'),

In [66]:
# default with col names.
airport_rdd = airports.rdd
airport_rdd.collect()

[Row(iata='00M', airport='Thigpen ', city='Bay Springs', state='MS', country='USA', lat='31.95376472', long='-89.23450472'),
 Row(iata='00R', airport='Livingston Municipal', city='Livingston', state='TX', country='USA', lat='30.68586111', long='-95.01792778'),
 Row(iata='00V', airport='Meadow Lake', city='Colorado Springs', state='CO', country='USA', lat='38.94574889', long='-104.5698933'),
 Row(iata='01G', airport='Perry-Warsaw', city='Perry', state='NY', country='USA', lat='42.74134667', long='-78.05208056'),
 Row(iata='01J', airport='Hilliard Airpark', city='Hilliard', state='FL', country='USA', lat='30.6880125', long='-81.90594389'),
 Row(iata='01M', airport='Tishomingo County', city='Belmont', state='MS', country='USA', lat='34.49166667', long='-88.20111111'),
 Row(iata='02A', airport='Gragg-Wade ', city='Clanton', state='AL', country='USA', lat='32.85048667', long='-86.61145333'),
 Row(iata='02C', airport='Capitol', city='Brookfield', state='WI', country='USA', lat='43.08751', lo

In [67]:
airports.describe(['lat']).show()

+-------+------------------+
|summary|               lat|
+-------+------------------+
|  count|              3376|
|   mean|40.036523625524204|
| stddev| 8.329558669019436|
|    min|          13.48345|
|    max|            9.5167|
+-------+------------------+

