In [1]:
from argparse import ArgumentParser #Used for parsing command-line argument
from logger import AppLogger #Custom logging utility
import connectionconf #Configuration for database or other connections
import pandas as pd #Data manipulation and analysis
from sqlalchemy import create_engine, sql  #SQL toolkit and Object-Relational Mapping (ORM) library
import urllib.parse  #URL handling
from timeit import default_timer as timer #Timing execution
from datetime import datetime, timedelta #Date and time manipulation
from dateutil import parser, relativedelta #Extensions to the standard datetime module.
import os #Operating system interface
import re #Regular expressions.
from office365.sharepoint.client_context import ClientContext #used to interact with SharePoint sites
from office365.runtime.auth.user_credential import UserCredential #used to authenticate users in SharePoint
from office365.runtime.http.request_options import RequestOptions #allows customization of HTTP requests
from office365.sharepoint.files.file import File #File operations in SharePoint
from sharepoint_utils import sharepoint_buildurl #constructs URLs for SharePoint resources
import send_email #This module probably contains functions to send emails.
import calendar # Provides functions to handle calendar operations, like getting the month calendar

In [2]:
# Connection to the "pxl" PostgreSQL database.
# Credentials are read from environment variables instead of being hard-coded
# in the notebook (secrets committed to a notebook leak through its history and
# rendered copies). Non-secret settings keep their previous values as defaults.
_pxl_pg_password = os.environ.get("PXL_PG_PASSWORD")
if not _pxl_pg_password:
    raise RuntimeError(
        "Set the PXL_PG_PASSWORD environment variable before running this notebook"
    )

conn_detail = {
    "host": os.environ.get("PXL_PG_HOST", "10.70.31.118"),
    "port": int(os.environ.get("PXL_PG_PORT", "5432")),
    "username": os.environ.get("PXL_PG_USER", "pipeline_user"),
    "password": _pxl_pg_password,
    "database": os.environ.get("PXL_PG_DATABASE", "pxl"),
}

# Build the SQLAlchemy URL. User name and password are percent-encoded so that
# URL-reserved characters in them cannot break parsing of the connection string.
postgres_engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s' % (
    urllib.parse.quote_plus(conn_detail['username']),
    urllib.parse.quote_plus(conn_detail['password']),
    conn_detail['host'],
    conn_detail['port'],
    conn_detail['database'],
))

# Open a connection right away so bad credentials / connectivity fail early.
conn = postgres_engine.connect()

In [None]:
# Today's date at local midnight: the time-of-day fields are zeroed, so only the
# calendar date is meaningful. Used as processed_date for the inserted rows.
curDate = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

In [3]:
# Authenticated client for the ISD BI Reporting Team SharePoint site.
# The service-account password is read from the environment instead of being
# hard-coded in the notebook; the account name keeps its previous default.
_sp_username = os.environ.get("NPS_SP_USERNAME", "bda.reports@u.com.my")
_sp_password = os.environ.get("NPS_SP_PASSWORD")
if not _sp_password:
    raise RuntimeError(
        "Set the NPS_SP_PASSWORD environment variable before running this notebook"
    )

ctx = ClientContext('https://um018.sharepoint.com/sites/ISDBIReportingTeam').with_credentials(
    UserCredential(_sp_username, _sp_password)
)

In [4]:
# Server-relative SharePoint folders: the NPS extracts are read from the dashboard
# folder and then moved into its Processed/ or Failed/ sub-folder.
sharepoint_path = '/sites/ISDBIReportingTeam/Shared Documents/Share folder/Customer Experience Dashboard/NPS Dashboard'
sharepoint_path_processed = f'{sharepoint_path}/Processed'
sharepoint_path_failed = f'{sharepoint_path}/Failed'

# Local staging directory on the pipeline host (keeps its trailing slash so the
# plain concatenation below yields a valid path).
local_path = '/user_data01/mapr/pipeline/uat/output/'
local_filename = 'NPS Test.xlsx'
local_file = f'{local_path}{local_filename}'

In [5]:
# reusable function to move file between sharepoint directories
def moveFile(url, name, dir):
    """Move a SharePoint file into another folder of the same site via the MoveTo REST call.

    Args:
        url: server-relative URL of the file to move.
        name: file name to give the file in the destination folder.
        dir: server-relative URL of the destination folder (parameter name kept
            for backward compatibility even though it shadows the builtin).

    Raises:
        The HTTP error raised by the response's ``raise_for_status()`` when
        SharePoint rejects the move request.
    """
    # Resolve the file's unique id; the MoveTo endpoint is addressed by id.
    file_info = ctx.web.get_file_by_server_relative_path(url).get().execute_query()
    get_file_query_string = f"getFileById('{file_info.unique_id}')"

    # flags=1: presumably SharePoint MoveOperations.Overwrite (replace a same-named
    # file at the destination) — confirm against the SharePoint REST docs.
    moveto_query_string = sharepoint_buildurl(
        "moveto", {"newurl": f"{dir}/{name}", "flags": 1}
    )

    moveto_url = "/".join(
        [ctx.service_root_url(), "web", get_file_query_string, moveto_query_string]
    )

    request = RequestOptions(moveto_url)
    request.method = "POST"
    response = ctx.pending_request().execute_request_direct(request)
    # Fail loudly: a silently failed move leaves the file in the input folder,
    # where the next run would ingest it again and append duplicate rows.
    response.raise_for_status()

