<a href="https://colab.research.google.com/github/ldsAS/Tibame-AI-Learning/blob/main/Tibame20250625_DataFlow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Google權限授予

In [1]:
from google.colab import auth
auth.authenticate_user()

### 定義專案ID

In [2]:
import os
# 定義常數（Constants）
project = "tibame-gad251-31-0625"
bucket = "tibame-gad251-31-storage"

# 設定專案 ID。
os.environ['GOOGLE_CLOUD_PROJECT'] = project

### 安裝Apache_beam模組

In [3]:
!pip install apache-beam[gcp,dataframe] --quiet

In [4]:
!pip install apache-beam[interactive]



### Scikit-learn應用

#### 導入模組

In [5]:
import pickle
from sklearn import linear_model
from typing import Tuple

import numpy as np
import apache_beam as beam

from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.options.pipeline_options import PipelineOptions


#### 建立Scikit-learn模型

In [6]:
# 準備訓練 sklearn 模型的輸入資料（5 倍表）。
x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)
y = (x * 5).reshape(-1, 1)

def train_and_save_model(x, y, model_file_name):
  """訓練線性回歸模型並儲存至檔案。"""
  regression = linear_model.LinearRegression()
  regression.fit(x, y)

  with open(model_file_name, 'wb') as f:
      pickle.dump(regression, f)

# 訓練並儲存 5 倍表模型。
five_times_model_filename = 'sklearn_5x_model.pkl'
train_and_save_model(x, y, five_times_model_filename)

# 訓練並儲存 10 倍表模型。
ten_times_model_filename = 'sklearn_10x_model.pkl'
train_and_save_model(x, y, ten_times_model_filename)
y = (x * 10).reshape(-1, 1)
train_and_save_model(x, y, 'sklearn_10x_model.pkl')



#### 用GCP CLI指令碼指定專案

In [7]:
!gcloud config set project $project

Updated property [core/project].


#### 建立BigQuery資料表

In [8]:
## 將資料填入 BigQuery 資料表

from google.cloud import bigquery

client = bigquery.Client(project=project)

# 確保 dataset_id 在專案中是唯一的。
dataset_id = '{project}.maths'.format(project=project)
dataset = bigquery.Dataset(dataset_id)

# 根據專案設定修改位置。
dataset.location = 'asia-east1'#位置改為台灣
dataset = client.create_dataset(dataset, exists_ok=True)

# BigQuery 資料集中的資料表名稱。
table_name = 'maths_problems_1'

query = """
CREATE OR REPLACE TABLE
  {project}.maths.{table} (
    key STRING OPTIONS(description="A unique key for the maths problem"),
    value FLOAT64 OPTIONS(description="Our maths problem")
  );

INSERT INTO {project}.maths.{table}
VALUES
  ("first_example", 105.00),
  ("second_example", 108.00),
  ("third_example", 1000.00),
  ("fourth_example", 1013.00);
""".format(project=project, table=table_name)

create_job = client.query(query)
create_job.result()


<google.cloud.bigquery.table._EmptyRowIterator at 0x7c5e227e2350>

#### 使用管道測試模型

In [9]:
sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename)

# 設定 Dataflow 管道選項，並指定暫存位置。
pipeline_options = PipelineOptions().from_dictionary(
                                      {'temp_location': f'gs://{bucket}/tmp'})

# 定義 BigQuery 資料表規格。
table_name = 'maths_problems_1'
table_spec = f'{project}:maths.{table_name}'

with beam.Pipeline(options=pipeline_options) as p:
  (
      p
      | "從 BigQuery 讀取資料" >> beam.io.ReadFromBigQuery(table=table_spec)
      | "提取輸入值" >> beam.Map(lambda x: [x['value']])
      | "執行 Sklearn 推論" >> RunInference(model_handler=sklearn_model_handler)
      | beam.Map(print)
  )





