
### Read in a text file 'macbeth'

In [1]:
import string

# Absolute path of the Macbeth text; adjust it for the filesystem your cluster reads from.
text_file = '/user/student/shakespeare/tragedy/macbeth.txt'
# NOTE(review): `sc` is never defined in this notebook — presumably the SparkContext
# injected by the PySpark kernel/shell; confirm before running on a fresh kernel.
# `text` is consumed below as a collection of lines (one element per line of the file).
text = sc.textFile(text_file)

### Supporting functions

In [2]:
def strip_punc(s):
    """Remove ASCII punctuation from ``s`` and split the result on single spaces.

    Every character listed in ``string.punctuation`` is deleted, then the
    remaining text is split on ``' '``.  Because the separator is a literal
    space, leading/trailing spaces and runs of spaces yield empty-string
    tokens, and other whitespace (tabs, newlines) is left inside tokens.

    Returns:
        list[str]: the tokens, in their original order.
    """
    # Mapping a character to None in a translation table deletes it.
    drop_punctuation = str.maketrans(dict.fromkeys(string.punctuation))
    cleaned = s.translate(drop_punctuation)
    return cleaned.split(' ')

def search_word_in_line(word):
    """Print every line of the global ``text`` RDD that contains ``word``.

    All lines are fetched with ``text.collect()`` and numbered from 1 in the
    order they are returned.  A line matches when ``word`` equals one of the
    tokens produced by ``strip_punc`` (exact, case-sensitive comparison).
    Each match is printed as ``"<line number>. <original line>"``.

    Returns:
        None.  The function only prints.
    """
    for line_no, line in enumerate(text.collect(), start=1):
        tokens = strip_punc(line)
        if word in tokens:
            print('{}. {}'.format(line_no, line))

### Split each line into tokens separated by a space (' ') after removing punctuation

In [3]:
# Step 1: tokenise every line with the shared `strip_punc` helper (punctuation
# removed, then split on single spaces).  Runs of spaces produce empty-string
# tokens, which are kept and therefore counted as a key of their own.
flatmap = text.flatMap(strip_punc)

# Step 2: pair each token with an initial count of 1.
# Bound to `word_pairs` instead of `map` so that Python's builtin `map` is not
# shadowed for every cell that runs after this one.
word_pairs = flatmap.map(lambda token: (token, 1))

# Step 3: add up the 1s per token, giving (token, occurrences) pairs.
reduced = word_pairs.reduceByKey(lambda a, b: a + b)

### Making it into a single statement

