In [None]:
import pyspark
from pyspark.sql import SparkSession


from pyspark.sql.functions import col
import os
import tarfile
from datetime import datetime, timedelta
from utils.utils import configure_spark

def create_iceberg_table_if_not_exists(spark, table_name, df, partition_column='DATE'):
    """Issue a ``CREATE TABLE IF NOT EXISTS`` for an Iceberg table shaped like ``df``.

    Args:
        spark: Object exposing ``sql(query)``; the DDL is submitted through it.
        table_name: Name of the table to create, interpolated verbatim.
        df: Object whose ``schema.fields`` supply the column names and types.
        partition_column: Column placed in the ``PARTITIONED BY`` clause.
    """
    # One "`name` type" entry per field, kept in schema order.
    column_defs = []
    for column in df.schema.fields:
        column_defs.append(f'`{column.name}` {column.dataType.simpleString()}')
    columns_sql = ', '.join(column_defs)

    ddl = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {columns_sql}
        )
        USING iceberg
        PARTITIONED BY ({partition_column})
    """
    spark.sql(ddl)

def query_and_write_to_local_table(spark, nessie_table_name, date, local_table_name):
    """Copy one day's rows from a Nessie table into the local Iceberg table.

    The append is skipped when the local table already holds at least one row
    for ``date``. If that existence check itself fails, the error is reported
    and the append proceeds anyway (best effort).

    Args:
        spark: Spark session used for all queries and the write.
        nessie_table_name: Source table name, queried as ``nessie.<name>``.
        date: Value compared against the ``DATE`` column; interpolated into
            the SQL text via ``str()`` formatting.
        local_table_name: Destination table name; created if missing.
    """
    try:
        existing_data = spark.sql(f"SELECT * FROM {local_table_name} WHERE DATE = '{date}'")
        # One matching row is enough to prove the date is present, so cap the
        # probe at a single row instead of counting every match.
        if existing_data.limit(1).count() > 0:
            print(f"Data for {date} already exists in {local_table_name}. Skipping append.")
            return
    except Exception as e:
        # Best-effort check: the probe can fail (for example when the local
        # table has not been created yet). Surface the real error so that
        # unrelated failures are not silently reported as "no data".
        print(f"Could not check {local_table_name} for existing {date} data ({e}). Proceeding with append.")

    query = f"SELECT * FROM nessie.{nessie_table_name} WHERE DATE = '{date}';"
    df = spark.sql(query)
    # The local table is created from the source frame's schema on first use.
    create_iceberg_table_if_not_exists(spark, local_table_name, df)
    df.writeTo(local_table_name).append()
    print(f"Successfully appended data for {date} to {local_table_name}.")

def untar_latest_iceberg_warehouse(iceberg_warehouse_path):
    """Restore the warehouse directory from the newest matching ``.tar.gz`` archive.

    Archives are looked up in the parent directory of ``iceberg_warehouse_path``
    (the current directory when the path has no parent component) and must end
    with ``<basename>.tar.gz``. Nothing is extracted when no archive matches or
    when the warehouse directory already exists. Any error is reported with
    ``print`` rather than raised.

    Args:
        iceberg_warehouse_path: Path of the warehouse directory to restore.
    """
    base_name = os.path.basename(iceberg_warehouse_path)
    directory = os.path.dirname(iceberg_warehouse_path) or '.'  # Default to current directory if no parent
    # Bound up front so the error report below never trips over an unbound
    # name when the failure happens before an archive has been selected.
    latest_tar_path = None

    try:
        # Adjusted to match archives carrying a date prefix before the base name.
        tar_files = [f for f in os.listdir(directory) if f.endswith(f'{base_name}.tar.gz')]

        if not tar_files:
            print(f"No tar.gz file found for {iceberg_warehouse_path}. Proceeding without un-tarring.")
            return

        # "Latest" is decided by the file's ctime, not by the date in its name.
        latest_tar_file = max(tar_files, key=lambda f: os.path.getctime(os.path.join(directory, f)))
        latest_tar_path = os.path.join(directory, latest_tar_file)

        if not os.path.exists(iceberg_warehouse_path):
            with tarfile.open(latest_tar_path, "r:gz") as tar:
                if hasattr(tarfile, "data_filter"):
                    # Extraction filters exist on this interpreter: refuse
                    # members that would escape the destination directory.
                    tar.extractall(path=directory, filter="data")
                else:
                    tar.extractall(path=directory)
            print(f"Un-tarred {latest_tar_path} successfully.")
        else:
            print(f"Directory {iceberg_warehouse_path} already exists. Skipping un-tarring.")
    except Exception as e:
        failed_target = latest_tar_path or f"archive for {iceberg_warehouse_path}"
        print(f"Failed to un-tar {failed_target}. Error: {e}")



def tar_iceberg_warehouse(iceberg_warehouse_path, date_prefix):
    """Archive the warehouse directory as ``<date_prefix>_<basename>.tar.gz``.

    The archive is written next to the warehouse directory (in the current
    directory when the path has no parent component), which is where
    ``untar_latest_iceberg_warehouse`` looks for ``*<basename>.tar.gz``. The
    data is first written to a ``.partial`` file and renamed only on success,
    so a failed run never leaves a truncated archive behind or clobbers an
    earlier one. Errors are reported with ``print`` rather than raised.

    Args:
        iceberg_warehouse_path: Directory to archive.
        date_prefix: Value placed at the start of the archive file name.
    """
    base_name = os.path.basename(iceberg_warehouse_path)
    tar_path = os.path.join(
        os.path.dirname(iceberg_warehouse_path),
        f"{date_prefix}_{base_name}.tar.gz",
    )
    # The suffix keeps an unfinished file from matching the "*.tar.gz" lookup.
    partial_path = f"{tar_path}.partial"
    try:
        with tarfile.open(partial_path, "w:gz") as tar:
            tar.add(iceberg_warehouse_path, arcname=base_name)
        os.replace(partial_path, tar_path)
        print(f"Tarred {iceberg_warehouse_path} to {tar_path} successfully.")
    except Exception as e:
        print(f"Failed to tar {iceberg_warehouse_path}. Error: {e}")
        try:
            if os.path.exists(partial_path):
                os.remove(partial_path)
        except OSError:
            # Cleanup is best effort; the failure itself was already reported.
            pass

def process_dates(spark, nessie_table_name, local_table_name, dates):
    """Run the Nessie-to-local copy once for each entry of ``dates``, in order.

    Args:
        spark: Spark session forwarded to every copy.
        nessie_table_name: Source table name in the Nessie catalog.
        local_table_name: Destination local Iceberg table name.
        dates: Iterable of dates to copy.
    """
    for target_date in dates:
        query_and_write_to_local_table(
            spark,
            nessie_table_name,
            target_date,
            local_table_name,
        )
iceberg_warehouse_path = "iceberg_warehouse"

# Restore the warehouse from the newest archive, if one is available.
untar_latest_iceberg_warehouse(iceberg_warehouse_path)

# Nothing restored (first run or no archive): start from an empty warehouse.
os.makedirs(iceberg_warehouse_path, exist_ok=True)

spark = configure_spark('minio', 'main')

# Names of the Nessie tables to copy; each one is mirrored to
# spark_catalog.default.<name>. Empty by default - fill in before running.
tables_to_process = []

# Detect today's date
today_date = datetime.today().date()
# To process yesterday instead, use: today_date - timedelta(days=1)
print(today_date)

try:
    # Process each table for today's date
    for table in tables_to_process:
        nessie_table_name = table
        local_table_name = f"spark_catalog.default.{table}"
        process_dates(spark, nessie_table_name, local_table_name, [today_date])
finally:
    # Always release the Spark session, even when a table fails part-way.
    spark.stop()

# Only reached when processing succeeded: snapshot the warehouse for today.
tar_iceberg_warehouse(iceberg_warehouse_path, today_date)


# Initialize Spark Session and Process Tables
Initialize a Spark session and perform any necessary table processing steps.

# Query Iceberg Table Locally
Run queries against the Iceberg table stored locally to retrieve data.

In [None]:
# Same import path as the processing cell, so both cells resolve the helper identically.
from utils.utils import configure_spark

# Name of the local table to inspect; it must match a table that was created
# and written to under spark_catalog.default.
table = ""  # TODO: fill in before running
if not table:
    # Fail with a clear message instead of sending a malformed query to Spark.
    raise ValueError("Set `table` to the name of the local Iceberg table to query.")

spark = configure_spark('minio','main')

# Define the Iceberg table name
table_name = f"spark_catalog.default.{table}"

# Query the Iceberg table locally
df = spark.sql(f"SELECT * FROM {table_name}")

# Show the results
df.show()

# Query Data from Dremio and Load into DuckDB
Query data from Dremio and load the results into a DuckDB table for further analysis.

In [None]:
import os

from dremio_simple_query.connect import get_token, DremioConnection
import duckdb

# URL to Login Endpoint
login_endpoint = "http://dremio/apiv2/login"

# Payload for Login.
# Credentials are read from the environment so they are never saved inside
# the notebook; when the variables are unset the values stay empty strings.
payload = {
    "userName": os.environ.get("DREMIO_USERNAME", ""),
    "password": os.environ.get("DREMIO_PASSWORD", "")
}

# Get token from API
token = get_token(uri=login_endpoint, payload=payload)

# DuckDB connection opened without a database path; it is not used further
# in this cell.
con = duckdb.connect()

# URL Dremio Software Flight Endpoint
arrow_endpoint = "grpc://dremio:32010"

# Establish Client
dremio = DremioConnection(token, arrow_endpoint)

# Table to read from the nessie source - fill in before running.
table_name = ""
# Query data from Dremio.
# NOTE(review): despite the variable name, `toPandas` presumably returns a
# pandas DataFrame rather than a DuckDB relation - confirm against the
# dremio_simple_query documentation.
duck_rel = dremio.toPandas(
    f"""
    SELECT * FROM nessie.{table_name};
    """
)
duck_rel

# Query Iceberg Table and Save Results to DuckDB Table
Query the Iceberg table and save the results into a DuckDB table for future use.

In [None]:
import duckdb

# Name of the Iceberg table to copy; also used as the DuckDB table name.
# Fill in before running.
table_name = ""
# Connect to DuckDB (persistent database file in the working directory)
con = duckdb.connect('iceberg_data.duckdb')

# Install and load the Iceberg extension
con.execute("INSTALL iceberg;")
con.execute("LOAD iceberg;")

# Query the Iceberg table and save the results to a DuckDB table.
# CREATE OR REPLACE keeps the cell re-runnable: the database file persists
# between runs, so a plain CREATE TABLE fails once the table exists.
con.execute(f"""
    CREATE OR REPLACE TABLE {table_name} AS
    SELECT *
    FROM iceberg_scan('iceberg_warehouse/default/{table_name}', allow_moved_paths = true) ;
""")

