In [3]:
from geojson import LineString
import pandas as pd
import turfpy.measurement as turf_measurement
from turfpy.measurement import along
from typing import List, Tuple
from sqlalchemy import create_engine
import psycopg2

# Setting up database


In [4]:
import os

# Scale factor applied to the time difference between consecutive observations
# to get the number of interpolated frames for that segment.
multiplier = 1

# Database account. Values are read from the environment so credentials are not
# committed with the notebook; the defaults reproduce the previous local setup.
db_username = os.environ.get("BUS_TRIAL_DB_USER", "postgres")
db_password = os.environ.get("BUS_TRIAL_DB_PASSWORD", "postgres")
db_host = os.environ.get("BUS_TRIAL_DB_HOST", "localhost")
db_port = os.environ.get("BUS_TRIAL_DB_PORT", "5432")
db_name = os.environ.get("BUS_TRIAL_DB_NAME", "bus_trial")


# Interpolation code! 

In [4]:
def storeData(coordinates, status, route, startPoint):
    """Insert one trip's interpolated frames as a single row of the ``points`` table.

    Args:
        coordinates: sequence of coordinate pairs, one per frame; every component
            is converted with ``float`` and stored in ``int_points``
            (presumably [lon, lat] as produced by ``interpolation`` — confirm).
        status: sequence of per-frame status values; each is converted with
            ``int`` and stored in ``bus_status``.
        route: value stored in ``bus_number``.
        startPoint: value stored in ``start_time``.

    The connection is always closed, and the insert is rolled back if it fails.
    """
    # Convert first so a bad value fails before any connection is opened.
    nested_coordinate_list = [[float(item) for item in sublist] for sublist in coordinates]
    status_list = [int(item) for item in status]

    connection = psycopg2.connect(user=db_username,
                                  password=db_password,
                                  host=db_host,
                                  port=db_port,
                                  database=db_name)
    try:
        # `with connection` commits on success and rolls back on error, but does
        # not close the connection -- that is what the `finally` is for.
        with connection:
            with connection.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO points (int_points, bus_status, bus_number, start_time) VALUES (%s, %s, %s, %s)",
                    (nested_coordinate_list, status_list, route, startPoint),
                )
    finally:
        connection.close()

def unique_route_steps(timestamps, scale=None):
    """Number of frames to allocate between each pair of consecutive timestamps.

    Args:
        timestamps: positionally indexable numeric timestamps (a list, or a
            Series with a default RangeIndex).
        scale: factor applied to each time difference before rounding. When
            omitted, the notebook-level ``multiplier`` is used (previous behaviour).

    Returns:
        list with ``round((timestamps[i + 1] - timestamps[i]) * scale)`` for each
        consecutive pair; empty when fewer than two timestamps are given.
    """
    factor = multiplier if scale is None else scale
    return [
        round((timestamps[i + 1] - timestamps[i]) * factor)
        for i in range(len(timestamps) - 1)
    ]

def is_equal(coord1, coord2):
    """Exact match on the first two components of two coordinates.

    Index 0 is compared first; index 1 is only read when index 0 matches.
    Any components beyond the second are ignored, and no tolerance is applied.
    """
    return all(coord1[axis] == coord2[axis] for axis in (0, 1))

def getShapeCoords(shape_id):
    """Fetch the ordered vertices of one route shape from the ``shapes`` table.

    Args:
        shape_id: value matched against ``shapes.shape_id``.

    Returns:
        list of ``[lon, lat]`` lists ordered by ``shape_pt_sequence``; empty when
        the shape has no rows.
    """
    connection = psycopg2.connect(user=db_username,
                                  password=db_password,
                                  host=db_host,
                                  port=db_port,
                                  database=db_name)
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM shapes WHERE shape_id = %s ORDER BY shape_pt_sequence",
                (shape_id,),
            )
            results = cursor.fetchall()
    finally:
        # Close even if the query fails, so connections do not leak per call.
        connection.close()

    # NOTE(review): SELECT * relies on the table's column order matching these
    # names -- confirm against the schema.
    shape_df = pd.DataFrame(results, columns=['id', 'shape_id', 'lat', 'lon', 'shape_pt_sequence'])
    # Column-wise zip instead of a row-wise apply: faster, and an empty result
    # yields [] instead of an apply() DataFrame that cannot fill one column.
    return [[lon, lat] for lon, lat in zip(shape_df['lon'], shape_df['lat'])]
    

