In [None]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
import numpy as np
import math
import random
import os
from datetime import datetime
import time
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from pyspark.sql.types import IntegerType
from shapely.geometry import Point
from shapely.geometry import Polygon
from sedona.register import SedonaRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from sedona.core.SpatialRDD import PointRDD
from sedona.core.SpatialRDD import PolygonRDD
from sedona.core.SpatialRDD import LineStringRDD
from sedona.core.enums import FileDataSplitter
from sedona.utils.adapter import Adapter
from sedona.core.spatialOperator import KNNQuery
from sedona.core.spatialOperator import JoinQuery
from sedona.core.spatialOperator import JoinQueryRaw
from sedona.core.spatialOperator import RangeQuery
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.formatMapper import WkbReader
from sedona.core.formatMapper import WktReader
from sedona.core.formatMapper import GeoJsonReader
from sedona.sql.types import GeometryType
from sedona.core.SpatialRDD import RectangleRDD
from sedona.core.geom.envelope import Envelope
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.enums import GridType
from sedona.core.enums import IndexType
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import lit

In [None]:
spark = SparkSession.\
    builder.\
    master("local[*]").\
    appName("Sedona App").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
    config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-2.4_2.11:1.0.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0") .\
    getOrCreate()

In [None]:
SedonaRegistrator.registerAll(spark)

In [None]:
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")

## Define Constants

In [None]:
BINARY_WEIGHT = "weight_type_0"
POINT_DISTANCE = 'weight_type_1'
CENTRAL_DISTANCE = 'weight_type_2'
EXPONENTIAL_DISTANCE = 'weight_type_3'
MINIMUM_DISTANCE = 'weight_type_4'
COMMON_BORDER_RATIO = 'weight_type_5'

In [None]:
GEO_FILE_TYPE_SHAPE = "geo_file_type_shape"
GEO_FILE_TYPE_GEO_JSON = "geo_file_type_geo_json"
GEO_FILE_TYPE_WKB = "geo_file_type_wkb"
GEO_FILE_TYPE_WKT = "geo_file_type_wkt"

## Define Necessary Utility Methods

In [None]:
# method to load all different types of spatial data
def load_geo_data(path_to_dataset, geo_file_type):
    if geo_file_type == GEO_FILE_TYPE_SHAPE:
        return ShapefileReader.readToGeometryRDD(sc, path_to_dataset)
    elif geo_file_type == GEO_FILE_TYPE_WKB:
        return WkbReader.readToGeometryRDD(sc, path_to_dataset, 0, True, False)
    elif geo_file_type == GEO_FILE_TYPE_WKT:
        return WktReader.readToGeometryRDD(sc, path_to_dataset, 0, True, False)
    elif geo_file_type == GEO_FILE_TYPE_GEO_JSON:
        return GeoJsonReader.readToGeometryRDD(sc, path_to_dataset)
    else:
        print("Given geo file type is not supported")
        return

