# Imported Libraries

In [168]:
import psycopg2
from psycopg2 import sql
import psycopg2.extras as extras
import pandas as pd
import json
from datetime import datetime,timezone
from dateutil import tz

import os
import sys 

from configupdater import ConfigUpdater
# pip install ConfigUpdater

from dotenv import dotenv_values

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import BadRequest
from google.oauth2 import service_account



In [169]:
# Run mode. With is_py=True (plain-script execution) the view name comes from
# argv[1] or, failing that, an interactive prompt; otherwise the default below is used.
is_py = False
view_name = "pmr_pm_item"
isFirstLoad = False

if is_py:
    press_Y = ''
    ok = False
    if len(sys.argv) <= 1:
        print("Enter the following input: ")
        view_name = input("View Table Name : ")
    else:
        view_name = sys.argv[1]

print(f"View name to load to BQ :{view_name}")

View name to load to BQ :pmr_pm_item


# Import timestamp (UTC)

In [170]:
# Start time of this run in UTC, truncated to whole seconds and stored without
# tzinfo so it matches the "%Y-%m-%d %H:%M:%S" watermark format kept in .cfg.
dt_imported = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
print(f"UTC: {dt_imported}")



UTC: 2023-12-29 17:50:01


# Set view

In [171]:
log = "models_logging_change"

# view name -> (content_type_id used to filter the `log` table,
#               ID column of the view matched against the log's object_id)
_VIEW_SETTINGS = {
    "pmr_pm_plan": (36, "pm_id"),
    "pmr_pm_item": (37, "pm_item_id"),
    "pmr_project": (7, "project_id"),
    "pmr_inventory": (14, "inventory_id"),
}

if view_name not in _VIEW_SETTINGS:
    raise Exception("No specified content type id")
content_id, view_name_id = _VIEW_SETTINGS[view_name]

# Set BigQuery dataset and credential config

In [172]:
# BigQuery target settings. Each value can be overridden through an environment
# variable, so another project or key file can be used without editing this
# hardcoded local path. The defaults are the values this notebook was run with.
projectId = os.environ.get("BQ_PROJECT_ID", 'smart-data-ml')  # smart-data-ml  or kku-intern-dataai
dataset_id = os.environ.get("BQ_TEMP_DATASET_ID", 'SMartData_Temp')  # 'SMartData_Temp'  'PMReport_Temp'
main_dataset_id = os.environ.get("BQ_MAIN_DATASET_ID", 'SMartDataAnalytics')  # 'SMartDataAnalytics'  'PMReport_Main'
credential_file = os.environ.get("BQ_CREDENTIAL_FILE", r"C:\Windows\smart-data-ml-91b6f6204773.json")
# Known service-account key files:
# C:\Windows\smart-data-ml-91b6f6204773.json
# C:\Windows\kku-intern-dataai-a5449aee8483.json




In [173]:
credentials = service_account.Credentials.from_service_account_file(credential_file)

# Staging table: pmr_<name> -> <project>.<dataset_id>.temp_<name>
table_name = view_name.replace("pmr_", "temp_")  # can change in ("name") to temp table
table_id = ".".join([projectId, dataset_id, table_name])
print(table_id)

# Main table: pmr_<name> -> <project>.<main_dataset_id>.<name>
main_table_name = view_name.replace("pmr_", "")
main_table_id = ".".join([projectId, main_dataset_id, main_table_name])
print(main_table_id)

# Write disposition for load jobs, see
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
to_bq_mode = "WRITE_EMPTY"

client = bigquery.Client(credentials=credentials, project=projectId)

smart-data-ml.SMartData_Temp.temp_pm_item
smart-data-ml.SMartDataAnalytics.pm_item


# Read Configuration Files (.cfg import watermarks and .env database settings)

In [174]:
# .cfg keeps the per-view import watermarks (section [metadata]);
# .env keeps the Postgres connection settings (DATABASES_* keys).
env_path = '.env'

updater = ConfigUpdater()
updater.read(".cfg")

config = dotenv_values(dotenv_path=env_path)

