In [None]:
#@formatter:off
%stop_session
%region {region}
%iam_role arn:aws:iam::{iamNumber}:role/IamRoleGlueNotebookExecution
%worker_type G.1X
%glue_version 4.0
%number_of_workers 3
%idle_timeout 10
%session_id_prefix ash_notebook_session
%profile default
%connections ggpcom-replica-compliance-read
#@formatter:on

In [None]:
import sys
from datetime import datetime, timedelta
from pytz import timezone
import json
import boto3
import functools
import time
import pyspark.sql.functions as F

from pyspark.context import SparkContext  # glue 기동시 필요.

from awsglue import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job


class DeDataDiffCheck:
    def __init__(self):
        """Start the Glue/Spark runtime and set up the job's static configuration.

        Job options are resolved from ``sys.argv``. For each entry of
        ``self.default_arguments`` whose flag token does not appear in
        ``sys.argv``, the flag and its fallback value are appended before
        resolution.
        """
        # Handles required to run as an AWS Glue job.
        self.sc = SparkContext.getOrCreate()
        self.glueContext = GlueContext(self.sc)
        self.spark = self.glueContext.spark_session
        self.job = Job(self.glueContext)

        # NOTE(review): the "{region}" placeholders in this method are plain
        # (non-f) strings; they look like deploy-time template values - confirm.
        self.default_arguments = {
            "--JOB_NAME": "de-player-daily-snapshot",
            "--DB_CONNECTION": "-ggpcom-replica-compliance-read",
            "--DATALAKE_REGION": "{region}"
        }
        # Command-line tokens first, then every default whose flag was not supplied.
        merged_argv = list(sys.argv)
        for option, fallback in self.default_arguments.items():
            if option not in sys.argv:
                merged_argv += [option, fallback]
        self.argv = merged_argv
        self.args = getResolvedOptions(self.argv, ['JOB_NAME', 'DB_CONNECTION', 'DATALAKE_REGION'])
        self.job.init(self.args["JOB_NAME"], self.args)

        # Values used by the rest of the class.
        self.db_ = "-ggpcom-replica-compliance-read"
        self.today = datetime.today().strftime('%Y%m%d')
        self.jurisdiction = 'de'
        self.target_bucket = 'compliance-develop'
        # Key prefix (inside target_bucket) under which the data file is written.
        self.target_bucket_subpath = f'/player_snapshot/{self.jurisdiction}/{self.today}'
        self.slack_title = "Error - DE Diff Check Glue Job"
        self.glue_url = "https://{region}.console.aws.amazon.com/gluestudio/home?region={region}#/editor/job/de-player-daily-snapshot-prod/runs"
        self.glue_job_name = "de-player-daily-snapshot"
        self.email_address = "dd.data.exchange@nsuslab.com"
        self.email_title = "[Error] DE Diff Check Glue Job"
        self.slack_retry_error_msg = f"<br>Slack Retry Error. 최대 횟수인 3번을 넘었습니다.<br>DE Diff Check Glue Job({self.glue_job_name})이 수행되지 않았으니 확인 필요 합니다."

        # Test mode redirects alerts to the test channel and writes under a "test/" prefix.
        self.is_test = 0
        if self.is_test:
            self.target_slack_channel = 'notice-dx-alarm-test'
            self.target_bucket_subpath = 'test/' + self.target_bucket_subpath
        else:
            self.target_slack_channel = 'notice-compliance-monitoring'

    def _retry(self, func, **kwargs):
        """
        Slack으로 알람 전달하는 과정에서도 오류가 발생할 경우를 대비 하기 위해 만든 Retry 로직. 
        최대 3번의 Retry 시도 한다.(각 시도는 5초 간격) 최대 Retry 시도 후에도 알람이 전송되지 않을 경우, 최종적으로 Email로 오류를 전달 한다.
        위의 과정에서도 예기치 못한 오류가 발생할 경우 Email로 오류를 전달하도록 처리한다. 
            - Slack Error Message 전달 후 Response = 200을 받게 되면, 함수를 종료 하게 된다.
        :param func: Error가 발생한 Function -> self._send_error_slack_message function
        :param kwargs: Error Message와 Error가 발생한 Function Name을 전달 받는다.
        :return: None
        """
        try_cnt = 0
        for i in range(1, 4):
            try:
                res = func(**kwargs)
                if res["ResponseMetadata"]["HTTPStatusCode"] == 200:
                    return
                else:
                    try_cnt += 1
                if try_cnt == 3:
                    self._send_error_email(subject=self.email_title,
                                           message=f"{self.slack_retry_error_msg}\n- HTTPStatusCode-> {res['ResponseMetadata']['HTTPStatusCode']}")
                time.sleep(5)
            except Exception as e:
                self._send_error_email(subject=self.email_title,
                                       message=f"Slack Error Message 최초 전달시 예기치 못한 오류가 발생함. -> {e}")
                break

    def __exception_handler(func):
        """
        수행될 각 Function 마다, Error 발생시 Slack 알람을 전달하기 위해 만든 decorator.
        Function에서 Error 발생시 _retry 함수로 Error가 발생한 Function을 전달 하여 처리한다. 
        :param func: Handler를 통해 전달 받을 Function.
        :return: wrapper -> function
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception as e:
                self._retry(self._send_error_slack_message, message=e, func_name=func.__name__)

        return wrapper

    def _send_error_slack_message(self, message, func_name):
        """
        Publish an error alert to the SNS topic named by ``self.target_slack_channel``.

        :param message: Error detail placed in the alert description (a string
                        or an exception; it is interpolated into an f-string).
        :param func_name: Str -> name of the function in which the error occurred.
        :return: The response returned by ``sns_client.publish``.
        """
        alert_payload = {
            "version": "1.0",
            "source": "custom",
            "content": {
                "textType": "client-markdown",
                "title": self.slack_title,
                "description": f"1. *Error Function*: {func_name}\n2. *Error Message*: {message}\n",
                "nextSteps": [
                    f"AWS Glue UI Refer to <{self.glue_url}|*_{self.glue_job_name}_* Glue Runs History>",
                    "Glue Run History Check",
                    "Code 내에서 Error 발생한 Function 확인이 필요 합니다."
                ],
            }
        }
        sns_client = boto3.client('sns')
        # The previous f-string interpolated an undefined name `region`, which
        # raised NameError on every call. Use the region the client is bound to.
        # NOTE(review): assumes the topic lives in the client's region - confirm.
        topic_arn = f"arn:aws:sns:{sns_client.meta.region_name}:190490205267:{self.target_slack_channel}"
        return sns_client.publish(TopicArn=topic_arn, Message=json.dumps(alert_payload))

    def _send_error_email(self, subject, message):
        """
        Send an error notification by email through SES (last-resort channel
        used when the Slack alert could not be delivered).

        The mail is sent from and to ``self.email_address`` and carries both a
        plain-text part (``message``) and an HTML part with ``message``
        embedded as-is.

        :param subject: Str -> email subject
        :param message: Str -> error text for the email body
        :return: None
        """
        body_html = f"""
        <html>
            <head>
                <title>DE Diff Check Error Result</title>
                <style>
                    body {{
                        font-family: Arial, sans-serif;
                        background-color: #f4f4f4;
                        margin: 0;
                        padding: 20px;
                    }}
                    .container {{
                        background-color: #ffffff;
                        border-radius: 10px;
                        padding: 20px;
                        max-width: 600px;
                        margin: 0 auto;
                        box-shadow: 0px 4px 8px rgba(0, 0, 0, 0.1);
                    }}
                    h1 {{
                        color: #d9534f;
                        font-size: 24px;
                        text-align: center;
                    }}
                    p {{
                        font-size: 16px;
                        line-height: 1.6;
                        color: #333333;
                    }}
                    .error-details {{
                        background-color: #f9ecec;
                        border-left: 4px solid #d9534f;
                        padding: 10px;
                        margin: 20px 0;
                        color: #d9534f;
                    }}
                    .footer {{
                        font-size: 12px;
                        text-align: center;
                        color: #777777;
                        margin-top: 20px;
                    }}
                </style>
            </head>
            <body>
                <div class="container">
                    <h1>[Error] DE Diff Check via AWS Glue</h1>
                    <p>Dear User,</p>
                    <p>An error occurred during the DE Diff Check process. Please find the details below:</p>
                    <div class="error-details">
                        <strong>Error Details:</strong> <em>{message}</em>.
                    </div>
                    <p>Please address this issue at your earliest convenience.</p>
                    <p>Best regards,<br>DX Team</p>
                    <div class="footer">
                        <p>This is an automated message. Please do not reply.</p>
                    </div>
                </div>
            </body>
        </html>
        """

        # Assemble the SES message piece by piece.
        charset = "UTF-8"
        mail_subject = {"Charset": charset, "Data": subject}
        mail_body = {
            "Text": {"Charset": charset, "Data": message},
            "Html": {"Charset": charset, "Data": body_html},
        }
        recipients = {"ToAddresses": [self.email_address]}

        ses_client = boto3.client(service_name='ses', region_name='us-east-1')
        ses_client.send_email(
            Source=self.email_address,
            Destination=recipients,
            Message={"Subject": mail_subject, "Body": mail_body},
        )

    @__exception_handler
    def _wirte_down_s3(self, dynamic_frame):
        """
        Write the given frame to S3 as CSV.

        The frame is coalesced to one partition and written under
        ``s3://{self.target_bucket}/{self.target_bucket_subpath}/``.

        :param dynamic_frame: awsglue.dynamicframe.DynamicFrame
        :return: Whatever ``write_dynamic_frame.from_options`` returns
                 (``None`` if the write raised and the error was reported).
        """
        write_frame = self.glueContext.write_dynamic_frame.from_options
        single_partition = dynamic_frame.coalesce(1)
        destination = {"path": f"s3://{self.target_bucket}/{self.target_bucket_subpath}/"}
        return write_frame(
            frame=single_partition,
            connection_type="s3",
            format="csv",
            connection_options=destination,
        )

    @__exception_handler
    def _query_in_glue(self, query, connection):
        """
        Run a SQL query through a Glue ``custom.jdbc`` connection and return
        the result as a DynamicFrame.

        :param query: SQL query statement (used for the player data extract)
        :param connection: Name of the Glue connection to use
        :return: awsglue.dynamicframe.DynamicFrame
                 (``None`` if the read raised and the error was reported).
        """
        read_frame = self.glueContext.create_dynamic_frame.from_options
        jdbc_options = {
            "query": query,
            "connectionName": connection,
        }
        return read_frame(
            connection_type="custom.jdbc",
            connection_options=jdbc_options,
            transformation_ctx="result",
        )



    @__exception_handler
    def __player_to_s3(self):
        """
        Extract player data with a SQL query, derive three status columns and
        write the result to S3.

        Steps:
            1. Query players (``p.Site = 4``) created at or before 06:00:00 UTC
               of the current UTC date.
            2. Add ``PlayerAccountStatus``, ``PlayerAccountStatusCause`` and
               ``PlayerSuspensionStatusCause`` with Spark column expressions.
            3. Write the frame through ``self._wirte_down_s3``.

        If the query step fails, ``self._query_in_glue`` has already reported
        the error and returned ``None``; this method then stops without
        raising a secondary error.

        Change log:
            - [DX-59]
                - Before: the query had no WHERE condition on creation time.
                - After: a ``CreatedAt`` condition was added (requested so the
                  cut-off follows the time the Glue schedule runs).

        :return: None
        """
        date_time = datetime.now(tz=timezone('UTC')).replace(hour=6, minute=0, second=0, microsecond=0).strftime(
            '%Y-%m-%d %H:%M:%S')
        query = f"""select p.AccountId                                         as GGPassID,
                           json_value(plcm.JsonRequest, '$.GivenName')         as FirstName,
                           json_value(plcm.JsonRequest, '$.LastName')          as LastName,
                           json_value(plcm.JsonRequest, '$.Postcode')          as PostalCode,
                           json_value(plcm.JsonRequest, '$.Place')             as City,
                           json_value(plcm.JsonRequest, '$.Area')              as State,
                           json_value(plcm.JsonRequest, '$.CountryAlpha2Code') as Country,
                           json_value(plcm.JsonRequest, '$.Street')            as StreetName,
                           json_value(plcm.JsonRequest, '$.Number')            as HouseNumber,
                           json_value(plcm.JsonRequest, '$.BirthDate')         as Birthday,
                           json_value(plcm.JsonRequest, '$.BirthPlace')        as PlaceOfBirth,
                           reg.RequestedAt                                     as LugasRegistrationTime,
                           p.KycStatus                                         as KycStatusCode,
                           CAST(p.CreatedAt as datetime2(0))                   as RegisteredAt,
                           CASE pas.Status
                               WHEN 0 THEN 'Active'
                               WHEN 1 THEN 'Suspended'
                               WHEN 2 THEN 'Blocked'
                               WHEN 3 THEN 'Closed'
                               END                                             as PlayerSuspensionStatus,
                           pas.Type                                            as PlayerSuspensionStatusType,
                           pas.Reason                                          as PlayerSuspensionReason, -- PlayerSuspension suspensionReason
                           isPending.ExpiredAt                                 as PlayerMandatoryReverificationPeriodStartedFrom,
                           isPending.LastReverificationLoggedInAt              as PlayerLastReverificationLoggedInAt,
                           isModified.AddressLastModifiedAt                    as AddressLastModifiedAt,
                           CASE pvd.VerificationStatus
                               WHEN 0 THEN 'VerificationPending'
                               WHEN 1 THEN 'VerificationFailed'
                               WHEN 2 THEN 'Verified'
                               WHEN 3 THEN 'AccountClosed'
                               END                                             as PlayerVerificationDEStatus,
                           CASE pvd.VerificationIntent
                               WHEN 1 THEN 'Registration'
                               WHEN 2 THEN 'TemporarilyAccepted'
                               WHEN 3 THEN 'DataChanged'
                               WHEN 4 THEN 'ChangedPayment'
                               WHEN 5 THEN 'RegularCycle'
                               END                                             as PlayerVerificationDEStatusCause
                    from A88.dbo.Player as p
                             with (nolock) -- Lugas에 등록되지 않은 Player도 모두 대상이 됨.
                             left outer join(select *
                                             from (select JsonRequest,
                                                          PlayerId,
                                                          row_number() over (partition by PlayerId order by RequestedAt desc) as rn1
                                                   from dbo.PlayerLugasCertificationMonitoring with (nolock)
                                                   where Resource in ('/players/{{playerId}}', '/players')
                                                     and ErrorCode = 0
                                                     and StatusCode <> 0
                                                     and JsonRequest not like '%"Name":%') _plcm
                                             where _plcm.rn1 = 1) as plcm 
                                            on plcm.PlayerId = p.Id
                             left join (select PlayerId, RequestedAt
                                            from (select PlayerId,
                                                         RequestedAt,
                                                         row_number() over (partition by PlayerId order by RequestedAt) as rn1
                                                  from PlayerLugasCertificationMonitoring with (nolock)
                                                  where Resource in ('/players')
                                                    and ErrorCode = 0
                                                    and StatusCode <> 0
                                                    and JsonRequest not like '%"Name":%') a
                                            where rn1 = 1) reg on reg.PlayerId = p.Id --/players로 필터링하여 update 아닌 register만 고름
                             left join (select PlayerId,
                                               max(case when Status = 2 then CreatedAt end)   as ExpiredAt,
                                               max(case when Status = 100 then CreatedAt end) as LastReverificationLoggedInAt
                                        from dbo.PlayerKycReVerificationHistory with (nolock)
                                        where IsActive = 'True'
                                        group by PlayerId) isPending on isPending.PlayerId = p.Id
                             left outer join (select PlayerId, Status, Type, Reason, UpdatedAt, SuspendEndDate
                                              from A88.dbo.PlayerAccountStatus with (nolock)) pas
                                             on p.Id = pas.PlayerId -- only Suspended Info (p.Id랑 plcm.PlayerId는 같은값)
                             left join PlayerVerificationDE pvd with (nolock) on pvd.PlayerId = p.Id
                             left join (select PlayerId, max(case when Status = 102 then CreatedAt end) as AddressLastModifiedAt
                                        from PlayerKycReVerificationHistory with (nolock)
                                        where IsModified = 'True'
                                        group by PlayerId) isModified on isModified.PlayerId = p.Id
                            where p.Site = 4
                            and p.CreatedAt <= '{date_time}' 
                 """
        query_result = self._query_in_glue(query=query, connection=self.db_)
        if query_result is None:
            # The query failed and its own handler already sent the alert.
            # Calling .toDF() on None would only raise a second, misleading error.
            return
        _snapshot = query_result.toDF()

        # Columns shared by the derived-status expressions below.
        kyc_code = F.col('KycStatusCode')
        registered_at_lugas = F.col('LugasRegistrationTime')
        reverification_started = F.col('PlayerMandatoryReverificationPeriodStartedFrom')
        reverification_login = F.col('PlayerLastReverificationLoggedInAt')

        # --- PlayerAccountStatus -------------------------------------------
        seconds_since_login = (F.current_timestamp().cast('long')
                               - reverification_login.cast('timestamp').cast('long'))
        seconds_since_registration = (F.current_timestamp().cast('long')
                                      - registered_at_lugas.cast('timestamp').cast('long'))

        # KycStatusCode > 4 (per the original note: KYC = PoA / EDD).
        status_when_kyc_above_4 = (
            # Reverification period started and a reverification login exists:
            # VerificationFailed if the login is after the period start and more
            # than 1 hour (3600 s) old, otherwise VerificationPending.
            F.when(reverification_started.isNotNull() & reverification_login.isNotNull(),
                   F.when((reverification_started < reverification_login) & (seconds_since_login > 3600),
                          'VerificationFailed')
                   .otherwise('VerificationPending'))
            # Period started but no reverification login recorded.
            .when(reverification_started.isNotNull(), 'VerificationPending')
            # No active reverification record.
            .otherwise('Verified')
        )
        # KycStatusCode <= 4 (or null): VerificationFailed once more than
        # 144 hours have passed since LugasRegistrationTime, else VerificationPending.
        status_when_kyc_up_to_4 = (
            F.when(seconds_since_registration > (3600 * 144), 'VerificationFailed')
            .otherwise('VerificationPending')
        )
        # Checked in order: not registered, account closed, explicit
        # PlayerVerificationDE status; only then the KYC-based rules apply.
        account_status = (
            F.when(registered_at_lugas.isNull(), 'NotRegistered')
            .when(F.col('PlayerSuspensionStatus') == 'Closed', 'AccountClosed')
            .when(F.col('PlayerVerificationDEStatus').isNotNull(), F.col('PlayerVerificationDEStatus'))
            .when(kyc_code > 4, status_when_kyc_above_4)
            .otherwise(status_when_kyc_up_to_4)
        )
        _snapshot = _snapshot.select('*', account_status.alias('PlayerAccountStatus'))

        # --- PlayerAccountStatusCause --------------------------------------
        address_change_due = (
            F.add_months(F.col('AddressLastModifiedAt').cast('date'), 12) <= F.current_date()
        )
        # No otherwise(): rows with KycStatusCode > 4 and no reverification
        # start get a null cause.
        cause_when_kyc_above_4 = (
            F.when(reverification_started.isNotNull() & reverification_login.isNotNull(),
                   F.when(address_change_due, 'DataChanged').otherwise('RegularCycle'))
            .when(reverification_started.isNotNull(), 'RegularCycle')
        )
        account_status_cause = (
            F.when(F.col('PlayerVerificationDEStatusCause').isNotNull(),
                   F.col('PlayerVerificationDEStatusCause'))
            .when(F.col('PlayerAccountStatus').isin(['Verified', 'NotRegistered']),
                  F.lit(None).cast('string'))
            .when(kyc_code > 4, cause_when_kyc_above_4)
            .when(kyc_code.isin([3, 4]), 'TemporarilyAccepted')
            .otherwise('DataChanged')
        )
        _snapshot = _snapshot.withColumn('PlayerAccountStatusCause', account_status_cause)

        # --- PlayerSuspensionStatusCause -----------------------------------
        suspension_status = F.col("PlayerSuspensionStatus")
        suspension_type = F.col("PlayerSuspensionStatusType")
        suspension_reason = F.col("PlayerSuspensionReason")

        cause_when_suspended = (
            F.when(suspension_type.isin([11, 12, 14, 19]),
                   F.when((suspension_type == 19) & (suspension_reason == "Self-Exclusion"), "Addiction")
                   .otherwise("Legal"))
            .when(suspension_type.isin([13, 15]),
                  F.when((suspension_type == 13) & (suspension_reason == "Panic Btn"), "Voluntary")
                  .otherwise("Addiction"))
            .otherwise("Legal")
        )
        cause_when_blocked = (
            F.when(suspension_type.isin([21, 22, 23, 25, 26, 28, 29, 201]), "Legal")
            .when(suspension_type.isin([24, 27]), "Addiction")
            .otherwise("Legal")
        )
        cause_when_closed = (
            F.when(suspension_type.isin([31, 32, 33, 36, 37, 39]), "Legal")
            .when(suspension_type.isin([34, 35]), "Addiction")
            .otherwise("Legal")
        )
        # No otherwise(): any other suspension status (e.g. 'Active' or null)
        # yields a null cause.
        suspension_cause = (
            F.when(suspension_status == "Suspended", cause_when_suspended)
            .when(suspension_status == "Blocked", cause_when_blocked)
            .when(suspension_status == "Closed", cause_when_closed)
        )
        _snapshot = _snapshot.withColumn('PlayerSuspensionStatusCause', suspension_cause)

        query_result = DynamicFrame.fromDF(_snapshot, self.glueContext, "_query_result")
        self._wirte_down_s3(dynamic_frame=query_result)

    @__exception_handler
    def execute(self):
        '''
        Class에서 최종적으로 실행될 function.
        :return: None
        '''
        self.__player_to_s3()
        # self.job.commit() 추후 파이프라인 수정시 복원 예정(24.11.12)

    def commit(self):
        """
        코드 파이프라인 배포에 의해서 자동으로 붙게됨. 
        임시 추가: 24.11.12
        :return: None
        """
        self.job.commit()


# Build the job object (its constructor initialises the Glue/Spark runtime and
# resolves the job arguments), then run the export.
# NOTE(review): keep the module-level name `job` - the deployment pipeline
# reportedly appends a commit call after this script, presumably through this
# name; confirm before renaming.
job = DeDataDiffCheck()
job.execute()