In [None]:
# method to get adjacency list between some polygons.
# Polygons are in spatial rdd format.
def get_adjacency_from_polygons_rdd(geoRdd, weight = BINARY_WEIGHT):
    geoRdd.analyze()
    
    if weight == BINARY_WEIGHT:
        # perform spatial join to check which polygons touch which other polygons
        consider_boundary_intersection = True
        using_index = False
        geoRdd.spatialPartitioning(GridType.KDBTREE)
        result = JoinQuery.SpatialJoinQuery(geoRdd, geoRdd, using_index, consider_boundary_intersection).collect()
        dim = len(result)
        adj_mat = [[0]*dim for i in range(dim)]
        
        for row in result:
            i = int(row[0].getUserData())
            for col in row[1]:
                j = int(col.getUserData())
                if i != j:
                    adj_mat[i][j] = 1
        return adj_mat
    
    # spatial join is not required for other types of adjacency as we need to calculate distances
    elif weight == CENTRAL_DISTANCE or weight == EXPONENTIAL_DISTANCE:
        geo_list = geoRdd.rawSpatialRDD.map(lambda x: x.geom.centroid).collect()
        dim = len(geo_list)
        adj_mat = [[0]*dim for i in range(dim)]
        
        if weight == CENTRAL_DISTANCE:
            for i in range(dim):
                for j in range(dim):
                    if i != j:
                        adj_mat[i][j] = geo_list[i].distance(geo_list[j])
                        
        else:
            bandwidth = 0
            for i in range(dim):
                for j in range(dim):
                    if i != j:
                        adj_mat[i][j] = geo_list[i].distance(geo_list[j])
                        if adj_mat[i][j] > bandwidth:
                            bandwidth = adj_mat[i][j]
            if bandwidth > 0:
                for i in range(dim):
                    for j in range(dim):
                        adj_mat[i][j] = math.exp((-1 * adj_mat[i][j]**2)/(bandwidth**2))
        return adj_mat
    
    # common border ratio is the ratio of overlap between two polygons vs length of first polygon
    elif weight == MINIMUM_DISTANCE or weight == COMMON_BORDER_RATIO:
        geo_list = geoRdd.rawSpatialRDD.map(lambda x: x.geom).collect()
        dim = len(geo_list)
        adj_mat = [[0]*dim for i in range(dim)]
        
        if weight == MINIMUM_DISTANCE:
            for i in range(dim):
                for j in range(dim):
                    if i != j:
                        adj_mat[i][j] = geo_list[i].distance(geo_list[j])
                        
        else:
            for i in range(dim):
                for j in range(dim):
                    if i != j:
                        adj_mat[i][j] = geo_list[i].intersection(geo_list[j]).length/geo_list[i].length
        return adj_mat
    else:
        print("Given weight type is not supported for points RDD")

In [None]:
# method to get adjacency list between some points.
# points are in spatial rdd format.
def get_adjacency_from_points_rdd(geoRdd, weight = BINARY_WEIGHT, distanceThreshold = 100):
    geoRdd.analyze()
    geo_list = geoRdd.rawSpatialRDD.map(lambda x: x.geom).collect()
    dim = len(geo_list)
    adj_mat = [[0]*dim for i in range(dim)]
    
    # two points are adjacent if they are within a distance threshold
    if weight == BINARY_WEIGHT:
        for i in range(dim):
            for j in range(dim):
                if i != j and geo_list[i].distance(geo_list[j]) >= distanceThreshold:
                    adj_mat[i][j] = 1
        return adj_mat
        
    elif weight == POINT_DISTANCE:
        for i in range(dim):
            for j in range(dim):
                if i != j:
                    adj_mat[i][j] = geo_list[i].distance(geo_list[j])
        return adj_mat
                        
    elif weight == EXPONENTIAL_DISTANCE:
        bandwidth = 0
        for i in range(dim):
            for j in range(dim):
                if i != j:
                    adj_mat[i][j] = geo_list[i].distance(geo_list[j])
                    if adj_mat[i][j] > bandwidth:
                        bandwidth = adj_mat[i][j]
        if bandwidth > 0:
            for i in range(dim):
                for j in range(dim):
                    adj_mat[i][j] = math.exp((-1 * adj_mat[i][j]**2)/(bandwidth**2))
        return adj_mat
    
    else:
        print("Given weight type is not supported for points RDD")

In [None]:
# to calculate binary adjacency of a grid, no spatial operation is required
def get_binary_adjacency_from_grid(num_rows, num_cols):
    total_cell = num_rows * num_cols
    adj_matrix = np.zeros(shape = (total_cell, total_cell))
    for i in range(total_cell):
        row = math.floor(i/num_cols)
        col = i%num_cols
        if (col - 1) >= 0:
            adj_matrix[i][row * num_cols + (col - 1)] = 1
        if (col + 1) < num_cols:
            adj_matrix[i][row * num_cols + (col + 1)] = 1
        if (row - 1) >= 0:
            adj_matrix[i][(row - 1) * num_cols + col] = 1
        if (row + 1) < num_rows:
            adj_matrix[i][(row + 1) * num_cols + col] = 1
    return adj_matrix

