# Common functions for Python, Hive, PySpark, and Excel
## Introduction
The functions are generic and handy. You might use some of them for ad-hoc purposes or to facilitate/speed up your work.
## Common Functions
1. Date/Time related functions
 - f_get_hh_mm_ss : Convert total seconds into hours, minutes, and seconds
 - elapse_time : Get the elapse_time and invoke the function f_get_hh_mm_ss to convert total seconds into hours, minutes, and seconds
 - f_dte_for_yymm : Based on yymm, return the last day of the month, the previous month, the next month, etc.
 - f_get_mth_diff : Get the month difference between the starting month and the ending month
 
2. Database related functions
 - f_drop_tb : Drop a table
 - f_alt_tb : Rename a table
 - f_drop_idx : Drop an index
 - f_rename_tbl_idx : Rename a table or an index
 - f_cr_idx : Invoke function 'f_drop_idx' to drop an index and create an index
 - f_get_tbl_cnt : Get table count with/without condition
 - f_cr_tbl_selas : Use 'select as' to create a target table from a source table with/without condition
 - f_chk_tbl_exist : Check if table exists
 - f_get_tbl_diff : Get the count and Spark dataframe for the difference of two tables
 - f_cr_tbl_from_csv : Create a table from a csv file
 - f_cr_dic_from_tbl : Create a dictionary from a table
 - f_cr_exl_fr_tbl : Create an excel file from a list of table(s)
 - f_cr_exl_fr_a_tbl : Create an excel file from a table with/without condition
 - f_union_all : Merge PySpark dataframes row-wise
 
3. Excel related function
 - f_adj_col_sheet : Based on the column names and content of columns to adjust the spreadsheet column length
 
4. Miscellaneous functions
 - f_chk_df_is_empty : Check if Python dataframe is empty
 - f_rm_extra_space : Replace multiple spaces inside the string with a single space and remove both leading and trailing space
 - f_union_all : Merge PySpark dataframe row-wise
 - f_cr_dic_from_csv : Create a dictionary from a csv file
 - f_str_com_word : Return a string of common words between two strings
 - f_str_dif_wrd : Return a string of words in first string and not in the second string
 
## Notes
Before invoking the function(s), please import packages and libraries in the very beginning.


## Import packages and libraries

In [None]:


from pyspark.sql.functions import udf, struct, coalesce, col, concat
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import *
from pyspark.sql import Row,  DataFrame
import pandas as pd
import numpy as np
from functools import reduce
import datetime
from datetime import datetime              
#import csv
import gzip, re
from pandas import ExcelWriter
from pandas import ExcelFile
import calendar


##  Date/Time related functions

In [None]:
def f_get_hh_mm_ss(tot_second):
    """
      module name : f_get_hh_mm_ss
      purpose     : Convert total seconds into hours, minutes, and seconds
      parameter   :
         tot_second   : total seconds; any numeric value, integer or floating
      return          : tuple (hh, mm, ss) of integers
      note            : - The input is truncated with int(), so the value after the decimal point is dropped
                        - Hours are not rolled over into days, e.g. 90000 seconds returns (25, 0, 0)
      Example         : f_get_hh_mm_ss(3618.52)  # return (1, 0, 18) which is 1 hour and 18 seconds
    """
    secs_per_min = 60                      # named so the builtin min() is not shadowed
    secs_per_hour = 60 * secs_per_min
    tot_second = int(tot_second)
    hh, remainder = divmod(tot_second, secs_per_hour)
    mm, ss = divmod(remainder, secs_per_min)
    return hh, mm, ss

