In [1]:
from logging import Logger
from typing import Optional

from pyspark import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, to_date, date_format, to_timestamp, lit

In [2]:
def read_from_pg(spark:SparkSession, config: dict, sql: str, table: str) -> Optional[DataFrame]:
    """Read a DataFrame from PostgreSQL through Spark's JDBC data source.

    Args:
        spark: SparkSession whose ``read`` interface builds the JDBC reader.
        config: JDBC options forwarded verbatim to the reader
            (``options(**config)``).
        sql: what to read. When non-empty it takes precedence over ``table``
            and may be one of:
              - 'select a, b from t1 join t2 ...'  -> sent as the ``query`` option
              - 'table_name' / 'schema_name.table_name' -> sent as ``dbtable``
              - '(select a, b, c from t1 join t2 ...) as foo' -> sent as ``dbtable``
        table: table to read (sent as ``dbtable``) when ``sql`` is empty.
    Returns:
        The loaded DataFrame, or None when neither ``sql`` nor ``table`` was
        given or when the read raised (the error is printed, not re-raised).
    """
    import re  # function-scope import: only this function needs it

    try:
        print("reading from postgresql")
        statement = (sql or "").strip()
        table_name = (table or "").strip()

        # Fail early with a clear message instead of sending an empty dbtable.
        if not statement and not table_name:
            print("neither sql nor table was provided, nothing to read")
            return None

        reader = spark.read.format("jdbc").options(**config)

        if statement:
            print("executing query to create df")
            # A bare identifier or "(subquery) alias" is a FROM-clause item, not a
            # statement, so it has to travel as ``dbtable``; anything else is
            # treated as a full SELECT and travels as ``query``.
            is_from_item = (
                len(statement.split()) == 1
                or re.fullmatch(r"\(.*\)\s*(?:as\s+)?\w+", statement, re.IGNORECASE | re.DOTALL) is not None
            )
            option_name = "dbtable" if is_from_item else "query"
            source_df = reader.option(option_name, statement).load()

        else:
            print("reading directly from table")
            source_df = reader.option('dbtable', table_name).load()

        return source_df

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [3]:
def write_to_pg(spark:SparkSession, df: DataFrame, config: dict, table: str, mode: str='append' ) -> Optional[str]:
    """Write a DataFrame to a PostgreSQL table through Spark's JDBC data source.

    Args:
        spark: not used by this function; kept so existing callers keep working.
        df: DataFrame to write; every column of ``df`` is written.
        config: JDBC options forwarded verbatim to the writer
            (``options(**config)``).
        table: destination table name (sent as the ``dbtable`` option).
        mode: save mode handed unchanged to ``DataFrameWriter.mode``
            (default 'append').
            NOTE(review): Spark's documented save modes are 'append',
            'overwrite', 'ignore' and 'error'/'errorifexists'; for JDBC,
            'overwrite' is expected to drop and recreate the table unless the
            ``truncate`` option is set in ``config`` - confirm before relying on it.
    Returns:
        "Data written into postgresql successfully" on success,
        "No columns to write into" when ``df`` has no columns,
        None when the write raised (the error is printed, not re-raised).
    """

    try:
        if len(df.columns) == 0:
            return "No columns to write into"

        # Write df as-is: re-selecting its own column list adds nothing and
        # fails on column names that need quoting (for example names with dots).
        df.write.format('jdbc').options(**config).option('dbtable', table).mode(mode).save()
        return "Data written into postgresql successfully"

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [4]:
def read_csv_file(spark:SparkSession, filename:str, delimiter:str = ',', source_dir:str = 'sourcedata/') -> Optional[DataFrame]:
    """Read a delimited text file into a DataFrame.

    The file is read with a header row and with schema inference enabled.

    Args:
        spark: SparkSession whose ``read`` interface is used.
        filename: file name, relative to ``source_dir``.
        delimiter: field separator passed as the ``delimiter`` option
            (default ',', matching ``write_to_csv``).
        source_dir: directory prepended to ``filename``; defaults to the
            'sourcedata/' folder this notebook has always used. A missing
            trailing '/' is added; pass '' to use ``filename`` as the full path.
    Returns:
        The loaded DataFrame, or None when the read raised (the error is
        printed, not re-raised).
    """
    try:
        print("reading from file")
        prefix = source_dir
        if prefix and not prefix.endswith('/'):
            prefix = prefix + '/'
        source_path = prefix + filename

        source_df = (
            spark.read
            .option("delimiter", delimiter)
            .option('inferschema', True)
            .option('header', True)
            .csv(source_path)
        )
        print("file read successfully, returning dataframe")
        return source_df

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [5]:
def write_to_csv(df:DataFrame, filename:str, delimiter:str = ',', mode: str='append', target_dir:str = 'target/' )->None:
    """Write a DataFrame as delimited text with a header row.

    The frame is coalesced to a single partition before writing.
    NOTE(review): Spark's CSV writer is expected to create a directory at the
    target path holding the part file(s), not a single flat file - confirm.

    Args:
        df: DataFrame to write.
        filename: output name, relative to ``target_dir``.
        delimiter: field separator passed as the ``delimiter`` option
            (default ',').
        mode: save mode passed to the CSV writer (default 'append').
        target_dir: directory prepended to ``filename``; defaults to the
            'target/' folder this notebook has always used. A missing
            trailing '/' is added; pass '' to use ``filename`` as the full path.
    Returns:
        None in every case; failures are printed, not re-raised.
    """
    try:
        print("writing the dataframe")
        prefix = target_dir
        if prefix and not prefix.endswith('/'):
            prefix = prefix + '/'
        target_path = prefix + filename

        df.coalesce(1).write.option('delimiter', delimiter).option("header", True).csv(target_path, mode=mode)
        print("dataframe written at " + target_path)

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [6]:
def read_from_oracle(spark:SparkSession, config: dict, sql: str, table: str) -> Optional[DataFrame]:
    """Read a DataFrame from Oracle through Spark's JDBC data source.

    Args:
        spark: SparkSession whose ``read`` interface builds the JDBC reader.
        config: JDBC options forwarded verbatim to the reader
            (``options(**config)``).
        sql: what to read. When non-empty it takes precedence over ``table``
            and may be one of:
              - 'select a, b from t1 join t2 ...'  -> sent as the ``query`` option
              - 'table_name' / 'schema_name.table_name' -> sent as ``dbtable``
              - '(select a, b, c from t1 join t2 ...) foo' -> sent as ``dbtable``
            NOTE(review): the '(...) as foo' spelling is also routed to
            ``dbtable``, but Oracle is understood to reject ``AS`` before a
            table alias - confirm against the target database.
        table: table to read (sent as ``dbtable``) when ``sql`` is empty.
    Returns:
        The loaded DataFrame, or None when neither ``sql`` nor ``table`` was
        given or when the read raised (the error is printed, not re-raised).
    """
    import re  # function-scope import: only this function needs it

    try:
        print("reading from oracle")
        statement = (sql or "").strip()
        table_name = (table or "").strip()

        # Fail early with a clear message instead of sending an empty dbtable.
        if not statement and not table_name:
            print("neither sql nor table was provided, nothing to read")
            return None

        reader = spark.read.format("jdbc").options(**config)

        if statement:
            print("executing query to create df")
            # A bare identifier or "(subquery) alias" is a FROM-clause item, not a
            # statement, so it has to travel as ``dbtable``; anything else is
            # treated as a full SELECT and travels as ``query``.
            is_from_item = (
                len(statement.split()) == 1
                or re.fullmatch(r"\(.*\)\s*(?:as\s+)?\w+", statement, re.IGNORECASE | re.DOTALL) is not None
            )
            option_name = "dbtable" if is_from_item else "query"
            source_df = reader.option(option_name, statement).load()

        else:
            print("reading directly from table")
            source_df = reader.option('dbtable', table_name).load()

        return source_df

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None