In [6]:
# Root SharePoint folder that receives the raw NPS extracts.
root_folder = ctx.web.get_folder_by_server_relative_path(sharepoint_path)

# List every file under it. get_files(True) also descends into sub-folders, so
# the ingestion loop filters out anything not directly inside sharepoint_path.
# TODO: the recursive listing grows with the archived sub-folders — evaluate a
# non-recursive alternative if this becomes slow.
files = (
    root_folder
    .get_files(True)
    .execute_query()
)

In [7]:
# Routing table: [marker that must appear in the SharePoint file name,
#                 destination table in the `external` schema].
categorySeq = [
    ['Master_Send_TD', 'nps_master_send_td'],
    ['Master_Response_TD', 'nps_master_response_td'],
    ['Master_Send_BU', 'nps_master_send_bu'],
    ['Master_Response_BU', 'nps_master_response_bu'],
]

In [8]:
# Column projections that copy pxl_temp.nps_temp into each external table.
# Every entry is "<source column>"::<target type>; the explicit casts keep the
# insert from failing on the loosely typed columns that pandas.to_sql creates.
# NOTE(review): "TOTAL_TOPUP" is cast to text for Master_Send_TD but float8 for
# Master_Response_TD, and "Creation Time" is timestamp for Master_Send_BU but
# text for Master_Response_BU — confirm against the external tables' DDL.
NPS_SELECT_COLUMNS = {
    'Master_Send_TD': (
        '"MSISDN_Survey ID"::text',
        '"Survey ID"::text',
        '"MSISDN"::text',
        '"PLAN"::text',
        '"FIRST_ACTIVATION_DATE"::timestamp',
        '"RATEPLAN"::text',
        '"CUSTOMER_GROUP"::text',
        '"CURR_STATUS"::text',
        '"TOTAL_TOPUP"::text',
        '"CITY"::text',
        '"STATE"::text',
        '"TENURE_IN_MONTH"::int8',
        '"SUM_INV_AMT"::float8',
        '"TOTAL_COLLECTED"::float8',
        '"OUTSTANDING_BAL"::float8',
        '"TOTAL_OVERDUE"::float8',
    ),
    'Master_Response_TD': (
        '"MSISDN_Survey ID"::text',
        '"Customer MSISDN"::text',
        '"Customer Name"::text',
        '"SurveyID"::text',
        '"Response Date & Time"::timestamp',
        '"Date"::text',
        '"Time"::text',
        '"Hour"::time',
        '"Question"::text',
        '"Response"::text',
        '"Question Sequence"::text',
        '"Rating"::int8',
        '"PLAN"::text',
        '"FIRST_ACTIVATION_DATE"::timestamp',
        '"RATEPLAN"::text',
        '"CUSTOMER_GROUP"::text',
        '"CURR_STATUS"::text',
        '"TOTAL_TOPUP"::float8',
        '"CITY"::text',
        '"STATE"::text',
        '"TENURE_IN_MONTH"::int8',
        '"SUM_INV_AMT"::float8',
        '"TOTAL_COLLECTED"::float8',
        '"OUTSTANDING_BAL"::float8',
        '"TOTAL_OVERDUE"::float8',
    ),
    'Master_Send_BU': (
        '"MSISDN_Survey ID"::text',
        '"Survey ID"::text',
        '"MSISDN"::text',
        '"Customer"::text',
        '"Creation Time"::timestamp',
        '"Creator Org"::text',
        '"SR ID"::int8',
        '"Creator"::text',
        '"SR Type"::text',
        '"Acceptance Channel"::text',
        '"Customer Group"::text',
        '"Remarks"::text',
        '"Customer ID"::float8',
        '"Alternate contact number"::text',
        '"Has TT"::text',
        '"QC mark"::float8',
        '"CC - Complaint"::text',
        '"Non-FCR"::text',
        '"CSAT"::text',
        '"NPS"::float8',
        '"Rate Plan"::text',
        '"Subscription Status"::text',
        '"Account type"::text',
        '"Account No."::text',
        '"Amount"::float8',
        '"Recipient_Type"::float8',
        '"Order_Operation_Type"::float8',
    ),
    'Master_Response_BU': (
        '"MSISDN_Survey ID"::text',
        '"Customer MSISDN"::text',
        '"Customer Name"::text',
        '"SurveyID"::text',
        '"Response Date & Time"::timestamp',
        '"Date"::text',
        '"Time"::text',
        '"Hour"::time',
        '"Question"::text',
        '"Response"::text',
        '"Question Sequence"::text',
        '"Rating"::int8',
        '"Full Name"::text',
        '"Creation Time"::text',
        '"Creator Org"::text',
        '"SR ID"::int8',
        '"Creator"::text',
        '"SR Type"::text',
        '"Acceptance Channel"::text',
    ),
}

