In [None]:
!pip install kfp

In [None]:
from datetime import datetime
import google.cloud.aiplatform as aip
# from google_cloud_pipeline_components import aiplatform as gcc_aip
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
import json
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (Artifact,
                        ClassificationMetrics, 
                        component,
                        Dataset,
                        Input,
                        Metrics,
                        Model,
                        Output
                       )
from typing import NamedTuple


In [None]:
# Project ID 
Val = !gcloud config list --format 'value(core.project)'
PROJECT_ID = Val[0]
REGION = "us-west1"
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
SERVICE_ACCOUNT = "559647087083-compute@developer.gserviceaccount.com"
BUCKET_NAME = "gs://demo-account"
PIPELINE_ROOT = "{}/pipeline_root/".format(BUCKET_NAME)
PIPELINE_JSON_FILE = "final.json"
PIPELINE_EXPERIMENT_NAME = "mainscoringpipeline" + TIMESTAMP
MODEL_DISPLAY_NAME = "main-model"

In [None]:
# Configure the Vertex AI SDK once with the project, region, and staging
# bucket defined in the configuration cell.
aip.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
)

In [None]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
    packages_to_install=["pandas"],
)
def bq_load() -> str:
    """Placeholder load step: takes no inputs and returns a fixed string.

    Returns:
        The literal string "Hello".
    """
    return "Hello"
#     from google.cloud import bigquery
#     client = bigquery.Client(location="US", project='hackteam-mythbusters1')
    
#     query = """
#     SELECT * FROM `hackteam-mythbusters1.covid_dataset.combined1`
#     """
#     query_job = client.query(
#         query,
#         location="US",
#     )

#     df = query_job.to_dataframe()
#     df.to_csv(train_data.path, index=False)
#     return(train_data.path.replace("/gcs/", "gs://"))

In [None]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
    packages_to_install=['google-cloud-aiplatform'],
)
def import_model(
    project_id: str,
    display_name: str,
    artifact_gcs_bucket: str,
    model: Output[Model],
    location: str,
    serving_container_image_uri: str,
    description: str
) -> NamedTuple(
    'Outputs',
    [
        ('display_name', str),
        ('resource_name', str)
    ]
):
    """Upload model artifacts to Vertex AI and publish the result as outputs.

    Args:
        project_id: Project passed to ``aiplatform.init``.
        display_name: Display name given to the uploaded model.
        artifact_gcs_bucket: Forwarded as ``artifact_uri`` to the upload call.
        model: Output artifact. The model resource name is written to its
            file, then its path is re-pointed at an ``aiplatform://v1/`` URI.
        location: Location passed to ``aiplatform.init``.
        serving_container_image_uri: Serving container image for the model.
        description: Description attached to the uploaded model.

    Returns:
        A ``(display_name, resource_name)`` tuple read from the uploaded model.
    """
    # Imported inside the body so the import travels with the component code.
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    uploaded = aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_gcs_bucket,
        serving_container_image_uri=serving_container_image_uri,
        description=description,
    )
    uploaded.wait()

    resource_name = uploaded.resource_name

    # Persist the resource name as the artifact's file content.
    with open(model.path, 'w') as out_file:
        out_file.write(resource_name)

    # Re-point the artifact at an aiplatform://v1 path so that off-the-shelf
    # tasks can consume this output.
    model.path = f"aiplatform://v1/{resource_name}"

    return (uploaded.display_name, resource_name,)

In [None]:
@dsl.pipeline(
    name=PIPELINE_EXPERIMENT_NAME,
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    project: str = PROJECT_ID,
    region: str = REGION,
):
    """Pipeline definition that currently contains only the ``bq_load`` step.

    Args:
        project: Pipeline parameter; no step in this body reads it.
        region: Pipeline parameter; no step in this body reads it.
    """
    load_task = bq_load()
    
    
   

In [None]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function into a local job-spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_JSON_FILE,
)

# Create a Vertex AI pipeline job from the compiled spec, supplying values
# for the two pipeline parameters, and run it.
job = aip.PipelineJob(
    display_name=PIPELINE_EXPERIMENT_NAME,
    template_path=PIPELINE_JSON_FILE,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project": PROJECT_ID,
        "region": REGION,
    },
)
job.run()


In [None]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

# Client used to register a recurring run of the compiled pipeline spec.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

# Cron fields "0 6 1 * *": minute 0, hour 6, day-of-month 1, every month.
# Adjust the cron schedule and the time zone as necessary.
response = api_client.create_schedule_from_job_spec(
    job_spec_path=PIPELINE_JSON_FILE,
    schedule="0 6 1 * *",
    time_zone="America/Los_Angeles",
    # Needed whenever the pipeline definition itself does not specify a
    # pipeline root.
    pipeline_root=PIPELINE_ROOT,
)