PredictionResult(example=[108.0], inference=array([540.]), model_id='sklearn_5x_model.pkl')
PredictionResult(example=[1013.0], inference=array([5065.]), model_id='sklearn_5x_model.pkl')
PredictionResult(example=[105.0], inference=array([525.]), model_id='sklearn_5x_model.pkl')
PredictionResult(example=[1000.0], inference=array([5000.]), model_id='sklearn_5x_model.pkl')


#### 使用管道抓取含KEY資料測試模型

In [10]:
sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename)
keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)

# 設定 Dataflow 管道選項，並指定暫存位置。
pipeline_options = PipelineOptions().from_dictionary({'temp_location': f'gs://{bucket}/tmp'})
with beam.Pipeline(options=pipeline_options) as p:
  (
  p
  | "從 BigQuery 讀取資料" >> beam.io.ReadFromBigQuery(table=table_spec)
  | "提取輸入值" >> beam.Map(lambda x: (x['key'], [x['value']]))
  | "執行 Sklearn 推論" >> RunInference(model_handler=keyed_sklearn_model_handler)
  | beam.Map(print)
  )





('second_example', PredictionResult(example=[108.0], inference=array([540.]), model_id='sklearn_5x_model.pkl'))
('fourth_example', PredictionResult(example=[1013.0], inference=array([5065.]), model_id='sklearn_5x_model.pkl'))
('first_example', PredictionResult(example=[105.0], inference=array([525.]), model_id='sklearn_5x_model.pkl'))
('third_example', PredictionResult(example=[1000.0], inference=array([5000.]), model_id='sklearn_5x_model.pkl'))


### Pytorch應用

#### 導入所需模組

In [11]:
import argparse
import csv
import json
import os
import torch
from typing import Tuple

import apache_beam as beam
import numpy
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from apache_beam.options.pipeline_options import PipelineOptions

import warnings
warnings.filterwarnings('ignore')


#### 設定模型存放名稱

In [12]:
# 儲存模型的路徑
save_model_dir_multiply_five = 'five_times_table_torch.pt'
save_model_dir_multiply_ten = 'ten_times_table_torch.pt'

#### 建立Pytorch模型

In [13]:
class LinearRegression(torch.nn.Module):
    def __init__(self, input_dim=1, output_dim=1):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, output_dim)
    def forward(self, x):
        out = self.linear(x)
        return out

In [14]:
x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)
y = (x * 5).reshape(-1, 1)
value_to_predict = numpy.array([20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)

In [15]:
five_times_model = LinearRegression()
optimizer = torch.optim.Adam(five_times_model.parameters())
loss_fn = torch.nn.L1Loss()

"""
Train the five_times_model
"""
epochs = 10000
tensor_x = torch.from_numpy(x)
tensor_y = torch.from_numpy(y)
for epoch in range(epochs):
    y_pred = five_times_model(tensor_x)
    loss = loss_fn(y_pred, tensor_y)
    five_times_model.zero_grad()
    loss.backward()
    optimizer.step()

#### 儲存Pytorch五倍模型

In [16]:
torch.save(five_times_model.state_dict(), save_model_dir_multiply_five)
print(os.path.exists(save_model_dir_multiply_five)) # Verify that the model is saved.

True


#### 設定十倍模型參數

In [17]:
x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)
y = (x * 10).reshape(-1, 1)

#### 訓練十倍模型

In [18]:
ten_times_model = LinearRegression()
optimizer = torch.optim.Adam(ten_times_model.parameters())
loss_fn = torch.nn.L1Loss()

epochs = 10000
tensor_x = torch.from_numpy(x)
tensor_y = torch.from_numpy(y)
for epoch in range(epochs):
    y_pred = ten_times_model(tensor_x)
    loss = loss_fn(y_pred, tensor_y)
    ten_times_model.zero_grad()
    loss.backward()
    optimizer.step()

#### 儲存十倍模型

In [19]:
torch.save(ten_times_model.state_dict(), save_model_dir_multiply_ten)
print(os.path.exists(save_model_dir_multiply_ten)) # verify if the model is saved

True


#### 利用管道預測

In [20]:
torch_five_times_model_handler = PytorchModelHandlerTensor(
    state_dict_path=save_model_dir_multiply_five,
    model_class=LinearRegression,
    model_params={'input_dim': 1,
                  'output_dim': 1}
                  )
