# I - Libraries

In [1]:
# Google sheet
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# API, Dataframe
import requests
import pandas as pd
import numpy as np

# Psycopg2 (PostgreSQL driver)
import psycopg2 as ps
from psycopg2 import connect

# Prefect
from prefect import flow, task

# Time
from datetime import date, timedelta
from time import gmtime, strftime

import datetime
import time

#
# II - Functions to connect to and edit Google Sheets through the Google Sheets API

In [2]:
# Spreadsheet key of the Google Sheet that holds the client/API-key list.
# It is passed to extract_sheet_data() by the client-list extraction step.
# Left blank here; it must be filled in before the pipeline can run.
gsheet_id_api_list          = ''

# OAuth scopes requested for the service account (Sheets and Drive).
scope = ['https://www.googleapis.com/auth/spreadsheets',
         "https://www.googleapis.com/auth/drive"]

# Build service-account credentials from the local JSON key file and authorise
# a gspread client. `gc` is the module-level client used by the sheet helpers.
credentials = ServiceAccountCredentials.from_json_keyfile_name("pfg_sheet_credentials.json", scope)
gc = gspread.authorize(credentials)

# Create sheet function
def extract_sheet_data(destination_database_url, destination_sheet):
    """Read every record of one worksheet tab.

    Args:
        destination_database_url: Value handed to ``gc.open_by_key`` to open
            the spreadsheet (used as a spreadsheet key, despite the name).
        destination_sheet: Title of the worksheet (tab) to read.

    Returns:
        Whatever ``Worksheet.get_all_records()`` returns for that tab.
    """
    spreadsheet = gc.open_by_key(destination_database_url)
    worksheet = spreadsheet.worksheet(destination_sheet)
    return worksheet.get_all_records()

def remove_sheet_data(destination_database_url, destination_sheet):
    """Clear columns A through Z of one worksheet tab, then pause.

    Args:
        destination_database_url: Value handed to ``gc.open_by_key`` to open
            the spreadsheet.
        destination_sheet: Title of the worksheet (tab) to clear.
    """
    ranges_to_clear = ['A:Z']
    pause_seconds = 5

    spreadsheet = gc.open_by_key(destination_database_url)
    worksheet = spreadsheet.worksheet(destination_sheet)
    worksheet.batch_clear(ranges_to_clear)

    # Give the sheet a moment before the next task touches it.
    time.sleep(pause_seconds)
    
def insert_all_sheet_data(data_to_load, destination_database_url, destination_sheet):
    """Write a DataFrame (header row plus all rows) to a worksheet from cell A1.

    Args:
        data_to_load: DataFrame whose column names and values are written.
        destination_database_url: Value handed to ``gc.open_by_key`` to open
            the spreadsheet.
        destination_sheet: Title of the worksheet (tab) to write to.
    """
    spreadsheet = gc.open_by_key(destination_database_url)
    worksheet = spreadsheet.worksheet(destination_sheet)

    header_row = data_to_load.columns.values.tolist()
    body_rows = data_to_load.values.tolist()
    worksheet.update('A1', [header_row] + body_rows)

    # Wait one minute before moving on to the next task.
    time.sleep(60)
    
#-----------

def convert_time(x):
    """Shift timestamp strings by +7 hours and format them as 'YYYY-MM-DD HH:MM:SS'.

    Each value is truncated to its first 19 characters (date and time down to
    the second), parsed with ``pd.to_datetime`` and moved forward 7 hours
    (presumably UTC -> UTC+7; confirm against the API's timezone).

    Args:
        x: Iterable of timestamp values such as '2023-01-01T10:00:00.000000'.
            Both 'T' and a space are accepted as the date/time separator.
            Empty strings, ``None`` and NaN are treated as missing.

    Returns:
        list[str]: One entry per input value; missing inputs become ''.
    """
    converted = []
    for raw in x:
        # Missing values stay as empty strings instead of raising.
        if raw == '' or pd.isna(raw):
            converted.append('')
            continue

        # Replace only the first 'T' so values that already use a space
        # separator pass through unchanged, then drop fractions/zone suffix.
        normalised = str(raw).replace('T', ' ', 1)[:19]
        shifted = pd.to_datetime(normalised) + pd.DateOffset(hours=7)
        converted.append(str(shifted))
    return converted

