# In [None]:
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Build (or reuse) the Hive-enabled Spark session that every query below runs on.
# Executor/driver sizing, the network timeout, the warehouse directory and the
# metastore URI are all fixed here; the warehouse path is a placeholder to fill in.
spark = (
    SparkSession.builder
    .appName("HiveTableMetadata")
    .enableHiveSupport()
    .config("spark.executor.memory", "10g")
    .config("spark.network.timeout", "800")
    .config("spark.executor.cores", "3")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.warehouse.dir", "enter the path")
    .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
    .getOrCreate()
)

# Load table names from the 'Table_Name' column of an Excel file.
input_excel_path = "tables_input.xlsx"  # Path to the input Excel file
try:
    df_tables = pd.read_excel(input_excel_path, engine="openpyxl")
    # dropna() only removes missing cells. Cells that are empty strings or
    # whitespace would otherwise become malformed "<schema>." queries later,
    # so every cell is coerced to a trimmed string and blanks are discarded.
    specific_tables = [
        name
        for name in (str(cell).strip() for cell in df_tables['Table_Name'].dropna())
        if name
    ]
    print(f" Loaded {len(specific_tables)} tables from Excel.")
except Exception as e:
    # Best effort: any read/parse problem (missing file, missing column,
    # missing openpyxl) leaves an empty list so the script reports
    # "no tables" instead of crashing.
    print(f"Error reading Excel file: {e}")
    specific_tables = []

# Hive database (schema) that holds the tables being inspected; placeholder to fill in.
schema_name = "Enter the schema name"

# Layout of the result DataFrame: five nullable string columns followed by a
# nullable long column holding the row count.
schema = StructType(
    [
        StructField(column, StringType(), True)
        for column in ("Table_Name", "Location", "Owner", "CreateTime", "LastAccessTime")
    ]
    + [StructField("Row_count", LongType(), True)]
)

# Metadata fields reported for every table, in output order.
_METADATA_FIELDS = ("Location", "Owner", "CreateTime", "LastAccessTime")

# Normalised DESCRIBE FORMATTED label (whitespace removed, trailing ':' dropped,
# lower-cased) -> output field. Hive-style output labels rows "CreateTime:" /
# "LastAccessTime:", while Spark's native DESCRIBE uses "Created Time" /
# "Last Access" (older releases: "Create Time:" / "Last Access Time:").
_METADATA_LABELS = {
    "location": "Location",
    "owner": "Owner",
    "createtime": "CreateTime",
    "createdtime": "CreateTime",
    "lastaccesstime": "LastAccessTime",
    "lastaccess": "LastAccessTime",
}

# Row that separates the column listing from the table-level properties.
_DETAIL_SECTION_MARKER = "# Detailed Table Information"


def _parse_table_metadata(describe_rows):
    """Extract Location/Owner/CreateTime/LastAccessTime from DESCRIBE rows.

    Args:
        describe_rows: Sequence of mappings (Spark ``Row`` or ``dict``) with a
            "Column" entry holding the label and a "Value" entry holding its
            value; either entry may be ``None``.

    Returns:
        Dict keyed by the names in ``_METADATA_FIELDS``. A field is ``None``
        when its label is absent or its value is empty.
    """
    pairs = [((row["Column"] or "").strip(), row["Value"]) for row in describe_rows]

    # Only trust rows after the detail marker when it exists, so a table
    # *column* that happens to be called e.g. "owner" is not mistaken for
    # table metadata. Without a marker, fall back to scanning every row.
    labels = [label for label, _ in pairs]
    if _DETAIL_SECTION_MARKER in labels:
        pairs = pairs[labels.index(_DETAIL_SECTION_MARKER) + 1:]

    metadata = dict.fromkeys(_METADATA_FIELDS)
    for label, raw_value in pairs:
        normalized = "".join(label.split()).rstrip(":").lower()
        field = _METADATA_LABELS.get(normalized)
        if field is not None:
            metadata[field] = raw_value.strip() if raw_value else None
    return metadata