pipeline = beam.Pipeline()

with pipeline as p:
      (
      p
      | "ReadInputData" >> beam.Create(value_to_predict)
      | "ConvertNumpyToTensor" >> beam.Map(torch.Tensor)
      | "RunInferenceTorch" >> RunInference(torch_five_times_model_handler)
      | beam.Map(print)
      )

PredictionResult(example=tensor([20.]), inference=tensor([100.0009]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([40.]), inference=tensor([200.0017]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([60.]), inference=tensor([300.0026]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([90.]), inference=tensor([450.0038]), model_id='five_times_table_torch.pt')


#### 解釋預測結果

In [21]:
class PredictionProcessor(beam.DoFn):
  """
  用於格式化 RunInference 轉換輸出的處理器。
  """
  def process(
      self,
      element: PredictionResult):
    input_value = element.example
    output_value = element.inference
    yield (f"輸入值為 {input_value.item()}，輸出值為 {output_value.item()}")

pipeline = beam.Pipeline()

with pipeline as p:
    (
    p
    | "讀取輸入資料" >> beam.Create(value_to_predict)
    | "將 Numpy 轉換為 Tensor" >> beam.Map(torch.Tensor)
    | "執行 Torch 推論" >> RunInference(torch_five_times_model_handler)
    | "後處理預測結果" >> beam.ParDo(PredictionProcessor())
    | beam.Map(print)
    )

輸入值為 20.0，輸出值為 100.00089263916016
輸入值為 40.0，輸出值為 200.00173950195312
輸入值為 60.0，輸出值為 300.0025634765625
輸入值為 90.0，輸出值為 450.00384521484375


#### 建立KEY資料

In [22]:
class PredictionWithKeyProcessor(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(
          self,
          element: Tuple[str, PredictionResult]):
        key = element[0]
        input_value = element[1].example
        output_value = element[1].inference
        yield (f"key值: {key}, 輸入為: {input_value.item()} 輸出為: {output_value.item()}" )

#### 使用BigQuery測試

In [23]:
pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp',
                                                      })
pipeline = beam.Pipeline(options=pipeline_options)

keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)

table_name = 'maths_problems_1'
table_spec = f'{project}:maths.{table_name}'

with pipeline as p:
      (
      p
      | "ReadFromBQ" >> beam.io.ReadFromBigQuery(table=table_spec)
      | "PreprocessData" >> beam.Map(lambda x: (x['key'], x['value']))
      | "ConvertNumpyToTensor" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorch" >> RunInference(keyed_torch_five_times_model_handler)
      | "PostProcessPredictions" >> beam.ParDo(PredictionWithKeyProcessor())
      | beam.Map(print)
      )



key值: second_example, 輸入為: 108.0 輸出為: 540.004638671875
key值: fourth_example, 輸入為: 1013.0 輸出為: 5065.04296875
key值: first_example, 輸入為: 105.0 輸出為: 525.0045166015625
key值: third_example, 輸入為: 1000.0 輸出為: 5000.04248046875


#### 建立CSV檔案資料

In [24]:
# 建立CSV範例資料
csv_values = [("first_question", 105.00),
      ("second_question", 108.00),
      ("third_question", 1000.00),
      ("fourth_question", 1013.00)]
input_csv_file = "./maths_problem.csv"

with open(input_csv_file, 'w') as f:
  writer = csv.writer(f)
  writer.writerow(['key', 'value'])
  for row in csv_values:
    writer.writerow(row)

assert os.path.exists(input_csv_file) == True

#### 讀取CSV檔進行測試

In [25]:
pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp',
                                                      })
pipeline = beam.Pipeline(options=pipeline_options)

keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)

with pipeline as p:
  df = p | beam.dataframe.io.read_csv(input_csv_file)
  pc = to_pcollection(df)
  (pc
    | "ConvertNumpyToTensor" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
    | "RunInferenceTorch" >> RunInference(keyed_torch_five_times_model_handler)
    | "PostProcessPredictions" >> beam.ParDo(PredictionWithKeyProcessor())
    | beam.Map(print)
    )