# Function to calculate the duration between two timestamps:
# https://www.statology.org/pandas-convert-timedelta-to-int/
def duration(end_time, start_time, unit):
    """Compute element-wise durations between two timestamp sequences.

    Args:
        end_time: Iterable of end timestamps (anything ``pd.to_datetime``
            accepts); '' marks a missing value.
        start_time: Iterable of start timestamps, paired with ``end_time``.
        unit: One of 'day', 'hour' or 'minute'.

    Returns:
        list[float]: ``end - start`` expressed in ``unit`` and rounded to one
        decimal place. Pairs where either side is '' yield 0.0.

    Raises:
        ValueError: If ``unit`` is not supported. Without this check an
            unknown unit would silently skip rows and return a list shorter
            than the input.
    """
    unit_lengths = {
        'day': pd.Timedelta(days=1),
        'hour': pd.Timedelta(hours=1),
        'minute': pd.Timedelta(minutes=1),
    }
    if unit not in unit_lengths:
        raise ValueError(
            "unit must be one of 'day', 'hour' or 'minute', got " + repr(unit)
        )
    one_unit = unit_lengths[unit]

    durations = []
    for end, start in zip(end_time, start_time):
        if end != '' and start != '':
            elapsed = pd.to_datetime(end) - pd.to_datetime(start)
            durations.append(round(elapsed / one_unit, 1))
        else:
            # Missing timestamp on either side: keep the 0.0 placeholder.
            durations.append(float())
    return durations

#
# II - Function to connect to the Postgres database

In [3]:
# Connect to the database
def connect_to_db(host_name, dbname, port, username, password):
    """Open a psycopg2 connection with a short connect timeout and TCP keepalives.

    Args:
        host_name: Database host.
        dbname: Database name.
        port: Database port.
        username: Login user.
        password: Login password.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        psycopg2.OperationalError: Re-raised unchanged when connecting fails.
    """
    # Keepalive settings taken from:
    # https://stackoverflow.com/questions/26741175/psycopg2-db-connection-hangs-on-lost-network-connection
    connection_options = dict(
        host=host_name,
        database=dbname,
        user=username,
        password=password,
        port=port,
        connect_timeout=3,
        keepalives=1,
        keepalives_idle=5,
        keepalives_interval=2,
        keepalives_count=2,
    )
    try:
        connection = connect(**connection_options)
    except ps.OperationalError as error:
        raise error

    print('Hurrayyyyyy! I am on the CLOUD!')
    return connection

# Connection settings for the Postgres database the pipeline loads into.
# All values are blank here and must be filled in before running.
# NOTE(review): consider reading these from environment variables instead of
# hardcoding them in the notebook.
host_name          = ''
dbname             = ''
port               = ''
username           = ''
password           = ''
# Module-level placeholder; the load tasks below open their own connection
# and bind it to a local `conn`.
conn               = None

#
# III - ETL pipeline to extract data from API and load it to database

## 1 - Extract API key list:

In [4]:
@task(name                = "Step 1: Extract client list",
      retries             = 10, 
      retry_delay_seconds = 60)
def s1_extract_client_list():
    """Build the list of clients that have an API key from the Google Sheet.

    Reads the 'philippines' and 'thailand' tabs of the spreadsheet identified
    by ``gsheet_id_api_list``, tags every row with its tab name as ``Market``,
    keeps only rows with a non-empty API key and strips the 'S' prefix from
    the client ID.

    Returns:
        pandas.DataFrame with columns ``shop_id``, ``client_type``,
        ``api_key``, ``shop_name``, ``country`` and ``pos_link``.
    """
    sheet_names = ['philippines', 'thailand']

    # Sheet column -> output column, in the output column order.
    column_names = {
        'Client ID':        'shop_id',
        'Client Type':      'client_type',
        'API key':          'api_key',
        'Client Abb. Name': 'shop_name',
        'Market':           'country',
        'Client POS Link':  'pos_link',
    }

    # Start from an empty frame so every expected column exists even when a
    # tab is empty or lacks one of them.
    client_list = pd.DataFrame(columns = ['Client ID', 'Market', 'BD', 'Client Type', 'Client Code', 'Client Abb. Name', 'Client POS Link', 'Start date', 'API key'])

    for tab in sheet_names:
        sheet_df    = pd.DataFrame(extract_sheet_data(gsheet_id_api_list, tab)).assign(Market = tab)
        # https://pandas.pydata.org/docs/user_guide/merging.html
        client_list = pd.concat([client_list, sheet_df])

    client_list = client_list[list(column_names)]

    # Keep only clients that have an API key. Missing values (NaN) are dropped
    # as well as empty strings. `.copy()` makes the result an independent
    # frame so the column assignment below does not write into a slice.
    has_api_key = client_list['API key'].notna() & (client_list['API key'] != '')
    client_list = client_list[has_api_key].copy()

    # Remove the 'S' prefix: keep the text between the first and second 'S'.
    client_list['Client ID'] = [client_id.split('S', 2)[1] for client_id in client_list['Client ID']]

    return client_list.rename(columns = column_names)