In [7]:
def write_to_oracle(spark:SparkSession, df: DataFrame, config: dict, table: str, mode: str='append' ) -> Optional[str]:
    """Write a DataFrame to an Oracle table through Spark's JDBC data source.

    Args:
        spark: not used by this function; kept so existing callers keep working.
        df: DataFrame to write; every column of ``df`` is written.
        config: JDBC options forwarded verbatim to the writer
            (``options(**config)``).
        table: destination table name (sent as the ``dbtable`` option).
        mode: save mode handed unchanged to ``DataFrameWriter.mode``
            (default 'append').
            NOTE(review): Spark's documented save modes are 'append',
            'overwrite', 'ignore' and 'error'/'errorifexists'; for JDBC,
            'overwrite' is expected to drop and recreate the table unless the
            ``truncate`` option is set in ``config`` - confirm before relying on it.
    Returns:
        "Data written into oracle successfully" on success,
        "No columns to write into" when ``df`` has no columns,
        None when the write raised (the error is printed, not re-raised).
    """

    try:
        if len(df.columns) == 0:
            return "No columns to write into"

        # Write df as-is: re-selecting its own column list adds nothing and
        # fails on column names that need quoting (for example names with dots).
        df.write.format('jdbc').options(**config).option('dbtable', table).mode(mode).save()
        return "Data written into oracle successfully"

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [8]:
def read_from_msssql(spark:SparkSession, config: dict, sql: str, table: str) -> Optional[DataFrame]:
    """Read a DataFrame from Microsoft SQL Server through Spark's JDBC data source.

    Args:
        spark: SparkSession whose ``read`` interface builds the JDBC reader.
        config: JDBC options forwarded verbatim to the reader
            (``options(**config)``).
        sql: what to read. When non-empty it takes precedence over ``table``
            and may be one of:
              - 'select a, b from t1 join t2 ...'  -> sent as the ``query`` option
              - 'table_name' / 'schema_name.table_name' -> sent as ``dbtable``
              - '(select a, b, c from t1 join t2 ...) as foo' -> sent as ``dbtable``
        table: table to read (sent as ``dbtable``) when ``sql`` is empty.
    Returns:
        The loaded DataFrame, or None when neither ``sql`` nor ``table`` was
        given or when the read raised (the error is printed, not re-raised).
    """
    import re  # function-scope import: only this function needs it

    try:
        print("reading from msssql")
        statement = (sql or "").strip()
        table_name = (table or "").strip()

        # Fail early with a clear message instead of sending an empty dbtable.
        if not statement and not table_name:
            print("neither sql nor table was provided, nothing to read")
            return None

        reader = spark.read.format("jdbc").options(**config)

        if statement:
            print("executing query to create df")
            # A bare identifier or "(subquery) alias" is a FROM-clause item, not a
            # statement, so it has to travel as ``dbtable``; anything else is
            # treated as a full SELECT and travels as ``query``.
            is_from_item = (
                len(statement.split()) == 1
                or re.fullmatch(r"\(.*\)\s*(?:as\s+)?\w+", statement, re.IGNORECASE | re.DOTALL) is not None
            )
            option_name = "dbtable" if is_from_item else "query"
            source_df = reader.option(option_name, statement).load()

        else:
            print("reading directly from table")
            source_df = reader.option('dbtable', table_name).load()

        return source_df

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None