key值: first_question, 輸入為: 105.0 輸出為: 525.0045166015625
key值: second_question, 輸入為: 108.0 輸出為: 540.004638671875
key值: third_question, 輸入為: 1000.0 輸出為: 5000.04248046875
key值: fourth_question, 輸入為: 1013.0 輸出為: 5065.04296875


#### 準備十倍模型路徑

In [26]:
torch_ten_times_model_handler = PytorchModelHandlerTensor(state_dict_path=save_model_dir_multiply_ten,
                                        model_class=LinearRegression,
                                        model_params={'input_dim': 1,
                                                      'output_dim': 1}
                                        )
keyed_torch_ten_times_model_handler = KeyedModelHandler(torch_ten_times_model_handler)


#### 多模型管道測試

In [27]:
pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp'})

pipeline = beam.Pipeline(options=pipeline_options)

read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)

with pipeline as p:
  multiply_five = (
      p
      |  read_from_bq
      | "CreateMultiplyFiveTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))
      | "ConvertNumpyToTensorFiveTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchFiveTuple" >> RunInference(keyed_torch_five_times_model_handler)
  )
  multiply_ten = (
      p
      | read_from_bq
      | "CreateMultiplyTenTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))
      | "ConvertNumpyToTensorTenTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchTenTuple" >> RunInference(keyed_torch_ten_times_model_handler)
  )

  inference_result = ((multiply_five, multiply_ten) | beam.Flatten()
                                 | beam.ParDo(PredictionWithKeyProcessor()))
  inference_result | beam.Map(print)




key值: second_example * 5, 輸入為: 108.0 輸出為: 540.004638671875
key值: fourth_example * 5, 輸入為: 1013.0 輸出為: 5065.04296875
key值: first_example * 5, 輸入為: 105.0 輸出為: 525.0045166015625
key值: third_example * 5, 輸入為: 1000.0 輸出為: 5000.04248046875
key值: second_example * 10, 輸入為: 108.0 輸出為: 1015.0565795898438
key值: fourth_example * 10, 輸入為: 1013.0 輸出為: 9445.9580078125
key值: first_example * 10, 輸入為: 105.0 輸出為: 987.1088256835938
key值: third_example * 10, 輸入為: 1000.0 輸出為: 9324.8505859375


#### 排程多模型預測

In [28]:
def process_interim_inference(element):
    key, prediction_result = element
    input_value = prediction_result.example
    inference = prediction_result.inference
    formatted_input_value = 'original input is `{} {}`'.format(key, input_value)
    return formatted_input_value, inference


pipeline_options = PipelineOptions().from_dictionary(
                                      {'temp_location':f'gs://{bucket}/tmp'})
pipeline = beam.Pipeline(options=pipeline_options)

with pipeline as p:
  multiply_five = (
      p
      | beam.io.ReadFromBigQuery(table=table_spec)
      | "CreateMultiplyFiveTuple" >> beam.Map(lambda x: (x['key'], x['value']))
      | "ConvertNumpyToTensorFiveTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchFiveTuple" >> RunInference(keyed_torch_five_times_model_handler)
  )

  inference_result = (
    multiply_five
      | "ExtractResult" >> beam.Map(process_interim_inference)
      | "RunInferenceTorchTenTuple" >> RunInference(keyed_torch_ten_times_model_handler)
      | beam.ParDo(PredictionWithKeyProcessor())
    )
  inference_result | beam.Map(print)




key值: original input is `second_example tensor([108.])`, 輸入為: 540.004638671875 輸出為: 5039.57421875
key值: original input is `fourth_example tensor([1013.])`, 輸入為: 5065.04296875 輸出為: 47194.4375
key值: original input is `first_example tensor([105.])`, 輸入為: 525.0045166015625 輸出為: 4899.83447265625
key值: original input is `third_example tensor([1000.])`, 輸入為: 5000.04248046875 輸出為: 46588.8984375