In [175]:
# Watermark of the previous run for this view. It is written back at the end
# of the notebook from dt_imported (UTC, no tzinfo) and is kept as naive UTC;
# no local-time conversion is applied.
watermark_text = updater["metadata"][view_name].value
last_imported = datetime.strptime(watermark_text, "%Y-%m-%d %H:%M:%S")
print(f"UTC:{last_imported}")

UTC:2023-12-01 00:00:00


# Postgres & BigQuery helpers

In [176]:
def get_postgres_conn():
    """Open and return a new psycopg2 connection to the source database.

    Connection arguments come from the DATABASES_NAME / DATABASES_USER /
    DATABASES_PASSWORD / DATABASES_HOST entries of the `.env` values held in
    `config`. Any failure, including a missing key, is printed and re-raised.
    The caller owns the returned connection.
    """
    connect_arg_to_env_key = {
        "database": "DATABASES_NAME",
        "user": "DATABASES_USER",
        "password": "DATABASES_PASSWORD",
        "host": "DATABASES_HOST",
    }
    try:
        connect_kwargs = {arg: config[key] for arg, key in connect_arg_to_env_key.items()}
        return psycopg2.connect(**connect_kwargs)
    except Exception as exc:
        print(exc)
        raise
def list_data(sql,params,connection):
    """Execute a query on `connection` and return every row as a DataFrame.

    Args:
        sql: query text (or psycopg2 composable) to execute.
        params: bound query parameters, or None when the query has none.
        connection: an open DB-API connection. It is not closed here.

    Returns:
        DataFrame with one column per result column, named from
        ``cursor.description``. The columns are present even when the query
        returns no rows.
    """
    with connection.cursor() as cursor:
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)
        columns = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
    # Build from the explicit column list, not per-row dicts, so that an empty
    # result still carries the query's schema (e.g. for later column drops/merges).
    return pd.DataFrame(rows, columns=columns)

In [177]:
def get_bq_table():
    """Verify that the staging table `table_id` exists in BigQuery.

    On success prints a confirmation and the table schema, then returns True.
    If the table is missing, raises a plain Exception (not NotFound).
    """
    try:
        existing = client.get_table(table_id)  # Make an API request.
    except NotFound:
        raise Exception(f"Table {table_id} is not found.")
    print(f"Table {table_id} already exists.")
    print(existing.schema)
    return True
    
def collectBQError(x_job):
 """Append one row per BigQuery job error to ``listError`` and pass it to ``logErrorMessage``.

 For every entry in ``x_job.errors`` a row
 ``[local-time now, dtStr_imported, source_name, "<reason> - <message>"]``
 is appended to ``listError``. When ``listError`` is then non-empty it is
 passed to ``logErrorMessage(listError, False)``. Does nothing when
 ``x_job.errors`` is None.

 NOTE(review): ``listError``, ``dtStr_imported``, ``source_name`` and
 ``logErrorMessage`` are not defined anywhere in the visible notebook, and
 this function is not called in it. Calling it with a job whose ``errors`` is
 not None would raise NameError. TODO confirm where these names should come
 from, or delete the helper.
 """
 if x_job.errors is not None:
    # Each error entry is read through its 'reason' and 'message' keys.
    for error in x_job.errors:  
      msg=f"{error['reason']} - {error['message']}"
      # datetime.now() without tz gives local time, unlike the UTC dt_imported.
      listError.append([datetime.now().strftime("%Y-%m-%d %H:%M:%S"),dtStr_imported,source_name,msg])
    # Checked once after the loop, still inside the `errors is not None` branch.
    if   len(listError)>0:
     logErrorMessage(listError,False)  

    
def insertDataFrameToBQ(df_trasns):
    """Load `df_trasns` into the BigQuery staging table `table_id` and wait for the job.

    Uses the notebook-level `client`, `table_id` and `to_bq_mode` (the load
    job's write disposition, "WRITE_EMPTY" in this notebook).

    Raises:
        BadRequest: after printing the error and the job's error list. The
            error is re-raised instead of swallowed so that a failed load stops
            the run before the .cfg watermark is advanced. Other API errors
            propagate unchanged.
    """
    job = None
    try:
        job_config = bigquery.LoadJobConfig(write_disposition=to_bq_mode)
        job = client.load_table_from_dataframe(df_trasns, table_id, job_config=job_config)
        job.result()  # Wait for the job to complete; raises if the job failed.
    except BadRequest as e:
        print("Bigquery Error\n")
        print(e)
        if job is not None and job.errors:
            print(job.errors)
        raise

    print("Total ", len(df_trasns), f"Imported data to {table_id} on bigquery successfully")

