# Import pixiedust
Start by importing pixiedust, which should already be installed if all bootstrap and install steps were run correctly.
Below the import you should see a message saying the pixiedust database was opened successfully, with no errors.
Depending on the version of pixiedust that gets installed, it may ask you to update.
If so, run this first cell.

In [None]:
# Optional: upgrade pixiedust in the current user's site-packages. Only needed
# when importing pixiedust asks for an update.
!pip install --user --upgrade pixiedust

In [1]:
import pixiedust

Pixiedust database opened successfully


# Creating the SQLContext and inspecting pyspark Context
Pixiedust imports pyspark and the SparkContext + SparkSession should be already available through the "sc" and "spark" variables respectively.

In [2]:
# Print Spark info and create sql_context
# SQLContext is imported explicitly so this cell does not depend on the kernel's
# startup script having already placed the name in the notebook namespace.
from pyspark.sql import SQLContext

# `sc` (SparkContext) and `spark` (SparkSession) are expected to be provided by
# the kernel; report the basics of the running Spark application.
print('Spark Version: {0}'.format(sc.version))
print('Python Version: {0}'.format(sc.pythonVer))
print('Application Name: {0}'.format(sc.appName))
print('Application ID: {0}'.format(sc.applicationId))
print('Spark Master: {0}'.format(sc.master))

# SQL entry point bound to the existing context and session; later cells use it
# to query the temp views they register.
sql_context = SQLContext(sc, sparkSession=spark)

Spark Version: 2.2.0
Python Version: 3.5
Application Name: pyspark-shell
Application ID: application_1507827733180_0011
Spark Master: yarn


# Installing Geowave jar and other packages
pixiedust allows you to install additional jar packages very easily by specifying a valid url, file path (local,hdfs), or maven repository. After installing the package you'll have to restart the kernel before you can use it.

In [3]:
# Register the GeoWave tools jar and the hadoop-lzo jar with pixiedust, both
# referenced by local file:// paths. A newly installed package only becomes
# usable after the kernel is restarted.
pixiedust.installPackage('file:///usr/local/geowave/tools/geowave-tools-0.9.6-apache.jar')
pixiedust.installPackage('file:///usr/lib/hadoop-lzo/lib/hadoop-lzo.jar')

Package already installed: file:///usr/local/geowave/tools/geowave-tools-0.9.6-apache.jar
Package already installed: file:///usr/lib/hadoop-lzo/lib/hadoop-lzo.jar


<pixiedust.packageManager.package.Package at 0x7fdca808e518>

# Download and ingest the GPX data

In [None]:
%%bash
# Work from /mnt/tmp. Stop immediately if the directory is unavailable so the
# download below cannot land in some other working directory.
cd /mnt/tmp || exit 1

# Fetch the GeoWave environment script and load it into this shell.
wget s3.amazonaws.com/geowave/latest/scripts/emr/quickstart/geowave-env.sh
source /mnt/tmp/geowave-env.sh

# Copy the GPX data from S3 into HDFS under /tmp/ on this host.
s3-dist-cp --src=s3://geowave-gpx-data/gpx --dest="hdfs://${HOSTNAME}:8020/tmp/"

# Import the spatial index and metadata tables into Accumulo from /tmp/spatial
# and /tmp/metadata (presumably populated by the copy above - verify).
# NOTE(review): the Accumulo user/password are hard-coded here; confirm these
# are only the quickstart defaults before reusing this outside a demo cluster.
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial"
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata"

# Setup Datastores

In [None]:
%%bash
# Remove store definitions that an earlier run may have left behind.
for store_name in kmeans_gpx germany_gpx_accumulo; do
    geowave config rmstore "$store_name"
done

# Register connection parameters for the two named stores:
#   germany_gpx_accumulo - Accumulo, namespace geowave.germany_gpx
#   kmeans_gpx           - HBase, namespace geowave.kmeans
zookeeper="$HOSTNAME:2181"
geowave config addstore germany_gpx_accumulo --gwNamespace geowave.germany_gpx -t accumulo --zookeeper "$zookeeper" --instance accumulo --user root --password secret
geowave config addstore kmeans_gpx --gwNamespace geowave.kmeans -t hbase --zookeeper "$zookeeper"