## 2 - Extract orders data from API

In [7]:
@task(name                = "Step 2: Extract orders API data",
      retries             = 10, 
      retry_delay_seconds = 60)
def s2_extract_orders_api_data(client_list):   
    """Download the last 45 days of orders for every client from the POS API.

    For each client the orders endpoint is paged through (1000 orders per
    page) until a page comes back with an empty ``data`` list.

    Args:
        client_list: DataFrame with at least the columns ``shop_id``,
            ``api_key`` and ``shop_name``.

    Returns:
        pandas.DataFrame with one row per order record returned by the API,
        across all clients.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer in time.
        KeyError: If a response body has no ``data`` field.
    """
    lookback_days   = 45
    page_size       = 1000
    # (connect, read) timeouts in seconds so a stalled request fails and the
    # task's retry policy can kick in instead of hanging forever.
    request_timeout = (10, 120)

    # Start of the window as a Unix timestamp in whole seconds (local midnight).
    # https://www.geeksforgeeks.org/how-to-convert-datetime-to-unix-timestamp-in-python/
    window_start = date.today() - timedelta(days = lookback_days)
    start_time   = int(time.mktime(window_start.timetuple()))

    # Collected order records of all clients.
    data_need_order = []

    for shop_id, api_key, shop_name in zip(client_list['shop_id'], client_list['api_key'], client_list['shop_name']):
        print('Client: ' + shop_name)

        # Every client starts again from the first page.
        page_num = 1
        while True:
            url_order = f'https://pos.pages.fm/api/v1/shops/{shop_id}/orders?api_key={api_key}&page_number={page_num}&page_size={page_size}&startDateTime={start_time}'  #&endDateTime={end_time}

            # Log progress without the URL: it embeds the client's API key.
            print(f'Shop {shop_id}: requesting page {page_num}')

            response = requests.get(url_order, timeout = request_timeout)
            response.raise_for_status()
            page_rows = response.json()['data']

            # An empty page means this client has no more orders.
            if len(page_rows) == 0:
                break

            data_need_order.extend(page_rows)
            page_num += 1

    return pd.DataFrame(data_need_order)

## 3 - Transform orders data, from JSON format to Pandas dataframe:

In [10]:
@task(name                = "Step 3: Transform orders data",
      retries             = 10, 
      retry_delay_seconds = 60)
