In [1]:
# Import necessary packages
import heapq
import math

import numpy as np
import pandas as pd
from hdfs3 import HDFileSystem

#import time
#import datetime

# Import the project helper modules (graph_helpers.py and a_star_helpers.py)
from graph_helpers import *
from a_star_helpers import *

## Building the Graph

In [2]:
hdfs = HDFileSystem(host='hdfs://iccluster044.iccluster.epfl.ch', port=8020, user='ebouille')


def load_parquet_dir(fs, pattern):
    """Read every parquet part file matching ``pattern`` into one DataFrame.

    Parameters
    ----------
    fs : file-system object exposing ``glob(pattern)`` and ``open(path)``
        (the HDFileSystem connection created above).
    pattern : str
        Glob pattern selecting the part files.

    Returns
    -------
    pandas.DataFrame
        The part files stacked in glob order, or an empty DataFrame when
        nothing matches.
    """
    frames = []
    for path in fs.glob(pattern):
        with fs.open(path) as f:
            frames.append(pd.read_parquet(f))
    # Concatenate once: growing a DataFrame with append() inside a loop copies
    # all previous rows on every iteration, and append() no longer exists in
    # pandas >= 2.0.
    return pd.concat(frames) if frames else pd.DataFrame()


# Load the timetable dataframe
timetable_df = load_parquet_dir(hdfs, '/user/fristedt/timetable.parquet/*.parquet')

# Load the stops_filt dataframe
stops_df = load_parquet_dir(hdfs, '/user/fristedt/stops_filt.parquet/*.parquet')

In [3]:
# Create the empty transit graph that the following cells populate with stops,
# timetable edges and walking edges.
# NOTE(review): Network comes from one of the star-imported helper modules and
# the meaning of the argument 7 is not visible in this notebook -- confirm.
network = Network(7)

In [4]:
# Populate the network from the timetable: one row per trip, with parallel
# per-stop arrays (stop ids, sequence numbers, times, coordinates).
for index, row in timetable_df.iterrows():
    trip_id = row['trip_id']
    stop_ids = row['stop_ids']
    seq = row['stop_sequence']
    arr = row['arrival_times']
    dept = row['departure_times']
    lats = row['lats']
    lons = row['longs']

    # Skip rows rejected by the helper checks (mismatched array lengths or a
    # trip that isFullTrip does not accept).
    if not all_same_length(len(stop_ids), len(seq), len(arr), len(dept)):
        continue
    if not isFullTrip(seq):
        continue

    for i in range(len(stop_ids) - 1):
        # Register both endpoints even when no edge is added between them.
        for k in (i, i + 1):
            if not network.isIn(stop_ids[k]):
                network.addStop(stop_ids[k], lats[k], lons[k])

        # Only connect stops whose sequence numbers are consecutive.
        # Compare by value: `is not 1` tests object identity, which only
        # happens to work for small ints in CPython and raises a
        # SyntaxWarning on Python >= 3.8.
        if int(seq[i + 1]) - int(seq[i]) != 1:
            continue

        # The later stop is passed first and the earlier stop second.
        # NOTE(review): presumably the graph is stored reversed because astar
        # starts its search at the destination -- confirm against Network.addEdge.
        network.addEdge(stop_ids[i + 1], stop_ids[i], (dept[i], arr[i + 1]), trip_id)

In [5]:
allStops = network.getAllStops()

# Collect the (lat, lon) of every stop, in the iteration order of allStops.
# Iterate the materialised list once instead of rebuilding list(allStops) for
# every index, which made this loop quadratic in the number of stops.
loc = []
for stop_id in list(allStops):
    lat, lon = network.getStop(stop_id).getLoc()
    loc.append((lat, lon))

In [6]:
# Thresholds for walking transfers.
# NOTE(review): presumably metres and metres per minute -- confirm against the
# unit returned by calc_dist.
MAX_WALK_DIST = 500
WALK_SPEED = 50.0

# Materialise the stop ids once; loc[i] was built from the same ordering, so
# walk_stop_ids[i] is the stop located at loc[i]. The original rebuilt
# list(allStops) several times inside the inner loop.
walk_stop_ids = list(allStops)

