# This is OK for desktop

Trying CMAP, use Python as that is the most recently updated package at CMAP\
For now just testing out the code and figuring out how it works.
## Krista Longnecker, 14 June 2025

Not bad for a first attempt, but I can close this now

In [2]:
import os, glob
import concurrent.futures
import pycmap
import sys
sys.path.append("../config")
from config import API_KEY
from settings import DATA_DIR, COLOCALIZED_DIR
from common import halt, makedir, environmental_datasets
import pandas as pd
import datetime
from dateutil.parser import parse


In [3]:
#set the easy/quick functions here
def cyano_csv_files(cyanoDir):
    """
    Returns a list of path to csv files that hold observations of cyanobacteria.
    KL note: leave name for now, though I am working on TOC/DOC data
    """
    return glob.glob(f"{cyanoDir}*.csv")


def add_env_columns(df, envs):
    """
    Adds new columns to the dataframe form each environmental variable.
    """
    for env in envs.values():
        for v in env.get("variables"):
            if v not in df.columns: df[v] = None
    return df
    

def add_env_temporal_coverage(api, envs):
    """
    Adds new entries to the envs dictionary indicating the temporal coverage of each environmental dataset.
    """
    for table, env in envs.items():
        df = api.query(f"SELECT MIN([time]) startTime, MAX([time]) endTime FROM {table}")
        if len(df) > 0:
            envs[table]["startTime"] = df.loc[0, "startTime"]
            envs[table]["endTime"] = df.loc[0, "endTime"]
    return envs

In [4]:
%run collect.py

Downloading  ('tblBATS_Bottle', ['cruise_ID', 'TOC', 'phosphate'])  ...
Downloading  ('tblGLODAP', ['cruise_expocode', 'toc', 'phosphate'])  ...
Downloading  ('tblBATS_Bottle_Validation', ['cruise_ID', 'TOC', 'phosphate'])  ...
Downloading  ('tblGeotraces_Seawater_IDP2021v2', ['cruise_id', 'DOC_D_CONC_BOTTLE', 'PHOSPHATE_D_CONC_BOTTLE'])  ...


In [5]:
#this is match (the query step is what takes the time
def match(df, api, envs, cyanoFile, rowCount):
    """
    Takes a single-row dataframe containing cyano observations and colocalizes with the 
    environmental variables included in the `envs` argument. The tolerance parametrs 
    are also included in the `envs` argument.
    """ 
    def get_month(dt):
        return parse(dt).month

    def shift_dt(dt, delta):
        delta = float(delta)
        dt = parse(dt)
        dt += datetime.timedelta(days=delta)
        return dt.strftime("%Y-%m-%d %H:%M:%S")

    def in_time_window(sourceDT, targetMinDT, targetMaxDT):
        targetMinDT = targetMinDT.split(".000Z")[0]
        targetMaxDT = targetMaxDT.split(".000Z")[0]
        return not (
                    parse(sourceDT) < parse(targetMinDT) or 
                    parse(sourceDT) > parse(targetMaxDT)
                    )

    def construc_query(table, env, t, lat, lon, depth):
        variables = env["variables"] 
        timeTolerance = env["tolerances"][0] 
        latTolerance = env["tolerances"][1] 
        lonTolerance = env["tolerances"][2]  
        depthTolerance = env["tolerances"][3]  
        hasDepth = env["hasDepth"] 
        isClimatology = env["isClimatology"]
        inTimeRange = True
        if not isClimatology:
            startTime = env["startTime"]
            endTime = env["endTime"]    
            inTimeRange = in_time_window(t, startTime, endTime)
        selectClause = "SELECT " + ", ".join([f"AVG({v}) {v}" for v in variables]) + " FROM " + table
        timeClause = f" WHERE [time] BETWEEN '{shift_dt(t, -timeTolerance)}' AND '{shift_dt(t, timeTolerance)}' "
        if not inTimeRange or isClimatology: timeClause = f" WHERE [month]={get_month(t)} "
        latClause = f" AND lat BETWEEN {lat-latTolerance} AND {lat+latTolerance} "
        lonClause = f" AND lon BETWEEN {lon-lonTolerance} AND {lon+lonTolerance} "
        depthClause = f" AND depth BETWEEN {depth-depthTolerance} AND {depth+depthTolerance} "
        if not hasDepth: depthClause = ""                
        return selectClause + timeClause + latClause + lonClause + depthClause        


    if len(df) != 1: halt(f"Invalid dataframe input.\nExpected a single row dataframe but received {len(df)} rows.")
    rowIndex = df.index.values[0]
    df.reset_index(drop=True, inplace=True)
    t= df.iloc[0]["time"]
    lat = df.iloc[0]["lat"]
    lon = df.iloc[0]["lon"] 
    depth = 0
    if 'depth' in df.columns: depth = df.iloc[0]["depth"]
    for table, env in envs.items():
        #KL turn off next line because it's make a rather large output file and I will not browse through it 6/18/2025
        #print(f"{rowIndex} / {rowCount-1}\n\t{datetime.datetime.now()}: Colocalizing {table} with {cyanoFile} ...")
        query = construc_query(table, env, t, lat, lon, depth)
        matchedEnv = api.query(query)
        if len(matchedEnv)>0:
            for v in env["variables"]: df.at[0, v] = matchedEnv.iloc[0][v] 
    return df

