<img width="10%" alt="Naas" src="https://landen.imgix.net/jtci2pxwjczr/assets/5ice39g4.png?w=160"/>

# Common function shared

**Tags:** #automation

**Author:** [Florent Ravenel](https://www.linkedin.com/in/ACoAABCNSioBW3YZHc2lBHVG0E_TXYWitQkmwog/)

## Input

In [20]:
# import naas
# naas.dependency.add()

### Import libraries

In [None]:
import naas
from naas_drivers import notion
from naas_drivers.tools.notion import Link, BlockEmbed
from datetime import datetime
import pytz
from os import path, environ
import json

### Variables

In [None]:
# Folders
INPUT = path.join("..", "input")
OUTPUT = path.join("..", "output")
OUTPUT_PROD = path.join("..", "..", "⚡ → Production", "world-situation-room", "output")

# Dependencies
CHART_CSS_PATH = path.join(INPUT, "chart.css") # Chart css
WHI_IMAGE_PATH = path.join(INPUT, "whi_layout.png") # WHI layout
FONT_PATH = path.join(INPUT, "ArchivoBlack-Regular.ttf") # WHI layout

# Notion - "Report" database url
NOTION_REPORT_DB = naas.secret.get("NOTION_REPORT_DB")
NOTION_TOKEN = naas.secret.get("NOTION_TOKEN")

## Model

### Send dependencies to production

In [19]:
# naas.dependency.add(CHART_CSS_PATH)
# naas.dependency.add(WHI_IMAGE_PATH)
# naas.dependency.add(FONT_PATH)

### Get output path

In [None]:
def get_output_path(filename):
    if not naas.is_production():
        output_path = path.join(OUTPUT_PROD, filename)
    else:
        output_path = path.join(OUTPUT, filename)
    return output_path

### Update chart css

In [None]:
# Fonction to add CSS
def updateChartCss(filename, css_filename=CHART_CSS_PATH):
    map = None
    css = None
    with open(filename) as f:
        map = f.read()

    with open(css_filename) as f:
        css = f.read()
    if (map.find('id="naas_css"') != -1):
        print("To do")
    else:
        result = map.replace("<body>",
                             f'<body><style id="naas_css">{css}</style>')
        with open(filename, "w") as f:
            f.write(result)
            f.close()

## Output

### Save and share with naas

In [3]:
def save_output(output_type, name_output, output_obj, naas_asset=True):
    # Outputs
    csv_output = path.join(OUTPUT, f"{name_output}.csv")
    html_output = path.join(OUTPUT, f"{name_output}.html")
    image_output = path.join(OUTPUT, f"{name_output}.png")

    if output_type == "dataframe-csv":
        output_obj["DATE_EXTRACT"] = datetime.now(pytz.timezone("Europe/Paris")).strftime("%Y-%m-%d %H:%M:%S%z")
        output_obj.to_csv(csv_output, index=False)
        print("💾 Dataframe successfully saved in csv", csv_output)
        if naas_asset:
            return naas.asset.add(csv_output)
    elif output_type == "chart-html":
        output_obj.write_html(html_output)
        updateChartCss(html_output, css_filename=CHART_CSS_PATH)
        print("💾 Graph successfully saved in html", html_output)
        if naas_asset:
            return naas.asset.add(html_output, params={"inline": True})
    elif output_type == "chart-png":
        output_obj.write_image(image_output)
        print("💾 Graph successfully saved in image", image_output)
        if naas_asset:
            return naas.asset.add(image_output, params={"inline": True})
    elif output_type == "image-png":
        img.save(image_output)
        print("💾 Image successfully saved in png", image_output)
        if naas_asset:
            return naas.asset.add(image_output, params={"inline": True})

### Save in "Report" Notion DB

In [None]:
def update_dynamic_properties(page, status, sentiment_icon):
    # Page properties : static
    page.select("Status", status)
    page.select("Sentiment", sentiment_icon)
    page.date("Data updated at", datetime.now(pytz.timezone("Europe/Paris")).strftime("%Y-%m-%d %H:%M:%S%z"))
    return page

In [None]:
def update_report_status(uid, status="🟠 WIP", name_output=None, image_link=None, html_link=None, csv_link=None, sentiment=None):
    sentiment_icon = "⚫"
    if sentiment is not None:
        # Get trend
        if sentiment == 0:
            sentiment_icon = "🟠"
        elif sentiment > 0:
            sentiment_icon = "🟢"
        elif sentiment < 0:
            sentiment_icon = "🔴"
    
    # Get pages from notion database
    pages = notion.connect(NOTION_TOKEN).database.query(NOTION_REPORT_DB, query={})
    
    # Create or update page
    page_new = True
    for page in pages:
        page_temp = page.df()
        page_id = page_temp.loc[page_temp.Name == "ID", "Value"].values
        if page_id == uid:
            page_new = False
            break
    try:
        if page_new:
            database_id = NOTION_REPORT_DB.split("/")[-1].split("?v=")[0]
            page = notion.connect(NOTION_TOKEN).Page.new(database_id=database_id).create()
            page.title("Name", name_output)
            page.rich_text("ID", uid)
            if image_link:
                page.image(image_link)
            if html_link:
                res = page.paragraph("Open dynamic graph")
                res.paragraph.text[0].href = html_link
                res.paragraph.text[0].text.link = Link(html_link)
            if csv_link:
                res = page.paragraph("Download CSV")
                res.paragraph.text[0].href = csv_link
                res.paragraph.text[0].text.link = Link(csv_link)
            update_dynamic_properties(page, status, sentiment_icon)
        else:
            # Page properties : static
            update_dynamic_properties(page, status, sentiment_icon)

        # Create page in Notion
        page.update()
        print(f"✅ Page '{uid}' updated in Notion.", status)
    except Exception as e:
        print(f"❌ Error updating {uid}")
        return e

### Generate workflow

In [None]:
def generate_workflow(notebook_path):
    # Notebook output
    notebook_output = notebook_path.replace(".ipynb", "_prod.ipynb")
    
    # Open notebook as json
    with open(notebook_path) as f:
        nb = json.load(f)
    
    # Json output
    new_cells = []
    check_model = False
    workflow = ""
    cell_markdown_wf = {
        'cell_type': 'markdown',
        'id': 'naas-markdown-wf',
        'metadata': {'papermill': {}, 'tags': ["naas"]},
        'source': '### Execute workflow'
    }
    cell_code_wf = {
        'cell_type': 'code',
        'id': 'naas-code-wf',
        'metadata': {'papermill': {}, 'tags': ["naas"]},
        'source': []
    }
      
    # Get cells
    cells = nb.get("cells")
    # Check each cells
    for cell in cells:
        new_sources = []
        cell_type = cell.get('cell_type')
        sources = cell.get('source')
        if not check_model:
            for index, source in enumerate(sources):
                if "# naas.scheduler.add" in source:
                    source = source.replace("# ", "").replace(")", f", path='{notebook_output}')")
                    cell["source"] = [source]
                if cell_type == "markdown" and source.startswith("## Model"):
                    check_model = True
        if check_model:
            if cell_type == "markdown":
                if len(sources) > 0:
                    new_workflow = f'    {sources[0]}\n'
                    workflow = f'{workflow}{new_workflow}'
            elif cell_type == "code":
                for index, source in enumerate(sources):
                    new_source = source
                    if not source.startswith(" ") and not source.startswith("def") and not source.startswith("\n"):
                        new_source = f'# {source}'
                        if "(" in source and ")" in source:
                            new_code = f'    {source}\n'
                            if "update_report_status" in new_code:
                                new_code = new_code.replace(')', ', status="🟢 Live")')
                                new_code = f'{new_code}'
                            workflow = f'{workflow}{new_code}'
                    new_sources.append(new_source)
                cell["source"] = new_sources
        # Create new cell
        new_cells.append(cell)
                    
    # Update source workflow
    source_workflow = [
        'try:\n'
        f'{workflow}'
        'except Exception as e:\n',
        '    # Update status in Notion DB if error\n',
        '    update_report_status(uid, "🔴 Error")\n',
        '    raise(e)'
    ]
    
    # Add workflow cells
    new_cells.append(cell_markdown_wf)
    cell_code_wf["source"] = source_workflow
    new_cells.append(cell_code_wf)
    
    # Save new notebook
    nb_new = nb.copy()
    nb_new["cells"] = new_cells
    with open(notebook_output, 'w') as f:
        json.dump(nb_new, f)
    print(f"💾 {notebook_output} saved in Naas.")
    return notebook_output

In [None]:
print("👌 Notebook common.ipynb loaded")