# Check whether this is the first load

In [178]:
print("If the main table is empty , so the action of each row  must be 'added' on temp table")
# Fetching at most one row is enough to tell whether the main table has any data.
sample_rows = list(client.list_rows(main_table_id, max_results=1))
no_main = len(sample_rows)
if not sample_rows:
    isFirstLoad = True
    print(f"This is the first loaing , so there is No DATA in {main_table_id}, we load all rows from {view_name} to import into {table_id} action will be 'added' ")

If the main table is empty , so the action of each row  must be 'added' on temp table
This is the first loaing , so there is No DATA in smart-data-ml.SMartDataAnalytics.pm_item, we load all rows from pmr_pm_item to import into smart-data-ml.SMartData_Temp.temp_pm_item action will be 'added' 


# For the next load
* Get rows from the model log table created on or after last_imported for this content type
* Reduce the log to one net action per unique object_id using the rules below
* Create an id/action dataframe from the filtered log rows

In [179]:
def list_model_log(x_last_imported,x_content_id):
    """Fetch change-log rows for one content type created at or after a watermark.

    Args:
        x_last_imported: inclusive lower bound compared against
            ``date_created AT TIME ZONE 'utc'``. The notebook passes a naive
            UTC datetime.
        x_content_id: value matched against ``content_type_id``.

    Returns:
        DataFrame with columns object_id, action and date_created (text
        'YYYY-MM-DD HH24:MI:SS'), ordered by object_id then date_created.
    """
    # Values are bound as query parameters. Only the notebook constant `log`
    # (the table name) is interpolated into the SQL text.
    sql_log = f"""
    SELECT object_id, action,TO_CHAR(date_created,'YYYY-MM-DD HH24:MI:SS') as date_created FROM {log}
    WHERE date_created  AT time zone 'utc' >= %s AND content_type_id = %s ORDER BY object_id, date_created
    """
    params = (x_last_imported, x_content_id)
    print(sql_log, params)

    conn = get_postgres_conn()
    try:
        lf = list_data(sql_log, params, conn)
    finally:
        conn.close()  # Release the connection as soon as the rows are fetched.

    print(f"Retrieve all rows after {x_last_imported}")
    lf.info()
    return lf


In [180]:
def select_actual_action(lf):
    """Collapse a change log into one net action per object.

    `lf` needs `object_id` and `action` columns, with each object's rows in
    chronological order (list_model_log orders by object_id, date_created).

    Rules per object:
      * exactly one log row                -> that row's action
      * first 'added' and last 'deleted'   -> omitted (created and removed)
      * first 'added', any other last      -> 'added'
      * anything else                      -> the last action

    Returns a DataFrame with columns `id` (int64) and `action`, sorted by id
    and re-indexed from 0.
    """
    net_actions = []
    for object_id in lf["object_id"].unique().tolist():
        history = lf[lf["object_id"] == object_id]
        first_action = history.iloc[0]["action"]
        last_action = history.iloc[-1]["action"]

        if len(history) == 1:
            net_actions.append([object_id, first_action])
        elif first_action == "added":
            if last_action != "deleted":
                net_actions.append([object_id, "added"])
        else:
            net_actions.append([object_id, last_action])

    print("Convert listUpdate to dataframe")
    result = pd.DataFrame(net_actions, columns=['id', 'action'])
    result['id'] = result['id'].astype('int64')
    return result.sort_values(by="id").reset_index(drop=True)


In [181]:
# Incremental run only: read the change log since the last watermark and
# reduce it to one net action per object ID.
if not isFirstLoad:
    dfModelLog = list_model_log(last_imported, content_id)
    if not dfModelLog.empty:
        dfModelLog = select_actual_action(dfModelLog)
        listModelLogObjectIDs = dfModelLog['id'].tolist()
        print(dfModelLog.info())
        print(dfModelLog)
        print(listModelLogObjectIDs)
    else:
        print("No row to be imported.")
        exit()