In [9]:
def write_to_mssql(spark:SparkSession, df: DataFrame, config: dict, table: str, mode: str='append' ) -> Optional[str]:
    """Write a DataFrame to a Microsoft SQL Server table through Spark's JDBC data source.

    Args:
        spark: not used by this function; kept so existing callers keep working.
        df: DataFrame to write; every column of ``df`` is written.
        config: JDBC options forwarded verbatim to the writer
            (``options(**config)``).
        table: destination table name (sent as the ``dbtable`` option).
        mode: save mode handed unchanged to ``DataFrameWriter.mode``
            (default 'append').
            NOTE(review): Spark's documented save modes are 'append',
            'overwrite', 'ignore' and 'error'/'errorifexists'; for JDBC,
            'overwrite' is expected to drop and recreate the table unless the
            ``truncate`` option is set in ``config`` - confirm before relying on it.
    Returns:
        "Data written into msssql successfully" on success,
        "No columns to write into" when ``df`` has no columns,
        None when the write raised (the error is printed, not re-raised).
    """

    try:
        if len(df.columns) == 0:
            return "No columns to write into"

        # Write df as-is: re-selecting its own column list adds nothing and
        # fails on column names that need quoting (for example names with dots).
        df.write.format('jdbc').options(**config).option('dbtable', table).mode(mode).save()
        return "Data written into msssql successfully"

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [10]:
def convert_to_target_dtypes(source_df:DataFrame, target_df:DataFrame, mappings:dict)-> Optional[DataFrame]:
    """Rename source columns to target names and cast them to the target types.

    Columns are renamed one mapping entry at a time. Afterwards every column
    whose name also exists in ``target_df`` is cast to that column's type,
    except when the target type is 'date' or 'timestamp': those columns are
    left untouched here (this notebook converts them separately in
    ``date_column_format_converter``).

    Args:
        source_df: DataFrame read from the source.
        target_df: DataFrame whose ``dtypes`` describe the wanted column types.
        mappings: dict of source column name -> target column name.
    Returns:
        The renamed and cast DataFrame, or None when anything raised (the
        error is printed, not re-raised).
    """
    try:
        # mapping source columns to target
        final_df = source_df
        for source_name, target_name in mappings.items():
            final_df = final_df.withColumnRenamed(source_name, target_name)

        # One name -> type lookup instead of rescanning the target schema for
        # every source column.
        target_type_by_column = dict(target_df.dtypes)

        # type conversion of source columns to target column types
        for column_name, _current_type in final_df.dtypes:
            target_type = target_type_by_column.get(column_name)
            if target_type is None or target_type in ('date', 'timestamp'):
                continue
            print(column_name)
            final_df = final_df.withColumn(column_name, final_df[column_name].cast(target_type))

        return final_df
    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [11]:
def mapping_generation(spark:SparkSession, mapping_filename:str) -> Optional[dict]:
    """Read a mapping CSV and split its rows into the lookups the pipeline needs.

    The file is read from 'mapping/<mapping_filename>' with a header row and
    schema inference. Each row is expected to expose the keys 'source',
    'target', 'source_format', 'target_format' and 'default_value'.

    Args:
        spark: SparkSession used to read the mapping file.
        mapping_filename: file name inside the 'mapping/' folder.
    Returns:
        A dict with four entries, or None when anything raised (the error is
        printed, not re-raised):
          - 'source_target_column_mapping': {source: target} for rows that
            have both a source and a target
          - 'columns_for_date_conversion': rows with both a source_format and
            a target_format
          - 'static_target_columns': rows with a target but no source
          - 'default_value_for_null_columns': rows with source, target and a
            default_value
    """
    try:
        mapping_file_path = 'mapping/'+mapping_filename
        mapping_df = spark.read.option('header',True).option('inferSchema',True).csv(mapping_file_path)
        mapping_rows = [row.asDict() for row in mapping_df.collect()]

        source_target_col_mapping = {}
        columns_for_date_conversion = []
        static_target_columns = []
        default_value_for_null_columns = []

        for element in mapping_rows:
            has_source = element['source'] is not None
            has_target = element['target'] is not None

            # A source without a target has nothing to be renamed to; keeping
            # it would hand a None column name to the rename step downstream.
            if has_source and has_target:
                source_target_col_mapping[element['source']] = element['target']

            if (element['source_format']) and (element['target_format']):
                columns_for_date_conversion.append(element)

            if not has_source and has_target:
                static_target_columns.append(element)

            if has_source and has_target and element['default_value'] is not None:
                default_value_for_null_columns.append(element)

        return {
            'source_target_column_mapping': source_target_col_mapping,
            'columns_for_date_conversion': columns_for_date_conversion,
            'static_target_columns': static_target_columns,
            'default_value_for_null_columns': default_value_for_null_columns,
        }
    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [12]:
def create_fill_na_dict(default_value_for_null_columns_list:list)->dict:
    """
    Build the column -> default value lookup used to replace NULLs.
    Args:
        default_value_for_null_columns_list: list of dicts, each carrying a
        'target' column name and the 'default_value' to use for its NULLs
    returns:
        dict keyed by target column name; when a name repeats the last
        entry wins
    """
    return {
        entry['target']: entry['default_value']
        for entry in default_value_for_null_columns_list
    }

In [13]:
def populate_null_values(df: DataFrame, fill_na_dict: dict)-> DataFrame:
    """
    Replace NULLs with the per-column defaults taken from the mapping file.
    Args:
        df: Spark dataframe that may contain NULL values
        fill_na_dict: column name -> replacement value; when empty (or None)
        the dataframe is handed back untouched
    returns:
        the result of df.fillna(fill_na_dict), or df itself when there is
        nothing to fill

    """
    # Guard clause: nothing to substitute, so skip the fillna call entirely.
    if not fill_na_dict:
        return df

    return df.fillna(fill_na_dict)

In [14]:
def populate_column_with_default_values(df: DataFrame, static_target_columns_list:list)-> DataFrame:
    """
    Add the target-only columns to the dataframe as constant-valued columns.
    Args:
        df : spark dataframe that receives the constant columns
        static_target_columns_list : list of dicts, each with the 'target'
        column name to add and the 'default_value' to put in every row

    returns: dataframe with one literal column added per entry, or None when
    anything raised (the error is printed, not re-raised)
    """
    try:
        enriched_df = df
        for column_spec in static_target_columns_list:
            column_name = column_spec['target']
            constant = lit(column_spec['default_value'])
            enriched_df = enriched_df.withColumn(column_name, constant)
    except Exception as error:
        print("Failure occured check logs")
        print(f"{error}")
        return None

    return enriched_df

In [15]:
def date_column_format_converter(df:DataFrame,columns_for_date_conversion:list)-> Optional[DataFrame]:
    """
    Convert the listed date columns to the requested type / format.

    Every entry is applied on top of the previous ones, so all listed columns
    end up converted. An empty list returns ``df`` unchanged.

    Args:
        df: Spark dataframe holding the columns under their target names
        columns_for_date_conversion: list of dicts, each with the keys
            'target' (column name), 'source_format' (pattern used to parse the
            current value), 'target_format' (pattern used only when the target
            is a string) and 'target_data_type' ('string', 'date' or
            'timestamp', compared case-insensitively)
    returns:
        dataframe with the listed columns converted, or None when anything
        raised (the error is printed, not re-raised). A column whose
        target_data_type is not recognised is left as it is.
    """
    try:
        # Start from the input so an empty list is a no-op and every
        # conversion below accumulates on the same frame.
        final_df = df

        for column_details in columns_for_date_conversion:
            source_format = column_details['source_format']
            target_format = column_details['target_format']
            target_col = column_details['target']
            target_dtype = column_details['target_data_type']
            print('target_col =', target_col,',source_format =',source_format,',target_format=', target_format,',target_dtype=',target_dtype)

            wanted_type = str(target_dtype).strip().lower()
            parsed = to_timestamp(col(target_col), source_format)

            if wanted_type == 'string':
                print("target type is STRING")
                converted = date_format(parsed, target_format)
            elif wanted_type == 'date':
                converted = to_date(col(target_col), source_format)
            elif wanted_type == 'timestamp':
                converted = parsed
            else:
                print("unsupported target_data_type, column left unchanged:", target_col)
                continue

            final_df = final_df.withColumn(target_col, converted)

        return final_df
    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print("Failure occured check logs")
        print(f"{e}")
        return None

In [16]:
# def driver_code(spark:SparkSession, source_info:dict, target_info:dict, mapping_filename:str):
#     """
#     This is the driver code function
#     the flow of execution based on type of source and target gets executed inside this function
#     Args:
#         Function takes two dicts as arguments, which hold the source and target information
#         a sample of these inputs is :
#             source_config = {
#                         "source_name" : "",
#                         "source_config" : {
#                                         "url" : "",
#                                         "driver" : "",
#                                         "user" : "",
#                                         "password" : ""},
#                         "source_query" : "",
#                         "source_table" : "",
#                         "source_filename" : "",
#                         "source_delimiter" : ""
#                     }