def elapse_time (start_time, end_time, dsc):
    """
     module name  : elapse_time
     purpose      : Get the elapsed time and print it as hh:mm:ss together with the start/end timestamps
     parameter    :
       start_time : Start time in seconds since the epoch; any numeric value, integer or floating e.g. time.time()
       end_time   : End time in seconds since the epoch; any numeric value, integer or floating e.g. time.time()
       dsc        : Description printed in front of the elapsed time
     return       : None (the result is printed)
     notes        :
                   - The time module is imported inside the function, so the caller does not need to import it
                   - Calls function "f_get_hh_mm_ss" to convert the elapsed seconds to hours, minutes, and seconds
                     (the fractional part of the elapsed seconds is dropped there)
                   - Calls time.strftime/time.localtime to display the timestamps, so they are shown in local time
     example1    :
                   elapse_time(0, 3701.17, "Test completed.")
                   - The result would be (timestamps depend on the local time zone)
                      Test completed. It took 3701.170000 seconds - 1hh:1mm:41ss.
                      start time: Dec 31 1969 17:00:00  end time:  Dec 31 1969 18:01:41
     example2   :
                   cty_start_time = time.time()  # type : floating
                   time.sleep(5)
                   cty_end_time = time.time()
                   elapse_time (  cty_start_time, cty_end_time, "Test completed.")
                   - The result would be
                      Test completed. It took 5.001574 seconds - 0hh:0mm:5ss.
                      start time: Jan 19 2019 20:20:14  end time:  Jan 19 2019 20:20:19
    """
    import time   # not imported at file level; a local import avoids a NameError

    ts_fmt = "%b %d %Y %H:%M:%S"
    elapsed = end_time - start_time
    hh, mm, ss = f_get_hh_mm_ss(elapsed)
    print (" %s It took %3f seconds - %uhh:%umm:%uss." %(dsc,elapsed, hh, mm, ss))
    print (" start time:", time.strftime(ts_fmt, time.localtime(start_time)), " end time: ",  time.strftime(ts_fmt, time.localtime(end_time)))

def f_dte_for_yymm(yymm, dte_typ):
    """
      module name : f_dte_for_yymm  
      purpose     : Base on yymm and dte_typ to return the date information, e.g. previous month, the first of the month  
      Parameter   :
          yymm - String or an integer to contain yymm
                 - yy is the year
                 - mm is the month
          dte_typ - type to determine date information to return (not case sensitive)
            L/l   : Return the last day of yymm
            P/p   : Return previous month of yymm
            N/n   : Return next month of yymm
            F1/f1 : Return the first day of yymm with the format ccyy-mm-dd
            F2/f2 : Return the first day of yymm with the format mmddccyy
            D/d   : Return date object 
      Note: 
        Function Period is required to 'import pandas as pd'
        datetime.date and datetime.date.strftime are required to 'import datetime'
      Examples
        f_dte_for_yymm(1901, 'l')     # Last date of 2019, january    (Return str  '2019-01-31')
        f_dte_for_yymm(1901, 'p')     # Last date of 2019, january    (Return str  '1812')
        f_dte_for_yymm(1812, 'n')     # Next month of 2018, December  (Return str  '1901')
        f_dte_for_yymm(1901, 'd')     # datetime.date(2019, 2, 1)     (Return datetime.date object with value 2019-02-01)
        f_dte_for_yymm(1902, 'f1')    # First day of the month with the format ccyy-mm-dd (Return '2019-02-01')
        f_dte_for_yymm(1902, 'f2')    # First day of the month with the format mmddccyy   (Return '02012019')
    """    
    import datetime
    import pandas as pd
    yymm = str(yymm)   # in case yymm was provided as an integer    
    ccyy = '20' + yymm [:2]
    mm = yymm[2:4]

    if dte_typ.lower() == 'l':      
       if mm == '12' :
         return ccyy + '-' + mm + '-' + '31'
       else:
          return  (pd.Period(datetime.date(int(ccyy), int(mm)+1, 1), 'D') - 1).strftime("%C%y-%m-%d")    
    elif dte_typ.lower() == 'f1':
       return datetime.date(int(ccyy), int(mm), 1).strftime("%C%y-%m-%d")
    elif dte_typ.lower() == 'f2':
       return datetime.date(int(ccyy), int(mm), 1).strftime("%m%d%C%y")                          
    elif dte_typ.lower() == 'p': 
       return  (pd.Period(datetime.date( int(ccyy), int(mm), 1), 'M') - 1).strftime('%y%m')  
    elif dte_typ.lower() == 'n': 
       return  (pd.Period(datetime.date( int(ccyy), int(mm), 1), 'M') + 1).strftime('%y%m')
    elif dte_typ.lower() == 'd':    
       return datetime.date(int(ccyy), int(mm), 1)
    else:
       print ("The dte_typ is ", dte_typ, "Valid dte_typ for f_dte_for_yymm should be L/l, P/p, N/n, F1/f1, F2/f2, D/d. ")
       sys.exit(1)

