In [None]:
# make sure to install the packages in the requirements.txt file!

In [2]:
from dotenv import load_dotenv
import os

# To keep your API key private, create a .env file in your project directory
# and assign the key to a variable name there, for example:
# GSV_API_KEY=[my_api_key]
load_dotenv()

# os.getenv is a thin wrapper over os.environ.get; the value is None if unset.
api_key = os.environ.get("GSV_API_KEY")

In [None]:
import os
import requests
import json
import cv2

headings = [0, 90, 180, 270]
generated_locs_fp = 'chile1000locs.json'
streetview_url = "https://maps.googleapis.com/maps/api/streetview"
request_timeout_s = 30  # seconds; without a timeout a stalled connection hangs the whole run

# Fail fast: without a key every request would be sent with key=None and the
# error responses would be saved to disk as if they were images.
if not api_key:
    raise RuntimeError("GSV_API_KEY is not set; add it to your .env file before downloading imagery.")

with open(generated_locs_fp, 'r') as f:
    locs = json.load(f)

# Iterate through the coordinate pairs of the generated locations and request
# one image per cardinal heading from the GSV API.
for i, loc in enumerate(locs):
    lat, lon = loc["lat"], loc["lng"]
    location_folder = os.path.join('panoramas', f"location_{i}")
    os.makedirs(location_folder, exist_ok=True)

    images = []
    for heading in headings:
        # Let requests build and escape the query string instead of formatting it by hand.
        params = {
            "size": "256x256",
            "location": f"{lat},{lon}",
            "heading": heading,
            "key": api_key,
            "fov": 105,
        }
        response = requests.get(streetview_url, params=params, timeout=request_timeout_s)
        # Stop on HTTP errors rather than writing an error body into a .jpg file.
        response.raise_for_status()

        img_path = os.path.join(location_folder, f"heading_{heading}.jpg")
        with open(img_path, "wb") as img_file:
            img_file.write(response.content)

        # cv2.imread signals failure by returning None instead of raising.
        image = cv2.imread(img_path)
        if image is None:
            raise ValueError(f"Downloaded file is not a readable image: {img_path}")
        images.append(image)

    # Stitch the panorama by concatenating the 4 images horizontally.
    stitched_pano = cv2.hconcat(images)
    pano_path = os.path.join(location_folder, "stitched_panorama.jpg")
    # cv2.imwrite returns False on failure instead of raising.
    if not cv2.imwrite(pano_path, stitched_pano):
        raise OSError(f"Could not write stitched panorama: {pano_path}")




After generating the stitched panoramas, we run them through the StreetCLIP model to generate embeddings. A metadata file is also written that maps each location to its panorama file path and its embedding file path.

In [10]:
from PIL import Image
from transformers import CLIPProcessor, CLIPModel

import torch

model = CLIPModel.from_pretrained("geolocal/StreetCLIP")
processor = CLIPProcessor.from_pretrained("geolocal/StreetCLIP")

pano_folder = 'panoramas'
output_embeddings_folder = "embeddings"
os.makedirs(output_embeddings_folder, exist_ok=True)

embedding_metadata = []

# os.listdir returns entries in arbitrary order; sorting keeps the metadata
# file stable from one run to the next.
for loc_folder in sorted(os.listdir(pano_folder)):
    loc_path = os.path.join(pano_folder, loc_folder)
    if not os.path.isdir(loc_path):
        continue

    panorama_path = os.path.join(loc_path, "stitched_panorama.jpg")
    # Open inside a context manager so the file handle is released on every
    # iteration; convert() forces the pixel data to load before the file closes.
    with Image.open(panorama_path) as pano_file:
        pano_image = pano_file.convert("RGB")

    inputs = processor(images=pano_image, return_tensors="pt")
    with torch.no_grad():
        embeddings = model.get_image_features(**inputs)

    # L2 normalization -- ensures a uniform scale while retaining the original dimensionality.
    embeddings = embeddings / torch.linalg.vector_norm(embeddings, ord=2, dim=-1, keepdim=True)

    embedding_path = os.path.join(output_embeddings_folder, f"{loc_folder}_embedding.pt")
    torch.save(embeddings, embedding_path)

    embedding_metadata.append({
        "location_id": loc_folder,
        "embedding_path": embedding_path,
        "panorama_path": panorama_path
    })

metadata_path = os.path.join(output_embeddings_folder, "embedding_metadata_t2.json")
with open(metadata_path, "w") as f:
    json.dump(embedding_metadata, f, indent=4)


In [None]:
from sklearn.cluster import KMeans
import numpy as np
from sklearn.metrics import silhouette_score

embedding_metadata_path = "embeddings/embedding_metadata_t2.json"
with open(embedding_metadata_path, "r") as f:
    embedding_metadata = json.load(f)

# Look up each embedding file through the metadata written by the embedding
# step instead of assuming a fixed count and rebuilding the paths by hand.
embedding_paths = {
    entry["location_id"]: entry["embedding_path"] for entry in embedding_metadata
}

total_locations = len(embedding_paths)

embeddings = []
locations = []

