# Airflow 에러 Logging에서 찾기

## 설명

Airflow 에러 Logging에서 Query로 [찾기](https://docs.google.com/spreadsheets/d/1sUmpuh03AqFGqdjJa9AzcWhLDUzxaMXhGNdyGxA6wLs/edit#gid=0)

In [None]:
-- environment
resource.labels.project_id="wavvedp-prd"
resource.type="cloud_composer_environment"
resource.labels.environment_name="wavvedp-prd-bq-etl"
severity=ERROR

-- airflow info
labels."task-id"="job_skbcdn"
labels.workflow="WAVVE_LOG.MEDIA_TRAFFIC"

-- time in UTC
-- Boolean operators must be upper-case: a lower-case "and" is parsed as a search term.
timestamp>="2022-12-21T07:30:00Z" AND timestamp<="2022-12-21T07:36:00Z"

# Azure Blob to GCS 파일 업로드 (이름 변경 & 기존 파일 삭제)

## 설명

Azure Blob -> Jupyter Notebook (local) -> GCS로 업로드

In [None]:
import sys
import logging

from datetime import datetime, timedelta
from pytz import timezone
import os

from time import sleep

from azure.storage.blob import BlobClient, BlobServiceClient
from google.cloud import storage
from google.cloud import bigquery


# LOGGING
log = logging.getLogger('azure_to_gcs bookmark-cassandra')
log.setLevel(logging.INFO)

# The log file path is taken from the third command-line argument. When that
# argument is absent, log to stdout instead of failing with an IndexError
# while the module is still being loaded.
file_name = sys.argv[3] if len(sys.argv) > 3 else None
if file_name is not None:
    handler = logging.FileHandler(file_name)
else:
    handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(filename)s:[%(lineno)d] : %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)


# Azure settings
# NOTE(review): the connection string, container and PREFIX values look like
# placeholders -- confirm how the real values are supplied before running.
AZURE_BLOB_CONN_STR = 'ACCOUNTKEY'
CONTAINER = 'CONTAINER_NAME'
AZURE_BLOB_PATH_FORMAT = 'PREFIX/ymd={yyyymmdd}/hh={hh}/channeltype={channel_type}'

# Base name given to each downloaded file.
# Earlier, shorter scheme kept for reference: 'bookmark_{local_name}'
FILENAME_FORMAT = 'bookmark_{yyyymmdd}_{hh}_{channeltype}_{local_name}'

# GCP settings
# Local staging path on the Jupyter server
#dir_name = sys.argv[3]
LOCAL_PATH_FORMAT = '/home/jupyter/src/Files/tmp_blob/warehouse/bookmark/cassandra/{local_path}'

# GCS destination
GCS_BUCKET_NAME = 'BUCKET_NAME'
GCS_PATH_FORMAT = 'PREFIX/yyyy={yyyy}/mm={mm}/dd={dd}/hh={hh}/{path}'


