In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2024
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
import os, warnings, sys, logging
import pandas as pd
import numpy as np
from datetime import date
import cml.data_v1 as cmldata
import seaborn as sns
import stumpy
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pandas as pd
from sedona.spark import *
from sedona.core.geom.envelope import Envelope

Skipping SedonaKepler import, verify if keplergl is installed
Skipping SedonaPyDeck import, verify if pydeck is installed


In [3]:
# Notebook-level settings.
# NOTE(review): none of these five names is referenced in the cells visible in
# this part of the notebook — presumably later cells use them; confirm.

# Read from the PROJECT_OWNER environment variable; the lookup raises KeyError
# when that variable is not set.
USERNAME = os.environ["PROJECT_OWNER"]
DBNAME = "LOGISTICS_MLOPS_DEMO"
# Hard-coded s3a:// location and connection name — presumably specific to one
# environment; verify before running elsewhere.
STORAGE = "s3a://goes-se-sandbox01"
CONNECTION_NAME = "se-aw-mdl"
# Evaluated once, when this cell runs.
DATE = date.today()

In [6]:
# Build (or reuse) the session through SedonaContext's builder, asking Spark to
# resolve the Sedona, GeoTools-wrapper and spark-extension packages and adding
# the Unidata repository to the resolver list.
# NOTE(review): the Sedona artifact name carries "3.0" while the
# spark-extension version carries "-3.4" — confirm both match the Spark
# version actually running on the cluster.
config = (
    SedonaContext.builder()
    .config(
        'spark.jars.packages',
        'org.apache.sedona:sedona-spark-3.0_2.12:1.5.1,'
        'org.datasyslab:geotools-wrapper:1.5.1-28.2,'
        'uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.4',
    )
    .config(
        'spark.jars.repositories',
        'https://artifacts.unidata.ucar.edu/repository/unidata-all',
    )
    .getOrCreate()
)

In [7]:
# Wrap the session built above in a Sedona context; every later cell reads
# files and runs SQL through `sedona`. The WARN lines printed by this cell
# report Sedona types/functions that were already registered.
sedona = SedonaContext.create(config)
# Handle to the underlying SparkContext (not used by the cells visible here).
sc = sedona.sparkContext

24/03/15 05:13:06 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
24/03/15 05:13:06 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
24/03/15 05:13:06 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
24/03/15 05:13:06 WARN SimpleFunctionRegistry: The function st_union_aggr replaced a previously registered function.
24/03/15 05:13:06 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
24/03/15 05:13:06 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
24/03/15 05:13:06 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.


### Import State Boundaries

In [98]:
# Read the tab-separated, header-less state boundary file and name its two
# columns: the state name and the boundary text.
states_wkt = (
    sedona.read
    .option("delimiter", "\t")
    .option("header", "false")
    .csv("data/boundary-each-state.tsv")
    .toDF("s_name", "s_bound")
)

In [99]:
# Parse the WKT text in s_bound into a geometry column (the printed schema
# below confirms the `geometry` type), keeping the state name alongside it.
states = states_wkt.selectExpr("s_name", "ST_GeomFromWKT(s_bound) as s_bound")
# Preview the first rows and the schema as a sanity check.
states.show()
states.printSchema()
# Register the result as the `states` view so SQL cells can query it.
states.createOrReplaceTempView("states")