In [6]:
def main():
    """
    Iterates through the list of datasets and colocalizes them with the specified environmental variables.
    Colocalized datasets are stored in the "COLOCALIZED_DIR" as csv files.
    """
    def saveColocalizedCSV(df):
        df.to_csv(f"{COLOCALIZED_DIR}{os.path.basename(cyanoFile)}", index=False) 

    cyanoFiles = cyano_csv_files(DATA_DIR)
    api = pycmap.API(token=API_KEY)
    makedir(COLOCALIZED_DIR)
    envs = environmental_datasets()        
    envs = add_env_temporal_coverage(api, envs)

    for cyanoFile in cyanoFiles:
        df = pd.read_csv(cyanoFile)
        df = add_env_columns(df, envs)
        dfs = [df.loc[i].to_frame().T for i in range(len(df))]
        colocalizedDF  = pd.DataFrame({})
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futureObjs = executor.map(match, dfs, [api] * len(dfs), [envs] * len(dfs), [cyanoFile] * len(dfs), [len(dfs)] * len(dfs))
            for fo in futureObjs:
                if len(colocalizedDF) < 1:
                    colocalizedDF = fo
                else:
                    colocalizedDF = pd.concat([colocalizedDF, fo], ignore_index=True)  
        saveColocalizedCSV(colocalizedDF)


In [None]:
# break out into pieces, then add new script to assemble each colocalized dataframe

In [7]:
cyanoFiles = cyano_csv_files(DATA_DIR)

In [8]:
cyanoFiles

['./data\\tblBATS_Bottle.csv',
 './data\\tblBATS_Bottle_Validation.csv',
 './data\\tblGeotraces_Seawater_IDP2021v2.csv',
 './data\\tblGLODAP.csv']

In [9]:
api = pycmap.API(token=API_KEY)
makedir(COLOCALIZED_DIR)
envs = environmental_datasets()        
envs = add_env_temporal_coverage(api, envs)

In [10]:
envs

{'tblCHL_REP': {'variables': ['chl'],
  'tolerances': [4, 0.25, 0.25, 5],
  'hasDepth': False,
  'isClimatology': False,
  'startTime': '1998-01-01T00:00:00.000Z',
  'endTime': '2018-06-26T00:00:00.000Z'},
 'tblPisces_NRT': {'variables': ['NO3', 'PO4', 'Fe', 'O2', 'Si', 'PP'],
  'tolerances': [4, 0.5, 0.5, 5],
  'hasDepth': True,
  'isClimatology': False,
  'startTime': '2011-12-31T00:00:00.000Z',
  'endTime': '2019-04-27T00:00:00.000Z'},
 'tblWOA_Climatology': {'variables': ['density_WOA_clim',
   'salinity_WOA_clim',
   'nitrate_WOA_clim',
   'phosphate_WOA_clim',
   'silicate_WOA_clim',
   'oxygen_WOA_clim'],
  'tolerances': [1, 0.75, 0.75, 5],
  'hasDepth': True,
  'isClimatology': True}}