def s3_transform_order_data(df):
    """Flatten raw order records into the columns loaded into the database.

    Steps, in order: select the needed columns, build a unique ``id_column``,
    drop duplicates, compute revenue, flatten nested fields (tags, warehouse,
    seller, courier partner), derive milestone timestamps from the status
    history, shift timestamps by +7 hours via ``convert_time``, compute lead
    times in hours via ``duration``, drop helper columns and finally keep only
    rows whose ``updated_at`` falls on today, yesterday or two days ago.

    Args:
        df: DataFrame of raw orders containing at least the columns selected
            in the first statement below.

    Returns:
        pandas.DataFrame of transformed orders updated within the last three
        calendar days.

    NOTE(review): several derived columns are parsed from ``str(...)`` of the
    nested values with fixed character offsets; this assumes a specific key
    order in the API payload - confirm against real responses.
    """
    # Keep only the columns this transformation works with.
    # NOTE(review): 'items' is selected and never dropped, so it is still
    # present in the returned frame.
    df = df[[ 'id', 'status_name', 'status', 'status_history', 'inserted_at', 'tags', 'histories', 'cod', 
             'transfer_money', 'warehouse_info', 'partner', 'shop_id', 'assigning_seller' , 'updated_at', 'items']]
    
    # Build id_column from shop_id and order id.
    # Add two helper columns holding the literal prefixes 'S' and 'O'.
    df = df.assign(S='S', O = 'O')

    # Concatenate into the format: S + shop_id + O + id
    df['id_column'] = df['S'] + df['shop_id'].astype(str) + df['O'] + df['id'].astype(str)

    # Remove duplicates on id_column, keeping the last occurrence.
    df = df.drop_duplicates(subset=['id_column'], keep='last')
    
    # Revenue = COD + bank transfer
    df['revenue'] = df['cod'] + df['transfer_money']

    # ------------------------------------------------------------------

    # Tags column: each tag that has a 'name' becomes 'Tag: (<name>)'; tags
    # without one contribute an empty string. Entries are joined with ', '.
    tag_list = []
    for i in df['tags']:
        tag = []
        for q in i:
            if str(q).find("'name':") == -1:
                tag.append('')
            else:
                j = 'Tag: (' + q['name'] + ')'
                tag.append(j)
        k = ', '.join(tag)
        tag_list.append(k)
    df['tags'] = tag_list

    # ------------------------------------------------------------------

    # Warehouse column: the warehouse name when the value is a dict with a
    # 'name' key, otherwise ''.
    warehouse = []
    for i in df['warehouse_info']:
        if type(i) == dict and 'name' in i:
            warehouse.append(i['name'])
        else:
            warehouse.append('')
    df['warehouse_info'] = warehouse

    # ------------------------------------------------------------------

    # Seller column: same rule as the warehouse column.
    seller = []
    for i in df['assigning_seller']:
        if type(i) == dict and 'name' in i: 
            seller.append(i['name'])
        else:
            seller.append('')

    df['assigning_seller'] = seller

    # ------------------------------------------------------------------

    # first_call column: time of the first history entry whose new tags
    # mention "1st call" (case-insensitive), or '' when there is none.
    first_call = []
    for i in df['histories']:

        take_out_the_time = []
        for e in i:
            # Collect 'updated_at' of every history entry that has a 'tags'
            # key whose 'new' value mentions "1st call".
            if 'tags' in e and str(e['tags']['new']).lower().find("1st call") != -1 : 
                take_out_the_time.append(e['updated_at']) 
        # Use the first collected time (in list order), if any.
        if len(take_out_the_time) == 0: 
            first_call.append('')
        else:
            first_call.append(take_out_the_time[0])
    df['first_call'] = first_call

    # ------------------------------------------------------------------

    # Courier partner column: '<extend_code> <order_number_vtp>' when both
    # names appear in the value's string form, otherwise ''.
    # NOTE(review): the check is on the string form but the lookup is on the
    # top-level keys; a nested occurrence would pass the check and then raise
    # KeyError - confirm the payload shape.
    partner = []
    for i in df['partner']:
        if str(i).find('extend_code') != -1 and str(i).find('order_number_vtp') != -1: 
            j = str(i['extend_code']) + ' ' + str(i['order_number_vtp'])
            partner.append(j)
        else:
            partner.append('')
    df['partner'] = partner

    # ------------------------------------------------------------------

    # Build each new column by looping and appending one value per row.
    # https://www.geeksforgeeks.org/create-a-column-using-for-loop-in-pandas-dataframe/
    # waiting_delivery_time: taken from the text following the last matched
    # "'status': 9," (preferred) or "'status': 8," in the history's string form.
    # NOTE(review): the [16:35] slice assumes that "'updated_at': '<19-char
    # timestamp>" directly follows the status entry - confirm.
    waiting_delivery_time = []
    for h in df['status_history']:
        # Only orders that reached status 9 or 8 get a time.
        # https://stackoverflow.com/questions/3437059/does-python-have-a-string-contains-substring-method
        if str(h).find("'status': 9,")     == -1 and str(h).find("'status': 8,") == -1:
            waiting_delivery_time.append('')

        elif str(h).find("'status': 9,")   != -1 and str(h).find("'status': 8,") == -1:
            waiting_delivery_time.append(str(h).split("'status': 9,", 10)[-1][16:35])

        elif str(h).find("'status': 9,")   == -1 and str(h).find("'status': 8,") != -1:
            waiting_delivery_time.append(str(h).split("'status': 8,", 10)[-1][16:35])

        elif str(h).find("'status': 9,")   != -1 and str(h).find("'status': 8,") != -1:
            waiting_delivery_time.append(str(h).split("'status': 9,", 10)[-1][16:35])   

        else:
            waiting_delivery_time.append('')
    df['waiting_delivery_time'] = waiting_delivery_time

    # ------------------------------------------------------------------

    # shipped_time: time following "'status': 2," in the history's string
    # form, or a fixed placeholder date when the order has no such entry.
    # NOTE(review): the placeholder also feeds warehouse_lead_time below, so
    # unshipped orders that have a waiting_delivery_time get a large negative
    # lead time - confirm this is intended.
    shipped_time = []
    for h in df['status_history']:
        # https://stackoverflow.com/questions/3437059/does-python-have-a-string-contains-substring-method
        if str(h).find("'status': 2,") == -1:
            shipped_time.append('2010-01-01T00:00:00') # Placeholder instead of a null: inserting a null into the Postgres timestamp column failed with "is of type timestamp without time zone but expression is of type double precision"
        else:
            shipped_time.append(str(h).split("'status': 2,", 10)[-1][16:35]) 
    df['shipped_time'] = shipped_time

    # ------------------------------------------------------------------

    # confirmed_time: time following "'status': 1," in the history's string
    # form, or '' when there is no such entry.
    confirmed_time = []
    for h in df['status_history']:
        # https://stackoverflow.com/questions/3437059/does-python-have-a-string-contains-substring-method
        if str(h).find("'status': 1,") == -1:
            confirmed_time.append('') 
        else:
            confirmed_time.append(str(h).split("'status': 1,", 10)[-1][16:35]) 
    df['confirmed_time'] = confirmed_time


    # Status timeline: 'S: <status> - <time shifted by +7h>' per history
    # entry, joined with ', '. Timelines of 1000 characters or more are
    # replaced by '' (presumably to fit a column length limit - confirm).
    status_history = []
    for i in df['status_history']:
        if str(i).find("'status':") == -1:
            status_history.append('')

        else:
            status = []
            for j in i:
                a  = j['updated_at'].split('T', 1)[0] +' '+ j['updated_at'].split('T', 1)[1]
                updated_at = pd.to_datetime(a[:19]) + pd.DateOffset(hours=7)
                k  = 'S: ' + str(j['status']) + ' - ' + str(updated_at)
                status.append(k)
            z = ', '.join(status)
            if len(z) < 1000:
                status_history.append(z)
            else:
                status_history.append('')
    df['status_history'] = status_history

    # ------------------------------------------------------------------

    # Shift all timestamp columns by +7 hours and normalise their format.
    df['inserted_at']                = convert_time(df['inserted_at'])
    df['confirmed_time']             = convert_time(df['confirmed_time'])
    df['waiting_delivery_time']      = convert_time(df['waiting_delivery_time'])
    df['shipped_time']               = convert_time(df['shipped_time'])
    df['first_call']                 = convert_time(df['first_call'])
    df['updated_at']                 = convert_time(df['updated_at'])

    # ------------------------------------------------------------------

    # Lead times in hours (0.0 when either timestamp is missing).
    df['first_call_lead_time']     = duration(df['first_call'],      df['inserted_at'],             'hour')
    df['confirmed_lead_time']      = duration(df['confirmed_time'],  df['inserted_at'],             'hour')
    df['warehouse_lead_time']      = duration(df['shipped_time'],    df['waiting_delivery_time'],   'hour')


    # Drop helper and intermediate columns that are not loaded.
    # https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.drop.html
    df_orders = df.drop(columns = ['S', 'O', 'first_call', 'confirmed_time', 
                             'cod', 'transfer_money', 
                             'waiting_delivery_time', 'histories'])

    # Only recently updated orders are loaded: build the date strings for
    # today and the two previous days.
    # https://stackoverflow.com/questions/6871016/adding-days-to-a-date-in-python
    # https://www.programiz.com/python-programming/datetime/current-datetime ('Y' and 'y' give different year formats)
    today      = date.today().strftime("%Y-%m-%d")
    yesterday  = (date.today() + timedelta(days= -1)).strftime("%Y-%m-%d")
    two_days_ago = (date.today() + timedelta(days= -2)).strftime("%Y-%m-%d")
    # Keep rows whose updated_at (as text) contains any of those three dates.

    df_orders = df_orders[
        # updated_at is cast to string before the substring match.
        # https://stackoverflow.com/questions/22005911/convert-columns-to-string-in-pandas
        # https://www.statology.org/pandas-check-if-column-contains-string/
        # https://stackoverflow.com/questions/22591174/pandas-multiple-conditions-while-indexing-data-frame-unexpected-behavior
            df_orders['updated_at'].astype(str).str.contains(today) | df_orders['updated_at'].astype(str).str.contains(yesterday) | df_orders['updated_at'].astype(str).str.contains(two_days_ago)]
    
    return df_orders

