# Oracle vs Athena data comparison using csv extract

The code provided performs a comparison between two CSV files, `file1` and `file2`, located in the specified `file_path`. It uses the pandas library to read and manipulate the data, the BeautifulSoup library for HTML parsing, and the webbrowser and os libraries for file handling and opening HTML reports.

Functions:
1. `open_html_file(file_name)`: Opens an HTML file in the default web browser. Takes the `file_name` as input.
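2. `compare_files(file_path, df1, df2)`: Compares the data between two pandas DataFrames that the caller has already loaded (for example, from the two CSV files). Takes `file_path`, `df1`, and `df2` as inputs.

    Inside the `compare_files` function:
    - The function receives the two datasets as pandas DataFrames (`df1` and `df2`, referred to below as `df_athena` and `df_oracle`); it does not read the CSV files itself.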
    - It checks if there is a record count difference between the two tables. If there is a difference, it saves the rows that are present in `df_oracle` but not in `df_athena` to an HTML report named "comparison_report.html". The `count_check` variable is set to False to indicate a record count difference.
    - If the record count is the same, it compares the data in `df_athena` and `df_oracle` using the `compare` method from pandas. If there are no differences found, it writes a success message to the HTML report and opens it in the default web browser.
    - If differences are found, it saves the differences to the "comparison_report.html" file and modifies the HTML by adding a title tag if it doesn't exist or updating the existing one.
    - The modified HTML report is then written back to the file.

Note: The code includes exception handling to catch any errors that might occur during file operations or data processing. If an exception occurs, an appropriate error message is printed, and the function returns False.

Please make sure you have the required libraries installed and the files are available in the specified path before using this code.

# Functions to generate different report types

In [45]:
# Import the necessary libraries
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import webbrowser
import os
import cx_Oracle
import boto3

# Tell cx_Oracle where the Oracle Instant Client libraries are installed.
# NOTE(review): this path is specific to one machine; adjust it for your environment.
cx_Oracle.init_oracle_client(lib_dir=r"C:\Users\manis\Downloads\instantclient-basic-windows.x64-21.10.0.0.0dbru\instantclient_21_10")

# Function to generate HTML report
def generate_html_report(comparison_results_df,file_path):
    """Write the comparison result to ``comparison_report.html`` and open it.

    An empty result produces a short "no difference" page; otherwise the
    differences are rendered as an HTML table with a ``<title>`` added.

    Args:
        comparison_results_df (pd.DataFrame): Rows/cells that differ between
            the two datasets. An empty frame means no difference was found.
        file_path (str): Directory in which the report file is created.

    Returns:
        Whatever ``open_html_file`` returns, or None when no report could be
        produced because the comparison itself failed.
    """
    # compare_files() returns False when the comparison raised, so there is
    # nothing to render in that case.
    if not isinstance(comparison_results_df, pd.DataFrame):
        print("No comparison result is available; the report was not generated.")
        return None

    report_path = f"{file_path}/comparison_report.html"

    if comparison_results_df.size == 0:
        with open(report_path, "w", encoding="utf-8") as f:
            f.write("<html><body><h1>Congratulations...No Difference Found between provided datasets. &#128522;</h1></body></html>")
        print("There is no difference in provided datasets.")
    else:
        # Render in memory and parse that string, so the file is written once
        # and never re-read with a platform-dependent default encoding.
        soup = BeautifulSoup(comparison_results_df.to_html(), "html.parser")

        # to_html() emits a bare <table>, so a <head> normally has to be created.
        head_tag = soup.find("head")
        if head_tag is None:
            head_tag = soup.new_tag("head")
            soup.insert(0, head_tag)

        title_tag = soup.new_tag("title")
        title_tag.string = "Comparison Report"
        head_tag.append(title_tag)

        with open(report_path, "w", encoding="utf-8") as f:
            f.write(str(soup))
        print(f"Report has been generated at {file_path}")

    # Open the report exactly once, whichever branch produced it.
    return open_html_file(report_path)

#Function to generate CSV report
def generate_csv_report(comparison_results_df, file_path):
    """
    Save the comparison result as a CSV file (the index is not written).

    Args:
        comparison_results_df (pd.DataFrame): The DataFrame to be written to CSV.
        file_path (str): Destination path, including the file name.
    """
    csv_options = {"index": False}
    comparison_results_df.to_csv(file_path, **csv_options)
    print(f"Report has been generated at {file_path}")
    
#Function to generate EXCEL report
def generate_excel_report(comparison_results_df,filename ):
    """Save the comparison result as an Excel workbook.

    Args:
        comparison_results_df (pd.DataFrame): The DataFrame to be written.
        filename (str): Destination path, including the ``.xlsx`` file name.
    """
    # The context manager saves and closes the workbook on exit;
    # ExcelWriter.save() no longer exists in pandas 2.x.
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        comparison_results_df.to_excel(writer, index=True)
    print(f"Report has been generated at {filename}")
    
#Function to open html report in browser
def open_html_file(file_name):
    """Show ``file_name`` using the system's default web browser."""
    webbrowser.open(url=file_name)

In [36]:
#Function to compare the files having record count mismatch
def compare_files_for_count_diff(file_path, df1, df2):
    """Return the rows that explain a record-count mismatch between two frames.

    Args:
        file_path (str): Not used by this function; kept for interface
            compatibility with existing callers.
        df1 (pd.DataFrame): First dataset.
        df2 (pd.DataFrame): Second dataset.

    Returns:
        pd.DataFrame | bool: An empty DataFrame when both frames have the same
        number of rows; otherwise the rows that are not matched one-for-one
        between the two frames (rows from either side). ``False`` if an
        error occurred while comparing.
    """
    try:
        if len(df1) == len(df2):
            print("Record counts are matching between the two files.")
            return pd.DataFrame()

        print("There is a record count difference between the two files.")

        occurrence = "__row_occurrence__"

        def _tag_occurrences(frame):
            # Number each repeat of an identical row (0, 1, 2, ...) so that a
            # row appearing twice in one frame and once in the other still
            # leaves one unmatched copy instead of vanishing entirely.
            tagged = frame.copy()
            if len(frame.columns) == 0:
                tagged[occurrence] = list(range(len(frame)))
            else:
                tagged[occurrence] = (
                    frame.groupby(list(frame.columns), sort=False, dropna=False)
                    .cumcount()
                    .to_numpy()
                )
            return tagged

        combined = pd.concat([_tag_occurrences(df2), _tag_occurrences(df1)])
        # keep=False removes every row that found a partner on the other side.
        return combined.drop_duplicates(keep=False).drop(columns=occurrence)

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return False

#Function to compare the data between the 2 files.
def compare_files(file_path, df1, df2):
    """Compare two datasets and return their differences.

    The record counts are checked first; if they differ, the unmatched rows
    are returned. Otherwise the frames are compared cell by cell.

    Args:
        file_path (str): Passed through to ``compare_files_for_count_diff``.
        df1 (pd.DataFrame): First dataset.
        df2 (pd.DataFrame): Second dataset.

    Returns:
        pd.DataFrame | bool: The unmatched rows when the record counts differ,
        otherwise the result of ``df1.compare(df2, align_axis=1)`` (empty when
        the data is identical). ``False`` if the comparison failed.
    """
    try:
        df = compare_files_for_count_diff(file_path, df1, df2)

        # The helper reports its own error and signals failure with False.
        if df is False:
            return False

        if len(df) != 0:
            print("There is a record count difference between the two tables.")
            return df

        # Same record count: report cell-level differences side by side.
        return df1.compare(df2, align_axis=1)
    except Exception as e:
        # The frames are already loaded here, so a failure is a comparison
        # problem (e.g. differently labelled frames), not a missing file.
        print(f"An error occurred while comparing the data: {e}")
        return False

oracle athena comparison<br>
oracle and redshift<br>
athena and redshift<br>
s3 and oracle<br>
oracle and RDS

table name check<br>
field name check<br>
datatype check<br>
data check


# Athena Vs Oracle (Field Name and Datatype check)

In [37]:
import cx_Oracle
import boto3
#cx_Oracle.init_oracle_client(lib_dir=r"C:\Users\manis\Downloads\instantclient-basic-windows.x64-21.10.0.0.0dbru\instantclient_21_10")

In [38]:
def get_oracle_columns(user, password, host, port, service_name, table_name):
    """
    Connects to Oracle database and retrieves the columns of a specific table.

    Args:
        user (str): Oracle database username.
        password (str): Oracle database password.
        host (str): Oracle database host.
        port (int): Oracle database port.
        service_name (str): Oracle database service name.
        table_name (str): Name of the table to retrieve columns from.

    Returns:
        list: List of Oracle column descriptions (the cursor ``description``
        of a query on the table that selects no rows).

    """
    # Passing credentials as keywords keeps passwords containing '/' or '@'
    # from breaking a hand-built "user/password@dsn" string.
    oracle_connection = cx_Oracle.connect(
        user=user,
        password=password,
        dsn=f"{host}:{port}/{service_name}",
    )
    try:
        oracle_cursor = oracle_connection.cursor()
        try:
            # NOTE(review): table_name is interpolated into the SQL text because
            # identifiers cannot be bound; only pass trusted table names.
            # WHERE 1=0 fetches the column metadata without reading any rows.
            oracle_cursor.execute(f"SELECT * FROM {table_name} WHERE 1=0")
            return oracle_cursor.description
        finally:
            oracle_cursor.close()
    finally:
        # Release the connection even when the query fails.
        oracle_connection.close()

# Athena type for each Oracle DB type name whose mapping does not depend on scale.
_ATHENA_TYPE_GROUPS = (
    ("STRING", (
        "DB_TYPE_VARCHAR", "DB_TYPE_CHAR", "DB_TYPE_NCHAR", "DB_TYPE_VARCHAR2",
        "DB_TYPE_NVARCHAR", "DB_TYPE_NVARCHAR2", "DB_TYPE_LONG",
    )),
    ("DOUBLE", (
        "DB_TYPE_NUMBER", "DB_TYPE_FLOAT", "DB_TYPE_DOUBLE",
        "DB_TYPE_BINARY_FLOAT", "DB_TYPE_BINARY_DOUBLE",
    )),
    ("DATE", ("DB_TYPE_DATE",)),
    ("TIMESTAMP", ("DB_TYPE_TIMESTAMP",)),
    # Consider using "BINARY" for BLOB if necessary
    ("STRING", ("DB_TYPE_BLOB", "DB_TYPE_CLOB", "DB_TYPE_NCLOB")),
    ("BINARY", ("DB_TYPE_RAW", "DB_TYPE_LONG_RAW")),
    ("STRING", ("DB_TYPE_ROWID", "DB_TYPE_UROWID")),
    ("BOOLEAN", ("DB_TYPE_BOOLEAN",)),
    ("STRING", ("DB_TYPE_INTERVAL", "DB_TYPE_INTERVAL_DS", "DB_TYPE_INTERVAL_YM")),
    ("STRING", (
        "DB_TYPE_XML", "DB_TYPE_GEOMETRY", "DB_TYPE_TOPO_GEOMETRY", "DB_TYPE_GEORASTER",
    )),
)
_ORACLE_TO_ATHENA_TYPE = {
    oracle_name: athena_type
    for athena_type, oracle_names in _ATHENA_TYPE_GROUPS
    for oracle_name in oracle_names
}


def get_athena_datatype(cx_oracle_datatype):
    """
    Returns the corresponding Athena datatype for the given cx_Oracle datatype.
    Args:
        cx_oracle_datatype: One entry of a cx_Oracle cursor ``description``;
            index 1 is the DB type object and index 5 is the scale.
    Returns:
        The corresponding Athena datatype, in upper case.
    Raises:
        ValueError: If the Oracle type has no Athena mapping.
    """
    type_name = cx_oracle_datatype[1].name

    # A NUMBER with scale 0 holds whole numbers only; every other NUMBER
    # falls through to the DOUBLE entry in the table.
    if type_name == "DB_TYPE_NUMBER" and cx_oracle_datatype[5] == 0:
        return "INT"

    try:
        return _ORACLE_TO_ATHENA_TYPE[type_name]
    except KeyError:
        # The description entry is a tuple, so report the type's name rather
        # than reading a non-existent ``.name`` attribute of the tuple.
        raise ValueError("Unsupported cx_Oracle datatype: {}".format(type_name)) from None


def oracle_equivalent_athena_datatypes(username, password, host, port, service_name, table_name):
    """Describe an Oracle table's columns using Athena type names.

    Returns:
        list[dict]: One ``{'Name': ..., 'Type': ...}`` dict per Oracle column,
        with both the column name and the mapped Athena type lower-cased.
    """
    column_descriptions = get_oracle_columns(username, password, host, port, service_name, table_name)

    def _as_athena_column(description):
        # Map the type first, then lower-case both values for comparison.
        mapped_type = get_athena_datatype(description)
        return {'Name': description[0].lower(), 'Type': mapped_type.lower()}

    return [_as_athena_column(description) for description in column_descriptions]


In [39]:
def get_athena_columns(region, database, athena_table):
    """
    Look up the column definitions of an Athena table.

    Args:
        region (str): AWS region name.
        database (str): Athena database name.
        athena_table (str): Name of the Athena table to retrieve columns from.

    Returns:
        list: The ``Columns`` entries of the table metadata returned by Athena
        for the ``AwsDataCatalog`` catalog.

    """
    client = boto3.client('athena', region_name=region)

    # Fetch the table's metadata from the default data catalog.
    table_metadata = client.get_table_metadata(
        CatalogName='AwsDataCatalog',
        DatabaseName=database,
        TableName=athena_table,
    )['TableMetadata']

    return table_metadata['Columns']
    

In [40]:
def write_list_of_dicts_to_text(data, file_path):
    """
    Dump a list of dictionaries to a text file as ``key: value`` lines.

    Each dictionary is followed by a blank line. An empty list produces a
    single "no difference" message instead.

    Args:
        data (list[dict]): The list of dictionaries to be written.
        file_path (str): The file path along with the filename where the text file should be saved.
    """
    nothing_to_report = len(data) == 0

    with open(file_path, 'w') as out:
        if nothing_to_report:
            out.write("No difference found in Datatypes.")
            return

        for entry in data:
            out.writelines(f"{key}: {value}\n" for key, value in entry.items())
            out.write("\n")
                
def compare_oracle_vs_athena_data_types(ora_user, ora_pwd, host, port, service_name, ora_tab_name,region, athena_db, athena_table):
    """
    Compares the data types of columns between Athena and Oracle tables.

    Columns are matched by name (case-insensitively), so a different column
    order does not produce spurious differences, and a column that exists on
    only one side is reported with ``None`` as the other side's type.

    Args:
        ora_user (str): Oracle database username.
        ora_pwd (str): Oracle database password.
        host (str): Oracle database host.
        port (int): Oracle database port.
        service_name (str): Oracle database service name.
        ora_tab_name (str): Oracle table name.
        region (str): AWS region name.
        athena_db (str): Athena database name.
        athena_table (str): Athena table name.

    Returns:
        list: List of differences in column data types. Each entry is a dict
        with the keys ``column``, ``athena_data_type`` and ``oracle_data_type``.

    """
    athena_columns = get_athena_columns(region, athena_db, athena_table)
    oracle_columns = oracle_equivalent_athena_datatypes(ora_user, ora_pwd, host, port, service_name, ora_tab_name)

    oracle_types = {col['Name'].lower(): col['Type'] for col in oracle_columns}
    athena_names = set()

    differences = []
    for athena_col in athena_columns:
        athena_col_name = athena_col['Name']
        athena_data_type = athena_col['Type']
        athena_names.add(athena_col_name.lower())

        # None means the column does not exist on the Oracle side.
        oracle_data_type = oracle_types.get(athena_col_name.lower())
        if athena_data_type != oracle_data_type:
            differences.append({
                'column': athena_col_name,
                'athena_data_type': athena_data_type,
                'oracle_data_type': oracle_data_type
            })

    # Columns that exist in Oracle but not in Athena.
    for oracle_col_name, oracle_data_type in oracle_types.items():
        if oracle_col_name not in athena_names:
            differences.append({
                'column': oracle_col_name,
                'athena_data_type': None,
                'oracle_data_type': oracle_data_type
            })

    if differences:
        print('Differences found:')
        for diff in differences:
            print(diff)
    else:
        print('No differences found.')
    return differences

# Data comparison between athena and oracle

In [41]:
import cx_Oracle
import pandas as pd

def read_oracle_table(username, password, host, port, service_name, table_name):
    """
    Reads data from an Oracle table and returns a pandas DataFrame.

    Args:
        username (str): The username to connect to the Oracle database.
        password (str): The password to connect to the Oracle database.
        host (str): The hostname or IP address of the Oracle database server.
        port (int): The port number to connect to the Oracle database.
        service_name (str): The service name of the Oracle database.
        table_name (str): The name of the table to fetch data from.

    Returns:
        pandas.DataFrame: A DataFrame containing the data from the Oracle table.
    """
    # Keyword credentials avoid breaking the connect string when the password
    # contains characters such as '/' or '@'.
    connection = cx_Oracle.connect(
        user=username,
        password=password,
        dsn=f'{host}:{port}/{service_name}',
    )
    try:
        cursor = connection.cursor()
        try:
            # NOTE(review): table_name is interpolated into the SQL text because
            # identifiers cannot be bound; only pass trusted table names.
            cursor.execute(f'SELECT * FROM {table_name}')

            rows = cursor.fetchall()

            # Column names come from the cursor description of the query.
            column_names = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        connection.close()

    return pd.DataFrame(rows, columns=column_names)

In [42]:
def cast_value(value, data_type):
    """
    Casts the value to the specified data type.

    Args:
        value: The value to be casted (Athena returns every cell as text).
        data_type: The target Athena data type name, e.g. ``'integer'``.

    Returns:
        The casted value. ``None`` stays ``None``; values of types without a
        conversion (dates, timestamps, strings, ...) are returned unchanged.
    """
    if value is None:
        return None

    if data_type in ('tinyint', 'smallint', 'integer', 'bigint'):
        return int(value)
    if data_type in ('float', 'real', 'double'):
        return float(value)
    if data_type == 'boolean':
        # bool("false") is True, so text has to be compared explicitly.
        if isinstance(value, str):
            return value.strip().lower() == 'true'
        return bool(value)

    return value

def create_dataframe_from_athena_table(region, database, table, output_location):
    """
    Creates a pandas DataFrame by fetching data from an Athena table.

    Args:
        region (str): The AWS region name.
        database (str): The name of the Athena database.
        table (str): The name of the Athena table to fetch data from.
        output_location (str): The S3 bucket location to store the query results.

    Returns:
        pandas.DataFrame: A DataFrame containing the data from the Athena table,
        with upper-cased column names.

    Raises:
        ValueError: If the Athena query does not finish in the SUCCEEDED state.
    """
    # Imported locally because the polling loop needs it and the module does
    # not import ``time`` at file level.
    import time

    athena_client = boto3.client('athena', region_name=region)

    # NOTE(review): the table name is interpolated into the query text; only
    # pass trusted table names.
    query = f'SELECT * FROM {table}'
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )
    query_execution_id = response['QueryExecutionId']

    # Poll until the query reaches a terminal state.
    while True:
        query_status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status_info = query_status['QueryExecution']['Status']
        status = status_info['State']
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(5)  # Wait for 5 seconds before checking the query status again

    if status != 'SUCCEEDED':
        reason = status_info.get('StateChangeReason', 'no reason reported')
        raise ValueError(f"Athena query execution failed or was cancelled. ({status}: {reason})")

    # A single get_query_results call returns only the first page of rows, so
    # walk every page to avoid silently truncating larger tables.
    paginator = athena_client.get_paginator('get_query_results')

    column_names = None
    data_types = []
    rows = []
    for page_number, page in enumerate(paginator.paginate(QueryExecutionId=query_execution_id)):
        result_set = page['ResultSet']

        if column_names is None:
            column_metadata = result_set['ResultSetMetadata']['ColumnInfo']
            column_names = [field['Name'].upper() for field in column_metadata]
            data_types = [field['Type'] for field in column_metadata]

        page_rows = result_set['Rows']
        if page_number == 0:
            # The first row of the first page holds the column headers.
            page_rows = page_rows[1:]

        for row in page_rows:
            # A cell without 'VarCharValue' is a NULL and becomes None.
            rows.append([
                cast_value(data.get('VarCharValue'), data_types[i])
                for i, data in enumerate(row['Data'])
            ])

    return pd.DataFrame(rows, columns=column_names)


# Main Function

In [43]:
def _prompt_report_type():
    """Keep asking until the user names a supported report format; return it upper-cased."""
    while True:
        report_type = input("Please enter the format in which you want the report CSV/EXCEL/HTML: ").upper()
        if report_type in ("CSV", "EXCEL", "HTML"):
            return report_type


def _write_comparison_report(df, file_path, report_type):
    """Hand the comparison result to the report generator matching ``report_type``."""
    # compare_files() returns False when the comparison itself failed.
    if not isinstance(df, pd.DataFrame):
        print("Comparison failed; no report was generated.")
        return

    if report_type == "HTML":
        generate_html_report(df,file_path)
    elif report_type == "CSV":
        generate_csv_report(df, f"{file_path}/comparison_report.csv")
    elif report_type == "EXCEL":
        generate_excel_report(df,f"{file_path}/comparison_report.xlsx" )


def _prompt_oracle_athena_details():
    """Ask for the Oracle and Athena connection details, in prompt order."""
    from getpass import getpass  # keeps the password from being echoed

    return (
        input("USERNAME: "),
        getpass("PASSWORD: "),
        input("HOST: "),
        int(input("PORT: ")),
        input("SID/Service Name: "),
        input("Table Name: "),
        input("AWS Region: "),
        input("Athena Database: "),
        input("Athena table name: "),
    )


def generate_comparison_report():
    """Interactive entry point: ask what to compare, run it, and write a report.

    Options:
        1. File to file comparison of two CSV files.
        2. Oracle vs Athena datatype comparison (written to a text report).
        3. Oracle vs Athena data comparison.

    All connection details are read from the prompts; nothing is hard-coded.
    """
    try:
        choice = int(input("What kind of comparison you want. Please choose from below options \n 1. File to File comparison \n 2. Oracle vs Athena datatype comparison. \n 3. Oracle vs Athena Data comparison\n"))
    except ValueError:
        choice = None
    if choice not in (1, 2, 3):
        print("Please enter a valid option: 1, 2 or 3.")
        return

    file_path = input("Please enter the location of files: ")

    if choice == 1:
        file1 = input("Please enter first file name: ")
        file2 = input("Please enter second file name: ")
        df1 = pd.read_csv(f"{file_path}/{file1}", encoding='latin-1')
        df2 = pd.read_csv(f"{file_path}/{file2}", encoding='latin-1')
        report_type = _prompt_report_type()
        df = compare_files(file_path, df1, df2)
        _write_comparison_report(df, file_path, report_type)
    elif choice == 2:
        (ora_user, ora_pwd, host, port, service_name, ora_tab_name,
         region, athena_db, athena_table) = _prompt_oracle_athena_details()
        differences = compare_oracle_vs_athena_data_types(ora_user, ora_pwd, host, port, service_name, ora_tab_name,region, athena_db, athena_table)
        write_list_of_dicts_to_text(differences,f"{file_path}/Datatype_comparison_report.txt")
    else:
        (ora_user, ora_pwd, host, port, service_name, ora_tab_name,
         region, athena_db, athena_table) = _prompt_oracle_athena_details()
        output_location = input("S3 output location: ")
        df_athena = create_dataframe_from_athena_table(region, athena_db, athena_table, output_location)
        df_oracle = read_oracle_table(ora_user, ora_pwd, host, port, service_name, ora_tab_name)
        report_type = _prompt_report_type()
        df = compare_files(file_path, df_athena, df_oracle)
        _write_comparison_report(df, file_path, report_type)

In [46]:
# Start the interactive comparison workflow; it prompts for its inputs.
generate_comparison_report()


What kind of comparison you want. Please choose from below options 
 1. File to File comparison 
 2. Oracle vs Athena datatype comparison. 
 3. Oracle vs Athena Data comparison
2
Please enter the location of files: C:/Users/manis/Downloads/Comparison
USERNAME: hr
PASSWORD: oracle
HOST: 192.168.56.102
PORT: 1521
SID/Service Name: freepdb1
Table Name: job_hist
AWS Region: us-east-1
Athena Database: test_fw
Athena table name: job_history


  athena_columns = get_athena_columns(region, athena_db, athena_table)


Differences found:
{'column': 'department_id', 'athena_data_type': 'int', 'oracle_data_type': 'string'}


# Utility Functions

In [2]:
def convert_spark_to_pandas(spark_df):
    """
    Return the Pandas equivalent of a Spark DataFrame.

    Parameters:
        spark_df (pyspark.sql.DataFrame): The Spark DataFrame to be converted.

    Returns:
        pandas.DataFrame: Whatever ``spark_df.toPandas()`` produces.
    """
    return spark_df.toPandas()

In [3]:
import pandas as pd
import psycopg2

def create_pandas_dataframe_from_rds_table(host, port, database, username, password, table_name):
    """
    Create a Pandas DataFrame from a table in a remote RDS database.

    Parameters:
        host (str): The hostname or IP address of the RDS database.
        port (int): The port number for the RDS database.
        database (str): The name of the database in the RDS instance.
        username (str): The username for the RDS database.
        password (str): The password for the RDS database.
        table_name (str): The name of the table in the RDS database.

    Returns:
        pandas.DataFrame: A DataFrame containing the data from the specified table.

    Raises:
        psycopg2.OperationalError: If there is an error connecting to the RDS database.

    Example:
        df = create_pandas_dataframe_from_rds_table(
            host='example.com',
            port=5432,
            database='mydb',
            username='user',
            password='password',
            table_name='mytable'
        )
    """
    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password
    )
    try:
        cursor = conn.cursor()
        try:
            # NOTE(review): table_name is interpolated into the SQL text because
            # identifiers cannot be bound; only pass trusted table names.
            cursor.execute(f"SELECT * FROM {table_name}")

            rows = cursor.fetchall()

            # Column names come from the cursor description of the query.
            column_names = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return pd.DataFrame(rows, columns=column_names)


In [4]:
import pandas as pd
import psycopg2

def create_pandas_dataframe_from_redshift_table(host, port, database, username, password, table_name):
    """
    Create a Pandas DataFrame from a Redshift table.

    Parameters:
        host (str): The hostname or IP address of the Redshift cluster.
        port (int): The port number used to connect to the Redshift cluster.
        database (str): The name of the Redshift database.
        username (str): The username used to authenticate the connection.
        password (str): The password used to authenticate the connection.
        table_name (str): The name of the table from which to fetch the data.

    Returns:
        pandas.DataFrame: A DataFrame containing the fetched data from the table.

    Raises:
        psycopg2.OperationalError: If there is an error in establishing the connection to the Redshift cluster.

    Example:
        host = 'example-redshift-cluster.com'
        port = 5439
        database = 'my_database'
        username = 'my_username'
        password = 'my_password'
        table_name = 'my_table'
        df = create_pandas_dataframe_from_redshift_table(host, port, database, username, password, table_name)
    """
    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password
    )
    try:
        cursor = conn.cursor()
        try:
            # NOTE(review): table_name is interpolated into the SQL text because
            # identifiers cannot be bound; only pass trusted table names.
            cursor.execute(f"SELECT * FROM {table_name}")

            rows = cursor.fetchall()

            # Column names come from the cursor description of the query.
            column_names = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return pd.DataFrame(rows, columns=column_names)


In [5]:
import pandas as pd
import psycopg2

def create_pandas_dataframe_from_rds_table(host, port, database, username, password, table_name):
    """
    Creates a Pandas DataFrame from an RDS table.

    NOTE(review): a function with this same name is defined earlier in the
    file; this later definition is the one that stays bound.

    Args:
        host (str): The hostname or IP address of the RDS instance.
        port (int): The port number to connect to the RDS instance.
        database (str): The name of the RDS database.
        username (str): The username for authentication.
        password (str): The password for authentication.
        table_name (str): The name of the table in the RDS database.

    Returns:
        pandas.DataFrame: A DataFrame containing the data from the RDS table.
    """
    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password
    )
    try:
        cursor = conn.cursor()
        try:
            # NOTE(review): table_name is interpolated into the SQL text because
            # identifiers cannot be bound; only pass trusted table names.
            cursor.execute(f"SELECT * FROM {table_name}")

            rows = cursor.fetchall()

            # Column names come from the cursor description of the query.
            column_names = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return pd.DataFrame(rows, columns=column_names)


In [7]:
import pandas as pd
from pyspark.sql import SparkSession

def create_pandas_dataframe_from_s3_parquet_and_pyspark_table(parquet_file_path, spark_table_name):
    """
    Load a Parquet file through Spark and return its contents as a Pandas DataFrame.

    The Parquet data is registered as a temporary view named
    ``spark_table_name`` and then selected in full from that view.

    Args:
        parquet_file_path (str): The S3 path of the Parquet file.
        spark_table_name (str): The name of the PySpark table.

    Returns:
        pandas.DataFrame: The resulting Pandas DataFrame.

    """
    session = SparkSession.builder.getOrCreate()

    # Expose the Parquet data under the requested view name.
    session.read.parquet(parquet_file_path).createOrReplaceTempView(spark_table_name)

    # Pull every row of the view into Pandas.
    return session.sql(f"SELECT * FROM {spark_table_name}").toPandas()


In [8]:
import psycopg2
import cx_Oracle

def compare_datatypes_redshift_oracle(redshift_conn_params, oracle_conn_params, table_name):
    """
    Compares the data types between Redshift and Oracle databases for a given table.

    Columns are matched by name (case-insensitively) rather than by position,
    and a column found on only one side is reported with ``None`` for the
    other side's type.

    Args:
        redshift_conn_params (dict): A dictionary containing the connection parameters for Redshift.
            Example: {'host': 'your-redshift-hostname', 'port': 5439, 'database': 'your-database-name',
                      'username': 'your-username', 'password': 'your-password'}

        oracle_conn_params (dict): A dictionary containing the connection parameters for Oracle.
            Example: {'host': 'your-oracle-hostname', 'port': 1521, 'database': 'your-database-name',
                      'username': 'your-username', 'password': 'your-password'}

        table_name (str): The name of the table to compare.

    Returns:
        dict: A dictionary containing the mapping of column names and their corresponding data types.
            Example: {'column1': ('redshift_type', 'oracle_type'), 'column2': ('redshift_type', 'oracle_type'), ...}

    """
    redshift_conn = psycopg2.connect(
        host=redshift_conn_params['host'],
        port=redshift_conn_params['port'],
        database=redshift_conn_params['database'],
        user=redshift_conn_params['username'],
        password=redshift_conn_params['password']
    )
    try:
        redshift_cursor = redshift_conn.cursor()
        try:
            # The table name is bound as a parameter instead of being spliced
            # into the SQL text.
            redshift_cursor.execute(
                "SELECT column_name, data_type FROM information_schema.columns "
                "WHERE table_name = %s ORDER BY ordinal_position",
                (table_name,),
            )
            redshift_columns = redshift_cursor.fetchall()
        finally:
            redshift_cursor.close()
    finally:
        redshift_conn.close()

    oracle_conn = cx_Oracle.connect(
        user=oracle_conn_params['username'],
        password=oracle_conn_params['password'],
        dsn=f"{oracle_conn_params['host']}:{oracle_conn_params['port']}/{oracle_conn_params['database']}",
    )
    try:
        oracle_cursor = oracle_conn.cursor()
        try:
            # NOTE(review): all_tab_columns is not filtered by owner here, so a
            # table name present in several schemas returns all of them.
            oracle_cursor.execute(
                "SELECT column_name, data_type FROM all_tab_columns "
                "WHERE table_name = :tab ORDER BY column_id",
                {"tab": table_name.upper()},
            )
            oracle_columns = oracle_cursor.fetchall()
        finally:
            oracle_cursor.close()
    finally:
        oracle_conn.close()

    # Index the Oracle columns by lower-cased name for the name-based match.
    oracle_types = {name.lower(): datatype for name, datatype in oracle_columns}

    column_datatypes = {}
    for column_name, redshift_datatype in redshift_columns:
        oracle_datatype = oracle_types.pop(column_name.lower(), None)
        column_datatypes[column_name] = (redshift_datatype, oracle_datatype)

    # Whatever is left exists only in Oracle.
    for column_name, oracle_datatype in oracle_types.items():
        column_datatypes[column_name] = (None, oracle_datatype)

    return column_datatypes


# Difference using pyspark

In [None]:
from pyspark.sql import SparkSession

# Session used by the CSV reads further down in this cell.
spark = SparkSession.builder.getOrCreate()

def compare_dataframes(df1, df2):
    """Compare two Spark DataFrames and write their differences to CSV.

    Nothing is written when the schemas differ or when the data is identical;
    a message is printed instead. Otherwise the common records and the
    records found on only one side are saved to three CSV outputs.
    """
    # Frames with different schemas are not compared any further.
    if df1.schema != df2.schema:
        print("The schemas of the two DataFrames are different.")
        return

    shared = df1.intersect(df2)
    only_in_first = df1.subtract(shared)
    only_in_second = df2.subtract(shared)

    if not (only_in_first.count() != 0 or only_in_second.count() != 0):
        print("The data in the two DataFrames is identical.")
        return

    common_records_output_path = "common_records.csv"
    diff_in_df1_output_path = "diff_in_df1.csv"
    diff_in_df2_output_path = "diff_in_df2.csv"

    # Every output is written the same way: CSV with a header, overwriting.
    outputs = (
        (shared, common_records_output_path),
        (only_in_first, diff_in_df1_output_path),
        (only_in_second, diff_in_df2_output_path),
    )
    for frame, output_path in outputs:
        frame.write.format("csv").option("header", "true").mode("overwrite").save(output_path)

    print(f"Common records between the two DataFrames are saved to {common_records_output_path} in CSV format.")
    print(f"Differences found in df1 are saved to {diff_in_df1_output_path} in CSV format.")
    print(f"Differences found in df2 are saved to {diff_in_df2_output_path} in CSV format.")

# Read the two CSV extracts ("oracle.csv" and "athena.csv") into Spark
# DataFrames, with the "header" option enabled for both reads
df1 = spark.read.format("csv").option("header", "true").load("oracle.csv")
df2 = spark.read.format("csv").option("header", "true").load("athena.csv")

# Compare the DataFrames; any differences are written out as CSV by compare_dataframes
compare_dataframes(df1, df2)
