- 20230126 : 원천데이터 로드 로직 수정 -> opn_date <= yyyymmdd < cls_date
- 자동적재용 날짜 확인 후 적재

# 라이브러리 로드

In [28]:
import google.cloud.aiplatform as aip

from typing import NamedTuple

import kfp
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import Artifact, Input, component, Output, Dataset, Metrics

from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

# Notebook convenience only: make Jedi-based tab completion case-insensitive.
from jedi import settings
settings.case_insensitive_completion = True

# 파이프라인 설정

In [29]:
# GCP project and region used by the Vertex AI SDK.
PROJECT_ID = "mpp-biz-prd"
REGION = "asia-northeast3"

# Bucket used for staging; pipeline artifacts go under PIPELINE_ROOT inside it.
BUCKET_URI = "gs://gcs-mpp-prd"
PIPELINE_ROOT = f"{BUCKET_URI}/analytics-pipeline/3.Cannibalization"

# Regional Vertex AI API endpoint derived from REGION.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"

In [30]:
# Initialise the Vertex AI SDK with the project, region and staging bucket defined above.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [31]:
# !gsutil ls -al $BUCKET_URI

# 컴포넌트 생성

- 매장간 이격거리함수 (자동적재)

In [25]:
@component(
    packages_to_install=['google-cloud-bigquery', 'google-cloud-storage', 'google-oauth2-tool', 'pandas==1.3.5', 'numpy==1.21.6', 'db-dtypes', 'fsspec', 'gcsfs'
                        ,'haversine==2.7.0'], 
    base_image='python:3.7.13',
    output_component_file='get_near_stor_distance.yaml'
)
def read_and_preprocess():
    """Build the store-to-store distance table for yesterday (KST) and append it to BigQuery.

    Steps:
      1. Read the store master rows whose ``partition_date`` equals yesterday
         (Asia/Seoul) from BigQuery.
      2. Keep rows where ``OPN_DATE <= YYYYMMDD < CLS_DATE``.
      3. For every store, list the other stores within 1 km, plus the
         comma-joined neighbour lists within 500 m (``NEAR_STOR_2``) and
         1000 m (``NEAR_STOR_3``). ``NEAR_STOR_GROUP`` is the 500 m list for
         GRID '50' stores and the 1000 m list for GRID '250' stores.
      4. For every store, list the ``DT_YN == 'Y'`` stores within 5 km
         (``NEAR_STOR_GROUP_DT``).
      5. Append the result to ``mpp-biz-prd.SCK_MPP.FT_MPP_ANALY_STOR_DISTANCE``.

    Everything (imports, helpers, constants) lives inside this function so the
    component source stays self-contained.
    """
    import json
    from collections import defaultdict
    from datetime import datetime, timedelta

    import numpy as np
    import pandas as pd
    from google.cloud import bigquery
    from google.cloud import storage
    from google.oauth2 import service_account
    from haversine import haversine
    # NOTE(review): pytz is not listed in packages_to_install; presumably it is
    # pulled in as a pandas dependency - confirm.
    from pytz import timezone

    KEY_BUCKET = 'gcs-mpp-prd'
    KEY_BLOB = 'analytics-pipeline/1.deficit_prediction/mpp-biz-prd-3758b372e68f.json'
    TARGET_TABLE = 'mpp-biz-prd.SCK_MPP.FT_MPP_ANALY_STOR_DISTANCE'

    def _make_bigquery_client():
        """Create a BigQuery client from the service-account key stored in GCS."""
        storage_client = storage.Client()
        key_blob = storage_client.get_bucket(KEY_BUCKET).blob(KEY_BLOB)
        key_info = json.loads(key_blob.download_as_bytes())
        credentials = service_account.Credentials.from_service_account_info(key_info)
        return bigquery.Client(credentials=credentials, project=credentials.project_id)

    def _store_points(stores):
        """Return [(STOR_CD, (lat, lon)), ...] with coordinates converted to float once."""
        return [
            (stor_cd, (float(lat), float(lon)))
            for stor_cd, lat, lon in zip(stores['STOR_CD'], stores['LATITUDE'], stores['LONGITUDE'])
        ]

    def _pairwise_distances(origins, targets) -> pd.DataFrame:
        """Return one row per (origin, target) pair with the haversine distance in metres.

        Rows are ordered origin-major, i.e. all targets for the first origin,
        then all targets for the second origin, and so on.
        """
        origin_points = _store_points(origins)
        target_points = _store_points(targets)

        from_cds = []
        to_cds = []
        metres = []
        for from_cd, start in origin_points:
            for to_cd, goal in target_points:
                from_cds.append(from_cd)
                to_cds.append(to_cd)
                metres.append(haversine(start, goal, unit='m'))

        return pd.DataFrame(data={'STOR_CD': from_cds, 'STOR_CD_2': to_cds, 'DISTANCE': metres})

    def _joined_neighbors(pairs, max_distance):
        """Return, for every row of ``pairs``, the comma-joined STOR_CD_2 values of
        that row's STOR_CD whose DISTANCE is <= ``max_distance``.

        Neighbours keep the row order of ``pairs``; a store without any
        neighbour in range gets an empty string. A single grouping pass
        replaces re-filtering the whole frame once per row.
        """
        neighbors = defaultdict(list)
        for from_cd, to_cd, distance in zip(pairs['STOR_CD'], pairs['STOR_CD_2'], pairs['DISTANCE']):
            if distance <= max_distance:
                neighbors[from_cd].append(to_cd)
        return [",".join(neighbors.get(stor_cd, [])) for stor_cd in pairs['STOR_CD']]

    def get_near_stor_distance(client, load_day) -> pd.DataFrame:
        """Read the source rows for ``load_day`` and return the final distance frame.

        Args:
            client: BigQuery client used to run the source query.
            load_day: Partition date to read; interpolated into the query as
                ``partition_date = '<load_day>'``.

        Returns:
            DataFrame with the columns YYYYMMDD, STOR_CD, NEAR_STOR_1,
            LONGITUDE, LATITUDE, OPN_DATE, CLS_DATE, STOR_ADDR_1, STOR_ADDR_2,
            DISTANCE, NEAR_STOR_2, NEAR_STOR_3, NEAR_STOR_GROUP, GRID and
            NEAR_STOR_GROUP_DT. Empty strings are replaced by NaN.
        """
        # load_day is computed inside this component (not user input), so it is
        # interpolated directly into the SQL text.
        query = f"""
        SELECT A.partition_date, A.YYYYMMDD, A.STOR_CD, A.DT_YN, B.LONGITUDE, B.LATITUDE, A.OPN_DATE, A.CLS_DATE, B.ROADNM_ADDR, B.ROADNM_ADDR_DTL,
        CASE 
        WHEN A.ADMDIST_SIDO_NM IN ('서울','경기','대전','대구','부산','울산','광주','인천') THEN '50'
        WHEN A.ADMDIST_SIGUNGU_NM IN ('청주시','천안시','전주시','포항시','구미시','창원시','김해시','제주시') THEN '50'
        ELSE '250'
        END AS GRID
        FROM `mpp-biz-prd.SCK_DW.DM_STOR_S` AS A, `mpp-biz-prd.SCK_DW.DM_XO_STOR` AS B
        WHERE A.STOR_CD = B.STOR_CD
        AND partition_date = '{load_day}'
        """

        stores = client.query(query).result().to_dataframe()

        # Condition added 2023-01-26: keep rows where OPN_DATE <= YYYYMMDD < CLS_DATE.
        in_period = (stores['YYYYMMDD'] >= stores['OPN_DATE']) & (stores['YYYYMMDD'] < stores['CLS_DATE'])
        stores = stores[in_period].reset_index(drop=True)

        # Per-store attributes joined back onto the pair rows at the end.
        standard_df = stores[stores.columns.difference(['DT_YN'])]

        # --- Neighbouring stores within 1 km --------------------------------
        pairs = _pairwise_distances(stores, stores)
        pairs = pd.merge(pairs, stores[['STOR_CD', 'GRID']], how='left', on='STOR_CD')

        # Keep pairs within 1 km. A zero distance is kept only when the two
        # store codes differ, which drops each store's pair with itself.
        within_1km = (pairs.DISTANCE <= 1000) & (pairs.DISTANCE != 0)
        same_spot_other_store = (pairs.STOR_CD != pairs.STOR_CD_2) & (pairs.DISTANCE == 0)
        pairs = pairs.loc[within_1km | same_spot_other_store]
        pairs = pairs.sort_values(by=['STOR_CD', 'DISTANCE']).reset_index(drop=True)

        # NEAR_STOR_GROUP uses the 500 m list for GRID '50' and the 1000 m list
        # for GRID '250'.
        grid_frames = []
        for grid, group_source in (('50', 'NEAR_STOR_2'), ('250', 'NEAR_STOR_3')):
            grid_pairs = pairs[pairs['GRID'] == grid].copy()
            grid_pairs['NEAR_STOR_2'] = _joined_neighbors(grid_pairs, 500)
            grid_pairs['NEAR_STOR_3'] = _joined_neighbors(grid_pairs, 1000)
            grid_pairs['NEAR_STOR_GROUP'] = grid_pairs[group_source]
            grid_frames.append(grid_pairs)

        near_pairs = pd.concat(grid_frames)

        # --- Neighbouring DT stores within 5 km -----------------------------
        dt_targets = stores[stores['DT_YN'] == 'Y']
        dt_pairs = _pairwise_distances(stores, dt_targets)

        # NOTE(review): unlike the 1 km pass above, a zero distance is always
        # dropped here, even between two different store codes - confirm intent.
        dt_pairs = dt_pairs.loc[(dt_pairs.DISTANCE <= 5000) & (dt_pairs.DISTANCE != 0)]
        dt_pairs = dt_pairs.sort_values(by=['STOR_CD', 'DISTANCE']).reset_index(drop=True).copy()
        dt_pairs['NEAR_STOR_GROUP_DT'] = _joined_neighbors(dt_pairs, 5000)

        # One row per store: every pair row of a store carries the same joined
        # list, so de-duplicating first avoids a many-to-many merge that would
        # multiply rows only for drop_duplicates() to collapse them again.
        dt_groups = dt_pairs[['STOR_CD', 'NEAR_STOR_GROUP_DT']].drop_duplicates()

        # --- Assemble the final frame ----------------------------------------
        merged = pd.merge(near_pairs, dt_groups, how='outer', on='STOR_CD')
        final_df = pd.merge(merged, standard_df, how='left', on='STOR_CD')
        final_df = final_df.drop_duplicates().reset_index(drop=True)

        # GRID exists on both merge sides; keep the one coming from standard_df.
        final_df.drop('GRID_x', axis=1, inplace=True)
        final_df.rename(columns={'GRID_y' : 'GRID', 'STOR_CD_2' : 'NEAR_STOR_1'
                            ,'ROADNM_ADDR' : 'STOR_ADDR_1', 'ROADNM_ADDR_DTL' : 'STOR_ADDR_2'}, inplace=True)
        final_df = final_df[['YYYYMMDD','STOR_CD','NEAR_STOR_1','LONGITUDE','LATITUDE','OPN_DATE','CLS_DATE','STOR_ADDR_1','STOR_ADDR_2'
                         ,'DISTANCE','NEAR_STOR_2','NEAR_STOR_3','NEAR_STOR_GROUP','GRID','NEAR_STOR_GROUP_DT']]

        final_df = final_df.astype({'OPN_DATE':'str','CLS_DATE':'str','YYYYMMDD':'str'})
        final_df = final_df.replace('',np.nan)

        return final_df

    # Load date = yesterday in Asia/Seoul.
    today_kst = datetime.now(timezone('Asia/Seoul')).date()
    load_day = today_kst - timedelta(days=1)

    # One client serves both the source query and the load job.
    client = _make_bigquery_client()

    final_df = get_near_stor_distance(client, load_day)

    # Append the result to the target table.
    job = client.load_table_from_dataframe(final_df, TARGET_TABLE)
    job.result()

    
    

    # final_df.to_csv(final_retention_dataset.path + '.csv', encoding = 'utf-8-sig')

