In [1]:
%%bash
pip install sqlmodel psycopg2-binary boto3 shapely geopandas simpledbf pyarrow

Collecting simpledbf
  Downloading simpledbf-0.2.6.tar.gz (17 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: simpledbf
  Building wheel for simpledbf (setup.py): started
  Building wheel for simpledbf (setup.py): finished with status 'done'
  Created wheel for simpledbf: filename=simpledbf-0.2.6-py3-none-any.whl size=13785 sha256=b53f704965c66ca8aa9e5d98be612f22eda34401a79bd6c319720ae11c3b3a47
  Stored in directory: /home/jovyan/.cache/pip/wheels/37/52/21/14be45b7c160488637e82d6a317f4379458bb4dd60be21d5fa
Successfully built simpledbf
Installing collected packages: simpledbf
Successfully installed simpledbf-0.2.6


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import to_json, col
from dataclasses import dataclass
from shapely import Polygon, MultiPolygon
from shapely.geometry import mapping
import json
import datetime

import boto3
import os
import json
# import us
import zipfile
import io
import geopandas as gpd
import pandas as pd
# from simpledbf import Dbf5
from typing import Optional

from sqlmodel import SQLModel, Field
from database import engine, sqlalchemy_engine

from data_models import *



def get_jdbc_options():
    from database import db_endpoint, db_password, db_username
    jdbc_url = f"jdbc:postgresql://{db_endpoint}:5432/"

    # logger.debug(jdbc_url)

    return {
        "url": jdbc_url,
        "user": db_username,
        "password": db_password,
        "driver": "org.postgresql.Driver",
        "stringtype":"unspecified" # this allows you to write strings to jsonb cols
    }

spark = SparkSession.builder.master("local[1]")\
    .config("spark.jars", "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar,s3://deployment-zone-117819748843/weather_etl/spark-shp-0.27-3.2-2.12.jar")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")\
    .getOrCreate()

In [5]:

@dataclass
class LakeGeometry:
    # permanent_: str = None 
    fdate: str = None
    resolution: str = int
    gnis_id: str = Field(primary_key=True)
    gnis_name: str = None
    areasqkm: float = None
    elevation: float = None
    reachcode: str = None
    ftype: int = None
    fcode: int = None
    visibility: int = None
    shape_length: float = None
    shape_area: float = None
    object_id: int = None
    geometry: str = None # jsonb type in postgres #list[list[list[float]]]
    boundary: list[list[float]] = None
    bounds: list[float] = None
    bounding_box: tuple[tuple[float]] = None
    bounding_box_centroid: tuple[float] = None

    def __post_init__(self):
        polygon = MultiPolygon(
                (Polygon(points) for points in self.geometry) #json.loads()
            )
        
        boundary = mapping(polygon.boundary)['coordinates']
        if len(boundary) != 1:
            # raise Exception(f"Boundary should only be 1 continuous polygon: {boundary[1:]} length: {len(boundary)}")
            self.boundary = []
        else:
            self.boundary = boundary[0]
        self.bounds = polygon.bounds

        min_lat, min_long, max_lat, max_long = self.bounds

        self.bounding_box = ((min_lat, min_long), (min_lat, max_long), (max_lat, max_long), (max_lat, min_long))

        self.centroid = list(polygon.centroid.coords)[0]

        self.bounding_box_centroid = ( (min_lat + max_lat) / 2, (min_long + max_long) / 2 )  # Probably should do coordinate math with a globe projection
        

In [3]:
def get_spark_schema_from_dataclass(the_dataclass):
    from pyspark.sql.types import _type_mappings
    def get_pyspark_type(the_type):
        pyspark_type = _type_mappings.get(the_type)
        if pyspark_type is None:
            pyspark_type = str(the_type)\
                            .replace("[", "<")\
                            .replace("]", ">")\
                            .replace("list", "array")\
                            .replace("tuple", "array")
        else:
            pyspark_type = pyspark_type.typeName()
        return pyspark_type

    return ",\n".join([
                    f"`{name}` {get_pyspark_type(python_type)}"
                    for name, python_type 
                    in the_dataclass.__annotations__.items()
                ])

schema_from_pandas = StructType([StructField('permanent_', StringType(), True), 
                                 StructField('fdate', StringType(), True), 
                                 StructField('resolution', LongType(), True), 
                                 StructField('gnis_id', StringType(), True), 
                                 StructField('gnis_name', StringType(), True), 
                                 StructField('areasqkm', DoubleType(), True), 
                                 StructField('elevation', DoubleType(), True), 
                                 StructField('reachcode', StringType(), True), 
                                 StructField('ftype', LongType(), True), 
                                 StructField('fcode', LongType(), True), 
                                 StructField('visibility', LongType(), True), 
                                 StructField('SHAPE_Leng', DoubleType(), True), 
                                 StructField('SHAPE_Area', DoubleType(), True), 
                                 StructField('ObjectID', LongType(), True), 
                                 StructField('geometry', ArrayType(ArrayType(ArrayType(DoubleType()))), True)])

In [8]:
s3_bucket, s3_prefix = 'prd-tnm', 'StagedProducts/Hydrography/NHD/State/Shape/'
local_data_dir = 'raw_data'
# ls_resp = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

states = ['Minnesota'] #us.states.STATES_AND_TERRITORIES


for state in states:
    filename = f"NHD_H_{state}_State_Shape.zip"
    s3_key = s3_prefix + filename

    local_filename = os.path.join(local_data_dir, filename) 

    # if not os.path.exists(local_filename):
    #     s3.download_file(s3_bucket, s3_key, local_filename)

    with open(local_filename, 'rb') as f:
        zip_bytes = f.read()

    waterbody_dbf_filename_in_zip = "Shape/NHDWaterbody.dbf"
    local_waterbody_dbf_filename = os.path.join("raw_data", f"{state}_NHDWaterbody.dbf")

    waterbody_shp_filename_in_zip = "Shape/NHDWaterbody.shp"
    local_waterbody_shp_filename = os.path.join("raw_data", f"{state}_NHDWaterbody.shp")

    waterbody_shx_filename_in_zip = "Shape/NHDWaterbody.shx"
    local_waterbody_shx_filename = os.path.join("raw_data", f"{state}_NHDWaterbody.shx")

    
    with zipfile.ZipFile(io.BytesIO(zip_bytes), 'r') as z: # 
        # with z.open(waterbody_dbf_filename_in_zip, 'r') as zip_f:
        #     with open(local_waterbody_dbf_filename, 'wb') as f:
        #         f.write(zip_f.read())

        #     waterbodies: pd.DataFrame = Dbf5(local_waterbody_dbf_filename).to_dataframe()
        #     lakes = waterbodies.dropna(subset=['gnis_name']) 
            # print(waterbodies['gnis_name'])
        if not os.path.exists(local_waterbody_shp_filename):
            with z.open(waterbody_shp_filename_in_zip, 'r') as zip_f:
                with open(local_waterbody_shp_filename, 'wb') as f:
                    f.write(zip_f.read())
        
        if not os.path.exists(local_waterbody_shx_filename):
            with z.open(waterbody_shx_filename_in_zip, 'r') as zip_f:
                with open(local_waterbody_shx_filename, 'wb') as f:
                    f.write(zip_f.read())

    def get_coords(polygon):
        coords = mapping(polygon)['coordinates']
        
        if isinstance(coords, list):
            coords = coords[0]
        if isinstance(coords, tuple) and isinstance(coords[0], tuple) and isinstance(coords[0][0], tuple) and isinstance(coords[0][0][0], float):
            return coords
        else:
            raise Exception(f"bad type for {type(coords)} {coords}")
        
        
    shp = gpd.read_file(local_waterbody_shp_filename)

    lakes = shp.dropna(subset=['gnis_name']) # Filter out water bodies with no name

    lakes['geometry'] = lakes['geometry'].apply(get_coords)
    
    
    df = (
        spark.createDataFrame(lakes, schema=schema_from_pandas)
        .selectExpr(
            *['gnis_id', 'gnis_name', 'fdate', 'resolution', 'areasqkm', 'elevation', 'reachcode', 'ftype', 'fcode', 'visibility', 
              'SHAPE_Leng AS shape_length', 'SHAPE_Area AS shape_area', 'ObjectID as object_id', 'geometry'
             ]
        )
        .rdd
        .map(lambda row: LakeGeometry(**(row.asDict())) )
        .toDF(schema=get_spark_schema_from_dataclass(LakeGeometry))
        .withColumn("boundary", to_json(col("boundary")))
        .withColumn("bounding_box",  to_json(col("bounding_box")))
    )

    # df.show()

    # geo = df.take(1)[0]
    # geo = LakeGeometry(**df.take(1)[0].asDict())
    
    # print(df.schema)
    # df.selectExpr("geometry[0][0][0]").show()

    (
        df
        .write
        .option("dbtable", "lake_geometry_test")
        .options(**get_jdbc_options())
        .format("jdbc")
        .mode("append")
        .save()
    )

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)