def f_get_mth_diff(s_yymm, e_yymm):
    """
      module name : f_get_mth_diff
      purpose     : Get month difference between starting month and ending month
                    -  Will return the month difference between s_yymm and e_yymm as an integer
      parameter   :
        s_yymm    : Starting month with yymm format where yy is the year (20yy) and mm is the month
                    - It can be numeric or string.
        e_yymm    : Ending month with yymm format where yy is the year (20yy) and mm is the month
                    - It can be numeric or string.
      return      : int, number of months from s_yymm to e_yymm
      raises      : ValueError when a month is not in 01-12 or the value is not numeric
      Notes       : - If starting month > ending month, the month difference would be negative
                    - An integer shorter than 4 digits is left-padded with zeros, e.g. 901 is treated as '0901'
                    - The difference is computed with plain integer arithmetic so the result is always an int
      example1    : f_get_mth_diff(1811, 1904)
                    - Starting month is 2018, November
                    - Ending   month is 2019, April
                    - return 5 (5 months difference)
      example2    : f_get_mth_diff('1811', 1904)
                    - return 5
    """
    import datetime   # local import: the file-level name 'datetime' is rebound to the datetime class

    def _year_month(yymm):
        # Split yymm into (year, month); datetime.date validates the month range
        yymm = str(yymm).zfill(4)   # in case yymm was provided as an integer
        first_day = datetime.date(2000 + int(yymm[:2]), int(yymm[2:4]), 1)
        return first_day.year, first_day.month

    s_year, s_mm = _year_month(s_yymm)
    e_year, e_mm = _year_month(e_yymm)
    return (e_year - s_year) * 12 + (e_mm - s_mm)


## Database related functions

In [None]:


def f_drop_tb(tbl_nm):
    """
      module name : f_drop_tb
      purpose     : Drop a table if it exists, then print the statement that was run
      parameter   :
        tbl_nm    : Table name (placed into the statement as-is)
      notes       : Uses the global sqlContext
      example1    : f_drop_tb("lookup.post_code") # drop table lookup.post_code
      example2    : tbl_nm = "lookup.post_code"
                    f_drop_tb(tbl_nm)             # drop table lookup.post_code
    """
    drop_stmt = "drop table if exists {tbl}".format(tbl=tbl_nm)
    sqlContext.sql(drop_stmt)
    print(drop_stmt)

def f_alt_tb(src_tbl_nm, trg_tbl_nm):
    """
      module name : f_alt_tb
      purpose     : Rename a table, then print the statement that was run
      parameter   :
       src_tbl_nm : Old table name
       trg_tbl_nm : New table name
      notes       : Uses the global sqlContext
      example1    : f_alt_tb("lookup.post_code", "lookup.post_code_bkup") # Change table name from lookup.post_code to lookup.post_code_bkup
      example2    : tbl_nm_old = "lookup.post_code"
                    tbl_nm_new = "lookup.post_code_bkup"
                    f_alt_tb(tbl_nm_old, tbl_nm_new)
    """
    rename_stmt = "alter table {old} rename to {new}".format(old=src_tbl_nm, new=trg_tbl_nm)
    sqlContext.sql(rename_stmt)
    print(rename_stmt)

def f_drop_idx(tbl_nm, idx_nm) :
    """
      module name : f_drop_idx
      purpose     : Drop an index on a table if it exists, then print the statement that was run
      parameter   :
       tbl_nm     : Table name on which the index had been created
       idx_nm     : Index name to be dropped
      notes       : Uses the global sqlContext
    """
    drop_idx_stmt = 'DROP INDEX IF EXISTS {idx} on {tbl}'.format(idx=idx_nm, tbl=tbl_nm)
    sqlContext.sql(drop_idx_stmt)
    print(drop_idx_stmt)

def f_rename_tbl_idx(obj_nm_old, obj_nm_new, obj_typ) :
    """
      module name : f_rename_tbl_idx
      purpose     : Rename a table or an index, then print the statement that was run
      parameter   :
       obj_nm_old : Old table/index name
       obj_nm_new : New table/index name
       obj_typ    : Object type  (not case sensitive)
         "ind"    : Will rename object "index"
                    Any other value renames object "table"
      notes       : Uses the global sqlContext
    """
    obj_kind = 'index' if obj_typ.lower() == 'ind' else 'table'
    rename_stmt = "alter {0} {1} rename to {2}".format(obj_kind, obj_nm_old, obj_nm_new)
    sqlContext.sql(rename_stmt)
    print(rename_stmt)