# Year and quarter are encoded in the file name, e.g. "Master_Response_BU_2022Q3.xlsx".
NPS_FILE_PERIOD_PATTERN = re.compile(r'_(\d{4})(Q[1-4])')


def _nps_insert_sql(table, column_exprs):
    """Return the INSERT ... SELECT copying pxl_temp.nps_temp into external.<table>.

    ``table`` is interpolated as an identifier, so it must come from the trusted
    categorySeq list. The year, quarter and processed date are bind parameters
    (:year, :quarter, :processed_date); CAST(...) is used rather than ``::``
    because ``:name::type`` is mis-parsed by SQLAlchemy's text() bind syntax.
    """
    select_list = ",\n    ".join(column_exprs)
    return (
        f"insert into external.{table}\n"
        "select\n"
        f"    {select_list},\n"
        "    CAST(:year AS int4) AS year,\n"
        "    CAST(:quarter AS text) AS quarter,\n"
        "    CAST(:processed_date AS date) AS processed_date\n"
        "from pxl_temp.nps_temp"
    )


# Ingest every matching SharePoint file, category by category.
for eachCat in categorySeq:
    cat_name, table_name = eachCat

    # Grab ALL files whose name contains the category marker, but only those
    # directly inside sharepoint_path: the listing is recursive, and this check
    # skips files already moved into the Processed/ or Failed/ sub-folders.
    matching_files = [
        f
        for f in files
        if cat_name in f.properties["Name"]
        and f.serverRelativeUrl == sharepoint_path + "/" + f.name
    ]

    for f in matching_files:
        file_url = f.properties["ServerRelativeUrl"]
        file_name = f.properties["Name"]
        download_file = os.path.join(local_path, os.path.basename(file_url))

        period_match = NPS_FILE_PERIOD_PATTERN.search(file_name)
        column_exprs = NPS_SELECT_COLUMNS.get(cat_name)
        if period_match is None or column_exprs is None:
            # Without this guard the previous file's year/quarter would be reused
            # (or a NameError raised on the first file); route it to Failed/ instead.
            print(f"Cannot determine year/quarter/category for {file_name}; moving to Failed")
            moveFile(file_url, f.name, sharepoint_path_failed)
            continue

        year, quarter = period_match.groups()
        print(f"Year: {year}, Quarter: {quarter}")
        print(f"Processing for {file_name}")

        try:
            # Download to the local staging folder. The handle is deliberately not
            # named `local_file`, which would clobber the configured path variable.
            with open(download_file, "wb") as local_fh:
                (
                    ctx.web.get_file_by_server_relative_path(file_url)
                    .download(local_fh)
                    .execute_query()
                )

            # Stage the sheet in pxl_temp.nps_temp (replaced for every file) ...
            nps_df = pd.read_excel(download_file)
            nps_df.to_sql('nps_temp', postgres_engine, schema='pxl_temp', if_exists='replace', index=False)
            del nps_df

            # ... then copy it into the final table with explicit casts; begin()
            # commits on success and rolls back if the insert raises.
            with postgres_engine.begin() as tx_conn:
                tx_conn.execute(
                    sql.text(_nps_insert_sql(table_name, column_exprs)),
                    {
                        "year": int(year),
                        "quarter": quarter,
                        "processed_date": curDate.strftime("%Y-%m-%d"),
                    },
                )
        finally:
            # Always remove the local copy, even when the download or ingest fails.
            if os.path.exists(download_file):
                os.remove(download_file)

        # Move the file only after its rows have been committed.
        moveFile(file_url, f.name, sharepoint_path_processed)
        



Year: 2022, Quarter: Q3
Processing for Master_Response_BU_2022Q3.xlsx
Year: 2022, Quarter: Q2
Processing for Master_Response_BU_2022Q2.xlsx
Year: 2022, Quarter: Q1
Processing for Master_Response_BU_2022Q1.xlsx
Year: 2022, Quarter: Q4
Processing for Master_Response_BU_2022Q4.xlsx
