In [37]:
import geopandas as gpd
# load bursts we want to process
bursts_to_process = gpd.read_file("/data/users/Public/jonathanbahlmann/coherence-docs/src/processing.geojson")

# go per scene to process, dissolve to extract ids
combinations = bursts_to_process.dissolve(["id", "subswath", "ref_scene"], as_index = False)
# print(combinations.head())
array_of_frames = [v for k, v in combinations.loc[:,["id", "ref_scene", "subswath", "sce_min_burst", "sce_max_burst", "ref_min_burst", "ref_max_burst", "processing_status", "path", "ref_path"]].groupby(['id', "ref_scene"])]
#print(array_of_frames)

# print(combinations.head())
# combinations.loc[:,["id", "ref_scene", "subswath", "sce_min_burst", "sce_max_burst", "ref_min_burst", "ref_max_burst", "processing_status"]]
# combinations.groupby(["id", "ref_scene"])

# each scene and ref_scene could be added together with merge, but not across ref_scenes probably

#print(array_of_frames.head())

# bursts_to_process.iloc[0].loc[["subswath", "burst", "id", "path", "sensor", "polarizations", "mode", "orbit_dicrection", "rel_orbit", "regular_burst_pattern", "ref_scene", "ref_min_burst", "ref_max_burst", "processing_status", "sce_min_burst", "sce_max_burst"]].to_dict()


# out into the spark parallelizer: array of dicts? what does it need: path to scene, which subswaths to process with which ref scenes, maybe like path..., subswaths

In [28]:
int(4)

4

In [38]:
from pyroSAR.snap.auxil import parse_recipe, parse_node
from pyroSAR.snap.auxil import gpt
from pyroSAR.snap.auxil import groupbyWorkers

output_dir = "/data/users/Public/jonathanbahlmann/spark_results/"

continueOnFailAOF = True

