# Pre

In [None]:
from __future__ import annotations
%pip install geopy
%pip install python-dotenv
%pip install --upgrade --force-reinstall \
    mediocreatbest@git+https://gist.github.com/player1537/3457b026ed6ef6696d758517f55a58df.git
try:
    from mediocreatbest import auto
except ImportError:
    %pip install --quiet --upgrade pip
    %pip install --upgrade --force-reinstall \
        mediocreatbest@git+https://gist.github.com/player1537/3457b026ed6ef6696d758517f55a58df.git
    from mediocreatbest import auto

from opencage.geocoder import OpenCageGeocode

# Config

In [None]:
# Load variables from a .env file (if any) so the getenv() lookups below can see them.
auto.dotenv.load_dotenv()
# Single namespace object holding every tunable/credential used by this notebook.
config = auto.types.SimpleNamespace()

# OpenCage geocoding API key; getenv() yields None when the variable is unset.
config.opencage = auto.types.SimpleNamespace()
config.opencage.token = auto.os.getenv('OPENCAGE_API_KEY')

# Mapbox API key; getenv() yields None when the variable is unset.
config.map = auto.types.SimpleNamespace()
config.map.token = auto.os.getenv('MAPBOX_API_KEY')

# Building data locations:
#   datadir - root of the source dataset (absolute, machine-specific path)
#   csvs    - per-state CSV inputs, read as '<STATE>.csv' by the cells below
#   gendir  - output directory for files generated by this notebook (relative to the CWD)
config.buildings = auto.types.SimpleNamespace()
config.buildings.datadir = auto.pathlib.Path('/mnt/seenas2/data/model-america/data')
config.buildings.csvs = config.buildings.datadir / 'MAv1_CSVS'
config.buildings.gendir = auto.pathlib.Path('data/gen')
# Fail fast when any expected directory is missing rather than erroring deep inside a later cell.
assert config.buildings.datadir.exists()
assert config.buildings.csvs.exists()
assert config.buildings.gendir.exists()


# Lib

## Polygon Containment

In [None]:
def is_point_in_polygon(point, polygon):
    """Return True when ``point`` lies inside ``polygon`` (ray-casting test).

    A horizontal ray is cast from the point and the number of polygon edges it
    crosses is counted; an odd count means the point is inside.

    Args:
        point: ``(x, y)`` pair.
        polygon: sequence of ``(x, y)`` vertices; the closing edge from the
            last vertex back to the first is implied.

    Returns:
        bool: True if the point is inside, False otherwise. An empty polygon
        contains nothing, so it always yields False.
    """
    x, y = point
    n = len(polygon)

    # An empty polygon has no interior (previously this raised IndexError).
    if n == 0:
        return False

    inside = False

    # Start of the current edge; the loop walks every edge and finally wraps
    # back to vertex 0 to close the ring.
    p1x, p1y = polygon[0]
    for i in range(1, n + 1):
        p2x, p2y = polygon[i % n]

        # The edge can only be crossed when y is in its half-open vertical
        # span (which also guarantees p1y != p2y, so the division is safe)
        # and the point is not to the right of both endpoints.
        if min(p1y, p2y) < y <= max(p1y, p2y) and x <= max(p1x, p2x):
            if p1x == p2x:
                # Vertical edge: the x check above already proves a crossing.
                inside = not inside
            else:
                # x coordinate where the edge meets the horizontal line at y.
                xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                if x <= xinters:
                    inside = not inside

        # Advance to the next edge.
        p1x, p1y = p2x, p2y

    return inside


## State name to abbreviation