count = 0  # number of stop pairs connected by a walking edge
d = []     # distances of those pairs
for i in range(len(loc)):
    lat1, lon1 = loc[i]
    for j in range(i + 1, len(loc)):
        lat2, lon2 = loc[j]
        dist = calc_dist(lat1, lat2, lon1, lon2)
        # Compare ids by value; `is not` tests object identity and would let
        # two equal ids held in distinct objects pass as different stops.
        if dist <= MAX_WALK_DIST and walk_stop_ids[i] != walk_stop_ids[j]:
            count += 1
            d.append(dist)
            walkTime = math.ceil(float(dist) / WALK_SPEED)
            # Walking is possible in both directions.
            network.addWalkEdge(walk_stop_ids[i], walk_stop_ids[j], walkTime)
            network.addWalkEdge(walk_stop_ids[j], walk_stop_ids[i], walkTime)

## Modified A*

In [7]:
# Map every parent station id to the list of stop ids that name it as their
# parent; stops whose parent_station is the empty string are left out.
children_station = {}

for _, stop_row in stops_df.iterrows():
    parent = stop_row['parent_station']
    if parent != '':
        children_station.setdefault(parent, []).append(stop_row['stop_id'])

In [8]:
import copy

# Independent deep copy of the fully built graph. ksp_yen removes and restores
# connections on the global `network`, so this copy is unaffected by that.
# NOTE(review): `nn` is not referenced by any other visible cell -- presumably
# a backup; confirm it is still needed.
nn = copy.deepcopy(network)

In [9]:
def astar(network, depart, dest, curr):
    """Search the network starting at ``dest`` until ``depart`` is reached.

    The search is seeded with a node at ``dest`` whose time is ``curr`` and
    repeatedly expands the pending node with the smallest ``elapsed`` value
    (ties broken by node time, then by insertion order). No heuristic term is
    added to the priority, so candidates are ordered by elapsed time only.

    Parameters
    ----------
    network : graph object handed to ``find_children``.
    depart : stop id at which the search terminates.
    dest : stop id the search is seeded with.
    curr : str
        Time of day formatted "HH:mm:ss" (shortestPath passes its
        ``arrival_time`` here).

    Returns
    -------
    The value of ``reconstructPath(node)`` for the node that reached
    ``depart``, or ``None`` when the queue empties first.
    """
    # Reuse the shared conversion helper instead of duplicating its Spark code.
    query_time = timeToTimestamp(curr)

    # The same set object is handed to the seed node and updated below.
    visited = set()

    sink = Node(curr=dest,
                prev=None,
                time=query_time,
                elapsed=0,
                trip=None,
                visited=visited,
                parent=None,
                arrival=None,
                walk=True)

    # Heap entries are (elapsed, time, insertion counter, node). The counter
    # guarantees that two entries never tie on the leading fields, so heapq
    # never falls back to comparing Node objects with each other.
    # NOTE(review): if Node defines its own ordering, ties are now resolved by
    # insertion order instead -- confirm that is acceptable.
    unvisited = [(0, query_time, 0, sink)]
    pushed = 1

    while unvisited:
        _, _, _, n = heapq.heappop(unvisited)
        stop_id = n.curr

        if stop_id in visited:
            continue
        if stop_id == depart:
            return reconstructPath(n)

        visited.add(stop_id)
        for child in find_children(network, n, query_time):
            heapq.heappush(unvisited, (child.elapsed, child.time, pushed, child))
            pushed += 1

    return None


def timeToTimestamp(s):
    """Convert an "HH:mm:ss" time string to a Unix timestamp using Spark.

    Builds a one-row Spark DataFrame from ``s``, applies ``unix_timestamp``
    with the pattern "HH:mm:ss" and returns the resulting value from the first
    row. Requires a ``spark`` session and ``unix_timestamp`` in scope.

    NOTE(review): the result presumably depends on the Spark session time
    zone -- confirm.
    """
    time_df = spark.createDataFrame([s], "string").toDF('time')
    stamped_df = time_df.withColumn('unix', unix_timestamp(time_df.time, "HH:mm:ss"))

    # Pull the single row back to the driver and read its 'unix' column.
    _, first_row = next(stamped_df.toPandas().iterrows())
    return first_row['unix']


