<img width="8%" alt="Naas.png" src="https://raw.githubusercontent.com/jupyter-naas/awesome-notebooks/master/.github/assets/logos/Naas.png" style="border-radius: 15%">

# Pipeline

**Tags:** #naas #pipeline #jupyter #notebook #dataanalysis #workflow #streamline

**Author:** [Florent Ravenel](https://www.linkedin.com/in/florent-ravenel)

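**Description:** This notebook builds a pipeline of engine notebooks (content, growth, sales, operations, finance, open data), runs it for each entity listed in the ABI spreadsheet, and schedules itself to run again.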

## Input

### Import libraries

In [None]:
from naas.pipeline import (
    Pipeline,
    NotebookStep,
    End,
    ParallelStep,
)
import naas_data_product
import naas
from naas_drivers import gsheet, linkedin
import glob
import os
from unidecode import unidecode
import pandas as pd

### Set up variables
**Inputs**
- `abi_spreadsheet`: Google Sheets spreadsheet URL
- `sheet_entity`: Name of the entity sheet that stores all your personal data
- `long_lived_token`: Long-lived token used to connect to the Naas API
- `entity_start`: Position (0-based) of the first entity row to process
- `entity_end`: Position at which to stop processing entity rows (exclusive); `None` processes all remaining rows
- `limit_linkedin`: Maximum number of calls to a specific LinkedIn endpoint (profile top card, company info)
- `limit_llm`: Maximum number of LLM calls (Naas API) used to enrich specific data from the Growth OBT (PEOPLE, ORGANIZATIONS, CONTACTS)
- `cron`: Scheduling pattern of your notebook (https://crontab.guru/)

**Outputs**
- `datalake_dir`: Datalake directory; each entity gets its own sub-folder, which also stores its pipeline executions

In [None]:
# ---- Inputs ----
abi_spreadsheet = naas.secret.get("ABI_SPREADSHEET")  # URL of the ABI Google Sheets spreadsheet
sheet_entity = "ENTITY"  # sheet (tab) that lists one entity per row
long_lived_token = naas.secret.get("NAAS_API_TOKEN")  # Naas API token, passed to the LLM calls below
entity_start = 0  # position (0-based) of the first entity row to process
entity_end = None  # position to stop before (exclusive); None means "through the last row"
limit_linkedin = 30  # max calls per LinkedIn endpoint (profile top card, company info)
limit_llm = 50  # max LLM (Naas API) calls used for enrichment
cron = "0 12 * * *"  # scheduling pattern for this notebook, see https://crontab.guru/

# ---- Outputs ----
datalake_dir = naas.secret.get("ABI_DATALAKE_DIR")  # root folder under which entity folders are created

## Model

### Get entities from the Google Sheets spreadsheet

In [None]:
# Load the entity sheet; empty cells become the "NA" sentinel that later checks rely on.
df_gsheet = gsheet.connect(abi_spreadsheet).get(sheet_name=sheet_entity)
df_gsheet = df_gsheet.fillna("NA")

# Keep only rows [entity_start, entity_end) by position; a None end means "through the last row".
entity_end = len(df_gsheet) if entity_end is None else entity_end
df_gsheet = df_gsheet.iloc[entity_start:entity_end]

# Display every selected row.
df_gsheet.head(len(df_gsheet))

### Run pipeline

In [None]:
# NOTE(review): `pdump`, `pload`, `get_linkedin_data` and `create_chat_completion` are not
# defined in this notebook; presumably they come from the naas_data_product utils - confirm.


def _save_dependency(output_dir, value, name):
    """Pickle `value` into `output_dir` under `name` and register `<name>.pickle` as a naas dependency."""
    pdump(output_dir, value, name)
    naas.dependency.add(os.path.join(output_dir, f"{name}.pickle"))


def _mask_secret(value):
    """Return a printable placeholder for a credential so the raw value never appears in outputs.

    Missing values (None / "" / "NA") are returned unchanged so they remain visible.
    """
    if not value or value == "NA":
        return value
    return f"<hidden, {len(str(value))} chars>"


def _engine_step(name, engine, parameters):
    """Build a NotebookStep running `<MODELS_PATH>/<engine>/__pipeline__.ipynb` with `parameters`."""
    return NotebookStep(
        name=name,
        notebook_path=os.path.join(naas_data_product.MODELS_PATH, engine, "__pipeline__.ipynb"),
        parameters=parameters,
    )


for row in df_gsheet.itertuples():
    index = row.Index
    entity_name = row.ENTITY
    emails = row.EMAILS
    linkedin_url = row.LINKEDIN_URL
    li_at = row.LINKEDIN_LI_AT
    JSESSIONID = row.LINKEDIN_JSESSIONID

    # Per-row spreadsheet override. It is kept in a row-local variable so that one
    # entity's override does not carry over to the rows processed after it.
    spreadsheet_url = abi_spreadsheet
    if "SPREADSHEET_URL" in df_gsheet.columns and row.SPREADSHEET_URL != "NA":
        spreadsheet_url = row.SPREADSHEET_URL
    print("- ABI spreadsheet:", spreadsheet_url)
    print("- Entity:", entity_name)
    print("- Emails:", emails)
    print("- LinkedIn URL:", linkedin_url)

    # Folder-safe slug: lowercase, spaces -> "_", dots removed, accents transliterated.
    entity_code = unidecode(entity_name.lower().replace(" ", "_").replace(".", ""))
    entity_dir = os.path.join(datalake_dir, entity_code)
    # Creating the "plugins" sub-folder also creates entity_dir itself.
    os.makedirs(os.path.join(entity_dir, "plugins"), exist_ok=True)
    print("- Directory:", entity_dir)

    workspace_ids = []
    if "WORKSPACE_IDS" in df_gsheet.columns and row.WORKSPACE_IDS != "NA":
        # Accept "id1, id2" style input: trim whitespace and drop empty entries.
        workspace_ids = [ws.strip() for ws in row.WORKSPACE_IDS.split(",") if ws.strip()]
    print("- Workspace IDs:", workspace_ids)
    print()

    # Save entity data: pickle each value and register it as a naas dependency.
    print("- Saving dependencies:")
    output_dir = os.path.join(datalake_dir, "entities", str(index))
    entity_data = {
        "abi_spreadsheet": spreadsheet_url,
        "entity_name": entity_name,
        "emails": emails,
        "linkedin_url": linkedin_url,
        "entity_dir": entity_dir,
        "workspace_ids": workspace_ids,
    }
    for data_name, data_value in entity_data.items():
        _save_dependency(output_dir, data_value, data_name)

    # Save LinkedIn cookies as entity-suffixed secrets; the row at index 0 also
    # provides the un-suffixed default secrets.
    for secret_name in ("LINKEDIN_LI_AT", "LINKEDIN_JSESSIONID"):
        secret_value = getattr(row, secret_name)
        if secret_value != "NA":
            print(f"Secret '{secret_name}' to be added:")
            naas.secret.add(f"{secret_name}_{entity_code.upper()}", secret_value)
            if index == 0:
                naas.secret.add(secret_name, secret_value)

    # Fall back to previously stored cookies when the row does not provide them.
    if li_at == "NA":
        li_at = naas.secret.get("li_at") or naas.secret.get("LINKEDIN_LI_AT")
    if JSESSIONID == "NA":
        JSESSIONID = naas.secret.get("JSESSIONID") or naas.secret.get("LINKEDIN_JSESSIONID")
    # Do not print raw session cookies: they would be visible in this notebook's output.
    print("- LinkedIn li_at:", _mask_secret(li_at))
    print("- LinkedIn JSESSIONID:", _mask_secret(JSESSIONID))

    # Set timezone: it is derived from the region and country of the profile at index 0.
    # NOTE(review): if entity_start > 0 that row is skipped and no timezone is set
    # (assumes the sheet DataFrame has a default RangeIndex) - confirm this is intended.
    if index == 0:
        timezone = pload(output_dir, "timezone")
        if timezone is None:
            linkedin_dir = os.path.join(datalake_dir, "datalake", "linkedin", "profiles")
            df = get_linkedin_data(linkedin_url, linkedin_dir, "identity", li_at, JSESSIONID)
            region = df.loc[0, "REGION"]
            country = df.loc[0, "COUNTRY"]
            print("- Region:", region)
            print("- Country:", country)
            timezone = create_chat_completion(
                long_lived_token,
                prompt="Find timezone in the format 'Region/City'. If there is no exact match, please return a subjective answer based on the data you received",
                message=f"Region: {region}, Country: {country}",
            )
            _save_dependency(output_dir, timezone, "timezone")
        print("- Timezone:", timezone)
        naas.set_remote_timezone(timezone)
    print()

    # Create notebook steps. Every engine receives the shared parameters; LinkedIn
    # credentials are only passed to the engines that use them.
    shared_params = {
        "datalake_dir": datalake_dir,
        "spreadsheet_url": spreadsheet_url,
        "entity_name": entity_name,
        "emails": emails,
    }
    linkedin_params = {
        "linkedin_url": linkedin_url,
        "li_at": li_at,
        "JSESSIONID": JSESSIONID,
    }
    content = _engine_step(
        "📲 Content",
        "content-engine",
        {**shared_params, **linkedin_params, "entity_dir": entity_dir},
    )
    growth = _engine_step(
        "🚀 Growth",
        "growth-engine",
        {
            **shared_params,
            **linkedin_params,
            "entity_dir": entity_dir,
            "limit_linkedin": limit_linkedin,
            "limit_llm": limit_llm,
        },
    )
    sales = _engine_step(
        "⚡️ Sales",
        "sales-engine",
        {**shared_params, "linkedin_url": linkedin_url, "entity_dir": entity_dir},
    )
    operations = _engine_step(
        "⚙️ Operations",
        "operations-engine",
        {**shared_params, "entity_dir": entity_dir},
    )
    finance = _engine_step(
        "💰 Finance",
        "finance-engine",
        {**shared_params, "entity_dir": entity_dir},
    )
    opendata = _engine_step(
        "🌍 Open Data",
        "opendata-engine",
        {**shared_params, "entity_dir": entity_dir},
    )

    # Chain the engine steps and run them; execution outputs go under the entity folder.
    pipeline = Pipeline()
    pipeline >> content >> growth >> sales >> operations >> finance >> opendata >> End()
    pipeline.run(outputs_path=os.path.join(entity_dir, "pipeline_executions"))

## Output

### Add scheduler

In [None]:
# Register this notebook with the naas scheduler, using the `cron` pattern defined in the inputs.
print("⏰ Scheduler:", cron)
naas.scheduler.add(cron=cron)
# To unschedule this notebook, call naas.scheduler.delete() instead.

### Add pipeline templates to dependencies

In [None]:
# Engines whose model notebooks are registered as dependencies of this notebook.
engines = [
    "content-engine",
    "growth-engine",
    "sales-engine",
    "operations-engine",
    "finance-engine",
    "opendata-engine",
]

for engine in engines:
    engine_model_dir = os.path.join(naas_data_product.MODELS_PATH, engine)

    # Domain and custom notebooks found on disk, followed by the two entry-point notebooks.
    files = [
        *glob.glob(os.path.join(engine_model_dir, "core", "domain", "*.ipynb")),
        *glob.glob(os.path.join(engine_model_dir, "custom", "*.ipynb")),
        *(
            os.path.join(engine_model_dir, entry_notebook)
            for entry_notebook in ("__pipeline__.ipynb", "__plugin__.ipynb")
        ),
    ]
    for file in files:
        print("🔗 Dependency:", file)
        naas.dependency.add(file)
        print()
        # To remove these dependencies instead, call naas.dependency.delete(file) here.

### Add utils to dependencies

In [None]:
# Every notebook directly inside the utils folder becomes a dependency of this notebook.
utils_pattern = f"{naas_data_product.UTILS_PATH}/*.ipynb"
files = glob.glob(utils_pattern)

for file in files:
    naas.dependency.add(file)
    # To remove these dependencies instead, call naas.dependency.delete(file) here.