In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, Row
import pandas as pd


In [2]:
spark = SparkSession.builder.appName("rdd-app").config("spark.config.option", "value").getOrCreate()
scfg = SparkConf().setAppName('rdd-app')
sc = spark.sparkContext

In [3]:
import string

text_file = '/user/student/shakespeare/tragedy/hamlet.txt'
text = sc.textFile(text_file)

In [4]:
def strip_punc(s):
    return s.translate(str.maketrans('', '', string.punctuation)).split(' ')

def search_word_in_line(word):
    count = 1
    for line in text.collect():
        if word in strip_punc(line):
            print('{}. {}'.format(count, line))
        count += 1

In [5]:
flatmap = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
map = flatmap.map(lambda word: (word, 1))
reduced = map.reduceByKey(lambda a, b: a + b)

In [6]:
counts = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))\
             .map(lambda word: (word, 1))\
             .reduceByKey(lambda a, b: a + b)    

In [7]:
word = "purpose"
for count in reduced.collect():
    # kv = str(count).translate(str.maketrans('', '', string.punctuation)).split(' ')
    kv = strip_punc(str(count))
    if word == kv[0]:
        print('Found \'{}\' occurs \'{}\' times'.format(kv[0], kv[1])) 
        search_word_in_line(word)
        break

Found 'purpose' occurs '11' times
2599.     Why, any thing, but to the purpose. You were sent
2926.     Black as his purpose, did the night resemble
3216.     And drive his purpose on to these delights.
3540.     from the purpose of playing, whose end, both at the
3909.     The passion ending, doth the purpose lose.
4766.     Is but to whet thy almost blunted purpose.
6202.     And, for that purpose, I'll anoint my sword.
6227.     Our purpose may hold there.
6378.     purpose, confess thyself--
7328.     king hold his purpose, I will win for him an I can;
7376.     I am constant to my purpose; they follow the king's


## Manipulating airline performance data

In [8]:

from pyspark.sql.types import Row
from datetime import datetime


In [9]:
data_by_year = '/user/student/airline/2000.csv'
airline_performance = spark.read.option("header", "true").csv(data_by_year)

In [10]:
airline_performance.head()