In [None]:
def state_abbreviation(state_name):
    """Map a US state name to its two-letter postal abbreviation.

    Matching is case-insensitive and ignores surrounding whitespace.

    Args:
        state_name: full state name, e.g. ``'Tennessee'``. ``None`` (or any
            non-string) is accepted because geocoder results may lack a state.

    Returns:
        The two-letter abbreviation, or ``None`` when the name is unknown or
        not a string.
    """
    # Callers pass `components.get('state')`, which is None when the geocoder
    # result has no state; treat that as "unknown" instead of crashing.
    if not isinstance(state_name, str):
        return None

    state_abbreviations = {
        'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',
        'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',
        'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI', 'idaho': 'ID',
        'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA', 'kansas': 'KS',
        'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME', 'maryland': 'MD',
        'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN', 'mississippi': 'MS',
        'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE', 'nevada': 'NV',
        'new hampshire': 'NH', 'new jersey': 'NJ', 'new mexico': 'NM',
        'new york': 'NY', 'north carolina': 'NC', 'north dakota': 'ND',
        'ohio': 'OH', 'oklahoma': 'OK', 'oregon': 'OR', 'pennsylvania': 'PA',
        'rhode island': 'RI', 'south carolina': 'SC', 'south dakota': 'SD',
        'tennessee': 'TN', 'texas': 'TX', 'utah': 'UT', 'vermont': 'VT',
        'virginia': 'VA', 'washington': 'WA', 'west virginia': 'WV',
        'wisconsin': 'WI', 'wyoming': 'WY'
    }

    return state_abbreviations.get(state_name.lower().strip(), None)

## DMS To Decimal

In [None]:
def dms_to_decimal(dms_str):
    """Convert a degrees/minutes/seconds string to signed decimal degrees.

    The expected layout is ``<deg>° <min>' <sec>'' <N|S|E|W>``; south and west
    hemispheres produce negative values.

    Raises:
        ValueError: if the string does not start with that layout.
    """
    parsed = auto.re.match(r"(\d+)° (\d+)' ([\d.]+)'' ([NSEW])", dms_str)
    if parsed is None:
        raise ValueError("Invalid DMS format")

    deg_text, min_text, sec_text, hemisphere = parsed.groups()

    # degrees + minutes/60 + seconds/3600
    magnitude = int(deg_text) + int(min_text) / 60 + float(sec_text) / 3600

    # South and West are the negative hemispheres.
    if hemisphere in ('S', 'W'):
        return magnitude * -1
    return magnitude

## Address To Coordinates

In [None]:
def addr_to_coords(addr: str) -> (float, float):
    """Geocode ``addr`` with OpenCage and return ``(lat, lng)`` in decimal degrees.

    Uses the first geocoder result and converts its DMS annotation strings
    through ``dms_to_decimal``.
    """
    client = auto.opencage.geocoder.OpenCageGeocode(config.opencage.token)

    # Only the top-ranked match is used.
    dms = client.geocode(addr)[0]['annotations']['DMS']
    lat_dms, lng_dms = dms['lat'], dms['lng']
    return (dms_to_decimal(lat_dms), dms_to_decimal(lng_dms))

## Address To State

In [None]:
def addr_to_state(addr: str) -> str:
    """Geocode ``addr`` with OpenCage and return its two-letter state abbreviation.

    Returns:
        The abbreviation from ``state_abbreviation``, or ``None`` when the
        geocoder finds nothing, the top result has no ``state`` component, or
        the state name is not recognised.
    """
    geocoder = auto.opencage.geocoder.OpenCageGeocode(config.opencage.token)

    results = geocoder.geocode(addr)
    # No match at all: report "unknown" rather than raising IndexError.
    if not results:
        return None

    # Only the top-ranked match is used.
    state_name = results[0]['components'].get('state')
    # Some results carry no state component; don't pass None on to .lower().
    if state_name is None:
        return None
    return state_abbreviation(state_name)

## Get Point from distance and direction