def f_cr_idx(tbl_nm, idx_nm, key_var, idx_typ) :
    """
      module name : f_cr_idx
      purpose     : Invoke function 'f_drop_idx' to drop an index, then create and rebuild the index
      parameter   :
       tbl_nm     : Table name which the index had been created or would be created on
       idx_nm     : Index name
       key_var    : Column list for the index, including the parentheses, e.g. "(abbr_nm, se10)"
       idx_typ    : Define index.handler.class.name  (not case sensitive)
         'bmp'    : The index.handler.class.name would be 'BITMAP' (used for columns with few distinct values)
         Otherwise: The index.handler.class.name would be "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler"
      notes       : - Uses the global sqlContext
                    - Each statement is printed after it has been run
       example
                   bus_ctc_tbl = "pcr_cor.cor_bus_ctc_1812"
                   idx_nm = "bus_ctc_idx_cmp_1812"
                   f_cr_idx(bus_ctc_tbl, idx_nm, "(abbr_nm, se10)", "cmp")
                     - will create the table pcr_cor__cor_bus_ctc_1812_bus_ctc_idx_cmp_1812__ for index
                     - on column abbr_nm and se10
                     - The index handler is   "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler"
    """
    f_drop_idx(tbl_nm, idx_nm)
    if idx_typ.lower() == 'bmp':
        as_stmt = 'BITMAP'   # used for columns with few distinct values
    else:
        as_stmt = 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'

    # Create an empty index
    q_cr_idx = """ CREATE INDEX {0}
    ON TABLE {1}  {2}
    AS "{3}"
    WITH DEFERRED REBUILD""".format(idx_nm, tbl_nm, key_var, as_stmt)
    sqlContext.sql(q_cr_idx)
    print(q_cr_idx)

    # Build index structure and load data to index table
    q_alt_idx = "ALTER INDEX {0}   on {1}   REBUILD".format( idx_nm, tbl_nm)
    sqlContext.sql(q_alt_idx)
    print(q_alt_idx)

def f_get_tbl_cnt(tbl_nm, cond_stmt) :
    """
      module name : f_get_tbl_cnt
      purpose     : Get table count with/without condition
      parameter   :
       tbl_nm     : Table name
       cond_stmt  : Where/condition statement appended after the table name; use '' for no condition
      return      :
       cnt        : Count of the table with/without condition
      notes       : - Uses the global sqlContext
                    - The count and the query are printed before returning
    """
    count_qry = 'select count(*) as cnt from {0} {1} '.format(tbl_nm, cond_stmt)
    result_rows = sqlContext.sql(count_qry).collect()
    row_cnt = result_rows[0]["cnt"]
    print("count = ", row_cnt, ' for ', count_qry )
    return row_cnt


def f_cr_tbl_selas(trg_tbl, src_tbl, cond_stmt):
    """
      module name : f_cr_tbl_selas
      purpose     : Use 'create table ... as select' to create a target table from a source table
      parameter   :
        trg_tbl   : Table name to be created (dropped first if it exists)
        src_tbl   : Source table to get data
        cond_stmt : Where/condition statement appended to the select; use '' for no condition
      Notes       : - Invokes f_drop_tb to drop the target table and f_get_tbl_cnt to print its count
                    - Uses the global sqlContext
    """
    f_drop_tb(trg_tbl)
    create_stmt = "create table {trg} as select * from {src} {cond}".format(
        trg=trg_tbl, src=src_tbl, cond=cond_stmt)
    sqlContext.sql(create_stmt)

    # Report the row count of the new table, then what was created
    f_get_tbl_cnt(trg_tbl, "")
    print(" Create table ", trg_tbl,  " from ", src_tbl, cond_stmt )

def f_chk_tbl_exist (sch_tbl_nm):
    """
      module name : f_chk_tbl_exist
      purpose     : Check if table exists.
      parameter   :
      sch_tbl_nm  : Schema name and table name separated by '.'

      notes       :
          - The function will return True if the table exists. Otherwise, return False
          - The sch_tbl_nm is converted to lowercase before the lookup
          - Uses sqlContext.tableNames(schema) from the global sqlContext
          - An IndexError is raised when sch_tbl_nm contains no '.'
      Example     : f_chk_tbl_exist( "cstonedb3.gms_merchant_char")
        sch_nm =  'cstonedb3'
        tbl_nm =  'gms_merchant_char'
    """
    name_parts = sch_tbl_nm.lower().split(".")
    schema = name_parts[0]
    table = name_parts[1]
    return table in sqlContext.tableNames(schema)