In [None]:
# return a list of all geometries given a spatial rdd
def get_geometries(geoRdd):
    geoRdd.analyze()
    return geoRdd.rawSpatialRDD.map(lambda x: x.geom).collect()

In [None]:
# given the number of partitions, it takes square root of the partition count and create same number of partitions
# in both x and y directions. It returns the polygons of partitions as a list.
def partition_by_grid(geoRdd, partitions):
    geoRdd.analyze()
    boundary = geoRdd.boundaryEnvelope
    x_arr, y_arr = boundary.exterior.coords.xy
    x = list(x_arr)
    y = list(y_arr)
    minX, minY, maxX, maxY = min(x), min(y), max(x), max(y)
    
    partitionsAxis = int(math.sqrt(partitions))
    intervalX = (maxX - minX)/partitionsAxis
    intervalY = (maxY - minY)/partitionsAxis
    
    polygons = []
    for i in range(partitionsAxis):
        for j in range(partitionsAxis):
            polygons.append(Polygon([(minX + intervalX * i, minY + intervalY * i), (minX + intervalX * (i + 1), minY + intervalY * i), (minX + intervalX * (i + 1), minY + intervalY * (i + 1)), (minX + intervalX * i, minY + intervalY * (i + 1))]))
    
    return polygons

In [None]:
# it creates different number of partitions in x and y directions.
# It returns the polygons of partitions as a list.
def partition_by_grid_xy(geoRdd, partitionsX, partitionsY):
    geoRdd.analyze()
    boundary = geoRdd.boundaryEnvelope
    x_arr, y_arr = boundary.exterior.coords.xy
    x = list(x_arr)
    y = list(y_arr)
    minX, minY, maxX, maxY = min(x), min(y), max(x), max(y)
    
    intervalX = (maxX - minX)/partitionsX
    intervalY = (maxY - minY)/partitionsY
    
    polygons = []
    for i in range(partitionsX):
        for j in range(partitionsY):
            polygons.append(Polygon([(minX + intervalX * i, minY + intervalY * i), (minX + intervalX * (i + 1), minY + intervalY * i), (minX + intervalX * (i + 1), minY + intervalY * (i + 1)), (minX + intervalX * i, minY + intervalY * (i + 1))]))
    
    return polygons

In [None]:
# performs row normalization of a matrix.
def row_normalize(adj_matrix):
    # matrix is not converted to rdds as it should be allocatable in memory
    adj_matrix = np.array(adj_matrix)
    sum_of_adj_rows = adj_matrix.sum(axis=1)
    adj_matrix_norm = adj_matrix / sum_of_adj_rows[:, np.newaxis]
    adj_matrix_norm = np.nan_to_num(adj_matrix_norm, copy = False, nan = 0)
    return adj_matrix_norm

In [None]:
# calculates moran's index of an attribute in various spatial zones
def get_morans_i(adj_matrix, attr_feature):
    # matrix and feature are not converted to rdds as they should be allocatable in memory
    total_obs = len(attr_feature)
    sum_val = sum(attr_feature)
    mean_val = sum_val/total_obs
        
    sum_square = 0
    sum_prod = 0
    for i in range(total_obs):
        for j in range(total_obs):
            sum_prod += adj_matrix[i][j] * (attr_feature[i] - mean_val) * (attr_feature[j] - mean_val)
        sum_square += (attr_feature[i] - mean_val) * (attr_feature[i] - mean_val)
    morran_i = (total_obs * sum_prod) / (sum_square * np.sum(adj_matrix))
    return morran_i

# Create a Spatio-Temporal Sequence Array of Number of Taxi Pickups Happened in Various Spatial Zones at Various Time Intervals

## Load Taxi Trips Shape File and Perform Necessary Preprocessing

In [None]:
zones = load_geo_data("data/taxi_trip/taxi_zones_2", GEO_FILE_TYPE_SHAPE)
zones.CRSTransform("epsg:2263", "epsg:2263")
zones_df = Adapter.toDf(zones, spark)
zones_df.createOrReplaceTempView("zones")
zones_df.printSchema()

In [None]:
zones_polies = get_geometries(zones)
zones_polies