In [None]:
def get_point_at_distance(lat, lon, distance_miles, bearing_degrees):
    """
    Project a point along a great circle on a spherical Earth.

    Inputs:
    - lat, lon: starting point in decimal degrees
    - distance_miles: how far to travel, in miles
    - bearing_degrees: direction of travel (0° is North, 90° is East, etc.)

    Returns: (new_latitude, new_longitude) in decimal degrees
    """
    m = auto.math
    sin, cos = m.sin, m.cos

    # Earth's radius in miles
    earth_radius = 3959

    # Work in radians throughout
    phi = m.radians(lat)
    lam = m.radians(lon)
    theta = m.radians(bearing_degrees)

    # Distance expressed as an angle subtended at the Earth's centre
    delta = distance_miles / earth_radius

    # Destination latitude
    phi2 = m.asin(
        sin(phi) * cos(delta) +
        cos(phi) * sin(delta) * cos(theta)
    )

    # Destination longitude (offset from the starting meridian)
    lam2 = lam + m.atan2(
        sin(theta) * sin(delta) * cos(phi),
        cos(delta) - sin(phi) * sin(phi2)
    )

    return (m.degrees(phi2), m.degrees(lam2))

def scope():
    """Demo: geocode a fixed address and print it next to the point one mile due north of it.

    Performs a live OpenCage request through addr_to_coords.
    """
    lat, lng = addr_to_coords('8000 Middlebrook Pike Knoxville TN')
    # Bearing 0.0 is north, distance 1.0 is in miles.
    newpoint_north = get_point_at_distance(lat, lng, 1.0, 0.0)
    print((lat, lng))
    print(newpoint_north)

/scope

## (Lat, Lon) to Address

In [None]:
def coords_to_addr(lat=None, lon=None):
    """Reverse-geocode a coordinate pair into a formatted address via OpenCage.

    Args:
        lat: latitude in decimal degrees.
        lon: longitude in decimal degrees.

    Returns:
        The ``formatted`` address of the top result, or ``None`` when the
        response contains no results.
    """
    url = f"https://api.opencagedata.com/geocode/v1/json?q={lat}+{lon}&key={config.opencage.token}"
    response = auto.requests.get(url)

    # .get() so a response without a 'results' key is treated like an empty
    # result list instead of raising KeyError.
    results = response.json().get('results')
    if not results:
        return None

    return results[0]['formatted']

def scope():
    """Demo: round-trip a fixed address through addr_to_coords and coords_to_addr, printing both steps.

    Performs live OpenCage requests.
    """
    addr = '1131 West Nokomis Circle Knoxville TN'
    lat, lon = addr_to_coords(addr)
    print(f'{addr} -> {(lat, lon)}')

    # Reverse lookup of the coordinates just obtained.
    print(f'{(lat, lon)} -> {coords_to_addr(lat=lat, lon=lon)}')
/scope

## Zoom by radius

In [None]:
def zoom_by_radius(radius_miles: float):
    """Pick an integer map zoom level for a view of the given radius.

    The zoom is ``log2(earth_circumference / diameter)``, clamped to the
    0-22 range and rounded to the nearest integer.

    Args:
        radius_miles: radius of the area of interest, in miles.

    Returns:
        int: zoom level between 0 and 22 inclusive.

    Raises:
        ValueError: if ``radius_miles`` is negative.
    """
    EARTH_CIRCUMFERENCE_MILES = 24901  # Earth's circumference in miles
    MIN_ZOOM, MAX_ZOOM = 0, 22  # Valid zoom range for Mapbox (0-22 typically)

    if radius_miles < 0:
        raise ValueError("radius_miles must be non-negative")
    # A zero radius would divide by zero; its limit is "as close as possible".
    if radius_miles == 0:
        return MAX_ZOOM

    # Calculate the zoom level
    zoom_level = auto.math.log2(EARTH_CIRCUMFERENCE_MILES / (radius_miles * 2))

    # Clamp zoom level to the valid range
    zoom_level = max(MIN_ZOOM, min(MAX_ZOOM, zoom_level))

    return round(zoom_level)

def scope():
    """Demo: print the zoom level chosen for a half-mile radius."""
    r = 0.5
    print(zoom_by_radius(r))