# Functions
# Azure Blob에서 다운로드
def download_az_blob(yyyymmdd, hh, channel_type):
    """Download the '.c000' blobs of one date/hour/channel type to local disk.

    Lists the blobs in CONTAINER whose names start with the path built from
    AZURE_BLOB_PATH_FORMAT, keeps those ending in '.c000', and writes each of
    them to a local file named by FILENAME_FORMAT under LOCAL_PATH_FORMAT.

    Args:
        yyyymmdd: Date string substituted into the blob path and file name.
        hh: Hour string substituted into the blob path and file name.
        channel_type: Channel type substituted into the blob path and file name.

    Returns:
        dict mapping each downloaded blob name to its local file path. Empty
        when no matching blob was listed.
    """
    azure_blob_path = AZURE_BLOB_PATH_FORMAT.format(
        yyyymmdd=yyyymmdd, hh=hh, channel_type=channel_type)

    # blob name -> local destination path, in listing order
    local_path_dict = {}

    service_client = BlobServiceClient.from_connection_string(conn_str=AZURE_BLOB_CONN_STR)
    try:
        container_client = service_client.get_container_client(CONTAINER)
        for blob in container_client.list_blobs(name_starts_with=azure_blob_path):
            if not blob.name.endswith('.c000'):
                continue
            local_name = FILENAME_FORMAT.format(
                yyyymmdd=yyyymmdd,
                hh=hh,
                channeltype=channel_type,
                local_name=os.path.basename(blob.name),
            )
            local_path_dict[blob.name] = LOCAL_PATH_FORMAT.format(local_path=local_name)
    finally:
        # Release the listing client even when listing fails part-way.
        service_client.close()

    for blob_name, local_path in local_path_dict.items():
        # Make sure the staging directory exists before opening the file.
        target_dir = os.path.dirname(local_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        blob_client = BlobClient.from_connection_string(
            conn_str=AZURE_BLOB_CONN_STR, container_name=CONTAINER, blob_name=blob_name)
        try:
            with open(local_path, "wb") as my_blob:
                blob_client.download_blob().readinto(my_blob)
        finally:
            # Close the per-blob client even when the download raises.
            blob_client.close()

    log.info(' ===== download OK!')

    return local_path_dict

# GCS에 이미 존재하는 파일 리스트 출력
def get_gcs_list(gcs_path):
    """Return the names of the objects in GCS_BUCKET_NAME under a prefix.

    Args:
        gcs_path: Object-name prefix passed to the bucket listing.

    Returns:
        list of object names (str); empty when nothing matches.
    """
    storage_client = storage.Client()
    try:
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        # Consume the whole listing here, while the client is still open.
        return [blob.name for blob in bucket.list_blobs(prefix=gcs_path)]
    finally:
        # Close the client as del_gcs_file does, so repeated calls do not
        # accumulate open clients.
        storage_client.close()

# GCS 파일 삭제
def del_gcs_file(gcs_path):
    """Delete the object named gcs_path from GCS_BUCKET_NAME.

    Args:
        gcs_path: Full object name inside the bucket.

    Any exception raised by the delete call propagates to the caller.
    """
    storage_client = storage.Client()
    try:
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        bucket.blob(gcs_path).delete()
    finally:
        # Close the client even when the delete call raises.
        storage_client.close()

# GCS 업로드
def upload_to_gcs(local_path, gcs_path):
    """Upload a local file to GCS_BUCKET_NAME under the object name gcs_path.

    Args:
        local_path: Path of the local file to upload.
        gcs_path: Destination object name inside the bucket.

    Any exception raised by the upload propagates to the caller.
    """
    storage_client = storage.Client()
    try:
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        bucket.blob(gcs_path).upload_from_filename(local_path)
    finally:
        # Close the client as del_gcs_file does; this function is called once
        # per uploaded file.
        storage_client.close()


# 로컬 파일 삭제
def del_local_file(local_path):
    """Remove the local file at local_path; a missing file is ignored.

    Args:
        local_path: Path of the file to remove.
    """
    # Attempt the removal directly instead of checking for existence first,
    # so a file that disappears between check and removal cannot raise.
    try:
        os.remove(local_path)
    except FileNotFoundError:
        pass

def delete_process(yyyymmdd, hh):
    """Delete every GCS object listed under the folder of one date and hour.

    Args:
        yyyymmdd: Date string; characters 0-3 are used as the year, 4-5 as
            the month and the remainder as the day.
        hh: Hour string substituted into the folder path.
    """
    year, month, day = yyyymmdd[:4], yyyymmdd[4:6], yyyymmdd[6:]

    # The folder prefix is GCS_PATH_FORMAT without its '/{path}' component.
    folder_format = GCS_PATH_FORMAT.replace('/{path}','')
    gcs_folder = folder_format.format(yyyy=year, mm=month, dd=day, hh=hh)

    for object_name in get_gcs_list(gcs_folder):
        del_gcs_file(object_name)
        log.info("file deleted : {item}".format(item=object_name))

        

def process(channel_type, yyyymmdd, hh):
    """Copy one date/hour/channel type from Azure Blob to GCS via local disk.

    Downloads the matching blobs with download_az_blob, uploads each local
    file to the path built from GCS_PATH_FORMAT, then removes the local files.

    Args:
        channel_type: Channel type passed to the download step.
        yyyymmdd: Date string; characters 0-3 are used as the year, 4-5 as
            the month and the remainder as the day.
        hh: Hour string used in both the source and destination paths.

    Exceptions from the download or upload step propagate to the caller.
    """
    yyyy = yyyymmdd[:4]
    mm = yyyymmdd[4:6]
    dd = yyyymmdd[6:]

    log.info("{hh} - {channel_type}".format(hh=hh, channel_type=channel_type))
    log.info(" === 1. download - from az blob ")
    local_path_dict = download_az_blob(yyyymmdd, hh, channel_type)

    log.info(" === 2. upload - to gcs ")
    local_paths = list(local_path_dict.values())
    try:
        for local_path in local_paths:
            gcs_path = GCS_PATH_FORMAT.format(
                yyyy=yyyy, mm=mm, dd=dd, hh=hh, path=os.path.basename(local_path))
            upload_to_gcs(local_path, gcs_path)
    finally:
        # Remove the staged files whether or not every upload succeeded, so a
        # failed upload does not leave downloaded files behind on local disk.
        for local_path in local_paths:
            del_local_file(local_path)

    log.info(' ===== upload OK!')


if __name__ == '__main__':
    # Usage (terminal):
    #   python3 Bookmark_blob_to_gcs-cassandra_v2.py <start_date> <end_date> <log_file>
    #   e.g. python3 Bookmark_blob_to_gcs-cassandra_v2.py 2022-08-05 2022-08-01 /home/jupyter/logs/azure_to_gcs/bookmark_blob_to_gcs_cassandra_v2.log
    #
    # Days are processed backwards, one at a time, from start_date down to
    # end_date (inclusive), so start_date must be the later of the two.

    date1 = sys.argv[1]  # e.g. '2022-08-05'
    date2 = sys.argv[2]  # e.g. '2022-08-01'

    start_date = datetime.strptime(date1, '%Y-%m-%d')
    end_date = datetime.strptime(date2, '%Y-%m-%d')
    day_delta = timedelta(days=1)

    log.info("start_date :: {start_date}".format(start_date=start_date))
    log.info("end_date :: {end_date}".format(end_date=end_date))

    if start_date < end_date:
        # The loop below would silently do nothing; make that visible.
        log.warning("start_date is earlier than end_date; nothing to process")

    while start_date >= end_date:

        yyyymmdd = start_date.strftime('%Y%m%d')
        log.info(" === yyyymmdd :: {yyyymmdd}".format(yyyymmdd=yyyymmdd))

        for i in range(0, 24):

            hh = "{:02d}".format(i)
            log.info(" ===== HOUR :: {hh}".format(hh=hh))

            try:
                # Clear the destination folder for this hour, then copy each
                # channel type in turn.
                delete_process(yyyymmdd=yyyymmdd, hh=hh)

                for channel_type in ('V', 'L', 'M', 'E'):
                    process(channel_type=channel_type, yyyymmdd=yyyymmdd, hh=hh)
            except Exception as e:
                # Best effort: log the failure and move on to the next hour.
                # (Original note: hours without channel type 'E' occur from
                # time to time.)
                log.exception(e)

        start_date -= day_delta  # e.g. 2022-08-05 -> 2022-08-04
        log.info(" ----- \n")
