In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window, functions as F

# Initialize a Spark session with increased memory
spark = (SparkSession.builder
    .appName("CompareShortTrips")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g") 
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
    )

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/13 14:47:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def haversine(lat1_col, lon1_col, lat2_col, lon2_col):
    """
    Calculate the great circle distance in meters between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lat1 = F.radians(lat1_col)
    lon1 = F.radians(lon1_col)
    lat2 = F.radians(lat2_col)
    lon2 = F.radians(lon2_col)
    
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = F.sin(dlat/2)**2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon/2)**2
    c = 2 * F.asin(F.sqrt(a)) 
    r = 6371000  # Radius of earth in meters. Use 3956 for miles. Determines return value units.
    return c * r


In [3]:
#get number of trips where the alighting stop id is the trip_next_stop_id
stop_times = spark.read.parquet("../data/02_intermediate/stop_times_avl/stop_times")
stop_times = stop_times.dropDuplicates(["TRIP_ID","STOP_SEQUENCE"])
next_stop_window = Window.partitionBy("TRIP_ID").orderBy(F.col("STOP_SEQUENCE").asc())
stop_times = stop_times.withColumn(
    "STOP_ID_NEXT",
    F.lead(F.col("STOP_ID"),1).over(next_stop_window)
    )
stop_times = stop_times.select(
    F.col("ROUTE_ID_OLD").alias("LINE_ID"),
    F.col("STOP_ID"),
    F.col("STOP_ID_NEXT").alias("TRIP_STOP_ID_NEXT"),
    F.col("DIRECTION_ID"),
).dropDuplicates(["LINE_ID","STOP_ID","DIRECTION_ID"])

In [10]:
journeys = spark.read.parquet("../data/03_primary/rider_events_partitioned")
#get journey profiles
journey_window = Window.partitionBy("CARD_ID","JOURNEY_ID").orderBy(F.col("DATETIME").asc())
card_window = Window.partitionBy("CARD_ID").orderBy(F.col("DATETIME").asc())

#check for single stop rides
single_stop_rides = journeys.withColumn(
    "STOP_ID_NEXT",
    F.lead(F.col("STOP_ID"),1).over(journey_window)
    ).withColumn(
    "EVENT_TYPE_NEXT",
    F.lead(F.col("EVENT_TYPE"),1).over(journey_window)
    ).filter(
        F.col("EVENT_TYPE") == "ORIGIN"
    ).filter(
        F.col("EVENT_TYPE_NEXT") == "DESTINATION"
    ).join(
        stop_times.select("LINE_ID","STOP_ID","DIRECTION_ID","TRIP_STOP_ID_NEXT"),
        on = ["LINE_ID","STOP_ID","DIRECTION_ID"],
        how = "left"
    ).withColumn(
        "IS_SINGLE_STOP_RIDE",
        F.when(F.col("STOP_ID_NEXT")==F.col("TRIP_STOP_ID_NEXT"),1).otherwise(0)
    ).groupBy("JOURNEY_ID").agg(
        F.max(F.col("IS_SINGLE_STOP_RIDE")).alias("IS_SINGLE_STOP_RIDE")
        )
#get journey legs
legs = journeys.filter(
    F.col("EVENT").isin(["BOARDED","ALIGHTED"])
    ).withColumn(
        "STOP_LAT_NEXT",
        F.lead(F.col("STOP_LAT"),1).over(journey_window)
    ).withColumn(
        "STOP_LON_NEXT",
        F.lead(F.col("STOP_LON"),1).over(journey_window)
    ).filter(F.col("STOP_LAT_NEXT").isNotNull()
    ).withColumn(
        "LEG_DISTANCE_METERS",
        haversine(
            F.col("STOP_LAT"),
            F.col("STOP_LON"),
            F.col("STOP_LAT_NEXT"),
            F.col("STOP_LON_NEXT")
            )
    ).filter(
        F.col("EVENT")=="BOARDED"
    ).groupBy("JOURNEY_ID").agg(
        F.min("LEG_DISTANCE_METERS").alias("LEG_DISTANCE_METERS_MIN"),
        F.max("LEG_DISTANCE_METERS").alias("LEG_DISTANCE_METERS_MAX"),
        F.sum("LEG_DISTANCE_METERS").alias("TOTAL_JOURNEY_DISTANCE_TRAVELED_METERS")
    )
#get number of legs
n_legs = journeys.filter(
    F.col("EVENT").isin(["BOARDED"])
    ).groupBy(["JOURNEY_ID"]).agg(F.count("*").alias("N_LEGS"))
#get distance between alighting location and next boarding
card_id = Window.partitionBy("CARD_ID").orderBy(F.col("DATETIME").asc())
distances_from_alighting_to_next_boarding = journeys.filter(
    F.col("EVENT").isin(["BOARDED","ALIGHTED"])
    ).withColumn(
        "STOP_LAT_NEXT",
        F.lead(F.col("STOP_LAT"),1).over(journey_window)
    ).withColumn(
        "STOP_LON_NEXT",
        F.lead(F.col("STOP_LON"),1).over(journey_window)
    ).filter(F.col("STOP_LAT_NEXT").isNotNull()
    ).withColumn(
        "ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS",
        haversine(
            F.col("STOP_LAT"),
            F.col("STOP_LON"),
            F.col("STOP_LAT_NEXT"),
            F.col("STOP_LON_NEXT")
            )
    ).filter(
        F.col("EVENT")=="ALIGHTED"
    ).groupBy("JOURNEY_ID").agg(
        F.max(
            F.col("ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS")
        ).alias("ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS_MAX"),
        F.min(
            F.col("ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS")
        ).alias("ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS_MIN")
)
#lines used
lines_used = journeys.select(
    "JOURNEY_ID",
    "LINE_ID"
    ).distinct().groupBy("JOURNEY_ID").agg(F.collect_list(F.col("LINE_ID")).alias("LINE_IDS_USED"))
lines_used = lines_used.withColumn("INCLUDES_MAX",F.when(
    F.array_contains(F.col("LINE_IDS_USED"),200) | 
    F.array_contains(F.col("LINE_IDS_USED"),100) | 
    F.array_contains(F.col("LINE_IDS_USED"),190) | 
    F.array_contains(F.col("LINE_IDS_USED"),90)  |
    F.array_contains(F.col("LINE_IDS_USED"),290) ,
    1
    ).otherwise(0)
    )

#confidences
confidences = journeys.filter(F.col("EVENT") == "ALIGHTED").groupBy(["JOURNEY_ID"]).agg(
    F.min(F.col("CONFIDENCE")).alias("CONFIDENCE_MIN"),
    F.max(F.col("CONFIDENCE")).alias("CONFIDENCE_MAX"),
    F.mean(F.col("CONFIDENCE")).alias("CONFIDENCE_MEAN"),
    )

#get distance from origin boarding to destination alighting
distances_from_origin_to_destination = journeys.filter(
    F.col("EVENT_TYPE").isin(["ORIGIN","DESTINATION"])
    ).withColumn(
        "STOP_LAT_NEXT",
        F.lead(F.col("STOP_LAT"),1).over(journey_window)
    ).withColumn(
        "STOP_LON_NEXT",
        F.lead(F.col("STOP_LON"),1).over(journey_window)
    ).filter(
        F.col("STOP_LAT_NEXT").isNotNull()
    ).filter(
        F.col("EVENT_TYPE")=="ORIGIN"
    ).withColumn(
        "ORIGIN_DESTINATION_DISTANCE_METERS",
        haversine(
            F.col("STOP_LAT"),
            F.col("STOP_LON"),
            F.col("STOP_LAT_NEXT"),
            F.col("STOP_LON_NEXT")
            )
    ).select("JOURNEY_ID","ORIGIN_DESTINATION_DISTANCE_METERS")
#get distances from origin to origin
distances_from_origin_to_origin = journeys.filter(
    F.col("EVENT_TYPE").isin(["ORIGIN"])
    ).withColumn(
        "STOP_LAT_NEXT",
        F.lead(F.col("STOP_LAT"),1).over(card_window)
    ).withColumn(
        "STOP_LON_NEXT",
        F.lead(F.col("STOP_LON"),1).over(card_window)
    ).filter(
        F.col("STOP_LAT_NEXT").isNotNull()
    ).filter(
        F.col("EVENT_TYPE")=="ORIGIN"
    ).withColumn(
        "ORIGIN_ORIGIN_DISTANCE_METERS",
        haversine(
            F.col("STOP_LAT"),
            F.col("STOP_LON"),
            F.col("STOP_LAT_NEXT"),
            F.col("STOP_LON_NEXT")
            )
    ).select("JOURNEY_ID","ORIGIN_ORIGIN_DISTANCE_METERS")
#get inter boarding times
inter_boarding_times = journeys.filter(
    F.col("EVENT") == "BOARDED"
    ).withColumn(
        "INTER_BOARDING_TIME_SECONDS",
        F.lead(F.col("DATETIME"),1).over(card_window).cast("long") - F.col("DATETIME").cast("long")
    ).groupBy("JOURNEY_ID").agg(
        F.max("INTER_BOARDING_TIME_SECONDS").alias("MAX_INTER_BOARDING_TIME_SECONDS")
        )
#get journey times
journey_times = journeys.filter(
    F.col("EVENT_TYPE") == "ORIGIN").select(
        "JOURNEY_ID",
        F.col("DATETIME").alias("ORIGIN_DATETIME")
    ).join(
    journeys.filter(
        F.col("EVENT_TYPE") == "DESTINATION"
        ).select("JOURNEY_ID",F.col("DATETIME").alias("DESTINATION_DATETIME")
                 ),
        on = ["JOURNEY_ID"]
).withColumn(
    "JOURNEY_TIME_SECONDS",
    F.col("DESTINATION_DATETIME").cast("long")-F.col("ORIGIN_DATETIME").cast("long")
).select("JOURNEY_ID","JOURNEY_TIME_SECONDS")
#journey profiles
journey_profiles = journeys.select(
    "JOURNEY_ID",
    "CARD_ID",
    "VALIDITY_SCORE"
    ).distinct().join(
    distances_from_alighting_to_next_boarding,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    n_legs,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    legs,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    distances_from_origin_to_destination,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    lines_used,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    confidences,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    distances_from_origin_to_origin,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    single_stop_rides,
    on = ["JOURNEY_ID"],
    how = "left"
).withColumn(
    "IS_SINGLE_STOP_RIDE",
    F.when(F.col("IS_SINGLE_STOP_RIDE") == 1,1).otherwise(0)
).join(
    inter_boarding_times,
    on = ["JOURNEY_ID"],
    how = "left"
).join(
    journey_times,
    on = ["JOURNEY_ID"],
    how = "left"

)
journey_profiles = journey_profiles.withColumn(
    "IS_MAX_ONLY", 
        F.when(
            F.array_sort(F.array_union(
                F.col("LINE_IDS_USED"), 
                F.array(F.lit(100), F.lit(200), F.lit(90), F.lit(190), F.lit(290))
            )) == F.array_sort(
                F.array(F.lit(100), F.lit(200), F.lit(90), F.lit(190), F.lit(290))),
                1).otherwise(0)
).cache()
journey_profiles.write.mode("overwrite").parquet("journey_profiles/")

24/09/13 15:21:37 WARN CacheManager: Asked to cache already cached data.
                                                                                

24/09/14 02:56:48 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 978750 ms exceeds timeout 120000 ms
24/09/14 02:56:48 WARN SparkContext: Killing executors is not supported by current scheduler.


In [5]:
#get sample journeys for gut checks
journeys = journeys.withColumn("IS_LOOP",F.when(F.col("IS_LOOP")==True,1).otherwise(0))
# Convert DATETIME column to ISO formatted string
journeys = journeys.withColumn("DATETIME", F.date_format("DATETIME", "yyyy-MM-dd'T'HH:mm:ss.SSS"))

profiles_to_sample = journey_profiles.filter(~F.col("ORIGIN_ORIGIN_DISTANCE_METERS").isNull())
n = 25
#max only
max_only_journeys = profiles_to_sample.filter(F.col("IS_MAX_ONLY") == 1).select("CARD_ID")
max_only_sample_journeys = max_only_journeys.orderBy(F.rand()).limit(n).join(
    journeys,
    on = ["CARD_ID"],
    how = "left"
    )
#bus only
bus_only_journeys = profiles_to_sample.filter(F.col("INCLUDES_MAX") == 0).select("CARD_ID")
bus_only_sample_journeys = bus_only_journeys.orderBy(F.rand()).limit(n).join(
    journeys,
    on = ["CARD_ID"],
    how = "left")
#bus and max
bus_and_max_journeys = profiles_to_sample.filter((F.col("INCLUDES_MAX") == 1) &(F.col("IS_MAX_ONLY")==0) ).select("CARD_ID")
bus_and_max_sample_journeys = bus_and_max_journeys.orderBy(F.rand()).limit(n).join(
    journeys,
    on = ["CARD_ID"],
    how = "left"
    )
max_only_sample_journeys.toPandas().to_csv("max_only_sample_journeys.csv",index = False)
bus_only_sample_journeys.toPandas().to_csv("bus_only_sample_journeys.csv",index = False)
bus_and_max_sample_journeys.toPandas().to_csv("bus_and_max_sample_journeys.csv",index = False)

                                                                                

In [6]:
stops_of_interest = [
    8376,
    8341,
    8342,
    8375,
    8373,
    8344,
    8372,
    8345,
    8371,
    8346
]
check_cutoff = journeys.filter(
    F.col("LINE_ID").isin([200,100,90,290,190])
    ).filter(
        F.col("STOP_ID").isin(stops_of_interest)
        ).filter(
            F.col("EVENT")=="BOARDED"
        ).select("JOURNEY_ID").join(
            journeys,
            on = ["JOURNEY_ID"],
            how = "left").filter(
                (F.col("EVENT")=="ALIGHTED")
                &
                F.col("LINE_ID").isin([200,100,90,290,190])
                 ).withColumn(
                    "ABOVE_CUTOFF",
                    F.when(F.col("CONFIDENCE")>0.15,"Y").otherwise("N")
                    ).groupBy(
                    F.col("ABOVE_CUTOFF")
                    ).agg(
                        F.count("*"),
                    ).toPandas()

                                                                                

In [7]:
check_cutoff

Unnamed: 0,ABOVE_CUTOFF,count(1)
0,Y,19113
1,N,2382


In [8]:
journeys.schema

StructType(List(StructField(JOURNEY_ID,StringType,true),StructField(FARE_CATEGORY_DESCRIPTION,StringType,true),StructField(DATETIME,StringType,true),StructField(STOP_ID,LongType,true),StructField(CARD_ID,StringType,true),StructField(LINE_ID,IntegerType,true),StructField(STOP_LAT,DoubleType,true),StructField(STOP_LON,DoubleType,true),StructField(DIRECTION_ID,IntegerType,true),StructField(VALIDITY_SCORE,DoubleType,true),StructField(EVENT,StringType,true),StructField(CONFIDENCE,DoubleType,true),StructField(EVENT_TYPE,StringType,true),StructField(IS_LOOP,IntegerType,false),StructField(JOURNEY_START_DATE,DateType,true)))

In [9]:
journey_profiles.show()

+--------------------+--------------------+-------------------+----------------------------------------------+----------------------------------------------+------+-----------------------+-----------------------+--------------------------------------+----------------------------------+-------------+------------+--------------------+--------------------+--------------------+-----------------------------+-------------------+-------------------------------+--------------------+-----------+
|          JOURNEY_ID|             CARD_ID|     VALIDITY_SCORE|ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS_MAX|ALIGHTING_TO_NEXT_BOARDING_DISTANCE_METERS_MIN|N_LEGS|LEG_DISTANCE_METERS_MIN|LEG_DISTANCE_METERS_MAX|TOTAL_JOURNEY_DISTANCE_TRAVELED_METERS|ORIGIN_DESTINATION_DISTANCE_METERS|LINE_IDS_USED|INCLUDES_MAX|      CONFIDENCE_MIN|      CONFIDENCE_MAX|     CONFIDENCE_MEAN|ORIGIN_ORIGIN_DISTANCE_METERS|IS_SINGLE_STOP_RIDE|MAX_INTER_BOARDING_TIME_SECONDS|JOURNEY_TIME_SECONDS|IS_MAX_ONLY|
+---------------