# Run KMeans

In [4]:
# Look up the GeoWave Java classes used by the following cells through the
# SparkContext's JVM handle (sc._jvm).
_geowave = sc._jvm.mil.nga.giat.geowave
_javaspark = _geowave.analytic.javaspark

# Datastore option classes (HBase for output, Accumulo for input).
hbase_options_class = _geowave.datastore.hbase.operations.config.HBaseRequiredOptions
accumulo_options_class = _geowave.datastore.accumulo.operations.config.AccumuloRequiredOptions

# Spark analytics: the KMeans runner, RDD loader and DataFrame wrapper.
kmeans_runner_class = _javaspark.kmeans.KMeansRunner
geowave_rdd_class = _javaspark.GeoWaveRDD
sf_df_class = _javaspark.sparksql.SimpleFeatureDataFrame

# Core types needed to look up adapters and build queries.
query_options_class = _geowave.core.store.query.QueryOptions
byte_array_class = _geowave.core.index.ByteArrayId

In [5]:
# Build the input (Accumulo) and output (HBase) datastore options, the KMeans
# runner instance, and the plugin options the later cells pass around.
import os
import socket

# The Zookeeper address is this machine's host name plus port 2181. HOSTNAME is
# not always exported to the kernel process, so fall back to the name reported
# by the OS instead of failing with a KeyError.
hostname = os.environ.get('HOSTNAME') or socket.gethostname()
zookeeper_address = hostname + ':2181'

# Setup input datastore
# NOTE(review): user/password are hard-coded; confirm they are only the
# quickstart defaults before using this against a real cluster.
input_store = accumulo_options_class()
input_store.setInstance('accumulo')
input_store.setUser('root')
input_store.setPassword('secret')
input_store.setZookeeper(zookeeper_address)
input_store.setGeowaveNamespace('geowave.germany_gpx')

# Setup output datastore
output_store = hbase_options_class()
output_store.setZookeeper(zookeeper_address)
output_store.setGeowaveNamespace('geowave.kmeans')

# Create an instance of the runner
kmeans_runner = kmeans_runner_class()

# Plugin options derived from the two store configurations above.
input_store_plugin = input_store.createPluginOptions()
output_store_plugin = output_store.createPluginOptions()

In [None]:
# Configure the KMeans runner and execute it.

# Values handed to the runner below.
num_clusters = 8
cql_bbox_filter = "BBOX(geometry,  13.3, 52.45, 13.5, 52.5)"
centroid_type_name = 'mycentroids'
hull_type_name = 'myhulls'

# Run on the JavaSparkContext already wrapped by the Python SparkContext.
kmeans_runner.setJavaSparkContext(sc._jsc)

# Input adapter, cluster count, source/destination stores and spatial filter.
kmeans_runner.setAdapterId('gpxpoint')
kmeans_runner.setNumClusters(num_clusters)
kmeans_runner.setInputDataStore(input_store_plugin)
kmeans_runner.setOutputDataStore(output_store_plugin)
kmeans_runner.setCqlFilter(cql_bbox_filter)

# Type names under which the results are written; later cells look these up.
kmeans_runner.setCentroidTypeName(centroid_type_name)
kmeans_runner.setHullTypeName(hull_type_name)

# Request hull output in addition to centroids.
# NOTE(review): presumably "hull data" means the per-hull statistics - confirm.
kmeans_runner.setGenerateHulls(True)
kmeans_runner.setComputeHullData(True)

# Execute the kmeans runner
kmeans_runner.run()

# Load Centroids into DataFrame and display

In [6]:
# Load the KMeans centroids from the output store into a DataFrame and show them.
import pyspark.mllib.common as convert

centroid_frame_builder = sf_df_class(spark._jsparkSession)
centroid_adapter_id = byte_array_class('mycentroids')