# Load view and transform

In [182]:
# Build the view query with bound parameters. psycopg2 adapts a Python list to
# a Postgres ARRAY, so `= ANY(%s)` handles one, many or zero changed IDs; an
# empty list simply matches nothing instead of failing.
if not isFirstLoad:
    sql_view = f"select *  from {view_name}  where {view_name_id} = ANY(%s)"
    view_params = (listModelLogObjectIDs,)
else:
    # NOTE(review): the first-load message says all rows are loaded, but this
    # still filters on updated_at >= last_imported. Confirm that is intended.
    sql_view = f"select *  from {view_name}  where  updated_at AT time zone 'utc' >= %s"
    view_params = (last_imported,)

print(sql_view, view_params)
pg_conn = get_postgres_conn()
try:
    df = list_data(sql_view, view_params, pg_conn)
finally:
    pg_conn.close()  # Release the connection once the rows are fetched.

if df.empty:
    # NOTE(review): on an incremental run where every logged change is a
    # deletion, the view returns no rows and the run stops here, so those
    # deletions are not written to BigQuery on this run. Confirm this is acceptable.
    print("No row to be imported.")
    exit()

df = df.drop(columns='updated_at')

if isFirstLoad:
    df['action'] = 'added'

df.info()
df

select *  from pmr_pm_item  where  updated_at AT time zone 'utc' >= '2023-12-01 00:00:00'
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6204 entries, 0 to 6203
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   pm_item_id         6204 non-null   int64 
 1   pm_id              6204 non-null   int64 
 2   is_pm              6204 non-null   bool  
 3   pm_engineer        297 non-null    object
 4   actual_date        298 non-null    object
 5   document_engineer  295 non-null    object
 6   document_date      296 non-null    object
 7   inventory_id       6204 non-null   int64 
 8   is_complete        6204 non-null   bool  
 9   action             6204 non-null   object
dtypes: bool(2), int64(3), object(5)
memory usage: 400.0+ KB
None


Unnamed: 0,pm_item_id,pm_id,is_pm,pm_engineer,actual_date,document_engineer,document_date,inventory_id,is_complete,action
0,103662,5034,False,,,,,14216,False,added
1,103663,5034,False,,,,,14217,False,added
2,103664,5034,False,,,,,14218,False,added
3,103665,5034,False,,,,,14219,False,added
4,103666,5034,True,,,,,14222,False,added
...,...,...,...,...,...,...,...,...,...,...
6199,103755,5050,True,,,,,19109,False,added
6200,103756,5050,True,,,,,19110,False,added
6201,103757,5051,True,,,,,19108,False,added
6202,103758,5051,True,,,,,19109,False,added


# Data Transformation
* If this is the first load, set action='added' on every row
* If this is a subsequent load, merge the log dataframe with the view dataframe and add deleted rows:
  * Build a "deleted" dataframe from the logged IDs that no longer appear in the view (listDeleted)
  * If there is at least one deleted row, append it to the master dataframe

In [183]:
def add_acutal_action_to_df_at_next(df, dfUpdateData, id_column=None):
    """Attach each view row's net change-log action and append rows for deleted IDs.

    Args:
        df: rows read from the view for the changed IDs.
        dfUpdateData: net actions per ID, with columns `id` and `action`
            (as produced by select_actual_action).
        id_column: ID column of `df`. Defaults to the notebook-level
            `view_name_id`.

    Returns:
        The view rows inner-joined to their action (helper `id` column
        dropped), followed by one row per ID that has a logged action but no
        longer appears in the view. Those rows fill only the ID column and
        action='deleted'.
    """
    if id_column is None:
        id_column = view_name_id

    merged_df = pd.merge(df, dfUpdateData, left_on=id_column, right_on='id', how='inner')
    merged_df = merged_df.drop(columns=['id'])

    listSelected = df[id_column].tolist()
    print(listSelected)

    # IDs in the change log that the view no longer returns were deleted at the
    # source. Use a one-directional difference (a view-only ID is not a deletion)
    # and sort it so the appended rows come out in a deterministic order.
    listDeleted = sorted(set(dfUpdateData['id'].tolist()) - set(listSelected))
    print(listDeleted)

    if listDeleted:
        print("There are some deleted rows")
        dfDeleted = pd.DataFrame(data=listDeleted, columns=[id_column])
        dfDeleted['action'] = 'deleted'
        print(dfDeleted)
        # NOTE(review): other columns become NaN for these rows, which turns
        # integer columns into float64. Confirm the BigQuery load accepts that.
        merged_df = pd.concat([merged_df, dfDeleted], axis=0)
    else:
        print("No row deleted")

    return merged_df