def _segment_coordinates(route, start_point, end_point):
    """Shape vertices from ``start_point`` up to and including ``end_point``.

    Matching uses ``is_equal`` (exact equality), and the search always starts at
    the first vertex of ``route``. Returns an empty list when ``start_point`` is
    not a vertex of ``route``.

    Raises:
        ValueError: ``start_point`` is on the route but ``end_point`` does not
            occur at or after it (instead of running off the end of the route).
    """
    for start_idx, vertex in enumerate(route):
        if is_equal(vertex, start_point):
            for end_idx in range(start_idx, len(route)):
                if is_equal(route[end_idx], end_point):
                    return list(route[start_idx:end_idx + 1])
            raise ValueError(
                f"end point {end_point} not found on the shape after start point {start_point}"
            )
    return []


def _status_at(fraction, first_status, second_status):
    """Status to show at ``fraction`` (0 <= fraction < 1) of the way between two observations.

    The segment is split into ``|second - first| + 1`` equal bands and the status
    moves one level per band from ``first_status`` towards ``second_status``: a
    change of 1 switches at the midpoint, a change of 2 passes through the
    intermediate level between 1/3 and 2/3, and larger changes follow the same
    pattern instead of leaving the status unassigned.
    """
    levels = int(abs(second_status - first_status))
    if levels == 0:
        return first_status
    direction = 1 if second_status > first_status else -1
    # Count band boundaries already passed; (k + 1) / (levels + 1) reproduces
    # the 1/2 and 1/3, 2/3 thresholds exactly.
    band = sum(fraction >= (k + 1) / (levels + 1) for k in range(levels))
    return first_status + direction * band


def interpolation(subsetDataset):
    """Interpolate bus positions and statuses along the route shape for one trip, then store them.

    Args:
        subsetDataset: DataFrame with the observations of a single trip. Reads
            the ``coordinates``, ``shape_id``, ``timestamps``, ``status`` and
            ``route_short_name`` columns; only the first ``shape_id`` and
            ``route_short_name`` values are used. Rows are assumed to be in time
            order -- TODO confirm the input is sorted by timestamp.

    For each pair of consecutive observations, the shape vertices between the two
    coordinates form a line that is sampled at ``unique_route_steps`` evenly
    spaced distances (one frame per step). The frames, their statuses, the route
    name and the earliest timestamp are passed to ``storeData``.

    Raises:
        ValueError: an observation's end coordinate does not occur on the shape
            after its start coordinate.
    """
    subsetDataset = subsetDataset.reset_index(drop=True)
    unique_coordinates = subsetDataset["coordinates"]
    shape_id = str(subsetDataset["shape_id"].unique()[0])
    unique_timestamps = subsetDataset["timestamps"]
    unique_status = subsetDataset["status"]

    # Ordered vertices of the bus route
    unique_route = getShapeCoords(shape_id)
    # Number of frames to allocate between each pair of consecutive observations
    unique_steps = unique_route_steps(unique_timestamps)

    interpolated_points = []
    status_conditions = []

    for i in range(len(unique_timestamps) - 1):
        first_status = unique_status[i]
        second_status = unique_status[i + 1]

        segment = _segment_coordinates(unique_route, unique_coordinates[i], unique_coordinates[i + 1])
        # Fewer than two vertices: the start stop is not on the shape, or the bus
        # did not move. There is no distance to interpolate over.
        # NOTE(review): no frames are emitted for such intervals even though time
        # passes -- confirm this is intended for the animation.
        if len(segment) < 2:
            continue

        line = LineString(segment)
        line_distance = turf_measurement.length(line)
        # int(): step counts may come back as NumPy scalars. Non-positive counts
        # (duplicate or out-of-order timestamps) produce no frames instead of a
        # division by zero or a never-ending loop.
        steps = int(unique_steps[i])
        if line_distance <= 0 or steps <= 0:
            continue

        for k in range(steps):
            # Index-based sampling gives exactly `steps` frames; accumulating
            # `z += line_distance / steps` could drift and emit an extra frame.
            fraction = k / steps
            interpolated_point = along(line, fraction * line_distance)
            interpolated_points.append(tuple(interpolated_point['geometry']['coordinates']))
            status_conditions.append(_status_at(fraction, first_status, second_status))

    # Persist the trip's frames, keyed by route name and first timestamp
    startPoint = subsetDataset["timestamps"].min().item()
    route_name = subsetDataset['route_short_name'].unique()[0]
    storeData(interpolated_points, status_conditions, route_name, startPoint)

In [5]:
trial_data = pd.read_csv("complete_data.csv")
# One [lon, lat] list per observation, the same order getShapeCoords uses for
# shape vertices, so is_equal can match stops to the route. Built column-wise:
# a row-wise apply is slow on a large CSV and returns a DataFrame (which cannot
# fill a single column) when the file has no rows.
trial_data['coordinates'] = [
    [lon, lat] for lon, lat in zip(trial_data['stop_lon'], trial_data['stop_lat'])
]

