In [1]:
import os
import pyspark
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, to_timestamp, col, monotonically_increasing_id
from random_spec import rand_spec_case_wsl
from rand_engine.main.data_generator import DataGenerator


class SparkUtils:
  """Builds SparkSession objects for the Iceberg catalogs used in this notebook (Nessie and Glue)."""

  # Maven coordinates and SQL extension classes associated with the Nessie setup.
  # get_spark_session_nessie does NOT apply them (the original code built these lists
  # and never used them). The captured Ivy output of the first cell resolves these
  # packages alongside others not named here, so they appear to be configured
  # outside this class -- TODO confirm where.
  _NESSIE_JAR_PACKAGES = [
      "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",
      "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.99.0",
      "org.apache.iceberg:iceberg-aws-bundle:1.6.1"]
  _NESSIE_SPARK_EXTENSIONS = [
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"]

  # Packages / extensions applied by get_spark_session_glue.
  _GLUE_JAR_PACKAGES = [
      "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",
      "org.apache.iceberg:iceberg-aws-bundle:1.6.1"]
  _GLUE_SPARK_EXTENSIONS = ["org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"]

  @staticmethod
  def _masked_env(name, visible=4):
    """Return the first `visible` characters of env var `name`, or None when it is unset.

    Slicing the result of os.getenv directly raises TypeError for an unset variable.
    """
    value = os.getenv(name)
    return None if value is None else value[:visible]

  def _log_environment(self, logger):
    """Log the environment variables the sessions depend on; credentials are truncated."""
    logger.info("Environment Variables:")
    logger.info(f"SPARK_MASTER: {os.getenv('SPARK_MASTER')}")
    logger.info(f"S3_URL: {os.getenv('S3_URL')}")
    logger.info(f"NESSIE_URI: {os.getenv('NESSIE_URI')}")
    logger.info(f"AWS_ACCESS_KEY_ID: {self._masked_env('AWS_ACCESS_KEY_ID')}")
    logger.info(f"AWS_SECRET_ACCESS_KEY: {self._masked_env('AWS_SECRET_ACCESS_KEY')}")

  def get_spark_session_nessie(self, logger, app_name):
    """Create (or reuse) a SparkSession with the `nessie` catalog warehouse and S3A settings.

    Args:
      logger: logger used to report the environment and any missing variables.
      app_name: Spark application name.

    Returns:
      The SparkSession from `getOrCreate()`, with the context log level set to ERROR.
    """
    self._log_environment(logger)

    conf = (
      pyspark.SparkConf()
      .setAppName(app_name)
      .set('spark.sql.catalog.nessie.s3.path-style-access', 'true')
      .set('spark.sql.catalog.nessie.warehouse', 's3a://lakehouse/warehouse')
      .set('spark.sql.catalog.nessie.cache-enabled', 'false')
      .set("spark.hadoop.fs.s3a.path.style.access", "true")
      .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"))

    # Settings sourced from the environment. An unset variable is skipped with a
    # warning instead of being handed to SparkConf as None.
    env_backed_settings = {
      'spark.hadoop.fs.s3a.access.key': "AWS_ACCESS_KEY_ID",
      'spark.hadoop.fs.s3a.secret.key': "AWS_SECRET_ACCESS_KEY",
      "spark.hadoop.fs.s3a.endpoint": "S3_URL",
    }
    for conf_key, env_name in env_backed_settings.items():
      env_value = os.getenv(env_name)
      if env_value is None:
        logger.warning(f"{env_name} is not set; leaving {conf_key} unconfigured")
      else:
        conf.set(conf_key, env_value)

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark

  def get_spark_session_glue(self, logger, app_name):
    """Create (or reuse) a SparkSession with an Iceberg `glue` catalog backed by S3FileIO.

    Args:
      logger: logger used to report the environment.
      app_name: Spark application name.

    Returns:
      The SparkSession from `getOrCreate()`, with the context log level set to ERROR.
    """
    self._log_environment(logger)

    conf = (
      pyspark.SparkConf()
      .setAppName(app_name)
      # The key is "spark.jars.packages"; the previous "spark.jars_packages" is not a Spark setting.
      .set("spark.jars.packages", ",".join(self._GLUE_JAR_PACKAGES))
      .set("spark.sql.extensions", ",".join(self._GLUE_SPARK_EXTENSIONS))
      .set("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
      # NOTE(review): catalog-impl was left disabled by the author -- confirm whether the glue catalog resolves without it.
      #.set("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.catalog.GlueCatalog")
      .set("spark.sql.catalog.glue.warehouse", "s3://dadaia-dbfs-learning/warehouse")
      .set("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    )

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark
  

# Module-level logger; handed to SparkUtils so it can log the environment it sees.
logger = logging.getLogger(__name__)
# Build (or reuse, via getOrCreate) the Nessie-configured session used by the cells below.
spark = SparkUtils().get_spark_session_nessie(logger, "Rosemberg")
# Bare expression: shows the session object as this cell's output.
spark


:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0d7e7ce6-fb9e-4437-ad7c-285afea9c39d;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.99.0 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.6.1 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.3 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central


In [2]:
# Smoke test for the session: list the namespaces the catalog exposes.
spark.sql("SHOW NAMESPACES").show()

+---------+
|namespace|
+---------+
|   silver|
|   bronze|
+---------+



25/02/15 01:13:16 ERROR TaskSchedulerImpl: Lost executor 1 on 10.0.1.34: worker lost: 10.0.1.34:44479 got disassociated
25/02/15 01:13:19 ERROR TaskSchedulerImpl: Lost executor 4 on 10.0.1.36: worker lost: 10.0.1.36:41047 got disassociated
25/02/15 01:13:22 ERROR TaskSchedulerImpl: Lost executor 6 on 10.0.1.33: worker lost: 10.0.1.33:44649 got disassociated
25/02/15 01:13:25 ERROR TaskSchedulerImpl: Lost executor 3 on 10.0.1.37: worker lost: 10.0.1.37:45919 got disassociated
25/02/15 01:13:28 ERROR TaskSchedulerImpl: Lost executor 5 on 10.0.1.19: worker lost: 10.0.1.19:42623 got disassociated
25/02/15 01:13:30 ERROR TaskSchedulerImpl: Lost executor 0 on 10.0.1.18: worker lost: 10.0.1.18:40497 got disassociated
25/02/15 01:13:33 ERROR TaskSchedulerImpl: Lost executor 2 on 10.0.1.5: worker lost: 10.0.1.5:33665 got disassociated
25/02/15 01:16:05 ERROR TransportRequestHandler: Error sending result StreamResponse[streamId=/jars/org.apache.hadoop_hadoop-client-api-3.3.4.jar,byteCount=194586

In [None]:
# full_tablename = "nessie.bronze.kafka_topics_multiplexed"
# spark.sql(f"SELECT COUNT(*) FROM {full_tablename}.partitions").show(10, False)
# spark.sql(f"SELECT * FROM {full_tablename}.snapshots").show(10, False)

In [None]:
from datetime import datetime as dt, timedelta
import os
from functools import reduce

class IceStreamMaintenance:
    """Runs Iceberg maintenance procedures (`CALL nessie.system.*`) against a single table.

    Every public method builds one CALL statement, prints it, executes it through
    the Spark session and shows the procedure's result.
    """

    def __init__(self, spark, table):
        """
        Args:
            spark: session object exposing `sql(query)`.
            table: table identifier interpolated into each CALL statement.
        """
        self.spark = spark
        self.table = table

    def _call(self, query):
        """Print `query`, execute it and show the returned result."""
        print(query)
        self.spark.sql(query).show()

    def expire_snapshots(self, hours_retained=1, retain_last=2):
        """Expire snapshots older than `hours_retained` hours.

        Args:
            hours_retained: age threshold in hours, measured back from `dt.now()`.
            retain_last: third positional argument of the procedure (previously
                hard-coded to 2).
        """
        # NOTE(review): dt.now() is the driver's local, timezone-naive time -- confirm
        # it lines up with the Spark session time zone.
        timestamp_after_to_retain = dt.now() - timedelta(hours=hours_retained)
        self._call(
            f"CALL nessie.system.expire_snapshots('{self.table}', TIMESTAMP '{timestamp_after_to_retain}', {retain_last})"
        )

    def rewrite_manifests(self):
        """Run the `rewrite_manifests` procedure for the table."""
        self._call(f"CALL nessie.system.rewrite_manifests('{self.table}')")

    def rewrite_position_delete_files(self):
        """Run the `rewrite_position_delete_files` procedure for the table."""
        # This method used to be defined twice with identical bodies; one copy is kept.
        self._call(f"CALL nessie.system.rewrite_position_delete_files('{self.table}')")

    def rewrite_data_files(self, where=None):
        """Run the `rewrite_data_files` procedure, optionally restricted by a filter.

        Args:
            where: optional predicate passed as the procedure's `where` argument. It is
                embedded inside a single-quoted SQL literal, so string values within
                the predicate must use double quotes.
        """
        if where is None:
            query = f"CALL nessie.system.rewrite_data_files(table => '{self.table}')"
        else:
            query = f"CALL nessie.system.rewrite_data_files(table => '{self.table}', where => '{where}')"
        self._call(query)

    def compact_files(self, where=None):
        """Alias of `rewrite_data_files`, kept for callers that use this name."""
        self.rewrite_data_files(where=where)

    def remove_orphan_files(self, where=None):
        """Run the `remove_orphan_files` procedure for the table.

        Args:
            where: accepted for signature compatibility; it is not used.
        """
        self._call(f"CALL nessie.system.remove_orphan_files(table => '{self.table}')")


# Tables that receive the full maintenance pass.
tables = [
    "nessie.bronze.kafka_topics_multiplexed",
    "nessie.silver.blocks",
    "nessie.silver.blocks_transactions",
    "nessie.silver.transactions",
]

for table in tables:
    ice_handler = IceStreamMaintenance(spark, table)
    ice_handler.rewrite_manifests()
    # rewrite_data_files() is the handler method that performs data-file compaction.
    ice_handler.rewrite_data_files()
    # Go through the handler (instead of raw spark.sql) so each statement is printed before it runs.
    ice_handler.rewrite_position_delete_files()
    ice_handler.remove_orphan_files()

    ice_handler.expire_snapshots(hours_retained=0)
    # df_partitions = spark.sql(f"SELECT partition FROM {table}.partitions")
    # df_schema = spark.sql(f"DESCRIBE EXTENDED {table}").filter(col("col_name") != "").withColumn("id", monotonically_increasing_id())
    # df_schema.createOrReplaceTempView("df_schema")
    # filter_bottom = "SELECT id FROM df_schema WHERE col_name = '# Partition Information'"
    # filter_top = "SELECT id FROM df_schema WHERE col_name = '# Metadata Columns'"
    # df_new = spark.sql(f"SELECT * FROM df_schema WHERE id > ({filter_bottom}) AND id < ({filter_top})").filter(col("col_name") != '# col_name').select("col_name")
    # partitions = [i["col_name"] for i in df_new.collect()]
    # print(partitions)
    # #for partition in partitions:
    # result = [{j: i["partition"][j] for j in partitions} for i in df_partitions.select("partition").collect()]
    # result = [[f'{k}="{v}"' for k, v in i.items()] for i in result]
    # result_2 = [reduce(lambda a, b: f"{a} and {b}", i) for i in result]

        
        # ice_handler.compact_files()

        # ice_handler.rewrite_manifests()
        # ice_handler.expire_snapshots(hours_retained=0)

In [None]:
# Dry run of orphan-file removal (dry_run => true).
# NOTE(review): 'db.sample' matches no table used elsewhere in this notebook -- confirm the intended target.
spark.sql("CALL nessie.system.remove_orphan_files(table => 'db.sample', dry_run => true)").show()


In [None]:
# One-off manifest rewrite for the bronze Kafka table.
spark.sql("CALL nessie.system.rewrite_manifests('nessie.bronze.kafka_topics_multiplexed')").show()

In [None]:
# One-off position-delete-file rewrite for the bronze Kafka table.
spark.sql("CALL nessie.system.rewrite_position_delete_files('nessie.bronze.kafka_topics_multiplexed')").show()

In [None]:
from datetime import datetime as dt, timedelta
class LittleETL:
    """Small extract/transform/load pipeline that writes generated web-server logs to an Iceberg table."""

    # Metadata-table suffixes accepted by show_metadata.
    _METADATA_TABLES = ("partitions", "history", "files")

    def __init__(self, spark, full_tablename):
        """
        Args:
            spark: session object exposing `sql(query)` and `createDataFrame`.
            full_tablename: dotted table identifier; create_table uses its second
                segment as the namespace.
        """
        self.spark = spark
        self.df_extracted = None
        self.df_transformed = None
        self.full_tablename = full_tablename

    def create_table(self):
        """Create the namespace and the partitioned Iceberg table if they do not exist.

        Returns:
            self, so calls can be chained.
        """
        namespace = self.full_tablename.split(".")[1]
        self.spark.sql(f"CREATE NAMESPACE IF NOT EXISTS nessie.{namespace}").show()
        self.spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {self.full_tablename} (
            ip_address STRING,
            identifier STRING,
            user STRING,
            http_version STRING,
            object_size INT,
            http_request STRING,
            http_status INT,
            odate STRING,
            log_timestamp TIMESTAMP)
        USING ICEBERG
        PARTITIONED BY (odate)
        TBLPROPERTIES (
            'gc.enabled' = 'true',
            'write.delete.mode' = 'copy-on-write',
            'write.update.mode' = 'merge-on-read',
            'write.merge.mode' = 'merge-on-read',
            'write.metadata.delete-after-commit.enabled' = 'true',
            'write.metadata.previous-versions-max' = 3,
            'write.parquet.compression-codec' = 'snappy'
        )""").show()
        return self

    def show_metadata(self, metric="history"):
        """Show one of the table's metadata tables.

        Args:
            metric: one of "partitions", "history" or "files".

        Raises:
            KeyError: if `metric` is not one of the supported names.
        """
        assert self.full_tablename is not None, "table must bet set before reading its metadata!"
        if metric not in self._METADATA_TABLES:
            # Same exception type the previous dict lookup raised.
            raise KeyError(metric)
        return self.spark.sql(f"SELECT * FROM {self.full_tablename}.{metric}").show()

    def maintenance(self):
        """Run the `rewrite_data_files` procedure for the table."""
        self.spark.sql(f"CALL nessie.system.rewrite_data_files('{self.full_tablename}')").show()

    def remove_orphans(self, retention_minutes=0, dry_run=True):
        """Expire old snapshots, then run orphan-file removal.

        Args:
            retention_minutes: snapshots older than `now - retention_minutes` are
                expired. The default 0 keeps the previous behaviour (cut-off = now);
                the old code computed a 300-minute cut-off but never used it.
            dry_run: passed to `remove_orphan_files`; the default True keeps the
                previous behaviour of a dry run.
        """
        print("Try to expire snapshots")
        older_than = dt.now() - timedelta(minutes=retention_minutes)
        self.spark.sql(f"CALL nessie.system.expire_snapshots('{self.full_tablename}', TIMESTAMP '{older_than}', 1)").show()
        dry_run_literal = "true" if dry_run else "false"
        self.spark.sql(f"CALL nessie.system.remove_orphan_files(table => '{self.full_tablename}', dry_run => {dry_run_literal})").show()

    def extract(self, metadata, size=10**5):
        """Generate `size` random rows from `metadata` and load them into a Spark DataFrame.

        Args:
            metadata: spec handed to DataGenerator. It was previously ignored in
                favour of the global `rand_spec`.
            size: number of rows to generate.

        Returns:
            self, so calls can be chained.
        """
        df_pandas = DataGenerator(metadata).generate_pandas_df(size).get_df()
        self.df_extracted = self.spark.createDataFrame(df_pandas)
        del df_pandas  # drop the pandas copy once Spark holds the data
        print(f"Num Partitions: {self.df_extracted.rdd.getNumPartitions()}")
        self.df_extracted.printSchema()
        return self

    def transform(self):
        """Parse the timestamp, derive `odate`, and rename/cast columns.

        Returns:
            The transformed DataFrame (also stored on `self.df_transformed`).
        """
        assert self.df_extracted is not None, "dataframe must be extracted before transform it!"
        datetime_format = "dd/MMM/yyyy:HH:mm:ss"
        odate_format = "yyyy-MM-dd"
        self.df_transformed = (
            self.df_extracted
                .withColumn("timestamp", to_timestamp(col("datetime"), datetime_format))
                .withColumn("odate", date_format(col("timestamp"), odate_format))
                .withColumn("http_status", col("http_status").cast("int"))
                .withColumnRenamed("identificador", "identifier")
                .withColumnRenamed("timestamp", "log_timestamp")
                .drop("datetime")
        )
        return self.df_transformed

    def load(self):
        """Append the transformed DataFrame to the target table."""
        assert self.df_transformed  is not None, "dataframe must be transformed before load it!"
        _ = (
            self.df_transformed
            .writeTo(self.full_tablename)
            .partitionedBy("odate")
            .append()
        )




# Driver for the LittleETL demo; relies on the `spark` session created in the first cell.
if __name__ == '__main__':
    

    table_name = "nessie.learn.web_server_logs"
    # Random-data spec limited to the given date window; `rand_spec` lives at kernel scope.
    rand_spec = rand_spec_case_wsl(min_date="2025-02-02", max_date="2025-02-03")
    
    # create_table() returns the LittleETL instance, so `etl` is the pipeline object.
    etl = LittleETL(spark, table_name).create_table()
    # Data generation/loading is currently disabled; only the maintenance steps below run.
    # for i in range(10):
    #     df = etl.extract(metadata=rand_spec).transform()
    #     etl.load()
    etl.maintenance()
    etl.remove_orphans()
    # Default metric is "history".
    etl.show_metadata()


In [None]:
import ast


    
# NOTE(review): `extract_metadata` is not defined in any visible cell of this notebook;
# on a fresh kernel this call presumably raises NameError -- confirm where it should come from.
extract_metadata()

In [None]:
# Destructive: drops the silver blocks_transactions table.
# IF EXISTS lets the cell be re-run without failing once the table is already gone.
spark.sql("DROP TABLE IF EXISTS nessie.silver.blocks_transactions")

In [None]:
# Print the driver's current local time; `dt` is the datetime class imported as `dt` in an earlier cell.
print(dt.now())

In [None]:

#spark.table("nessie.learn.web_server_logs").show()

In [None]:
# spark.sql(f"""
# CREATE TABLE IF NOT EXISTS nessie.learn.web_server_logs (
#     ip_address STRING,
#     identifier STRING,
#     user STRING,
#     http_version STRING,
#     object_size INT,
#     http_request STRING,
#     http_status INT,
#     odate STRING,
#     log_timestamp TIMESTAMP)
# USING ICEBERG
# PARTITIONED BY (odate)
# TBLPROPERTIES ('gc.enabled' = 'true')
# """).show()

25/02/15 01:00:46 ERROR TransportRequestHandler: Error sending result StreamResponse[streamId=/jars/org.apache.iceberg_iceberg-spark-runtime-3.5_2.12-1.6.1.jar,byteCount=41824467,body=FileSegmentManagedBuffer[file=/root/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.5_2.12-1.6.1.jar,offset=0,length=41824467]] to /10.0.1.33:59600; closing connection
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
25/02/15 01:00:46 ERROR TransportRequestHandler: Error sending result StreamResponse[streamId=/files/org.apache.hadoop_hadoop-client-api-3.3.4.jar,byteCount=19458635,body=FileSegmentManagedBuffer[file=/root/.ivy2/jars/org.apache.hadoop_hadoop-client-api-3.3.4.jar,offset=0,length=19458635]] to /10.0.1.5:41000; closing connection
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
25/02/15 01:00:46 ERROR TransportRequestHandler: Error sending r