In [None]:
# drop unnecessary columns
zones_df = zones_df.drop("Shape_Leng")
zones_df = zones_df.drop("Shape_Area")
zones_df = zones_df.drop("zone")
zones_df = zones_df.drop("LocationID")
zones_df = zones_df.drop("borough")
zones_df = zones_df.drop("OBJECTID")
zones_df.printSchema()

In [None]:
#zones_df = zones_df.withColumn("_id", monotonically_increasing_id())
#zones_df.printSchema()
zones_df = zones_df.rdd.zipWithIndex().toDF()
zones_df = zones_df.select(col("_1.*"), col("_2").alias('_id'))
zones_df.createOrReplaceTempView("zones")
zones_df.show(5, False)

In [None]:
# convert taxi zones dataframe into spatial RDD
zones_rdd = Adapter.toSpatialRdd(zones_df, "geometry")
zones_rdd.CRSTransform("epsg:2263", "epsg:2263")

In [None]:
zones_rdd.analyze()
boundary = zones_rdd.boundaryEnvelope
x_arr, y_arr = boundary.exterior.coords.xy
x = list(x_arr)
y = list(y_arr)
minX, minY, maxX, maxY = min(x), min(y), max(x), max(y)
print(minX, minY, maxX, maxY)

In [None]:
# load taxi trip pickup dataset of a month and select necessary columns
tripDf = spark.read.format("csv").option("delimiter",",").option("header","true").load("data/taxi_trip/yellow_tripdata_2009-01.csv")
tripDf = tripDf.select("Trip_Pickup_DateTime", "Start_Lon", "Start_Lat", "Passenger_Count", "Trip_Distance", "Fare_Amt")
tripDf.createOrReplaceTempView("tripDf")
tripDf.show(5, False)

In [None]:
# add an id starting from 0 for all pickups
tripDf = tripDf.rdd.zipWithIndex().toDF()
tripDf = tripDf.select(col("_1.*"), col("_2").alias('Serial_ID'))
tripDf.createOrReplaceTempView("tripDf")
tripDf.show(5, False)

In [None]:
# convert the pickup time into timestamp format
tripDf = tripDf.withColumn("pickup_time", unix_timestamp("Trip_Pickup_DateTime", "yyyy-MM-dd HH:mm:ss"))
tripDf = tripDf.drop("Trip_Pickup_DateTime")
tripDf.createOrReplaceTempView("tripDf")
tripDf.show(5, False)

In [None]:
# take a backup as we will need it for spatial grid array generation
tripDf_backup = tripDf

In [None]:
adj_mat = get_adjacency_from_polygons_rdd(zones_rdd)
len(adj_mat)

In [None]:
t_start = time.time()
adj_mat = get_adjacency_from_polygons_rdd(zones_rdd, weight = CENTRAL_DISTANCE)
t_end = time.time()
print("Time:", str(t_end - t_start))
len(adj_mat)

In [None]:
# finalize the pickup data columns we need for spatio-temporal array
tripDf = tripDf.select(["Serial_ID", "pickup_time", "Start_Lon", "Start_Lat"])
tripDf.show(5, False)

In [None]:
# to generate intervals, find min and max of timestamps
t_start = time.time()
tripDf.createOrReplaceTempView("tripDf")
min_max_time = spark.sql('select min(pickup_time), max(pickup_time) from tripDf').collect()[0]
minTime = min_max_time[0]
maxTime = min_max_time[1]
t_end = time.time()
print(minTime)
print(maxTime)
print("Time:", t_end - t_start)

In [None]:
# generate ingtervals
interval = 3600
interval_start = np.arange(minTime, maxTime + 1, interval)
interval_end = np.arange(minTime + interval, maxTime + interval + 1, interval)
if interval_end[-1] > maxTime + 1:
    interval_end[-1] = maxTime + 1
print(len(interval_start), len(interval_end))
print(interval_start[0], interval_end[0], interval_start[-1], interval_end[-1])

In [None]:
intervals_count = len(interval_start)
first_interval = interval_start[0]
print(intervals_count)
print(first_interval)