/scope

## MapUtil

In [None]:
class MapUtil:
    """Class-level collection of geocoding and map-geometry helpers.

    All state lives on the class itself: calling ``MapUtil()`` (optionally
    with a token) configures the shared OpenCage client, after which the
    class methods can be used directly, e.g. ``MapUtil.addr_to_coords(...)``.
    """
    # OpenCage API key shared by every call; set by MapUtil(...).
    token:str = None
    # Shared OpenCageGeocode client; set by MapUtil(...).
    geocoder = None

    @classmethod
    def __init__(cls, token=None):
        """Configure the shared token and geocoder client.

        Declared as a classmethod on purpose so that ``MapUtil()`` stores the
        configuration on the class rather than on the throwaway instance.

        Args:
            token: OpenCage API key. When omitted, ``config.opencage.token``
                is read at call time (not frozen when the class is defined).
        """
        if token is None:
            token = config.opencage.token
        cls.token = token
        cls.geocoder = auto.opencage.geocoder.OpenCageGeocode(cls.token)

    @classmethod
    def addr_to_state(cls, addr: str) -> str:
        """Geocode ``addr`` and return its two-letter state abbreviation.

        Returns ``None`` when nothing is found, the top result has no state
        component, or the state name is not recognised.
        """
        results = cls.geocoder.geocode(addr)
        if not results:
            return None
        # Only the top-ranked match is used.
        return cls.state_abbreviation(results[0]['components'].get('state'))

    @classmethod
    def coords_to_addr(cls, lat=None, lon=None):
        """Reverse-geocode ``(lat, lon)`` to a formatted address, or ``None`` if no result."""
        url = f"https://api.opencagedata.com/geocode/v1/json?q={lat}+{lon}&key={cls.token}"
        response = auto.requests.get(url)
        # .get() so a response without 'results' is treated as "nothing found".
        results = response.json().get('results')
        if not results:
            return None
        return results[0]['formatted']

    @classmethod
    def addr_to_coords(cls, addr:str) -> (float, float):
        """Geocode ``addr`` and return ``(lat, lng)`` in decimal degrees (top result only)."""
        data = cls.geocoder.geocode(addr)[0]
        lat = data['annotations']['DMS']['lat']
        lng = data['annotations']['DMS']['lng']
        lat = cls.dms_to_decimal(lat)
        lng = cls.dms_to_decimal(lng)
        return (lat, lng)

    @staticmethod
    def get_point_at_distance(lat, lon, distance_miles, bearing_degrees):
        """Project a point ``distance_miles`` from ``(lat, lon)`` along ``bearing_degrees``.

        Bearing 0 is north and 90 is east. Uses a spherical Earth of radius
        3959 miles. Returns ``(new_lat, new_lon)`` in decimal degrees.
        """
        lat_rad = auto.math.radians(lat)
        lon_rad = auto.math.radians(lon)
        bearing_rad = auto.math.radians(bearing_degrees)

        R = 3959 # Earth radius in miles

        # Distance as an angle at the Earth's centre.
        d = distance_miles / R

        new_lat_rad = auto.math.asin(
            auto.math.sin(lat_rad) * auto.math.cos(d) +
            auto.math.cos(lat_rad) * auto.math.sin(d) * auto.math.cos(bearing_rad)
        )

        new_lon_rad = lon_rad + auto.math.atan2(
            auto.math.sin(bearing_rad) * auto.math.sin(d) * auto.math.cos(lat_rad),
            auto.math.cos(d) - auto.math.sin(lat_rad) * auto.math.sin(new_lat_rad)
        )

        new_lat = auto.math.degrees(new_lat_rad)
        new_lon = auto.math.degrees(new_lon_rad)

        return (new_lat, new_lon)

    @staticmethod
    def state_abbreviation(state_name: str):
        """Map a US state name to its two-letter abbreviation.

        Case-insensitive; surrounding whitespace is ignored. Returns ``None``
        for unknown names and for non-string input such as ``None``.
        """
        # Geocoder results may have no state; treat that as "unknown".
        if not isinstance(state_name, str):
            return None

        state_abbreviations = {
            'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',
            'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',
            'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI', 'idaho': 'ID',
            'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA', 'kansas': 'KS',
            'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME', 'maryland': 'MD',
            'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN', 'mississippi': 'MS',
            'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE', 'nevada': 'NV',
            'new hampshire': 'NH', 'new jersey': 'NJ', 'new mexico': 'NM',
            'new york': 'NY', 'north carolina': 'NC', 'north dakota': 'ND',
            'ohio': 'OH', 'oklahoma': 'OK', 'oregon': 'OR', 'pennsylvania': 'PA',
            'rhode island': 'RI', 'south carolina': 'SC', 'south dakota': 'SD',
            'tennessee': 'TN', 'texas': 'TX', 'utah': 'UT', 'vermont': 'VT',
            'virginia': 'VA', 'washington': 'WA', 'west virginia': 'WV',
            'wisconsin': 'WI', 'wyoming': 'WY'
        }

        return state_abbreviations.get(state_name.lower().strip(), None)

    @staticmethod
    def dms_to_decimal(dms_str):
        """Convert ``<deg>° <min>' <sec>'' <N|S|E|W>`` to signed decimal degrees.

        Raises:
            ValueError: if the string does not start with that layout.
        """
        # Regular expression to parse the DMS format
        match = auto.re.match(r"(\d+)° (\d+)' ([\d.]+)'' ([NSEW])", dms_str)
        if not match:
            raise ValueError("Invalid DMS format")

        degrees = int(match.group(1))
        minutes = int(match.group(2))
        seconds = float(match.group(3))
        direction = match.group(4)

        # Convert to decimal degrees
        decimal_degrees = degrees + minutes / 60 + seconds / 3600

        # Adjust sign for South and West
        if direction in 'SW':
            decimal_degrees *= -1

        return decimal_degrees

    @staticmethod
    def zoom_from_radius(radius_miles: float):
        """Pick an integer zoom level (0-22) for a view of the given radius in miles.

        Raises:
            ValueError: if ``radius_miles`` is negative.
        """
        EARTH_CIRCUMFERENCE_MILES = 24901  # Earth's circumference in miles
        MIN_ZOOM, MAX_ZOOM = 0, 22

        if radius_miles < 0:
            raise ValueError("radius_miles must be non-negative")
        # A zero radius would divide by zero; its limit is the maximum zoom.
        if radius_miles == 0:
            return MAX_ZOOM

        zoom_level = auto.math.log2(EARTH_CIRCUMFERENCE_MILES / (radius_miles * 2))

        zoom_level = max(MIN_ZOOM, min(MAX_ZOOM, zoom_level))

        return round(zoom_level)

    @staticmethod
    def point_distance(lat1:float, lon1:float, lat2:float, lon2:float) -> float:
        """Return the haversine distance in miles between two lat/lon points."""
        return auto.haversine.haversine((lat1, lon1), (lat2, lon2), unit=auto.haversine.Unit.MILES)

