## Pipeline 용 Customized schema 만들기

- 아래 예시는 한 번이라도 Schema 없이 pipeline 돌려봤다는 가정 하에 가장 최근 Schema를 불러와서 진행
- Schema에 기본적으로 infer 된 사항 외에 추가적인 제약 사항 부여
- 실제 분석 시에는 여러 Stats을 가져와 비교해보고 도메인 지식을 더해 Schema 커스터마이징
- Skew 값의 경우도 비교해보면서 적정값을 찾아야 함
- Skew 관련 참조 내용: https://github.com/jinwoo1990/mlops-with-tensorflow/blob/main/tfdv/tfdv_skew_metrics.ipynb

In [1]:
import os
from dotenv import load_dotenv
import ml_metadata as mlmd
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2
from tfx.types import standard_artifacts
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

In [2]:
ENV_FILE_DIR = os.path.join(os.getcwd(), '.env')
load_dotenv(ENV_FILE_DIR)  # load secret env

True

In [43]:
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.mysql.host = os.environ.get('MYSQL_HOST')
connection_config.mysql.port = int(os.environ.get('MYSQL_PORT'))
connection_config.mysql.database = os.environ.get('MYSQL_DATABASE')
connection_config.mysql.user = os.environ.get('MYSQL_USERNAME')
connection_config.mysql.password = os.environ.get('MYSQL_PASSWORD')
store = metadata_store.MetadataStore(connection_config,
                                     enable_upgrade_migration=True)
mlmd.downgrade_schema(
    config=connection_config,
    downgrade_to_schema_version=6
)  # 버전 호환 문제로 작성

In [4]:
artifact_types = store.get_artifact_types()
artifact_types = [artifact_type.name for artifact_type in artifact_types]

In [5]:
artifact_types

['Examples',
 'ExampleStatistics',
 'Schema',
 'ExampleAnomalies',
 'TransformGraph',
 'TransformCache',
 'ModelRun',
 'Model',
 'ModelBlessing',
 'ModelEvaluation',
 'PushedModel',
 'HyperParameters']

In [23]:
temp = store.get_artifacts_by_type('ExampleStatistics')

In [41]:
stats_path = 'gs://tfx-project-348306-pipelines/tfx_pipeline_output/advert_pipeline/StatisticsGen/statistics/20'
train_stats_file = os.path.join(stats_path, 'Split-train', 'stats_tfrecord')

In [42]:
a = tfdv.load_statistics(train_stats_file)

OSError: Invalid input path gs://tfx-project-348306-pipelines/tfx_pipeline_output/advert_pipeline/StatisticsGen/statistics/20/Split-train/stats_tfrecord.

In [37]:
tfdv.visualize_statistics(a)

ValueError: lhs_statistics proto contains no dataset.

In [6]:
schema_artifacts = store.get_artifacts_by_type('Schema')

In [7]:
# SchemaGen으로 부터 생성된 Schema Aritfact만 골라주기
schema_uri_list = []

for schema_artifact in schema_artifacts:
    schema_uri = schema_artifact.uri
    if 'SchemaGen' in schema_uri:
        # print(schema_uri)
        schema_uri_list.append(schema_uri)

print(schema_uri_list[-1])

gs://tfx-project-348306-pipelines/tfx_pipeline_output/advert_pipeline/SchemaGen/schema/184


In [8]:
# Schema 불러오기
# 이렇게 할 수도 있고 stats으로부터 불러와서 tfdv로 infer schema 할 수도 있음

schema_file = os.path.join(schema_uri_list[-1], 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_file)

In [9]:
tfdv.display_schema(schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'AdTopicLine',BYTES,required,,-
'City',BYTES,required,,-
'Country',BYTES,required,,-
'Timestamp',BYTES,required,,-
'Age',INT,required,,-
'AreaIncome',FLOAT,required,,-
'ClickedOnAd',INT,required,,-
'DailyInternetUsage',FLOAT,required,,-
'DailyTimeSpentOnSite',FLOAT,required,,-
'Male',INT,required,,-


In [10]:
schema

feature {
  name: "AdTopicLine"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "City"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Country"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Timestamp"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Age"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "AreaIncome"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "ClickedOnAd"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    di

In [11]:
tfdv.get_feature(schema, 'Age')

name: "Age"
type: INT
presence {
  min_fraction: 1.0
  min_count: 1
}
shape {
  dim {
    size: 1
  }
}

In [12]:
# domain 설정
tfdv.set_domain(schema, 'Age', schema_pb2.IntDomain(name='age', min=17, max=100))

In [13]:
# skew 설정
for item in ['City', 'Country']:  # Bytes로 Feature가 읽혀서 STRING으로 읽도록 추후에 고치거나 방법 찾아야 할 듯
    tfdv.get_feature(schema, item).skew_comparator.infinity_norm.threshold = 0.00000000000001  # 조정

for item in ['Age', 'AreaIncome', 'DailyInternetUsage', 'DailyTimeSpentOnSite']:
    tfdv.get_feature(schema, item).skew_comparator.jensen_shannon_divergence.threshold = 0.00000000000001  # 조정

In [14]:
schema

feature {
  name: "AdTopicLine"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "City"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  skew_comparator {
    infinity_norm {
      threshold: 1e-14
    }
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Country"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  skew_comparator {
    infinity_norm {
      threshold: 1e-14
    }
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Timestamp"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Age"
  type: INT
  int_domain {
    name: "age"
    min: 17
    max: 100
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  skew_comparator {
    jensen_shannon_divergence {
      threshold: 1e-14
    }
  }
  shape {
    dim {
      size: 1
    

In [15]:
# 파일로 저장
from tensorflow.python.lib.io import file_io

OUTPUT_DIR = "schema"
file_io.recursive_create_dir(OUTPUT_DIR)
schema_file = os.path.join(OUTPUT_DIR, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)

In [16]:
# 불러오기
# loaded_schema = tfdv.load_schema_text('./schema/schema.pbtxt')
# loaded_schema