# Teradata Dataframeを主体としたELT(Extract/Load/Transform)バッチ処理のサンプル

## 1. 背景と目的

teradatamlパッケージに含まれるTeradata Dataframeは大変強力な機能を有している。
もちろんデータ分析で有用であるが、同時にバッチ処理でも有効であると考えられる。

伝統的なTeradataのバッチ処理は、Shell Script/Bteq Scriptで記述されている。しかし、左記の組み合わせは、記述力が弱く冗長、煩雑になりがちである。
これに対して、teradatamlのTeradata Dataframeは、当然ながらPythonでのプログラミングが可能であり、これは大きな可能性を持っている。

1. 各種の関数化、ライブラリ化が可能となる
2. PyTestなどの強力なテストフレームワークを利用することが可能になる

つまり、これまでの手動テストや、同じような処理の重複した記述を廃することができる。これにより、モダンなソフトウェア開発の手法が適用可能となる。
ひいては開発生産性の大幅な向上が期待できる。

本稿では、このバッチ処理開発のモダン化の可能性を考察する。

## 2. 方針

以下をサンプルとして実装し、機能性や記述性を考察する。

1. 典型的なELTバッチ処理の以下を実装する
   - CSVロード
   - ジョインによる変換とフィルタリング
   - SUMなどの集約とテーブル出力
2. Python UDF(User Defined Function)による複雑なデータ変換

## 3. 実装

### 3-0. DB接続

In [1]:
user = 'dbc'
password = 'dbc'
host = "192.168.11.9"
database = "example"
dbs_port = 1025

In [2]:
import sqlalchemy
engine = sqlalchemy.create_engine((
  f"teradatasql://{user}:{password}@{host}/?"
  f"&database={database}"
  f"&dbs_port={dbs_port}"
))

In [3]:
import teradataml as tdml
context = tdml.create_context(tdsqlengine=engine)

### 3-1. CSVロード