def f_get_tbl_diff(tbl_nm_1, tbl_nm_2, var_nm, cond_stmt, diff_tbl_nm):
    """
      module name  : f_get_tbl_diff
      purpose      : Get the count and spark dataframe for the difference of two tables
                     - save the data extracted from tbl_nm_1 to spark dataframe sdf_tbl_1
                     - save the data extracted from tbl_nm_2 to spark dataframe sdf_tbl_2
                     - Use subtract function to get the rows in sdf_tbl_1 that are not in sdf_tbl_2
      parameter    :
       tbl_nm_1    : Table name 1 (schema.table)
       tbl_nm_2    : Table name 2 (schema.table)
       var_nm      : string of variable name(s) to compare the difference
                     - use '*' if no specific variable to compare
       cond_stmt   : Condition statement including group by, sort by; applied to both tables
       diff_tbl_nm : Table name to save the differences. Schema name is part of table name
                     - if diff_tbl_nm = '' (or only spaces) will not save the difference to a table
      return       :
       diff_cnt    : Count of difference
       sdf_diff    : PySpark data frame holding the difference
      raises       :
       SystemExit(1) - after printing messages, when an input table does not exist or
                       diff_tbl_nm has no schema name

      note         :
          - The difference is saved to diff_tbl_nm (via f_cr_tbl_selas) only when diff_cnt > 0
          - Uses the global sqlContext and the functions f_chk_tbl_exist and f_cr_tbl_selas
    """
    fnc_nm = 'f_get_tbl_diff'
    diff_tbl_nm = diff_tbl_nm.strip()   # str.strip() returns a new string, so keep the result
    save_diff = diff_tbl_nm != ''       # compare by value; 'is not' tests object identity
    input_err = False

    if save_diff and len(diff_tbl_nm.split(".")) < 2:
        print ("MSG from " + fnc_nm + ": " + diff_tbl_nm + " is incorrect. It might miss the schema name." )
        input_err = True

    for tbl_nm in (tbl_nm_1, tbl_nm_2):
        if not f_chk_tbl_exist (tbl_nm):
            print ("MSG from " + fnc_nm + ": " + tbl_nm, " does not exist ")
            input_err = True

    if input_err:
        print ("ERR from " + fnc_nm + ": At least  one of the input tables doesn't exist or the output table name is incorrect. ")
        raise SystemExit(1)   # same effect as sys.exit(1) without needing the sys module

    q_sdf_tbl_1 = "select {0} from {1} {2}".format(var_nm, tbl_nm_1, cond_stmt)
    print(" q_sdf_tbl_1",  q_sdf_tbl_1)
    sdf_tbl_1   = sqlContext.sql(q_sdf_tbl_1)

    q_sdf_tbl_2 = "select {0} from {1} {2}".format(var_nm, tbl_nm_2, cond_stmt)
    print(" q_sdf_tbl_2",  q_sdf_tbl_2)
    sdf_tbl_2   = sqlContext.sql(q_sdf_tbl_2)

    sdf_diff = sdf_tbl_1.subtract(sdf_tbl_2)  # Get the difference between sdf_tbl_1 and sdf_tbl_2
    diff_cnt = sdf_diff.count()

    # If diff_tbl_nm has value, save the difference to a table
    if diff_cnt > 0 and save_diff:
        sdf_diff.registerTempTable("tmp_diff_tbl")
        f_cr_tbl_selas (diff_tbl_nm, 'tmp_diff_tbl', '' )
    return diff_cnt, sdf_diff


