# Pansharpening

Pansharpening is a fusion algorithm that upsamples the multispectral image to the resolution of the panchromatic band. For more information, see [Pansharpening](https://docs.up42.com/processing-platform/advanced/pansharpening).

## Step 1. Set up the notebook

### 1. Install prerequisites

In [1]:
%pip install requests

import requests, json, os

Note: you may need to restart the kernel to use updated packages.


### 2. Authenticate

Create a `credentials.json` file in a directory named `.up42` under your home directory by running the cell below. The path to the file will be `~/.up42/credentials.json`.

In [2]:
# Define the directory path
up42_directory = os.path.expanduser("~/.up42")

# Create the directory if it doesn't exist
if not os.path.exists(up42_directory):
    os.makedirs(up42_directory)

# Specify the file path
credentials_file_path = os.path.join(up42_directory, "credentials.json")

# Check if the file already exists before creating it
if not os.path.exists(credentials_file_path):
    # Create an empty credentials.json file
    with open(credentials_file_path, "w"):
        pass
    print(f"The file {credentials_file_path} has been created.")
else:
    print(f"The file {credentials_file_path} already exists.")


The file /Users/max.mustermann/.up42/credentials.json already exists.


1. Click the link above to the created file and paste the following code:
    ```json
    {
      "username": "<your-email-address>",
      "password": "<your-password>"
    }
    ```
2. Retrieve the [email address and password](https://docs.up42.com/getting-started/account/management) used for logging into the [console](https://console.up42.com/?utm_source=documentation). Use them as values for `username` and `password`.
3. Save the `credentials.json` file.
4. Check that the authentication was successful as follows:

In [3]:
# Load credentials
with open(credentials_file_path, "r") as f:
    credentials = json.load(f)

# Request an access token; this fails if the credentials are rejected
def auth(username, password):

    response = requests.post(
        "https://api.up42.com/oauth/token",
        data=dict(grant_type="password",
                  username=username,
                  password=password)
    )

    # Raise a clear HTTP error instead of a KeyError on a failed login
    response.raise_for_status()
    result = response.json()
    return result["data"]["accessToken"]

class BearerAuth(requests.auth.AuthBase):
    def __init__(self, token):
        self.token = token
    def __call__(self, r):
        r.headers["authorization"] = "Bearer " + self.token
        return r

token = auth(credentials["username"], credentials["password"])
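
The functions in the following steps call `auth` again before every request. A minimal sketch of reusing one token instead is shown here. `TokenCache` is a hypothetical helper, and the 240-second lifetime is an assumption rather than a documented value, so check the token lifetime for your account before relying on it:

```python
import time


class TokenCache:
    """Reuse a token until its assumed lifetime has passed."""

    def __init__(self, fetch_token, lifetime_seconds=240, clock=time.monotonic):
        # fetch_token: a callable that returns a fresh token string
        # lifetime_seconds: assumed token lifetime (not a documented value)
        self._fetch_token = fetch_token
        self._lifetime = lifetime_seconds
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return the cached token, fetching a new one if it has expired."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            self._token = self._fetch_token()
            self._expires_at = now + self._lifetime
        return self._token
```

In this notebook it could be created with `TokenCache(lambda: auth(credentials["username"], credentials["password"]))` and queried with `.get()` wherever a token is needed.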

## Step 2. Define input

Before selecting a STAC item, check the [requirements for input imagery](https://docs.up42.com/processing-platform/advanced/pansharpening#requirements-for-input-imagery).

The process runs at the STAC item level. To run it, you need a link to the STAC item that you want to process, in the following format: `https://api.up42.com/v2/assets/stac/collections/{collection-id}/items/{item-id}`.

In [4]:
# If you have a STAC item link, define it here
self_href = "https://api.up42.com/v2/assets/stac/collections/{collection-id}/items/{item-id}"
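
Before sending a request, it can help to confirm that the link follows the format above and that the placeholders were replaced. The sketch below uses only the standard library. `parse_stac_item_link` is a hypothetical helper name, and the pattern is derived from the format string in this notebook, not from an API specification:

```python
import re

# Pattern for the STAC item link format used in this notebook.
# Curly braces are rejected so that unfilled placeholders are caught.
STAC_ITEM_PATTERN = re.compile(
    r"^https://api\.up42\.com/v2/assets/stac/collections/"
    r"(?P<collection_id>[^/{}]+)/items/(?P<item_id>[^/{}]+)$"
)


def parse_stac_item_link(href):
    """Return (collection_id, item_id) or raise ValueError if the link doesn't match."""
    match = STAC_ITEM_PATTERN.match(href)
    if match is None:
        raise ValueError(f"Not a STAC item link: {href}")
    return match.group("collection_id"), match.group("item_id")
```

Calling `parse_stac_item_link(self_href)` with the unedited placeholder link raises a `ValueError`, which points to the problem before any API call is made.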

If you don't already have a STAC item link, retrieve it using one of the following options:

1. If you have a STAC collection ID
2. If you have an asset ID
3. If you want to search for specific STAC items

### Option 1. If you have a STAC collection ID

In [5]:
# Define a STAC collection ID
stac_collection_id = "14371401-97a3-4b23-9b55-c3ac10d47750"

In [6]:
# Define STAC search
def STAC_search(stac_collection_id):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Search for item
    response = requests.get(
        f"https://api.up42.com/v2/assets/stac/collections/{stac_collection_id}/items",
        auth=BearerAuth(token),
    )

    return response

# Get the search result
search_result = STAC_search(stac_collection_id).json()

# Extract the href value of the link with the "self" relation
self_href = next(link["href"] for link in search_result["features"][0]["links"] if link["rel"] == "self")

# Save and print the workspace ID
workspace_id = search_result["features"][0]["properties"]["up42-system:workspace_id"]
print("Workspace ID:", workspace_id)

# Print the result
print("STAC item link:", self_href)


Workspace ID: a0d443a2-41e8-4995-8b54-a5cc4c448227
STAC item link: https://api.up42.com/v2/assets/stac/collections/14371401-97a3-4b23-9b55-c3ac10d47750/items/033b4a5a-c492-4eba-915e-2000a0a84049
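
Indexing `search_result["features"][0]` raises an `IndexError` when the search returns no items. A minimal sketch of a more defensive lookup follows, assuming the response shape shown in the output above. `first_self_link` is a hypothetical helper name, not part of the UP42 API:

```python
def first_self_link(search_result):
    """Return the "self" href of the first feature, or None if there is none."""
    features = search_result.get("features", [])
    if not features:
        return None

    # Take the first link whose relation is "self"; fall back to None
    return next(
        (
            link["href"]
            for link in features[0].get("links", [])
            if link.get("rel") == "self"
        ),
        None,
    )
```

With this helper, `self_href = first_self_link(search_result)` returns `None` for an empty result, which can be checked before continuing.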


### Option 2. If you have an asset ID

In [7]:
# Define an asset ID
asset_id = "68567134-27ad-7bd7-4b65-d61adb11fc78"

In [8]:
# Define STAC search
def STAC_search(search):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Search for item
    response = requests.post(
        "https://api.up42.com/v2/assets/stac/search",
        auth=BearerAuth(token),
        json=search,
    )

    return response

# Search by asset ID
search_by_asset_id = {
  "filter": {
        "args": [
          {
            "property": "asset_id"
          },
          asset_id
        ],
        "op": "="
      }
}

# Get the search result
search_result = STAC_search(search_by_asset_id).json()

# Extract the href value of the link with the "self" relation
self_href = next(link["href"] for link in search_result["features"][0]["links"] if link["rel"] == "self")

# Save and print the workspace ID
workspace_id = search_result["features"][0]["properties"]["up42-system:workspace_id"]
print("Workspace ID:", workspace_id)

# Print the result
print("STAC item link:", self_href)

Workspace ID: a0d443a2-41e8-4995-8b54-a5cc4c448227
STAC item link: https://api.up42.com/v2/assets/stac/collections/14371401-97a3-4b23-9b55-c3ac10d47750/items/033b4a5a-c492-4eba-915e-2000a0a84049


### Option 3. If you want to search for specific STAC items

For more information on how to search for specific STAC items, see [CQL2 filters for STAC item search](https://docs.up42.com/developers/api-assets/stac-cql).

In [9]:
# Define STAC search
def STAC_search(search):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Search for item
    response = requests.post(
        "https://api.up42.com/v2/assets/stac/search",
        auth=BearerAuth(token),
        json=search,
    )

    return response

# For example, search by tags
search = {
  "filter": {
        "args": [
          {
            "property": "tags"
          },
          ["pan-ready"]
        ],
        "op": "a_contains"
      }
}

# Get the search result
search_result = STAC_search(search).json()

# Extract the href value of the link with the "self" relation
self_href = next(link["href"] for link in search_result["features"][0]["links"] if link["rel"] == "self")

# Save and print the workspace ID
workspace_id = search_result["features"][0]["properties"]["up42-system:workspace_id"]
print("Workspace ID:", workspace_id)

# Print the result
print("STAC item link:", self_href)

Workspace ID: a0d443a2-41e8-4995-8b54-a5cc4c448227
STAC item link: https://api.up42.com/v2/assets/stac/collections/14371401-97a3-4b23-9b55-c3ac10d47750/items/033b4a5a-c492-4eba-915e-2000a0a84049
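
The search bodies in this notebook (by asset ID, by tags, and by job ID in a later step) share the same shape: one property, one value, and one operator. A small sketch of a builder for that shape is shown below. `cql2_filter` is a hypothetical helper name, and it covers only the single-comparison filters used here:

```python
def cql2_filter(property_name, value, op="="):
    """Build a search body with a single CQL2 comparison."""
    return {
        "filter": {
            "args": [{"property": property_name}, value],
            "op": op,
        }
    }


# The same bodies as in Option 2 and Option 3
search_by_asset_id = cql2_filter("asset_id", "68567134-27ad-7bd7-4b65-d61adb11fc78")
search_by_tags = cql2_filter("tags", ["pan-ready"], op="a_contains")
```

The resulting dictionaries can be passed to the `STAC_search(search)` function from Option 2 or Option 3 unchanged.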


## Step 3. Check your input data

To check whether the STAC item has [panchromatic and multispectral bands](https://docs.up42.com/data/reference/spectral-bands), check its STAC assets with the `data` role.


In [10]:
# Extract the STAC assets
features = search_result.get("features", [])

if features:
    first_feature = features[0]
    assets = first_feature.get("assets", {})

    for asset_key, asset_info in assets.items():
        roles = asset_info.get("roles", [])

        # Check for "data" role
        if "data" in roles:
            title = asset_info.get("title", "")
            bands = asset_info.get("eo:bands", [])

            # Print information
            print(title)
            print(bands, "\n")

else:
    print("No STAC items found in the search result.")

Panchromatic data
[{'name': 'pan', 'common_name': 'pan', 'center_wavelength': 0.65, 'full_width_half_max': 0.32}] 

Multispectral data
[{'name': 'blue', 'common_name': 'blue', 'center_wavelength': 0.495, 'full_width_half_max': 0.07}, {'name': 'green', 'common_name': 'green', 'center_wavelength': 0.56, 'full_width_half_max': 0.08}, {'name': 'red', 'common_name': 'red', 'center_wavelength': 0.65, 'full_width_half_max': 0.08}, {'name': 'nir', 'common_name': 'nir', 'center_wavelength': 0.83, 'full_width_half_max': 0.12}] 
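
Instead of reading the printed bands by eye, the check can be expressed as a function. The sketch below assumes a simple rule, namely that the item is usable when its `data` assets contain a `pan` band and at least one other named band. The actual requirements are listed in the linked documentation, and both function names are hypothetical:

```python
def band_names(assets):
    """Collect eo:bands common names from every asset with the "data" role."""
    names = set()
    for asset_info in assets.values():
        if "data" in asset_info.get("roles", []):
            for band in asset_info.get("eo:bands", []):
                names.add(band.get("common_name"))
    return names


def has_pan_and_multispectral(assets):
    """Assumed rule: a "pan" band plus at least one other named band."""
    names = band_names(assets)
    return "pan" in names and len(names - {"pan", None}) > 0
```

For the item above, `has_pan_and_multispectral(search_result["features"][0]["assets"])` would be expected to return `True`, since the output lists a `pan` band and four multispectral bands.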



## Step 4. Run a process

### 1. Prepare your request

The following body parameters are required:

- **Input imagery**: already stored in `self_href`.
- **Output title**: define it below.

You can also specify [optional parameters](https://docs.up42.com/processing-platform/advanced/pansharpening#input-parameters).


In [12]:
# Define the process
process_id = "pansharpening"

# Define required parameters
input_imagery = self_href
output_title = "Pansharpening notebook"

# Define optional parameters
# grey_weights = [
#    {"band": "blue", "weight": 0.2},
#    {"band": "green", "weight": 0.34},
#    {"band": "red", "weight": 0.23}
# ]

# Define job composition
job_input = {
    "inputs": {
        "item": input_imagery,
        "title": output_title
        }
    }

### 2. Validate your input

Check the validity of your input against the process schema to ensure that the job can be created with these parameters.

In [13]:
def validate_job(job_input, process_id):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Make the POST request
    response = requests.post(
        url=f"https://api.up42.com/v2/processing/processes/{process_id}/validation",
        auth=BearerAuth(token),
        json=job_input
    )

    return response

# Print the response
validation = validate_job(job_input, process_id)

if validation.status_code == 200:
    print("The input is valid.")
else:
    print(f"Status code: {validation.status_code}")
    print(f"Status text: {validation.text}")

The input is valid.


### 3. Estimate the cost

Check the cost of a processing job before creating it.

The response shows the total number of UP42 credits that will be deducted from your credit balance if you run the selected process.

In [14]:
def estimate_job(item_input, process_id):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Make the POST request
    response = requests.post(
        url=f"https://api.up42.com/v2/processing/processes/{process_id}/cost",
        auth=BearerAuth(token),
        json=item_input
    )

    return response

# Define the STAC item; no other parameters are needed
item_input = {
    "inputs": {
        "item": input_imagery
        }
    }

# Print the response
estimation = estimate_job(item_input, process_id)

if estimation.status_code == 200:
    estimate = estimation.json()
    total_credits = estimate.get("totalCredits")
    if total_credits is not None:
        print(f"The cost is {total_credits} UP42 credits. The size of the STAC item is {estimate.get('totalSize')} {estimate.get('unit')}.")
    else:
        print("No cost found.")
else:
    print(f"Status code: {estimation.status_code}")
    print(f"Status text: {estimation.text}")

The cost is 0 UP42 credits. The size of the STAC item is 0.1502011413661941 SQ_KM.


### 4. Create a new processing job

Create a new processing job by running the process.

**Warning:** Credits will be deducted upon successful completion of the created processing job. The transaction can't be reversed.

In [15]:
def execute_job(job_input, process_id, workspace_id):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Make the POST request
    response = requests.post(
        url=f"https://api.up42.com/v2/processing/processes/{process_id}/execution",
        params={"workspaceId": workspace_id},
        auth=BearerAuth(token),
        json=job_input
    )

    return response

# Print the response
execution = execute_job(job_input, process_id, workspace_id)

if execution.status_code == 201:
    print("A new processing job has been created.")
    job_id = execution.json()["jobID"]
    print("The job ID:", job_id)
else:
    print(f"Status code: {execution.status_code}")
    print(f"Status text: {execution.text}")

A new processing job has been created.
The job ID: 55434287-31bc-3ad7-1a63-d61aac11ac55


## Step 5. Monitor your processing job

Check the [status](https://docs.up42.com/developers/api-processing/processing-monitor#job-statuses) of your job.

In [16]:
def get_job_status(job_id):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Get job status
    response = requests.get(
        url=f"https://api.up42.com/v2/processing/jobs/{job_id}",
        auth=BearerAuth(token)
    )

    return response

# Print the response
job_status = get_job_status(job_id).json()
print("The status of the job:", job_status["status"])
print("The job ID:", job_id)

The status of the job: successful
The job ID: 55434287-31bc-3ad7-1a63-d61aac11ac55


The `successful` status means you can access processing results.
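
A job rarely finishes immediately, so the status check usually has to be repeated. The sketch below polls until the job reaches a final status or a timeout passes. `wait_for_job` is a hypothetical helper, and the set of final statuses is an assumption: only `successful` appears in this notebook, so compare the set with the linked list of job statuses before use:

```python
import time

# Assumption: statuses after which the job no longer changes.
# Only "successful" is confirmed by this notebook.
TERMINAL_STATUSES = {"successful", "failed"}


def wait_for_job(fetch_status, interval_seconds=10, timeout_seconds=600, sleep=time.sleep):
    """Poll fetch_status() until it returns a final status or the timeout passes."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job still has status {status!r} after {timeout_seconds} s"
            )
        # Wait before asking again to avoid flooding the API
        sleep(interval_seconds)
```

In this notebook it could be called as `wait_for_job(lambda: get_job_status(job_id).json()["status"])`. Passing the status lookup as a function keeps the loop independent of how the request is made.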

## Step 6. Download processing results

### 1. Retrieve download links

In [17]:
# Search by job ID
search_by_job_id = {
  "filter": {
        "args": [
          {
            "property": "job_id"
          },
          job_id
        ],
        "op": "="
      }
}

# Get the search result (requires the STAC_search(search) function from Option 2 or Option 3)
search_result = STAC_search(search_by_job_id).json()

if "features" in search_result and len(search_result["features"]) > 0:
    features = search_result["features"][0]

    # Check if "assets" key exists in the first feature
    if "assets" in features:
        assets = features["assets"]

        # Variables to store title and href values of the first asset with "data" role
        data_asset_title = "No title"
        data_asset_href = None

        # Iterate over assets and find the first one with "data" role
        for asset_key, asset_info in assets.items():
            roles = asset_info.get("roles", [])
            if "data" in roles:
                data_asset_title = asset_info.get("title", "No title")  # Default value if title is not present
                data_asset_href = asset_info.get("href")

                # Stop searching after the first asset with "data" role is found
                break

        # Check if any asset with "data" role was found
        if data_asset_href is not None:
            print(f"Title: {data_asset_title}\nHref: {data_asset_href}\n\n")
        else:
            print("No data assets found.")
    else:
        print("No assets found in \"features\".")
else:
    print("No \"features\" found.")

Title: Pansharpened multispectral data
Href: https://api.up42.com/v2/assets/ea36dee9-fed6-457e-8400-2c20ebd30f44




### 2. Download assets

In [18]:
output_directory = "/Users/max.mustermann/Desktop/"  # Replace with the path to an existing directory

In [19]:
def download_assets(data_asset_href, output_directory):

    # Authenticate
    token = auth(credentials["username"], credentials["password"])

    # Get the asset
    response = requests.get(
        url=data_asset_href,
        auth=BearerAuth(token),
        stream=True  # Set stream=True to download large files efficiently
    )

    # Check if the request was successful
    if response.status_code == 200:
        # Extract the file name from the URL
        file_name = os.path.basename(data_asset_href)

        # Append ".tiff" extension to the file name
        file_name_with_extension = f"{file_name}.tiff"

        # Combine the output directory and file name with extension
        output_file_path = os.path.join(output_directory, file_name_with_extension)

        # Open the file for writing in binary mode
        with open(output_file_path, "wb") as output_file:
            # Iterate over the content in chunks and write to the file
            for chunk in response.iter_content(chunk_size=8192):
                output_file.write(chunk)

        print(f"Asset downloaded successfully: {output_file_path}\nVisualize the asset using QGIS: https://docs.up42.com/help/visualize")
    else:
        print(f"Failed to download.\nStatus code: {response.status_code}\nText: {response.text}")

download_assets(data_asset_href, output_directory)


Asset downloaded successfully: /Users/max.mustermann/Desktop/ea36dee9-fed6-457e-8400-2c20ebd30f44.tiff
Visualize the asset using QGIS: https://docs.up42.com/help/visualize