In [29]:
def get_month(dt):
    return parse(dt).month

def shift_dt(dt, delta):
    delta = float(delta)
    dt = parse(dt)
    dt += datetime.timedelta(days=delta)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

def in_time_window(sourceDT, targetMinDT, targetMaxDT):
    targetMinDT = targetMinDT.split(".000Z")[0]
    targetMaxDT = targetMaxDT.split(".000Z")[0]
    return not (
                parse(sourceDT) < parse(targetMinDT) or 
                parse(sourceDT) > parse(targetMaxDT)
                )

def construc_query(table, env, t, lat, lon, depth):
    variables = env["variables"] 
    timeTolerance = env["tolerances"][0] 
    latTolerance = env["tolerances"][1] 
    lonTolerance = env["tolerances"][2]  
    depthTolerance = env["tolerances"][3]  
    hasDepth = env["hasDepth"] 
    isClimatology = env["isClimatology"]
    inTimeRange = True
    if not isClimatology:
        startTime = env["startTime"]
        endTime = env["endTime"]    
        inTimeRange = in_time_window(t, startTime, endTime)
    selectClause = "SELECT " + ", ".join([f"AVG({v}) {v}" for v in variables]) + " FROM " + table
    timeClause = f" WHERE [time] BETWEEN '{shift_dt(t, -timeTolerance)}' AND '{shift_dt(t, timeTolerance)}' "
    if not inTimeRange or isClimatology: timeClause = f" WHERE [month]={get_month(t)} "
    latClause = f" AND lat BETWEEN {lat-latTolerance} AND {lat+latTolerance} "
    lonClause = f" AND lon BETWEEN {lon-lonTolerance} AND {lon+lonTolerance} "
    depthClause = f" AND depth BETWEEN {depth-depthTolerance} AND {depth+depthTolerance} "
    if not hasDepth: depthClause = ""                
    return selectClause + timeClause + latClause + lonClause + depthClause       

In [12]:
cyanoFiles[1]

'./data\\tblBATS_Bottle_Validation.csv'

In [13]:
cyanoFile = cyanoFiles[1]

In [14]:
df = pd.read_csv(cyanoFile)
df = add_env_columns(df, envs)
dfs = [df.loc[i].to_frame().T for i in range(len(df))]
colocalizedDF  = pd.DataFrame({})

In [15]:
rowIndex = df.index.values[0]
df.reset_index(drop=True, inplace=True)
t= df.iloc[0]["time"]
lat = df.iloc[0]["lat"]
lon = df.iloc[0]["lon"] 
depth = 0

In [16]:
if 'depth' in df.columns: depth = df.iloc[0]["depth"]

In [17]:
df

Unnamed: 0,time,lat,lon,depth,cruise_ID,TOC,phosphate,chl,NO3,PO4,Fe,O2,Si,PP,density_WOA_clim,salinity_WOA_clim,nitrate_WOA_clim,phosphate_WOA_clim,silicate_WOA_clim,oxygen_WOA_clim
0,1991-04-29T14:30:00,32.165,-64.503,2.4,5000100112,,,,,,,,,,,,,,,
1,1991-04-29T14:30:00,32.165,-64.503,24.2,5000100111,,,,,,,,,,,,,,,
2,1991-04-29T14:30:00,32.165,-64.503,50.4,5000100110,,,,,,,,,,,,,,,
3,1991-04-29T14:30:00,32.165,-64.503,73.9,5000100109,,,,,,,,,,,,,,,
4,1991-04-29T14:30:00,32.165,-64.503,99.7,5000100108,,,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14934,2022-06-15T22:34:00,19.669,-65.965,150.2,5005902514,,,,,,,,,,,,,,,
14935,2022-06-15T22:34:00,19.669,-65.965,200.6,5005902515,,0.11,,,,,,,,,,,,,
14936,2022-06-15T22:34:00,19.669,-65.965,301.6,5005902516,,0.32,,,,,,,,,,,,,
14937,2022-06-15T22:34:00,19.669,-65.965,401.6,5005902517,,0.58,,,,,,,,,,,,,