def scope():
    """Demo: exercise the MapUtil helpers on two fixed addresses and print each result.

    Performs live OpenCage requests.
    """
    # Instantiating MapUtil configures the class-level token and geocoder used by the calls below.
    MapUtil()
    addr = '8000 Middlebrook Pike Knoxville TN'
    addr2 = '1131 West Nokomis Circle Knoxville TN'
    lat, lon = MapUtil.addr_to_coords(addr)
    print(lat, lon)
    print(MapUtil.coords_to_addr(lat, lon))
    print(MapUtil.addr_to_state(addr))
    print(MapUtil.zoom_from_radius(1.0))

    # Distance in miles between the two geocoded addresses.
    lat2, lon2 = MapUtil.addr_to_coords(addr2)
    print(MapUtil.point_distance(lat, lon, lat2, lon2))

/scope

## Colorscheme

In [None]:
class ColorType(auto.enum.Enum):
    """Output colour formats selectable via Colormap's ``output_type`` argument."""
    # Hex strings (produced with matplotlib's to_hex in Colormap.colors_from_values).
    HEX=1
    # RGB tuples (produced with matplotlib's to_rgb in Colormap.colors_from_values).
    RGB=2

class Colormap:
    cmap=None

    def __init__(self, name: str, output_type=ColorType.HEX):
        self.cmap = auto.plt.get_cmap(name)

    def colors_from_values(self, values, min=None, max=None):
        minimum = min(values) if min is None else min
        maximum = max(values) if max is None else max

        normalized = auto.matplotlib.colors.Normalize(vmin=minimum, vmax=maximum)

        colors = self.cmap(normalized(values))
        match output_type:
            case ColorType.HEX:
                colors= [auto.matplotlib.colors.to_hex(color) for color in colors]
            case ColorType.RGB:
                colors= [auto.matplotlib.colors.to_rgb(color) for color in colors]

        return colors
    
    def sm_from_values(self, values):
        minimum = min(values)
        maximum = max(values)
        normalized = auto.matplotlib.colors.Normalize(vmin=minimum, vmax=maximum)
        colors = self.cmap(normalized(values))
        sm = auto.matplotlib.cm.ScalarMappable(cmap=self.cmap, norm=normalized)
        sm.set_array([])
        return sm