# 파이프라인 구성

In [26]:
@dsl.pipeline(
    name='stor-distance-pipeline',
    description='Get near store and distance',
    pipeline_root=PIPELINE_ROOT
)
def make_pipeline():
    """Define the store-distance pipeline: one preprocessing task wrapped in an e-mail exit handler.

    The notification recipients are read from
    ``gs://gcs-mpp-prd/analytics-pipeline/recipients.json`` when this function
    body is executed, and passed to ``VertexNotificationEmailOp``.
    """
    import json

    from google.cloud import storage

    # Load the notification recipients from GCS.
    storage_client = storage.Client()
    recipients_blob = storage_client.get_bucket('gcs-mpp-prd').blob('analytics-pipeline/recipients.json')
    recipients = json.loads(recipients_blob.download_as_bytes())

    notify_email_task = VertexNotificationEmailOp(recipients=recipients)

    # The notification task is registered as the exit handler of the wrapped task.
    with dsl.ExitHandler(notify_email_task):

        # 1. Load the BigQuery data, preprocess it and append the result.
        read_and_preprocess()

In [27]:
# Compile the pipeline function into the job-spec file submitted to Vertex AI.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=make_pipeline, package_path="get_near_stor_distance.json")

In [23]:
# 파이프라인 에러 확인시만 사용