In [None]:
zones_count = zones_rdd.rawSpatialRDD.count()
zones_count

In [None]:
# generate an interval df to join with taxi df
intervalDf = spark.createDataFrame(zip(interval_start.tolist(), interval_end.tolist()), schema=['interval_start', 'interval_end'])
intervalDf.show(5, False)

In [None]:
# this section finds the number of pickups at various zones at various timestamps
buildOnSpatialPartitionedRDD = True
usingIndex = True
considerBoundaryIntersection = True

zones_rdd.analyze()
zones_rdd.spatialPartitioning(GridType.KDBTREE, 4)

temporalTripDf = spark.createDataFrame([], StructType([]))

for i in range(15):
    start_id = i * 1000000
    end_id = (i + 1) * 1000000 - 1
    # join ingtervals with pickups and convert pick location into Spatial points
    pointDf = spark.sql("select ST_Point(double(tripDf.Start_Lat), double(tripDf.Start_Lon)) as point_loc, tripDf.Serial_ID as Serial_ID, tripDf.pickup_time as pickup_time from tripDf where tripDf.Serial_ID >= {0} and tripDf.Serial_ID <= {1}".format(start_id, end_id))
    pointDf = intervalDf.join(pointDf, [pointDf.pickup_time >=  intervalDf.interval_start, pointDf.pickup_time <  intervalDf.interval_end], "inner")
    
    pointRDD = Adapter.toSpatialRdd(pointDf, "point_loc")
    pointRDD.CRSTransform("epsg:4326", "epsg:2263")
    
    # perform spatial join to find the number of pickups within various spatial zones
    pointRDD.analyze()
    pointRDD.spatialPartitioning(zones_rdd.getPartitioner())
    pointRDD.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
    result_pair_rdd = JoinQueryRaw.SpatialJoinQueryFlat(pointRDD, zones_rdd, usingIndex, considerBoundaryIntersection)
    pickupInfoPartDf = Adapter.toDf(result_pair_rdd, zones_rdd.fieldNames, pointRDD.fieldNames, spark)
    pickupInfoPartDf.createOrReplaceTempView("pickupInfoPartDf")
    
    # group the pickups based on zones and time intervals
    pickupInfoPartDf = spark.sql("SELECT int(a._id) as _id, a.interval_start as interval_start, count(a.rightgeometry) as point_cnt FROM pickupInfoPartDf a group by a.interval_start, a._id order by a.interval_start, a._id asc")
    
    if i == 0:
        temporalTripDf = pickupInfoPartDf
    else:
        temporalTripDf.union(pickupInfoPartDf)
    
temporalTripDf.createOrReplaceTempView("temporalTripDf")
temporalTripDf.show(5, False)

In [None]:
# aggregate pickup counts
temporalTripDf = spark.sql("SELECT a.interval_start, a._id, sum(a.point_cnt) as point_cnt FROM temporalTripDf a group by a.interval_start, a._id order by a.interval_start, a._id asc")
temporalTripDf.createOrReplaceTempView("temporalTripDf")
temporalTripDf.show(5, False)

In [None]:
# calculate the final spatio-temporal array
st_pickups = [[0]*zones_count for i in range(intervals_count)]