def f_cr_tbl_from_csv(csv_file, tbl_nm, databricks_csv):
    """
      module name  : f_cr_tbl_from_csv
      purpose      : Create a table from a csv file
      parameter    :
       csv_file    : CSV name including path name
       tbl_nm      : Table name including schema name (dropped first if it exists)
       databricks_csv: Indicator
          "True"   : (the string) will use "com.databricks.spark.csv" to save the csv file to a Spark dataframe
          Otherwise: Use pandas (pd.read_csv) and convert the pandas dataframe to a Spark dataframe

      notes        : - Null
                       -  com.databricks.spark.csv will save 'NULL' as 'NULL'
                       -  If save CSV file to a pandas dataframe and save it into table will save 'NULL' as 'NaN'
                       -  databricks_csv = 'True' is the suggested way to create a table
                     - ClassNotFoundException: Failed to find data source: com.databricks.spark.csv.
                       - If submit the calling program with 'True' for databricks_csv will get ClassNotFoundException error message and job will stop
                         "sh /axp/platform/cloak/bin/spark/cloak-sparksubmit calling_code"
                       -  Avoid the error message
                          - Define spkjar environmental variable
                            spkjar="--master yarn-client --jars /axp/platform/cloak/lib/cloak-spark-1.2.0-SNAPSHOT.jar,/axp/platform/mlplat/app/lib/thirdpartyjars/spark-libs/commons-csv-1.4.jar,/axp/platform/mlplat/app/lib/thirdpartyjars/spark-libs/spark-csv_2.10-1.5.0.jar"
                            export HADOOP_CLASSPATH=/axp/platform/cloak/lib/cloak-hive-1.2.0-SNAPSHOT.jar;
                          - Use "spark-submit $spkjar calling_code" to submit your job to create a table from a csv via com.databricks.spark.csv
                     - Uses the global sqlContext and the functions f_drop_tb and f_get_tbl_cnt
                     - The dataframe is registered as temp table "sdf_csv_tbl"
    """
    import os   # not imported at file level; needed for os.path.realpath below

    print(" def csv file", csv_file, "tbl_nm ", tbl_nm,   "databricks_csv ", databricks_csv)
    if databricks_csv == "True" :
        sdf_csv = sqlContext.read.format("com.databricks.spark.csv") \
            .options(header='true', inferschema='true') \
            .option("treatEmptyValuesAsNulls", "true")  \
            .load(os.path.realpath(csv_file))
        print("use databricks_csv")
    else:
        pdf_csv = pd.read_csv(csv_file)
        sdf_csv = sqlContext.createDataFrame(pdf_csv)
        print("not use databricks_csv")

    sqlContext.registerDataFrameAsTable(sdf_csv, "sdf_csv_tbl") # register the dataframe as a temp table
    # drop the target table before re-creating it
    f_drop_tb(tbl_nm)
    q_cr_tbl = "create table {0} as select * from sdf_csv_tbl".format(tbl_nm )
    print("q_cr_tbl :", q_cr_tbl)
    sqlContext.sql(q_cr_tbl)
    f_get_tbl_cnt(tbl_nm, '')
    print("create table ", tbl_nm ," from the csv file :", csv_file)

 
def f_cr_dic_from_tbl(tbl_nam, key_nam, val_nam):
    """
      module name : f_cr_dic_from_tbl
      purpose     : Create/Return a dictionary from a table
      parameter   :
        tbl_nam   : Table name with schema
        key_nam   : Variable to define key   for a dictionary
        val_nam   : Variable to define value for a dictionary
      return      : dict mapping each key_nam value to a val_nam value
      notes       :
        - The distinct (key, value) pairs are selected with 'group by' and loaded into a pandas dataframe
        - If a key has more than one value in the table, only one of them is kept in the dictionary
        - Uses the global sqlContext

      example     :
        d_bus_rol_grp_dsc =  f_cr_dic_from_tbl('pcr_cor.cor_lup_bus_rule', 'rol_typ', 'rol_typ_dsc')
        - Create a dictionary d_bus_rol_grp_dsc from the table pcr_cor.cor_lup_bus_rule with key as rol_typ and value as rol_typ_dsc
    """
    q_pdf = "select {0}, {1} from {2} group by {0}, {1}". format( key_nam,  val_nam, tbl_nam)
    pdf = sqlContext.sql(q_pdf).toPandas()   # Create a pandas dataframe
    l_key = pdf[key_nam].tolist()
    l_val = pdf[val_nam].tolist()
    return dict(zip(l_key, l_val))


    
"""
 module name : f_cr_exl_fr_tbl
 purpose : Create an excel file from a list of table(s) without condition 
  parameter
    excel_file: Excel file with extension 'xlsx'
    l_tbl     : List of table names

"""