# Merge actions, then check for duplicate IDs

In [184]:
# Incremental run: attach the net action per ID and append deleted IDs.
# On the first load the action column was already set to 'added'.
if not isFirstLoad:
    df = add_acutal_action_to_df_at_next(df, dfModelLog)

df = df.reset_index(drop=True)
print(df.info())
print(df)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6204 entries, 0 to 6203
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   pm_item_id         6204 non-null   int64 
 1   pm_id              6204 non-null   int64 
 2   is_pm              6204 non-null   bool  
 3   pm_engineer        297 non-null    object
 4   actual_date        298 non-null    object
 5   document_engineer  295 non-null    object
 6   document_date      296 non-null    object
 7   inventory_id       6204 non-null   int64 
 8   is_complete        6204 non-null   bool  
 9   action             6204 non-null   object
dtypes: bool(2), int64(3), object(5)
memory usage: 400.0+ KB
None
      pm_item_id  pm_id  is_pm pm_engineer actual_date document_engineer  \
0         103662   5034  False        None        None              None   
1         103663   5034  False        None        None              None   
2         103664   5034  False     

In [185]:
# Each ID must appear at most once in the frame about to be loaded to the temp table.
duplicated_ids = df.loc[df[view_name_id].duplicated(), view_name_id].unique().tolist()
hasDplicateIDs = bool(duplicated_ids)
if hasDplicateIDs:
    # Report which IDs collide. The check runs on the merged df, not on dfUpdateData.
    raise Exception(f"There are some duplicate {view_name_id} IDs in the merged DataFrame: {duplicated_ids}")
print(f"There is no duplicate {view_name_id} ID")

There is no duplicate pm_item_id ID


# Insert the DataFrame into the BigQuery temp table

In [186]:
# get_bq_table() returns True or raises, so a missing temp table stops the run
# here. Exceptions from either call propagate unchanged; the previous
# `except Exception as ex: raise ex` wrapper added nothing.
if get_bq_table():
    insertDataFrameToBQ(df)

Table smart-data-ml.SMartData_Temp.temp_pm_item already exists.
[SchemaField('pm_item_id', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('pm_id', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('is_pm', 'BOOLEAN', 'NULLABLE', None, None, (), None), SchemaField('pm_engineer', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('actual_date', 'DATE', 'NULLABLE', None, None, (), None), SchemaField('document_engineer', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('document_date', 'DATE', 'NULLABLE', None, None, (), None), SchemaField('inventory_id', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('is_complete', 'BOOLEAN', 'NULLABLE', None, None, (), None), SchemaField('action', 'STRING', 'NULLABLE', None, None, (), None)]
Total  6204 Imported data to smart-data-ml.SMartData_Temp.temp_pm_item on bigquery successfully


In [187]:
# Advance this view's watermark in .cfg to the run's start time (UTC) so the
# next run reads the change log from this point on.
watermark = dt_imported.strftime("%Y-%m-%d %H:%M:%S")
metadata_section = updater["metadata"]
metadata_section[view_name].value = watermark
updater.update_file()

<ConfigUpdater [
    <Section: 'metadata' [
        <Option: pmr_pm_plan = '2023-12-29 17:09:53'>
        <Option: pmr_pm_item = '2023-12-29 17:50:01'>
        <Option: pmr_project = '2023-12-01 00:00:00'>
        <Option: pmr_inventory = '2023-12-01 00:00:00'>
    ]>
]>

In [188]:
# Wall-clock UTC time at the end of the run, for comparison with dt_imported.
print(datetime.now(tz=timezone.utc))

2023-12-29 17:50:36.836521+00:00