## Parallel

In [None]:
@auto.dataclasses.dataclass
class TaskResult:
    """Outcome of one task run by ThreadExecutor._execute_task."""
    # Identifier passed in by the executor (the task's index in execute()).
    task_id: int
    # Return value of the task; None when the task raised.
    result: Any
    # Wall-clock duration in seconds, measured with time.time().
    execution_time: float
    # True when the task returned normally, False when it raised.
    success: bool
    # The exception raised by the task, or None on success.
    error: Exception = None

class ThreadExecutor:
    """Run callables on a thread pool and collect a TaskResult for each.

    Exceptions raised by tasks are captured in the TaskResult rather than
    propagated. The pool can be released with ``shutdown()`` or by using the
    executor as a context manager::

        with ThreadExecutor(max_workers=8) as pool:
            results = pool.execute(tasks)
    """
    def __init__(self, max_workers:int=4):
        """Create the underlying ThreadPoolExecutor with ``max_workers`` threads."""
        self.max_workers = max_workers
        self.executor = auto.concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()
        # Never suppress an exception from the with-body.
        return False

    def shutdown(self, wait: bool = True) -> None:
        """Release the worker threads; no tasks can be submitted afterwards."""
        self.executor.shutdown(wait=wait)

    def _execute_task(self, id:int, task: Callable, *args, **kwargs) -> TaskResult:
        """Call ``task(*args, **kwargs)``, timing it and capturing any exception."""
        start = auto.time.time()
        try:
            result = task(*args, **kwargs)
            success = True
            error = None
        except Exception as e:
            # Report the failure through the TaskResult instead of raising.
            result = None
            success = False
            error = e

        runtime = auto.time.time() - start
        return TaskResult(
            task_id=id,
            result=result,
            execution_time=runtime,
            success=success,
            error=error
        )

    def execute(self, tasks: List[tuple[Callable, tuple, dict]]) -> List[TaskResult]:
        """Run every ``(func, args, kwargs)`` task on the pool.

        Returns:
            One TaskResult per task, in the same order as ``tasks``; each
            ``task_id`` is the task's index in that list.
        """
        futures = [
            self.executor.submit(
                self._execute_task,
                task_id,
                func,
                *task_args,
                **task_kwargs
            ) for task_id, (func, task_args, task_kwargs) in enumerate(tasks)
        ]
        # Waiting in submission order keeps results aligned with `tasks`.
        results = [future.result() for future in futures]
        return results


    def execute_single(self, task: Callable, *args, **kwargs) -> TaskResult:
        """Run one task synchronously on the calling thread (task_id 0)."""
        return self._execute_task(0, task, *args, **kwargs)

