In [117]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import SQLContext
from datetime import datetime

BIG_TAXI = 's3a://chictaxi/chictaxi.csv'
SMALL_TAXI = 's3a://chictaxi/small.csv'
WEATHER = 's3a://chictaxi/weather.csv'

# sc = SparkContext()
sql_context = SQLContext(sc) 

In [2]:
def get_data(sql_context, path=SMALL_TAXI):
    df = sql_context.read.csv(path, header='true', inferSchema='true')
    return (df, df.rdd)


In [3]:
taxi_df, taxi_rdd = get_data(sql_context)

In [4]:
# https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD.sample 
sampled_rdd = taxi_rdd.sample(False, 0.0001, 81)

In [8]:
def q1(rdd):
    """How many taxi records are there? 
    How many taxi records for each year of the dataset?
    """
    count = rdd.count() 
    yearly_counts = rdd.map(lambda x: (getattr(x, 'Trip Start Timestamp'
                                              ).split('/')[-1].split(' ')[0], 1)).reduceByKey(lambda a,b: a+b)

    return count, yearly_counts
    

In [9]:
# q1 answer
total_records, yearly_counts = q1(sampled_rdd)
# yearly_counts.collect()

In [10]:
def get(x, key, default=0):
    return getattr(x, key) or default

def q2(rdd, total_records):
    """"How many records in total would you classify as bad? 
        Consider a bad record to be one where the Trip Seconds are less than 60,
        but also if the average speed is over 100 mph, the distance is more 
        than 1000 miles or the fare is over $2000 (excluding tips, tolls, etc). 
        
        Once you have defined this, ensure that all further answers are based only on good data. 
        How many records are “good” by year
        
        
        N.b. for trips under 1 mile, the ave. speed = 0.0 (as miles = 0), therefore this is a decent approximation
        without guarenteeing total accuracy as it doesn't take into account the precise coordinates of the journey
        when calculating average speed.
        
        
        
    """
    good_trips = rdd.filter(lambda x: (get(x, 'Trip Seconds') > 60 )
                    & (get(x, 'Trip Miles') < 1000)
                    & (get(x, 'Fare') < 2000))
#                     & ((getattr(x, 'Trip Miles', 0) / (getattr(x, 'Trip Seconds', 0) / 60)) < 100)
                   
    return good_trips, total_records - good_trips.count()

    

In [11]:
# q2 answer
good_trips, num_bad = q2(sampled_rdd, total_records)

In [12]:
def get_2018_rides(rdd):
    return rdd.filter(lambda x: getattr(x, 'Trip Start Timestamp').split('/')[-1].split(' ')[0] == '2018')

In [74]:
rides_2018 = get_2018_rides(good_trips)
# rides_2018_df = rides_2018.toDF()

In [77]:
# tricky because of custom aggregation required by excluding tolls
# def q3(df):
#     df.groupBy('Taxi Id').agg({'Total Price':"avg"}).orderBy('column_name', ascending=False)


# MapReduce approach - not complete
def q3(rdd):
    """For each taxi, calculate the average revenue per day excluding tolls (i.e. Fare + Tips). 
    Identify the most successful taxi in 2018 in terms of total revenue (Fare + Tips).
    
    https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth
    
    """
    return rdd.map(lambda x: (get(x, 'Taxi ID'), [get(x, 'Fare') + get(x, 'Tips'), 1])
                      ).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])).mapValues(
        lambda v: v[0]/v[1]).takeOrdered(6, key=lambda x: -x[1])
    

In [78]:
# q3 answer
sorted_fares = q3(rides_2018)

In [68]:
def test_q3_aggregation(sorted_fares, id_='50b668c005b90b8a98cb429f7ad632b913158b885e8c0a2948c4ed8a39801ca3027d4b0e3ee313f82046c085dd7ae8b044666fbd612e0ef663700efbf1dcc54a'):
    """Test to verify that aggregation logic is correct for q3 is correct using simple python"""
    fares = rides_2018.filter(lambda x: get(x, 'Taxi ID') == id_).map(lambda x: [get(x, 'Fare') + get(x, 'Tips'), 1]).collect()

    acc = 0
    for x in fares:
        acc += x[0]
    assert acc / len(ble) == sorted_fares.filter(lambda x: x[0] == id_).collect()[0][1]


In [188]:
# q4

# q4

def string_to_time(date):
    """E.g. turns '04/13/2017 07:30:00 AM' into datetime.time(6, 15).    

    N.b. extra complexity here as time format isn't a simple 24hr clock;
    first convert to PM times to 24 hr format by manipulating the string, 
    then convert to DateTime.
    """
    if 'PM' in date:
        time = date.split(' ')[1]
        hour = str(int(date.split(':')[0].split(' ')[-1]) + 12)
        _time = hour + time[2:]
        _date = date.replace(time, _time)[:-3]
        
    else:
        _date = date[:-3]
    # https://www.journaldev.com/23365/python-string-to-datetime-strptime
    return datetime.strptime(_date, '%m/%d/%Y %H:%M:%S')


def test_string_to_time():
    assert string_to_time('04/13/2017 07:30:00 PM') == datetime.datetime(2017, 4, 13, 19, 30)
    assert string_to_time('04/13/2017 07:30:00 AM') == datetime.datetime(2017, 4, 13, 7, 30)