def f_cr_exl_fr_tbl(excel_file, l_tbl):
    """
      module name : f_cr_exl_fr_tbl
      purpose     : Create an excel file from a list of table(s) without condition
      parameter   :
        excel_file: Excel file with extension 'xlsx'
        l_tbl     : List of table names
      notes       :
        - Create a spreadsheet with one tab per table; the table name is used as the tab name
        - Each table is read in full with 'select *' and converted to a pandas dataframe
        - The writer is used as a context manager so the file is saved and closed on exit
          (ExcelWriter.save() is no longer available in pandas 2.x)
        - Uses the global sqlContext
    """
    with pd.ExcelWriter(excel_file) as writer:
        for tbl in l_tbl:
            q_pdf = "select * from {0}".format(tbl)
            print("q_pdf", q_pdf)
            pdf = sqlContext.sql(q_pdf).toPandas()
            pdf.to_excel(writer, sheet_name=tbl, index=False)

def f_cr_exl_fr_a_tbl(excel_file, tbl_nam, cond_stmt):
    """
      module name : f_cr_exl_fr_a_tbl
      purpose     : Create an excel file from a table with/without condition
      parameter   :
       excel_file : Excel file with extension 'xlsx'
       tbl_nam    : Table name; also used as the tab name
       cond_stmt  : Where/condition statement appended to the select; use '' for no condition
      Note:
          - The function is cloned from f_cr_exl_fr_tbl which can handle more than one table without condition
          - The query is run before the excel file is opened, so a failing query does not leave a file behind
          - The writer is used as a context manager so the file is saved and closed on exit
            (ExcelWriter.save() is no longer available in pandas 2.x)
          - Uses the global sqlContext
    """
    q_pdf = "select * from {0} {1} ".format(tbl_nam, cond_stmt)
    print("q_pdf", q_pdf)
    pdf = sqlContext.sql(q_pdf).toPandas()
    with pd.ExcelWriter(excel_file) as writer:
        pdf.to_excel(writer, sheet_name=tbl_nam, index=False)

Excel related function


## Excel related function


In [None]:

def f_adj_col_sheet(pdf_nm, excel_writer,sheet_nm, head_fmt_ind):
    """
      module name : f_adj_col_sheet
      purpose     : Based on the column names and content of columns to adjust the spreadsheet column length
                     - Return workbook with adjusted column length
      parameters     :
        pdf_nm       : pandas data frame that was written to the excel sheet
        excel_writer : pandas ExcelWriter created with engine='xlsxwriter'
        sheet_nm     : sheet name
        head_fmt_ind : indicator; when it is the string 'True' a header format is added to the workbook
      return         : the workbook object taken from excel_writer.book
      Note:
        - The calling program needs to define the excel_writer with engine='xlsxwriter', e.g.
          writer = pd.ExcelWriter(excel_file, engine='xlsxwriter')
        - The width of a column is the longer of its header and its longest repr() value, plus 2
        - Columns are addressed by zero-based position starting at the first sheet column, so the
          data frame is expected to be written without the index (index=False)
    """
    workbook  = excel_writer.book
    worksheet = excel_writer.sheets[sheet_nm]
    if ( head_fmt_ind == 'True'):
        # NOTE(review): the format is registered with the workbook but is not applied to any
        # cell in this function - confirm whether the header row should be rewritten with it.
        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'top',
            'bg_color': 'grey',
            'border': 1})

    padding = 2
    for col_idx, col in enumerate(pdf_nm.columns.values):
        val_lens = pdf_nm[col].apply(repr).apply(len)
        # an empty column has no values to measure; fall back to the header length
        longest_val = int(val_lens.max()) if len(val_lens) > 0 else 0
        col_width = max(longest_val, len(str(col))) + padding
        # numeric (first_col, last_col) addressing also works past column 'Z'
        worksheet.set_column(col_idx, col_idx, col_width)
    return workbook


## Miscellaneous functions

In [None]:
def f_chk_df_is_empty(df):
    """
      module name : f_chk_df_is_empty
      purpose     : Check if python dataframe is empty
      parameter   :
        df        : Python (pandas) data frame, or any object that supports len()
      return      : True if len(df) is 0, otherwise False
      Note:
          For a pandas data frame len() is the number of rows, so a frame with columns but no rows is empty
    """
    return len(df) == 0
 
