In [1]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("CompareShortTrips") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/29 14:33:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
#build next stop id dataframe
from pyspark.sql import Window
from pyspark.sql import functions as F

def haversine(lat1_col, lon1_col, lat2_col, lon2_col):
    """
    Calculate the great circle distance in meters between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lat1 = F.radians(lat1_col)
    lon1 = F.radians(lon1_col)
    lat2 = F.radians(lat2_col)
    lon2 = F.radians(lon2_col)
    
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = F.sin(dlat/2)**2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon/2)**2
    c = 2 * F.asin(F.sqrt(a)) 
    r = 6371000  # Radius of earth in meters. Use 3956 for miles. Determines return value units.
    return c * r

def get_percentile_columns(column,tag,step):
    """
    This function takes a Spark DataFrame column and returns a list of columns representing
    the percentiles from 0 to 100 in 5 percent increments.
    """
    percentiles = [i for i in range(0, 100+step,step)]
    return [F.percentile_approx(column,p/100).alias(f"PERCENTILE_{p}_{tag}") for p in percentiles]

def prepare_journeys(journeys,stop_times):
    alighting_stop_window = Window.partitionBy("CARD_ID").orderBy(F.col("DATETIME").asc())
    journeys = journeys.join(stop_times.dropDuplicates(),on = ["LINE_ID","STOP_ID"],how = "left")
    journeys = journeys.dropDuplicates(["CARD_ID","JOURNEY_ID","DATETIME","EVENT"])
    journeys = journeys.withColumn(
        "EVENT_NEXT_NEXT",
        F.lead(F.col("EVENT"),2).over(alighting_stop_window)
        ).withColumn(
        "EVENT_TYPE_NEXT_NEXT",
        F.lead(F.col("EVENT_TYPE"),2).over(alighting_stop_window)
        )
    journeys = journeys.withColumn(
        "STOP_ID_NEXT",
        F.lead(F.col("STOP_ID"),1).over(alighting_stop_window)
        ).withColumn(
        "STOP_LAT_NEXT",
        F.lead(F.col("STOP_LAT"),1).over(alighting_stop_window)
        ).withColumn(
        "STOP_LON_NEXT",
        F.lead(F.col("STOP_LON"),1).over(alighting_stop_window)
        ).withColumn(
        "STOP_ID_NEXT_NEXT",
        F.lead(F.col("STOP_ID"),2).over(alighting_stop_window)
        ).withColumn(
        "STOP_LAT_NEXT_NEXT",
        F.lead(F.col("STOP_LAT"),2).over(alighting_stop_window)
        ).withColumn(
        "STOP_LON_NEXT_NEXT",
        F.lead(F.col("STOP_LON"),2).over(alighting_stop_window)
        ).withColumn(
        "DATETIME_NEXT_NEXT",
        F.lead(F.col("DATETIME"),2).over(alighting_stop_window)
        ).withColumn(
            "CONFIDENCE_NEXT",
            F.lead(F.col("CONFIDENCE"),1).over(alighting_stop_window)
        )
    journeys = journeys.withColumn("DISTANCE_TO_NEXT_STOP",haversine(
        F.col("STOP_LAT"),
        F.col("STOP_LON"),
        F.col("STOP_LAT_NEXT"),
        F.col("STOP_LON_NEXT"))
    )
    journeys = journeys.withColumn("DISTANCE_TO_NEXT_NEXT_STOP",haversine(
        F.col("STOP_LAT"),
        F.col("STOP_LON"),
        F.col("STOP_LAT_NEXT_NEXT"),
        F.col("STOP_LON_NEXT_NEXT"))
    )
    journeys = journeys.withColumn("ALIGHTING_DISTANCE_TO_NEXT_STOP",haversine(
        F.col("STOP_LAT_NEXT"),
        F.col("STOP_LON_NEXT"),
        F.col("STOP_LAT_NEXT_NEXT"),
        F.col("STOP_LON_NEXT_NEXT"))
    )
    journeys = journeys.withColumn(
        "TIME_TO_NEXT_ORIGIN",
        F.datediff(F.col("DATETIME_NEXT_NEXT"),
                F.col("DATETIME")
                ))
    return journeys

def analyze_single_leg_trips(journeys):
    single_leg_trips = journeys.filter(F.col("EVENT_TYPE") == "ORIGIN").filter(F.col("EVENT_TYPE_NEXT_NEXT") == "ORIGIN")
    single_leg_trips = single_leg_trips.withColumn(
    "IS_NULL_JOURNEY",
    F.when(
        F.col("STOP_ID")==F.col("STOP_ID_NEXT_NEXT"),
        F.lit("Y")
    ).otherwise(F.lit("N"))).withColumn(
    "IS_SINGLE_STOP_RIDE",
    F.when(
        F.col("STOP_ID_NEXT")==F.col("TRIP_STOP_ID_NEXT"),
        F.lit("Y")
    ).otherwise(F.lit("N"))
    ).withColumn("ALIGHT_AT_NEXT_START_LOCATION",
         F.when(F.col("STOP_ID_NEXT")==F.col("STOP_ID_NEXT_NEXT"),
        F.lit("Y")
    ).otherwise(F.lit("N")))
    total_count = single_leg_trips.select(F.count("*").alias("TOTAL_JOURNEYS"))
    single_leg_trips_analysis = single_leg_trips.groupBy(
        # "IS_NULL_JOURNEY",
        "IS_SINGLE_STOP_RIDE",
        "ALIGHT_AT_NEXT_START_LOCATION",
        ).agg(
    F.count("*").alias("N_JOURNEYS"),
    *[
        3.3*F.percentile_approx("ALIGHTING_DISTANCE_TO_NEXT_STOP",p/100).alias(
        f"PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_{p}").cast("int")
        for p in range(0,120,20)
        ]
    )
    single_leg_trips_analysis = single_leg_trips_analysis.join(total_count)
    single_leg_trips_analysis = single_leg_trips_analysis.withColumn(
        "PERCENT_JOURNEYS",
        (100*F.col("N_JOURNEYS")/F.col("TOTAL_JOURNEYS")).cast("int")).drop("N_JOURNEYS","TOTAL_JOURNEYS")
    return single_leg_trips_analysis,single_leg_trips


In [12]:
stop_times = spark.read.parquet("../data/02_intermediate/stop_times_avl/stop_times")
stop_times = stop_times.dropDuplicates(["TRIP_ID","STOP_SEQUENCE"])
next_stop_window = Window.partitionBy("TRIP_ID").orderBy(F.col("STOP_SEQUENCE").asc())
stop_times = stop_times.withColumn(
    "STOP_ID_NEXT",
    F.lead(F.col("STOP_ID"),1).over(next_stop_window)
    )
stop_times = stop_times.select(
    F.col("ROUTE_ID_OLD").alias("LINE_ID"),
    F.col("STOP_ID"),
    F.col("STOP_ID_NEXT").alias("TRIP_STOP_ID_NEXT"),
    F.col("DIRECTION_ID"),
).dropDuplicates(["LINE_ID","STOP_ID","DIRECTION_ID"])
stop_times = stop_times.withColumn(
    "IS_MAX",
    F.when(F.col("LINE_ID").isin([200,190,290,90,100]),"Y").otherwise("N")
    ).cache()
n_lines_per_stop = stop_times.filter(F.col("IS_MAX") == "Y").select("STOP_ID","LINE_ID").groupBy("STOP_ID").agg(F.countDistinct(F.col("LINE_ID")).alias("N_MAX_LINES")).cache()
control_journeys = spark.read.parquet("../data/control_run_2/03_primary/rider_events_partitioned")
control_journeys = prepare_journeys(control_journeys,stop_times)
control_journeys = control_journeys.withColumn(
    "IS_MAX",
    F.when(F.col("LINE_ID").isin([200,190,290,90,100]),"Y").otherwise("N")
    ).cache()
single_leg_control_journeys_analysis,single_leg_control_journeys = analyze_single_leg_trips(control_journeys.filter(F.col("IS_MAX")=="Y"))
fixed_journeys = spark.read.parquet("../data/03_primary/rider_events_partitioned")
fixed_journeys = prepare_journeys(fixed_journeys,stop_times)
fixed_journeys = fixed_journeys.withColumn(
    "IS_MAX",
    F.when(F.col("LINE_ID").isin([200,190,290,90,100]),"Y").otherwise("N")
    ).cache()
single_leg_fixed_journeys_analysis,single_leg_fixed_journeys = analyze_single_leg_trips(fixed_journeys.filter(F.col("IS_MAX")=="Y"))


24/07/29 14:37:53 WARN CacheManager: Asked to cache already cached data.
24/07/29 14:37:53 WARN CacheManager: Asked to cache already cached data.
24/07/29 14:37:54 WARN CacheManager: Asked to cache already cached data.


In [12]:
single_leg_control_journeys_analysis.toPandas().sort_values(
    by = ["IS_SINGLE_STOP_RIDE","ALIGHT_AT_NEXT_START_LOCATION"]
    ).T

24/07/29 14:31:25 WARN MemoryStore: Not enough space to cache rdd_42_79 in memory! (computed 1572.9 KiB so far)
24/07/29 14:31:25 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_42_82 in memory.
24/07/29 14:31:25 WARN MemoryStore: Not enough space to cache rdd_42_80 in memory! (computed 1565.2 KiB so far)
24/07/29 14:31:25 WARN MemoryStore: Not enough space to cache rdd_42_82 in memory! (computed 384.0 B so far)
24/07/29 14:31:25 WARN MemoryStore: Not enough space to cache rdd_42_81 in memory! (computed 1570.7 KiB so far)
24/07/29 14:31:25 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_42_85 in memory.
24/07/29 14:31:25 WARN MemoryStore: Not enough space to cache rdd_42_85 in memory! (computed 384.0 B so far)
24/07/29 14:31:25 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_42_97 in memory.
24/07/29 14:31:25 WARN MemoryStore: Not enou

Unnamed: 0,3,2,1,0
IS_SINGLE_STOP_RIDE,N,N,Y,Y
ALIGHT_AT_NEXT_START_LOCATION,N,Y,N,Y
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 0.0, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_0 AS INT) * 3.3)",29.7,0.0,29.7,0.0
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 0.2, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_20 AS INT) * 3.3)",191.4,0.0,2277.0,0.0
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 0.4, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_40 AS INT) * 3.3)",1250.7,0.0,5695.8,0.0
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 0.6, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_60 AS INT) * 3.3)",6180.9,0.0,16952.1,0.0
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 0.8, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_80 AS INT) * 3.3)",23007.6,0.0,33815.1,0.0
"(CAST(percentile_approx(ALIGHTING_DISTANCE_TO_NEXT_STOP, 1.0, 10000) AS PERCENTILE_DISTANCE_FROM_ALIGHTING_TO_NEXT_ORIGIN_100 AS INT) * 3.3)",179335.2,0.0,143391.6,0.0
PERCENT_JOURNEYS,44,10,43,0


In [13]:
single_leg_fixed_journeys_analysis.toPandas().sort_values(
    by = ["IS_SINGLE_STOP_RIDE","ALIGHT_AT_NEXT_START_LOCATION"]
    ).T

24/07/29 14:33:07 WARN MemoryStore: Not enough space to cache rdd_180_101 in memory! (computed 1506.6 KiB so far)
24/07/29 14:33:07 WARN BlockManager: Persisting block rdd_180_101 to disk instead.
24/07/29 14:33:07 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_180_106 in memory.
24/07/29 14:33:07 WARN MemoryStore: Not enough space to cache rdd_180_106 in memory! (computed 384.0 B so far)
24/07/29 14:33:07 WARN BlockManager: Persisting block rdd_180_106 to disk instead.
24/07/29 14:33:08 WARN MemoryStore: Not enough space to cache rdd_180_109 in memory! (computed 1499.0 KiB so far)
24/07/29 14:33:08 WARN BlockManager: Persisting block rdd_180_109 to disk instead.
24/07/29 14:33:08 WARN MemoryStore: Not enough space to cache rdd_180_110 in memory! (computed 1500.3 KiB so far)
24/07/29 14:33:08 WARN BlockManager: Persisting block rdd_180_110 to disk instead.
24/07/29 14:33:10 WARN MemoryStore: Failed to reserve initial memory threshold 

In [None]:
check_fixed = single_leg_fixed_journeys.filter(
    F.col("IS_SINGLE_STOP_RIDE") == F.lit("Y")
    ).groupBy("LINE_ID").agg(F.count("*").alias("count"),F.mean(F.col("CONFIDENCE_NEXT"))).toPandas()
check_fixed["percentage"] = check_fixed["count"]/check_fixed["count"].sum()*100
view_fixed = check_fixed[check_fixed["percentage"] > 0.01].sort_values(by = ["LINE_ID"],ascending = False)

                                                                                

In [None]:
check_control = single_leg_control_journeys.filter(
    F.col("IS_SINGLE_STOP_RIDE") == F.lit("Y")
    ).groupBy("LINE_ID").agg(F.count("*").alias("count"),F.mean(F.col("CONFIDENCE_NEXT"))).toPandas()
check_control["percentage"] = check_control["count"]/check_control["count"].sum()*100
view_control = check_control[check_control["percentage"] > 0.01].sort_values(by = ["LINE_ID"],ascending = False)

                                                                                

In [None]:
compare_lines = view_control.merge(view_fixed,on = ["LINE_ID"],how = "outer",suffixes = ("_control","_fixed"))
compare_lines[sorted(compare_lines.columns)]

Unnamed: 0,LINE_ID,avg(CONFIDENCE_NEXT)_control,avg(CONFIDENCE_NEXT)_fixed,count_control,count_fixed,percentage_control,percentage_fixed
0,290,0.453453,0.451853,8554,8820,10.314723,10.493379
1,200,0.437262,0.429265,8107,8645,9.775714,10.285177
2,190,0.440529,0.423862,7761,8217,9.358495,9.775975
3,100,0.467889,0.465538,35178,35717,42.418908,42.493427
4,90,0.446791,0.446262,23330,22654,28.13216,26.952042


In [None]:
single_leg_control_journeys = single_leg_control_journeys.withColumn(
    "IS_MAX",
    F.when(F.col("LINE_ID").isin([200,190,290,90,100]),"Y").otherwise("N")
    )
single_leg_control_journeys.groupBy("IS_SINGLE_STOP_RIDE","IS_MAX").agg(
    F.count("*").alias("count"),
    F.mean(F.col("CONFIDENCE_NEXT")),
    F.stddev(F.col("CONFIDENCE_NEXT")),
    F.mean(F.col("ALIGHTING_DISTANCE_TO_NEXT_STOP")),
    F.stddev(F.col("ALIGHTING_DISTANCE_TO_NEXT_STOP"))).toPandas()

                                                                                

Unnamed: 0,IS_SINGLE_STOP_RIDE,IS_MAX,count,avg(CONFIDENCE_NEXT),stddev_samp(CONFIDENCE_NEXT),avg(ALIGHTING_DISTANCE_TO_NEXT_STOP),stddev_samp(ALIGHTING_DISTANCE_TO_NEXT_STOP)
0,Y,Y,83042,0.455232,0.107998,5426.071238,5973.616988
1,N,Y,107263,0.403304,0.185313,2934.421983,5146.876853


In [None]:
single_leg_fixed_journeys = single_leg_fixed_journeys.withColumn(
    "IS_MAX",
    F.when(F.col("LINE_ID").isin([200,190,290,90,100]),"Y").otherwise("N")
    )
single_leg_fixed_journeys.groupBy("IS_SINGLE_STOP_RIDE","IS_MAX").agg(
    F.count("*").alias("count"),
    F.mean(F.col("CONFIDENCE_NEXT")),
    F.stddev(F.col("CONFIDENCE_NEXT")),
    F.mean(F.col("ALIGHTING_DISTANCE_TO_NEXT_STOP")),
    F.stddev(F.col("ALIGHTING_DISTANCE_TO_NEXT_STOP"))).toPandas()

                                                                                

Unnamed: 0,IS_SINGLE_STOP_RIDE,IS_MAX,count,avg(CONFIDENCE_NEXT),stddev_samp(CONFIDENCE_NEXT),avg(ALIGHTING_DISTANCE_TO_NEXT_STOP),stddev_samp(ALIGHTING_DISTANCE_TO_NEXT_STOP)
0,Y,Y,84166,0.451222,0.11737,1162.148884,1601.940222
1,N,Y,211789,0.332897,0.229585,3632.383292,6207.7977


In [None]:
single_leg_control_journeys.filter(F.col("IS_MAX") == "Y").groupBy("IS_SINGLE_STOP_RIDE").agg(
    *[F.percentile_approx(F.col("CONFIDENCE_NEXT"),p/100) for p in range(0,120,20)]
    ).toPandas().T.sort_values(by = ["IS_SINGLE_STOP_RIDE"],axis = 1)

                                                                                

Unnamed: 0,1,0
IS_SINGLE_STOP_RIDE,N,Y
"percentile_approx(CONFIDENCE_NEXT, 0.0, 10000)",0.0,0.0
"percentile_approx(CONFIDENCE_NEXT, 0.2, 10000)",0.271264,0.382991
"percentile_approx(CONFIDENCE_NEXT, 0.4, 10000)",0.388828,0.426155
"percentile_approx(CONFIDENCE_NEXT, 0.6, 10000)",0.453406,0.46938
"percentile_approx(CONFIDENCE_NEXT, 0.8, 10000)",0.530042,0.524057
"percentile_approx(CONFIDENCE_NEXT, 1.0, 10000)",1.0,1.0


In [None]:
single_leg_fixed_journeys.filter(F.col("IS_MAX") == "Y").groupBy("IS_SINGLE_STOP_RIDE").agg(
    *[F.percentile_approx(F.col("CONFIDENCE_NEXT"),p/100) for p in range(0,120,20)]
    ).toPandas().T.sort_values(by = ["IS_SINGLE_STOP_RIDE"],axis = 1)

                                                                                

Unnamed: 0,1,0
IS_SINGLE_STOP_RIDE,N,Y
"percentile_approx(CONFIDENCE_NEXT, 0.0, 10000)",0.0,0.0
"percentile_approx(CONFIDENCE_NEXT, 0.2, 10000)",0.122618,0.382249
"percentile_approx(CONFIDENCE_NEXT, 0.4, 10000)",0.273273,0.427513
"percentile_approx(CONFIDENCE_NEXT, 0.6, 10000)",0.405989,0.470711
"percentile_approx(CONFIDENCE_NEXT, 0.8, 10000)",0.508939,0.524735
"percentile_approx(CONFIDENCE_NEXT, 1.0, 10000)",1.0,1.0


In [32]:
check_control_n_lines = single_leg_control_journeys.join(n_lines_per_stop,on = ["STOP_ID"]).groupBy("N_MAX_LINES").agg(
    F.count("*").alias("count"),
    *get_percentile_columns(F.col("CONFIDENCE_NEXT"),"CONFIDENCE",20)
    ).toPandas()
check_fixed_n_lines = single_leg_fixed_journeys.join(n_lines_per_stop,on = ["STOP_ID"]).groupBy("N_MAX_LINES").agg(
    F.count("*").alias("count"),
    *get_percentile_columns(F.col("CONFIDENCE_NEXT"),"CONFIDENCE",20)
    ).toPandas()


24/07/29 15:05:05 WARN MemoryStore: Not enough space to cache rdd_60_156 in memory! (computed 1555.1 KiB so far)
24/07/29 15:05:05 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_60_160 in memory.
24/07/29 15:05:05 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_60_162 in memory.
24/07/29 15:05:05 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_60_159 in memory.
24/07/29 15:05:05 WARN MemoryStore: Not enough space to cache rdd_60_160 in memory! (computed 384.0 B so far)
24/07/29 15:05:05 WARN MemoryStore: Not enough space to cache rdd_60_157 in memory! (computed 1569.3 KiB so far)
24/07/29 15:05:05 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_60_163 in memory.
24/07/29 15:05:05 WARN MemoryStore: Not enough space to cache rdd_60_162 in memory! (computed 384.0 B so far)
24/07/29 15:05

In [33]:
check_control_n_lines.merge(check_fixed_n_lines,on = "N_MAX_LINES",suffixes=("_CONTROL","_FIXED")).sort_values(by = ["N_MAX_LINES"]).T

Unnamed: 0,0,2,1
N_MAX_LINES,1.0,2.0,3.0
count_CONTROL,82556.0,65393.0,42339.0
PERCENTILE_0_CONFIDENCE_CONTROL,0.0,0.0,0.0
PERCENTILE_20_CONFIDENCE_CONTROL,0.380043,0.319018,0.342223
PERCENTILE_40_CONFIDENCE_CONTROL,0.443069,0.386165,0.397731
PERCENTILE_60_CONFIDENCE_CONTROL,0.489913,0.431891,0.443948
PERCENTILE_80_CONFIDENCE_CONTROL,0.550403,0.496866,0.51384
PERCENTILE_100_CONFIDENCE_CONTROL,1.0,1.0,1.0
count_FIXED,119664.0,106159.0,70095.0
PERCENTILE_0_CONFIDENCE_FIXED,0.0,0.0,0.0


24/07/29 17:04:20 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1575863 ms exceeds timeout 120000 ms
24/07/29 17:04:20 WARN SparkContext: Killing executors is not supported by current scheduler.