# Scan the output store's adapters for the one whose id is 'mycentroids'. If no
# adapter matches, the query options stay None and are passed on as None.
centroid_query_options = None
adapter_iter = output_store_plugin.createAdapterStore().getAdapters()
while adapter_iter.hasNext():
    candidate = adapter_iter.next()
    if not candidate.getAdapterId().equals(centroid_adapter_id):
        continue
    centroid_query_options = query_options_class(candidate)
    break

centroid_rdd = geowave_rdd_class.rddForSimpleFeatures(
    sc._jsc.sc(), output_store_plugin, None, centroid_query_options)

centroid_frame_builder.init(output_store_plugin, centroid_adapter_id)
java_centroid_df = centroid_frame_builder.getDataFrame(centroid_rdd)

# Wrap the Java DataFrame as a Python DataFrame and expose it to SQL as a view.
convert._java2py(sc, java_centroid_df).createOrReplaceTempView('mycentroids')

# `df` is read by the lat/lon parsing cell further down.
df = sql_context.sql("select * from mycentroids")

display(df)

geom,ClusterIndex
POINT (13.320387183455338 52.46353825419876),6
POINT (13.320525829883927 52.48654744125337),0
POINT (13.354396490169815 52.47897227097034),5
POINT (13.483897788444445 52.47727456179392),1
POINT (13.451317701787252 52.493912535122114),4
POINT (13.381976883144644 52.48529588077717),2
POINT (13.417024595936164 52.4819903341358),7
POINT (13.449391117686156 52.462778441293416),3
POINT (9.97782222840828 56.00164078515346),3
POINT (8.937666728475522 51.00333559255339),1


# Parse DataFrame data into lat/lon columns and display centroids on map
Using pixiedust's built in map visualization we can display data on a map assuming it has the following properties.
- Keys: put your latitude and longitude fields here. They must be floating values. These fields must be named latitude, lat or y and longitude, lon or x.
- Values: the field you want to use to thematically color the map. Only one field can be used.

You will also need an access token from whichever map renderer you choose to use with pixiedust (mapbox, google).
Follow the instructions in the token help on how to create and use the access token.

In [7]:
# Convert the string point information into lat long columns and create a new dataframe for those.
import pyspark
def parseRow(row):
    """Convert a row holding a WKT point string into a Row with numeric lat/lon.

    The text between the first '(' and the last ')' of ``row.geom`` is split on
    whitespace. The first token is taken as the longitude (x) and the second as
    the latitude (y). Splitting by token, rather than by character position,
    tolerates padding inside the parentheses and ignores any further ordinates
    (for example a z value).

    Parameters:
        row: object with a ``geom`` attribute such as 'POINT (13.32 52.46)' and
            a ``ClusterIndex`` attribute.

    Returns:
        pyspark.sql.Row with float ``lat`` and ``lon`` fields and the original
        ``ClusterIndex`` value.

    Raises:
        ValueError: if fewer than two ordinates are present or one of them is
            not a valid number.
    """
    wkt_text = row.geom
    coordinate_text = wkt_text[wkt_text.find('(') + 1:wkt_text.rfind(')')]
    ordinates = coordinate_text.split()
    if len(ordinates) < 2:
        raise ValueError('Cannot parse a lon/lat pair from geometry: {0!r}'.format(wkt_text))
    # WKT lists x (longitude) before y (latitude).
    lon, lat = ordinates[0], ordinates[1]
    return pyspark.sql.Row(lat=float(lat), lon=float(lon), ClusterIndex=row.ClusterIndex)
    
# Run every centroid row through parseRow and rebuild a DataFrame from the
# resulting lat/lon/ClusterIndex rows, then hand it to display().
new_df = df.rdd.map(parseRow).toDF()
display(new_df)

# Export KMeans Hulls to DataFrame
If you have some more complex data to visualize pixiedust may not be the best option.

The Kmeans hull generation outputs polygons that would be difficult for pixiedust to display without
creating a special plugin. 

