In [1]:
'''
Getting the ABDU model to work in notebook

EPSG: 5070
'''
import duckdb #version 0.9.2
import geopandas as gpd #version 0.14.1
import leafmap, os, time
from shapely import wkt
import pandas as pd
import pyarrow as pa
import geoarrow.pyarrow as ga
import geoarrow.pandas as _
import rasterio
from rasterio import mask
from shapely.geometry import shape
from threading import Thread, current_thread

con = duckdb.connect()
con.install_extension("spatial")
con.load_extension("spatial")
con.install_extension("azure")
con.load_extension("azure")
con.install_extension("json")
con.load_extension("json")

In [2]:
print(gpd.__version__)
print(duckdb.__version__)
#con.sql('''SELECT * FROM duckdb_settings()''')

0.14.1
0.9.2


In [3]:
inaoifile = ''

'Wed Feb 14 08:10:43 2024'

In [None]:
local_time = time.ctime(time.time())
local_time

In [4]:
'''
Read in AOI.  Need it projected as 4326 and 5070.  Unfortunately projecting isn't working smoothly for me and keeps providing infinite values.
'''
#con.sql("CREATE TABLE aoi AS SELECT * FROM ST_READ('{0}')".format(inaoi))
inaoigpd = gpd.read_parquet(inaoifile)
inepsg = inaoigpd.crs.to_epsg()
inaoi = pd.read_parquet(inaoifile)
if inepsg == 5070:
    con.sql("CREATE OR REPLACE TABLE aoi5070 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi")
    inaoi4326 = inaoigpd.to_crs(4326)
    inaoi4326['geometry'] = inaoi4326.to_wkb().geometry
    inaoi4326 = pd.DataFrame(inaoi4326)
    con.sql("CREATE OR REPLACE TABLE aoi4326 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi4326")
elif inepsg == 4326:
    con.sql("CREATE OR REPLACE TABLE aoi4326 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi")
    inaoi5070 = inaoi.to_crs(5070)
    inaoi5070['geometry'] = inaoi5070.to_wkb().geometry
    inaoi5070 = pd.DataFrame(inaoi5070)
    con.sql("CREATE OR REPLACE TABLE aoi5070 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi5070")
else:
    inaoi5070 = inaoi.to_crs(5070)
    inaoi5070['geometry'] = inaoi5070.to_wkb().geometry
    inaoi5070 = pd.DataFrame(inaoi5070)
    con.sql("CREATE OR REPLACE TABLE aoi5070 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi5070")
    inaoi4326 = inaoi.to_crs(4326)
    inaoi4326['geometry'] = inaoi4326.to_wkb().geometry
    inaoi4326 = pd.DataFrame(inaoi4326)    
    con.sql("CREATE OR REPLACE TABLE aoi4326 AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM inaoi4326")