In [None]:
# Split the observations into one DataFrame per trip (sorted by trip_id)
grouped = trial_data.groupby('trip_id')
subset_groups = [trip_rows for _trip_id, trip_rows in grouped]

# Attempting parallelisation (most of these attempts did not work)


In [9]:
from concurrent.futures import ThreadPoolExecutor, as_completed

# NOTE(review): threads mainly overlap the database round trips; the geometry
# work presumably holds the GIL, so expect little speed-up here.
with ThreadPoolExecutor() as executor:
    # submit() + result() surfaces worker exceptions. executor.map() only raises
    # when its lazy iterator is consumed, which never happened, so failures were
    # silently discarded.
    trip_by_future = {
        executor.submit(interpolation, group): group["trip_id"].iloc[0]
        for group in subset_groups
    }
    for future in as_completed(trip_by_future):
        try:
            future.result()
        except Exception as exc:
            print(f"Error processing trip {trip_by_future[future]}: {exc}")

KeyboardInterrupt: 

In [11]:
# Keep every row of the first n_groups trips (in order of first appearance)
n_groups = 10  # number of trips to keep for a quick trial run
grouped = trial_data.groupby('trip_id')
# First row of each trip, limited to the first n_groups trips
subset = grouped.head(1).iloc[:n_groups]
# Selecting only `subset.index` kept a single row per trip, leaving
# interpolation() no consecutive observations to process; select all rows of
# the chosen trips instead.
subset_df = trial_data[trial_data['trip_id'].isin(subset['trip_id'])]
indices = subset_df.index

In [167]:
import os
from multiprocessing import Pool

def parallelize_dataframe(df, func, n_cores=4):
    """Apply ``func`` to each ``trip_id`` group of ``df`` in a process pool.

    Args:
        df: DataFrame with a ``trip_id`` column.
        func: picklable callable taking one group DataFrame.
        n_cores: number of worker processes.

    Returns:
        list of ``func`` results, one per group, in sorted ``trip_id`` order.
        Callers that ignore the return value are unaffected.
    """
    groups = [group for _trip_id, group in df.groupby('trip_id')]
    # The context manager tears the workers down even if map() raises or is
    # interrupted; a bare close()/join() pair is skipped in that case.
    # NOTE(review): with the "spawn" start method (macOS/Windows default),
    # functions defined in a notebook cannot be loaded by the workers, which
    # may explain the hung runs -- confirm, and move func into a .py module.
    with Pool(n_cores) as pool:
        return pool.map(func, groups)

def interpolation_wrapper(group):
    """Run ``interpolation`` on a re-indexed copy of ``group``, printing instead of raising.

    Any exception (including one from re-indexing) is reported with print and
    the function returns None, so one bad trip does not abort the whole map.
    """
    try:
        return interpolation(group.reset_index(drop=True))
    except Exception as err:
        print(f"Error processing group: {err}")

# One worker per CPU; if this is None, Pool picks its own default.
n_cores = os.cpu_count()
parallelize_dataframe(subset_df, func=interpolation_wrapper, n_cores=n_cores)

KeyboardInterrupt: 

In [23]:
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func, processes=None):
    """Map ``func`` over the groups of a pandas GroupBy in a process pool.

    Args:
        dfGrouped: GroupBy object, iterated as (key, group) pairs.
        func: picklable callable taking one group DataFrame.
        processes: number of workers; defaults to ``cpu_count()``.

    Returns:
        list of ``func`` results in group iteration order (previously discarded).
    """
    groups = [group for _key, group in dfGrouped]
    with Pool(cpu_count() if processes is None else processes) as p:
        return p.map(func, groups)

# NOTE: every run inserts the interpolated trips into `points` again.
applyParallel(dfGrouped=trial_data.groupby('trip_id'), func=interpolation)


KeyboardInterrupt: 

In [24]:
def applyParallel(dfGrouped, func, processes=None):
    """Map ``func`` over re-indexed groups of a pandas GroupBy in a process pool.

    NOTE: this redefines ``applyParallel`` from the previous cell; the only
    difference is that each group gets a fresh 0..n-1 index before dispatch.

    Args:
        dfGrouped: GroupBy object, iterated as (key, group) pairs.
        func: picklable callable taking one group DataFrame.
        processes: number of workers; defaults to ``cpu_count()``.

    Returns:
        list of ``func`` results in group iteration order (previously discarded).
    """
    groups = [group.reset_index(drop=True) for _key, group in dfGrouped]
    with Pool(cpu_count() if processes is None else processes) as p:
        return p.map(func, groups)
# NOTE: every run inserts the interpolated trips into `points` again.
applyParallel(dfGrouped=trial_data.groupby('trip_id'), func=interpolation)

KeyboardInterrupt: 