In [1]:
# import matplotlib.pyplot as plt
# from skimage import io
# import numpy as np
# import street_view # panoids function to list nearest panoramas
from streetview import get_panorama_meta, search_panoramas, get_streetview, get_panorama
import os, time, csv
from PIL import Image, PngImagePlugin
import threading
import requests
from itertools import islice
from requests.exceptions import Timeout, ConnectionError

# API key
key = "APIKEY"

**Define Functions** 

In [2]:
########## Progress bar functions ############################

def display_progress_bar(iteration, total, bar_length=50):
    progress = (iteration / total)
    arrow = '=' * int(round(progress * bar_length) - 1) + '>'
    spaces = ' ' * (bar_length - len(arrow))

    # Move the cursor up one line and then print the progress bar
    print(f"""\rProgress: [{arrow + spaces}] {int(progress * 100)}%
""", end='')


def spinning_wheel(duration=0.1):
    wheel_sequence = ['|', '/', '-', '\\']
    wheel_index = 0
    while keep_spinning:
        wheel_char = wheel_sequence[wheel_index % len(wheel_sequence)]
        print(f"\r{wheel_char} ", end='', flush=True)
        time.sleep(duration)
        wheel_index += 1


############## Data functions ###################################

def slice_from_key(data, key) -> dict():
    """
    
    Slices a dictionary from given key (param)
    
    """
    return dict(islice(data.items(), list(data.keys()).index(key), None))

def get_address_from_coordinates(coordinates):
    """
    
    # Example usage:
    coordinates = (52.506810, 13.491683)
    address = get_address_from_coordinates(coordinates)
    print(address["display_name"])
    
    """
    BASE_URL = "https://nominatim.openstreetmap.org/reverse"    
    params = {
        "lat": coordinates[0],
        "lon": coordinates[1],
        "format": "json"
    }
    
    headers = {
        "User-Agent": "YourAppName/1.0"  # Replace 'YourAppName' with the name of your app
    }    
    response = requests.get(BASE_URL, params=params, headers=headers)
    data = response.json()
    if data:
        data = slice_from_key(data, 'lat')
        data.pop('boundingbox', None)
        data.pop('importance', None)
        data.pop('place_rank', None)
        return data
    else:
        return None

def get_coordinates_from_address(address) -> tuple():
    """
    
    # Example usage:
    address = "1600 Amphitheatre Parkway, Mountain View, CA"
    coords = get_coordinates_from_address(address)
    print(coords)
    
    """
    BASE_URL = "https://nominatim.openstreetmap.org/search"
    
    params = {
        "q": address,
        "format": "json",
        "limit": 1
    }
    
    headers = {
        "User-Agent": "YourAppName/1.0"  # Replace 'YourAppName' with the name of your app
    }
    
    response = requests.get(BASE_URL, params=params, headers=headers)
    data = response.json()
    
    if data:
        return (float(data[0]['lat']), float(data[0]['lon']))
    else:
        return None



In [3]:
 # Example usage:
address = "Kottbusser Tor, Berlin, 10999"
coords = get_coordinates_from_address(address)
print(coords)

(52.4989853, 13.4183107)


## **Search by coordinates** <br>
*Berghain* (52.510594, 13.442860)<br>
*Kotti* (52.4989853, 13.4183107) <br>
*Gisela* (52.506756, 13.491681) <br>

In [4]:
panos = search_panoramas(52.4989853, 13.4183107)
print(panos[0])

# Create a dictionary from panos for easy lookup
panos_dict = {pano.pano_id: pano for pano in panos}

