In [1]:
import os

import findspark

# findspark.init() adds the local Spark installation's Python packages to
# sys.path, so it must run before the first import that needs pyspark.
# delta.tables is placed after it as well, since it is expected to import
# pyspark itself.
findspark.init()

from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import coalesce, col
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Sanity check that the Delta Lake Python bindings are importable.
print("DeltaTable loaded:", bool(DeltaTable))


DeltaTable loaded: True


In [2]:
# Locations of the jars used by the notebook. The defaults are the original
# machine-specific paths; set ICEBERG_JAR_PATH / HIVE_JDBC_JAR in the
# environment to run the notebook on another machine without editing it.
iceberg_jar_path = os.environ.get(
    "ICEBERG_JAR_PATH",
    "/home/karthik/iceberg-spark-runtime-3.4_2.12-1.4.2.jar",
)
hive_jdbc = os.environ.get("HIVE_JDBC_JAR", "/opt/spark/jars/hive-jdbc-2.3.9.jar")

# AWS credentials are read from the environment so they never appear in the
# notebook. Each value is None when the corresponding variable is not set.
spark_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
spark_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

In [3]:
# Create a Spark session with Iceberg configurations.
#
# Settings such as spark.jars and spark.sql.extensions are only applied when a
# session is first created, and getOrCreate() would otherwise hand back an
# already-running session. Stop the active session if there is one; on a fresh
# kernel there is none, and nothing is stopped.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

session_builder = (
    SparkSession.builder
    .appName("IcebergToS3WithHiveSync")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.jars", iceberg_jar_path)
    # Session catalog backed by the Hive metastore, with Iceberg support.
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.spark_catalog.uri", "thrift://XXXX.XX.XX.XX:32431")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Additional named Iceberg catalog with its own warehouse location.
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hive")
    .config("spark.sql.catalog.my_catalog.uri", "thrift://XX.XX.XX.XX:32431")
    .config("spark.sql.catalog.my_catalog.warehouse", "s3a://XXXX/iceberg/warehouse")
)

# Use the credentials read from the environment instead of literals in the
# notebook. If either one is missing, no key is set here and the S3A
# connector is left to its own credential lookup.
if spark_access_key and spark_secret_key:
    session_builder = (
        session_builder
        .config("spark.hadoop.fs.s3a.access.key", spark_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", spark_secret_key)
    )

spark = session_builder.getOrCreate()
# Check the Spark version to confirm setup
print(f"Spark Version: {spark.version}")

Spark Version: 3.4.0


In [5]:

# Verify the current schema (database) the session is pointing at
current_schema = spark.sql("SELECT current_schema()").collect()[0][0]
print(f"Current schema: {current_schema}")


# Schema of the source file. Fields declared with nullable=False are the ones
# that must always carry a value.
schema = StructType([
    StructField("customer", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("picture_url", StringType(), True),
    StructField("user_type", StringType(), True),
    StructField("created_date", StringType(), True),
    StructField("audit_timestamp", LongType(), False)
])

# Read data from a JSON file into a DataFrame
df = spark.read.schema(schema).json("s3a://XXXX/data/customers/4e0333a1205e_308_2_2.json")
df.printSchema()

# Keep only the required columns (created_date is dropped here)
selected_column_names = ["customer", "user_id", "picture_url", "user_type", "audit_timestamp"]
selected_columns_df = df.select(*selected_column_names)

# The schema handed to createDataFrame must match the selected columns
# one-to-one, in the same order. The full `schema` still contains
# created_date, so build a schema restricted to the selected columns.
fields_by_name = {field.name: field for field in schema.fields}
selected_schema = StructType([fields_by_name[name] for name in selected_column_names])

# Check if there are any null values in the columns declared non-nullable
null_counts = selected_columns_df.filter(
    col("customer").isNull() | col("user_id").isNull() | col("audit_timestamp").isNull()
).count()

if null_counts == 0:
    rows_for_new_schema = selected_columns_df
else:
    print("Data contains null values in 'customer', 'user_id' or 'audit_timestamp'. Handling nulls before applying the new schema.")
    # Handle null values by filling with default values
    df_handled = selected_columns_df.na.fill({
        'customer': 'default_customer',  # Replace with an appropriate default value
        'user_id': 'default_user_id',
        'audit_timestamp': 0 # Replace with an appropriate default value
    })
    rows_for_new_schema = df_handled

# Rebuild the DataFrame from its rows so the nullable flags declared in the
# schema are applied to the result.
df_new = spark.createDataFrame(rows_for_new_schema.rdd, schema=selected_schema)




Current schema: default


24/08/03 18:33:08 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
# Show df_new's schema so the nullable flags requested through `schema`
# (customer, user_id and audit_timestamp are declared non-nullable) can be
# checked against what the DataFrame actually reports.
df_new.printSchema()

In [7]:
table_name = "default.customers"

# Create the Iceberg table if it does not exist yet. user_id is declared
# NOT NULL because it is used as the identifier field below.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS  {table_name} (
    customer STRING,
    user_id STRING NOT NULL,
    user_type STRING,
    audit_timestamp bigint
    
)
USING iceberg
PARTITIONED BY (customer)
 LOCATION 's3a://XXXX/iceberg/warehouse/customers/'
 """)

# Add the new column only when the table does not have it yet. Without this
# guard, re-running the cell against the existing table fails because
# picture_url is already present.
existing_columns = {name.lower() for name in spark.table(table_name).columns}
if "picture_url" not in existing_columns:
    spark.sql(f"""
ALTER TABLE {table_name} 
ADD COLUMNS picture_url STRING
""")

# Set the identifier field; an identifier column must be of NOT NULL type
spark.sql(f"""
ALTER TABLE {table_name} SET IDENTIFIER FIELDS user_id
""")

# Write behaviour: copy-on-write for updates and merges with snapshot
# isolation, hash distribution of rows before writing.
# NOTE(review): 100870912 bytes is about 96 MiB - confirm this is intended and
# not a typo for 104857600 (100 MiB) or 134217728 (128 MiB).
spark.sql(f"""   
ALTER TABLE {table_name} SET 
TBLPROPERTIES ('write.update.mode'='copy-on-write',
    'write.update.isolation-level'='snapshot',
    'write.merge.mode'='copy-on-write',
    'write.merge.isolation-level'='snapshot',
    'write.target-file-size-bytes'='100870912',
    'write.distribution.mode'='hash')
""")

# Write data to the Iceberg table. mode("append") adds the rows again on every
# run of this cell.
# NOTE(review): this writes selected_columns_df, not the df_new built earlier
# in the notebook - confirm which DataFrame is meant to be persisted.
selected_columns_df.write \
    .format("iceberg") \
    .mode("append") \
    .save(table_name)

print("Data written to Iceberg table successfully")

                                                                                

Data written to Iceberg table successfully
