In [53]:
!pip install -r req.txt



In [54]:
from google.cloud import vision
import os
import requests
from math import radians, sin, cos, sqrt, atan2, degrees
from dotenv import load_dotenv
from PIL import Image
from io import BytesIO
import math

In [55]:
import os

# Set the path to your service account key file

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "temporal-ring-451800-g4-c96565bd1d32.json"

In [56]:
%pip install pyarrow
import pyarrow.parquet as pq

file_path = "cleaned_data/traffic_data_sample_filtered.parquet"
table = pq.read_table(file_path)
df = table.to_pandas()

Note: you may need to restart the kernel to use updated packages.


In [57]:
load_dotenv()

True

In [58]:
api_key = os.getenv("GOOGLE_API_KEY")

# Check if the API key is loaded
if not api_key:
   print("Error: API key is not set or is empty!")
else:
   print(f"API Key successfully loaded: {api_key[:5]}...")  # Print first 5 characters for validation

API Key successfully loaded: AIzaS...


In [59]:
def pretty_print(func):
    """
    Decorator to calculate and pretty print the visibility score as a percentage.
    """
    def wrapper(*args, **kwargs):
        visibility_score = func(*args, **kwargs)
        visibility_percentage = visibility_score * 100
        print(f"Visibility Score: {visibility_percentage:.2f}%")
    return wrapper

In [60]:
def geocode_address(address, api_key):
   geocode_url = f"https://maps.googleapis.com/maps/api/geocode/json?address={address}&key={api_key}"
   response = requests.get(geocode_url)
   geocode_data = response.json()

   if geocode_data['status'] == 'OK':
       lat = geocode_data['results'][0]['geometry']['location']['lat']
       lng = geocode_data['results'][0]['geometry']['location']['lng']
       return lat, lng
   else:
       print(f"Error geocoding address: {geocode_data['status']}")
       return None, None

In [61]:
def find_nearest_road(lat, lng, api_key):
   roads_url = f"https://roads.googleapis.com/v1/nearestRoads?points={lat},{lng}&key={api_key}"
   response = requests.get(roads_url)
   roads_data = response.json()

   if 'snappedPoints' in roads_data:
       nearest_road = roads_data['snappedPoints'][0]
       return nearest_road['location']['latitude'], nearest_road['location']['longitude']
   else:
       print("Error finding nearest road:", roads_data)
       return None, None

In [62]:
# Step 3: Calculate distance using Haversine formula
def calculate_distance(lat1, lon1, lat2, lon2):
   R = 6371.0  # Radius of Earth in km
   lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

   dlon = lon2 - lon1
   dlat = lat2 - lat1

   a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
   c = 2 * atan2(sqrt(a), sqrt(1 - a))
   distance = R * c  # Distance in km
   return distance

In [63]:
import requests
import pandas as pd
from geopy.distance import geodesic

def find_closest_osm_id(missing_osm_id, df):
    df["osm_id"] = df["osm_id"].astype(int)  # Ensure numerical comparison
    closest_osm_id = df.iloc[(df["osm_id"] - missing_osm_id).abs().argsort()[0]]["osm_id"]
    return closest_osm_id

