# 📄 Notebook Overview: GeoParquet Transformation Pipeline

This notebook demonstrates how to process and optimize geospatial performance data from Ookla using Apache Sedona and Apache Spark. It focuses on reading mobile and fixed broadband datasets stored in Parquet format from a public S3 bucket, performing spatial transformations, and writing the outputs in the GeoParquet format. The main goals of the notebook include:

- Enabling scalable, distributed spatial data processing.
- Extracting temporal metadata (year, quarter) from file paths.
- Converting WKT columns into geometries and calculating spatial attributes like bounding boxes and geohashes.
- Optimizing datasets through strategic repartitioning and sorting.
- Writing the final results using the GeoParquet specification with custom CRS and compression.

By the end of this workflow, both datasets are stored as GeoParquet, a cloud-native format suited to scalable querying and analysis of large spatial datasets.

# 🔧 Setting Up Sedona Connection

This block initializes **Apache Sedona**, an extension of Apache Spark for spatial data processing. It:

- Imports Sedona and Spark SQL functions.
- Configures Spark to access the Ookla Open Data S3 bucket anonymously.
- Creates a `SedonaContext` instance that enables spatial processing capabilities.



In [2]:
from sedona.spark import *
import pyspark.sql.functions as f

config = SedonaContext. \
    builder(). \
    config("spark.hadoop.fs.s3a.bucket.ookla-open-data.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"). \
    getOrCreate()

sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

# 📱 Loading and Transforming Mobile Parquet Data

```python
from pyspark.sql.functions import input_file_name, regexp_extract, expr
```
**What this does:**
- Imports three functions from PySpark:
  - `input_file_name()`: Captures the full path of the input file each row originated from, which is useful for extracting metadata embedded in the directory structure (e.g., year, quarter).
  - `regexp_extract()`: Applies a regular expression to a string column and extracts specific patterns, used here to pull out the `year` and `quarter` from file paths.
  - `expr()`: Parses a SQL expression string into a column, used here to call Sedona's `ST_` functions.

---

```python
mobile = sedona.read.format("parquet")\
    .load('s3://ookla-open-data/parquet/performance/type=mobile/*/*/*.parquet') \
```
**What this does:**
- Loads all mobile performance parquet files from Ookla's public S3 bucket.
- Uses `*` wildcards to match every year directory, every quarter directory, and every `.parquet` file inside them.
- Reads the data into a Spark DataFrame using Sedona's reader.

---


```python
    .withColumn("file_path", input_file_name()) \
```
**What this does:**
- Adds a new column called `file_path` containing the S3 path where each row came from.
- This column is used to extract temporal metadata.

---


```python
    .withColumn("year", regexp_extract("file_path", r"year=(\d+)", 1)) \
```
**What this does:**
- Extracts the digits after `year=` in the file path (for example, `2019`) using a regex capture group and stores them in a new string column named `year`.

---

```python
    .withColumn("quarter", regexp_extract("file_path", r"quarter=(\d+)", 1)) \
```
**What this does:**
- Extracts the `quarter` (1 to 4) from the file path in the same way, capturing the digits after `quarter=`.
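
To see what the two patterns capture, the same regular expressions can be tried on a single path with Python's built-in `re` module. This is a small sketch outside Spark: the sample path is an assumed example of the bucket's `year=`/`quarter=` layout, and `extract_partition` is a hypothetical helper. Like `regexp_extract`, it returns an empty string when the pattern does not match.

```python
import re

# Assumed example path; actual object names in the bucket may differ.
SAMPLE_PATH = (
    "s3://ookla-open-data/parquet/performance/"
    "type=mobile/year=2019/quarter=1/2019-01-01_performance_mobile_tiles.parquet"
)


def extract_partition(path: str, key: str) -> str:
    """Return the digits following '<key>=' in path, or '' if absent."""
    match = re.search(rf"{re.escape(key)}=(\d+)", path)
    return match.group(1) if match else ""


year = extract_partition(SAMPLE_PATH, "year")
quarter = extract_partition(SAMPLE_PATH, "quarter")
print(year, quarter)
```

Both values come back as strings, which matches the column type produced by `regexp_extract`; cast them if numeric comparisons are needed.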

---

```python
    .withColumn("geometry", expr("ST_GeomFromText(tile)")) \
```
**What this does:**
- Converts the `tile` column (in WKT format) into a geometry column using Sedona's `ST_GeomFromText()` function.
- This enables spatial operations on the geometries.

---

```python
    .withColumn("bbox", expr("struct(st_xmin(ST_GeomFromText(tile)) as xmin, st_ymin(ST_GeomFromText(tile)) as ymin, st_xmax(ST_GeomFromText(tile)) as xmax, st_ymax(ST_GeomFromText(tile)) as ymax) as bbox")) \
```
**What this does:**
- Constructs a bounding box for each geometry by extracting the min and max x and y coordinates.
- Creates a struct (`bbox`) with fields `xmin`, `ymin`, `xmax`, and `ymax` for spatial indexing or filtering.
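
The bounding-box idea can be illustrated without Spark. The sketch below parses the coordinate pairs out of a WKT string and takes their minima and maxima. `wkt_bbox` is a hypothetical helper and the tile coordinates are made up for illustration; Sedona's `ST_XMin`/`ST_YMax` family works on real geometry objects rather than on text.

```python
import re


def wkt_bbox(wkt: str) -> dict:
    """Return xmin/ymin/xmax/ymax for the coordinates in a simple WKT string."""
    # Match every "x y" pair; plain decimals only (no scientific notation).
    pairs = re.findall(r"(-?\d+(?:\.\d+)?)\s+(-?\d+(?:\.\d+)?)", wkt)
    if not pairs:
        raise ValueError("no coordinates found in WKT string")
    xs = [float(x) for x, _ in pairs]
    ys = [float(y) for _, y in pairs]
    return {"xmin": min(xs), "ymin": min(ys), "xmax": max(xs), "ymax": max(ys)}


# Made-up tile polygon (longitude latitude order).
tile = "POLYGON((-160.02 70.64, -160.01 70.64, -160.01 70.63, -160.02 70.63, -160.02 70.64))"
bbox = wkt_bbox(tile)
print(bbox)
```

The resulting dictionary has the same four fields as the `bbox` struct built in the notebook.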

---


```python
    .withColumn("geohash", expr("ST_GeoHash(ST_GeomFromText(tile), 10)")) \
```
**What this does:**
- Generates a geohash with precision 10 for each geometry using Sedona.
- Geohashing encodes spatial location into alphanumeric strings and is useful for spatial partitioning or clustering.
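
A geohash interleaves longitude and latitude bits and encodes them in base 32, so a shorter hash is always a prefix of a longer hash for the same location. The sketch below is a plain-Python encoder for a single point, written for illustration only. `geohash_encode` is a hypothetical helper, not Sedona's implementation, and it does not cover how `ST_GeoHash` picks a representative location for a polygon.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(lon: float, lat: float, precision: int) -> str:
    """Encode a lon/lat point as a geohash string of the given length."""
    lon_lo, lon_hi = -180.0, 180.0
    lat_lo, lat_hi = -90.0, 90.0
    chars = []
    bits, bit_count = 0, 0
    use_lon = True  # bits alternate, starting with longitude
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits, lon_lo = (bits << 1) | 1, mid
            else:
                bits, lon_hi = bits << 1, mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits, lat_lo = (bits << 1) | 1, mid
            else:
                bits, lat_hi = bits << 1, mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base-32 character
            chars.append(BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(chars)


fine = geohash_encode(10.40744, 57.64911, 10)
coarse = geohash_encode(10.40744, 57.64911, 6)
print(fine, coarse, fine.startswith(coarse))
```

The prefix relationship is why sorting by a geohash string tends to place nearby tiles next to each other.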

---


```python
    .selectExpr("*", ''' "mobile" as type''') \
```
**What this does:**
- Adds a new column called `type` and sets its value to "mobile" for all rows.
- Helps distinguish this dataset from others, such as fixed broadband data.

---

```python
    .orderBy(expr("ST_GeoHash(ST_GeomFromText(tile), 6)")) \
```
**What this does:**
- Orders the rows based on a geohash of precision 6 (lower precision = larger area).
- This helps optimize data locality for downstream spatial operations.

---

```python
    .drop("file_path")
```
**What this does:**
- Removes the `file_path` column since it was only used temporarily to extract metadata like year and quarter.
- Cleans up the final DataFrame for further analysis or saving.



In [3]:
from pyspark.sql.functions import input_file_name, regexp_extract, expr

fixed = sedona.read.format("parquet")\
    .load('s3://ookla-open-data/parquet/performance/type=fixed/*/*/*.parquet') \

fixed = fixed.withColumn("file_path", input_file_name()) \
    .withColumn("year", regexp_extract("file_path", r"year=(\d+)", 1)) \
    .withColumn("quarter", regexp_extract("file_path", r"quarter=(\d+)", 1)) \
    .withColumn("geometry", expr("ST_GeomFromText(tile)")) \
    .withColumn("bbox", expr("struct(st_xmin(ST_GeomFromText(tile)) as xmin, st_ymin(ST_GeomFromText(tile)) as ymin, st_xmax(ST_GeomFromText(tile)) as xmax, st_ymax(ST_GeomFromText(tile)) as ymax) as bbox")) \
    .withColumn("geohash", expr("ST_GeoHash(ST_GeomFromText(tile), 10)")) \
    .selectExpr("*", ''' "fixed" as type''') \
    .orderBy(expr("ST_GeoHash(ST_GeomFromText(tile), 6)")) \
    .drop("file_path")


In [4]:
from pyspark.sql.functions import input_file_name, regexp_extract, expr

mobile = sedona.read.format("parquet")\
    .load('s3://ookla-open-data/parquet/performance/type=mobile/*/*/*.parquet') \

mobile = mobile.withColumn("file_path", input_file_name()) \
    .withColumn("year", regexp_extract("file_path", r"year=(\d+)", 1)) \
    .withColumn("quarter", regexp_extract("file_path", r"quarter=(\d+)", 1)) \
    .withColumn("geometry", expr("ST_GeomFromText(tile)")) \
    .withColumn("bbox", expr("struct(st_xmin(ST_GeomFromText(tile)) as xmin, st_ymin(ST_GeomFromText(tile)) as ymin, st_xmax(ST_GeomFromText(tile)) as xmax, st_ymax(ST_GeomFromText(tile)) as ymax) as bbox")) \
    .withColumn("geohash", expr("ST_GeoHash(ST_GeomFromText(tile), 10)")) \
    .selectExpr("*", ''' "mobile" as type''') \
    .orderBy(expr("ST_GeoHash(ST_GeomFromText(tile), 6)")) \
    .drop("file_path")


# Store the `projjson`

This section stores the projection information of the data as a PROJJSON string that can be passed to the GeoParquet metadata.

You can find this projection and others at [EPSG.io](https://epsg.io/4326).

In [5]:
projjson = '''{
    "$schema": "https://proj.org/schemas/v0.7/projjson.schema.json",
    "type": "GeographicCRS",
    "name": "WGS 84",
    "datum_ensemble": {
        "name": "World Geodetic System 1984 ensemble",
        "members": [
            {
                "name": "World Geodetic System 1984 (Transit)",
                "id": {
                    "authority": "EPSG",
                    "code": 1166
                }
            },
            {
                "name": "World Geodetic System 1984 (G730)",
                "id": {
                    "authority": "EPSG",
                    "code": 1152
                }
            },
            {
                "name": "World Geodetic System 1984 (G873)",
                "id": {
                    "authority": "EPSG",
                    "code": 1153
                }
            },
            {
                "name": "World Geodetic System 1984 (G1150)",
                "id": {
                    "authority": "EPSG",
                    "code": 1154
                }
            },
            {
                "name": "World Geodetic System 1984 (G1674)",
                "id": {
                    "authority": "EPSG",
                    "code": 1155
                }
            },
            {
                "name": "World Geodetic System 1984 (G1762)",
                "id": {
                    "authority": "EPSG",
                    "code": 1156
                }
            },
            {
                "name": "World Geodetic System 1984 (G2139)",
                "id": {
                    "authority": "EPSG",
                    "code": 1309
                }
            }
        ],
        "ellipsoid": {
            "name": "WGS 84",
            "semi_major_axis": 6378137,
            "inverse_flattening": 298.257223563
        },
        "accuracy": "2.0",
        "id": {
            "authority": "EPSG",
            "code": 6326
        }
    },
    "coordinate_system": {
        "subtype": "ellipsoidal",
        "axis": [
            {
                "name": "Geodetic latitude",
                "abbreviation": "Lat",
                "direction": "north",
                "unit": "degree"
            },
            {
                "name": "Geodetic longitude",
                "abbreviation": "Lon",
                "direction": "east",
                "unit": "degree"
            }
        ]
    },
    "scope": "Horizontal component of 3D system.",
    "area": "World.",
    "bbox": {
        "south_latitude": -90,
        "west_longitude": -180,
        "north_latitude": 90,
        "east_longitude": 180
    },
    "id": {
        "authority": "EPSG",
        "code": 4326
    }
}'''
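
Because the CRS is passed to the writer as a plain string, it can be worth confirming that the string is valid JSON and carries the expected identifier before writing. The following is a minimal sketch: `crs_identifier` is a hypothetical helper, and `projjson_example` is a trimmed stand-in for the full `projjson` string above so that the snippet runs on its own.

```python
import json

# Trimmed stand-in for the full PROJJSON string (illustration only).
projjson_example = '''{
    "type": "GeographicCRS",
    "name": "WGS 84",
    "id": {"authority": "EPSG", "code": 4326}
}'''


def crs_identifier(projjson_text: str) -> str:
    """Parse a PROJJSON string and return its identifier as 'AUTHORITY:CODE'."""
    crs = json.loads(projjson_text)  # raises json.JSONDecodeError if malformed
    crs_id = crs["id"]
    return f'{crs_id["authority"]}:{crs_id["code"]}'


print(crs_identifier(projjson_example))
```

In the notebook, the same check could be run as `crs_identifier(projjson)`.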

# 📊 GeoParquet Optimizations

```python
sedona.conf.set("spark.sql.parquet.page.size", "128MB")
```
**What this does:**
- Adjusts the Parquet page size to 128 MB.
- Larger page sizes reduce metadata overhead and improve read performance for large datasets.

---

```python
mobile = mobile.repartition(1)
fixed = fixed.repartition(1)
```
**What this does:**
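- Repartitions both mobile and fixed datasets into a single partition.
- On its own, this would write each dataset to a single output file, which is useful for testing but does not scale to large data. In this notebook the next step repartitions by `geohash` range again, so the final number of output files is set there.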

---

```python
mobile = mobile.repartitionByRange(10, "geohash") \
    .sortWithinPartitions("geohash") \
    .drop("geohash")

fixed = fixed.repartitionByRange(10, "geohash") \
    .sortWithinPartitions("geohash") \
    .drop("geohash")
```
**What this does:**
- Repartitions the data into 10 ranges based on `geohash`. *Note: adjust this number to reach your target partitioned file size.*
- Sorts data within each partition by `geohash` for spatial locality.
- Drops the `geohash` column after it's used for partitioning and sorting.
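
The effect of range partitioning followed by a sort within each partition can be sketched in plain Python. This is a conceptual illustration only: `range_partition` is a hypothetical helper that sorts everything and cuts exact equal-sized ranges, whereas Spark's `repartitionByRange` estimates the range boundaries by sampling. The geohash values are made up.

```python
def range_partition(values, num_partitions):
    """Split values into contiguous, sorted ranges of roughly equal size."""
    ordered = sorted(values)
    # Ceiling division; at least 1 so an empty input does not yield a zero step.
    size = max(1, -(-len(ordered) // num_partitions))
    return [ordered[i:i + size] for i in range(0, len(ordered), size)]


# Made-up geohash prefixes standing in for the notebook's `geohash` column.
geohashes = ["u4pruy", "9q8yyk", "dr5reg", "gcpvj0", "9q5ctr", "u09tvw"]
partitions = range_partition(geohashes, 3)
for index, part in enumerate(partitions):
    print(index, part)
```

Each partition holds a contiguous slice of the geohash ordering, so every output file covers a compact set of locations and its bounding-box statistics stay narrow.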

---


```python
import os
user_uri = os.getenv("USER_S3_PATH")
```
**What this does:**
- Loads a custom S3 path from an environment variable named `USER_S3_PATH`.
- This is where the final GeoParquet files will be saved.
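
Note that `os.getenv` returns `None` when the variable is unset, and that the write step concatenates the dataset name directly onto this value, so a missing trailing slash changes the output path. A defensive variant might look like the sketch below; `build_output_uri` is a hypothetical helper and the bucket name is a placeholder.

```python
def build_output_uri(base, dataset_name):
    """Join a base URI and a dataset name with exactly one slash between them."""
    if not base:
        raise ValueError("base URI is empty; is USER_S3_PATH set?")
    return base.rstrip("/") + "/" + dataset_name


# Placeholder bucket; with or without a trailing slash the result is the same.
with_slash = build_output_uri("s3://example-bucket/data/", "ookla_mobile")
without_slash = build_output_uri("s3://example-bucket/data", "ookla_mobile")
print(with_slash, without_slash)
```

In the notebook this could be called as `build_output_uri(os.getenv("USER_S3_PATH"), "ookla_mobile")`.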

---


```python
mobile.write \
    .format("geoparquet") \
    .option("geoparquet.version", "1.1.0") \
    .option("geoparquet.crs", projjson) \
    .option("geoparquet.covering", "bbox") \
    .save(user_uri + "ookla_mobile", mode='overwrite', compression='zstd')
```
**What this does:**
- Writes the `mobile` DataFrame to S3 in the GeoParquet format.
- Sets the GeoParquet version to 1.1.0, supplies the CRS as the `projjson` string, and declares the `bbox` struct column as the covering (bounding box) for the geometry column.
- Uses Zstandard (zstd) compression for efficient storage.

---

```python
mobile.count()
fixed.count()
```
**What this does:**
- Runs a count action on each DataFrame. Because Spark evaluates lazily, this re-executes the transformation pipeline rather than reading back the files that were written.
- Useful as a sanity check on the number of records in each dataset.



In [6]:
sedona.conf.set("spark.sql.parquet.page.size", "128MB")  # Set page size to 128 MB

In [7]:
mobile = mobile.repartition(1)
fixed = fixed.repartition(1)

In [8]:
mobile = mobile.repartitionByRange(10, "geohash") \
    .sortWithinPartitions("geohash") \
    .drop("geohash") 

fixed = fixed.repartitionByRange(10, "geohash") \
    .sortWithinPartitions("geohash") \
    .drop("geohash") 

In [9]:
import os
user_uri = os.getenv("USER_S3_PATH")
user_uri

's3://wbts-wbc-ymm1bun8sj/jf3gkm4ile/data/customer-besg0oop07pktb/'

In [None]:
%%time

mobile.write \
    .format("geoparquet") \
    .option("geoparquet.version", "1.1.0") \
    .option("geoparquet.crs", projjson) \
    .option("geoparquet.covering", "bbox") \
    .save(user_uri + "ookla_mobile", mode='overwrite', compression='zstd')

In [None]:
%%time

fixed.write \
    .format("geoparquet") \
    .option("geoparquet.version", "1.1.0") \
    .option("geoparquet.crs", projjson) \
    .option("geoparquet.covering", "bbox") \
    .save(user_uri + "ookla_fixed", mode='overwrite', compression='zstd')

In [None]:
mobile.count()

In [None]:
fixed.count()