In [24]:
# job = aip.PipelineJob(
#     display_name='get_near_stor_distance',
#     template_path="get_near_stor_distance.json".replace(" ", "_"),
#     pipeline_root=PIPELINE_ROOT,
#     location=REGION,
#     enable_caching=False
# )

# job.submit(service_account='mpp-nt-danny-prd@mpp-biz-prd.iam.gserviceaccount.com')

Creating PipelineJob
PipelineJob created. Resource name: projects/397824194879/locations/asia-northeast3/pipelineJobs/stor-distance-pipeline-20230127021259
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/397824194879/locations/asia-northeast3/pipelineJobs/stor-distance-pipeline-20230127021259')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-northeast3/pipelines/runs/stor-distance-pipeline-20230127021259?project=397824194879


### Cloud Function 코드
- function 만들 때 서비스 계정 danny-prd로 생성

In [None]:
# import json
# from google.cloud import aiplatform
# PROJECT_ID = 'mpp-biz-prd'         # <---CHANGE THIS
# REGION = 'asia-northeast3'                 # <---CHANGE THIS
# PIPELINE_ROOT = 'gs://gcs-mpp-prd/analytics-pipeline/3.Cannibalization'   # <---CHANGE THIS

# def process_request(request):
#   aiplatform.init(
#     project=PROJECT_ID,
#     location=REGION,
#   )

