In [65]:
import io
import math
import os
from urllib.parse import quote

import pandas as pd
import requests
from dotenv import find_dotenv, load_dotenv
from PIL import Image
from sqlalchemy import create_engine, text

In [66]:
# Load environment variables from the .env file located by find_dotenv();
# later cells read DB_USER, DB_PASS and MAPBOX_TOKEN from the environment.
load_dotenv(find_dotenv())

# Empty dict; no other cell visible in this notebook reads or writes it.
tree_density_database = {}

In [67]:
def create_db_engine(user=None, password=None, port=6543):
    """
    Creates the SQLAlchemy engine used to open connections to the
    Supabase-hosted PostgreSQL database.

    Args:
        user (str | None): Database username. When None (the default), it is
            read from the DB_USER environment variable at call time.
        password (str | None): Database password. When None (the default),
            it is read from the DB_PASS environment variable at call time.
        port (int): Port of the database pooler endpoint.
    Returns:
        SQLAlchemy engine object
    Raises:
        KeyError: If a credential is omitted and the corresponding
            environment variable is not set.
        AssertionError: If ``port`` is not an int.
    Notes:
        Credentials are resolved lazily instead of in the signature so that
        defining this function never fails on a missing variable and the
        password is not stored in the function's default arguments.
    """

    if user is None:
        user = os.environ["DB_USER"]
    if password is None:
        password = os.environ["DB_PASS"]

    assert isinstance(port, int), "Port must be numeric"

    # Percent-encode the credentials: characters such as "@", "/" or ":" in
    # a password would otherwise be parsed as URL delimiters.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")

    db_uri = f"postgresql+psycopg2://{safe_user}:{safe_password}@aws-0-sa-east-1.pooler.supabase.com:{port}/postgres"
    engine = create_engine(db_uri, future=True)

    return engine

In [68]:
# Notebook-wide engine; the database cells below open their connections from it.
db_engine = create_db_engine()

In [69]:
def get_sat_image(min_lat, max_lat, min_lon, max_lon):
    """
    Downloads a satellite image of a bounding box from the Mapbox static
    images endpoint and opens it with PIL.

    Args:
        min_lat (float): Southern edge of the bounding box.
        max_lat (float): Northern edge of the bounding box.
        min_lon (float): Western edge of the bounding box.
        max_lon (float): Eastern edge of the bounding box.
    Returns:
        PIL.Image.Image: The decoded image.
    Raises:
        ValueError: If the MAPBOX_TOKEN environment variable is not set.
        RuntimeError: If the API responds with a status code other than 200.
    Notes:
        As a side effect the image is displayed with ``image.show()``.
    """
    access_token = os.getenv('MAPBOX_TOKEN')
    if not access_token:
        # Fail early with a clear message instead of sending "access_token=None".
        raise ValueError("MAPBOX_TOKEN environment variable is not set")

    # Define the URL template
    url_template = "https://api.mapbox.com/styles/v1/{username}/{style_id}/static/[{min_lon},{min_lat},{max_lon},{max_lat}]/{width}x{height}@2x?access_token={access_token}"

    # Replace the variables in the URL template
    url = url_template.format(
        username="mapbox",
        style_id="satellite-v9",
        min_lat=min_lat,
        max_lat=max_lat,
        min_lon=min_lon,
        max_lon=max_lon,
        width=300,
        height=300,
        access_token=access_token,
    )

    # Send the GET request; the timeout keeps a stalled connection from
    # blocking the notebook indefinitely.
    response = requests.get(url, timeout=30)

    # Stop here on failure: there is no image to show or return. The URL is
    # deliberately left out of the message because it contains the token.
    if response.status_code != 200:
        raise RuntimeError(
            f"API call failed with status code: {response.status_code}"
        )

    image = Image.open(io.BytesIO(response.content))
    image.show()

    return image

In [70]:
def calculate_green_density(image, threshold=10):
    """
    Computes the fraction of pixels in an image that are predominantly green.

    A pixel counts as green when its green channel exceeds both the red and
    the blue channel by more than ``threshold``.

    Args:
        image: Image object exposing ``convert``, ``size`` and ``getpixel``
            (e.g. a PIL image).
        threshold (int): Margin by which green must exceed red and blue.
    Returns:
        float: Green pixels divided by total pixels, in [0, 1]. An image
        with no pixels yields 0.0.
    """
    # Convert the image to RGB mode so every pixel unpacks to three channels
    rgb_image = image.convert("RGB")

    width, height = rgb_image.size
    total_pixels = width * height
    if total_pixels == 0:
        # Nothing to measure; avoid dividing by zero.
        return 0.0

    green_pixels = 0
    for x in range(width):
        for y in range(height):
            r, g, b = rgb_image.getpixel((x, y))
            if g > r + threshold and g > b + threshold:
                green_pixels += 1

    return green_pixels / total_pixels