In [18]:
envs.items

<function dict.items()>

In [19]:
envs

{'tblCHL_REP': {'variables': ['chl'],
  'tolerances': [4, 0.25, 0.25, 5],
  'hasDepth': False,
  'isClimatology': False,
  'startTime': '1998-01-01T00:00:00.000Z',
  'endTime': '2018-06-26T00:00:00.000Z'},
 'tblPisces_NRT': {'variables': ['NO3', 'PO4', 'Fe', 'O2', 'Si', 'PP'],
  'tolerances': [4, 0.5, 0.5, 5],
  'hasDepth': True,
  'isClimatology': False,
  'startTime': '2011-12-31T00:00:00.000Z',
  'endTime': '2019-04-27T00:00:00.000Z'},
 'tblWOA_Climatology': {'variables': ['density_WOA_clim',
   'salinity_WOA_clim',
   'nitrate_WOA_clim',
   'phosphate_WOA_clim',
   'silicate_WOA_clim',
   'oxygen_WOA_clim'],
  'tolerances': [1, 0.75, 0.75, 5],
  'hasDepth': True,
  'isClimatology': True}}

In [23]:
env = envs[1]


KeyError: 1

In [24]:
type(envs)

dict

In [25]:
next(iter(envs))

'tblCHL_REP'

In [34]:
from IPython import embed #then add embed() where I want to stop

In [32]:
for table, env in envs.items():
    query = construc_query(table, env, t, lat, lon, depth)
    matchedEnv = api.query(query)
    if len(matchedEnv)>0:
        for v in env["variables"]: df.at[0, v] = matchedEnv.iloc[0][v] 

In [33]:
df

Unnamed: 0,time,lat,lon,depth,cruise_ID,TOC,phosphate,chl,NO3,PO4,Fe,O2,Si,PP,density_WOA_clim,salinity_WOA_clim,nitrate_WOA_clim,phosphate_WOA_clim,silicate_WOA_clim,oxygen_WOA_clim
0,1991-04-29T14:30:00,32.165,-64.503,2.4,5000100112,,,0.135655,1.939887,0.158448,0.000035,234.741243,2.917146,0.023538,26.057992,36.622066,0.10174,0.016469,0.644847,5.308658
1,1991-04-29T14:30:00,32.165,-64.503,24.2,5000100111,,,,,,,,,,,,,,,
2,1991-04-29T14:30:00,32.165,-64.503,50.4,5000100110,,,,,,,,,,,,,,,
3,1991-04-29T14:30:00,32.165,-64.503,73.9,5000100109,,,,,,,,,,,,,,,
4,1991-04-29T14:30:00,32.165,-64.503,99.7,5000100108,,,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14934,2022-06-15T22:34:00,19.669,-65.965,150.2,5005902514,,,,,,,,,,,,,,,
14935,2022-06-15T22:34:00,19.669,-65.965,200.6,5005902515,,0.11,,,,,,,,,,,,,
14936,2022-06-15T22:34:00,19.669,-65.965,301.6,5005902516,,0.32,,,,,,,,,,,,,
14937,2022-06-15T22:34:00,19.669,-65.965,401.6,5005902517,,0.58,,,,,,,,,,,,,


In [35]:
rowIndex = df.index.values[0]

In [36]:
rowIndex

np.int64(0)

In [37]:
df.index.values[0:4]


array([0, 1, 2, 3])

In [44]:
dfs

