In [None]:
from pyhive import hive
import psycopg2
import pandas as pd
import time
import hashlib
import thrift_sasl
import math

class HiveConnection:
    """Small wrapper around ``pyhive.hive.Connection``.

    Stores the connection settings given to the constructor, opens the
    connection on demand and runs queries through short-lived cursors.
    Errors are reported with ``print`` and signalled through return values
    rather than raised.
    """

    def __init__(self, host, port, username, password, database, auth_mode):
        """Remember the connection settings; no network access happens here.

        Args:
            host: Server host name or address.
            port: Server port.
            username: Login name.
            password: Login password.
            database: Database selected on connect.
            auth_mode: Value passed as ``auth`` to ``hive.Connection``.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.database = database
        self.auth_mode = auth_mode
        self.connection = None  # set by connect()

    def connect(self):
        """Open the Hive connection.

        Returns:
            True when the connection was created, False when creating it
            raised (the error is printed).
        """
        try:
            self.connection = hive.Connection(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                database=self.database,
                auth=self.auth_mode
            )
            return True
        except Exception as e:
            print(f"Error creating connection: {e}")
            return False

    def close(self):
        """Close the connection if one was opened; close errors are printed."""
        if self.connection:
            try:
                self.connection.close()
            except Exception as e:
                print(f"Error closing connection: {e}")

    def execute_query(self, query):
        """Run ``query`` and fetch every row.

        Args:
            query: SQL text to execute.

        Returns:
            A ``(rows, column_names)`` tuple on success, or ``None`` when
            anything failed (the error is printed). Callers must check for
            ``None`` before unpacking.
        """
        # Bound up front so the ``finally`` clause is safe even when
        # ``self.connection`` is None or ``cursor()`` itself raises.
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            columns = [desc[0] for desc in cursor.description]
            results = cursor.fetchall()
            return results, columns
        except Exception as e:
            print(f"Error executing query: {e}")
            return None
        finally:
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as e:
                    # A failed close must not mask the query outcome.
                    print(f"Error closing cursor: {e}")


class PostgresConnection:
    """Owns one psycopg2 connection and a single cursor shared by all queries.

    Failures are printed and reported through return values instead of
    being raised.
    """

    def __init__(self, dbname, user, password, host, port):
        """Store the connection settings; nothing is opened until connect()."""
        self.dbname, self.user, self.password = dbname, user, password
        self.host, self.port = host, port
        self.connection = None
        self.cursor = None

    def connect(self):
        """Open the connection in autocommit mode and create the shared cursor.

        Returns:
            True on success, False if any step raised (the error is printed).
        """
        settings = {
            "dbname": self.dbname,
            "user": self.user,
            "password": self.password,
            "host": self.host,
            "port": self.port,
        }
        try:
            self.connection = psycopg2.connect(**settings)
            self.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            self.cursor = self.connection.cursor()
        except Exception as exc:
            print(f"Error creating PostgreSQL connection: {exc}")
            return False
        return True

    def execute_query(self, query, params=None):
        """Execute ``query`` with optional ``params`` on the shared cursor.

        Returns:
            The fetched rows when the statement text starts with SELECT
            (case-insensitive, leading whitespace ignored); True for any
            other statement, after committing; None if execution raised.
        """
        try:
            self.cursor.execute(query, params)
            returns_rows = query.strip().upper().startswith("SELECT")
            if not returns_rows:
                # Non-SELECT statement: commit and report success.
                self.connection.commit()
                return True
            return self.cursor.fetchall()
        except Exception as exc:
            print(f"Error executing PostgreSQL query: {exc}")
            return None

    def close(self):
        """Close the connection when one exists; close errors are printed."""
        if not self.connection:
            return
        try:
            self.connection.close()
        except Exception as exc:
            print(f"Error closing PostgreSQL connection: {exc}")


class DataFetcher:
    """Copies rows from the Hive table ``fact_anc`` into ``marts.dm_anc``.

    Rows are pulled in batches ordered by ``last_updated_date``; the largest
    value seen so far is kept in ``last_processed_value`` and used as the
    lower bound of the next batch. Each row is fingerprinted with SHA-256
    and written to PostgreSQL as an INSERT or an UPDATE depending on whether
    that fingerprint is already present.
    """

    # Both statements take the same parameter tuple: the 22 values built by
    # ``_row_params`` followed by the record hash (23 placeholders each).
    _INSERT_QUERY = """
                INSERT INTO marts.dm_anc (
                    anc_number,
                    registration_date,
                    first_time_booking,
                    age,
                    lnmp,
                    gravida,
                    parity_at_booking,
                    weight,
                    height,
                    pulse,
                    bp,
                    assess_for_polor,
                    presentation,
                    ga_at_visit,
                    hiv_status_prior_booking,
                    art_number,
                    edd,
                    sex,
                    dw_date_created,
                    dm_date_created,
                    event_date,
                    encounter_id,
                    facility_id_code,
                    record_hash
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW(),%s,%s,%s,%s)
                """

    _UPDATE_QUERY = """
                UPDATE marts.dm_anc
                SET anc_number = %s, registration_date = %s, first_time_booking = %s,
                    age = %s, lnmp = %s, gravida = %s, parity_at_booking = %s,
                    weight = %s, height = %s, pulse = %s, bp = %s, assess_for_polor = %s,
                    presentation = %s, ga_at_visit = %s, hiv_status_prior_booking = %s,
                    art_number = %s, edd = %s, sex = %s, dw_date_created = %s,
                    dm_date_created = NOW(), event_date = %s, encounter_id = %s,
                    facility_id_code = %s
                WHERE record_hash = %s
                """

    def __init__(self, hive_conn, pg_conn, batch_size=50000, polling_interval=600):
        """Store collaborators and settings.

        Args:
            hive_conn: Object whose ``execute_query(query)`` returns
                ``(rows, columns)`` or ``None`` on failure.
            pg_conn: Object whose ``execute_query(query, params)`` returns
                rows for SELECTs, True for writes, or ``None`` on failure.
            batch_size: Maximum number of rows requested per Hive query.
            polling_interval: Seconds to sleep between fetch cycles.
        """
        self.hive_conn = hive_conn
        self.pg_conn = pg_conn
        self.batch_size = batch_size
        self.polling_interval = polling_interval
        # Lower bound used until a snapshot timestamp is found in PostgreSQL.
        self.last_processed_value = '1900-01-01 00:00:00'

    def check_if_snapshot_done(self):
        """Return ``dw_date_created`` of the most recently written mart row.

        Returns:
            The first column of the first row returned, or ``None`` when the
            query yields nothing (empty table or query failure).
        """
        # NOTE(review): this orders by dm_date_created (write time) but
        # returns dw_date_created (source timestamp) - confirm that is the
        # intended resume point rather than MAX(dw_date_created).
        query = "SELECT dw_date_created FROM marts.dm_anc ORDER BY dm_date_created DESC LIMIT 1"
        result = self.pg_conn.execute_query(query)
        if result:
            return result[0][0]
        return None

    def hash_record(self, record):
        """
        Hash the combined values of a record using SHA-256.
        If a value is None or NaN, replace it with an empty string before hashing.

        The values are concatenated without a separator, so the digest
        format must stay as is to remain comparable with stored hashes.
        """
        combined = ''.join([str(value) if not pd.isna(value) else '' for value in record])
        return hashlib.sha256(combined.encode('utf-8')).hexdigest()

    def fetch_data_in_batches(self):
        """Pull and process batches until Hive returns no rows or a query fails.

        The starting point is the snapshot timestamp from PostgreSQL when one
        exists, otherwise the current ``last_processed_value``. After each
        batch the watermark advances to the batch's largest
        ``last_updated_date``.
        """
        snapshot_date = self.check_if_snapshot_done()
        if snapshot_date:
            self.last_processed_value = snapshot_date
            print(">>> Detected Snapshot done:", self.last_processed_value)
        else:
            print(">>> Initial snapshot")

        while True:
            print(f"Fetching batch data where last_updated > {self.last_processed_value}")
            # The watermark comes from our own tables, not from user input.
            query = f"""
            SELECT * FROM fact_anc 
            WHERE last_updated_date> '{self.last_processed_value}' 
            ORDER BY last_updated_date ASC LIMIT {self.batch_size}
            """
            result = self.hive_conn.execute_query(query)
            if result is None:
                # The Hive wrapper returns None on failure; unpacking it
                # would raise TypeError. Stop this cycle and keep the
                # watermark so the next cycle retries from the same point.
                print("Hive query failed. Ending batch fetching for this cycle.")
                break
            batch_data, columns = result

            if not batch_data:  # No more data
                print("No more data to fetch. Ending batch fetching.")
                break

            df = pd.DataFrame(batch_data, columns=columns)
            print("Fetched batch data:", df.shape)
            # Fingerprint every row (all fetched columns) before writing.
            df['record_hash'] = df.apply(self.hash_record, axis=1)
            self.process_data(df)

            # Update the last_processed_value to the latest timestamp in the batch
            self.last_processed_value = df['last_updated_date'].max()
            print("Updated last_processed_value to:", self.last_processed_value)

    @staticmethod
    def _row_params(row):
        """Build the 22 mart column values (record hash excluded) for one row."""
        return (
            row['anc_id'],                      # anc_number
            row['reg_date'],                    # registration_date
            row['first_anc_booking'],           # first_time_booking
            row['age_at_encounter'],            # age
            "",                                 # lnmp
            "",                                 # gravida
            row['parity_at_booking'],           # parity_at_booking
            "",                                 # weight
            "",                                 # height
            "",                                 # pulse
            "",                                 # bp
            "",                                 # assess_for_polor
            "",                                 # presentation
            "",                                 # ga_at_visit
            "",                                 # hiv_status_prior_booking
            "",                                 # art_number
            row['estimated_date_of_delivery'],  # edd
            "",                                 # sex
            row['last_updated_date'],           # dw_date_created
            row['anc_visit_date'],              # event_date
            row['encounter_id'],                # encounter_id
            row['organization_id'],             # facility_id_code
        )

    def process_data(self, df):
        """Write every row of ``df`` to ``marts.dm_anc``.

        For each row, looks up its ``record_hash`` in the mart; an existing
        hash triggers an UPDATE of that row, otherwise the row is INSERTed.
        A failed lookup (``None``) falls through to the INSERT branch.

        Args:
            df: Batch frame containing the source columns read by
                ``_row_params`` plus ``record_hash``.
        """
        # NOTE(review): the UPDATE uses the same source-to-mart mapping as
        # the INSERT, including the blank-filled columns - confirm whether
        # those should be populated from the source instead. A row matched
        # by record_hash has identical content, so encounter_id may be the
        # intended lookup key - confirm.
        check_query = "SELECT 1 FROM marts.dm_anc WHERE record_hash = %s"
        for _, row in df.iterrows():
            record_hash = row['record_hash']
            params = self._row_params(row) + (record_hash,)
            if self.pg_conn.execute_query(check_query, (record_hash,)):
                # Update existing record
                print(f"Updating record for record_hash: {record_hash}")
                self.pg_conn.execute_query(self._UPDATE_QUERY, params)
            else:
                # Insert new record
                print(f"Inserting new record for record_hash: {record_hash}")
                self.pg_conn.execute_query(self._INSERT_QUERY, params)

    def start_polling(self):
        """Run fetch cycles forever, sleeping ``polling_interval`` seconds between them."""
        while True:
            print("Starting data fetch cycle.")
            self.fetch_data_in_batches()
            print(f"Waiting for {self.polling_interval} seconds before next fetch.")
            time.sleep(self.polling_interval)


def main():
    """Connect to Hive and PostgreSQL and poll for new data until stopped.

    Passwords are read from the ``HIVE_PASSWORD`` and ``PG_PASSWORD``
    environment variables so that no secret lives in the source file; the
    function prints a message and returns without connecting when either
    is missing. The remaining settings can be overridden through the
    ``HIVE_*`` / ``PG_*`` variables read below and otherwise keep their
    previous values. Both connections are closed on the way out, including
    when polling is interrupted.
    """
    import os  # local import keeps this entry point self-contained

    hive_password = os.environ.get("HIVE_PASSWORD")
    pg_password = os.environ.get("PG_PASSWORD")
    if not hive_password or not pg_password:
        print("Set the HIVE_PASSWORD and PG_PASSWORD environment variables before running.")
        return

    # Initialize Hive connection
    hive_conn = HiveConnection(
        host=os.environ.get("HIVE_HOST", "197.221.242.150"),
        port=int(os.environ.get("HIVE_PORT", "17251")),
        username=os.environ.get("HIVE_USERNAME", "schaputsira"),
        password=hive_password,
        database=os.environ.get("HIVE_DATABASE", "default"),
        auth_mode=os.environ.get("HIVE_AUTH_MODE", "LDAP"),
    )

    # Initialize PostgreSQL connection
    pg_conn = PostgresConnection(
        dbname=os.environ.get("PG_DBNAME", "HTSDATA"),
        user=os.environ.get("PG_USER", "postgres"),
        password=pg_password,
        host=os.environ.get("PG_HOST", "127.0.0.1"),
        port=int(os.environ.get("PG_PORT", "5431")),
    )

    try:
        if hive_conn.connect() and pg_conn.connect():
            data_fetcher = DataFetcher(hive_conn, pg_conn)
            data_fetcher.start_polling()
    finally:
        # start_polling() only returns via an exception (for example
        # KeyboardInterrupt), so cleanup has to live in ``finally``.
        hive_conn.close()
        pg_conn.close()


if __name__ == "__main__":
    main()


Starting data fetch cycle.
>>> Detected Snapshot done: 2024-12-17T09:14:47.497+00:00
Fetching batch data where last_updated > 2024-12-17T09:14:47.497+00:00
Fetched batch data: (1, 28)
Inserting new record for record_hash: e1f5aa2d805c653042afd713aa336fa66048518ce111091b80c535eb8c7bbe77
Updated last_processed_value to: 2024-12-17T18:41:23.838+00:00
Fetching batch data where last_updated > 2024-12-17T18:41:23.838+00:00
Error executing query: TExecuteStatementResp(status=TStatus(statusCode=3, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:Error running query: org.apache.spark.sql.AnalysisException: Table or view not found: Observation; line 13 pos 11:36:35', 'org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$:runningQueryError:HiveThriftServerErrors.scala:43', 'org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute:SparkExecuteStatementOperation.scala:325', 'org.apache.s

TypeError: cannot unpack non-iterable NoneType object