Instead, we can use another map renderer to visualize our data. For the Kmeans hulls we will use ipyleaflet to visualize the data. We will start by grabbing the results for the hull generation and putting them into a DataFrame

In [8]:
# Load the KMeans hulls from the output store into a DataFrame ordered by Density.
import pyspark.mllib.common as convert

sf_df_hulls = sf_df_class(spark._jsparkSession)
hulls_adapter_id = byte_array_class('myhulls')

# Find the adapter registered under 'myhulls' and build query options for it.
# The loop stops at the first match; with no match the options remain None.
hulls_query_options = None
hull_adapters = output_store_plugin.createAdapterStore().getAdapters()
while hulls_query_options is None and hull_adapters.hasNext():
    current_adapter = hull_adapters.next()
    if current_adapter.getAdapterId().equals(hulls_adapter_id):
        hulls_query_options = query_options_class(current_adapter)

output_rdd_hulls = geowave_rdd_class.rddForSimpleFeatures(
    sc._jsc.sc(), output_store_plugin, None, hulls_query_options)

sf_df_hulls.init(output_store_plugin, hulls_adapter_id)
java_hulls_df = sf_df_hulls.getDataFrame(output_rdd_hulls)

# Wrap the Java DataFrame as a Python DataFrame and expose it to SQL as a view.
convert._java2py(sc, java_hulls_df).createOrReplaceTempView('myhulls')

# `df_hulls` is read by the GeoJSON conversion cell further down.
df_hulls = sql_context.sql("select * from myhulls order by Density")

display(df_hulls)

geom,ClusterIndex,Count,Area,Density
"POLYGON ((8 58.1667, 7.35 58.2, 6.71667 58.2833, 6.33333 58.35, 5.68333 58.5333, 4.9 59.3, 6.26667 59.8667, 9.98333 59.7833, 10 59.0667, 8.1 58.2, 8 58.1667))",2,396,40508.517690159824,0.0097757218130989
"POLYGON ((2.82627 50.0126, 1.52526 50.0414, 1.44597 50.046, 0.633333 50.8833, 0.9 51.3167, 1.43333 52.05, 1.68333 52.3333, 2.91185 51.2234, 2.94854 50.7526, 2.83176 50.0234, 2.82627 50.0126))",7,2271,27313.40826070446,0.0831459764494959
"POLYGON ((10 52, 9.61667 52.0667, 9.21667 52.25, 8.48333 52.6167, 8.03333 52.85, 7.73333 53.5833, 7.88333 54.2, 8.05 54.6667, 9.63333 54.6167, 9.96667 54.05, 10 53.55, 10 52))",6,3151,34116.93350087639,0.0923588282023956
"POLYGON ((9.76667 54.7167, 8.33333 54.9, 7.83418 55.4838, 8.26553 56.5005, 9.58971 57.248, 9.698 57.2719, 9.94116 57.0355, 9.98823 56.8971, 10 56, 9.76667 54.7167))",3,7005,27029.29687856577,0.2591632343035517
"POLYGON ((6.4 50.6667, 6.06608 50.8674, 5.87476 50.9957, 5.28123 51.4129, 4.33333 52.0833, 3 55.3333, 6.66667 53.5833, 7.4 53.0667, 7.51667 52.6833, 7.1 51.55, 6.96667 51.25, 6.95 51.2167, 6.73333 50.8333, 6.4 50.6667))",4,20052,73468.57797846758,0.2729330082566306
"POLYGON ((3.5 50, 2.97714 50.025, 2.98519 50.8711, 3 51, 3.0032 51.0254, 3.18188 51.329, 3.3593 51.5631, 4.28317 52.0976, 4.61288 51.8724, 6.13333 50.7667, 6.23333 50.55, 6.35 50.0333, 6.09333 50.0178, 5.71844 50.0035, 3.5 50))",5,24059,38549.83986288589,0.6241011658043996
"POLYGON ((0.216667 50.7833, 0.066667 50.85, 0.016667 50.9667, 0 51.7333, 0 52.6667, 0.033333 53.7333, 0.916667 53.4, 1.6 52.7833, 1.61667 52.7667, 1.75 52.6333, 1.45 52.1, 1.25 51.8333, 0.6 50.9833, 0.483333 50.85, 0.25 50.7833, 0.216667 50.7833))",0,17957,24606.88319209844,0.7297551607741287
"POLYGON ((8.58333 50, 7.16667 50.0833, 6.7 50.2167, 6.8 50.8, 7.45 52.2833, 8.13333 52.3, 8.91667 52.2, 9.35 52.1, 9.93333 51.85, 9.96667 51.6, 9.93333 50.9833, 9 50, 8.58333 50))",1,40663,45854.219560941936,0.8867886181326755
"POLYGON ((13.4439443 52.4500005, 13.435513 52.450011, 13.4206216 52.4501, 13.4205883 52.45012, 13.4205783 52.4501416, 13.4206083 52.4510483, 13.4206116 52.4511116, 13.424709 52.45806, 13.4293809 52.465937, 13.4372 52.4791, 13.4374 52.4791, 13.438153 52.479097, 13.4572649 52.477916, 13.459339 52.477773, 13.466092 52.471261, 13.4661683 52.4711433, 13.4672749 52.468509, 13.4730566 52.4546733, 13.468309 52.450024, 13.46598 52.450009, 13.4646785 52.4500018, 13.4644143 52.4500008, 13.4439443 52.4500005))",3,29698,8.506288592078233,3491.299369698938
"POLYGON ((13.405199 52.450043, 13.396737 52.451247, 13.3966093 52.4529647, 13.399521 52.483841, 13.4001392 52.4903368, 13.4009183 52.4984, 13.401309 52.498883, 13.4042161 52.4999445, 13.40436 52.49996, 13.40787 52.4999899, 13.4156276 52.4999992, 13.41718 52.5, 13.42281 52.5, 13.423862 52.499996, 13.42995 52.49997, 13.43001 52.4999, 13.43013 52.49956, 13.4307263 52.4978496, 13.432331 52.4932444, 13.4370699 52.4796, 13.4371 52.4795, 13.437156 52.4791607, 13.43717 52.47906, 13.4206266 52.4512, 13.405199 52.450043))",7,52526,11.949162665465868,4395.789183773082


