
Workshop Geospatial Analysis with PySpark

Shoaib Burq & Kashif Rasul

@sabman / @krasul

^ Thank everyone for coming ^ Introduce myself


Agenda (Total Time: 3-3.5 hours)

  • Quick Introduction to Docker (30 min)
  • Setup machines (30 min)
  • What is Apache Spark/PySpark (20 min)
  • Spark DataFrames (20 min)
  • Geo Ecosystem (10 min)
  • Overview of our use cases (20 min)
  • GeoPandas & Hands-on Workshop (2 hours)

[fit] Getting Set Up ✌️


Wifi

  • Connect to the wifi

Windows and Mac Users: Install Docker Toolbox


Start Docker Instance

Follow instructions here:

https://github.com/sabman/PySparkGeoAnalysis/blob/master/docker/README.md


Introduction to Docker

🐳

^ We are going to be using Docker for today's setup, so let's do a 20-minute primer on Docker. ^ We will do the following:


Docker Agenda

  • What is Docker?
  • How to use Docker Toolbox
  • Install on Mac and Windows

What is Docker

  • Linux Containers
  • Instances of environments

^ Creates instances of environments ^ Configure environments, use them, and save their state ^ Introduces the ideas of images and containers



^ It's Linux-based software. It creates environments that can be used across different machines. ^ Docker runs on Linux; inside Docker we have images (like classes) and containers (like instances of a class). These are driven by a Dockerfile. ^ Dockerfile -> Image -> Container(s) ^ We can save state in containers and move them across machines too.



^ Let's look at the structure of Docker. ^ You have a Linux machine, and inside it you have Docker running.


Docker Toolbox


Docker Toolbox

  • Docker Engine
  • Docker Client
  • Docker Machine
  • Docker Compose
  • Docker Kitematic
  • VirtualBox

^ Docker Engine: runtime for Docker (bins/libs, apps) ^ Docker Client: communicates with the Docker Engine ^ Docker Machine: provisions hosts and connects the Docker client to the engine ^ Docker Compose: manages multi-container applications ^ Kitematic: GUI ^ VirtualBox: lightweight VM host




About me

Shoaib Burq (twitter: @sabman)

  • 🏫 Geomatics Engineer and Software Developer
  • 🐳 Underwater mapping & ...
  • 🔥 Disaster Response work for Australian Gov.
  • ✈️: Moved to Berlin 2011

What is Apache Spark

The big picture

^ Apache Spark is an in-memory data processing and analysis library made up of the following parts:



^ Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

^ Cluster manager An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)

^ Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster. ^ Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications. ^ Hadoop YARN – the resource manager in Hadoop 2. ^ In addition, Spark’s EC2 launch scripts make it easy to launch a standalone cluster on Amazon EC2.
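A hedged sketch of how the cluster manager shows up in code: it is selected by the master URL you pass when constructing the SparkContext. The app name and host names below are made up, and if your notebook already has an sc you would not create a second one.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("geo-workshop")   # illustrative app name
        .setMaster("local[2]"))       # local mode with 2 threads; a standalone cluster
                                      # would be e.g. "spark://master-host:7077",
                                      # YARN (Spark 1.x) would be "yarn-client"
sc = SparkContext(conf=conf)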


PySpark 🐍

internals



^ PySpark is built on top of Spark's Java API. Data is processed in Python and cached / shuffled in the JVM. In the Python driver program, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. Py4J is only used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are performed through a different mechanism. RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.

^ Pickle will try to create a function closure and serialize it. In order for the JVM to communicate with Python/PySpark, we need to be able to pickle functions and translate our data into bytes.
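A minimal sketch of the pickling point above: the function passed to map (together with any variables it closes over) gets pickled on the driver and shipped to the Python worker processes. The names below are made up for illustration.

factor = 3                       # captured by the closure and pickled along with it

def scale(x):
    return x * factor

rdd = sc.parallelize(range(5))
print(rdd.map(scale).collect())  # -> [0, 3, 6, 9, 12]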


Resilient Distributed Datasets (RDDs)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, 10)
distData.reduce(lambda a, b: a + b)

^Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

^ Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. We describe operations on distributed datasets later on.

^ One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
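To make the partition count visible you can ask the RDD directly; getNumPartitions is a standard RDD method, and the values below follow from the example above.

distData.getNumPartitions()              # -> 10, as requested in parallelize(data, 10)
sc.parallelize(data).getNumPartitions()  # default, chosen by Spark for your cluster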


Map / Reduce - Calculate Average Age

cat data.txt
bob 24
jane 19
jimmy 36
tim 45
# RDD Version:
data = sc.textFile("data.txt").map(lambda line: line.split())
data.map(lambda x: (x[0], [int(x[1]), 1])) \
  .reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]) \
  .map(lambda x: [x[0], x[1][0] / x[1][1]]) \
  .collect()

^ Now I know what you are thinking: this is horrible... precisely why we need DataFrames.


DataFrames Version:

from pyspark.sql.functions import avg

sqlCtx.table("people") \
   .groupBy("name") \
   .agg(avg("age")) \
   .collect()

^ In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.




Optimization happens as late as possible, therefore Spark SQL can optimize across functions ✌️
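A small hedged illustration (assuming the people DataFrame df that is created on the next slide): transformations only build up a plan, and explain() shows what Catalyst made of the whole chain before anything runs.

adults = df.filter(df['age'] > 21).select('name')   # nothing executes yet
adults.explain(True)    # prints the logical and physical plans Catalyst produced
adults.collect()        # only this action triggers execution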




Creating DataFrame

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.json("examples/src/main/resources/people.json")


df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

^ The entry point into all functionality in Spark SQL is the SQLContext class, or one of its descendants. To create a basic SQLContext, all you need is a SparkContext.


Supported Formats

  • JSON files
  • Parquet files
  • Hive tables
  • local file systems
  • distributed file systems (HDFS)
  • cloud storage (S3)
  • external relational database systems via JDBC

^ Modern applications often need to collect and analyze data from a variety of sources. Out of the box, DataFrame supports reading data from the most popular formats, including (JSON files, Parquet files, Hive tables). It can read from local file systems, distributed file systems (HDFS), cloud storage (S3), and external relational database systems via JDBC. In addition, through Spark SQL’s external data sources API, DataFrames can be extended to support any third-party data formats or sources. Existing third-party extensions already include Avro, CSV, ElasticSearch, and Cassandra.
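A hedged sketch of a few of the sources listed above; the paths, bucket, and database coordinates are placeholders, not real endpoints.

df_json = sqlContext.read.json("hdfs:///data/people.json")           # JSON on HDFS
df_parq = sqlContext.read.parquet("s3n://my-bucket/people.parquet")  # Parquet on S3
df_jdbc = (sqlContext.read.format("jdbc")
           .options(url="jdbc:postgresql://db-host/geo",             # made-up URL
                    dbtable="public.places")
           .load())                                                   # table via JDBC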


# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

Running SQL Queries Programmatically

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")

^ What if you are familiar with SQL? ^ The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.


Inferring the Schema Using Reflection

from pyspark.sql import Row

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

^ Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row. Since we currently only look at the first row, it is important that there is no missing data in the first row of the RDD. In future versions we plan to more completely infer the schema by looking at more data, similar to the inference that is performed on JSON files.
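If you would rather not rely on inference from the first row, the schema can also be spelled out explicitly; this is a sketch of that alternative, reusing the parts RDD from above (the people2 names are just illustrative).

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
people_tuples = parts.map(lambda p: (p[0], int(p[1])))
schemaPeople2 = sqlContext.createDataFrame(people_tuples, schema)
schemaPeople2.registerTempTable("people2")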


Shared Variables

^ Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

^Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

^Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

^Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:
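Accumulators are the other shared-variable type mentioned above; a minimal sketch:

accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value   # -> 10; tasks can only add to it, the driver reads the result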



^ Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system. ^ Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN). ^ The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port and spark.fileserver.port in the network config section). As such, the driver program must be network addressable from the worker nodes. ^ Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

^ Cluster Architecture ^ TODO: Running Spark on Clusters https://youtu.be/OkyRdKahMpk


Spark & 🌐

What's happening in Spark and GeoAnalysis


Current Options

  • Magellan harsha2010/magellan
  • GeoSpark DataSystemsLab/GeoSpark
  • SpatialSpark syoummer/SpatialSpark
  • Spark kNN Graphs tdebatty/spark-knn-graphs



[fit] Big Picture ☁️




[fit] Our Data

let's explore...


[fit] 🌐 + 🐼 = GeoPandas

http://geopandas.org

^ GeoPandas is an open source project to make working with geospatial data in python easier. GeoPandas extends the datatypes used by pandas to allow spatial operations on geometric types. Geometric operations are performed by shapely. Geopandas further depends on fiona for file access and descartes and matplotlib for plotting.


Make working with geographic data like working with other kinds of data in python

Work with existing tools:

  • Desktop GIS (ArcGIS, QGIS)
  • Geospatial databases (e.g., PostGIS)
  • Web maps (Leaflet, D3, etc.)
  • Python data tools (pandas, numpy, etc.)

Supports

  • Geometry operations (Shapely)
  • Data alignment (pandas)
  • Coordinate transformations (pyproj)
  • Read/write GIS file formats (Fiona)
  • Create a GeoDataFrame from PostGIS table
  • Output any object as GeoJSON
  • Plotting
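A hedged sketch of the PostGIS item in the list above (see also psycopg2 among the optional dependencies); the connection details, table, and column names are made up.

import psycopg2
from geopandas import GeoDataFrame

con = psycopg2.connect(host="localhost", dbname="geo", user="workshop")
sql = "SELECT name, geom FROM neighbourhoods"
hoods = GeoDataFrame.from_postgis(sql, con, geom_col="geom")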

GeoPandas depends on

  • Python (2.6, 2.7, 3.3+)
  • Pandas (0.13 and up)
  • Shapely (GEOS)
  • Fiona (GDAL/OGR)
  • Pyproj (PROJ.4)
  • Matplotlib (and Descartes)
  • psycopg2, sqlalchemy, geopy, rtree (optional)


GeoPandas Data Structures

Pandas ➡️ GeoPandas

  • Series (1-D) ➡️ GeoSeries (1-D)
  • DataFrame (2-D table) ➡️ GeoDataFrame (2-D)
  • Panel (3-D) ➡️ None Yet
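A small sketch of building the two core structures by hand from shapely geometries (the values are arbitrary):

from shapely.geometry import Point
from geopandas import GeoSeries, GeoDataFrame

gs = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])    # 1-D, like a pandas Series
gdf = GeoDataFrame({"value": [10, 20, 30]}, geometry=gs)   # 2-D, like a DataFrame
gdf.geometry.buffer(0.1)                                   # spatial ops apply column-wise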

Loading data

>>> boros = GeoDataFrame.from_file('nybb.shp') # also has from_postgis()
>>> boros.set_index('BoroCode', inplace=True)
>>> boros.sort()
              BoroName    Shape_Area     Shape_Leng  \
BoroCode
1             Manhattan  6.364422e+08  358532.956418
2                 Bronx  1.186804e+09  464517.890553
3              Brooklyn  1.959432e+09  726568.946340
4                Queens  3.049947e+09  861038.479299
5         Staten Island  1.623853e+09  330385.036974

                                                   geometry
BoroCode
1         (POLYGON ((981219.0557861328125000 188655.3157...
2         (POLYGON ((1012821.8057861328125000 229228.264...
3         (POLYGON ((1021176.4790039062500000 151374.796...
4         (POLYGON ((1029606.0765991210937500 156073.814...
5         (POLYGON ((970217.0223999023437500 145643.3322...
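As a hedged aside, nybb.shp comes in a projected CRS, so reprojecting is a common next step; to_crs is the GeoDataFrame method for that.

boros_wgs84 = boros.to_crs(epsg=4326)   # reproject to WGS84 lat/lon
boros_wgs84.crs                         # inspect the new coordinate reference system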

Plotting


boros.plot()

Convex Hull


boros.convex_hull.plot()

Buffer

boros.buffer(0.5).plot()

Output to GeoJSON


boros.to_json()
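A related sketch: instead of a GeoJSON string, the whole GeoDataFrame can be written to a file (the output filename is arbitrary).

boros.to_file("boros.geojson", driver="GeoJSON")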

GeoPandas: let's explore...