Rihla (lit. "Journey") in Spark 1.5 DataFrame implementations
Java Scala Python Batchfile
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
media
src/main
.gitignore
LICENSE
README.md
broadcast-density.cmd
pom.xml

README.md

Ibn Battuta

This project is named after the great Moroccan explorer Ibn Battuta.

This is my Rihla (Journey) into Spark Spatial DataFrame implementations.

A set of advanced features are now available in the newly released Spark 1.5.0. This includes:

  • The integration of Project Tungsten, that brings Spark closer to the bare metal and takes advantage of off-heap memory.
  • A unified high performance execution environment across Scala, Java, Python and R using the DataFrame API.

This ongoing project is my journey into:

  • Using File GeoDatabases and Shapefiles as "native" DataFrame sources.
  • Creating Spatial User Defined Types to be used in Scala, Python and R.
  • Creating Spatial User Defined Functions to be used in Scala and Python.
  • Influencing the execution plan of spatial join queries in such that a map-side join or a reduce-side join is selected appropriately based on the input data.
  • Submit a Spark Job from ArcGIS Desktop or Server that takes advantage of the above for high performing GeoProcessing tasks.

Setting Up Your Environment

I'm assuming that the following will be done on a Windows based platform. The same can be done on Linux, but most AGS installations are on Windows.

Note to self: One of these days, create an Install program. Or better, use Docker.

  • Download and install JDK 1.8.
  • Define a system environment variable named JAVA_HOME whose value is the folder location where you installed the JDK.
  • Update the PATH system environment variable with %JAVA_HOME%\bin, in such that you can execute the following in a cmd window.
C:\>java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
  • Download and unzip Hadoop Common. Spark uses the Hadoop File System API to read data sources and to write data sinks. And NO, you do not need HDFS for this, as a data source path can be a URL with a file:// scheme :-)
  • Define a system environment variable named HADOOP_HOME whose value is the unzipped folder location. There should be a bin\winutils.exe under that path.

  • Download and unpack spark-1.5.0-bin-hadoop2.6.tgz.

  • Define a system environment variable named SPARK_HOME whose value is the folder location where you unpacked Spark.
  • Update the PATH system environment variable with %SPARK_HOME%\bin, in such that you can execute the following in a cmd window.