t1 = time.time()
for row in temporalTripDf.collect(): # toLocalIterator for large rdd or df
    st_pickups[(int(row['interval_start']) - first_interval)//interval][row['_id']] = row['point_cnt']
t2 = time.time()

print("Required time:", t2-t1, "seconds")
print("st_pickups shape:", len(st_pickups), "x", len(st_pickups[0]))

In [None]:
# save the spatio-temporal pickup array as a numpy array into file
with open("data/taxi_trip/st_pickups.npy", "wb") as f:
    np.save(f, st_pickups)

# Create a Spatial Grid Array of Number of Total Taxi Pickup Information in Various Cells of Spatial Grids

In [None]:
# generate the grid and get the grid polygons
grid_poly_list1 = partition_by_grid(zones_rdd, 1000)
print(len(grid_ploy_list1))

In [None]:
grid_ploy_list1

In [None]:
num_rows = 75
num_cols = 70
grid_polies = partition_by_grid_xy(zones_rdd, num_rows, num_cols)
print(len(grid_polies))

In [None]:
grid_polies

In [None]:
# generate id for each of the ploygons
_ids = [i for i in range(len(grid_polies))]

In [None]:
# define the schema for polygon dataframe
schema = StructType(
    [
        StructField("_id", IntegerType(), False),
        StructField("geometry", GeometryType(), False)
    ]
)

In [None]:
# create polygon daaframe
gridPolyDf = spark.createDataFrame(
    zip(_ids, grid_polies),
    schema = schema
)
gridPolyDf.show(5, False)

In [None]:
# convert grid polygon dataframe into spatial rdd
gridPolyRdd = Adapter.toSpatialRdd(gridPolyDf, "geometry")
gridPolyRdd.CRSTransform("epsg:2263", "epsg:2263") # although it seems unnecessary, error occurs without this

In [None]:
tripDf = tripDf_backup
tripDf.createOrReplaceTempView("tripDf")

In [None]:
buildOnSpatialPartitionedRDD = True
usingIndex = True
considerBoundaryIntersection = True

gridPolyRdd.analyze()
gridPolyRdd.spatialPartitioning(GridType.KDBTREE, 4)

spatialTripDf = spark.createDataFrame([], StructType([]))

for i in range(15):
    start_id = i * 1000000
    end_id = (i + 1) * 1000000 - 1
    pointDf = spark.sql("select ST_Point(double(tripDf.Start_Lat), double(tripDf.Start_Lon)) as point_loc, int(tripDf.Passenger_Count) as passenger_count, float(tripDf.Trip_Distance) as trip_dist, float(tripDf.Fare_Amt) as fare from tripDf where tripDf.Serial_ID >= {0} and tripDf.Serial_ID <= {1}".format(start_id, end_id))
    pointRDD = Adapter.toSpatialRdd(pointDf, "point_loc")
    pointRDD.CRSTransform("epsg:4326", "epsg:2263")
    
    pointRDD.analyze()
    pointRDD.spatialPartitioning(gridPolyRdd.getPartitioner())
    pointRDD.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
    result_pair_rdd = JoinQueryRaw.SpatialJoinQueryFlat(pointRDD, gridPolyRdd, usingIndex, considerBoundaryIntersection)
    
    spatialTripPartDf = Adapter.toDf(result_pair_rdd, zones_rdd.fieldNames, pointRDD.fieldNames, spark)
    spatialTripPartDf.createOrReplaceTempView("spatialTripPartDf")
    spatialTripPartDf = spark.sql("SELECT int(a._id) as _id, count(a.rightgeometry) as point_cnt, sum(a.passenger_count) as passenger_cnt, sum(a.trip_dist) as total_trip_dist, sum(a.fare) as total_fare FROM spatialTripPartDf a group by a._id order by a._id asc")
    
    if i == 0:
        spatialTripDf = spatialTripPartDf
    else:
        spatialTripDf.union(spatialTripPartDf)
    
spatialTripDf.createOrReplaceTempView("spatialTripDf")
spatialTripDf.show(5, False)

In [None]:
num_attrs = 4
grid_trip_info = np.zeros(shape = (num_rows, num_cols, num_attrs))

t1 = time.time()
for row in spatialTripDf.toLocalIterator(): # toLocalIterator for large rdd or df
    _id = row[0]
    grid_trip_info[_id//num_cols][_id%num_cols][0] += int(row[1])
    grid_trip_info[_id//num_cols][_id%num_cols][1] += int(row[2])
    grid_trip_info[_id//num_cols][_id%num_cols][2] += row[3]
    grid_trip_info[_id//num_cols][_id%num_cols][3] += row[4]
t2 = time.time()

print("Required time:", t2-t1, "seconds")
print("grid_trip_info shape:", grid_trip_info.shape)

In [None]:
# save the spatial grid array as a numpy array into file
with open("data/taxi_trip/grid_trip_info.npy", "wb") as f:
    np.save(f, grid_trip_info)