def find_nearest_road_to_osm_in_db(lat, lng, df):
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
    [out:json];
    way
    (around:100,{lat},{lng})
    ["highway"];
    out center;
    """
    response = requests.get(overpass_url, params={"data": overpass_query})

    if response.status_code != 200:
        print(f"Error: Received status code {response.status_code}")
        return None

    try:
        data = response.json()
    except requests.JSONDecodeError as e:
        print(f"Error: Failed to parse JSON response - {e}")
        return None

    valid_osm_ids = set(df["osm_id"].astype(int))  # Convert to integers

    nearest_road = None
    min_distance = float("inf")
    closest_missing_osm = None

    for way in data.get("elements", []):
        osm_id = int(way["id"])  # Convert to integer for comparison
        if osm_id in valid_osm_ids:
            if "center" in way:
                way_lat = way["center"]["lat"]
                way_lon = way["center"]["lon"]
                distance = geodesic((lat, lng), (way_lat, way_lon)).meters

                if distance < min_distance:
                    min_distance = distance
                    nearest_road = (osm_id, way["tags"].get("name", "Unknown"), way["tags"].get("highway", "Unknown"))
        else:
            closest_missing_osm = osm_id  # Keep track of the first missing OSM ID

    if nearest_road:
        return nearest_road
    elif closest_missing_osm is not None:
        # Find the closest numerical OSM ID in the dataset
        best_match_osm_id = find_closest_osm_id(closest_missing_osm, df)
        return (best_match_osm_id, "Closest Match", "Unknown")

    print("No matching or close OSM ID found in dataset.")
    return None

In [72]:
def calculate_traffic_volume(osm):
    df_osm = df[df['osm_id'] == osm]
    if df_osm.empty:
        print(f"Error: OSM ID {osm} not found in dataset.")
        return None

    # Suppose df is your DataFrame
    total_trips_volume = df['trips_volume'].median()
    print("Total trips volume:", total_trips_volume)
    total_sample_count = df['trips_sample_count'].median()
    print("Total sample count:", total_sample_count)
    ##scaling_factor = total_trips_volume / max(total_sample_count, 1)
    scaling_factor = total_trips_volume / total_sample_count
    
    scaling_factor *= 50  # Adjust based on expected range
    print("Scaling factor:", scaling_factor)
    trips_volume = df_osm['trips_volume'].median()
    trips_sample_count = df_osm['trips_sample_count'].median()

    traffic_volume = (trips_volume / trips_sample_count) * scaling_factor

    return traffic_volume.item()

In [65]:
# Step 4: Calculate bearing between two points
def calculate_bearing(lat1, lon1, lat2, lon2):
   lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
  
   delta_lon = lon2 - lon1
   x = sin(delta_lon) * cos(lat2)
   y = cos(lat1) * sin(lat2) - sin(lat1) * cos(lat2) * cos(delta_lon)
  
   bearing = atan2(x, y)
  
   # Convert bearing from radians to degrees
   bearing = degrees(bearing)
  
   # Normalize bearing to be between 0 and 360
   bearing = (bearing + 360) % 360
   return bearing

In [None]:
# Step 5: Get Google Street View Image
def get_street_view_image(lat, lng, bearing, api_key, rotate):
   heading = (bearing + 180) % 360 if rotate else bearing % 360
   # Google Street View Static API URL
   street_view_url = f"https://maps.googleapis.com/maps/api/streetview?size=600x300&location={lat},{lng}&heading={heading}&key={api_key}"
  
   # Send the request and get the image
   response = requests.get(street_view_url)
  
   if response.status_code == 200:
       # Open and save the image
       image = Image.open(BytesIO(response.content))
       image.save("street_view_image.jpg")
       print("Street view image saved as street_view_image.jpg")
       return "street_view_image.jpg"
   else:
       print("Error retrieving street view image:", response.status_code)
       return None

In [67]:
def analyze_image_with_vision(image_path):
    # Initialize the Vision client
    client = vision.ImageAnnotatorClient()

    # Load the image
    with open(image_path, "rb") as image_file:
        content = image_file.read()

    image = vision.Image(content=content)  # No 'types' module needed here

    # Call the Vision API to perform object localization
    response = client.object_localization(image=image)

    # Check for errors
    if response.error.message:
        raise Exception(f"Error: {response.error.message}")

    # Initialize variables to track total area and blocked area
    total_area = 0
    blocked_area = 0

    # Image dimensions (width and height) - use the image size for a more accurate area calculation
    image_width = 1  # These values should be set according to your image's size
    image_height = 1

    # Iterate through the detected objects
    for object_ in response.localized_object_annotations:
        # Print the detected objects and their confidence
        print(f"Object: {object_.name}, Confidence: {object_.score}")

        # Define a list of objects that could potentially block the building
        if object_.name.lower() in ["car", "person", "tree", "sign", "bush"]:  # List of blocking objects
            # Calculate the area of the object by its bounding box (normalized coordinates)
            object_area = (object_.bounding_poly.normalized_vertices[2].x - object_.bounding_poly.normalized_vertices[0].x) * \
                          (object_.bounding_poly.normalized_vertices[2].y - object_.bounding_poly.normalized_vertices[0].y)
            blocked_area += object_area
            print(f"Blocked Object: {object_.name}, Area: {object_area}")

        # Increment total area calculation (for simplicity, we count all objects here)
        total_area += (object_.bounding_poly.normalized_vertices[2].x - object_.bounding_poly.normalized_vertices[0].x) * \
                      (object_.bounding_poly.normalized_vertices[2].y - object_.bounding_poly.normalized_vertices[0].y)

    # Calculate the blocked percentage
    if total_area > 0:
        block_percentage = (blocked_area / total_area) * 100
        print(f"Total area blocked by objects: {block_percentage:.2f}%")
    else:
        block_percentage = 0
        print("No objects detected in the image.")
    return block_percentage

In [68]:
def calculate_visibility_score(distance, angle, obstruction_percent, alpha=0.1):
    
    # Step 1: Calculate the distance factor using exponential decay
    distance_factor = math.exp(-alpha * distance)  # Exponential decay for distance
    
    # Step 2: Calculate the angle factor using cosine
    # Convert angle to radians
    angle_radians = math.radians(angle)
    angle_factor = math.cos(angle_radians)  # Cosine of the angle
    
    # Step 3: Calculate the obstruction factor
    obstruction_factor = 1 - (obstruction_percent / 100)  # Fraction of visible storefront
    
    # Step 4: Calculate the total visibility score
    visibility_score = distance_factor * angle_factor * obstruction_factor
    if visibility_score < 0:
        print(f"Warning: Negative visibility score detected: {visibility_score}")
    
    # Ensure the score is between 0 and 1
    visibility_score = max(0, min(visibility_score, 1))
    return visibility_score

# Queries
59 Hillside Trace Dallas, Georgia
48 5th St NW, Atlanta, GA 30308
1715 Charles Hardy Pkwy, Dallas, GA, 30157 (showing no objects detected)
1380 Atlantic Dr NW #14275, Atlanta, GA 30363

In [69]:
testing = False
def main():
    
    address_string = ["48 5th St NW, Atlanta, GA 30308", "73 Peachtree Pl NW", "Georgia Tech, 266 4th St NW, Atlanta, GA 30313"]
    address = address_string[0] if testing else input("Enter the address: ")

    # Geocode the address
    lat, lng = geocode_address(address, api_key)
    if lat and lng:
        print(f"Latitude: {lat}, Longitude: {lng}")

        # Find the nearest road
        nearest_lat, nearest_lng = find_nearest_road(lat, lng, api_key)
        if nearest_lat and nearest_lng:
            print(f"Nearest Road Latitude: {nearest_lat}, Nearest Road Longitude: {nearest_lng}")

            # Calculate the distance between the address and the nearest road
            distance = calculate_distance(lat, lng, nearest_lat, nearest_lng)
            print(f"Distance from {address} to nearest road: {distance} km")

            # Calculate the bearing (angle) between the address and the nearest road
            bearing = calculate_bearing(lat, lng, nearest_lat, nearest_lng)
            print(f"Angle from {address} to nearest road: {bearing}°")

            osm, _, _ = find_nearest_road_to_osm_in_db(lat, lng, df)
            traffic_volume = calculate_traffic_volume(osm)
            if traffic_volume is not None:
                # print(f"Traffic volume: {traffic_volume:.2f} vehicles per day")
                # Get the Street View Image for the location
                image_path = get_street_view_image(lat, lng, bearing, api_key)
                if image_path:
                    # Analyze the image using Google Vision API
                    obstruct = analyze_image_with_vision(image_path)
                    visib = calculate_visibility_score(distance, bearing, obstruct)
                    total = traffic_volume * visib
                    print("Final Score: ", total)
                
                    # print("Final Score:" + final_score)
            else:
                print("Traffic volume could not be calculated.")
        else:
            print("Nearest road could not be found.")
    else:
        print("Geocoding failed.")

In [71]:
main()

Latitude: 33.759365, Longitude: -84.3758872
Nearest Road Latitude: 33.759367626391594, Nearest Road Longitude: -84.37554867795723
Distance from 180 Jackson St Northeast, Atlanta Georgia to nearest road: 0.03129606757702596 km
Angle from 180 Jackson St Northeast, Atlanta Georgia to nearest road: 89.46523864816237°
Total trips volume: 2100.0
Total sample count: 13567.0
Scaling factor: 7.739367583106066
Street view image saved as street_view_image.jpg
Object: Building, Confidence: 0.5031560063362122
Total area blocked by objects: 0.00%
Final Score:  0.006763810062343905