def f_rm_extra_space(in_str):
    """
      module name : f_rm_extra_space
      purpose     : Collapse every run of spaces inside the string into a single space and
                    remove leading and trailing whitespace
      parameter   :
        in_str    : String
      return      : the cleaned string
      note
       - re.sub()   : Search and replace function in re (regular expression) module
       - Only the space character is collapsed; tabs/newlines inside the string are kept
      example     : f_rm_extra_space("  a b          d  ") # Return 'a b d'
    """
    # A run of two or more spaces is the only case where the replacement changes the text
    collapsed = re.sub(r' {2,}', ' ', in_str)
    return collapsed.strip()
 
def f_union_all(*dfs):
    """
      module name : f_union_all
      purpose     : Merge PySpark dataframes row-wise
      parameter   :
        *dfs      : Any number of PySpark dataframes, separated by ','
      return      : one dataframe built by applying DataFrame.unionAll pairwise from left to right
      note        : - Adapted from https://datascience.stackexchange.com/questions/11356/merging-multiple-data-frames-row-wise-in-pyspark
                    - reduce() raises TypeError when no dataframe is passed
      example     : f_union_all(td1, td2, td3, td4) # merge PySpark dataframe td1, td2, td3, and td4
    """
    from pyspark.sql import DataFrame
    from functools import reduce

    merge_pair = DataFrame.unionAll
    return reduce(merge_pair, dfs)

def f_cr_dic_from_csv(csv_file, val_nam, key_nam ):
    """
      module name : f_cr_dic_from_csv
      purpose     : Create a dictionary from a csv file
      parameter   :
        csv_file  : File name including path for the CSV file (anything pd.read_csv accepts)
        val_nam   : Column name for the dictionary value
        key_nam   : Column name for the dictionary key
      return      : dict mapping each key_nam value to the val_nam value of the same row
      notes       :
        - The value column comes before the key column in the parameter list
        - If a key appears in more than one row, the value of the last row is kept
    """
    pdf_csv = pd.read_csv(csv_file)
    l_key = pdf_csv[key_nam].tolist()
    l_val = pdf_csv[val_nam].tolist()
    return dict(zip(l_key, l_val))

def f_str_com_word(str1, str2):
    """
      module name : f_str_com_word
      purpose     : Return a string of common words between str1 and str2
      parameter   :
        str1      : string with words separated by space(s)
        str2      : string with words separated by space(s)
      return      : the common words joined by a single space; '' when there is none
      note:
          - words are separated by whitespace
          - The sequence of words in the inputs is not important for the comparison
          - Repeating words are deduped, e.g.
            - f_str_com_word('ab cd ef ef', 'ef ab ef')  # return 'ab ef'
          - The result lists the words in the order they first appear in str1, so the
            output is the same on every run
    """
    words_in_2 = set(str2.split())
    seen = set()
    common = []
    for word in str1.split():
        if word in words_in_2 and word not in seen:
            seen.add(word)
            common.append(word)
    return ' '.join(common)


def f_str_dif_wrd(str1, str2):
    """
      module name : f_str_dif_wrd
      purpose     : Return a string of words in str1 and not in str2
      parameter   :
        str1      : string with words separated by space(s)
        str2      : string with words separated by space(s)
      return      : the words joined by a single space; '' when there is none
      example     :
                    - f_str_dif_wrd('ab cd ef ef', 'ef ab     ef')   # 'cd'
                    - f_str_dif_wrd('ab cd ef ef', 'ef ab cd ef hj')  # ''
                    - f_str_dif_wrd('ab cd ef ef ba', 'ef ab cd ef hj')  # 'ba'
      notes       :
          - words are separated by whitespace
          - The sequence of words in the inputs is not important for the comparison
          - Repeating words are deduped
          - The result lists the words in the order they first appear in str1, so the
            output is the same on every run
    """
    excluded = set(str2.split())
    seen = set()
    only_in_1 = []
    for word in str1.split():
        if word not in excluded and word not in seen:
            seen.add(word)
            only_in_1.append(word)
    return ' '.join(only_in_1)
                                                           
## Misc
Hive snippet: left-pad a code column with "0" to a length of 3 inside a select list
,lpad(first_rspbl_cd, 3, "0") as first_rspbl_cd
,lpad(scnd_rspbl_cd, 3, "0") as scnd_rspbl_cd

