# Data Pipelines

<a target="_blank" href="https://colab.research.google.com/github/superwise-ai/dsl-spa/blob/main/examples/data_pipeline_example.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

Data pipelines are used to load dynamic datasets based on an LLM response. The LLM response will detail key details that a user may want to pick from, like a project name or a timeframe. In this example, a StandardPipeline will be created but there are [other types of data pipelines](https://github.com/superwise-ai/dsl-spa/blob/main/docs/Pipelines.md#data-querying-pipelines) that might fit your use case better.

To run this demo, make sure to download the `activies.csv` and `project_updates.csv` from the [data folder](https://github.com/superwise-ai/dsl-spa/tree/main/data).

In this demo, you will create a pipeline that accepts a project_id (which can be determined dynamically by an LLM), and generates a summary and a set of visualizations about that project. Both the summary and the visualizations can then be passed back to an LLM. Passing the summary to an LLM with a user question is a great way to build a chat bot. Sending the visualization titles and descriptions to an LLM is a great way to convert an LLM into a visualization selector based on a user request.

Example, a user asks "Can I see a pie chat of the status of activities for Project POO1?"

An LLM application reads the question and generates JSON indicating the name of the Project. This JSON is passed to the pipeline in this notebook, which generates all the visualizations, including their titles and descriptions, for that project. Which is then passed to a different LLM application as a set of tools, asking the LLM to pick the graph that most appropriately matches the user request. Resulting in a user getting a pie graph of the status of the activities for project P001.

# Install Dependencies

%pip install dsl-spa==0.4.1

# Create Pipeline Schema

### Import Schema Definitions

In [1]:
from dsl_spa.utils.schema import PipelineField,CSV,Dataset,SummaryDataset,Summary,PieChart,LineGraph,StandardPipelineSchema

### Define Fields

In [2]:
project_field = PipelineField(field_name="project_id", field_type="type", required=True, description="The ID of the project")
fields = [project_field]

### Define CSVs

In [3]:
project_update_csv = CSV(csv_name="project_updates",connector_name="csvs",filename="project_updates.csv")
project_update_csv.add_column_filter("base.project_id",column_name="project_id",value="{base.project_id}")

activity_csv = CSV(csv_name="activities",connector_name="csvs",filename="activities.csv")
activity_csv.add_column_filter("base.project_id",column_name="project_id",value="{base.project_id}")

stadium_projects_csv = CSV(csv_name="projects",connector_name="csvs",filename="stadium_projects.csv")
activity_csv.add_column_filter("base.project_id",column_name="project_id",value="{base.project_id}")

csvs = [stadium_projects_csv,project_update_csv,activity_csv]

### Define Datasets

In [4]:
project_dataset = SummaryDataset(dataset_name="Project Data", 
                                 summary_by_row="The project ID is {project_id}. The project name is {project_name}. The stadium is being built in {location}. It has a budget of {budget}. It is {percent_complete} done. It started on {start_date}. It is estimated to finish on {estimated_end_date}. ",
                                 empty_dataset="No project found with this ID")
project_dataset.create_from_query("projects")

activity_dataset = Dataset(dataset_name="Activity Data")
activity_dataset.create_from_query("activities")

project_updates_dataset = Dataset(dataset_name="Project Updates")
project_updates_dataset.create_from_query("project_updates")

activities_by_status = SummaryDataset("Activities By Status", summary_by_row="There are {count} phases that are {Activity Status}. ")
activities_by_status.create_from_dataset(dataset_name="Activity Data")
activities_by_status.add_function(function_name="build_pie_graph",
                                  function_params_dict={"index_field": "activity_status",
                                                        "value_field": "count",
                                                        "label_field": "Activity Status"})

activity_status_by_days = Dataset("Activity Status By Days")
activity_status_by_days.create_from_dataset(dataset_name="Activity Data")
activity_status_by_days.add_function(function_name="build_pie_graph", 
                                     function_params_dict={"index_field": "activity_status", 
                                                           "value_field": "Duration in Days",
                                                           "label_field": "Activity Status",
                                                           "count_field": "duration"})

current_activity_dataset = SummaryDataset(dataset_name="Current Activity",summary_by_row="The current activity phase is {activity_name}. ")
current_activity_dataset.create_from_dataset(dataset_name="Activity Data")
current_activity_dataset.add_function(function_name="filter_by_value",function_params_dict={"column": "activity_status",
                                                                                            "value": "In Progress"})

summary_datasets = [project_dataset,activities_by_status,current_activity_dataset]
datasets = [project_dataset,activity_dataset,project_updates_dataset,activities_by_status,activity_status_by_days,current_activity_dataset]

### Define Summary

In [5]:
summary = Summary(datasets=summary_datasets,prefix="The following summary is information about project and activity status for Project {base.project_id}")

### Define Visualizations

In [6]:
pie_activities_by_status = PieChart(dataset=activities_by_status, 
                                    title="Activities By Status",
                                    value_column="count",
                                    label_column="Activity Status",
                                    description="Pie Graph of activities by their progress status.")

line_project_updates = LineGraph(dataset=project_updates_dataset,
                                 title="Project Progress",
                                 x_axis="date",
                                 y_axis="percent_complete",
                                 description="Line Graph of a projects percent complete over time.")

pie_activity_status_by_days = PieChart(dataset=activity_status_by_days,
                                       title="Activity Status by Days",
                                       value_column="Duration in Days",
                                       label_column="Activity Status",
                                       description="Pie Graph of activity statuses' by their duration in days.")
visualizations = [pie_activities_by_status,line_project_updates,pie_activity_status_by_days]

In [7]:
pipeline_schema = StandardPipelineSchema(pipeline_name="project_updates_pipeline",
                                         fields=fields,
                                         queries=[],
                                         csvs=csvs,
                                         datasets=datasets,
                                         scope="Information for project {base.project_id}",
                                         scope_description="Details on project activities, costs, and current status.",
                                         summary=summary,
                                         visualizations=visualizations)
schema = pipeline_schema.get_schema()
schema

{'pipeline_name': 'project_updates_pipeline',
 'scope': 'Information for project {base.project_id}',
 'scope_description': 'Details on project activities, costs, and current status.',
 'fields': {'base': {'project_id': {'name': 'project_id',
    'type': 'type',
    'required': True,
    'description': 'The ID of the project'}}},
 'csvs': [{'name': 'projects',
   'connector': 'csvs',
   'csv_name': 'stadium_projects.csv'},
  {'name': 'project_updates',
   'connector': 'csvs',
   'csv_name': 'project_updates.csv',
   'column_filters': [{'field': 'base.project_id',
     'column': 'project_id',
     'value': '{base.project_id}'}]},
  {'name': 'activities',
   'connector': 'csvs',
   'csv_name': 'activities.csv',
   'column_filters': [{'field': 'base.project_id',
     'column': 'project_id',
     'value': '{base.project_id}'},
    {'field': 'base.project_id',
     'column': 'project_id',
     'value': '{base.project_id}'}]}],
 'datasets': [{'name': 'Project Data',
   'create': [{'type': 'qu

# Create Pipeline

### Create Mock LLM Outputs (Fields)

These mock outputs will act as exmaple outputs from our LLM

In [8]:
fields_project_P001 = {
    "project_id": "P001"
}

fields_project_P006 = {
    "project_id": "P006"
}

### Create Connector

Now the connector that connects our pipeline to the data is created. In this case, the data is a local CSV. Make sure the csvs are in the current folder or update the folder to reflect the CSV location.

In [9]:
from dsl_spa.pipeline.connector import LocalCSVConnector

In [10]:
csv_location = "./"
csv_connector = LocalCSVConnector(folder=csv_location)
connectors = {"csvs":csv_connector}

### Test Pipeline

In [11]:
from dsl_spa.pipeline.pipeline import StandardPipeline, PipelineException

In [12]:
pipeline = StandardPipeline(fields_input_dict=fields_project_P001, json_schema=schema, connectors=connectors)
pipeline.initialize_data()
pipeline.process_data()
f"{pipeline.get_scope()} - {pipeline.get_scope_description()}"

'Information for project P001 - Details on project activities, costs, and current status.'

Let's get the summary from the pipeline.

In [13]:
pipeline.get_summary()

'The following summary is information about project and activity status for Project P001The project ID is P001. The project name is Stadium Alpha. The stadium is being built in New York, NY. It has a budget of 1200000000.0. It is 83.1 done. It started on 2023-01-10 00:00:00.000000 UTC. It is estimated to finish on 2025-12-15 00:00:00.000000 UTC. The project ID is P002. The project name is Stadium Beta. The stadium is being built in Los Angeles, CA. It has a budget of 1500000000.0. It is 70.8 done. It started on 2023-03-05 00:00:00.000000 UTC. It is estimated to finish on 2026-05-20 00:00:00.000000 UTC. The project ID is P003. The project name is Stadium Gamma. The stadium is being built in Chicago, IL. It has a budget of 1000000000.0. It is 30.5 done. It started on 2023-02-15 00:00:00.000000 UTC. It is estimated to finish on 2028-10-30 00:00:00.000000 UTC. The project ID is P004. The project name is Stadium Delta. The stadium is being built in Houston, TX. It has a budget of 1800000000

Now let's generate the output for an LLM to select visualizations.

In [14]:
graphs = pipeline.get_visualizations()
for graph_name in graphs.keys():
    description = graphs[graph_name]["description"]
    print(f"{graph_name} - {description}")

Activities By Status - Pie Graph of activities by their progress status.
Project Progress - Line Graph of a projects percent complete over time.
Activity Status by Days - Pie Graph of activity statuses' by their duration in days.


Now let's add the altair renderer so vega lite charts can be displayed in jupyter notebooks and then visualize the Activities By Status Pie Chart.

In [15]:
import altair as alt
alt.renderers.enable('mimetype')

RendererRegistry.enable('mimetype')

In [17]:
alt.Chart.from_dict(graphs["Activities By Status"]["vega_lite"])

<VegaLite 5 object>

If you see this message, it means the renderer has not been properly enabled
for the frontend that you are using. For more information, see
https://altair-viz.github.io/user_guide/display_frontends.html#troubleshooting