def q4(rdd):
    """ Taking 1 hour periods throughout the day (from midnight to midnight) 
    across the complete dataset, answer the following. 
    Where a trip crosses a boundary (where the drop off is in a different period to the pickup),
    assign that trip to the period where the midpoint of the journey happened.
    
    a. What is the average speed of taxis during each period?
    b. Which is the period where drivers in total earn the most money in
    terms of fares?
    c. Which is the period of the day where drivers in total earn the most
    in tips?
    
    
    Approach: 
    
    - Find the create a tuple of start and end-times (S, E)
    - Convert each of these into DateTime instances 
    - Find the midpoint, that is the start, + the time delta of the end - start, 
    and then set the hour in scope to this hour.
    
    
    N.b. due to not being able to assign and therefore reuse variables in the context of the lambda func, 
    the start time hs to to computed twice in this implementation. Whilst the code is very concise and expressive, 
    however it is slightly inefficent. A possible refactor is to use a function which takes a row rather than the 
    whole RDD and map to this.
        
    """    
    #   Midpoint:  lambda x: (x[0] + (x[1] - x[0]) /2).hour) , where x = (start, end)
    
    def start(x): return string_to_time(get(x, 'Trip Start Timestamp'))
    def end(x): return string_to_time(get(x, 'Trip Start Timestamp'))
    def avg_speed(x): 
        try: 
            return (get(x, 'Trip Miles') / (get(x, 'Trip Seconds'))) / 60
        except DivideByZeroError:
            return None
    
    
    from collections import namedtuple
    TripData = namedtuple('TripData', ['fare', 'tips', 'avg_speed'])
    Midpoint = namedtuple('Midpoint', ['trip_data' , 'start', 'end', 'midpoint'])

    
    return rdd.map(lambda x: Midpoints(TripData(get(x, 'Fare'), get(x, 'Tips'), 
                                        avg_speed(x)), 
                                       start(x), +  (end(x) - start(x)) / 2).hour)
                                        

                                     
    
    

In [184]:
def test_q4_mid_points():

    start = string_to_time('04/13/2017 07:30:00 AM')
    end = string_to_time('04/13/2017 07:37:00 AM')
    assert (start + (end - start)/2).hour == 7
    
    start = string_to_time('04/13/2017 09:30:00 AM')
    end = string_to_time('04/13/2017 07:37:00 AM')
    assert (start + (end - start)/2).hour == 14
    
    start = string_to_time('04/13/2017 11:30:00 PM')
    end = string_to_time('04/18/2017 01:00:00 AM')
    assert (start + (end - start)/2).hour == 0


In [189]:
re = q4(rides_2018)

re.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 42.0 failed 4 times, most recent failure: Lost task 0.3 in stage 42.0 (TID 186, 172.31.12.34, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-188-c1185248a845>", line 49, in <lambda>
AttributeError: 'datetime.datetime' object has no attribute 'map'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-188-c1185248a845>", line 49, in <lambda>
AttributeError: 'datetime.datetime' object has no attribute 'map'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [84]:
x.map(lambda x: ((getattr(x, 'Trip Start Timestamp').split(' ')[0], 1)), (getattr(x, 'Trip End Timestamp').split(' ')[1]))

AttributeError: map

In [111]:


datetime.strptime(getattr(x, 'Trip Start Timestamp'), '%m/%d/%Y %H:%M:%S').time()

ValueError: unconverted data remains:  AM

In [102]:
start = string_to_time('06:15:00')
end = string_to_time('06:18:00')

ParserError: Expected an ISO 8601-like string, but was given '03/10/2015 11:15:00 PM'. Try passing in a format string to resolve this.

In [113]:
sampled_rdd.take(10)

[Row(Trip ID='180cc29b289f9fa2680cf6baec647b0acb488a35', Taxi ID='d5c4fbae1c0c510364404a90fd477b19f7f03408ce40ff2cdb76b991835eadc1d7b1540d939262f561ba227da02a2c7bbbb1fc093511af5d261f29e34cd76cdf', Trip Start Timestamp='04/13/2017 07:30:00 AM', Trip End Timestamp='04/13/2017 07:30:00 AM', Trip Seconds=300, Trip Miles=0.0, Pickup Census Tract=17031081000, Dropoff Census Tract=17031081800, Pickup Community Area=8, Dropoff Community Area=8, Fare=7.0, Tips=2.0, Tolls=0.0, Extras=0.0, Trip Total=9.0, Payment Type='Credit Card', Company='Taxi Affiliation Services', Pickup Centroid Latitude=41.900265687, Pickup Centroid Longitude=-87.63210922, Pickup Centroid Location='POINT (-87.6321092196 41.9002656868)', Dropoff Centroid Latitude=41.89321636, Dropoff Centroid Longitude=-87.63784421, Dropoff Centroid  Location='POINT (-87.6378442095 41.8932163595)'),
 Row(Trip ID='bd8db45e120846d962d5505519e8cff54960de08', Taxi ID='65682911fa6de72195c92564114885bc4f8d8c17d5b00569a127edd4d8b0778bfc7984dc1078e

In [119]:
'04/13/2017 07:30:00 AM'[:-3]

'04/13/2017 07:30:00'