In [4]:
# The same word-count pipeline written as one chained expression:
# tokenise each line -> pair every token with 1 -> sum the 1s per token.
counts = (
    text
    .flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

### Run the search

In [6]:
word = "grace"
# Each element of `reduced` is a (token, occurrences) tuple, so unpack it directly.
# Converting the tuple to a string and stripping punctuation (the previous approach)
# goes through repr(), which can mangle tokens whose repr contains escape sequences.
for token, occurrences in reduced.collect():
    if token == word:
        print('Found \'{}\' occurs \'{}\' times'.format(token, occurrences))
        # List the numbered lines of the play that contain the word.
        search_word_in_line(word)
        # Keys are unique after reduceByKey, so there is nothing more to find.
        break

Found 'grace' occurs '9' times
314.     You greet with present grace and great prediction
1553.     All is but toys: renown and grace is dead;
2421.     To grace us with your royal company.
2692.     He hopes 'bove wisdom, grace and fear:
2746.     Of the most pious Edward with such grace
3436.     Though all things foul would wear the brows of grace,
3437.     Yet grace must still look so.
3657.     That speak him full of grace.
4727.     That calls upon us, by the grace of Grace,


## Manipulating airline performance data

### Creating an RDD with one row.

In [29]:
from pyspark.sql.types import Row
from datetime import datetime
# Build an RDD that holds a single hand-written Row with the plane-data columns.
airport = sc.parallelize([
    Row(
        tailnum='N050AA',
        type='A',
        manufacturer='B',
        issue_date='20190101',
        model='A',
        status='O',
        aircraft_type="B",
        engine_type="C",
        year=2019,
    )
])
# All three actions run, but the notebook only displays the value of the last
# expression in the cell (the collect() result).
airport.count()
airport.take(3)
airport.collect()

[Row(aircraft_type='B', engine_type='C', issue_date='20190101', manufacturer='B', model='A', status='O', tailnum='N050AA', type='A', year=2019)]

### Converting an RDD to a Dataframe (DF)

In [30]:
# Convert the RDD of Row objects into a DataFrame; the Row field names become the columns.
airport_df = airport.toDF()
# show() prints the rows as an ASCII table.
airport_df.show()

+-------------+-----------+----------+------------+-----+------+-------+----+----+
|aircraft_type|engine_type|issue_date|manufacturer|model|status|tailnum|type|year|
+-------------+-----------+----------+------------+-----+------+-------+----+----+
|            B|          C|  20190101|           B|    A|     O| N050AA|   A|2019|
+-------------+-----------+----------+------------+-----+------+-------+----+----+



### Loading plane-data table

In [7]:

# Absolute path of the plane-data CSV; the first line of the file holds the column names.
airports_file = '/user/student/airline/plane-data.csv'
# header=True uses that first line as column names; no schema is inferred,
# so every column is read as a string.
airports = spark.read.csv(airports_file, header=True)
airports.show()

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
| N054AA|null|        null|      null| null|  null|         null|       null|null|
| N055AA|null|        null|      null| null|  null|         null|       null|null|
| N056AA|null|        null|      null| null|  null|         null|       null|null|
| N057AA|null|        null|      null| null|  null|         null|       null|null|
| N058AA|null|        null|      null| null|  null|         null|       null|null|
| N059AA|null|        null|      null| null|  null|         null|       null|null|
| N0

### The plane data is loaded as a DataFrame (DF)

In [8]:
# Evaluating the DataFrame variable displays its column names and types, not its rows.
airports

DataFrame[tailnum: string, type: string, manufacturer: string, issue_date: string, model: string, status: string, aircraft_type: string, engine_type: string, year: string]

In [9]:
# Number of rows in the DataFrame.
airports.count()

5029

In [18]:
# collect() returns every row as a Python list of Row objects on the driver.
# NOTE(review): for a quick look, take(n) or show(n) avoids materialising the whole table.
airports.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [19]:
# take(5) returns only the first five rows as a list of Row objects.
airports.take(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [11]:
# first() returns the first row as a single Row (not wrapped in a list).
airports.first()

Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [20]:
# head(5) returns the first five rows as a list of Row objects, like take(5) above.
airports.head(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [13]:
# Accessing a row by position: collect() gives a plain Python list, so [2] is the third row.
# NOTE(review): this materialises every row just to read one.
airports.collect()[2]

Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [15]:
# Read a single field of the first row by its column name.
airports.collect()[0]['tailnum'] # use column name.

'N050AA'

In [32]:
# Read the same field by position: index 0 is the first column (tailnum).
airports.collect()[0][0] # use column index

'N050AA'

In [33]:
def _row_to_tuple(row):
    """Flatten one plane-data Row into a plain tuple, naming every column explicitly."""
    return (
        row.tailnum,
        row.type,
        row.manufacturer,
        row.issue_date,
        row.model,
        row.status,
        row.aircraft_type,
        row.engine_type,
        row.year,
    )


# Drop down from the DataFrame to its underlying RDD and turn each Row into a tuple.
airport_rdd = airports.rdd.map(_row_to_tuple)
airport_rdd.collect()

[('N050AA', None, None, None, None, None, None, None, None),
 ('N051AA', None, None, None, None, None, None, None, None),
 ('N052AA', None, None, None, None, None, None, None, None),
 ('N054AA', None, None, None, None, None, None, None, None),
 ('N055AA', None, None, None, None, None, None, None, None),
 ('N056AA', None, None, None, None, None, None, None, None),
 ('N057AA', None, None, None, None, None, None, None, None),
 ('N058AA', None, None, None, None, None, None, None, None),
 ('N059AA', None, None, None, None, None, None, None, None),
 ('N060AA', None, None, None, None, None, None, None, None),
 ('N061AA', None, None, None, None, None, None, None, None),
 ('N062AA', None, None, None, None, None, None, None, None),
 ('N063AA', None, None, None, None, None, None, None, None),
 ('N064AA', None, None, None, None, None, None, None, None),
 ('N065AA', None, None, None, None, None, None, None, None),
 ('N066AA', None, None, None, None, None, None, None, None),
 ('N067AA', None, None, 

In [34]:
# More selective: keep only the tail number and the type of each plane.
airport_rdd = airports.rdd.map(lambda row: (row['tailnum'], row['type']))
airport_rdd.collect()

[('N050AA', None),
 ('N051AA', None),
 ('N052AA', None),
 ('N054AA', None),
 ('N055AA', None),
 ('N056AA', None),
 ('N057AA', None),
 ('N058AA', None),
 ('N059AA', None),
 ('N060AA', None),
 ('N061AA', None),
 ('N062AA', None),
 ('N063AA', None),
 ('N064AA', None),
 ('N065AA', None),
 ('N066AA', None),
 ('N067AA', None),
 ('N068AA', None),
 ('N069AA', None),
 ('N070AA', None),
 ('N071AA', None),
 ('N072AA', None),
 ('N073AA', None),
 ('N074AA', None),
 ('N075AA', None),
 ('N076AA', None),
 ('N077AA', None),
 ('N078AA', None),
 ('N079AA', None),
 ('N080AA', None),
 ('N081AA', None),
 ('N082AA', None),
 ('N083AA', None),
 ('N084AA', None),
 ('N10156', 'Corporation'),
 ('N102UW', 'Corporation'),
 ('N10323', 'Corporation'),
 ('N103US', 'Corporation'),
 ('N104UA', 'Corporation'),
 ('N104UW', 'Corporation'),
 ('N10575', 'Corporation'),
 ('N105UA', 'Corporation'),
 ('N105UW', 'Corporation'),
 ('N106US', 'Corporation'),
 ('N107UA', 'Corporation'),
 ('N107US', 'Corporation'),
 ('N108UW', 'Corpo

In [35]:
# Without a map step, the DataFrame's underlying RDD holds Row objects that keep the column names.
airport_rdd = airports.rdd
# NOTE(review): collect() brings every row to the driver; take(n) is enough for a preview.
airport_rdd.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [36]:
# Summary statistics (count, mean, stddev, min, max) for the `model` column.
# `model` was read as a string, so min/max are string comparisons; mean/stddev
# presumably cover only the values that parse as numbers — TODO confirm.
airports.describe(['model']).show()

+-------+------------------+
|summary|             model|
+-------+------------------+
|  count|              4480|
|   mean|            470.25|
| stddev|483.28761278007806|
|    min|              1121|
|    max| VANS AIRCRAFT RV6|
+-------+------------------+

