# Apache Spark with GeoMesa and Apache HBase

In [1]:
# Render matplotlib figures inline in the notebook.
# NOTE(review): none of the cells shown here produce plots; presumably kept for
# interactive follow-up exploration.
%matplotlib inline

In [2]:
import os
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

import geomesa_pyspark

In [3]:
# Paths to the GeoMesa Spark runtime JARs inside each GeoMesa distribution.
# The GeoMesa and Scala versions appear in every JAR file name, so they are
# pulled out into constants: an upgrade only has to change them here.
GEOMESA_VERSION = "3.0.0"
SCALA_VERSION = "2.11"

# FileSystem datastore runtime.
# NOTE(review): this path is not passed to geomesa_pyspark.configure() in the
# cells shown; only the HBase JAR below is used.
geomesa_fs_jar_filepath = os.path.join(
    os.environ['GEOMESA_FS_HOME'],
    "dist/spark/geomesa-fs-spark-runtime_{}-{}.jar".format(
        SCALA_VERSION, GEOMESA_VERSION))

# HBase 2.x datastore runtime; this is the JAR given to Spark via configure().
geomesa_hbase_jar_filepath = os.path.join(
    os.environ['GEOMESA_HBASE_HOME'],
    "dist/spark/geomesa-hbase-spark-runtime-hbase2_{}-{}.jar".format(
        SCALA_VERSION, GEOMESA_VERSION))

In [4]:
# Build the Spark configuration for this notebook. It is given:
#   - the GeoMesa HBase runtime JAR,
#   - the geomesa_pyspark and pytz Python packages,
#   - the Spark installation named by SPARK_HOME.
# The app name is then set on the resulting configuration.
conf = (
    geomesa_pyspark.configure(
        jars=[geomesa_hbase_jar_filepath],
        packages=['geomesa_pyspark', 'pytz'],
        spark_home=os.environ['SPARK_HOME'],
    )
    .setAppName('geomesa_hbase')
)

In [5]:
%%time
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

CPU times: user 48.9 ms, sys: 21.8 ms, total: 70.7 ms
Wall time: 30.9 s


# Prepare DataFrames

In [6]:
%%time
params = {"hbase.catalog": "gdelt_custom_20200101"}

df_hbase = spark.read.format("geomesa") \
                .options(**params) \
                .option("geomesa.feature", "gdelt_custom") \
                .load()

df_hbase.count()

CPU times: user 9.78 ms, sys: 4.66 ms, total: 14.4 ms
Wall time: 50 s


86770

In [7]:
%%time
params = {"hbase.catalog": "ne_countries"}

df_hbase_ne = spark.read.format("geomesa") \
                   .options(**params) \
                   .option("geomesa.feature", "ne_countries") \
                   .load()

df_hbase_ne.count()

CPU times: user 3.99 ms, sys: 0 ns, total: 3.99 ms
Wall time: 2.42 s


176

In [8]:
%%time
df_hbase.createOrReplaceTempView("gdelt_custom_20200101")
df_hbase_ne.createOrReplaceTempView("ne_countries")
spark.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|      gdelt_csv_2019|      false|
| default|      gdelt_csv_2020|      false|
| default|  gdelt_parquet_2020|      false|
| default|gdelt_parquet_ins...|      false|
| default|gdelt_parquet_ins...|      false|
| default|ne_10_states_prov...|      false|
| default|ne_110_countries_...|      false|
|        |gdelt_custom_2020...|       true|
|        |        ne_countries|       true|
+--------+--------------------+-----------+

CPU times: user 29.9 ms, sys: 14.9 ms, total: 44.7 ms
Wall time: 3min 1s


# Geospatial Functions

In [9]:
# Bounding box for the ST_Within filter below. The four values are passed
# positionally to st_makeBBOX, which takes (lowerX, lowerY, upperX, upperY).
# Assuming lon/lat (WGS84) geometries, that order is:
#   min lon, min lat, max lon, max lat.
# NOTE(review): the box presumably covers roughly Austria; confirm.
EXTENT = [
    9.5307,   # lower x (min longitude)
    46.3723,  # lower y (min latitude)
    17.1608,  # upper x (max longitude)
    49.0205,  # upper y (max latitude)
]

In [10]:
%%time
df = spark.sql("""
  SELECT
    event_root_code,
    COUNT(event_id) AS cnt
  FROM
    gdelt_custom_20200101
  WHERE 
    ST_Within(geom, st_makeBBOX({}, {}, {}, {}))
  GROUP BY event_root_code
  ORDER BY cnt DESC
""".format(*EXTENT)).toPandas()

df.head()

CPU times: user 479 ms, sys: 241 ms, total: 720 ms
Wall time: 2min 8s


Unnamed: 0,event_root_code,cnt
0,4,72
1,1,39
2,5,31
3,3,18
4,2,13


# Spatial Join

In [11]:
%%time
df = spark.sql("""
  SELECT
    c.iso_a2,
    COUNT(g.event_id) AS cnt
  FROM
    gdelt_custom_20200101 AS g,
    ne_countries AS c
  WHERE ST_Within(g.geom, c.polygons)
  GROUP BY c.iso_a2
""").toPandas()

df.head()

CPU times: user 30.9 ms, sys: 10.6 ms, total: 41.5 ms
Wall time: 59.2 s


Unnamed: 0,iso_a2,cnt
0,MM,65
1,DZ,56
2,LT,30
3,CI,47
4,AZ,48
