# Define configuration constants

In [1]:
import os  # needed here: this cell runs before the imports cell

# Root directory for Structured Streaming checkpoints (set as spark.sql.streaming.checkpointLocation).
CHECKPOINT_DIR = "hdfs://namenode:8020/spark/checkpoint"

# Values returned by fget_day_type; weekday/weekend is decided in DATA_ACTUAL_TIMEZONE.
DAY_TYPE_WEEKDAY = 0
DAY_TYPE_WEEKEND = 1
DATA_ACTUAL_TIMEZONE = "America/Los_Angeles"

# Kafka source of the live bus positions.
BOOTSTRAP_SERVER = "kafka:29092"
TOPIC = "buses-location"

# TimescaleDB (PostgreSQL) sink.
POSTGRES_URL = "jdbc:postgresql://timescaledb:5432/lametro"
POSTGRES_TABLE_BUS_VELOCITY = "bus_velocity"
POSTGRES_TABLE_BUS_ARRIVAL = "bus_arrival"
# Credentials are taken from the environment when set; the literals are only fallbacks
# that keep the existing development setup working. Do not commit real credentials here.
POSTGRES_USERNAME = os.environ.get("POSTGRES_USERNAME", "postgres")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "password")

# HDFS locations of static, raw historical, and aggregated data.
STATIC_DATA_DIR = "hdfs://namenode:8020/ola/static_data/"
HISTORICAL_DATA_DIR = "hdfs://namenode:8020/ola/historical_data/"
AGGREGATED_DATA_DIR = "hdfs://namenode:8020/ola/aggregated_data/"
TEMP_DIR = "hdfs://namenode:8020/temp"

# Local-filesystem counterparts of the HDFS directories above.
LOCAL_STATIC_DATA_DIR = "/home/data/static_data/"
LOCAL_HISTORICAL_DATA_DIR = "/home/data/historical_data/"
LOCAL_AGGREGATED_DATA_DIR = "/home/data/aggregated_data/"
LOCAL_TEMP_DIR = "/home/data/temp"

# Init PySpark

In [2]:
import os
import sys
import re

from pyspark.sql import SparkSession, SQLContext
from pyspark import  SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Spark session & context
conf = SparkConf()
# Standalone-cluster master URL and the application name shown in the Spark UI.
conf.setMaster("spark://0.0.0.0:7077").setAppName("stream-job")
# Upper bound on the total cores this application takes from the cluster.
conf.set("spark.cores.max", "4")
# Extra jars on the driver and executor classpaths (presumably the Kafka connector and
# the PostgreSQL JDBC driver used below — confirm the directory contents).
conf.set("spark.driver.extraClassPath", "/usr/local/spark/third-party-jars/*")
conf.set("spark.executor.extraClassPath", "/usr/local/spark/third-party-jars/*")
conf.set("spark.sql.caseSensitive", "true")
conf.set("spark.ui.port", "4040")
# Default checkpoint root for streaming queries that do not set their own checkpointLocation.
conf.set("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR)
# NOTE(review): the next two settings configure DStream (spark.streaming) receivers; the
# Structured Streaming Kafka source used later presumably ignores them — rate limiting
# there is the `maxOffsetsPerTrigger` source option. Confirm before relying on them.
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.receiver.maxRate", "100")

sc = SparkContext(conf=conf)
spark = SparkSession(sc)
# NOTE(review): sqlContext and ssc are not referenced by the cells shown in this notebook;
# SQLContext is a legacy entry point and StreamingContext (1-second batches) belongs to the
# DStream API rather than the Structured Streaming queries below.
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 1)

In [3]:
%%html
<!-- Display tweak only: keep preformatted cell output unwrapped and widen the notebook container to 95%. -->
<style>
div.output_area pre {
    white-space: pre;
}
.container { 
    width:95% !important; 
}
</style>

# Define utilities

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

from datetime import datetime
from pytz import timezone
import math

## Functions

In [5]:
# utils

def fis_not_the_same(from_latlon, to_latlon):
    """Return True when two (lat, lon) vertices differ, i.e. the segment between them is not empty."""
    same_lat = from_latlon[0] == to_latlon[0]
    same_lon = from_latlon[1] == to_latlon[1]
    return not (same_lat and same_lon)

def ffile_path_to_ts(file_path):
    """Parse the integer formed by the 14 characters preceding the last 4 characters of file_path.

    e.g. "buses_20200101123045.csv" -> 20200101123045. Presumably the file names end in a
    14-digit timestamp plus a 4-character extension such as ".csv" — confirm against the writer.
    """
    stamp_text = file_path[-18:-4]
    return int(stamp_text)

def fget_day_type(epoch_seconds):
    """Classify an epoch timestamp (seconds) as weekday or weekend in DATA_ACTUAL_TIMEZONE.

    Returns DAY_TYPE_WEEKDAY for Monday-Friday and DAY_TYPE_WEEKEND for Saturday/Sunday.
    Returns None when epoch_seconds is None (e.g. a report without seconds_since_report),
    so the Spark UDF yields null instead of raising TypeError and failing the stream.
    """
    if epoch_seconds is None:
        return None
    # Convert straight into the data's timezone instead of via the process-local naive time.
    local_dt = datetime.fromtimestamp(epoch_seconds, tz=timezone(DATA_ACTUAL_TIMEZONE))
    # weekday(): Monday == 0 ... Sunday == 6
    return DAY_TYPE_WEEKDAY if local_dt.weekday() < 5 else DAY_TYPE_WEEKEND

def fhaversine_meter(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance in meters between two points given in degrees.

    Uses a spherical Earth with radius 6371 km.
    """
    # angular deltas, degrees -> radians
    dLat = (lat2 - lat1) * math.pi / 180.0
    dLon = (lon2 - lon1) * math.pi / 180.0

    # absolute latitudes, degrees -> radians
    lat1 = (lat1) * math.pi / 180.0
    lat2 = (lat2) * math.pi / 180.0

    a = (math.pow(math.sin(dLat / 2), 2) +
         math.pow(math.sin(dLon / 2), 2) *
             math.cos(lat1) * math.cos(lat2))
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal points, which
    # would make math.asin raise "math domain error". Builtin min/max are deliberately
    # not used: `from pyspark.sql.functions import *` shadows them in this notebook.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    rad = 6371  # Earth radius in km; converted to meters on return
    c = 2 * math.asin(math.sqrt(a))
    return rad * c * 1000

def ffind_distance(lat1, lon1, flat1, flon1, slat1, slon1, cl1, lat2, lon2, flat2, flon2, slat2, slon2, cl2):
    """Approximate along-route distance in meters between two points matched to route segments.

    Each point is (lat, lon) plus its matched segment: first vertex (flat, flon), second
    vertex (slat, slon) and segment_cum_len (cl). Points with equal cl share a segment and
    are measured point-to-point. Otherwise, ordering the two by cl, the distance is
        start point -> its segment's second vertex
        + (cl_end - cl_start)
        - length of the end segment (first -> second vertex)
        + end segment's first vertex -> end point.
    The cl values are mixed with haversine meters, so they are presumably cumulative route
    lengths in meters — confirm against the static data.
    """
    if cl1 == cl2:
        return fhaversine_meter(lat1, lon1, lat2, lon2)

    start = (lat1, lon1, flat1, flon1, slat1, slon1, cl1)
    end = (lat2, lon2, flat2, flon2, slat2, slon2, cl2)
    if cl1 > cl2:
        start, end = end, start

    s_lat, s_lon, _s_first_lat, _s_first_lon, s_second_lat, s_second_lon, s_cum = start
    e_lat, e_lon, e_first_lat, e_first_lon, e_second_lat, e_second_lon, e_cum = end

    return (fhaversine_meter(s_lat, s_lon, s_second_lat, s_second_lon)
            + e_cum - s_cum
            - fhaversine_meter(e_first_lat, e_first_lon, e_second_lat, e_second_lon)
            + fhaversine_meter(e_first_lat, e_first_lon, e_lat, e_lon))

def fto_datetime(epoch_seconds):
    """Convert epoch seconds into a naive datetime in the running process's local timezone."""
    local_dt = datetime.fromtimestamp(epoch_seconds)
    return local_dt

In [6]:
# Spot-check the along-route distance between two sample positions and their matched segments.
sample_distance = ffind_distance(
    lat1=33.82962, lon1=-118.290314,
    flat1=33.8292539188, flon1=-118.2902737058,
    slat1=33.8296908268, slon1=-118.2902719251,
    cl1=23232.5956910088,
    lat2=33.794449, lon2=-118.290871,
    flat2=33.7932864432, flon2=-118.2908173976,
    slat2=33.7945806653, slon2=-118.2908159585,
    cl2=19326.593863383834,
)
print("ffind_distance:", sample_distance, "m")

ffind_distance: 3913.794621512769 m


## UDF wrappers

In [7]:
# Spark UDF wrappers around the plain-Python helpers defined above.
is_not_the_same = udf(lambda a, b: fis_not_the_same(a, b), BooleanType())

file_path_to_ts = udf(lambda a: ffile_path_to_ts(a), LongType())

get_day_type = udf(lambda a: fget_day_type(a), IntegerType())

haversine_meter = udf(lambda a, b, c, d: fhaversine_meter(a, b, c, d), DoubleType())

find_distance = udf(lambda a, b, c, d, e, f, g, h, p, q, r, s, t, u: ffind_distance(a, b, c, d, e, f, g, h, p, q, r, s, t, u), DoubleType())

# Route ids that are folded into another route id: alias id -> canonical id.
alias_route_id = { 17:16, 48:10, 37:14, 38:35, 52:51, 79:78, 91:90, 240:150, 163:162, 181:180, 215:211, 245:244, 267:264, 243:242, 489:487, 656:237, 687:686, 950:910 }

# Map an alias route id to its canonical id; every other id (including null) passes through.
replace_alias_route_id = udf(
    lambda rid: alias_route_id.get(rid, rid),
    IntegerType()
)

to_datetime = udf(lambda epoch_seconds: fto_datetime(epoch_seconds), TimestampType())

# Load static data

In [8]:
# Row schema of one route segment as loaded from route_segments.json.
_segment_struct = StructType([
    StructField("segment_id", IntegerType(), True),
    StructField("segment_sequence", IntegerType(), True),
    StructField("segment_first_lat", DoubleType(), True),
    StructField("segment_first_lon", DoubleType(), True),
    StructField("segment_second_lat", DoubleType(), True),
    StructField("segment_second_lon", DoubleType(), True),
    StructField("segment_len_meter", DoubleType(), True),
    StructField("segment_cum_len", DoubleType(), True)
])

# Flatten a list of single-entry maps {segment_sequence: segment} into one map
# (on duplicate keys the later entry wins).
combine_maps = udf(
    lambda entries: {seq: entry[seq] for entry in entries for seq in entry},
    MapType(IntegerType(), _segment_struct)
)

# Same flattening one level up: {route_id: {segment_sequence: segment}} entries -> one map.
combine_deep_maps = udf(
    lambda entries: {rid: entry[rid] for entry in entries for rid in entry},
    MapType(IntegerType(), MapType(IntegerType(), _segment_struct))
)

# Single-row DataFrame holding route_id -> {segment_sequence: segment};
# alias route ids are folded into their canonical id before grouping.
routes_segments = (
    spark.read
    .json(STATIC_DATA_DIR + "route_segments.json")
    .withColumn("route_id", col("route_id").cast("int"))
    .withColumn("segment_id", col("segment_id").cast("int"))
    .withColumn("segment_sequence", col("segment_sequence").cast("int"))
    .dropna()
    .select(
        replace_alias_route_id("route_id").alias("route_id"),
        create_map(
            "segment_sequence",
            struct(
                "segment_id", "segment_sequence",
                "segment_first_lat", "segment_first_lon",
                "segment_second_lat", "segment_second_lon",
                "segment_len_meter", "segment_cum_len",
            ),
        ).alias("segment_info"),
    )
    .groupBy("route_id")
    .agg(combine_maps(collect_list("segment_info")).alias("segments"))
    .agg(combine_deep_maps(collect_list(create_map("route_id", "segments"))).alias("map"))
)

routes_segments.printSchema()

# Collect the single map to the driver and share it with executors for the segment UDFs.
broadcast_routes_segments = sc.broadcast(routes_segments.collect()[0][0])
# broadcast_routes_segments.value

root
 |-- map: map (nullable = true)
 |    |-- key: integer
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: integer
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- segment_id: integer (nullable = true)
 |    |    |    |-- segment_sequence: integer (nullable = true)
 |    |    |    |-- segment_first_lat: double (nullable = true)
 |    |    |    |-- segment_first_lon: double (nullable = true)
 |    |    |    |-- segment_second_lat: double (nullable = true)
 |    |    |    |-- segment_second_lon: double (nullable = true)
 |    |    |    |-- segment_len_meter: double (nullable = true)
 |    |    |    |-- segment_cum_len: double (nullable = true)



In [9]:
# Row schema of one stop as loaded from route_stops.json.
_stop_struct = StructType([
    StructField("stop_id", IntegerType(), True),
    StructField("stop_lat", DoubleType(), True),
    StructField("stop_lon", DoubleType(), True),
    StructField("stop_sequence", IntegerType(), True),
    StructField("segment_sequence", IntegerType(), True)
])

# Flatten a list of single-entry maps {route_id: [stop, ...]} into one map
# (on duplicate keys the later entry wins).
combine_maps = udf(
    lambda entries: {rid: entry[rid] for entry in entries for rid in entry},
    MapType(IntegerType(), ArrayType(_stop_struct))
)

# Single-row DataFrame holding route_id -> [stop, ...];
# alias route ids are folded into their canonical id before grouping.
route_stops = (
    spark.read
    .json(STATIC_DATA_DIR + "route_stops.json")
    .withColumn("route_id", col("route_id").cast("int"))
    .withColumn("stop_id", col("stop_id").cast("int"))
    .dropna()
    .select(
        replace_alias_route_id("route_id").alias("route_id"),
        struct(
            "stop_id", "stop_lat", "stop_lon",
            "stop_sequence", "segment_sequence",
        ).alias("stop_info"),
    )
    .groupBy("route_id")
    .agg(collect_list("stop_info").alias("stops"))
    .agg(combine_maps(collect_list(create_map("route_id", "stops"))).alias("map"))
)

route_stops.printSchema()

# Collect the single map to the driver and share it with executors for get_next_stops.
broadcast_route_stops = sc.broadcast(route_stops.collect()[0][0])
# broadcast_route_stops.value

root
 |-- map: map (nullable = true)
 |    |-- key: integer
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- stop_id: integer (nullable = true)
 |    |    |    |-- stop_lat: double (nullable = true)
 |    |    |    |-- stop_lon: double (nullable = true)
 |    |    |    |-- stop_sequence: integer (nullable = true)
 |    |    |    |-- segment_sequence: integer (nullable = true)



# Load aggregated data

In [10]:
# Flatten single-entry maps {"<route_id>_<direction>_<day_type>": (velocity, velocity_sign)}
# into one map (on duplicate keys the later entry wins).
combine_maps = udf(
    lambda maps: {key:f[key] for f in maps for key in f},
    MapType(
        StringType(),
        StructType([
            StructField("velocity", DoubleType()),
            StructField("velocity_sign", IntegerType())
        ])
    )
)

bus_velocities = (
    spark.read
    .json(AGGREGATED_DATA_DIR + "bus_velocities.json")
    # Cast before alias replacement, as the static-data cells do: replace_alias_route_id looks
    # ids up in an int-keyed dict and declares IntegerType, so a route_id inferred as a string
    # would miss the alias table and come back null, silently dropping it from the key below.
    .withColumn("route_id", replace_alias_route_id(col("route_id").cast("int")))
    .select(
        combine_maps(
            collect_list(
                create_map(
                    concat_ws("_", "route_id", "direction", "day_type"),
                    struct("velocity", "velocity_sign"),
                )
            )
        ).alias("map")
    )
)

bus_velocities.printSchema()

# Collect the single map to the driver and share it with executors for get_velocity.
broadcast_bus_velocities = sc.broadcast(bus_velocities.collect()[0][0])
# broadcast_bus_velocities.value

root
 |-- map: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- velocity: double (nullable = true)
 |    |    |-- velocity_sign: integer (nullable = true)



# Consume data from Kafka & process

In [11]:
# Expected JSON payload of each Kafka message (one bus position report).
schema = StructType([
    StructField("route_id", StringType(), True),
    StructField("id", StringType(), True),
    StructField("run_id", StringType(), True),
    StructField("predictable", BooleanType(), True),
    StructField("seconds_since_report", LongType(), True),
    StructField("heading", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
])

# Subscribe to the bus-location topic. No startingOffsets option is given, so a query without
# a checkpoint to resume from starts at Spark's streaming default ("latest").
bus_positions = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
    .option("subscribe", TOPIC)
    .load()
    # Decode the message bytes, parse them against `schema`, and keep the Kafka record
    # timestamp (cast to epoch seconds) next to the parsed fields.
    .withColumn('value', col('value').cast("string"))
    .withColumn('json_value', from_json("value", schema))
    .select(col("timestamp").cast("long"), "json_value.*")
)

bus_positions.printSchema()

root
 |-- timestamp: long (nullable = true)
 |-- route_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- run_id: string (nullable = true)
 |-- predictable: boolean (nullable = true)
 |-- seconds_since_report: long (nullable = true)
 |-- heading: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



### Process data

In [12]:
# Normalise raw reports: integer ids, direction taken from the last character of run_id,
# Kafka receive time as req_time, report time (receive time - seconds_since_report) as
# timestamp, weekday/weekend day_type, and the canonical route id for inconsistent aliases.
bus_positions_1 = (
    bus_positions
    .select(
        col("id").cast("int").alias("bus_id"),
        col("route_id").cast("int"),
        substring("run_id", -1, 1).cast("int").alias("direction"),
        "latitude",
        "longitude",
        col("timestamp").alias("req_time"),
        (col("timestamp") - col("seconds_since_report")).alias("timestamp"),
    )
    .withColumn("day_type", get_day_type("timestamp"))
    .dropna("any")
    .withColumn("fixed_route_id", replace_alias_route_id("route_id"))
)

bus_positions_1.printSchema()

root
 |-- bus_id: integer (nullable = true)
 |-- route_id: integer (nullable = true)
 |-- direction: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- req_time: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- day_type: integer (nullable = true)
 |-- fixed_route_id: integer (nullable = true)



In [13]:
def fget_segment_info(route_id, lat, lon):
    """Return the segment of `route_id` that best matches the point (lat, lon), or None.

    Each segment is scored as
        d(point, first vertex) + d(point, second vertex) - segment_len_meter,
    which is about 0 for a point on the segment (assuming segment_len_meter is the
    vertex-to-vertex distance) and grows as the point moves away; the lowest score wins.

    Returns None — a null struct from the UDF — when the route is not in the broadcast
    map, or no segment scores below infinity (e.g. NaN coordinates). The previous
    sentinel -1 is not a valid value for the StructType UDF.
    """
    segments_by_route = broadcast_routes_segments.value or {}
    segment_dict = segments_by_route.get(route_id)
    if not segment_dict:
        return None

    best_segment = None
    best_score = math.inf
    for segment_info in segment_dict.values():
        score = fhaversine_meter(lat, lon, segment_info["segment_first_lat"], segment_info["segment_first_lon"]) \
            + fhaversine_meter(lat, lon, segment_info["segment_second_lat"], segment_info["segment_second_lon"]) \
            - segment_info["segment_len_meter"]

        if score < best_score:
            best_score = score
            best_segment = segment_info

    return best_segment

get_segment_info = udf(
    lambda rid, lat, lon: fget_segment_info(rid, lat, lon), 
    StructType([
        StructField("segment_id", IntegerType(), True),
        StructField("segment_sequence", IntegerType(), True),
        StructField("segment_first_lat", DoubleType(), True),
        StructField("segment_first_lon", DoubleType(), True),
        StructField("segment_second_lat", DoubleType(), True),
        StructField("segment_second_lon", DoubleType(), True),
        StructField("segment_len_meter", DoubleType(), True),
        StructField("segment_cum_len", DoubleType(), True)
    ])
)

In [14]:
# Spot-check: best-matching segment of route 256 for a sample bus position.
fget_segment_info(route_id=256, lat=34.151728, lon=-118.113001)

Row(segment_id=49, segment_sequence=980002, segment_first_lat=34.1507053912, segment_first_lon=-118.1131582513, segment_second_lat=34.1517276036, segment_second_lon=-118.1131536861, segment_len_meter=113.66560913369601, segment_cum_len=30318.171651227352)

In [15]:
def fget_velocity(route_id, direction, day_type):
    """Look up the aggregated (velocity, velocity_sign) struct for a route/direction/day type.

    Keys have the form "<route_id>_<direction>_<day_type>", the same shape the aggregated-data
    cell builds with concat_ws("_", ...). When the key is missing, the first entry of the map
    is returned (NOTE(review): that is some other route's velocity — confirm this fallback is
    intended). Returns None when no velocities were loaded, so the UDF yields null instead of
    the task failing on StopIteration.
    """
    velocities = broadcast_bus_velocities.value or {}
    key = f'{route_id}_{direction}_{day_type}'
    if key in velocities:
        return velocities[key]
    return next(iter(velocities.values()), None)

get_velocity = udf(
    lambda ri, dr, dt: fget_velocity(ri, dr, dt),
    StructType([
        StructField("velocity", DoubleType()),
        StructField("velocity_sign", IntegerType())
    ])
)

In [16]:
# Spot-check: aggregated velocity for route 256, direction 1, weekday (day_type 0).
fget_velocity(route_id=256, direction=1, day_type=0)

Row(velocity=5.461573497132977, velocity_sign=1)

In [17]:
def fget_next_stops(route_id, bus_lat, bus_lon, velocity_sign, bus_segment_info):
    """Return the stops of `route_id` that the bus has not passed yet.

    velocity_sign orients the comparison along segment_sequence: a stop is kept when
    stop.segment_sequence * velocity_sign > bus segment_sequence * velocity_sign. A stop
    on the bus's own segment is kept when its distance from that segment's first vertex,
    times velocity_sign, is >= the bus's distance from it (times velocity_sign).

    Returns [] (so explode() emits no rows) when the route has no stops in the broadcast
    map, or the bus segment / velocity sign is missing. Before, these cases raised
    KeyError/TypeError inside the UDF and failed the streaming task.
    """
    stops_by_route = broadcast_route_stops.value or {}
    stops = stops_by_route.get(route_id)
    if not stops or bus_segment_info is None or velocity_sign is None:
        return []

    bus_seg_seq = bus_segment_info["segment_sequence"]
    seg_flat = bus_segment_info["segment_first_lat"]
    seg_flon = bus_segment_info["segment_first_lon"]

    result = []
    for stop_info in stops:
        stop_seg_seq = stop_info["segment_sequence"]
        if stop_seg_seq * velocity_sign > bus_seg_seq * velocity_sign:
            result.append(stop_info)
        elif stop_seg_seq == bus_seg_seq:
            # Same segment: compare offsets from the segment's first vertex.
            stop_offset = fhaversine_meter(stop_info["stop_lat"], stop_info["stop_lon"], seg_flat, seg_flon)
            bus_offset = fhaversine_meter(bus_lat, bus_lon, seg_flat, seg_flon)
            if stop_offset * velocity_sign >= bus_offset * velocity_sign:
                result.append(stop_info)
    return result

get_next_stops = udf(
    lambda rid, blat, blon, vsign, bseginfo: fget_next_stops(rid, blat, blon, vsign, bseginfo),
    ArrayType(StructType([
        StructField("stop_id", IntegerType(), True),
        StructField("stop_lat", DoubleType(), True),
        StructField("stop_lon", DoubleType(), True),
        StructField("stop_sequence", IntegerType(), True),
        StructField("segment_sequence", IntegerType(), True)
    ]))
)

In [18]:
# Spot-check: stops still ahead of a sample bus on route 256 with velocity_sign = +1.
sample_bus_segment = fget_segment_info(256, 34.151728, -118.113001)
fget_next_stops(256, 34.151728, -118.113001, 1, sample_bus_segment)

[Row(stop_id=10274, stop_lat=34.16914, stop_lon=-118.12481, stop_sequence=108, segment_sequence=1080001),
 Row(stop_id=10706, stop_lat=34.166921, stop_lon=-118.121392, stop_sequence=106, segment_sequence=1060001),
 Row(stop_id=2800, stop_lat=34.169146, stop_lon=-118.132093, stop_sequence=111, segment_sequence=1100003),
 Row(stop_id=10662, stop_lat=34.158095, stop_lon=-118.121346, stop_sequence=102, segment_sequence=1020001),
 Row(stop_id=3677, stop_lat=34.180119, stop_lon=-118.131598, stop_sequence=115, segment_sequence=1140003),
 Row(stop_id=10674, stop_lat=34.159867, stop_lon=-118.121345, stop_sequence=103, segment_sequence=1020003),
 Row(stop_id=10273, stop_lat=34.169091, stop_lon=-118.129944, stop_sequence=110, segment_sequence=1100001),
 Row(stop_id=3675, stop_lat=34.177308, stop_lon=-118.131719, stop_sequence=114, segment_sequence=1140001),
 Row(stop_id=3345, stop_lat=34.156215, stop_lon=-118.113014, stop_sequence=101, segment_sequence=1010001),
 Row(stop_id=10660, stop_lat=34.15

In [19]:
def fcalc_seconds_till_meet(route_id, bus_lat, bus_lon, bus_velocity, bus_segment_info, next_stop_info):
    """Estimate the whole number of seconds until the bus reaches `next_stop_info`.

    The along-route distance from ffind_distance (meters) is divided by bus_velocity
    (presumably m/s, given the result is in seconds) and floored.

    Returns None — null in the LongType UDF — when the velocity is missing or zero, or when
    the stop's segment is not in the broadcast segment map. Those cases used to raise
    ZeroDivisionError/KeyError and fail the streaming task. The debug print that ran once
    per row on the executors has been removed.
    """
    if not bus_velocity:
        return None

    route_segments = (broadcast_routes_segments.value or {}).get(route_id)
    if not route_segments:
        return None
    stop_segment_info = route_segments.get(next_stop_info["segment_sequence"])
    if stop_segment_info is None:
        return None

    dis = ffind_distance(
        bus_lat, bus_lon,
        bus_segment_info["segment_first_lat"], bus_segment_info["segment_first_lon"],
        bus_segment_info["segment_second_lat"], bus_segment_info["segment_second_lon"],
        bus_segment_info["segment_cum_len"],
        next_stop_info["stop_lat"], next_stop_info["stop_lon"],
        stop_segment_info["segment_first_lat"], stop_segment_info["segment_first_lon"],
        stop_segment_info["segment_second_lat"], stop_segment_info["segment_second_lon"],
        stop_segment_info["segment_cum_len"],
    )
    return math.floor(dis / bus_velocity)

calc_seconds_till_meet = udf(
    lambda rid, blat, blon, bvel, bseginfo, stpinfo: fcalc_seconds_till_meet(rid, blat, blon, bvel, bseginfo, stpinfo),
    LongType()
)

### Write historical data

In [20]:
# Write data to HDFS for later batch processing (raw reports as JSON files).
# queryName gives this query a stable checkpoint directory under
# spark.sql.streaming.checkpointLocation (CHECKPOINT_DIR). Without a name, Spark uses a random
# UUID subdirectory on every start, so a restarted query cannot resume from its offsets, and
# its batch ids restart at 0 — which the file sink's _spark_metadata log may treat as already
# committed.
historical_data_writer = (
    bus_positions
    .writeStream
    .queryName("historical_data_writer")
    .format("json")
    .option("path", HISTORICAL_DATA_DIR)
    .start()
)

In [21]:
# historical_data_writer.stop()

### Write to TimescaleDB

In [38]:
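# NOTE(review): earlier draft of the TimescaleDB writer, superseded by the writeJDBC cell below; delete once no longer needed.
# def processBatch(batch, batch_id):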
#     batch_df =  batch\
#         .withColumn("velocity", get_velocity("fixed_route_id", "direction", "day_type"))\
#         .withColumn("timestamp", to_datetime("timestamp"))\
#         .withColumn("req_time", to_datetime("req_time"))\
#         .withColumn("segment_info", get_segment_info("fixed_route_id", "latitude", "longitude"))\
#         .dropna("any")\
#         .withColumn("next_stop_info", explode(get_next_stops("fixed_route_id", "latitude", "longitude", "velocity.velocity_sign", "segment_info")))\
#         .withColumn("seconds_till_meet", calc_seconds_till_meet("fixed_route_id", "latitude", "longitude", "velocity.velocity", "segment_info", "next_stop_info"))\
#         .select(
#             "timestamp",
#             "req_time",
#             "route_id",
#             "bus_id",
#             "latitude",
#             "longitude",
#             "direction",
#             col("velocity.velocity").alias("velocity"),
#             col("next_stop_info.stop_id").alias("stop_id"),
#             "seconds_till_meet"
#         )
    
#     batch_df \
#         .write \
#         .format("jdbc") \
#         .option("url", POSTGRES_URL) \
#         .option("dbtable", POSTGRES_TABLE_BUS_ARRIVAL) \
#         .option("user", POSTGRES_USERNAME) \
#         .option("password", POSTGRES_PASSWORD) \
#         .mode("append") \
#         .save()
    
# aggregated_data_writer = bus_positions_1\
#     .writeStream\
#     .foreachBatch(processBatch)\
#     .start()

In [37]:
def writeJDBC(batch, batch_id):
    """foreachBatch sink: append one micro-batch DataFrame to POSTGRES_TABLE_BUS_ARRIVAL via JDBC.

    batch_id is not used. foreachBatch on its own gives at-least-once delivery, so a
    micro-batch re-run after a failure may be appended twice (NOTE(review): deduplicate
    on batch_id if duplicate rows matter).
    """
    batch \
        .write \
        .format("jdbc") \
        .option("url", POSTGRES_URL) \
        .option("dbtable", POSTGRES_TABLE_BUS_ARRIVAL) \
        .option("user", POSTGRES_USERNAME) \
        .option("password", POSTGRES_PASSWORD) \
        .mode("append") \
        .save()


# Estimated arrival of each bus at each of its upcoming stops, streamed into TimescaleDB.
aggregated_data_writer = (
    bus_positions_1
    # (velocity, velocity_sign) from the aggregated data for this route/direction/day type
    .withColumn("velocity", get_velocity("fixed_route_id", "direction", "day_type"))
    # epoch seconds -> timestamps for the database columns
    .withColumn("timestamp", to_datetime("timestamp"))
    .withColumn("req_time", to_datetime("req_time"))
    # best-matching route segment for the reported position
    .withColumn("segment_info", get_segment_info("fixed_route_id", "latitude", "longitude"))
    # drop reports for which a lookup UDF produced null
    .dropna("any")
    # one row per stop still ahead of the bus
    .withColumn("next_stop_info", explode(get_next_stops("fixed_route_id", "latitude", "longitude", "velocity.velocity_sign", "segment_info")))
    .withColumn("seconds_till_meet", calc_seconds_till_meet("fixed_route_id", "latitude", "longitude", "velocity.velocity", "segment_info", "next_stop_info"))
    .select(
        "timestamp",
        "req_time",
        "route_id",
        "bus_id",
        "latitude",
        "longitude",
        "direction",
        col("velocity.velocity").alias("velocity"),
        col("next_stop_info.stop_id").alias("stop_id"),
        "seconds_till_meet"
    )
    .writeStream
    # Stable name -> stable checkpoint directory under spark.sql.streaming.checkpointLocation.
    # Without it Spark picks a random UUID on every start, so a restarted query would not
    # resume from the Kafka offsets it had already processed.
    .queryName("bus_arrival_writer")
    .foreachBatch(writeJDBC)
    .start()
)

In [35]:
# Stop the bus-arrival streaming query started above. Under "Run All" this stops the
# query immediately after it starts; run this cell manually when the stream should end.
aggregated_data_writer.stop()