+-------------+--------------------+
|       s_name|             s_bound|
+-------------+--------------------+
|       Alaska|POLYGON ((-141.02...|
|      Alabama|POLYGON ((-88.195...|
|     Arkansas|POLYGON ((-94.041...|
|      Arizona|POLYGON ((-112.59...|
|   California|POLYGON ((-124.40...|
|     Colorado|POLYGON ((-109.04...|
|  Connecticut|POLYGON ((-73.487...|
|     Delaware|POLYGON ((-75.791...|
|      Florida|POLYGON ((-87.605...|
|      Georgia|POLYGON ((-85.608...|
|       Hawaii|POLYGON ((-154.62...|
|         Iowa|POLYGON ((-95.762...|
|        Idaho|POLYGON ((-117.03...|
|     Illinois|POLYGON ((-90.629...|
|      Indiana|POLYGON ((-87.525...|
|       Kansas|POLYGON ((-102.05...|
|     Kentucky|POLYGON ((-89.537...|
|    Louisiana|POLYGON ((-94.043...|
|Massachusetts|POLYGON ((-72.778...|
|     Maryland|POLYGON ((-79.477...|
+-------------+--------------------+
only showing top 20 rows

root
 |-- s_name: string (nullable = true)
 |-- s_bound: geometry (nullable = true)



### Import IOT Fleet Data

In [103]:
# Load the raw IoT fleet events from a comma-separated file whose first row
# holds the column names.
iotDf = (
    sedona.read
    .option("delimiter", ",")
    .option("header", "true")
    .csv("data/iot_fleet_data.csv")
)

# Expose the raw events to Spark SQL under the name IOT_FLEET_DATA.
iotDf.createOrReplaceTempView("IOT_FLEET_DATA")

In [104]:
# Turn each raw event into a point geometry plus its device id and event type.
# Latitude and longitude are cast to Decimal(24,20) before being passed to
# ST_Point.
# NOTE: latitude is passed as the FIRST argument and longitude as the second,
# so the points come out in (lat, lon) order — see the preview below, e.g.
# POINT (42.184032 ...). The state polygons start with longitude values
# (e.g. -95.762... for Iowa), which is why the state lookup query in this
# notebook applies ST_FlipCoordinates to iot_coords. Changing the argument
# order here requires removing that flip in the same change.
iotGeoDf = sedona.sql("""SELECT ST_Point(CAST(IOT_FLEET_DATA.latitude AS Decimal(24,20)),
                CAST(IOT_FLEET_DATA.longitude AS Decimal(24,20))) AS iot_coords,
                IOT_FLEET_DATA.device_id,
                IOT_FLEET_DATA.event_type
                FROM IOT_FLEET_DATA
                """)

# Preview the derived rows.
iotGeoDf.show()

# Register the geometry-enriched events as IOT_GEO_DATA for later SQL cells.
iotGeoDf.createOrReplaceTempView("IOT_GEO_DATA")

+--------------------+---------------+------------------+
|          iot_coords|      device_id|        event_type|
+--------------------+---------------+------------------+
|POINT (42.184032 ...|0x1000000000005|system malfunction|
|POINT (42.094486 ...|0x100000000001d|    tank below 10%|
|POINT (41.742584 ...|0x1000000000008|    tank below 10%|
|POINT (42.07303 -...|0x100000000001b|    tank below 10%|
|POINT (42.397186 ...|0x1000000000014|    tank below 10%|
|POINT (41.98392 -...|0x100000000001c|      device error|
|POINT (41.606995 ...|0x1000000000005|system malfunction|
|POINT (42.44871 -...|0x100000000000b|      device error|
|POINT (42.146732 ...|0x1000000000024|    tank below 10%|
|POINT (41.681465 ...|0x1000000000009|    tank below 10%|
|POINT (42.48936 -...|0x1000000000025|      device error|
|POINT (41.808537 ...|0x1000000000001|    tank below 10%|
|POINT (41.654545 ...|0x1000000000020|system malfunction|
|POINT (41.785316 ...|0x1000000000017|     tank below 5%|
|POINT (41.707

### Which US State Is Each IOT Fleet Device Located In?

In [105]:
# Pair every IoT event with the state whose boundary covers its location.
# The comma join over `states` and `IOT_GEO_DATA` is filtered by ST_Covers, and
# SELECT * returns all columns from both views (state name and boundary, then
# point, device id and event type).
# ST_FlipCoordinates swaps the point's coordinate order before the test:
# iot_coords was built latitude-first, while the state polygons start with
# longitude values (e.g. -95.762... for Iowa).
# Note the predicate is ST_Covers even though the variable is named containsDf.
containsDf = sedona.sql("""SELECT * 
                            FROM states s, IOT_GEO_DATA iot 
                            WHERE ST_Covers(s.s_bound, ST_FlipCoordinates(iot.iot_coords))""")
# Preview the matched (state, event) rows.
containsDf.show()

+------+--------------------+--------------------+---------------+------------------+
|s_name|             s_bound|          iot_coords|      device_id|        event_type|
+------+--------------------+--------------------+---------------+------------------+
|  Iowa|POLYGON ((-95.762...|POINT (41.596134 ...|0x100000000001c|     tank below 5%|
|  Iowa|POLYGON ((-95.762...|POINT (41.59631 -...|0x100000000001e|system malfunction|
|  Iowa|POLYGON ((-95.762...|POINT (41.5991 -9...|0x100000000000f|system malfunction|
|  Iowa|POLYGON ((-95.762...|POINT (41.604393 ...|0x1000000000007|     tank below 5%|
|  Iowa|POLYGON ((-95.762...|POINT (41.619007 ...|0x100000000001d|    tank below 10%|
|  Iowa|POLYGON ((-95.762...|POINT (41.620396 ...|0x1000000000015|     tank below 5%|
|  Iowa|POLYGON ((-95.762...|POINT (41.62198 -...|0x1000000000015|      device error|
|  Iowa|POLYGON ((-95.762...|POINT (41.629917 ...|0x1000000000027|      device error|
|  Iowa|POLYGON ((-95.762...|POINT (41.645958 ...|0x10