In [1]:
# Setting up environment for the pipeline
project = !gcloud config get-value project
PROJECT_ID = project[0]
REGION = "asia-south1"

In [2]:
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

import pandas as pd
from sklearn import datasets

In [3]:
# initializing clients 
gcs = storage.Client(project = PROJECT_ID)
bq = bigquery.Client(project = PROJECT_ID)

# initializing bucket
BUCKET = "gcs://emotion-classify-bucket/"
BUCKET

'gcs://emotion-classify-bucket/'

In [4]:
# Services and accoung permissions
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)'
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'1181571513-compute@developer.gserviceaccount.com'

In [5]:
# Enable Cloud resource manager API
!gcloud services enable cloudresourcemanager.googleapis.com

In [6]:
# Listing the current service account roles:
!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" \
    --format='table(bindings.role)'

ROLE
['roles/aiplatform.customCodeServiceAgent', 'roles/aiplatform.serviceAgent', 'roles/artifactregistry.serviceAgent', 'roles/bigquery.admin', 'roles/cloudbuild.builds.builder', 'roles/cloudbuild.serviceAgent', 'roles/cloudfunctions.serviceAgent', 'roles/composer.serviceAgent', 'roles/compute.serviceAgent', 'roles/container.serviceAgent', 'roles/containerregistry.ServiceAgent', 'roles/dataflow.serviceAgent', 'roles/editor', 'roles/notebooks.serviceAgent', 'roles/owner', 'roles/pubsub.serviceAgent', 'roles/run.admin', 'roles/run.serviceAgent', 'roles/storage.admin', 'roles/storage.objectAdmin']


In [7]:
# Kubeflow and cloud components
!pip install kfp -U -q
!pip install google-cloud-pipeline-components -U -q

In [8]:
import kfp
KFP_version = kfp.__version__
KFP_version

'2.3.0'

In [9]:
import google_cloud_pipeline_components
google_cloud_pipeline_components_version = google_cloud_pipeline_components.__version__
google_cloud_pipeline_components_version

'2.5.0'

In [10]:
from google.cloud import aiplatform
aiplatform.__version__

'1.33.1'

In [11]:
# we will use data from BigQuery table to train the model and create pipeline
EXPERIMENT = "emotion"
SERIES = "emotion_01"

# source data
BQ_PROJECT = PROJECT_ID
BQ_DATASET = "emotion"
BQ_TABLE = "emotion"

# sourcing data from gcs bucket
BQ_SOURCE = BUCKET + "Emotion_classify_Data.csv"
BQ_SOURCE

'gcs://emotion-classify-bucket/Emotion_classify_Data.csv'

In [12]:
# create dataset in bigquery
dataset_id = "{}.{}".format(BQ_PROJECT,BQ_DATASET)

try:
    bq.get_dataset(dataset_id)
    print("Dataset {} already exists".format(dataset_id))
except NotFound:
    client = bigquery.Client()
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = REGION
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))


Dataset ml-pipeline-project-401216.emotion already exists


In [17]:
# creating table in bigquery from data in gcs bucket
job_config = bigquery.LoadJobConfig(
    schema = [
        bigquery.SchemaField("Comment", "STRING"),
        bigquery.SchemaField("Emotion", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    #source_format=bigquery.SourceFormat.CSV,
)

table_id = "{}.{}.{}".format(PROJECT_ID, BQ_DATASET, BQ_TABLE)

load_job = bq.load_table_from_uri(
    BQ_SOURCE, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

# destination_table = client.get_table(table_id)  # Make an API request.
# print("Loaded {} rows.".format(destination_table.num_rows))

BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/ml-pipeline-project-401216/jobs?prettyPrint=false: Source URI must be a Google Cloud Storage location: gcs://emotion-classify-bucket/Emotion_classify_Data.csv

In [19]:
# Construct a BigQuery client object.
client = bigquery.Client(project = PROJECT_ID)

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = BQ_SOURCE
table_id = "{}.{}.{}".format(PROJECT_ID, BQ_DATASET, BQ_TABLE)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/ml-pipeline-project-401216/jobs?prettyPrint=false: Source URI must be a Google Cloud Storage location: gcs://emotion-classify-bucket/Emotion_classify_Data.csv