
Demos - Geo - GeoPandas - NYC taxi trips
========================================


# References
* [Notebook - Geo - Demos - GeoPandas - NYC taxi trips (this notebook)](https://github.com/data-engineering-helpers/databricks-examples/blob/main/ipython-notebooks/demos-geo-geopandas-nyc-taxi-trips.ipynb)
* See [Notebook - Geo - Demos - prep-nyc-taxi-geospatial-data](https://github.com/data-engineering-helpers/databricks-examples/blob/main/ipython-notebooks/demos-geo-prep-nyc-taxi-geospatial-data.ipynb)
  for details on how to retrieve the data files, and on why this notebook cannot work as is
  (mainly because the geo-coordinates are no longer available in the original data files published
  by the NYC authorities)

## GeoPandas
* GeoPandas home page: http://geopandas.org/

## Data
* https://www.databricks.com/notebooks/prep-nyc-taxi-geospatial-data.html
  + Forked as [Notebook - Geo - Demos - prep-nyc-taxi-geospatial-data](https://github.com/data-engineering-helpers/databricks-examples/blob/main/ipython-notebooks/demos-geo-prep-nyc-taxi-geospatial-data.ipynb), adapted to the new format of the data

## GeoPandas installation
__Option-1: Using [DBUtils Library Import](https://docs.databricks.com/dev-tools/databricks-utils.html#library-utilities) within the notebook (see the `%pip install geopandas` cell).__

__Option-2: Using the [Databricks ML Runtime](https://docs.databricks.com/runtime/mlruntime.html#mlruntime), which includes Anaconda (not used here).__

* [Install Cluster Libraries](https://docs.databricks.com/libraries.html#install-a-library-on-a-cluster):
  * geopandas PyPI coordinates: `geopandas`
  * shapely PyPI coordinates: `shapely`
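Whichever option is chosen, a quick sanity check is to confirm that the packages can be found by the notebook's Python interpreter. A minimal sketch using only the standard library; the helper name `check_libraries` is hypothetical and not part of Databricks or GeoPandas:

```python
import importlib.util


def check_libraries(names):
    """Map each top-level package name to True if it can be found in this environment.

    Hypothetical helper: it only locates the module spec, it does not import the package.
    """
    return {name: importlib.util.find_spec(name) is not None for name in names}


print(check_libraries(["geopandas", "shapely"]))
```

A `False` entry means the install step (notebook-scoped or cluster library) has not taken effect for this notebook yet.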

In [None]:
%fs ls /tmp/nyc-taxi/taxi-zones/csv/

In [None]:
%pip install geopandas

In [None]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from shapely import wkt
# Explicit imports instead of `import *`, which shadows Python builtins such as sum/min/max
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
# Note: shapely.speedups is deprecated since Shapely 2.0, where the speedups are always enabled

In [None]:
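df_csv = pd.read_csv("/dbfs/tmp/nyc-taxi/taxi-zones/csv/taxi_zones.csv")
# Parse the WKT strings of the `the_geom` column into Shapely geometries
df_csv["the_geom"] = df_csv["the_geom"].apply(wkt.loads)
gdf = gpd.GeoDataFrame(df_csv, geometry="the_geom")

# Ship the zones once to each executor; keep the handle, otherwise the broadcast is wasted
bc_gdf = sc.broadcast(gdf)

def find_borough(latitude, longitude):
    """Return the borough of the first taxi zone intersecting the point, or None."""
    if latitude is None or longitude is None:
        return None
    zones = bc_gdf.value
    # Shapely points are (x, y), i.e. (longitude, latitude)
    matches = zones[zones.intersects(Point(longitude, latitude))]
    return matches["borough"].iloc[0] if not matches.empty else None

find_borough_udf = udf(find_borough, StringType())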
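`find_borough` returns the borough of the first zone whose geometry intersects the pickup point. To make that geometric test concrete, here is a pure-Python sketch of the classic ray-casting point-in-polygon check, assuming simple polygons given as lists of `(longitude, latitude)` vertices. The toy zones and the `find_borough_pure` name are made up for illustration; Shapely's `intersects` additionally handles holes, multi-polygons and points on the boundary, which this sketch does not.

```python
def point_in_polygon(x, y, polygon):
    """Ray casting: toggle `inside` each time a ray from (x, y) towards +x crosses an edge.

    `polygon` is a list of (x, y) vertices, i.e. (longitude, latitude) pairs,
    the same axis order as shapely's Point(longitude, latitude).
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through y can be crossed
        if (y1 > y) != (y2 > y):
            # x coordinate where this edge crosses that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def find_borough_pure(latitude, longitude, zones):
    """Return the borough of the first zone containing the point, else None.

    `zones` is a list of (borough, polygon) tuples (hypothetical toy data).
    """
    for borough, polygon in zones:
        if point_in_polygon(longitude, latitude, polygon):
            return borough
    return None


# Made-up square zones, NOT real taxi-zone geometries
toy_zones = [
    ("West", [(-74.0, 40.0), (-73.5, 40.0), (-73.5, 40.5), (-74.0, 40.5)]),
    ("East", [(-73.5, 40.0), (-73.0, 40.0), (-73.0, 40.5), (-73.5, 40.5)]),
]
print(find_borough_pure(40.25, -73.75, toy_zones))  # West
```

As in `find_borough`, the loop scans every zone for every point, which is why the Spark job below works on a small sample.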

In [None]:
# test the function on a single (latitude, longitude) pair
find_borough(40.69943618774414, -73.9920883178711)
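Because Shapely points are `(x, y)`, i.e. `(longitude, latitude)`, while `find_borough` takes `(latitude, longitude)`, swapped arguments silently yield `None` rather than an error. A cheap guard, assuming a rounded and deliberately loose bounding box around New York City (approximate values, not official boundaries), is to check that a pair plausibly lies in NYC before debugging the zone lookup itself. The `looks_like_nyc` helper is hypothetical:

```python
# Approximate, deliberately loose bounding box around New York City (assumption, not official)
NYC_LAT_RANGE = (40.4, 41.0)
NYC_LON_RANGE = (-74.3, -73.6)


def looks_like_nyc(latitude, longitude):
    """Return True if (latitude, longitude) falls inside the rough NYC bounding box."""
    if latitude is None or longitude is None:
        return False
    lat_min, lat_max = NYC_LAT_RANGE
    lon_min, lon_max = NYC_LON_RANGE
    return lat_min <= latitude <= lat_max and lon_min <= longitude <= lon_max


print(looks_like_nyc(40.69943618774414, -73.9920883178711))  # True
print(looks_like_nyc(-73.9920883178711, 40.69943618774414))  # False: arguments swapped
```

A point inside the box can still fall outside every taxi zone (the box also covers parts of New Jersey), so this only catches gross errors such as swapped or zeroed coordinates.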

In [None]:
df_raw = spark.read.parquet("/tmp/nyc-taxi/parquet/green_tripdata_*.parquet")
#df_raw = spark.read.format("delta").load("/ml/blogs/geospatial/delta/nyc-green")
# Work on a ~1% sample, since the Python UDF is evaluated row by row
df_raw_borough = (
    df_raw.sample(withReplacement=False, fraction=0.01)
    .withColumn("pickup_borough", find_borough_udf(col("pickup_latitude"), col("pickup_longitude")))
)
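`sample(False, 0.01)` keeps roughly 1% of the trips so that the row-by-row UDF stays affordable. Spark documents that the fraction is not guaranteed to be exact: without replacement, each row is kept independently with probability `fraction` (Bernoulli sampling), so the sample size varies from run to run unless a seed is fixed. A pure-Python sketch of that idea, not Spark's actual implementation, with a hypothetical `bernoulli_sample` helper:

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]


trips = range(100_000)  # stand-in for trip rows
sample = bernoulli_sample(trips, 0.01, seed=42)
# Close to, but generally not exactly, 1% of 100,000 rows
print(len(sample))
```

In PySpark, the same reproducibility is obtained by passing a seed, e.g. `df_raw.sample(withReplacement=False, fraction=0.01, seed=42)`.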

In [None]:
display(df_raw_borough.select(["pickup_borough","pickup_datetime","pickup_latitude","pickup_longitude"]))