## BigQuery ELT of EarthGenome Embeddings for Vector Search

*Note: This notebook will create and consume resources on Google Cloud. Though it should be minimal, be mindful of cost and always delete resources after running demos.*

In order to run this demo you will need Google Cloud IAM permissions to:
* read, write, and create Cloud Storage objects
* read, write, and create BigQuery resources

Refer to [docs](https://cloud.google.com/iam/docs/roles-overview) for more info if you get a permissions-related error

## Downloading Earthgenome Geoparquet's

### Earth Genome has hosted it on Source.Coop - let's check how its organized -> [link](https://source.coop/repositories/earthgenome/earthindexembeddings/description)

### In [00_s2_tile_management.ipynb](./00_s2_tile_management.ipynb) we've already aggregated UTM tile IDs to country boundaries 

#### we'll use that JSON file to help us pull only the EG parquet files we need for a country..

In [None]:
# Read in our country-tile JSON reference
import os
import json
import geopandas as gpd
tile_dict = json.load(open("../esa_grid/adm0_tiles_by_country.json"))
print(tile_dict.keys())

dict_keys(['Kenya'])


In [3]:
country = "Kenya"
tiles = tile_dict[country]
tiles.sort()
print(f"{len(tiles)} S2 tiles covering {country}")
for t in tiles:
    print(t)

89 S2 tiles covering Kenya
36MWD
36MWE
36MXD
36MXE
36MYC
36MYD
36MYE
36MZC
36MZD
36MZE
36NWF
36NXF
36NXG
36NXH
36NXJ
36NXK
36NXL
36NYF
36NYG
36NYH
36NYJ
36NYK
36NYL
36NZF
36NZG
36NZH
36NZJ
36NZK
36NZL
37MBS
37MBT
37MBU
37MBV
37MCR
37MCS
37MCT
37MCU
37MCV
37MDQ
37MDR
37MDS
37MDT
37MDU
37MDV
37MEQ
37MER
37MES
37MET
37MEU
37MEV
37MFS
37MFT
37MFU
37MFV
37MGT
37MGU
37MGV
37NBA
37NBB
37NBC
37NBD
37NBE
37NBF
37NCA
37NCB
37NCC
37NCD
37NCE
37NDA
37NDB
37NDC
37NDD
37NDE
37NEA
37NEB
37NEC
37NED
37NEE
37NFA
37NFB
37NFC
37NFD
37NFE
37NGA
37NGB
37NGC
37NGD
37NGE
37NHE


In [None]:
import subprocess

dryrun=True

for i,t in enumerate(tiles):
    suffix = "2024-01-01_2025-01-01.parquet"
    pattern = f"s3://earthgenome/earthindexembeddings/2024/{t}_{suffix}"
    cmd = f"aws s3 cp {pattern} ../embeddings/earthgenome/2024/{t}_{suffix} --endpoint-url=https://data.source.coop"
    if dryrun:

        print(cmd)
    else:
        print(f"Running {cmd}")
        try:
            subprocess.run(cmd, shell=True, capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error copying {t}: {e}")
            # If the file does not exist, we can skip it
            if "does not exist" in e.stderr.decode():
                print(f"File {t} does not exist, skipping.")
                continue
            else:
                raise
    # break


aws s3 cp s3://earthgenome/earthindexembeddings/2024/36MWD_2024-01-01_2025-01-01.parquet ../embeddings/earthgenome/2024/36MWD_2024-01-01_2025-01-01.parquet --endpoint-url=https://data.source.coop


Look at a geoparquet file

In [47]:
# look at one
files = os.listdir("../embeddings/earthgenome/2024")
print(f"{len(list(files))} files:\n {files}")
file = os.path.join("../embeddings/earthgenome/2024", files[0])
print(file)
df = gpd.read_parquet(file)
print(df.head())


88 files:
 ['36MYC_2024-01-01_2025-01-01.parquet', '37NBF_2024-01-01_2025-01-01.parquet', '36NZL_2024-01-01_2025-01-01.parquet', '36MWD_2024-01-01_2025-01-01.parquet', '36NXG_2024-01-01_2025-01-01.parquet', '37NBE_2024-01-01_2025-01-01.parquet', '36MZC_2024-01-01_2025-01-01.parquet', '37NFA_2024-01-01_2025-01-01.parquet', '36NXK_2024-01-01_2025-01-01.parquet', '37NEC_2024-01-01_2025-01-01.parquet', '36MWE_2024-01-01_2025-01-01.parquet', '37MDS_2024-01-01_2025-01-01.parquet', '37NCB_2024-01-01_2025-01-01.parquet', '37NDE_2024-01-01_2025-01-01.parquet', '37NFC_2024-01-01_2025-01-01.parquet', '37NCA_2024-01-01_2025-01-01.parquet', '37NDB_2024-01-01_2025-01-01.parquet', '37MCU_2024-01-01_2025-01-01.parquet', '37MCV_2024-01-01_2025-01-01.parquet', '37NDC_2024-01-01_2025-01-01.parquet', '37NGE_2024-01-01_2025-01-01.parquet', '37NBB_2024-01-01_2025-01-01.parquet', '37MDU_2024-01-01_2025-01-01.parquet', '37MFT_2024-01-01_2025-01-01.parquet', '37NGC_2024-01-01_2025-01-01.parquet', '37NEE_2024-0

we'll add a tile column to help us stay organized

In [34]:
# overwrite all files to add tile column
for file in files:
    file_path = os.path.join("../embeddings/earthgenome/2024", file)
    df = gpd.read_parquet(file_path)
    df.loc[:,'tile'] = os.path.basename(file).split("_")[0]
    df.to_parquet(file_path, index=False)
    print(f"Updated {file} with tile column.")


Updated 36MYC_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NBF_2024-01-01_2025-01-01.parquet with tile column.
Updated 36NZL_2024-01-01_2025-01-01.parquet with tile column.
Updated 36MWD_2024-01-01_2025-01-01.parquet with tile column.
Updated 36NXG_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NBE_2024-01-01_2025-01-01.parquet with tile column.
Updated 36MZC_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NFA_2024-01-01_2025-01-01.parquet with tile column.
Updated 36NXK_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NEC_2024-01-01_2025-01-01.parquet with tile column.
Updated 36MWE_2024-01-01_2025-01-01.parquet with tile column.
Updated 37MDS_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NCB_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NDE_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NFC_2024-01-01_2025-01-01.parquet with tile column.
Updated 37NCA_2024-01-01_2025-01-01.parquet with tile column.
Updated 

In [48]:
print(gpd.read_parquet(file_path).head())

                  id                                          embedding  \
0  21363011648817723  [3.6608987, 0.4451008, 3.3323448, 1.6121677, 1...   
1  21363011672943977  [3.8903985, 0.748016, 3.8796003, 1.3907712, 1....   
2  21363011653017210  [3.4100828, 0.6301242, 3.1301181, 1.398467, 1....   
3  21363011677153661  [3.1949708, 1.050448, 3.9431, 1.1274718, 1.851...   
4  21363011665615662  [3.1119442, 0.5853041, 3.586332, 1.4484098, 1....   

                    geometry   tile  
0  POINT (38.10094 -4.60938)  37MDR  
1  POINT (38.10238 -4.60794)  37MDR  
2  POINT (38.10382 -4.60938)  37MDR  
3  POINT (38.10526 -4.60794)  37MDR  
4  POINT (38.10671 -4.60938)  37MDR  


### Loading Data into BigQuery

You'll need a GCS bucket and a BigQuery Dataset 

In [36]:
# change to your GCS settings
BUCKET = "gs://embeddings-kenya"
PROJECT_ID = "g4g-eaas"
LOCATION = "us-central1"
DATASET_ID = "embeddings_kenya"
TABLE_ID = "earthgenome_kenya"

In [None]:
# create the storage bucket and BigQuery dataset 
!gcloud storage buckets create {BUCKET} --location {LOCATION} --project {PROJECT_ID}
!bq mk -d --data_location={LOCATION} --project_id {PROJECT_ID} {DATASET_ID}  

Creating gs://embeddings-kenya/...
[1;31mERROR:[0m (gcloud.storage.buckets.create) HTTPError 409: Your previous request to create the named bucket succeeded and you already own it.
BigQuery error in mk operation: Dataset 'g4g-eaas:embeddings_kenya' already
exists.


In [38]:
# upload parquet files to gcs
# try gcloud storage sync..
gcloud_folder = f"{BUCKET}/earthgenome/2024"
!gcloud storage rsync ../embeddings/earthgenome/2024 $gcloud_folder \
    --project=$PROJECT_ID

At file://../embeddings/earthgenome/2024/*, worker process 1285094 thread 130040955144000 listed 88...
At gs://embeddings-kenya/earthgenome/2024/*, worker process 1285094 thread 130040955144000 listed 1...
uploading large objects. If you would like to opt-out and instead
perform a normal upload, run:
`gcloud config set storage/parallel_composite_upload_enabled False`
`gcloud config set storage/parallel_composite_upload_enabled True`
Note that with parallel composite uploads, your object might be
uploaded as a composite object
(https://cloud.google.com/storage/docs/composite-objects), which means
that any user who downloads your object will need to use crc32c
checksums to verify data integrity. gcloud storage is capable of
computing crc32c checksums, but this might pose a problem for other
clients.

Copying file://../embeddings/earthgenome/2024/36MWD_2024-01-01_2025-01-01.parquet to gs://embeddings-kenya/earthgenome/2024/36MWD_2024-01-01_2025-01-01.parquet
Copying file://../embeddings/e

In [39]:
FULL_TABLE = f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}"
FOLDER = "earthgenome/2024"
print(FULL_TABLE)
for file in files:
    URI = f"{BUCKET}/{FOLDER}/{file}" 
    
    print(URI)
    !bq --location=$LOCATION --project_id=$PROJECT_ID \
            load \
                --source_format=PARQUET \
                $FULL_TABLE \
                $URI

g4g-eaas:embeddings_kenya.earthgenome_kenya
gs://embeddings-kenya/earthgenome/2024/36MYC_2024-01-01_2025-01-01.parquet
Waiting on bqjob_r5bddcafebe36db92_00000197e6f627f4_1 ... (34s) Current status: DONE   
gs://embeddings-kenya/earthgenome/2024/37NBF_2024-01-01_2025-01-01.parquet
Waiting on bqjob_rdce0b3eb965f41d_00000197e6f6bfca_1 ... (34s) Current status: DONE   
gs://embeddings-kenya/earthgenome/2024/36NZL_2024-01-01_2025-01-01.parquet
Waiting on bqjob_r76dae7561340ac55_00000197e6f755f7_1 ... (34s) Current status: DONE   
gs://embeddings-kenya/earthgenome/2024/36MWD_2024-01-01_2025-01-01.parquet
Waiting on bqjob_r769afeb0e07f1105_00000197e6f7eb89_1 ... (34s) Current status: DONE   
gs://embeddings-kenya/earthgenome/2024/36NXG_2024-01-01_2025-01-01.parquet
Waiting on bqjob_r7c129e00e11d1473_00000197e6f881af_1 ... (34s) Current status: DONE   
gs://embeddings-kenya/earthgenome/2024/37NBE_2024-01-01_2025-01-01.parquet
Waiting on bqjob_r53db244277ea59c7_00000197e6f9177d_1 ... (34s) Cur

### Minor transforms of the BQ table 

we will do a small post-processing query on the loaded embeddings table to get the embedding field converted correctly for vector search..

vector search indexing requires the embedding field to be of type `ARRAY<FLOAT>`

the load operation turns 'embedding' field into a double-nested STRUCT data type, innermost child containing list of floats.. 

so we have to unpack that list from the nested structure, final data type being `ARRAY<FLOAT64>`

In [49]:
from google.cloud import bigquery
query = f"""
SELECT
  eg.id,
  eg.tile,
  ST_GEOGFROMTEXT(grouped.geometry_text) AS geometry,
  ARRAY_AGG(e.element) AS embedding
FROM
  `{PROJECT_ID}`.`{DATASET_ID}`.`{TABLE_ID}` AS eg
CROSS JOIN
  UNNEST(eg.embedding.list) AS e
JOIN (
  SELECT id, tile, ST_ASTEXT(geometry) AS geometry_text
  FROM `{PROJECT_ID}`.`{DATASET_ID}`.`{TABLE_ID}`
  GROUP BY id, tile, geometry_text
) AS grouped ON eg.id = grouped.id AND eg.tile = grouped.tile AND ST_ASTEXT(eg.geometry) = grouped.geometry_text
GROUP BY eg.id, eg.tile, grouped.geometry_text
"""

# Run the query and save the result to a new table
result_table = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}_v1"
job_config = bigquery.QueryJobConfig(destination=result_table)
client = bigquery.Client()
job = client.query(query, job_config=job_config)
job.result()  # Wait for the job to complete

<google.cloud.bigquery.table.RowIterator at 0x73e1e7c0bd90>

In [50]:
# Check if the result_table exists

def table_exists(client, table_id):
    try:
        client.get_table(table_id)
        print(f"Table {table_id} exists.")
        return True
    except Exception as e:
        print(f"Table {table_id} does not exist. Error: {e}")
        return False

table_exists(client, result_table)

Table g4g-eaas.embeddings_kenya.earthgenome_kenya_v1 exists.


True

In [51]:
# check the resulting table's schema and data 
query = f"SELECT * FROM `{result_table}` LIMIT 10"
query_job = client.query(query)
# print schema 
schema = query_job.result().schema
for field in schema:
    print(f"{field.name}: {field.field_type}")
for row in query_job:
    print(row)

id: INTEGER
tile: STRING
geometry: GEOGRAPHY
embedding: FLOAT
Row((21366417132058520, '36MWD', 'POINT(33.9463656625207 -1.68593695103644)', [2.6084940433502197, 0.7507420182228088, 4.672660827636719, 0.8154568672180176, 1.4184563159942627, -0.22790896892547607, 1.5954563617706299, 1.036309003829956, 2.0508384704589844, 0.14478261768817902, 0.7479745745658875, -2.875718355178833, 1.7202969789505005, 0.21487678587436676, 0.2987199127674103, 0.34173423051834106, 0.42625629901885986, 1.3673490285873413, 0.7782024145126343, 2.138732671737671, 2.358751058578491, 0.16330485045909882, -0.6646090745925903, 0.06753062456846237, -0.7604683041572571, -0.6707466244697571, 2.156445026397705, 0.8436424732208252, 1.4777283668518066, 4.545442581176758, 0.15262389183044434, -1.3420811891555786, -1.5844206809997559, 1.8792147636413574, -2.439932346343994, -2.174422025680542, 1.0232149362564087, 0.7525823712348938, -1.1864889860153198, 0.6765024065971375, 0.046289533376693726, -0.8331053853034973, -0.3657

### Index BQ table to enable Vector Search

In [52]:
# test VECTOR SEARCH operations
in_table = '.'.join(result_table.split(".")[1:])
print(f'indexing {in_table} for vector search')
query = f"""
CREATE VECTOR INDEX my_index ON {in_table}(embedding)
OPTIONS(distance_type='COSINE', index_type='IVF', ivf_options='{{"num_lists": 1000}}');
"""

# Run the query to create the index
client = bigquery.Client(project=PROJECT_ID)
job = client.query(query)
job.result()  # Wait for the job to complete

indexing embeddings_kenya.earthgenome_kenya_v1 for vector search


<google.cloud.bigquery.table._EmptyRowIterator at 0x73e26ec38190>

Create a test target table of 1 record to perform vector search with

In [53]:
result_table = result_table+"_test_target"
query = f"SELECT * FROM {in_table} LIMIT 1"

job_config = bigquery.QueryJobConfig(destination=result_table)
job = client.query(query,job_config=job_config)
job.result()  # Wait for the job to complete

<google.cloud.bigquery.table.RowIterator at 0x73e26ec382d0>

Run a Vector Search!

In [54]:
import datetime
target_table = '.'.join(result_table.split(".")[1:])
print(target_table)
query = f"""
SELECT query.id AS target_id,
  query.tile AS target_tile,
  base.id AS base_id,
  base.tile AS base_tile,
  distance
FROM
  VECTOR_SEARCH(
    TABLE {in_table},
    'embedding',
    TABLE {target_table},
    top_k => 11,
    distance_type => 'COSINE',
    options => '{{"fraction_lists_to_search": 0.005}}')
ORDER BY distance
LIMIT 10
OFFSET 1;
"""

# Run the query to create the index
client = bigquery.Client(project=PROJECT_ID)
search_result_table = f"{PROJECT_ID}.{DATASET_ID}.vector_search_results_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
job_config = bigquery.QueryJobConfig(destination=search_result_table)
job = client.query(query,job_config=job_config)
job.result()  # Wait for the job to complete

embeddings_kenya.earthgenome_kenya_v1_test_target


<google.cloud.bigquery.table.RowIterator at 0x73e26ec38690>

In [55]:
query = f"SELECT * FROM `{search_result_table}` LIMIT 10"
query_job = client.query(query)
# print schema 
schema = query_job.result().schema
for field in schema:
    print(f"{field.name}: {field.field_type}")
for row in query_job:
    print(row)

target_id: INTEGER
target_tile: STRING
base_id: INTEGER
base_tile: STRING
distance: FLOAT
Row((21366417132058520, '36MWD', 21366407109274221, '36MWD', 0.007518227532904431), {'target_id': 0, 'target_tile': 1, 'base_id': 2, 'base_tile': 3, 'distance': 4})
Row((21366417132058520, '36MWD', 21366523674059601, '36MXD', 0.008048853362623731), {'target_id': 0, 'target_tile': 1, 'base_id': 2, 'base_tile': 3, 'distance': 4})
Row((21366417132058520, '36MWD', 21366417109700061, '36MWD', 0.008368872203980748), {'target_id': 0, 'target_tile': 1, 'base_id': 2, 'base_tile': 3, 'distance': 4})
Row((21366417132058520, '36MWD', 21366425136559033, '36MWD', 0.008518929140563403), {'target_id': 0, 'target_tile': 1, 'base_id': 2, 'base_tile': 3, 'distance': 4})
Row((21366417132058520, '36MWD', 21368022179282379, '36MXD', 0.009059108625534273), {'target_id': 0, 'target_tile': 1, 'base_id': 2, 'base_tile': 3, 'distance': 4})
Row((21366417132058520, '36MWD', 21366436235634204, '36MWD', 0.009121250886910293), {

### You can take a look at your newly created BQ tables and the vector search results in [BQ studio](https://console.cloud.google.com/bigquery)