def get_table_details(table_name):
    """Fetch table metadata and row count for a given Hive table.

    Runs ``DESCRIBE FORMATTED`` and ``SELECT COUNT(*)`` against
    ``{schema_name}.{table_name}`` using the module-level ``spark`` session.

    Args:
        table_name: Unqualified table name inside ``schema_name``.

    Returns:
        A ``Row`` with Table_Name, Location, Owner, CreateTime,
        LastAccessTime and Row_count. This function never raises: on failure
        the error is printed and the values that could not be obtained are
        ``None``. Metadata already fetched is kept if only the count fails.
    """
    # NOTE(review): the identifiers are interpolated into SQL unquoted, exactly
    # as before; names are assumed to come from a trusted spreadsheet.
    qualified_name = f"{schema_name}.{table_name}"
    metadata = dict.fromkeys(_METADATA_FIELDS)
    row_count = None

    try:
        describe_rows = (
            spark.sql(f"DESCRIBE FORMATTED {qualified_name}")
            .selectExpr("col_name as Column", "data_type as Value")
            .collect()
        )
        metadata = _parse_table_metadata(describe_rows)

        # Full scan count; done last so a failure here keeps the metadata.
        row_count = spark.sql(f"SELECT COUNT(*) FROM {qualified_name}").collect()[0][0]
    except Exception as e:
        print(f"Error processing table {table_name}: {e}")

    return Row(
        Table_Name=table_name,
        Location=metadata["Location"],
        Owner=metadata["Owner"],
        CreateTime=metadata["CreateTime"],
        LastAccessTime=metadata["LastAccessTime"],
        Row_count=row_count
    )

def process_tables_in_batches(tables, batch_size=100, *, max_workers=8, fetch_details=None):
    """Process tables in parallel using ThreadPoolExecutor.

    Every table is submitted to a single thread pool and the results are
    returned in the same order as ``tables``.

    Args:
        tables: Iterable of table names.
        batch_size: Accepted for backward compatibility only. All tasks share
            one pool whose concurrency is bounded by ``max_workers``, so
            chunking the submissions never affected the outcome; the value is
            ignored.
        max_workers: Number of worker threads.
        fetch_details: Callable applied to each table name. Defaults to the
            module-level ``get_table_details``.

    Returns:
        List of per-table results in input order. A table whose worker raised
        is reported on stdout and omitted from the list.
    """
    if fetch_details is None:
        fetch_details = get_table_details

    tables = list(tables)
    if not tables:
        return []

    all_table_details = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_details, table) for table in tables]

        # Collect in submission order so the output is deterministic.
        # result() simply blocks until that task finishes; no timeout is
        # applied because a running thread cannot be cancelled and the
        # executor waits for it on exit regardless.
        for future in futures:
            try:
                all_table_details.append(future.result())
            except Exception as e:
                print(f" Error in batch execution: {e}")

    return all_table_details

# Process tables: gather details, export them locally as CSV and Excel, then
# copy both files into HDFS.
if specific_tables:
    table_details = process_tables_in_batches(specific_tables)

    # Convert to DataFrame
    try:
        table_details_df = spark.createDataFrame(table_details, schema=schema)
        pandas_df = table_details_df.toPandas()  # Convert to Pandas DataFrame

        # Define local paths for saving
        local_csv_path = "define the path"
        local_excel_path = "define the path"

        # Save to CSV
        pandas_df.to_csv(local_csv_path, index=False)
        print(f"CSV file saved locally at: {local_csv_path}")

        # Save to Excel
        pandas_df.to_excel(local_excel_path, index=False, engine='openpyxl')
        print(f" Excel file saved locally at: {local_excel_path}")

        # Move files from Local to HDFS (overwrite if needed)
        hdfs_csv_path = "/=define the path and output table name.csv"
        hdfs_excel_path = "define the pathand output table name.xlsx"

        # Commands are passed as argument lists (no shell) so paths that
        # contain spaces or shell metacharacters reach `hdfs` intact.
        # Removing earlier copies is best effort: the targets may not exist
        # yet, so stderr is discarded and the exit status is ignored.
        subprocess.run(
            ["hdfs", "dfs", "-rm", "-skipTrash", hdfs_csv_path, hdfs_excel_path],
            stderr=subprocess.DEVNULL,
        )
        # check=True turns a failed upload into an exception, which the
        # handler below reports, so the success message is printed only when
        # both files really are in HDFS.
        subprocess.run(["hdfs", "dfs", "-put", local_csv_path, hdfs_csv_path], check=True)
        subprocess.run(["hdfs", "dfs", "-put", local_excel_path, hdfs_excel_path], check=True)

        print(f"📂 Files are now available in HDFS: {hdfs_csv_path} and {hdfs_excel_path}")

    except Exception as e:
        print(f" Error in saving DataFrame to file: {e}")
else:
    print("No tables found in the Excel file!")