def timestampToTime(s):
    """Format a Unix timestamp as an "HH:mm:ss" string using Spark.

    Builds a one-row Spark DataFrame from ``s`` (declared as "float"), applies
    ``from_unixtime`` with the pattern "HH:mm:ss" and returns the formatted
    value from the first row. Requires a ``spark`` session and
    ``from_unixtime`` in scope.
    """
    stamp_df = spark.createDataFrame([s], "float").toDF('time')
    # The formatted string is stored under the column name 'unix'.
    formatted_df = stamp_df.withColumn('unix', from_unixtime(stamp_df.time, "HH:mm:ss"))

    # Pull the single row back to the driver and read the formatted value.
    _, first_row = next(formatted_df.toPandas().iterrows())
    return first_row['unix']

In [10]:
def shortestPath(network, depart, dest, arrival_time, multiple):
    """Run astar for ``depart``, expanding a parent station into its child stops.

    When ``depart`` has no entry (or an empty entry) in ``children_station``,
    a single search is run for ``depart`` itself and its result is returned
    unchanged. Otherwise one search is run per child stop and the non-empty
    results are ordered by the 'time' of their first element.

    Parameters
    ----------
    network : graph object forwarded to ``astar``.
    depart : stop id or parent station id.
    dest : stop id forwarded to ``astar``.
    arrival_time : str
        Time of day "HH:mm:ss", forwarded to ``astar``.
    multiple : bool
        If true, return the whole sorted list of child paths; otherwise only
        the first one.

    Returns
    -------
    A path, a list of paths (``multiple`` true, possibly empty), or ``None``
    when no path was found.
    """
    depart_children = children_station.get(depart)
    if not depart_children:
        return astar(network, depart, dest, arrival_time)

    paths = [astar(network, child, dest, arrival_time) for child in depart_children]
    # Drop searches that found nothing; the sort key needs a first element.
    paths_filtered = [p for p in paths if p]
    paths_filtered.sort(key=lambda p: p[0]['time'])

    if multiple:
        return paths_filtered
    # Callers (ksp_yen) test the result for truthiness, so report "no path" as
    # None instead of raising IndexError on an empty list.
    return paths_filtered[0] if paths_filtered else None
    


def ksp_yen(depart, dest, curr, max_k=2):
    """Collect up to ``max_k`` alternative paths with a Yen-style spur search.

    Operates on the module-level ``network`` (it is not a parameter):
    connections are removed temporarily while each spur path is searched and
    are always restored afterwards.

    Parameters
    ----------
    depart : stop id or parent station id passed to ``shortestPath``.
    dest : stop id passed to ``shortestPath``.
    curr : str
        Time of day "HH:mm:ss" passed to ``shortestPath``.
    max_k : int, default 2
        Maximum number of paths to return.

    Returns
    -------
    list
        The accepted paths, best first. When no initial path exists the list
        holds that single falsy result.
    """
    A = [shortestPath(network, depart, dest, curr, False)]  # accepted paths
    B = []                                                  # candidate paths
    if not A[0]:
        return A

    for _ in range(1, max_k):
        last_path = A[-1]
        for i in range(len(last_path) - 1):
            node_spur = last_path[i]
            path_root = last_path[:i + 1]

            # (t, s) -> saved times of every connection removed for this spur.
            # Keying by the pair ensures a connection shared by several
            # accepted paths is snapshotted and removed only once; otherwise
            # the second snapshot (taken after removal) would overwrite the
            # real one when restoring.
            removed = {}
            try:
                for curr_path in A:
                    if (len(curr_path) > i + 1
                            and samePath(path_root, curr_path[:i + 1])
                            and not curr_path[i]['walk']):
                        s = curr_path[i]['stop']
                        t = curr_path[i + 1]['stop']
                        if (t, s) in removed:
                            continue
                        removed[(t, s)] = network.getAllTime(t, s)
                        network.removeConnection(t, s)

                path_spur = shortestPath(network, node_spur['stop'], dest, curr, False)
            finally:
                # Restore the graph even if the spur search raised, so the
                # shared network is never left with missing connections.
                for (t, s), times in removed.items():
                    network.setTime(t, s, times)

            if path_spur:
                path_total = path_root[:-1] + path_spur
                if notIn(path_total, B) and notIn(path_total, A):
                    B.append(path_total)

        if not B:
            break
        # Promote the candidate whose first element has the smallest 'time'.
        B.sort(key=lambda p: p[0]['time'])
        A.append(B.pop(0))

    return A