#                     target_config = {

#                         "target_name" : "",
#                         "target_config" : {
#                                         "url" : "",
#                                         "driver" : "",
#                                         "user" : "",
#                                         "password" : ""
#                                         },
#                         "target_table" : "",
#                         "target_filename" : "",
#                         "target_delimiter" : "",
#                         "target_write_mode" : "" 
#                     }
#     """
#     source_name = source_info['source_name']
#     source_config = source_info['source_config']
#     source_query = source_info['source_query']
#     source_table = source_info['source_table']
#     source_filename = source_info['source_filename']
#     source_delimiter = source_info['source_delimiter']
    
#     target_name = target_info['target_name']
#     target_config = target_info['target_config'] 
#     target_table = target_info['target_table']
#     target_filename = target_info['target_filename'] 
#     target_delimiter = target_info['target_delimiter']
#     target_write_mode = target_info['target_write_mode']
    
#     try:
#         # reading from source into source_df
#         if source_name == 'oracle':
#             source_df = dbu.read_from_oracle(spark , source_config, source_query, source_table)

#         elif source_name == 'sqlserver':
#             source_df = dbu.read_from_msssql(spark , source_config, source_query, source_table)

#         elif source_name == 'postgres':
#             source_df = dbu.read_from_pg(spark , source_config, source_query, source_table)

#         elif source_name == 'manualfile':
#             source_df = dbu.read_csv_file(spark , source_filename, source_delimiter)  
            
            
#         print('Data read from source')
        
#         if target_name != 'manualfile':
            
#             #reading from target for target_df creation
#             if target_name == 'oracle':
#                 target_df = dbu.read_from_oracle(spark , target_config, '', target_table)

#             elif target_name == 'sqlserver':
#                 target_df = dbu.read_from_msssql(spark , target_config, '', target_table)

#             elif target_name == 'postgres':
#                 target_df = dbu.read_from_pg(spark , target_config, '', target_table)


#             # generating mappings
#             mappings = dbu.mapping_generation(spark, mapping_filename)
#             columns_for_date_conversion = mappings['columns_for_date_conversion']
#             source_to_target_mapping = mappings['source_target_column_mapping']
#             static_target_columns = mappings['static_target_columns']
#             default_value_for_null_columns = mappings['default_value_for_null_columns']

#             # converting primary data types 
#             print("****converting preliminary data types****")
#             type_converted_df = dbu.convert_to_target_dtypes(source_df, target_df, source_to_target_mapping)

#             #converting date column types and formats
#             print("****converting date column types and formats****")
#             date_converted_df = dbu.date_column_format_converter(type_converted_df ,columns_for_date_conversion)

#             #adding values for hard coded columns
#             print("****adding values for hard coded columns****")
#             hard_coded_value_populated_df = dbu.populate_column_with_default_values(date_converted_df, static_target_columns)

#             #adding default values for nulls
#             print("****adding default values for nulls****")
#             fill_na_dict = dbu.create_fill_na_dict(default_value_for_null_columns)
#             print("****fill_na_dict:", fill_na_dict)
#             null_populated_df = dbu.populate_null_values(hard_coded_value_populated_df, fill_na_dict)

#             #write to target
#             print("****Writing to target") 
#             columns_list = list(set(target_df.columns).intersection(null_populated_df.columns)) 
#             final_df = null_populated_df.select(*columns_list)
#             if target_name == 'oracle':
#                 dbu.write_to_oracle(spark, final_df, target_config, target_table, target_write_mode) 

#             elif target_name == 'sqlserver':
#                 dbu.write_to_mssql(spark, final_df, target_config, target_table, target_write_mode) 

#             elif target_name == 'postgres':
#                 dbu.write_to_pg(spark, final_df, target_config, target_table, target_write_mode) 

#         else:
#             ## Need to add function to convert output for delimited files
#             dbu.write_to_csv(source_df, target_filename, target_delimiter)
        
#         # print('Data written to target successfully')
        
#     except Exception as e:
#         print("Failure occured check logs")
#         print(f"{e}")
#         return None