# this iteration is done by spark
for df in array_of_frames:
    # this needs to be put into each worker node
    name = str(df.iloc[0]["id"]) + "_" + df.iloc[0]["ref_scene"] + "_" + str(int(df.iloc[0]["sce_min_burst"])) + "_" + str(int(df.iloc[0]["sce_max_burst"]))
    workflow_filename = name + ".xml"
    out_filename = name + "_res"
    
    # each of these frames that I get here are similar to the pyroXX_workflow.py
    processing_dict = {"IW1": {"min_sce": None, "max_sce": None, "min_ref": None, "max_ref": None}, 
                       "IW2": {"min_sce": None, "max_sce": None, "min_ref": None, "max_ref": None}, 
                       "IW3": {"min_sce": None, "max_sce": None, "min_ref": None, "max_ref": None}}
    
    for i, row in df.iterrows():
        swath = row["subswath"]
        processing_dict[swath]["min_sce"] = row["sce_min_burst"]
        processing_dict[swath]["max_sce"] = row["sce_max_burst"]
        processing_dict[swath]["min_ref"] = row["ref_min_burst"]
        processing_dict[swath]["max_ref"] = row["ref_max_burst"]

    print(processing_dict)
    
    workflow = parse_recipe('blank')

    # reference
    read = parse_node("Read")
    read.parameters["file"] = df["ref_path"]
    read.parameters["formatName"] = "SENTINEL-1"
    workflow.insert_node(read)

    # secondary
    read2 = parse_node("Read")
    read2.parameters["file"] = df["path"]
    read2.parameters["formatName"] = "SENTINEL-1"
    workflow.insert_node(read2)

    merge_list = []
    
    # check IW1, empty?
    if processing_dict["IW1"]["max_sce"] is None or processing_dict["IW1"]["max_ref"] is None:
        print("[PROCESSING IW1]: No bursts needed from IW1")
    else:
        print("[PROCESSING IW1]: ", processing_dict["IW1"])
        # TopSAR Split
        split = parse_node("TOPSAR-Split")
        split.parameters["subswath"] = "IW1"
        split.parameters["selectedPolarisations"] = ["VV"]
        split.parameters["firstBurstIndex"] = processing_dict["IW1"]["min_ref"]
        split.parameters["lastBurstIndex"] = processing_dict["IW1"]["max_ref"]
        workflow.insert_node(split, before = read.id, resetSuccessorSource = False)

        # TopSAR Split 2
        split2 = parse_node("TOPSAR-Split")
        split2.parameters["subswath"] = "IW1"
        split2.parameters["selectedPolarisations"] = ["VV"]
        split2.parameters["firstBurstIndex"] = processing_dict["IW1"]["min_sce"]
        split2.parameters["lastBurstIndex"] = processing_dict["IW1"]["max_sce"]
        workflow.insert_node(split2, before = read2.id, resetSuccessorSource = False)

        # apply orbit file 1
        aof = parse_node("Apply-Orbit-File")
        aof.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof.parameters["polyDegree"] = 3
        aof.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof, before = split.id)

        # apply orbit file 2
        aof2 = parse_node("Apply-Orbit-File")
        aof2.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof2.parameters["polyDegree"] = 3
        aof2.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof2, before = split2.id)

        # Back-Geocoding
        geocode = parse_node("Back-Geocoding")
        geocode.parameters["demName"] = "SRTM 1Sec HGT"
        workflow.insert_node(geocode, before = [aof.id, aof2.id])

        # deburst
        deb = parse_node("TOPSAR-Deburst")
        workflow.insert_node(deb, before = geocode.id)

        merge_list.append(deb.id)
    # check IW2, empty?
    if processing_dict["IW2"]["max_sce"] is None or processing_dict["IW2"]["max_ref"] is None:
        print("[PROCESSING IW2]: No bursts needed from IW2")
    else:
        print("[PROCESSING IW2]: ", processing_dict["IW2"])
        # TopSAR Split
        split3 = parse_node("TOPSAR-Split")
        split3.parameters["subswath"] = "IW2"
        split3.parameters["selectedPolarisations"] = ["VV"]
        split3.parameters["firstBurstIndex"] = processing_dict["IW2"]["min_ref"]
        split3.parameters["lastBurstIndex"] = processing_dict["IW2"]["max_ref"]
        workflow.insert_node(split3, before = read.id, resetSuccessorSource = False)

        # TopSAR Split 2
        split4 = parse_node("TOPSAR-Split")
        split4.parameters["subswath"] = "IW2"
        split4.parameters["selectedPolarisations"] = ["VV"]
        split4.parameters["firstBurstIndex"] = processing_dict["IW2"]["min_sce"]
        split4.parameters["lastBurstIndex"] = processing_dict["IW2"]["max_sce"]
        workflow.insert_node(split4, before = read2.id, resetSuccessorSource = False)

        # apply orbit file 1
        aof3 = parse_node("Apply-Orbit-File")
        aof3.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof3.parameters["polyDegree"] = 3
        aof3.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof3, before = split3.id)

        # apply orbit file 2
        aof4 = parse_node("Apply-Orbit-File")
        aof4.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof4.parameters["polyDegree"] = 3
        aof4.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof4, before = split4.id)

        # Back-Geocoding
        geocode2 = parse_node("Back-Geocoding")
        geocode2.parameters["demName"] = "SRTM 1Sec HGT"
        workflow.insert_node(geocode2, before = [aof3.id, aof4.id])

        # deburst
        deb2 = parse_node("TOPSAR-Deburst")
        workflow.insert_node(deb2, before = geocode2.id)

        merge_list.append(deb2.id)
    # check IW3, empty?
    if processing_dict["IW3"]["max_sce"] is None or processing_dict["IW3"]["max_ref"] is None:
        print("[PROCESSING IW3]: No bursts needed from IW3")
    else:
        print("[PROCESSING IW3]: ", processing_dict["IW3"])
        # TopSAR Split
        split5 = parse_node("TOPSAR-Split")
        split5.parameters["subswath"] = "IW3"
        split5.parameters["selectedPolarisations"] = ["VV"]
        split5.parameters["firstBurstIndex"] = processing_dict["IW3"]["min_ref"]
        split5.parameters["lastBurstIndex"] = processing_dict["IW3"]["max_ref"]
        workflow.insert_node(split5, before = read.id, resetSuccessorSource = False)

        # TopSAR Split 2
        split6 = parse_node("TOPSAR-Split")
        split6.parameters["subswath"] = "IW3"
        split6.parameters["selectedPolarisations"] = ["VV"]
        split6.parameters["firstBurstIndex"] = processing_dict["IW3"]["min_sce"]
        split6.parameters["lastBurstIndex"] = processing_dict["IW3"]["max_sce"]
        workflow.insert_node(split6, before = read2.id, resetSuccessorSource = False)

        # apply orbit file 1
        aof5 = parse_node("Apply-Orbit-File")
        aof5.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof5.parameters["polyDegree"] = 3
        aof5.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof5, before = split5.id)

        # apply orbit file 2
        aof6 = parse_node("Apply-Orbit-File")
        aof6.parameters["orbitType"] = "Sentinel Restituted (Auto Download)"
        aof6.parameters["polyDegree"] = 3
        aof6.parameters["continueOnFail"] = continueOnFailAOF
        workflow.insert_node(aof6, before = split6.id)

        # Back-Geocoding
        geocode3 = parse_node("Back-Geocoding")
        geocode3.parameters["demName"] = "SRTM 1Sec HGT"
        workflow.insert_node(geocode3, before = [aof5.id, aof6.id])

        # deburst
        deb3 = parse_node("TOPSAR-Deburst")
        workflow.insert_node(deb3, before = geocode3.id)

        merge_list.append(deb3.id)
        
    # hope this works even with only one subswath
    merge = parse_node("TOPSAR-Merge")
    workflow.insert_node(merge, before = merge_list)
                        
    write = parse_node("Write")
    write.parameters["file"] = out_filename
    write.parameters["formatName"] = "BEAM-DIMAP"
    workflow.insert_node(write, before = merge.id)
                        
    workflow.write(output_dir + workflow_filename)
                        
    groups = groupbyWorkers(output_dir + workflow_filename, n=1)
    print(groups)