In [11]:
def route_probability(routes):
    """Annotate routes with per-stop certainties and an overall probability.

    Works on a deep copy, so ``routes`` itself is left untouched and the
    function can be called repeatedly on the same input. ``None`` entries
    (no route found) are skipped.

    For every stop dict a 'certainty' is set:
    * 1 for walking stops, stops without an 'arrival', and stops whose
      'max_delay' ((time - arrival) / 60) is zero;
    * otherwise ``calc_proba(max_delay, mean, std)`` using the row of the
      global Spark DataFrame ``distribution_df`` that matches the stop id and
      arrival hour and has the largest 'values_count'; 1 if no row matches.

    Two dicts are then appended to each route: ``{'route_number': n}`` and
    ``{'probability': product of the stop certainties}``.

    Parameters
    ----------
    routes : list
        Routes as returned by ``ksp_yen``; each route is a list of stop dicts
        with the keys 'walk', 'arrival', 'time' and 'stop'.

    Returns
    -------
    list
        The annotated copies of the routes.
    """
    import copy
    import datetime

    routes_2 = [route for route in copy.deepcopy(routes) if route is not None]

    for route_counter, route in enumerate(routes_2):
        for stop in route:
            # Default certainty; only overwritten when delay statistics exist.
            stop['certainty'] = 1
            if stop['walk'] or stop['arrival'] is None:
                continue

            stop['max_delay'] = (float(stop['time']) - float(stop['arrival'])) / 60
            if stop['max_delay'] == 0.0:
                continue

            # Hour of the arrival in the local time zone, as used by the
            # arrival_hour column (compared as a string).
            hour = datetime.datetime.fromtimestamp(int(stop['arrival'])).hour
            distribution = distribution_df.where(
                (distribution_df.stop_id == stop['stop'].decode("utf-8"))
                & (distribution_df.arrival_hour == str(hour)))
            distribution = distribution.toPandas().fillna(0)
            distribution['values_count'] = distribution['values_count'].astype(int)
            # Keep only the best-supported statistics row.
            distribution = distribution.nlargest(1, columns=['values_count'])
            if distribution.empty:
                continue

            mean = float(distribution['mean_minute_delay'].iloc[0])
            std = float(distribution['std_minute_delay'].iloc[0])
            stop['certainty'] = calc_proba(stop['max_delay'], mean, std)

        # Compute the product before appending the two summary dicts, which
        # carry no 'certainty' key.
        probability = np.prod([stop['certainty'] for stop in route])
        route.append({'route_number': route_counter})
        route.append({'probability': probability})

    return routes_2

In [12]:
# Ask for up to 30 alternative routes between the two stops, using '12:30:00'
# as the reference time of the search.
# NOTE(review): the recorded output of this cell is a NameError because no
# `spark` session is created in the visible cells. Also, route_probability
# decodes stop ids as bytes while str ids are passed here -- confirm the id
# type matches the keys used in the graph.
routes = ksp_yen('8503000P', '8591049','12:30:00', max_k = 30)

NameError: name 'spark' is not defined

In [85]:
# Attach per-stop certainties and an overall probability to every route.
# NOTE(review): this cell's recorded execution count (85) is out of sequence,
# and it needs a `distribution_df` Spark DataFrame that is not defined in the
# visible cells -- confirm the notebook survives Restart & Run All.
rroutes = route_probability(routes)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…