In [176]:
StringType.typeName()

'string'

In [166]:
dir(list[list[float]])
from pyspark.sql.types import _parse_datatype_string
from pyspark.sql.types import _infer_type, _type_mappings, _create_converter, _all_complex_types
from pydoc import locate
# _parse_datatype_string(str(list[list[float]]))
import pyspark.sql.types
dir(pyspark.sql.types)

# _parse_datatype(locate('int'))
# _type_mappings
# _create_converter()
_all_complex_types

{'array': pyspark.sql.types.ArrayType,
 'map': pyspark.sql.types.MapType,
 'struct': pyspark.sql.types.StructType}

In [44]:
from database import engine, sqlalchemy_engine

geo = (
    spark
    .read
    .option("dbtable", "lake_geometry")
    .options(**get_jdbc_options())
    .format("jdbc")
    .load()
    .limit(10)
    .rdd
    .map(lambda row: LakeGeometry(**row.asDict()))
).take(1)[0]

In [45]:
geo

LakeGeometry(permanent_='23422571-D6B8-4D92-BA6E-21B5EB32A3F6', fdate='2011-06-20', resolution=2, gnis_id='-1', gnis_name='Upper Basin A', areasqkm=0.075, elevation=None, reachcode='09030003038461', ftype=390, fcode=39000, visibility=250000, shape_leng=0.018095882714808, shape_area=9.238174656e-06, objectid=1, geometry='[[[-91.59163399952195, 48.826645300252835, 0.0], [-91.59134519952238, 48.826740700252685, 0.0], [-91.59120419952256, 48.82690990025242, 0.0], [-91.59087399952313, 48.82702280025222, 0.0], [-91.59093579952298, 48.82724850025187, 0.0], [-91.59084779952315, 48.82747230025154, 0.0], [-91.59065689952342, 48.827479000251515, 0.0], [-91.59028989952401, 48.827447500251594, 0.0], [-91.59001739952441, 48.82744420025159, 0.0], [-91.589650399525, 48.827412700251614, 0.0], [-91.58938759952542, 48.82755340025142, 0.0], [-91.58919639952569, 48.82756910025137, 0.0], [-91.58920719952567, 48.827668200251196, 0.0], [-91.58928849952554, 48.82768710025118, 0.0], [-91.58947919952527, 48.8276

In [60]:
min_lat, min_long, max_lat, max_long = geo.bounds

bounding_box = ((min_lat, min_long), (min_lat, max_long), (max_lat, max_long), (max_lat, min_long))
bounding_box

((-91.5916982995218, 48.82480580025566),
 (-91.5916982995218, 48.82798010025073),
 (-91.58573969953108, 48.82798010025073),
 (-91.58573969953108, 48.82480580025566))

In [78]:
polygon = MultiPolygon(
                (Polygon(points) for points in json.loads(geo.geometry))
            )

dir(
    list(polygon.centroid.coords)
)

list(polygon.centroid.coords)[0]

(-91.58923008455515, 48.82655216677615)