{'IW1': {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}, 'IW2': {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}, 'IW3': {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}}
[PROCESSING IW1]:  {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}
[PROCESSING IW2]:  {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}
[PROCESSING IW3]:  {'min_sce': 1.0, 'max_sce': 5.0, 'min_ref': 5.0, 'max_ref': 9.0}
[['Read', 'TOPSAR-Split (5)'], ['Apply-Orbit-File (5)'], ['Read', 'TOPSAR-Split (3)'], ['Apply-Orbit-File (3)'], ['Read', 'TOPSAR-Split'], ['Apply-Orbit-File'], ['Read (2)', 'TOPSAR-Split (6)'], ['Apply-Orbit-File (6)'], ['Back-Geocoding (3)'], ['TOPSAR-Deburst (3)'], ['Read (2)', 'TOPSAR-Split (4)'], ['Apply-Orbit-File (4)'], ['Back-Geocoding (2)'], ['TOPSAR-Deburst (2)'], ['Read (2)', 'TOPSAR-Split (2)'], ['Apply-Orbit-File (2)'], ['Back-Geocoding'], ['TOPSAR-Deburst'], ['TOPSAR-Merge', 'Write']]
{'IW1': {'min_sce': 6.0,

In [26]:
dic["path"]
dic["subswaths"][0]["IW1"]["ref"]
dic["subswaths"][2] = {"test": "hi"}

IndexError: list assignment index out of range

In [38]:
d = {"path", "fg" }
d["path"] = "fr"
d

TypeError: 'set' object does not support item assignment

In [16]:
start_date = "2020/12/31"
#str(root / start_date.strftime("%Y/%m/%d"))
from pathlib import Path
import datetime
import glob
start_date = datetime.datetime.strptime("2020/12/30", "%Y/%m/%d")
end_date = datetime.datetime.strptime("2021/01/03", "%Y/%m/%d")

root = Path("/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/")
start_dir = str(root / start_date.strftime("%Y/%m/%d"))
end_dir = str(root / end_date.strftime("%Y/%m/%d"))

list_of_products = []
for year in range(start_date.year, end_date.year + 1):
    year_path = root / str(year)
    for day_dir in year_path.glob("[01][0123456789]/[0123][0123456789]"):
        if start_dir <= str(day_dir) < end_dir:
            print(day_dir)

/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/2020/12/30
/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/2020/12/31
/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/2021/01/02
/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/2021/01/01


In [3]:
from src.util import list_products_by_time
start = "2017/06/28"
end = "2017/07/03"
path = "/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/"
expected = 18 # usually 18
products = list_products_by_time(start, end, path)
# products

import pathlib
 
path = pathlib.Path('.')
full_path = path.absolute()
 
my_path = full_path.as_posix()
full_path
 

PosixPath('/data/users/Public/jonathanbahlmann/coherence-docs')

In [20]:
from src.util import search_for_reference, create_gpd_for_scene
import geopandas as gpd

scene_gpd = create_gpd_for_scene(path = "/data/MTDA/CGS_S1/CGS_S1_SLC_L1/IW/DV/2021/04/17/S1B_IW_SLC__1SDV_20210417T174054_20210417T174130_026510_032A4B_DA8B/S1B_IW_SLC__1SDV_20210417T174054_20210417T174130_026510_032A4B_DA8B.zip")
ref_gpd = gpd.read_file("src/reference_bursts.geojson")
references = search_for_reference(scene_gpd, ref_gpd)
print(references)
assert references == []

{}


  right.crs))


AssertionError: 