In [5]:
'''
Read in hucs partitioned to huc2/huc4 level that overlap with the aoi.  Don't clip hucs
'''
con.sql("SET azure_storage_connection_string = 'DefaultEndpointsProtocol=https;AccountName=giscog;EndpointSuffix=core.windows.net';")
con.sql(f"""
CREATE OR REPLACE TABLE huc12 AS
SELECT LEFT(huc12,2) AS huc2,LEFT(huc12,4) AS huc4, huc12, areaacres, huc.geometry
FROM (SELECT huc12, areaacres, geometry FROM read_parquet('azure://abdu/huc/**/*.parquet')
WHERE CAST(LEFT(huc12,2) AS INTEGER)<=12) AS huc
JOIN aoi5070 ON 
ST_Intersects(ST_GeomFromWKB(huc.geometry), ST_GeomFromText(aoi5070.geometry))
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [6]:
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
'''
Wetland energy calculation
'''

'\nWetland energy calculation\n'

In [7]:
hucs = con.sql("select huc4 from huc12 GROUP BY huc4").df().values.tolist()
hucs = sorted([item for items in hucs for item in items])
print(hucs)

['0801', '0802']


In [8]:
''' NOT NEEDED.  Using threading instead
%%time
Clip wetlands that aren't riverine to the hucs selected by the aoi.

selectstring = """CREATE OR REPLACE TABLE nothreadwetlands AS

SELECT ATTRIBUTE,huc2, huc4, huc12, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), 
ST_GeomFromWKB(huc12.geometry))) as geometry
FROM (SELECT ATTRIBUTE, geometry FROM read_parquet('azure://abdu/nwi/**/*.parquet') 
WHERE WETLAND_TYPE != 'Riverine' AND huc4 IN {0}) AS wetlnd
JOIN huc12 ON 
ST_Intersects(ST_GeomFromWKB(wetlnd.geometry), ST_GeomFromWKB(huc12.geometry))
""".format(tuple(hucs))
con.sql(selectstring)
'''

' NOT NEEDED.  Using threading instead\n%%time\nClip wetlands that aren\'t riverine to the hucs selected by the aoi.\n\nselectstring = """CREATE OR REPLACE TABLE nothreadwetlands AS\n\nSELECT ATTRIBUTE,huc2, huc4, huc12, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), \nST_GeomFromWKB(huc12.geometry))) as geometry\nFROM (SELECT ATTRIBUTE, geometry FROM read_parquet(\'azure://abdu/nwi/**/*.parquet\') \nWHERE WETLAND_TYPE != \'Riverine\' AND huc4 IN {0}) AS wetlnd\nJOIN huc12 ON \nST_Intersects(ST_GeomFromWKB(wetlnd.geometry), ST_GeomFromWKB(huc12.geometry))\n""".format(tuple(hucs))\ncon.sql(selectstring)\n'

In [9]:
##### TEST THREADING #####

In [10]:
con.execute("""
    CREATE OR REPLACE TABLE my_wetlands (
        ATTRIBUTE VARCHAR,
        huc2 VARCHAR,
        huc4 VARCHAR,
        huc12 VARCHAR,
        geometry VARCHAR,
    )
""")
def write_from_thread(con):
    local_con = con.cursor()
    huc = str(current_thread().name)
    local_con.sql("SET azure_storage_connection_string = 'DefaultEndpointsProtocol=https;AccountName=giscog;EndpointSuffix=core.windows.net';")
    result = local_con.execute('''INSERT INTO my_wetlands (SELECT ATTRIBUTE,huc2, huc4, huc12, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), 
        ST_GeomFromWKB(huc12.geometry))) as geometry
        FROM (SELECT ATTRIBUTE, geometry FROM read_parquet('azure://abdu/nwi/**/*.parquet') 
        WHERE WETLAND_TYPE != 'Riverine' AND huc4 = '{0}') AS wetlnd
        JOIN huc12 ON 
        ST_Intersects(ST_GeomFromWKB(wetlnd.geometry), ST_GeomFromWKB(huc12.geometry)))'''.format(huc)).fetchall()

In [11]:
threads = []
print(hucs)
for i in range(len(hucs)):
    huc = hucs[i]
    print(huc)
    threads.append(Thread(target = write_from_thread,
                            args = (con,),
                            name = huc))
    
threads

['0801', '0802']
0801
0802


[<Thread(0801, initial)>, <Thread(0802, initial)>]

In [12]:
%%time
# Kick off all threads in parallel
for thread in threads:
    thread.start()

# Ensure all threads complete before printing final results
for thread in threads:
    thread.join()

con.sql("""
    CREATE OR REPLACE TABLE wetlands AS 
    SELECT * FROM my_wetlands 
""")

CPU times: total: 6.8 s
Wall time: 42.9 s


In [13]:
##### END THREADING #####

In [14]:
'''
Import wetland crossclass data and assign classes to the nwi table
'''
con.sql("""CREATE OR REPLACE TABLE crossnwi AS (UNPIVOT (FROM (SELECT * FROM read_json_auto('aoiWetland.json', maximum_object_size=100000000))) ON COLUMNS(*))""")
con.sql("""CREATE OR REPLACE TABLE crossnwi AS SELECT name, UNNEST(value) AS value FROM crossnwi""")
con.sql("""CREATE OR REPLACE TABLE wetlands AS
SELECT name, huc12, geometry FROM (SELECT * FROM wetlands) AS wetselect
LEFT JOIN crossnwi ON wetselect.ATTRIBUTE LIKE crossnwi.value
""")
con.sql(f"""CREATE OR REPLACE TABLE wetlands AS
(SELECT replace(wetlands.name, '_', '') AS name, huc12, ST_Area(ST_geomfromtext(geometry))*0.0001 AS ha, kcal, kcal*ha AS avalNrgy, geometry FROM wetlands
LEFT JOIN read_csv_auto('azure://abdu/kcal.csv') ON replace(wetlands.name, '_', '') = read_csv_auto.habitatType
WHERE wetlands.name IS NOT NULL)
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [15]:
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################

In [16]:
'''
Read in PADUS
'''
con.sql("""
CREATE OR REPLACE TABLE protected AS 
SELECT CATEGORY, huc12, huc2, huc4, ST_AsText(ST_Intersection(ST_GeomFromWKB(huc12.geometry), ST_GeomFromWKB(prot.geometry))) as geometry
FROM (SELECT CATEGORY, geometry FROM read_parquet('azure://abdu/padus/**/*.parquet')
WHERE CATEGORY IN ('Fee', 'Easements', 'Other') AND huc4 IN {0}) AS prot
JOIN huc12 ON 
ST_Intersects(ST_GeomFromWKB(huc12.geometry), ST_GeomFromWKB(prot.geometry))
""".format(tuple(hucs)))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [17]:
con.sql("""
CREATE OR REPLACE TABLE protwetlands AS
SELECT name, wetlands.huc12, kcal, ST_AsText(ST_Intersection(ST_GeomFromText(protected.geometry), ST_GeomFromText(wetlands.geometry))) as geometry
FROM (SELECT * from protected) as protected
JOIN wetlands ON 
ST_Intersects(ST_GeomFromText(wetlands.geometry), ST_GeomFromText(protected.geometry))
""")

In [18]:
con.sql("""
CREATE OR REPLACE TABLE protwetlands AS
SELECT DISTINCT name, huc12, ST_Area(ST_GeomFromText(geometry))*0.0001 AS ProtHabHa, kcal, kcal*ProtHabHa AS protNrgy, geometry FROM protwetlands
""")

In [19]:
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################

In [20]:
# Need huc12 geometry
df = con.sql('SELECT ST_AsText(ST_geomfromwkb(geometry)) as geometry from huc12').df()
df['geometry'] = df['geometry'].apply(wkt.loads)
df = gpd.GeoDataFrame(df, geometry='geometry', crs=5070)

In [21]:
'''
Read in NLCD clipped to hucs
'''
with rasterio.open('https://giscog.blob.core.windows.net/newcontainer/nlcd2019_cog.tif') as src:
    # Clip the raster to the geometry of the shapefile
    clipped_data, transform = mask.mask(src, df.geometry, crop=True)

clipped_data[clipped_data>23]=0
clipped_data[clipped_data<21]=0
clipped_data[clipped_data==21]=1
clipped_data[clipped_data==22]=1
clipped_data[clipped_data==23]=1
shapes = rasterio.features.shapes(clipped_data[0], transform=transform, mask=clipped_data[0] == 1)
# Create a GeoDataFrame from the vector polygons
gdf_vector = gpd.GeoDataFrame({'geometry': [shape(geom) for geom, value in shapes]})
gdf_vector['geometry'] = gdf_vector.to_wkb().geometry
con.sql("CREATE OR REPLACE TABLE urban AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(geometry)) AS geometry FROM gdf_vector")

In [22]:
con.sql("""
CREATE OR REPLACE TABLE urban AS 
SELECT huc12, ST_Intersection(ST_GeomFromWKB(huc12.geometry), ST_GeomFromText(urban.geometry)) as geometry
FROM (SELECT  geometry FROM urban) as urban
JOIN huc12 ON 
ST_Intersects(ST_GeomFromWKB(huc12.geometry), ST_GeomFromText(urban.geometry))
""")

In [23]:
con.sql("""
CREATE OR REPLACE TABLE urbanwetlands AS
SELECT name, wetlands.huc12, kcal, ST_AsText(ST_Intersection((urban.geometry), ST_GeomFromText(wetlands.geometry))) as geometry
FROM (SELECT * from urban) as urban
JOIN wetlands ON 
ST_Intersects(ST_GeomFromText(wetlands.geometry), (urban.geometry))
""")
con.sql("""
CREATE OR REPLACE TABLE urbanwetlands AS
SELECT DISTINCT name, huc12, ST_Area(ST_geomfromtext(geometry))*0.0001 AS ha, kcal, kcal*ha AS urbanNrgy, geometry FROM urbanwetlands
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [24]:
con.sql("""
CREATE OR REPLACE TABLE urban AS
SELECT huc12, ST_Area(geometry)*0.0001 AS urbanHa, geometry FROM urban
""")

In [25]:
con.sql("""
CREATE OR REPLACE TABLE unavailable AS
SELECT huc12, ST_Area(geometry)*0.0001 AS unavailHa, ST_Union_Agg(geometry) as geometry FROM
(
SELECT huc12, geometry FROM urban
UNION ALL
SELECT huc12, geometry from protected
)
group by huc12, geometry
""")

In [26]:
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
'''
#################################
End of data import
Starting model process
#################################
'''
#### Prepping energy - Join energy to nwi.  Need to create the spatial kcal table first. What's the best way to do this?
# parquet is the best to read in but it's not easily editable.  Rest service would be ok but again, not great because
# reading those is difficult.  I wonder if

'\n#################################\nEnd of data import\nStarting model process\n#################################\n'

In [27]:
'''
Demand

######
Need to proportion demand based on available energy.  Available energy is spatially explicit but demand is at the fips
count level.  We need to calculate total energy and demand at the huc12 scale.
To proportion demand we need to calclulate total energy by fips then calculate how much energy is in each huc12. A proportion
can then be calculated by dividing total energy within a fips by (huc12,fips) group.  Demand at the huc12 level is multiplied
by that energy proportion.
######
'''

'\nDemand\n\n######\nNeed to proportion demand based on available energy.  Available energy is spatially explicit but demand is at the fips\ncount level.  We need to calculate total energy and demand at the huc12 scale.\nTo proportion demand we need to calclulate total energy by fips then calculate how much energy is in each huc12. A proportion\ncan then be calculated by dividing total energy within a fips by (huc12,fips) group.  Demand at the huc12 level is multiplied\nby that energy proportion.\n######\n'

In [28]:
'''
Read in demand clipped by hucs
'''
con.sql(f"""
CREATE OR REPLACE TABLE demandfull AS SELECT * EXCLUDE geometry, ST_AsText(ST_GeomFromWKB(read_parquet.geometry)) as geometry
FROM read_parquet('azure://abdu/Demand9Species.parquet')
JOIN huc12 ON 
ST_Intersects(ST_GeomFromWKB(read_parquet.geometry), ST_GeomFromWKB(huc12.geometry))
""")
con.sql(f"""
CREATE OR REPLACE TABLE demand AS SELECT * EXCLUDE geometry, ST_AsText(ST_Intersection(ST_GeomFromWKB(huc12.geometry), ST_GeomFromWKB(read_parquet.geometry))) as geometry
FROM read_parquet('azure://abdu/Demand9Species.parquet')
JOIN huc12 ON 
ST_Intersects(ST_GeomFromWKB(huc12.geometry), ST_GeomFromWKB(read_parquet.geometry))
""")
con.sql("""CREATE OR REPLACE TABLE demand AS SELECT fips, huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, ST_Area(geometry)*0.0001 AS ha, geometry FROM (
SELECT fips, huc12.huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, ST_Intersection(ST_GeomFromText(demand.geometry), ST_GeomFromWKB(huc12.geometry)) as geometry FROM demand
JOIN huc12 ON ST_Intersects(ST_GeomFromText(demand.geometry), ST_GeomFromWKB(huc12.geometry))
WHERE species='All'
)
""")

In [29]:
con.sql("""
CREATE OR REPLACE TABLE demandfull AS SELECT DISTINCT fips, ST_Union_Agg(ST_GeomFromText(geometry)) as geometry from demandfull
group by fips
""")
fullfips = con.sql("""SELECT fips from demandfull""").df().values.tolist()
fullfips

[['47097'], ['05093'], ['29155'], ['29069'], ['05111']]

In [30]:
'''
%%time

Iterate through fips.
Clip wetlands that aren't riverine to the fips selected by the aoi to calculate total energy within the fips

tmp = pd.DataFrame()
for fip in fullfips:
    print(fip)
    selectstring = """
    CREATE OR REPLACE table tmpwet AS
    (
    SELECT ATTRIBUTE,fips, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)) as geometry
    FROM (SELECT ATTRIBUTE, geometry FROM read_parquet('azure://abdu/nwi/**/*.parquet')
    WHERE WETLAND_TYPE != 'Riverine') AS wetlnd
    JOIN (SELECT fips, geometry FROM demandfull WHERE fips='{0}') as dmd ON 
    ST_Intersects(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)
    )
    """.format(fip[0])
    con.sql(selectstring)
    con.sql("""CREATE OR REPLACE TABLE crossnwi AS (UNPIVOT (FROM (SELECT * FROM read_json_auto('aoiWetland.json', maximum_object_size=100000000))) ON COLUMNS(*))""")
    con.sql("""CREATE OR REPLACE TABLE crossnwi AS SELECT name, UNNEST(value) AS value FROM crossnwi""")
    con.sql("""CREATE OR REPLACE TABLE tmpwet AS
    SELECT name, fips, geometry FROM (SELECT * FROM tmpwet) AS wetselect
    LEFT JOIN crossnwi ON wetselect.ATTRIBUTE LIKE crossnwi.value
    """)
    con.sql(f"""CREATE OR REPLACE TABLE tmpwet AS
    (SELECT replace(tmpwet.name, '_', '') AS name, fips, geometry, kcal FROM tmpwet
    LEFT JOIN read_csv_auto('azure://abdu/kcal.csv') ON replace(tmpwet.name, '_', '') = read_csv_auto.habitatType
    WHERE tmpwet.name IS NOT NULL)
    """)
    con.sql("""CREATE OR REPLACE TABLE newdata AS (
    SELECT name, fips, kcal, ST_Area(geometry)*0.0001 AS ha, ha*kcal AS avalNrgy FROM(
    SELECT name, fips, kcal, ST_Union_Agg(ST_GeomFromText(geometry)) AS geometry FROM tmpwet
    GROUP BY name, fips, kcal))
    """)
    final = con.sql("SELECT fips, SUM(avalNrgy) from newdata GROUP BY fips").df()
    tmp = pd.concat([tmp, final])
con.sql("""CREATE OR REPLACE TABLE fipsavalNRGtext AS SELECT fips, "sum(avalNrgy)" as fipsnrgysum from tmp""")
con.sql("""select * from fipsavalNRGtext""")
'''

'\n%%time\n\nIterate through fips.\nClip wetlands that aren\'t riverine to the fips selected by the aoi to calculate total energy within the fips\n\ntmp = pd.DataFrame()\nfor fip in fullfips:\n    print(fip)\n    selectstring = """\n    CREATE OR REPLACE table tmpwet AS\n    (\n    SELECT ATTRIBUTE,fips, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)) as geometry\n    FROM (SELECT ATTRIBUTE, geometry FROM read_parquet(\'azure://abdu/nwi/**/*.parquet\')\n    WHERE WETLAND_TYPE != \'Riverine\') AS wetlnd\n    JOIN (SELECT fips, geometry FROM demandfull WHERE fips=\'{0}\') as dmd ON \n    ST_Intersects(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)\n    )\n    """.format(fip[0])\n    con.sql(selectstring)\n    con.sql("""CREATE OR REPLACE TABLE crossnwi AS (UNPIVOT (FROM (SELECT * FROM read_json_auto(\'aoiWetland.json\', maximum_object_size=100000000))) ON COLUMNS(*))""")\n    con.sql("""CREATE OR REPLACE TABLE crossnwi AS SELECT name, UNNEST(value) AS value FROM

In [31]:
##### TEST THREADING #####

In [32]:
print(fullfips)
listfips = sorted([item for items in fullfips for item in items])
print(listfips)

[['47097'], ['05093'], ['29155'], ['29069'], ['05111']]
['05093', '05111', '29069', '29155', '47097']


In [33]:
con.execute("""
    CREATE OR REPLACE TABLE my_inserts (
        ATTRIBUTE VARCHAR,
        fips VARCHAR,
        geometry VARCHAR
    )""")
def write_from_thread(con):
    local_con = con.cursor()
    fip = str(current_thread().name)
    local_con.sql("SET azure_storage_connection_string = 'DefaultEndpointsProtocol=https;AccountName=giscog;EndpointSuffix=core.windows.net';")
    result = local_con.execute("""
    INSERT INTO my_inserts
    (
    SELECT ATTRIBUTE,fips, ST_AsText(ST_Intersection(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)) as geometry
    FROM (SELECT ATTRIBUTE, geometry FROM read_parquet('azure://abdu/nwi/**/*.parquet')
    WHERE WETLAND_TYPE != 'Riverine') AS wetlnd
    JOIN (SELECT fips, geometry FROM demandfull WHERE fips='{0}') as dmd ON 
    ST_Intersects(ST_GeomFromWKB(wetlnd.geometry), dmd.geometry)
    )
    """.format(fip)).fetchall()

In [34]:
threads = []
print(listfips)
for i in range(len(listfips)):
    fip = listfips[i]
    print(fip)
    threads.append(Thread(target = write_from_thread,
                            args = (con,),
                            name = fip))
    
threads

['05093', '05111', '29069', '29155', '47097']
05093
05111
29069
29155
47097


[<Thread(05093, initial)>,
 <Thread(05111, initial)>,
 <Thread(29069, initial)>,
 <Thread(29155, initial)>,
 <Thread(47097, initial)>]

In [35]:
%%time
# Kick off all threads in parallel
for thread in threads:
    thread.start()

# Ensure all threads complete before printing final results
for thread in threads:
    thread.join()

CPU times: total: 15min 23s
Wall time: 39min 51s


In [36]:
%%time
con.sql("""CREATE OR REPLACE TABLE crossnwi AS (UNPIVOT (FROM (SELECT * FROM read_json_auto('aoiWetland.json', maximum_object_size=100000000))) ON COLUMNS(*))""")
con.sql("""CREATE OR REPLACE TABLE crossnwi AS SELECT name, UNNEST(value) AS value FROM crossnwi""")
con.sql("""CREATE OR REPLACE TABLE tmpwet AS
SELECT name, fips, geometry FROM (SELECT * FROM my_inserts) AS wetselect
LEFT JOIN crossnwi ON wetselect.ATTRIBUTE LIKE crossnwi.value
""")
con.sql(f"""CREATE OR REPLACE TABLE tmpwet AS
(SELECT replace(tmpwet.name, '_', '') AS name, fips, geometry, kcal FROM tmpwet
LEFT JOIN read_csv_auto('azure://abdu/kcal.csv') ON replace(tmpwet.name, '_', '') = read_csv_auto.habitatType
WHERE tmpwet.name IS NOT NULL)
""")
con.sql("""CREATE OR REPLACE TABLE newdata AS (
SELECT name, fips, kcal, ST_Area(geometry)*0.0001 AS ha, ha*kcal AS avalNrgy FROM(
SELECT name, fips, kcal, ST_Union_Agg(ST_GeomFromText(geometry)) AS geometry FROM tmpwet
GROUP BY name, fips, kcal))
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: total: 20min 21s
Wall time: 4min 20s


In [37]:
con.sql('describe newdata')

┌─────────────┬─────────────┬─────────┬─────────┬─────────┬───────┐
│ column_name │ column_type │  null   │   key   │ default │ extra │
│   varchar   │   varchar   │ varchar │ varchar │ varchar │ int32 │
├─────────────┼─────────────┼─────────┼─────────┼─────────┼───────┤
│ name        │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ fips        │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ kcal        │ BIGINT      │ YES     │ NULL    │ NULL    │  NULL │
│ ha          │ DOUBLE      │ YES     │ NULL    │ NULL    │  NULL │
│ avalNrgy    │ DOUBLE      │ YES     │ NULL    │ NULL    │  NULL │
└─────────────┴─────────────┴─────────┴─────────┴─────────┴───────┘

In [38]:
con.sql("""CREATE OR REPLACE TABLE fipsavalNRG AS SELECT fips, sum(avalNrgy) as fipsnrgysum from newdata group by fips""")
con.sql("""select * from fipsavalNRG""")

┌─────────┬────────────────────┐
│  fips   │    fipsnrgysum     │
│ varchar │       double       │
├─────────┼────────────────────┤
│ 47097   │  6412826086.457126 │
│ 29155   │  3659891166.052708 │
│ 05093   │  4343586557.220324 │
│ 29069   │ 2179175072.3020926 │
│ 05111   │  4853062003.605463 │
└─────────┴────────────────────┘

In [39]:
##### END TEST THREADING #####

In [40]:
'''
######
Read in kcal.csv from azure and join to wetlands.
This file is the habitat type (habitatType) and the kcal/Ha (kcal)
######
'''
#
con.sql(f"""CREATE OR REPLACE TABLE demandwetlands AS
(SELECT replace(wetlands.name, '_', '') AS name, huc12, geometry, read_csv_auto.kcal FROM wetlands
LEFT JOIN read_csv_auto('azure://abdu/kcal.csv') ON replace(wetlands.name, '_', '') = read_csv_auto.habitatType
WHERE wetlands.name IS NOT NULL)
""")

In [41]:
con.sql("""CREATE OR REPLACE TABLE hucdemandenergy AS 
    (SELECT name,fips, demandwetlands.huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, kcal, 
    ST_Intersection(ST_GeomFromText(demandwetlands.geometry), (demand.geometry)) as geometry FROM demandwetlands
    JOIN demand ON ST_Intersects(ST_GeomFromText(demandwetlands.geometry), (demand.geometry)))""")

In [42]:
'''
######
Calculate available energy (avalNrgy) of wetlands by calculating area in Hectares (HA) and multiplying by kcal.
Select only distinct rows.
Create new table habitatenergy
######
'''
#
con.sql("""CREATE OR REPLACE TABLE hucdemandenergy AS (SELECT DISTINCT name, fips, huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, kcal, geometry, ST_Area(geometry)*0.0001 AS ha,ha*kcal AS avalNrgy FROM hucdemandenergy)""")
# DATA CHECK
#con.sql("""COPY (SELECT DISTINCT name, ST_AsWKB(ST_GeomFromText(geometry)) as geometry, ST_Area(ST_GeomFromText(geometry))*0.0001 AS ha, ha*kcal AS avalNrgy FROM wetlands) TO 'testwetland.parquet' (FORMAT PARQUET)""")

In [43]:
con.sql("""CREATE OR REPLACE TABLE test AS (select name, hucdemandenergy.fips as fips,huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, kcal, avalNrgy, fipsnrgysum, (hucdemandenergy.avalNrgy/fipsavalNRG.fipsnrgysum) as pct, geometry from hucdemandenergy
    JOIN fipsavalNRG on hucdemandenergy.fips = fipsavalNRG.fips
    GROUP BY name, hucdemandenergy.fips, huc12, CODE, LTADUD, LTADemand, LTAPopObj, x80DUD, X80Demand, X80PopObj, kcal, avalNrgy, fipsnrgysum, geometry)""")

In [44]:
fullfips = sorted([item for items in fullfips for item in items])
sqlcall ="""CREATE OR REPLACE TABLE rdydemand as SELECT * FROM test WHERE fips In {0}""".format(tuple(fullfips))
con.sql(sqlcall)

In [45]:
#con.sql("""describe rdydemand""")
con.sql("""CREATE OR REPLACE TABLE hucdemand AS (SELECT huc12, code, 
sum(pct * LTADUD) AS LTADUD,
sum(pct * LTADemand) AS LTADemand,
sum(pct * LTAPopObj) AS LTAPopObj,
sum(pct * x80DUD) AS x80DUD,
sum(pct * X80Demand) AS X80Demand,
sum(pct * X80PopObj) AS X80PopObj,
FROM rdydemand
GROUP BY huc12, code)""")

In [46]:
'''
END Demand
'''
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################
#################################

'\nEND Demand\n'

In [47]:
# Just added urbanNrgy line.  Going a very long time.  Might be a problem.
# Testing unavailHa.still running.  TAking a long time.  Check this line too.

# Specified selection in a cell or two below.  Many don't need geometry at this later point.  Joining is by huc12 so selecting
# only the required columns makes the join go much faster.
con.sql("""CREATE OR REPLACE TABLE athuclevel AS SELECT huc12.huc12, CODE, 
LTADUD, LTADemand, LTAPopObj, X80DUD, X80Demand, X80PopObj, 
sum(avalNrgy) as tothabitat_kcal, 
sum(protNrgy) as protected_kcal, 
sum(ProtHabHa) as protectedhabitat_ha,
sum(urbanHa) as urbanHa,
huc12.geometry
FROM huc12
LEFT JOIN hucdemand on hucdemand.huc12 = huc12.huc12
LEFT JOIN wetlands on wetlands.huc12 = huc12.huc12
LEFT JOIN protwetlands on protwetlands.huc12 = huc12.huc12
LEFT JOIN urban on urban.huc12 = huc12.huc12
GROUP BY huc12.huc12, CODE, LTADUD, LTADemand, LTAPopObj, X80DUD, X80Demand, X80PopObj, huc12.geometry
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [48]:
con.sql('''select count(huc12) from huc12''')
con.sql('select * from huc12 order by huc12')

┌─────────┬─────────┬──────────────┬───────────┬───────────────────────────────────────────────────────────────────────┐
│  huc2   │  huc4   │    huc12     │ areaacres │                               geometry                                │
│ varchar │ varchar │   varchar    │  double   │                                 blob                                  │
├─────────┼─────────┼──────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
│ 08      │ 0801    │ 080101000604 │  51086.98 │ \x01\x06\x00\x00\x00\x01\x00\x00\x00\x01\x03\x00\x00\x00\x01\x00\x0…  │
│ 08      │ 0802    │ 080202031201 │  34522.78 │ \x01\x06\x00\x00\x00\x01\x00\x00\x00\x01\x03\x00\x00\x00\x01\x00\x0…  │
│ 08      │ 0802    │ 080202031203 │  30202.13 │ \x01\x06\x00\x00\x00\x01\x00\x00\x00\x01\x03\x00\x00\x00\x01\x00\x0…  │
│ 08      │ 0802    │ 080202031204 │  23311.82 │ \x01\x06\x00\x00\x00\x01\x00\x00\x00\x01\x03\x00\x00\x00\x01\x00\x0…  │
│ 08      │ 0802    │ 0802020407

In [49]:
con.sql("""CREATE OR REPLACE TABLE athuclevel AS 
SELECT athuclevel.huc12,
ST_Area(ST_GeomFromWKB(geometry))*0.0001 huc12_ha,
CODE, 
COALESCE(LTADUD, 0) dud_lta,
COALESCE(LTADemand,0) demand_lta_kcal, 
COALESCE(LTAPopObj,0) popobj_lta, 
COALESCE(X80DUD,0) dud_80th, 
COALESCE(X80Demand,0) demand_80th_kcal, 
COALESCE(X80PopObj,0) popobj_80th, 
COALESCE(tothabitat_kcal,0) tothabitat_kcal,
COALESCE(protected_kcal,0) protected_kcal,
COALESCE(protectedhabitat_ha,0) protectedhabitat_ha,
COALESCE(urbanHa,0) urbanHa, 
COALESCE(sum(urbanNrgy),0) urbanNrgy,
COALESCE(sum(unavailHa),0) unavailha,
COALESCE(sum(unavailHa),0) huc12_ha_unavailha,
COALESCE(tothabitat_kcal - demand_lta_kcal,0) surpdef_lta_kcal,
COALESCE(tothabitat_kcal - demand_80th_kcal,0) surpdef_80th_kcal,
athuclevel.geometry
FROM athuclevel
LEFT JOIN (SELECT huc12, urbanNrgy FROM urbanwetlands) as urbanwetlands on urbanwetlands.huc12 = athuclevel.huc12
LEFT JOIN (SELECT huc12, unavailHa FROM unavailable) as unavailable on unavailable.huc12 = athuclevel.huc12
GROUP BY athuclevel.huc12, CODE, LTADUD, LTADemand, LTAPopObj, X80DUD, X80Demand, X80PopObj, tothabitat_kcal, protected_kcal, protectedhabitat_ha,urbanHa, geometry
""")

In [50]:
con.sql('''select count(huc12) from athuclevel''')
con.sql('select * from athuclevel order by huc12')

┌──────────────┬────────────────────┬─────────┬───┬────────────────────┬────────────────────┬──────────────────────┐
│    huc12     │      huc12_ha      │  CODE   │ … │  surpdef_lta_kcal  │ surpdef_80th_kcal  │       geometry       │
│   varchar    │       double       │ varchar │   │       double       │       double       │         blob         │
├──────────────┼────────────────────┼─────────┼───┼────────────────────┼────────────────────┼──────────────────────┤
│ 080101000604 │ 20674.165854007184 │ 4B      │ … │ 56639275059168.625 │  56639275010604.04 │ \x01\x06\x00\x00\x…  │
│ 080101000604 │ 20674.165854007184 │ 4D      │ … │  56638785024753.72 │  56638672828820.29 │ \x01\x06\x00\x00\x…  │
│ 080202031201 │ 13970.871465084027 │ 4D      │ … │  8873379862.022955 │   8871696741.83188 │ \x01\x06\x00\x00\x…  │
│ 080202031203 │ 12222.365149180703 │ 4D      │ … │  5552134614.293705 │  5550898957.422803 │ \x01\x06\x00\x00\x…  │
│ 080202031204 │   9433.95872268888 │ 4B      │ … │  5478210909.

In [51]:
con.sql(''' CREATE OR REPLACE TABLE athuclevel AS 
SELECT *,
CASE WHEN 
demand_lta_kcal - protected_kcal > 0
THEN
demand_lta_kcal - protected_kcal
ELSE 0
END
AS nrgprot_lta_kcal,
CASE WHEN
demand_80th_kcal - protected_kcal > 0 
THEN
demand_80th_kcal - protected_kcal
ELSE 0
END
AS nrgprot_80th_kcal
FROM athuclevel
''')

In [52]:
'''
Calculate weighted mean
'''
con.sql('''
CREATE OR REPLACE TABLE wtmean AS 
SELECT huctotal.huc12, name, avalNrgname/avalNrgtot as pct, hucnametotal.avalNrgname * pct as wtmean FROM
((SELECT huc12, sum(avalNrgy) as avalNrgtot from wetlands group by huc12) huctotal
join
(SELECT huc12, name, sum(avalNrgy) as avalNrgname from wetlands group by huc12, name) hucnametotal
on hucnametotal.huc12 = huctotal.huc12)
''')
con.sql('''CREATE OR REPLACE TABLE wtmeanpivot AS
(select * exclude pct FROM
(pivot wtmean
    on name
    USING sum(wtmean)))
''')
con.sql('''CREATE OR REPLACE TABLE wtmeanpivot AS 
SELECT
huc12,
COALESCE(DeepwaterFresh, 0) DeepwaterFresh,
COALESCE(FreshMarsh, 0) FreshMarsh, 
COALESCE(FreshShallowOpenWater, 0) FreshShallowOpenWater,
COALESCE(FreshwaterWoody, 0) FreshwaterWoody,
COALESCE(ManagedFreshMarsh, 0) ManagedFreshMarsh,
COALESCE(ManagedFreshShallowOpenWater, 0) ManagedFreshShallowOpenWater,
COALESCE(ManagedFreshwaterAquaticBed, 0) ManagedFreshwaterAquaticBed
FROM wtmeanpivot
''')
con.sql('''create or replace table wtmeanbyhuc as
        select huc12, 
        sum(DeepwaterFresh + FreshMarsh + FreshShallowOpenWater + FreshwaterWoody + ManagedFreshMarsh +ManagedFreshShallowOpenWater + ManagedFreshwaterAquaticBed)
        as wtmean from wtmeanpivot group by huc12''')

In [53]:
con.sql('''select count(huc12) from athuclevel''')
con.sql('select * from athuclevel order by huc12')

┌──────────────┬────────────────────┬─────────┬───┬──────────────────────┬────────────────────┬────────────────────┐
│    huc12     │      huc12_ha      │  CODE   │ … │       geometry       │  nrgprot_lta_kcal  │ nrgprot_80th_kcal  │
│   varchar    │       double       │ varchar │   │         blob         │       double       │       double       │
├──────────────┼────────────────────┼─────────┼───┼──────────────────────┼────────────────────┼────────────────────┤
│ 080101000604 │ 20674.165854007184 │ 4B      │ … │ \x01\x06\x00\x00\x…  │                0.0 │                0.0 │
│ 080101000604 │ 20674.165854007184 │ 4D      │ … │ \x01\x06\x00\x00\x…  │                0.0 │                0.0 │
│ 080202031201 │ 13970.871465084027 │ 4D      │ … │ \x01\x06\x00\x00\x…  │  7638924.053511167 │    9322044.2445851 │
│ 080202031203 │ 12222.365149180703 │ 4D      │ … │ \x01\x06\x00\x00\x…  │  5608089.691441812 │  6843746.562343769 │
│ 080202031204 │   9433.95872268888 │ 4B      │ … │ \x01\x06\x00

In [54]:
con.sql('select * from wtmeanbyhuc order by huc12')

┌──────────────┬────────────────────┐
│    huc12     │       wtmean       │
│   varchar    │       double       │
├──────────────┼────────────────────┤
│ 080101000604 │ 2139736477.4576807 │
│ 080202031201 │ 20558285.142592266 │
│ 080202031203 │   8137433.75467029 │
│ 080202031204 │  18741296.25220691 │
│ 080202040712 │  37346920.11745108 │
│ 080202040901 │ 26822207.628195055 │
│ 080202040902 │ 33087927.504779287 │
└──────────────┴────────────────────┘

In [55]:
##########
########    Way too many rows after join.
con.sql('''CREATE OR REPLACE TABLE athuclevel AS
SELECT * 
from athuclevel
left join wtmeanbyhuc on athuclevel.huc12=wtmeanbyhuc.huc12
order by athuclevel.huc12
''')
con.sql('ALTER TABLE athuclevel RENAME wtmean TO wtMean_kcal_per_ha')

In [56]:
con.sql('''CREATE OR REPLACE TABLE athuclevel AS
SELECT *,
CASE WHEN 
surpdef_lta_kcal < 0
THEN
abs(surpdef_lta_kcal/wtMean_kcal_per_ha)
ELSE 0
END
AS restoregoal_lta_ha,

CASE WHEN 
surpdef_80th_kcal < 0
THEN
abs(surpdef_80th_kcal/wtMean_kcal_per_ha)
ELSE 0
END
AS restoregoal_80th_ha

FROM athuclevel
''')

In [57]:
## Need to double check huc12_ha and unavailha
con.sql('''CREATE OR REPLACE TABLE athuclevel AS 
        select huc12, huc12_ha, CODE as code, dud_lta, demand_lta_kcal, popobj_lta, dud_80th, demand_80th_kcal, popobj_80th,
        tothabitat_kcal, protected_kcal, protectedhabitat_ha, urbanHa, urbanNrgy, unavailha, surpdef_lta_kcal, surpdef_80th_kcal,
        nrgprot_lta_kcal, nrgprot_80th_kcal, wtMean_kcal_per_ha, restoregoal_lta_ha, restoregoal_80th_ha, 
        CASE WHEN
        huc12_ha - unavailha > 0
        THEN
        huc12_ha - unavailha
        ELSE 0
        END 
        AS available_ha,
        geometry
        FROM athuclevel
        ''')

In [58]:
con.sql('''CREATE OR REPLACE TABLE athuclevel AS
SELECT * EXCLUDE (restoregoal_lta_ha, restoregoal_80th_ha),
CASE WHEN 
restoregoal_lta_ha > available_ha
THEN
available_ha
ELSE restoregoal_lta_ha
END
AS restoregoal_lta_ha,

CASE WHEN 
restoregoal_80th_ha > available_ha
THEN
available_ha
ELSE restoregoal_80th_ha
END
AS restoregoal_80th_ha,

FROM athuclevel
''')

In [59]:
#field='protectgoal_lta_ha', expression="(!nrgprot_lta_kcal!/!wtMean_kcal_per_ha!) if !nrgprot_lta_kcal! > 0 else 0"
#field='protectgoal_80th_ha', expression="(!nrgprot_80th_kcal!/!wtMean_kcal_per_ha!) if !nrgprot_80th_kcal! > 0 else 0"
con.sql('''CREATE OR REPLACE TABLE athuclevel AS
SELECT *,
CASE WHEN 
nrgprot_lta_kcal > 0 
THEN
nrgprot_lta_kcal/wtMean_kcal_per_ha
ELSE 0
END
AS protectgoal_lta_ha,

CASE WHEN 
nrgprot_80th_kcal > 0
THEN
nrgprot_80th_kcal/wtMean_kcal_per_ha
ELSE 0
END
AS protectgoal_80th_ha,
FROM athuclevel
''')

In [60]:
#field='protectgoal_lta_ha', expression="!available_ha! if !protectgoal_lta_ha! > !available_ha! else !protectgoal_lta_ha!"
#field='protectgoal_80th_ha', expression="!available_ha! if !protectgoal_80th_ha! > !available_ha! else !protectgoal_80th_ha!"
con.sql('''CREATE OR REPLACE TABLE athuclevel AS
SELECT * EXCLUDE (protectgoal_lta_ha, protectgoal_80th_ha),
CASE WHEN 
protectgoal_lta_ha > available_ha
THEN
available_ha
ELSE protectgoal_lta_ha
END
AS  protectgoal_lta_ha,

CASE WHEN 
protectgoal_80th_ha > available_ha
THEN
available_ha
ELSE protectgoal_80th_ha
END
AS protectgoal_80th_ha,
FROM athuclevel
''')

In [61]:
'''
Protected wetlands, urban wetlands, and wetland energy all calculated by huc12.  Need to calculate total urban outside of
wetland energy

Calculations:
    Energy supply
        Total habitat energy within huc - THabNrg
        Total habitat hectares within huc - THabHA

    Energy demand
        LTA and X80 DUD by huc - TLTADUD anc X80DUD
        LTA and X80 Demand by huc - TLTADemand and X80Demand
        LTA and X80 Population objective by huc - LTAPopObj and X80PopObj
        
    Protected lands
        Total protected hectares by huc - ProtHA

    Protected habitat hectares and energy
        Total protected hectares - ProtHabHA
        Total protected energy - ProtHabNrg

    Weighted mean and calculations based off of it
        Weighted mean kcal/ha with weight being Total habitat energy
        Energy Protection needed - NrgProtRq
        Restoration HA based off of weighted mean - RstorHA
        Protection HA based off weighted mean - RstorProtHA  

'''
#################################
#################################
#################################


'\nProtected wetlands, urban wetlands, and wetland energy all calculated by huc12.  Need to calculate total urban outside of\nwetland energy\n\nCalculations:\n    Energy supply\n        Total habitat energy within huc - THabNrg\n        Total habitat hectares within huc - THabHA\n\n    Energy demand\n        LTA and X80 DUD by huc - TLTADUD anc X80DUD\n        LTA and X80 Demand by huc - TLTADemand and X80Demand\n        LTA and X80 Population objective by huc - LTAPopObj and X80PopObj\n        \n    Protected lands\n        Total protected hectares by huc - ProtHA\n\n    Protected habitat hectares and energy\n        Total protected hectares - ProtHabHA\n        Total protected energy - ProtHabNrg\n\n    Weighted mean and calculations based off of it\n        Weighted mean kcal/ha with weight being Total habitat energy\n        Energy Protection needed - NrgProtRq\n        Restoration HA based off of weighted mean - RstorHA\n        Protection HA based off weighted mean - RstorProtHA 

In [62]:
con.sql('describe athuclevel').df()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,huc12,VARCHAR,YES,,,
1,huc12_ha,DOUBLE,YES,,,
2,code,VARCHAR,YES,,,
3,dud_lta,DOUBLE,YES,,,
4,demand_lta_kcal,DOUBLE,YES,,,
5,popobj_lta,DOUBLE,YES,,,
6,dud_80th,DOUBLE,YES,,,
7,demand_80th_kcal,DOUBLE,YES,,,
8,popobj_80th,DOUBLE,YES,,,
9,tothabitat_kcal,DOUBLE,YES,,,


In [63]:
end_time = time.ctime(time.time())
end_time

'Wed Feb 14 08:58:15 2024'

In [64]:
con.sql("""COPY (SELECT * EXCLUDE geometry, ST_AsWKB(ST_GeomFromWKB(geometry)) as geometry, FROM athuclevel) TO 'testwetland.parquet' (FORMAT PARQUET)""")