## 4 - Load data to database

In [12]:
@task(name                = "Step 4.1: load client list data",
      retries             = 10, 
      retry_delay_seconds = 60)
def s4_load_client_list(df):
    """Upsert the client list into the ``d_client`` table.

    Works in two committed phases: first every client whose ``shop_id``
    already exists in ``d_client`` is updated, then the remaining clients are
    inserted.

    Args:
        df: DataFrame with the columns ``shop_id``, ``client_type``,
            ``api_key``, ``shop_name`` and ``country``.

    Raises:
        psycopg2.Error: On connection or query failure. The connection is
            always closed, which discards any uncommitted work.
    """
    insert_query = ("""INSERT INTO d_client (shop_id, client_type, api_key, shop_name, country) VALUES( %s, %s, %s, %s, %s);""")

    update_query = ("""UPDATE d_client
                SET client_type         = %s,
                    api_key             = %s,
                    shop_name           = %s,
                    country             = %s
                WHERE shop_id           = %s;""")

    exists_query = ("""SELECT shop_id FROM d_client WHERE shop_id = %s""")

    def client_exists(curr, shop_id):
        """Return True when d_client already has a row for ``shop_id``."""
        curr.execute(exists_query, (shop_id,))
        return curr.fetchone() is not None

    def update_row(curr, row):
        """Update the existing d_client row that matches ``row['shop_id']``."""
        # Parameter order follows the placeholders: SET columns first, then
        # the shop_id used in the WHERE clause.
        vars_to_update = (row['client_type'], row['api_key'], row['shop_name'], row['country'], row['shop_id'])
        curr.execute(update_query, vars_to_update)

    def insert_row(curr, row):
        """Insert ``row`` as a new d_client row."""
        row_to_insert = (row['shop_id'], row['client_type'], row['api_key'], row['shop_name'], row['country'])
        curr.execute(insert_query, row_to_insert)

    conn = connect_to_db(host_name, dbname, port, username, password)
    try:
        curr = conn.cursor()

        # Phase 1: update clients that already exist, remember the new ones.
        new_clients = []
        for _, row in df.iterrows():
            if client_exists(curr, row['shop_id']):
                update_row(curr, row)
            else:
                new_clients.append(row)
        conn.commit()

        # Phase 2: insert the clients that were not found.
        for row in new_clients:
            insert_row(curr, row)
        conn.commit()
    finally:
        # Always release the connection; closing discards uncommitted changes.
        conn.close()
    
    