C:\>spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Type --help for more information.```

Finally, to read FileGeoDatabases (this is so embarrassing), I'm relying on the GeoTools OGR implementation that uses a GDAL implementation. The FileGeoDatabase format is closed, unlike the Shapefile format. Esri does provide a read-only C based API, but do not want to write a Scala wrapper, nor want to reimplement the API in pure Java or Scala. However, one of these days I have to do this to enable a parallel reading off a distributed file system. Today's reliance on the the GeoTools implementation makes the input unsplittable and dependents on a POSIX file system.

  • Download and install the 64bit OSGeo4W.
  • Define a system environment variable named OSGEO4W64_HOME whose value is the folder where you installed OSGeo4W64.
  • Update the PATH system environment variable with %OSGEO4W64_HOME%\bin, in such that you can execute the following in a cmd window.
C:\>ogr2ogr.exe --version
GDAL 1.11.2, released 2015/02/10

Build the FileGDB DataFrame API

The build process is Maven based.

mvn clean package

This will create in the target folder a .jar file and a .zip file. The .zip file contains the .jar file, the ArcPy Toolbox and its supporting files.

By default, the OSGEO4W64_HOME is assumed to be in C:\OSGeo4W64. A different location can be specified using:

mvn -Dosgeo4w64.home=${OSGEO4W64} clean package

Or on Windows:

mvn -Dosgeo4w64.home=%OSGEO4W64% clean package

Using the DataFrame API

This implementation is based on the Spark CSV project. You can download the sample data in FileGDB format from MarineCadaster.gov.

In this implementation, I'm not using a custom User Defined SQL Type. Instead, I'm relying on predefined types, in such that a Point is converted into a StructType with a StructField labeled x and a StructField labeled y. Looking at the Spark source code, there is a reference implementation of an ExamplePoint with Python bindings that I will use in my later explorations.

Scala API Example

spark-shell\
 --master "local[*]"\
 --driver-memory 1G\
 --executor-memory 8G\
 --packages org.geotools:gt-ogr-bridj:14-beta,org.geotools:gt-cql:14-beta\
 --jars target/ibn-battuta-0.2.jar
val df=sqlContext.read.
format("com.esri.battuta.fgb").
option("fields","the_geom,Status").
option("cql","Status > 0").
load("/Users/mraad_admin/Share/Miami.gdb/Broadcast")
df.registerTempTable("POINTS")
df.sqlContext.sql("select the_geom['x'] as lon,the_geom['y'] as lat,Status from POINTS limit 10").show()

Python API Example

pyspark\
 --master "local[*]"\
 --driver-memory 1G\
 --executor-memory 8G\
 --packages org.geotools:gt-ogr-bridj:14-beta,org.geotools:gt-cql:14-beta\
 --jars target/ibn-battuta-0.2.jar
df = sqlContext.read.\
format('com.esri.battuta.fgb').\
option('fields','the_geom,Status').\
load('/Users/mraad_admin/Share/Miami.gdb/Broadcast')
df.registerTempTable('POINTS')
sqlContext.sql("SELECT the_geom['x'],the_geom['y'] FROM POINTS WHERE Status>0 LIMIT 5").show()
df = sqlContext.read.\
format('com.esri.battuta.dbf').\
load('/Users/mraad_admin/Share/data.dbf')
df.registerTempTable('DATA')
sqlContext.sql('SELECT * FROM DATA LIMIT 5').show()
sqlContext.sql('SELECT MIN(X) AS MIN_X,MIN(Y) AS MIN_Y,MAX(X) AS MAX_X,MAX(Y) AS MAX_Y FROM DATA').show()

Submitting Spark Jobs Using ArcPy

Originally, I wanted to write a GeoProcessing Extension in Scala that submits Scala based Spark jobs from ArcGIS Desktop and Server. However, I immediately ran into problems with Java version incompatibilities between the internal 1.6 and my wanting to take advantage of 1.8 features. In addition, I wanted to "script" the GeoProcessing tasks, in such that I can change the logic without an explicit recompilation and redeployment. So, I decided to stay in Python land as ArcGIS and Spark natively support that environment. Now, do not misunderstand me! You cannot send ArcPy code to Spark for fast parallel execution (at least not today :-). However, you can use ArcPy to submit Python, R, Scala and Java based Spark Jobs.

And that is exactly what the BroadcastDensity.pyt toolbox does as a proof of concept. When executed from ArcGIS, it prompts the user to define the values of the input parameters of a Python based Spark job and submits that job using the spark-submit command line interface. It asynchronously monitors the execution of the job and reports its status as a set of ArcPy messages.

The toolbox and all the required resources are packaged in a zip file in the target folder after mvn package.

The tools invokes broadcast-density.py using spark-submit.cmd with the Spark properties in broadcast-density.conf. The following is a snippet of broadcast-density.py that highlights the DataFrame API and SparkSQL:

df = sqlContext.read \
    .format('com.esri.battuta.fgb') \
    .options(fields='the_geom', cql='Status > 0') \
    .load(input_path)

df.registerTempTable("POINTS")

sql = '''
      select c*{cell0}+{cell2} as lon,r*{cell0}+{cell2} as lat,count(1) as pop from
      (select cast(the_geom['x']/{cell0} as int) as c, cast(the_geom['y']/{cell0} as int) as r from POINTS) as T
      group by c,r having pop>{pop}
    '''.format(cell0=cell0, cell2=cell2, pop=pop)

sqlContext.sql(sql).write.json(output_path)

The inner select maps the x/y coordinates to col/row cells. The outer select aggregates and counts the occurrence of the col/row tuple. The dataframe rows are saved as a sequence of JSON documents.

The ViewTool iterates over the part-xxxx files in the output folder and converts each parsed JSON document into a feature in an in-memory feature class.