## Buildings

### Preprocess buildings

In [None]:
def fmt(point_str: str) -> (float, float):
    """Parse a ``'<a>/<b>'`` string into a pair of floats.

    Only the first two '/'-separated fields are used; anything after them is
    ignored.
    """
    first = float(point_str.split('/')[0])
    second = float(point_str.split('/')[1])
    return (first, second)

def process(state, inpath):
    """Read one state's building CSV and extract centroid/footprint records.

    Args:
        state: state abbreviation; the file read is ``inpath / '<state>.csv'``.
        inpath: directory containing the per-state CSV files.

    Returns:
        list[dict]: one dict per usable row with keys ``state``, ``centroid``
        (a pair of floats) and ``footprint`` (a list of such pairs). Rows that
        cannot be parsed are skipped and counted in a printed summary.
    """
    print(f'Processing {state}...')
    indf = auto.pd.read_csv(inpath / f'{state}.csv')

    out = []
    skipped = 0
    for _, row in indf.iterrows():
        try:
            centroid = fmt(row['Centroid'])
            vertices = row['Footprint2D'].split('_')
            # A footprint with any empty vertex field is treated as unusable
            # and replaced by the centroid alone.
            if '' in vertices:
                footprint = [centroid]
            else:
                footprint = [fmt(p) for p in vertices]
            out.append({
                'state': row['State_Abbr'],
                'centroid': centroid,
                'footprint': footprint,
            })
        except Exception:
            # Best effort: drop malformed rows, but keep count and let
            # KeyboardInterrupt/SystemExit through (a bare except would not).
            skipped += 1

    if skipped:
        print(f'{state}: skipped {skipped} unparseable row(s)')
    return out
    

def scope():
    # process('AR', config.buildings.csvs)
    tasks = []
    for state in [
        "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA",
        "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
        "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
        "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
        "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"
    ]:
        tasks.append(
            (process, (state, config.buildings.csvs), {})
        )
    threader = ThreadExecutor(max_workers=12)
    results = threader.execute(tasks)

    out = []
    print(results)
    for res in results:
        if res.success:
            print('slkdjfsdlkf')
            out.extend(res.result)

    df = auto.pd.DataFrame(out)
    /display df
    df.to_csv(config.buildings.gendir / 'buildings.csv')

/scope

In [None]:
def scope():
    """Load the combined buildings.csv written to config.buildings.gendir by the preprocessing cell."""
    # NOTE(review): the file was written without index=False and with tuple/list cells, so expect an
    # extra unnamed index column and string-typed 'centroid'/'footprint' values - confirm.
    return  auto.pd.read_csv(config.buildings.gendir / 'buildings.csv')
# Notebook-global frame used by the following cells.
buildings = scope()

def scope() 
    state = buildings[buildings['state'] == 'AK']
    /display state
/scope

### Buildings Class

In [None]:
class Buildings:
    state: str
    path: auto.pathlib.Path
    buildings_raw: auto.pd.DataFrame

    def __init__(self, state:str, path: auto.pathlib.Path):
        self.state = state
        self.path = path

        self._load_buildings()

    def _load_buildings(self):
        print(f'Opening {self.state}.csv...', end='')
        df = auto.pd.read_csv(self.path / f'{self.state}.csv')
        print('Done.')
        /display df

    def map_data(self, data):
        # TODO
        pass


def scope():
    """Demo: construct Buildings for 'AK', which reads AK.csv from config.buildings.csvs."""
    state = 'AK'
    # Construction triggers the CSV load as a side effect; the instance itself is not used further.
    buildings = Buildings(state, config.buildings.csvs)

/scope