Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For secondary sorting #57

Closed
yaominator opened this issue Jan 30, 2016 · 9 comments
Closed

For secondary sorting #57

yaominator opened this issue Jan 30, 2016 · 9 comments

Comments

@yaominator
Copy link

For this line

https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala#L175

I am wondering if it should be changed to

implicit val ordering: Ordering[(K,S)] = Ordering.by(_._2)

so it can be sorted by the pickup time. If using the _1, it means it will use lic to sort, but in the same partition, it is always the same anyway. I think what we need is to sort by the pickup time within the partition.

@srowen
Copy link
Collaborator

srowen commented Jan 31, 2016

I'm not familiar with this code but the key is not lic but (lic, f(trip)) and so it is likely to be intended. Values are separately sorted and split. @sryza ?

@yaominator
Copy link
Author

As far as I can tell, for this line

implicit val ordering: Ordering[(K,S)] = Ordering.by(_._1)

K is lic, S is f(trip). Ordering by(_._1) means to order by K , which is lic, right ? Did I miss something here?

@srowen
Copy link
Collaborator

srowen commented Feb 1, 2016

It should apply to the repartition and sort of presess which is not keyed by lic but a different tuple.

@yaominator
Copy link
Author

val presess = rdd.map {
      case (lic, trip) => {
        ((lic, secondaryKeyFunc(trip)), trip)
      }
    }
    val partitioner = new FirstKeyPartitioner[K, S](numPartitions)
    implicit val ordering: Ordering[(K,S)] = Ordering.by(_._1)
    presess.repartitionAndSortWithinPartitions(partitioner).mapPartitions(groupSorted(_, splitFunc))

Here is what I understand.

presess is RDD of ((lic, secondaryKeyFunc(trip)), trip) . The key is composite key includes lic and timestamp of the trip. The value is the trip.

    implicit val ordering: Ordering[(K,S)] = Ordering.by(_._1)

This line indicates how we want to sort by the key, which is by the first element of the tuple( lic , timestamp) , which is lic in this case.

As I tried to run the code with small set of taxi data, I found that the data are not ordered by the pickup time.

@sryza
Copy link
Owner

sryza commented Feb 2, 2016

My reading is that the code in its present form is correct. In this case, secondaryKeyFunc returns the pickup time of the trip, so the tuples in presess are ((lic, pickup time), trip). The given Ordering orders on the first element of the outer tuple, which is the (lic, pickup time) inner tuple. Scala tuples have a natural lexicographic ordering, so the records will first be ordered by license, but within each license, will be ordered by pickup time. Which is the intended behavior. @yaominator does that conflict with what you're observing?

CC @jwills who wrote this code originally.

@yaominator
Copy link
Author

Here is the mock data I am using for testing purpose

medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:42,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:49,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:46,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171

I added a line to print out the trip

--- a/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala
+++ b/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala
@@ -101,7 +101,9 @@ object RunGeoTime extends Serializable {
     def secondaryKeyFunc(trip: Trip) = trip.pickupTime.getMillis
     val sessions = groupByKeyAndSortValues(taxiDone, secondaryKeyFunc, split, 30)
     sessions.cache()
+    sessions.values.foreach( trip => trip.toList.foreach(println))

What I got is

16/02/02 17:07:48 INFO BlockManagerInfo: Added rdd_26_17 in memory on localhost:64693 (size: 17.5 KB, free: 529.8 MB)
Trip(2013-01-01T15:11:42.000-05:00,2013-01-01T15:18:10.000-05:00,{"x":-73.978165,"y":40.757977},{"x":-73.989838,"y":40.751171})
Trip(2013-01-01T15:11:49.000-05:00,2013-01-01T15:18:10.000-05:00,{"x":-73.978165,"y":40.757977},{"x":-73.989838,"y":40.751171})
Trip(2013-01-01T15:11:46.000-05:00,2013-01-01T15:18:10.000-05:00,{"x":-73.978165,"y":40.757977},{"x":-73.989838,"y":40.751171})

As you can see, it is not ordered by pick up time. Keep the same order as is in the text file.

@sryza
Copy link
Owner

sryza commented Feb 3, 2016

I tried running the code on @yaominator 's data sample and hit the incorrect behavior as well. It looks my analysis of what was going on was incorrect, because, by the contract of repartionAndSortWithinPartitions, the given ordering only applies to keys, which are the (lic, pickup time) tuples. The ordering overrides the natural lexicographic tuple ordering to only look at the first element.

I've pushed a fix here: 5d0514c.

This code isn't included in the text of the book, so I don't think we need an accompanying change there.

@yaominator
Copy link
Author

works great !!! +1

@sryza
Copy link
Owner

sryza commented Feb 3, 2016

Awesome

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants