In [3]:
import os

def create_cognite_transformation_files(base_path):
    """
    Write the transformation definition files for the ice cream API module.

    Ensures that ``modules/bootcamp/ice_cream_api/transformations`` exists
    under ``base_path`` and writes five files into it:

    * ``create_asset_hierarchy.Transformation.yaml``
    * ``create_asset_hierarchy.Transformation.sql`` (placeholder query)
    * ``create_asset_hierarchy.schedule.yaml``
    * ``contextualize_ts_assets.Transformation.yaml``
    * ``contextualize_ts_assets.Transformation.sql``

    Existing files with the same names are overwritten. Progress and errors
    are reported with ``print``. An ``OSError`` while creating the directory
    aborts the run; an ``OSError`` while writing one file is printed and the
    remaining files are still attempted.

    Args:
        base_path (str): Existing directory under which the transformations
                         directory structure will be created
                         (e.g., /Users/yourname/projects/ice-cream-dataops).

    Returns:
        None
    """
    # Validate base path: refuse to build the tree under a path that is
    # missing or is not a directory.
    if not os.path.exists(base_path):
        print(f"Error: The provided base path '{base_path}' does not exist.")
        return
    if not os.path.isdir(base_path):
        print(f"Error: The provided base path '{base_path}' is not a directory.")
        return

    transformations_dir_relative = os.path.join('modules', 'bootcamp', 'ice_cream_api', 'transformations')
    full_transformations_path = os.path.join(base_path, transformations_dir_relative)

    print(f"\nAttempting to create directory: {full_transformations_path}")
    try:
        # exist_ok=True makes re-running the cell safe.
        os.makedirs(full_transformations_path, exist_ok=True)
        print(f"Directory '{full_transformations_path}' ensured to exist.")
    except OSError as e:
        print(f"Error creating directory '{full_transformations_path}': {e}")
        return

    # NOTE: the templates below are plain strings (not f-strings), so the
    # `{{ ... }}` placeholders are written to disk verbatim.
    # Presumably they are substituted later by the Cognite Toolkit - confirm.

    # --- Define content for create_asset_hierarchy transformation ---
    create_asset_hierarchy_yaml_content = """externalId: create_asset_hierarchy
name: Create Cognite Asset Hierarchy
destination:
  dataModel:
    space: cdf_cdm
    externalId: CogniteCore
    version: v1
    destinationType: CogniteAsset
  instanceSpace: icapi_dm_space
  type: instances
ignoreNullFields: true
isPublic: true
conflictMode: upsert
authentication:
  clientId: {{ icapi_extractors_client_id }}
  clientSecret: {{ icapi_extractors_client_secret }}
  tokenUri: {{ tokenUri }}
  cdfProjectName: {{ cdfProjectName }}
  scopes: {{ scopes }}
"""

    create_asset_hierarchy_sql_content = """-- SQL query for create_asset_hierarchy.Transformation.sql
-- (Replace this with your actual SQL query from Fusion UI)
-- Example placeholder SQL:
SELECT
    'asset_root' AS externalId,
    NULL AS parentExternalId,
    'Root Asset' AS name,
    'root' AS type
UNION ALL
SELECT
    'ice_cream_factory_1' AS externalId,
    'asset_root' AS parentExternalId,
    'Ice Cream Factory 1' AS name,
    'factory' AS type
UNION ALL
SELECT
    'machine_a_factory_1' AS externalId,
    'ice_cream_factory_1' AS parentExternalId,
    'Machine A' AS name,
    'machine' AS type;
"""

    # The schedule template intentionally keeps its leading blank line so the
    # generated file is unchanged from earlier runs.
    create_asset_hierarchy_schedule_content = """
# We recommend running the schedule every night at the 0 hour and the minute should be your project number.
# Example: If your project number is 42, set minute: 42
name: create_asset_hierarchy_schedule
externalId: create_asset_hierarchy_schedule
transformationExternalId: create_asset_hierarchy
schedule:
  interval:
    unit: days
    value: 1
  time:
    hour: 0
    minute: 0 # IMPORTANT: Change this to your project number
"""

    # --- Define content for contextualize_ts_assets transformation ---
    contextualize_ts_assets_yaml_content = """externalId: contextualize_ts_assets
name: Contextualize TimeSeries and Assets
destination:
  dataModel:
    space: cdf_cdm
    externalId: CogniteCore
    version: v1
    destinationType: CogniteTimeSeries
  type: instances
ignoreNullFields: true
conflictMode: upsert
authentication:
  clientId: {{ icapi_extractors_client_id }}
  clientSecret: {{ icapi_extractors_client_secret }}
  tokenUri: {{ tokenUri }}
  cdfProjectName: {{ cdfProjectName }}
  scopes: {{ scopes }}
"""

    contextualize_ts_assets_sql_content = """select
  timeseries.externalId,
  array(node_reference("icapi_dm_space", assets.externalId)) as assets,
  timeseries.isStep,
  timeseries.type,
  timeseries.space
from
  cdf_data_models("cdf_cdm", "CogniteCore", "v1", "CogniteTimeSeries") as timeseries
left join
  cdf_data_models("cdf_cdm", "CogniteCore", "v1", "CogniteAsset") as assets
ON
  split_part(timeseries.externalId, ":", 1) = assets.externalId
"""

    # Maps each output filename to the text written into it.
    files_to_create = {
        "create_asset_hierarchy.Transformation.yaml": create_asset_hierarchy_yaml_content,
        "create_asset_hierarchy.Transformation.sql": create_asset_hierarchy_sql_content,
        "create_asset_hierarchy.schedule.yaml": create_asset_hierarchy_schedule_content,
        "contextualize_ts_assets.Transformation.yaml": contextualize_ts_assets_yaml_content,
        "contextualize_ts_assets.Transformation.sql": contextualize_ts_assets_sql_content,
    }

    for filename, content in files_to_create.items():
        file_path = os.path.join(full_transformations_path, filename)
        print(f"Creating file: {file_path}")
        try:
            # Explicit UTF-8 so the generated files do not depend on the
            # platform's locale encoding (e.g. cp1252 on Windows).
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"Successfully created '{filename}'.")
        except OSError as e:
            # Best effort: report the failure and continue with the other files.
            print(f"Error writing to file '{file_path}': {e}")

    print("\nAll specified files and directories have been processed.")
    print("Remember to replace the placeholder SQL query in 'create_asset_hierarchy.Transformation.sql' with your actual query.")
    print("Also, adjust the 'minute' in 'create_asset_hierarchy.schedule.yaml' to your project number.")




In [14]:
# --- How to use this in a Jupyter Notebook ---
# Define your base path here. The directory must already exist: the function
# prints an error and returns early if the path is missing or is not a directory.
# The raw string (r"...") keeps the Windows backslashes literal.
# Later cells in this notebook reuse `my_base_path`, so run this cell first.
my_base_path = r"C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops" # <--- IMPORTANT: Change this to your actual path

# Call the function with your base path. It ensures that
# modules/bootcamp/ice_cream_api/transformations exists under the base path
# and writes the transformation YAML/SQL files into it.
create_cognite_transformation_files(my_base_path)


Attempting to create directory: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\ice_cream_api\transformations
Directory 'C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\ice_cream_api\transformations' ensured to exist.
Creating file: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\ice_cream_api\transformations\create_asset_hierarchy.Transformation.yaml
Successfully created 'create_asset_hierarchy.Transformation.yaml'.
Creating file: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\ice_cream_api\transformations\create_asset_hierarchy.Transformation.sql
Successfully created 'create_asset_hierarchy.Transformation.sql'.
Creating file: C:\Users\P

In [13]:
import os

def create_cognite_toolkit_files(base_path):
    """
    Write the transformation and workflow definition files for the ice cream API module.

    Ensures that ``modules/bootcamp/ice_cream_api/transformations`` and
    ``modules/bootcamp/ice_cream_api/workflows`` exist under ``base_path``,
    then writes five transformation files (two ``.Transformation.yaml``, two
    ``.Transformation.sql``, one ``.schedule.yaml``) and three workflow files
    (``WorkFlow``, ``WorkflowVersion``, ``WorkflowTrigger``) into them.

    Existing files with the same names are overwritten. Progress and errors
    are reported with ``print``. An ``OSError`` while creating a directory
    aborts the run; an ``OSError`` while writing one file is printed and the
    remaining files are still attempted.

    Args:
        base_path (str): Existing directory under which the directory structure
                         will be created
                         (e.g., /Users/yourname/projects/ice-cream-dataops).

    Returns:
        None
    """
    # Validate base path: refuse to build the tree under a path that is
    # missing or is not a directory.
    if not os.path.exists(base_path):
        print(f"Error: The provided base path '{base_path}' does not exist.")
        return
    if not os.path.isdir(base_path):
        print(f"Error: The provided base path '{base_path}' is not a directory.")
        return

    # --- Define paths ---
    ice_cream_api_dir_relative = os.path.join('modules', 'bootcamp', 'ice_cream_api')
    full_ice_cream_api_path = os.path.join(base_path, ice_cream_api_dir_relative)

    transformations_dir = os.path.join(full_ice_cream_api_path, 'transformations')
    workflows_dir = os.path.join(full_ice_cream_api_path, 'workflows')

    # --- Ensure directories exist ---
    for directory in [transformations_dir, workflows_dir]:
        print(f"\nAttempting to create directory: {directory}")
        try:
            # exist_ok=True makes re-running the cell safe.
            os.makedirs(directory, exist_ok=True)
            print(f"Directory '{directory}' ensured to exist.")
        except OSError as e:
            print(f"Error creating directory '{directory}': {e}")
            return

    # NOTE: the templates below are plain strings (not f-strings), so the
    # `{{ ... }}` placeholders are written to disk verbatim.
    # Presumably they are substituted later by the Cognite Toolkit - confirm.

    # --- Content for Transformations ---
    create_asset_hierarchy_yaml_content = """externalId: create_asset_hierarchy
name: Create Cognite Asset Hierarchy
destination:
  dataModel:
    space: cdf_cdm
    externalId: CogniteCore
    version: v1
    destinationType: CogniteAsset
  instanceSpace: icapi_dm_space
  type: instances
ignoreNullFields: true
isPublic: true
conflictMode: upsert
authentication:
  clientId: {{ icapi_extractors_client_id }}
  clientSecret: {{ icapi_extractors_client_secret }}
  tokenUri: {{ tokenUri }}
  cdfProjectName: {{ cdfProjectName }}
  scopes: {{ scopes }}
"""

    create_asset_hierarchy_sql_content = """-- SQL query for create_asset_hierarchy.Transformation.sql
-- IMPORTANT: Replace this with your actual SQL query from Fusion UI or your source.
-- Example placeholder SQL:
SELECT
    'asset_root' AS externalId,
    NULL AS parentExternalId,
    'Root Asset' AS name,
    'root' AS type
UNION ALL
SELECT
    'ice_cream_factory_1' AS externalId,
    'asset_root' AS parentExternalId,
    'Ice Cream Factory 1' AS name,
    'factory' AS type
UNION ALL
SELECT
    'machine_a_factory_1' AS externalId,
    'ice_cream_factory_1' AS parentExternalId,
    'Machine A' AS name,
    'machine' AS type;
"""

    # The schedule template intentionally keeps its leading blank line so the
    # generated file is unchanged from earlier runs.
    create_asset_hierarchy_schedule_content = """
# We recommend running the schedule every night at the 0 hour and the minute should be your project number.
# Example: If your project number is 42, set minute: 42
name: create_asset_hierarchy_schedule
externalId: create_asset_hierarchy_schedule
transformationExternalId: create_asset_hierarchy
schedule:
  interval:
    unit: days
    value: 1
  time:
    hour: 0
    minute: 0 # IMPORTANT: Change this to your project number
"""

    contextualize_ts_assets_yaml_content = """externalId: contextualize_ts_assets
name: Contextualize TimeSeries and Assets
destination:
  dataModel:
    space: cdf_cdm
    externalId: CogniteCore
    version: v1
    destinationType: CogniteTimeSeries
  type: instances
ignoreNullFields: true
conflictMode: upsert
authentication:
  clientId: {{ icapi_extractors_client_id }}
  clientSecret: {{ icapi_extractors_client_secret }}
  tokenUri: {{ tokenUri }}
  cdfProjectName: {{ cdfProjectName }}
  scopes: {{ scopes }}
"""

    contextualize_ts_assets_sql_content = """select
  timeseries.externalId,
  array(node_reference("icapi_dm_space", assets.externalId)) as assets,
  timeseries.isStep,
  timeseries.type,
  timeseries.space
from
  cdf_data_models("cdf_cdm", "CogniteCore", "v1", "CogniteTimeSeries") as timeseries
left join
  cdf_data_models("cdf_cdm", "CogniteCore", "v1", "CogniteAsset") as assets
ON
  split_part(timeseries.externalId, ":", 1) = assets.externalId
"""

    # --- Content for Workflows ---
    wf_icapi_data_pipeline_workflow_content = """externalId: wf_icapi_data_pipeline
description: A workflow to orchestrate the ice cream API data pipeline, including asset hierarchy creation, time series contextualization, data extraction, and OEE calculation.
"""

    # Four tasks chained through dependsOn: asset hierarchy ->
    # time series contextualization -> datapoints extractor -> OEE.
    wf_icapi_data_pipeline_workflow_version_content = """workflowExternalId: wf_icapi_data_pipeline
version: '1'
workflowDefinition:
  tasks:
  - externalId: create_asset_hierarchy_task
    type: transformation
    parameters:
      transformation:
        externalId: create_asset_hierarchy
    retries: 3
    timeout: 3600
    onFailure: abortWorkflow
  - externalId: contextualize_ts_assets_task
    type: transformation
    parameters:
      transformation:
        externalId: contextualize_ts_assets
    retries: 3
    timeout: 3600
    onFailure: abortWorkflow
    dependsOn:
      - externalId: create_asset_hierarchy_task
  - externalId: icapi_datapoints_extractor_task
    type: function
    parameters:
      function:
        externalId: icapi_datapoints_extractor
        data: { "hours": 1 } # As per example in instructions
    retries: 3
    timeout: 3600
    onFailure: abortWorkflow
    dependsOn:
      - externalId: contextualize_ts_assets_task
  - externalId: oee_timeseries_task
    type: function
    parameters:
      function:
        externalId: oee_timeseries
        data: { "start_time": "24h-ago" } # Placeholder data, adjust as needed for oee_timeseries function
    retries: 3
    timeout: 3600
    onFailure: abortWorkflow
    dependsOn:
      - externalId: icapi_datapoints_extractor_task
"""

    wf_icapi_data_pipeline_workflow_trigger_content = """externalId: icapi_trigger
triggerRule:
  triggerType: schedule
  cronExpression: '*/15 * * * *' # Runs every 15 minutes
workflowExternalId: wf_icapi_data_pipeline
workflowVersion: '1'
authentication:
  clientId: {{ icapi_trigger_client_id }}
  clientSecret: {{ icapi_trigger_client_secret }}
"""

    # --- Files to create mapping ---
    # Maps each full output path to the text written into it.
    # NOTE(review): 'WorkFlow.yaml' is capitalised differently from the
    # 'WorkflowVersion'/'WorkflowTrigger' siblings - confirm the toolkit
    # accepts this spelling before relying on it.
    files_to_create = {
        os.path.join(transformations_dir, "create_asset_hierarchy.Transformation.yaml"): create_asset_hierarchy_yaml_content,
        os.path.join(transformations_dir, "create_asset_hierarchy.Transformation.sql"): create_asset_hierarchy_sql_content,
        os.path.join(transformations_dir, "create_asset_hierarchy.schedule.yaml"): create_asset_hierarchy_schedule_content,
        os.path.join(transformations_dir, "contextualize_ts_assets.Transformation.yaml"): contextualize_ts_assets_yaml_content,
        os.path.join(transformations_dir, "contextualize_ts_assets.Transformation.sql"): contextualize_ts_assets_sql_content,
        os.path.join(workflows_dir, "wf_icapi_data_pipeline.WorkFlow.yaml"): wf_icapi_data_pipeline_workflow_content,
        os.path.join(workflows_dir, "wf_icapi_data_pipeline.WorkflowVersion.yaml"): wf_icapi_data_pipeline_workflow_version_content,
        os.path.join(workflows_dir, "wf_icapi_data_pipeline.WorkflowTrigger.yaml"): wf_icapi_data_pipeline_workflow_trigger_content,
    }

    for file_path, content in files_to_create.items():
        print(f"Creating file: {file_path}")
        try:
            # Explicit UTF-8 so the generated files do not depend on the
            # platform's locale encoding (e.g. cp1252 on Windows).
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"Successfully created '{os.path.basename(file_path)}'.")
        except OSError as e:
            # Best effort: report the failure and continue with the other files.
            print(f"Error writing to file '{file_path}': {e}")

    print("\nAll specified files and directories have been processed.")
    print("\n--- Important Post-Processing Steps ---")
    print("1. Replace the placeholder SQL query in 'create_asset_hierarchy.Transformation.sql' with your actual query.")
    print("2. Adjust the 'minute' in 'create_asset_hierarchy.schedule.yaml' to your project number for scheduling.")
    print("3. Review the 'data' parameters for 'icapi_datapoints_extractor_task' and 'oee_timeseries_task' in 'wf_icapi_data_pipeline.WorkflowVersion.yaml' and adjust them as per your function requirements.")
    print("4. Ensure the authentication client IDs and secrets in all YAML files (especially workflow trigger) are correctly configured in your environment.")





In [None]:
# --- How to use this in a Jupyter Notebook ---
# Define your base path here:
# NOTE: the assignment below is commented out, so this cell relies on
# `my_base_path` already being defined by an earlier cell in the same kernel
# session; on a fresh kernel the call raises NameError. Uncomment and edit
# the line to make this cell self-contained.
#my_base_path = "/Users/yourname/projects/ice-cream-dataops" # <--- IMPORTANT: Change this to your actual path

# Call the function with your base path. It writes the transformation and
# workflow files under modules/bootcamp/ice_cream_api.
create_cognite_toolkit_files(my_base_path)

#### Functions

In [11]:
import os

def explain_cognite_function_structure_and_local_testing(base_path):
    """
    Print where the 'icapi_datapoints_extractor' function directory is expected
    and warn if that directory is missing.

    The expected location is
    ``<base_path>/modules/bootcamp/ice_cream_api/functions/icapi_datapoints_extractor``.
    The function only prints; it does not create or modify any files.

    Args:
        base_path (str): Base path of the 'ice-cream-dataops' project, used
                         only to build the expected directory path.

    Returns:
        None
    """
    print("--- Understanding Cognite Functions and Local Testing ---")

    # Path components below the project root, joined in one call.
    relative_parts = (
        'modules',
        'bootcamp',
        'ice_cream_api',
        'functions',
        'icapi_datapoints_extractor',
    )
    function_dir = os.path.join(base_path, *relative_parts)

    print("\nBased on previous steps, your 'icapi_datapoints_extractor' function directory is expected to be at:")
    print(f"'{function_dir}'")

    # Nothing more to report when the directory is already in place.
    if os.path.isdir(function_dir):
        return

    print(f"\nWARNING: The directory '{function_dir}' was not found.")
    print("Please ensure the previous script that created the function directories ran successfully.")
    return

    

# --- How to use this in a Jupyter Notebook ---
# IMPORTANT: In the next cell, set `my_base_path` to the actual base path of your 'ice-cream-dataops' project.
# This path should be the same as the one you used for the previous script executions.



In [12]:
# Base path of the 'ice-cream-dataops' project. The raw string (r"...") keeps
# the Windows backslashes literal.
my_base_path = r"C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops"

# Call the function: it prints where the 'icapi_datapoints_extractor' function
# directory is expected and warns if that directory does not exist.
explain_cognite_function_structure_and_local_testing(my_base_path)

--- Understanding Cognite Functions and Local Testing ---

Based on previous steps, your 'icapi_datapoints_extractor' function directory is expected to be at:
'C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\ice_cream_api\functions\icapi_datapoints_extractor'


#### OEE Function

In [15]:
import os

def create_oee_function_files(base_path):
    """
    Create the directory structure and YAML configuration files for the OEE function.

    Under ``<base_path>/modules/bootcamp/use_cases/oee`` this ensures the
    directories ``auth``, ``data_sets``, ``functions`` and
    ``functions/oee_timeseries`` exist, then writes
    ``functions.Function.yaml`` and ``schedules.Schedule.yaml`` into
    ``functions``. No ``handler.py`` is generated.

    ``base_path`` must be an existing directory. Otherwise an error is printed
    and nothing is created, so a mistyped path does not silently produce a new
    directory tree.

    Existing files with the same names are overwritten. Progress and errors
    are reported with ``print``. An ``OSError`` while creating a directory
    aborts the run; an ``OSError`` while writing one file is printed and the
    remaining file is still attempted.

    Args:
        base_path (str): Existing base directory of the 'ice-cream-dataops' project.

    Returns:
        None
    """
    print("--- Creating OEE Function Files ---")

    # Validate base path: os.makedirs below would otherwise create a missing
    # base path as a side effect.
    if not os.path.exists(base_path):
        print(f"Error: The provided base path '{base_path}' does not exist.")
        return
    if not os.path.isdir(base_path):
        print(f"Error: The provided base path '{base_path}' is not a directory.")
        return

    # Define the base path for the OEE use case
    oee_base_dir_relative = os.path.join('modules', 'bootcamp', 'use_cases', 'oee')
    full_oee_base_path = os.path.join(base_path, oee_base_dir_relative)

    # Define paths for subdirectories
    oee_functions_dir = os.path.join(full_oee_base_path, 'functions')
    oee_timeseries_dir = os.path.join(oee_functions_dir, 'oee_timeseries') # This is where handler.py will go

    # Ensure all necessary directories exist
    directories_to_create = [
        os.path.join(full_oee_base_path, 'auth'),
        os.path.join(full_oee_base_path, 'data_sets'),
        oee_functions_dir,
        oee_timeseries_dir
    ]

    for directory in directories_to_create:
        print(f"\nAttempting to create directory: {directory}")
        try:
            # exist_ok=True makes re-running the cell safe.
            os.makedirs(directory, exist_ok=True)
            print(f"Directory '{directory}' ensured to exist.")
        except OSError as e:
            print(f"Error creating directory '{directory}': {e}")
            return

    # NOTE: the templates below are plain strings (not f-strings), so the
    # `{{ ... }}` placeholders are written to disk verbatim.

    # --- Content for functions.Function.yaml ---
    functions_function_yaml_content = """- name: OEE TimeSeries
  externalId: oee_timeseries
  owner: CDF Bootcamp Team
  description: Function to calculate OEE
  metadata:
      version: "1.0"
  runtime: py311
  functionPath: ./handler.py
"""

    # --- Content for schedules.Schedule.yaml ---
    schedules_schedule_yaml_content = """- name: Run calculations every 5 minutes for last hour of data
  cronExpression: "5-59/5 * * * *"
  functionExternalId: oee_timeseries
  authentication:
      clientId: {{ data_pipeline_oee_client_id }}
      clientSecret: {{ data_pipeline_oee_client_secret }}
"""

    # --- Files to create mapping ---
    # Maps each full output path to the text written into it.
    files_to_create = {
        os.path.join(oee_functions_dir, "functions.Function.yaml"): functions_function_yaml_content,
        os.path.join(oee_functions_dir, "schedules.Schedule.yaml"): schedules_schedule_yaml_content,
    }

    for file_path, content in files_to_create.items():
        print(f"Creating file: {file_path}")
        try:
            # Explicit UTF-8 so the generated files do not depend on the
            # platform's locale encoding (e.g. cp1252 on Windows).
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"Successfully created '{os.path.basename(file_path)}'.")
        except OSError as e:
            # Best effort: report the failure and continue with the other file.
            print(f"Error writing to file '{file_path}': {e}")

    print("\nAll specified files and directories for OEE function have been processed.")
    print("\n--- Important Post-Processing Steps for OEE Function ---")
    print("1. Remember that the 'handler.py' file for 'oee_timeseries' function is expected to be in the 'oee_timeseries' directory (which has been created), but its content is not generated by this script.")
    print("2. Ensure the authentication client IDs and secrets (e.g., `data_pipeline_oee_client_id`, `data_pipeline_oee_client_secret`) are correctly configured as environment variables in your deployment environment.")
    print("3. If you need to pass specific 'sites' or 'lookback_minutes' to the OEE function beyond its defaults, you would add a 'data' section to the schedule in 'schedules.Schedule.yaml' (e.g., `data: { sites: [\"site1\"], lookback_minutes: 120 }`).")


# --- How to use this in a Jupyter Notebook ---
# IMPORTANT: In the next cell, make sure `my_base_path` holds the actual base path of your 'ice-cream-dataops' project.



In [16]:
# This path should be the same as what you used for previous script executions.
# NOTE: the assignment below is commented out, so this cell relies on
# `my_base_path` already being defined by an earlier cell in the same kernel
# session; on a fresh kernel the call raises NameError. Uncomment and edit
# the line to make this cell self-contained.
#my_base_path = r"C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops"

# Call the function to create the OEE function files
create_oee_function_files(my_base_path)

--- Creating OEE Function Files ---

Attempting to create directory: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\use_cases\oee\auth
Directory 'C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\use_cases\oee\auth' ensured to exist.

Attempting to create directory: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\use_cases\oee\data_sets
Directory 'C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\use_cases\oee\data_sets' ensured to exist.

Attempting to create directory: C:\Users\Prafull Ghare\OneDrive - Cognite AS\Documents\codebase\pune_bootcamp\pune-rockwell-bootcamp\ice-cream-dataops\modules\bootcamp\use_cases\oee\functions
Directory 'C:\U