# Load in numeric index order (location_0, location_1, ...) so that row i of
# the embedding matrix corresponds to entry i of the locations JSON.
for i in range(total_locations):
    location_id = f"location_{i}"
    if location_id not in embedding_paths:
        raise KeyError(
            f"{location_id} is missing from {embedding_metadata_path}; "
            "location ids must be contiguous starting at location_0."
        )
    embedding = torch.load(embedding_paths[location_id]).numpy()
    embeddings.append(embedding.flatten())  # flatten to ensure 1D
    locations.append(location_id)

# np.stack raises if the vectors do not all share the same length.
embeddings = np.stack(embeddings)

def cluster_embeddings(embeddings, n_clusters, random_state=42):
    """Cluster embeddings with KMeans and print the resulting silhouette score.

    Args:
        embeddings: Data passed unchanged to ``KMeans.fit_predict`` and
            ``silhouette_score`` (presumably one row per location -- confirm
            against the caller).
        n_clusters: Number of clusters to fit.
        random_state: Seed forwarded to ``KMeans``. Defaults to 42, the value
            that was previously hard-coded, so existing calls are unaffected.

    Returns:
        A ``(labels, cluster_centers)`` tuple: the label assigned to each
        input row and the fitted ``cluster_centers_`` of the KMeans model.
    """
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
    labels = kmeans.fit_predict(embeddings)

    # Printed rather than returned so the (labels, centers) return shape is unchanged.
    silhouette_avg = silhouette_score(embeddings, labels)
    print(f"Silhouette Score: {silhouette_avg}")

    return labels, kmeans.cluster_centers_

from sklearn.decomposition import PCA

# dimensionality reduction
# PCA cannot extract more components than min(n_samples, n_features), so cap
# the requested 50 components for small datasets.
# random_state makes the projection reproducible: with svd_solver="auto",
# scikit-learn may select the randomized solver, which is seeded by it.
pca = PCA(n_components=min(50, *embeddings.shape), random_state=42)
reduced_embeddings = pca.fit_transform(embeddings)

# Optional: code for comparing silhouette scores across different numbers of
# clusters. Kept as comments (not a bare string literal) so the cell does not
# echo it as output. Uncomment to run.
#
# import matplotlib.pyplot as plt
# from matplotlib.ticker import MaxNLocator
#
# silhouette_scores = []
# for k in range(2, 21):
#     kmeans = KMeans(n_clusters=k, random_state=42)
#     labels = kmeans.fit_predict(embeddings)
#     score = silhouette_score(embeddings, labels)
#     silhouette_scores.append(score)
#
# fig, ax = plt.subplots()
# ax.plot(range(2, 21), silhouette_scores)
# ax.set_xlabel('Number of clusters (k)')
# ax.xaxis.set_major_locator(MaxNLocator(integer=True))
# ax.set_ylabel('Silhouette Score')
# ax.set_title('Silhouette Score for different values of k')
# plt.show()

In [39]:
n_clusters = 5

# Cluster twice: once on the full embeddings, once on the PCA-reduced ones.
cluster_labels, cluster_centers = cluster_embeddings(embeddings, n_clusters)
rd_cluster_labels, rd_cluster_centers = cluster_embeddings(reduced_embeddings, n_clusters)

# Group location names by the cluster id assigned on the full embeddings.
# NOTE(review): this file is built from cluster_labels (unreduced), while the
# tagging cell uses rd_cluster_labels -- confirm the two are meant to differ.
cluster_results = {}
for label, location in zip(cluster_labels, locations):
    # int() turns the label into a plain Python int key for JSON serialization.
    cluster_results.setdefault(int(label), []).append(location)

output_clusters_path = "cluster_results.json"
with open(output_clusters_path, "w") as f:
    json.dump(cluster_results, f, indent=4)

Silhouette Score: 0.07047548145055771
Silhouette Score: 0.11749286204576492


The first try, using n_clusters = 18, resulted in a very low silhouette score of 0.051498. Using fewer clusters results in a higher silhouette score, as does dimensionality reduction. We could also try another clustering method, such as DBSCAN.

Next, we need to create an output file: a duplicate of the original JSON, but with "extra":{"tags":["str(clusterid)"]} added as a key:value pair on each location object, so that we can import it into map-making.app.

In [40]:
original_json = 'chile1000locs.json'
with open(original_json, "r") as f:
    locs = json.load(f)

# Labels are matched to locations purely by position, so a length mismatch
# would tag locations with the wrong cluster or fail partway through.
if len(locs) != len(rd_cluster_labels):
    raise ValueError(
        f"{original_json} has {len(locs)} locations but there are "
        f"{len(rd_cluster_labels)} cluster labels; they must line up one-to-one."
    )

for location, label in zip(locs, rd_cluster_labels):
    # Create "extra" and "tags" only when absent so existing tags are preserved.
    tags = location.setdefault("extra", {}).setdefault("tags", [])
    # Tags are stored as strings; int() first converts the label to a plain Python int.
    tags.append(str(int(label)))

new_json = f'locations_tagged_{n_clusters}_clusters.json'
with open(new_json, "w") as f:
    json.dump(locs, f)