[Panorama(pano_id='BIM0gsPRUJnvXK2CLLzBXw', lat=52.4988666136225, lon=13.41902154981506, heading=113.2177047729492, pitch=89.79857635498047, roll=1.13162100315094, date=None), Panorama(pano_id='diL_FOsFg-AfqwF1VOb-Ew', lat=52.49882670812256, lon=13.41915142874563, heading=116.1135025024414, pitch=89.6610336303711, roll=1.623445510864258, date=None), Panorama(pano_id='SFa-5HVAIkPVgKOh3634Tg', lat=52.49874416947811, lon=13.4183684856016, heading=107.5211944580078, pitch=90.1708755493164, roll=359.9935607910156, date=None), Panorama(pano_id='LUV9MFB1TchAJXsCKleaEg', lat=52.49868869856489, lon=13.41847957618998, heading=144.7259674072266, pitch=89.88567352294922, roll=1.279295802116394, date=None), Panorama(pano_id='uqokX41UIeq9x1BvEEc_vQ', lat=52.49859440685731, lon=13.41851410984919, heading=173.9994659423828, pitch=89.90898895263672, roll=2.272359132766724, date=None), Panorama(pano_id='33LRHutwhZB9jXMPbLbxcA', lat=52.49850144458902, lon=13.41853135396557, heading=171.4117126464844, pit

**Get metadata of nearest available panoramas** 

In [5]:
meta_list = []
try:
    for x in panos:
        meta = get_panorama_meta(pano_id=x.pano_id, api_key=key)
        meta_list.append(meta)
except:
    print("Done.")

total_images = len(meta_list) # for the progress bar 

**Get "normal" imgs (no pano) for every MetaData object** 
-> and save them as png

In [8]:
# Use a relative path for the directory
directory_streetview = "imgs/streetviews"

# Check and create the directory if it doesn't exist
if not os.path.exists(directory_streetview):
    os.makedirs(directory_streetview)

# Control variable for the spinning wheel
keep_spinning = True
# Start the spinning wheel in a separate thread
wheel_thread = threading.Thread(target=spinning_wheel)
wheel_thread.start()

for index, x in enumerate(meta_list):
    # Fetch additional metadata from panos_dict
    pano_data = panos_dict.get(x.pano_id, None)
    if not pano_data:
        continue  # Skip if no matching data in panos
        
    # Fetch the address using the coordinates
    coordinates = (x.location.lat, x.location.lng)
    address = get_address_from_coordinates(coordinates)
    address_display_name = address["display_name"]
    address_zip_code = address["address"]['postcode']
        
    # Create a subfolder named after Zip code within the date folder
    zip_code_directory = os.path.join(directory_streetview, address_zip_code)
    if not os.path.exists(zip_code_directory):
        os.makedirs(zip_code_directory)

    # Create a subfolder named after the date
    date_directory = os.path.join(zip_code_directory, x.date)
    if not os.path.exists(date_directory):
        os.makedirs(date_directory)
        
    # Create a subfolder named after address_display_name within the date folder
    address_directory = os.path.join(date_directory, address_display_name)
    if not os.path.exists(address_directory):
        os.makedirs(address_directory)

        
    image = get_streetview(
        pano_id=x.pano_id,
        api_key=key,
    )    
    
    # Convert the image to a PIL Image object
    pil_image = image

    # Create metadata for the image
    meta_info = PngImagePlugin.PngInfo()
    meta_info.add_text("date", x.date)
    meta_info.add_text("lat", str(x.location.lat))
    meta_info.add_text("lng", str(x.location.lng))
    meta_info.add_text("pano_id", x.pano_id)
    meta_info.add_text("address", address_display_name)
    meta_info.add_text("heading", str(pano_data.heading))
    meta_info.add_text("pitch", str(pano_data.pitch))
    meta_info.add_text("roll", str(pano_data.roll))

    
    # Save the image with metadata as PNG in the address subfolder
    pil_image.save(f"{address_directory}/{x.pano_id}.png", "PNG", pnginfo=meta_info)

    # Write merged metadata to a CSV file
    csv_filename = os.path.join(address_directory, "metadata.csv")
    with open(csv_filename, mode='a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        # If the file is empty, write the headers
        if csv_file.tell() == 0:
            csv_writer.writerow(["pano_id", "date", "lat", "lng", "heading", "pitch", "roll"])
        csv_writer.writerow([x.pano_id, x.date, x.location.lat, x.location.lng, pano_data.heading, pano_data.pitch, pano_data.roll])
    
    # Update the progress bar
    display_progress_bar(index + 1, total_images)
    
# Stop the spinning wheel
keep_spinning = False
wheel_thread.join()  # Wait for the wheel thread to finish

print("\nDone!")

Progress: [=>                                                ] 4%
Progress: [===>                                              ] 8%

Done!


**Get Panoramas for every MetaData object** 
-> and save them as jpeg

In [9]:
directory_panos = "imgs/panoramas"
# Check and create the directory if it doesn't exist
if not os.path.exists(directory_panos):
    os.makedirs(directory_panos)
    
total_images = len(meta_list)

# Control variable 
keep_spinning = True

# Start the spinning wheel in a separate thread
wheel_thread = threading.Thread(target=spinning_wheel) 
wheel_thread.start()

for index, x in enumerate(meta_list):
    try:
        # Fetch additional metadata from panos_dict
        pano_data = panos_dict.get(x.pano_id, None)
        if not pano_data:
            continue  # Skip if no matching data in panos
    
        # Fetch the address using the coordinates
        coordinates = (x.location.lat, x.location.lng)
        address = get_address_from_coordinates(coordinates)
        address_display_name = address["display_name"]
        address_zip_code = address["address"]['postcode']
        
        # Create a subfolder named after Zip code within the date folder
        zip_code_directory = os.path.join(directory_panos, address_zip_code)
        if not os.path.exists(zip_code_directory):
            os.makedirs(zip_code_directory)
    
        # Create a subfolder named after the date
        date_directory = os.path.join(zip_code_directory, x.date)
        if not os.path.exists(date_directory):
            os.makedirs(date_directory)
            
        # Create a subfolder named after address_display_name within the date folder
        address_directory = os.path.join(date_directory, address_display_name)
        if not os.path.exists(address_directory):
            os.makedirs(address_directory)

        
            
        max_retries = 3
        retries = 0    
        while retries < max_retries:
            try:
                image = get_panorama(pano_id=x.pano_id)
                # Convert the image to a PIL Image object
                pil_image = image
            
                # Create metadata for the image
                meta_info = PngImagePlugin.PngInfo()
                meta_info.add_text("date", x.date)
                meta_info.add_text("lat", str(x.location.lat))
                meta_info.add_text("lng", str(x.location.lng))
                meta_info.add_text("pano_id", x.pano_id)
                meta_info.add_text("address", address_display_name)
                meta_info.add_text("heading", str(pano_data.heading))
                meta_info.add_text("pitch", str(pano_data.pitch))
                meta_info.add_text("roll", str(pano_data.roll))
                
                 # Save the image with metadata as PNG in the date subfolder
                pil_image.save(f"{address_directory}/{x.pano_id}.png", "PNG", pnginfo=meta_info)
                
                # Write merged metadata to a CSV file
                csv_filename = os.path.join(address_directory, "metadata.csv")
                
                with open(csv_filename, mode='a', newline='') as csv_file:
                    csv_writer = csv.writer(csv_file)
                    # If the file is empty, write the headers
                    if csv_file.tell() == 0:
                        csv_writer.writerow(["pano_id", "date", "lat", "lng", "heading", "pitch", "roll", "address"])
                    csv_writer.writerow([x.pano_id, x.date, x.location.lat, x.location.lng, 
                                         pano_data.heading, pano_data.pitch, pano_data.roll, 
                                         address_display_name])
                    break
            except Timeout:
                print("The request timed out! Retrying...")
                retries += 1
            except ConnectionError:
                print("There was a connection error! Retrying...")
                retries += 1
            except Exception as e:         
                print(f"Unexpected error for pano_id {x.pano_id}. Error: {e}")
                break
    except Exception as e:
        print(x.pano_id, "Skipped.\nTraceback:", e, "\n")
    
    # Update the progress bar
    display_progress_bar(index + 1, total_images)
    
# Stop the spinning wheel
keep_spinning = False
wheel_thread.join()  # Wait for the thread to finish
    
print("\nDone!")


Progress: [=>                                                ] 4%
Progress: [===>                                              ] 8%
- The request timed out! Retrying...

Done!