In [71]:
def define_bounding_box(latitude, longitude, height=0.005, width=0.005):
    """
    Returns the grid cell that contains the given point.

    The plane is tiled with cells of ``height`` x ``width`` degrees aligned
    to the origin, and the point is snapped down to the lower-left corner of
    its cell so that ``min_lat <= latitude <= max_lat`` and
    ``min_lon <= longitude <= max_lon`` always hold.

    Args:
        latitude (float): Latitude of the point.
        longitude (float): Longitude of the point.
        height (float): Cell size along latitude, in degrees. Must be > 0.
        width (float): Cell size along longitude, in degrees. Must be > 0.
    Returns:
        tuple: (min_lat, max_lat, min_lon, max_lon), each rounded to 4 decimals.
    Raises:
        ValueError: If ``height`` or ``width`` is not positive.
    Notes:
        Because results are rounded to 4 decimals, cell sizes should be
        multiples of 0.0001.
    """
    if height <= 0 or width <= 0:
        raise ValueError("height and width must be positive")

    def _snap_down(value, step):
        # Floor, not round: rounding to the nearest grid line puts the corner
        # above the point about half the time. The quotient is rounded to 9
        # decimals first so float noise on exact grid lines cannot shift the
        # result by a whole cell.
        return round(math.floor(round(value / step, 9)) * step, 4)

    # Calculate the minimum and maximum latitude
    min_lat = _snap_down(latitude, height)
    max_lat = round(min_lat + height, 4)

    # Calculate the minimum and maximum longitude
    min_lon = _snap_down(longitude, width)
    max_lon = round(min_lon + width, 4)

    return min_lat, max_lat, min_lon, max_lon

In [72]:
def add_green_density_to_db(min_lat, max_lat, min_lon, max_lon, green_density):
    """
    Inserts one bounding box and its green density into the
    ``fact_image_analysis`` table.

    The insert runs inside a ``db_engine.begin()`` block using the
    notebook-level ``db_engine``. Returns None.
    """
    insert_stmt = text(
        """
        INSERT INTO fact_image_analysis (min_lat, max_lat, min_lon, max_lon, green_density)
        VALUES (:min_lat, :max_lat, :min_lon, :max_lon, :green_density)
        """)

    # Values bound to the named placeholders of the statement above.
    row_values = dict(
        min_lat=min_lat,
        max_lat=max_lat,
        min_lon=min_lon,
        max_lon=max_lon,
        green_density=green_density,
    )

    with db_engine.begin() as conn:
        conn.execute(insert_stmt, parameters=row_values)

In [115]:
def get_green_density(latitude, longitude):
    """
    Returns the green density for the bounding box containing a point.

    Looks in ``fact_image_analysis`` for a stored box that contains the
    point and returns its density. When none exists, a box is defined around
    the point, its satellite image is fetched and analysed, the result is
    stored, and the newly computed density is returned.
    """
    lookup = text(
            """
            SELECT green_density
            FROM fact_image_analysis
            WHERE min_lat <= :latitude AND :latitude <= max_lat
            AND min_lon <= :longitude AND :longitude <= max_lon
        """
    )
    point = {"latitude": latitude, "longitude": longitude}

    with db_engine.begin() as conn:
        cached_row = conn.execute(lookup, parameters=point).first()

    # Cache hit: the first matching row already holds the answer.
    if cached_row is not None:
        return cached_row[0]

    # Cache miss: compute the density for a fresh box and persist it.
    print("Point is not in any bounding box")
    box = define_bounding_box(latitude, longitude)
    sat_image = get_sat_image(*box)
    computed_density = calculate_green_density(sat_image)
    add_green_density_to_db(*box, computed_density)
    return computed_density

In [116]:
# Example lookup: returns the stored density when a saved bounding box contains
# the point; otherwise fetches the image, computes the density and stores it.
get_green_density(-9.1436, -65.4997)

0.9697

In [97]:
# Ad-hoc check: list every stored bounding box that contains the point below.
latitude = -9.1336
longitude = -65.4997

with db_engine.begin() as conn:
    # Plain-string SQL with %(name)s placeholders; the values are supplied
    # separately through `params` rather than interpolated into the string.
    query = """
    SELECT *
    FROM fact_image_analysis
    WHERE 
        min_lat <= %(latitude)s AND %(latitude)s <= max_lat
        AND min_lon <= %(longitude)s AND %(longitude)s <= max_lon
    """
    result = pd.read_sql(
        query, params={"latitude": latitude, "longitude": longitude},
        con=conn
    )
    # `display` is not imported in this notebook; presumably the IPython
    # built-in — confirm when running outside Jupyter.
    display(result)


Unnamed: 0,id,min_lat,max_lat,min_lon,max_lon,green_density
0,2,-9.135,-9.13,-65.5,-65.495,0.942181