@task(name                = "Step 4.2: load order data",
      retries             = 15, 
      retry_delay_seconds = 60)
def s4_load_order_data(df):
    """Upsert transformed orders into the ``test_2`` table, chunk by chunk.

    ``df`` is split into ``number_of_chunk + 1`` pieces. For every chunk still
    listed in ``chunk_list['Chunk']`` the existing orders (matched on
    ``id_column``) are updated and committed, then the new ones are inserted
    and committed, and the chunk is removed from ``chunk_list``.

    NOTE(review): ``number_of_chunk`` and ``chunk_list`` are module-level
    globals that are not defined in the visible part of this notebook; they
    must exist before this task runs or it fails with NameError. ``chunk_list``
    is used as a DataFrame with a 'Chunk' column whose values double as index
    labels and as positions into the split pieces. Because finished chunks
    are removed from the global, a retry presumably resumes with the chunks
    that are left - confirm.

    Args:
        df: DataFrame of transformed orders containing the columns listed in
            ``insert_columns`` below.

    Raises:
        psycopg2.Error: On connection or query failure. The connection is
            always closed, which discards any uncommitted work.
    """
    global number_of_chunk
    global chunk_list

    # Column order of the INSERT statement's placeholders.
    insert_columns = ['shop_id', 'id', 'status_name', 'status', 'tags', 'inserted_at', 'id_column', 'confirmed_lead_time', 'updated_at', 'revenue', 'shipped_time', 'warehouse_info', 'status_history', 
                      'first_call_lead_time', 'partner', 'assigning_seller', 'warehouse_lead_time']

    # Column order of the UPDATE statement's placeholders: every column
    # except id_column for the SET clause, then id_column for the WHERE clause.
    update_columns = [column for column in insert_columns if column != 'id_column'] + ['id_column']

    insert_query = ("""INSERT INTO test_2 (shop_id, id, status_name, status, tags, inserted_at, id_column, confirmed_lead_time, updated_at, revenue, shipped_time, warehouse_info, status_history, 
        first_call_lead_time, partner, assigning_seller, warehouse_lead_time)
        VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""")

    update_query = ("""UPDATE test_2
                SET shop_id                 = %s,
                    id                      = %s,
                    status_name             = %s,
                    status                  = %s,
                    tags                    = %s,
                    inserted_at             = %s,
                    confirmed_lead_time     = %s,
                    updated_at              = %s, 
                    revenue                 = %s,
                    shipped_time            = %s,
                    warehouse_info          = %s,
                    status_history          = %s,
                    first_call_lead_time    = %s,
                    partner                 = %s,
                    assigning_seller        = %s,
                    warehouse_lead_time     = %s
                WHERE id_column             = %s;""")

    exists_query = ("""SELECT id_column FROM test_2 WHERE id_column = %s""")

    def row_values(row, columns):
        """Return the values of ``row`` for ``columns`` as a parameter tuple."""
        return tuple(row[column] for column in columns)

    def order_exists(curr, id_column):
        """Return True when test_2 already has a row for ``id_column``."""
        curr.execute(exists_query, (id_column,))
        return curr.fetchone() is not None

    def update_existing(curr, chunk):
        """Update orders already in test_2; return the rows still to insert."""
        rows_to_insert = []
        for _, row in chunk.iterrows():
            if order_exists(curr, row['id_column']):
                curr.execute(update_query, row_values(row, update_columns))
            else:
                rows_to_insert.append(row)
        return rows_to_insert

    def insert_new(curr, rows):
        """Insert every row of ``rows`` into test_2."""
        for row in rows:
            curr.execute(insert_query, row_values(row, insert_columns))

    # Split the data into small pieces that are uploaded one at a time.
    dfs = np.array_split(df, int(number_of_chunk) +1)

    print('- Number of pieces in df is: ' + str(len(dfs)))
    print('- Number of Chunk is: ' + str(len(chunk_list)))

    conn = connect_to_db(host_name, dbname, port, username, password)
    try:
        curr = conn.cursor()

        if len(chunk_list) == 0:
            print('Congratulation, sir!')
        else:
            # Iterates over the Series captured here; rebinding chunk_list
            # inside the loop does not affect the iteration.
            for i in chunk_list['Chunk']:
                print('Uploading _ _ _ _ _ _ _ _ ' + str(chunk_list['Chunk'][i]))
                rows_to_insert = update_existing(curr, dfs[i])
                conn.commit()

                insert_new(curr, rows_to_insert)
                conn.commit()

                # The chunk is done: remove it from the global list.
                print('Drop '+ str(chunk_list['Chunk'][i]))
                chunk_list = chunk_list.drop(chunk_list['Chunk'][i])
                print('- Number of Chunk left is: ' + str(len(chunk_list)))
                print('')
    finally:
        # Always release the connection; closing discards uncommitted changes.
        conn.close()

#
# IV - Combine the whole flow into 1 function and run:

In [None]:
@flow(name                = "Main pipeline",
      retries             = 10, 
      retry_delay_seconds = 60)
def etl_pipeline():
    """Run the full ETL: extract clients and orders, transform, load.

    The client list is read from the Google Sheet, the orders of every client
    are downloaded from the API and transformed, and both the client list and
    the transformed orders are written to the database.
    """
    # Extract data
    client_list           = s1_extract_client_list()
    raw_orders            = s2_extract_orders_api_data(client_list)

    # Transform data
    df_order              = s3_transform_order_data(raw_orders)
    
    # Load data to database
    s4_load_client_list(client_list)
    s4_load_order_data(df_order)

In [None]:
# Run the whole pipeline once when this cell is executed.
etl_pipeline()