#   job = aiplatform.PipelineJob(
#     display_name=f'NEAR_STOR_DISTANCE',
#     template_path='gs://gcs-mpp-prd/analytics-pipeline/3.Cannibalization/get_near_stor_distance.json',
#     pipeline_root=PIPELINE_ROOT,
#     location=REGION,
#     enable_caching=False
#   )

#   job.submit(service_account='mpp-nt-danny-prd@mpp-biz-prd.iam.gserviceaccount.com')

#   return "Job submitted"

In [None]:
# google-api-python-client>=1.7.8,<2
# google-cloud-aiplatform

In [None]:
# from kfp.v2.google.client import AIPlatformClient

# api_client = AIPlatformClient(
#                 project_id=PROJECT_ID,
#                 region=REGION,
#                 )

# SERVICE_ACCOUNT = (
#     "mpp-nt-danny-dev@mpp-biz-dev.iam.gserviceaccount.com") # Replace with your own service account.
# response = api_client.create_schedule_from_job_spec(
#     enable_caching=False,
#     job_spec_path="month_retention_pipeline.json",
#     #schedule="0 1 * * 1-7", 
#     schedule = "0 0 1 * *",
#     time_zone="Asia/Seoul",  # change this as necessary
#     # parameter_values={"display_name": 'month_retention_pipeline'},
#     pipeline_root=PIPELINE_ROOT,  
#     service_account=SERVICE_ACCOUNT    
# )