# Convert Kmeans hull results to geojson
ipyleaflet provides an easy way to visualize leaflet maps in jupyter notebooks.

Our hull data contains wkt geometry strings that we will use with a small python library to convert the geometry to GeoJson. Once our data is converted to a proper GeoJson feature collection we can use ipyleaflet to easily load and display that data on a map.

For more information on the GeoJson format visit: http://geojson.org/

In [9]:
from geomet import wkt
from ipyleaflet import (
    Map,
    Marker,
    TileLayer, ImageOverlay,
    Polyline, Polygon, Rectangle, Circle, CircleMarker,
    GeoJSON,
    DrawControl
)

# Collecting the results gives a list of Rows; turn each one into a plain dict.
hull_records = [hull_row.asDict(True) for hull_row in df_hulls.collect()]

# Each record becomes one GeoJSON Feature: the WKT in "geom" is converted with
# geomet, and every other column is carried over as a property.
hulls_geojson = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "geometry": wkt.loads(record["geom"]),
            "properties": {
                column: value
                for column, value in record.items()
                if column != "geom"
            }
        }
        for record in hull_records
    ]
}
print("Count: {0} Features".format(len(hulls_geojson["features"])))

Count: 16 Features


In [12]:
# Initial view for the map built in the next cell.
# NOTE(review): center is presumably [latitude, longitude] - confirm against
# the ipyleaflet Map API.
center = [52.54, 13.49]
# Initial zoom level passed to the map.
zoom = 10

In [13]:
# Create the map at the configured center/zoom, add the hull feature collection
# as a GeoJSON layer, and end on the map so the notebook renders it.
hull_map = Map(center=center, zoom=zoom)
hull_layer = GeoJSON(data=hulls_geojson)
hull_map.add_layer(hull_layer)
hull_map