[                  time     lat     lon depth   cruise_ID  TOC phosphate   chl  \
 0  1991-04-29T14:30:00  32.165 -64.503   2.4  5000100112  NaN       NaN  None   
 
     NO3   PO4    Fe    O2    Si    PP density_WOA_clim salinity_WOA_clim  \
 0  None  None  None  None  None  None             None              None   
 
   nitrate_WOA_clim phosphate_WOA_clim silicate_WOA_clim oxygen_WOA_clim  
 0             None               None              None            None  ,
                   time     lat     lon depth   cruise_ID  TOC phosphate   chl  \
 1  1991-04-29T14:30:00  32.165 -64.503  24.2  5000100111  NaN       NaN  None   
 
     NO3   PO4    Fe    O2    Si    PP density_WOA_clim salinity_WOA_clim  \
 1  None  None  None  None  None  None             None              None   
 
   nitrate_WOA_clim phosphate_WOA_clim silicate_WOA_clim oxygen_WOA_clim  
 1             None               None              None            None  ,
                   time     lat     lon depth   cruise

In [38]:
api = pycmap.API(token=API_KEY)


match(df.iloc[1], api, envs, cyanoFile, rowCount)

NameError: name 'rowCount' is not defined

In [39]:
df.size


298780

In [41]:
df.iloc[1]

time                  1991-04-29T14:30:00
lat                                32.165
lon                               -64.503
depth                                24.2
cruise_ID                      5000100111
TOC                                   NaN
phosphate                             NaN
chl                                  None
NO3                                  None
PO4                                  None
Fe                                   None
O2                                   None
Si                                   None
PP                                   None
density_WOA_clim                     None
salinity_WOA_clim                    None
nitrate_WOA_clim                     None
phosphate_WOA_clim                   None
silicate_WOA_clim                    None
oxygen_WOA_clim                      None
Name: 1, dtype: object

In [42]:
df.iloc[0]

time                  1991-04-29T14:30:00
lat                                32.165
lon                               -64.503
depth                                 2.4
cruise_ID                      5000100112
TOC                                   NaN
phosphate                             NaN
chl                              0.135655
NO3                              1.939887
PO4                              0.158448
Fe                               0.000035
O2                             234.741243
Si                               2.917146
PP                               0.023538
density_WOA_clim                26.057992
salinity_WOA_clim               36.622066
nitrate_WOA_clim                  0.10174
phosphate_WOA_clim               0.016469
silicate_WOA_clim                0.644847
oxygen_WOA_clim                  5.308658
Name: 0, dtype: object

In [12]:
len(dfs)

14939

In [17]:
df.size

298780

In [None]:
    for cyanoFile in cyanoFiles:
        df = pd.read_csv(cyanoFile)
        df = add_env_columns(df, envs)
        dfs = [df.loc[i].to_frame().T for i in range(len(df))]
        colocalizedDF  = pd.DataFrame({})
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futureObjs = executor.map(match, dfs, [api] * len(dfs), [envs] * len(dfs), [cyanoFile] * len(dfs), [len(dfs)] * len(dfs))
            for fo in futureObjs:
                if len(colocalizedDF) < 1: #e.g., this is the first file
                    colocalizedDF = fo
                else: #any subsequent files are concatenated
                    colocalizedDF = pd.concat([colocalizedDF, fo], ignore_index=True)  
        saveColocalizedCSV(colocalizedDF)

In [18]:
len(colocalizedDF)

0

In [19]:
one = match(df,api,envs,cyanoFile,len(df))


Invalid dataframe input.
Expected a single row dataframe but received 14939 rows.


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [20]:
idx = 1

In [23]:
one = match(df[idx],api,envs,cyanoFile,len(df))

KeyError: 1

In [22]:
envs.size

AttributeError: 'dict' object has no attribute 'size'

In [29]:
pip install debugger

Note: you may need to restart the kernel to use updated packages.


In [32]:
#used this to step into the function and debug it, also need line with Tracer()() 
# from IPython.core.debugger import Tracer 
from IPython.core.debugger import Tracer 

ImportError: cannot import name 'Tracer' from 'IPython.core.debugger' (C:\Users\klongnecker\AppData\Local\miniconda3\Lib\site-packages\IPython\core\debugger.py)

In [None]:
for cyanoFile in cyanoFiles:
    df = pd.read_csv(cyanoFile)
    df = add_env_columns(df, envs)
    dfs = [df.loc[i].to_frame().T for i in range(len(df))]
    colocalizedDF  = pd.DataFrame({})
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futureObjs = executor.map(match, dfs, [api] * len(dfs), [envs] * len(dfs), [cyanoFile] * len(dfs), [len(dfs)] * len(dfs))
        embed()
        for fo in futureObjs:
            if len(colocalizedDF) < 1: #e.g., this is the first file
                colocalizedDF = fo
            else: #any subsequent files are concatenated
                colocalizedDF = pd.concat([colocalizedDF, fo], ignore_index=True)  
    saveColocalizedCSV(colocalizedDF)

Python 3.13.2 | packaged by Anaconda, Inc. | (main, Feb  6 2025, 18:49:14) [MSC v.1929 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 9.3.0 -- An enhanced Interactive Python. Type '?' for help.
Tip: You can use `%hist` to view history, see the options with `%history?`



In [1]:  match


Out[1]: <function __main__.match(df, api, envs, cyanoFile, rowCount)>



In [2]:  dfs


Out[2]: 
[                  time     lat     lon depth   cruise_ID  TOC phosphate  \
 0  1988-10-21T04:55:00  31.833 -64.167   1.0  1000108112  NaN       0.0   
 
         chl       NO3       PO4        Fe          O2        Si        PP  \
 0  0.075749  0.007559  0.023802  0.000213  213.651904  2.568182  0.002471   
 
   density_WOA_clim salinity_WOA_clim nitrate_WOA_clim phosphate_WOA_clim  \
 0         24.23414         36.458775          0.05295           0.011797   
 
   silicate_WOA_clim oxygen_WOA_clim  
 0          0.854648        4.743575  ,
                   time     lat     lon depth   cruise_ID  TOC phosphate  \
 0  1988-10-21T04:55:00  31.833 -64.167  10.0  1000108111  NaN       0.0   
 
         chl       NO3       PO4        Fe          O2        Si        PP  \
 0  0.075749  0.008065  0.023859  0.000212  213.828594  2.568457  0.002281   
 
   density_WOA_clim salinity_WOA_clim nitrate_WOA_clim phosphate_WOA_clim  \
 0        24.272935         36.464915         0.076731 

In [3]:  dfs.size


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[3], line 1
----> 1 dfs.size

AttributeError: 'list' object has no attribute 'size'



In [4]:  dfs.len


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 1
----> 1 dfs.len

AttributeError: 'list' object has no attribute 'len'



In [5]:  len(dfs)


Out[5]: 58989



In [6]:  cyanoFile.head


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[6], line 1
----> 1 cyanoFile.head

AttributeError: 'str' object has no attribute 'head'



In [7]:  cyanoFile


Out[7]: './data\\tblBATS_Bottle.csv'



In [8]:  futureObjs


Out[8]: <generator object Executor.map.<locals>.result_iterator at 0x0000022A10D16110>



In [9]:  futureObjs.size


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[9], line 1
----> 1 futureObjs.size

AttributeError: 'generator' object has no attribute 'size'



In [10]:  exit





In [None]:
 raise SystemExit("Stop execution here")

#Stick some code below this spot as a holding zone

In [None]:
#this was helpful to dig around when I was trying to find the error in the cyano_ML code

def _atomic_match(
                 spName, sourceTable, sourceVar, targetTable, targetVar, 
                 dt1, dt2, lat1, lat2, lon1, lon2, depth1, depth2, 
                 temporalTolerance, latTolerance, lonTolerance, depthTolerance
                 ):

    """
    Colocalizes the source variable (from source table) with a single target variable (from target table).
    The tolerance parameters set the matching boundaries between the source and target data sets. 
    Returns a dataframe containing the source variable joined with the target variable.
    """

    args = [spName, sourceTable, sourceVar, targetTable, targetVar, 
            dt1, dt2, lat1, lat2, lon1, lon2, depth1, depth2, 
            temporalTolerance, latTolerance, lonTolerance, depthTolerance]
    args = [str(arg) for arg in args]        
    query = "EXEC %s '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s'" % tuple(args)
    return API().query(query)  


In [None]:
len(args) #args is a list