## Iniciando el entorno glue+spark+sedona

In [46]:
%pip install apache-sedona==1.6.1

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [47]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.6.1,org.datasyslab:geotools-wrapper:1.6.1-28.2"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


## ETL

In [None]:
from sedona.spark import *
from sedona.core.SpatialRDD import SpatialRDD
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter
from pyspark.sql.functions import col
from pyspark.sql import functions as f

sedona = SedonaContext.create(spark)

### Obteniendo las pulsaciones.

In [49]:
pulsaciones = sedona.read.format("csv").options(header=True).load("s3://pruebapulster/data/pulsaciones/*.csv")
pulsaciones = pulsaciones.withColumn(
    "geometry",
    ST_Point(col("Lon of Observation Point"), col("Lat of Observation Point"))
)

pulsaciones.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3456

In [50]:
pulsaciones = pulsaciones.dropDuplicates()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
pulsaciones = pulsaciones.withColumn("datetime", f.to_timestamp(f.concat(f.col('Date'), f.lit(' '), f.col('Time of Day'))))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [72]:
pulsaciones.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+------------------------+------------------------+----------+-----------+-----------+--------------------+-------------------+
|Polygon ID|    Hashed Device ID|Lat of Observation Point|Lon of Observation Point|      Date|Time of Day|Day of Week|            geometry|           datetime|
+----------+--------------------+------------------------+------------------------+----------+-----------+-----------+--------------------+-------------------+
|   GT00771|4da5fc59fdeefefe0...|               14.641143|              -90.509878|2023-03-01|   14:54:55|        Wed|POINT (-90.509878...|2023-03-01 14:54:55|
|   GT00769|57546f4ae7abadc39...|               14.608177|              -90.519319|2023-03-01|   13:44:06|        Wed|POINT (-90.519319...|2023-03-01 13:44:06|
|   GT00769|cfe5756318a4c6786...|               14.626453|              -90.552004|2023-03-01|   16:49:16|        Wed|POINT (-90.552004...|2023-03-01 16:49:16|
|   GT00769|3a23c4496be09b33f...|       

### Obteniendo los comercios

In [51]:
comercios_rdd = ShapefileReader.readToGeometryRDD(sedona, "s3://pruebapulster/shapes/Comercios/")
comercios = Adapter.toDf(comercios_rdd, sedona)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Realizando la consulta

In [73]:
comercios.createOrReplaceTempView("comercios")
pulsaciones.createOrReplaceTempView("pulsaciones")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
sedona.sql("select `Polygon Id`, count(*) from pulsaciones GROUP BY `Polygon Id`").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------+
|Polygon Id|count(1)|
+----------+--------+
|   GT00772|     203|
|   GT00773|     587|
|   GT00769|    1229|
|   GT00771|     721|
|   GT00770|     716|
+----------+--------+

In [76]:
resultado = sedona.sql("""
    SELECT comercios.ARGOSCODE, count(*) as conteo FROM comercios, pulsaciones 
        WHERE ST_Contains(comercios.geometry, pulsaciones.geometry) 
        GROUP BY comercios.ARGOSCODE
    """)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
resultado.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------+
|ARGOSCODE|conteo|
+---------+------+
|  GT00772|    25|
|  GT00773|    57|
|  GT00769|   120|
|  GT00771|    37|
|  GT00770|   233|
+---------+------+

In [63]:
resultado.write.mode("overwrite").parquet("s3://pruebapulster/data/resultados/conteo_pulsaciones")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…