Row(Year='2000', Month='1', DayofMonth='28', DayOfWeek='5', DepTime='1647', CRSDepTime='1647', ArrTime='1906', CRSArrTime='1859', UniqueCarrier='HP', FlightNum='154', TailNum='N808AW', ActualElapsedTime='259', CRSElapsedTime='252', AirTime='233', ArrDelay='7', DepDelay='0', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='15', TaxiOut='11', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')

In [11]:
airline_performance

DataFrame[Year: string, Month: string, DayofMonth: string, DayOfWeek: string, DepTime: string, CRSDepTime: string, ArrTime: string, CRSArrTime: string, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: string, CancellationCode: string, Diverted: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [12]:
from pyspark.sql.types import IntegerType
airline_performance = airline_performance.withColumn("ArrDelay", airline_performance["ArrDelay"].cast(IntegerType()))
airline_performance = airline_performance.withColumn("DepDelay", airline_performance["DepDelay"].cast(IntegerType()))

In [13]:
airline_performance.describe(['ArrDelay']).show()
airline_performance.describe(['DepDelay']).show()

+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|           5481303|
|   mean|10.472886100257549|
| stddev|35.999971215504885|
|    min|             -1298|
|    max|              1441|
+-------+------------------+

+-------+------------------+
|summary|          DepDelay|
+-------+------------------+
|  count|           5495557|
|   mean|11.280681830795313|
| stddev|33.551670355502424|
|    min|              -990|
|    max|              1435|
+-------+------------------+



In [14]:
import pyspark.sql.functions as F
airline_performance.select(airline_performance.Dest,F.when(airline_performance.ArrDelay > 0, 1).otherwise(0)).show()

+----+------------------------------------------+
|Dest|CASE WHEN (ArrDelay > 0) THEN 1 ELSE 0 END|
+----+------------------------------------------+
| PHX|                                         1|
| PHX|                                         1|
| PHX|                                         0|
| PHX|                                         0|
| PHX|                                         0|
| PHX|                                         1|
| PHX|                                         1|
| PHX|                                         1|
| PHX|                                         0|
| PHX|                                         1|
| PHX|                                         1|
| PHX|                                         0|
| PHX|                                         0|
| PHX|                                         0|
| PHX|                                         1|
| PHX|                                         0|
| PHX|                                         0|


In [15]:
airline_performance = airline_performance.withColumn('ArrDelayCount',F.when(airline_performance.ArrDelay > 0, 1).otherwise(0))
airline_performance = airline_performance.withColumn('DepDelayCount',F.when(airline_performance.DepDelay > 0, 1).otherwise(0))
airline_performance.show(2)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+-------------+-------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|ArrDelayCount|DepDelayCount|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+-------------+-------------+
|2000|    

In [16]:
#Most Depature Delay Airport 2000
airline_performance\
.filter(F.col('DepDelayCount') == 1)\
.groupBy('Year','Origin')\
.count()\
.orderBy('count',ascending=False)\
.show(1)

+----+------+------+
|Year|Origin| count|
+----+------+------+
|2000|   ORD|150619|
+----+------+------+
only showing top 1 row



In [17]:
#Least Depature Delay Airport 2000
airline_performance\
.filter(F.col('DepDelayCount') == 1)\
.groupBy('Year','Origin')\
.count()\
.orderBy('count',ascending=True)\
.show(1)

+----+------+-----+
|Year|Origin|count|
+----+------+-----+
|2000|   BQN|    2|
+----+------+-----+
only showing top 1 row



In [18]:
#Most Arrival Delay Airport 2000
airline_performance\
.filter(F.col('ArrDelayCount') == 1)\
.groupBy('Year','Dest')\
.count()\
.orderBy('count',ascending=False)\
.show(1)

+----+----+------+
|Year|Dest| count|
+----+----+------+
|2000| ORD|154124|
+----+----+------+
only showing top 1 row



In [19]:
#least Arrival Delay Airport 2000
airline_performance\
.filter(F.col('ArrDelayCount') == 1)\
.groupBy('Year','Dest')\
.count()\
.orderBy('count',ascending=True)\
.show(1)

+----+----+-----+
|Year|Dest|count|
+----+----+-----+
|2000| LFT|    1|
+----+----+-----+
only showing top 1 row



In [20]:
#Most Arrival Delay Flight 2000
airline_performance\
.filter(F.col('ArrDelayCount') == 1)\
.groupBy('Year','UniqueCarrier','FlightNum')\
.count()\
.orderBy('count',ascending=False)\
.show(1)

+----+-------------+---------+-----+
|Year|UniqueCarrier|FlightNum|count|
+----+-------------+---------+-----+
|2000|           WN|      504| 1195|
+----+-------------+---------+-----+
only showing top 1 row



In [21]:
#Least Arrival Delay Flight 2000
airline_performance\
.filter(F.col('ArrDelayCount') == 1)\
.groupBy('Year','UniqueCarrier','FlightNum')\
.count()\
.orderBy('count',ascending=True)\
.show(1)

+----+-------------+---------+-----+
|Year|UniqueCarrier|FlightNum|count|
+----+-------------+---------+-----+
|2000|           CO|      732|    1|
+----+-------------+---------+-----+
only showing top 1 row



In [22]:
#Most Departure Delay Flight 2000
airline_performance\
.filter(F.col('DepDelayCount') == 1)\
.groupBy('Year','UniqueCarrier','FlightNum')\
.count()\
.orderBy('count',ascending=False)\
.show(1)

+----+-------------+---------+-----+
|Year|UniqueCarrier|FlightNum|count|
+----+-------------+---------+-----+
|2000|           WN|      504| 1191|
+----+-------------+---------+-----+
only showing top 1 row



In [23]:
#Least Departure Delay Flight 2000
airline_performance\
.filter(F.col('DepDelayCount') == 1)\
.groupBy('Year','UniqueCarrier','FlightNum')\
.count()\
.orderBy('count',ascending=True)\
.show(1)

+----+-------------+---------+-----+
|Year|UniqueCarrier|FlightNum|count|
+----+-------------+---------+-----+
|2000|           HP|     2385|    1|
+----+-------------+---------+-----+
only showing top 1 row



In [24]:
airline_performance\
.groupBy('Year','UniqueCarrier')\
.agg(F.mean('ArrDelay'))\
.show()

+----+-------------+------------------+
|Year|UniqueCarrier|     avg(ArrDelay)|
+----+-------------+------------------+
|2000|           WN|10.477490008880995|
|2000|           HP|14.938117949232083|
|2000|           NW| 5.949147881395566|
|2000|           CO| 6.849962457488627|
|2000|           AQ| 2.541563104114885|
|2000|           US| 9.861894755604572|
|2000|           AA| 9.726826100857926|
|2000|           DL| 8.279562675582072|
|2000|           UA|18.479591948454342|
|2000|           AS|12.821446179129007|
|2000|           TW| 9.143521902349235|
+----+-------------+------------------+



In [25]:
airline_performance\
.groupBy('Year','UniqueCarrier')\
.agg(F.mean('DepDelay'))\
.show()

+----+-------------+------------------+
|Year|UniqueCarrier|     avg(DepDelay)|
+----+-------------+------------------+
|2000|           WN|12.707381516850198|
|2000|           HP| 14.44049719173445|
|2000|           NW| 7.725420104963274|
|2000|           CO| 8.527505573702493|
|2000|           AQ|1.5896161281413974|
|2000|           US| 9.140569466629843|
|2000|           AA|11.038083717379468|
|2000|           DL|     9.45530543322|
|2000|           UA| 17.60769115237575|
|2000|           AS|12.951644905055739|
|2000|           TW| 9.593641289613062|
+----+-------------+------------------+



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 43956)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/home/student/dev/spark/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/home/student/dev/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/home/student/dev/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/home/student/dev/spark/python/