二つのCSVファイルをロードする
1. Irisデータセット
   - 機械学習のベンチマークで用いられる一般的なデータ。[scikit-learn](https://scikit-learn.org/stable/)で簡単に入手可能
   - FastLoadを利用する
2. ダミーのマスターデータ
   - 何らかジョインを試したいので、追加したデータ
   - FastLoadを利用しない

#### Irisデータセット

In [4]:
# ダウンロードして、Pandas Dataframeとして保持する
import pandas as pd
from sklearn import datasets

iris = datasets.load_iris()
pd_df = pd.DataFrame(iris.data, columns=iris.feature_names)
pd_df['target'] = iris.target_names[iris.target]
pd_df.columns = ['sepallength', 'sepalwidth', 'petallength', 'petalwidth', 'target']
pd_df.insert(loc=0, column='idx', value=pd_df.index + 1)
pd_df

Unnamed: 0,idx,sepallength,sepalwidth,petallength,petalwidth,target
0,1,5.1,3.5,1.4,0.2,setosa
1,2,4.9,3.0,1.4,0.2,setosa
2,3,4.7,3.2,1.3,0.2,setosa
3,4,4.6,3.1,1.5,0.2,setosa
4,5,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...,...
145,146,6.7,3.0,5.2,2.3,virginica
146,147,6.3,2.5,5.0,1.9,virginica
147,148,6.5,3.0,5.2,2.0,virginica
148,149,6.2,3.4,5.4,2.3,virginica


In [5]:
# CSVファイルに出力する
pd_df.to_csv('iris.csv', index=False)

In [6]:
# FastLoadする
# 参考 https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-Package-for-Python-User-Guide/teradataml-General-Functions/Data-Transfer-Utility/Loading-Data-from-CSV-to-Vantage/read_csv

from collections import OrderedDict
from teradatasqlalchemy import INTEGER, VARCHAR, FLOAT

# Delete All
with engine.connect() as conn:
    conn.execute(sqlalchemy.text("delete from iris"))

# FastLoad
types = OrderedDict(
    idx=INTEGER,
    sepallength=FLOAT,
    sepalwidth=FLOAT,
    petallength=FLOAT, 
    petalwidth=FLOAT,
    target=VARCHAR,
)
tdml.read_csv('iris.csv', table_name='iris_temp', types=types)

with engine.connect() as conn:
    conn.execute(sqlalchemy.text("insert into iris select * from iris_temp"))
    
# ロード結果のサンプルを表示する
tdml.DataFrame.from_table("iris")

idx,sepallength,sepalwidth,petallength,petalwidth,target
141,6.7,3.1,5.6,2.4,virginica
99,5.1,2.5,3.0,1.1,versicolor
17,5.4,3.9,1.3,0.4,setosa
139,6.0,3.0,4.8,1.8,virginica
15,5.8,4.0,1.2,0.2,setosa
137,6.3,3.4,5.6,2.4,virginica
118,7.7,3.8,6.7,2.2,virginica
120,6.0,2.2,5.0,1.5,virginica
101,6.3,3.3,6.0,2.5,virginica
122,5.6,2.8,4.9,2.0,virginica


#### ダミーのマスターデータ

In [7]:
# Pandas Dataframeをロードする

import pandas as pd

from teradataml.dataframe.copy_to import copy_to_sql

category = [{'target': 'setosa', 'category': 'CAT-S'},
            {'target': 'versicolor', 'category': 'CAT-V'},
            {'target': 'versicolor', 'category': 'CAT-V'}]
pd_category = pd.DataFrame(category)

copy_to_sql(df = pd_category, table_name = "category", primary_index="target", if_exists="replace")

# ロード結果のサンプルを表示する
tdml.DataFrame.from_table("category")

  TIMESTAMP(timezone=True) if pt.is_datetime64_ns_dtype(df.dtypes[key])
  else _get_sqlalchemy_mapping_types(str(df.dtypes[key]))
  TIMESTAMP(timezone=True) if pt.is_datetime64_ns_dtype(df.dtypes[key])
  else _get_sqlalchemy_mapping_types(str(df.dtypes[key]))


target,category
versicolor,CAT-V
versicolor,CAT-V
setosa,CAT-S


### 3-2. ジョインによる変換とフィルタリング

#### ジョイン

In [8]:
# ジョインを実行する。この時点では、まだ、実際のSQLが実行される訳ではない (遅延評価)
td_iris = tdml.DataFrame.from_table("iris")
td_category = tdml.DataFrame.from_table("category")
td_joined = td_iris.join(other=td_category, on=["target"], how = "inner", lsuffix = "t1", rsuffix = "t2")

In [9]:
# 実際に実行されるSQLを表示してみる
td_joined.show_query()

'select "idx", "sepallength", "sepalwidth", "petallength", "petalwidth", "lhs"."target" as "target_t1", "rhs"."target" as "target_t2", "category" from "iris" as lhs inner join "category" as rhs on "lhs"."target" = "rhs"."target"'

#### フィルタリング

In [10]:
td_filtered = td_joined[(td_joined.sepallength > 4.5) & (td_joined.petalwidth > 0.1)]
td_filtered.show_query()

'select * from (select "idx", "sepallength", "sepalwidth", "petallength", "petalwidth", "lhs"."target" as "target_t1", "rhs"."target" as "target_t2", "category" from "iris" as lhs inner join "category" as rhs on "lhs"."target" = "rhs"."target") as temp_table where sepallength > CAST(4.5 as FLOAT) AND petalwidth > CAST(0.1 as FLOAT)'

### 3-3. Python UDF(User Defined Function)による複雑なデータ変換

In [11]:
# UDFを定義する
from teradataml.dataframe.functions import udf

@udf(returns=INTEGER()) 
def udf_example(sepallength, sepalwidth, petallength, petalwidth):
    if sepallength / sepalwidth > petallength / petalwidth:
        return 1
    return 0

In [15]:
# UDFを適用する
res = td_filtered.assign(udf_example = udf_example('sepallength', 'sepalwidth', 'petallength', 'petalwidth'))

TeradataMlException: [Teradata][teradataml](TDML_2102) Failed to execute SQL: '[Version 20.0.0.21] [Session 1105] [Teradata Database] [Error 9134] Error in function SCRIPT: SCRIPT_COMMAND returned exit value 127 - errmsg "bash: /var/opt/teradata/languages/sles12sp3/Python//bin/python3: No such file or directory"
 at gosqldriver/teradatasql.formatError ErrorUtil.go:92
 at gosqldriver/teradatasql.(*teradataConnection).formatDatabaseError ErrorUtil.go:252
 at gosqldriver/teradatasql.(*teradataConnection).makeChainedDatabaseError ErrorUtil.go:268
 at gosqldriver/teradatasql.(*teradataConnection).processErrorParcel TeradataConnection.go:751
 at gosqldriver/teradatasql.(*TeradataRows).processResponseBundle TeradataRows.go:2308
 at gosqldriver/teradatasql.(*TeradataRows).executeSQLRequest TeradataRows.go:874
 at gosqldriver/teradatasql.newTeradataRows TeradataRows.go:720
 at gosqldriver/teradatasql.(*teradataStatement).QueryContext TeradataStatement.go:122
 at gosqldriver/teradatasql.(*teradataConnection).QueryContext TeradataConnection.go:1261
 at database/sql.ctxDriverQuery ctxutil.go:48
 at database/sql.(*DB).queryDC.func1 sql.go:1776
 at database/sql.withLock sql.go:3530
 at database/sql.(*DB).queryDC sql.go:1771
 at database/sql.(*Conn).QueryContext sql.go:2027
 at main.createRows goside.go:1080
 at main.goCreateRows goside.go:959
 at _cgoexp_e3ee842aae7c_goCreateRows _cgo_gotypes.go:414
 at runtime.cgocallbackg1 cgocall.go:403
 at runtime.cgocallbackg cgocall.go:322
 at runtime.cgocallback asm_amd64.s:1079
 at runtime.goexit asm_amd64.s:1695'

↑↑↑UDFはうまく動作しなかったが、Teradata Expressの問題と考えられる。実機ではうまくいくはず...

### 3-4. SUMなどの集約とテーブル出力

#### 集約

In [17]:
td_aggregated = td_filtered.groupby("category").sum()
td_aggregated.show_query()

'select "category", sum(idx) AS sum_idx, sum(sepallength) AS sum_sepallength, sum(sepalwidth) AS sum_sepalwidth, sum(petallength) AS sum_petallength, sum(petalwidth) AS sum_petalwidth from (select * from (select "idx", "sepallength", "sepalwidth", "petallength", "petalwidth", "lhs"."target" as "target_t1", "rhs"."target" as "target_t2", "category" from "iris" as lhs inner join "category" as rhs on "lhs"."target" = "rhs"."target") as temp_table where sepallength > CAST(4.5 as FLOAT) AND petalwidth > CAST(0.1 as FLOAT)) as temp_table group by "category"'

#### テーブル出力

In [18]:
# テーブルに書き込む
td_aggregated.to_sql("iris_aggegation", if_exists="replace")

# もう一度フェッチして表示する
tdml.DataFrame.from_table("iris_aggegation").to_pandas()

Unnamed: 0,category,sum_idx,sum_sepallength,sum_sepalwidth,sum_petallength,sum_petalwidth
0,CAT-V,7550,593.6,277.0,426.0,132.6
1,CAT-S,1034,208.5,143.2,60.9,10.9
