# Code Samples 

## Initial Config
Use the `%%configure` magic to configure Spark, since some configs do not appear to take effect when they are set later from code.

In [None]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}

Manually add job parameters to `sys.argv` like this:

In [None]:
import sys

# A later cell resolves job parameters from sys.argv via getResolvedOptions,
# so in a notebook we inject them by hand. The guard keeps this cell
# idempotent: re-running it does not pile up duplicate arguments.
if '--JOB_NAME' not in sys.argv:
    sys.argv.extend(['--JOB_NAME', 'test_count'])

## Libraries

### pip libraries

In [None]:
%%sh
# Install the faker package with pip; the %%sh cell magic runs this cell body in a shell.
pip install faker

In [None]:
import faker

### Custom Libraries
Custom libraries should be placed in the `./extra_python_path` directory and imported by their file name (without the `.py` extension).

In [None]:
import hudi_library

## Reading Hudi Data

In [None]:
# Location of the Hudi table to read; substitute the placeholders with a real bucket and table path.
hudi_source_path = "s3://<bucket-name>/<path-to-table>"
df = spark.read.format("hudi").load(hudi_source_path)

## Writing Hudi Data

In [None]:
# imports
from faker import Faker

import sys, boto3
from awsglue.context import GlueContext, DynamicFrame
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, lit, to_timestamp
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
import json

# Resolve the job parameters from the command-line arguments; only JOB_NAME is requested.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Spark settings for Hudi, applied to the session builder in this order.
hudi_spark_conf = {
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.hive.convertMetastoreParquet': 'false',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
    'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
    'spark.sql.legacy.pathOptionBehavior.enabled': 'true',
}
session_builder = SparkSession.builder
for conf_key, conf_value in hudi_spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Derive the Spark context and wrap it in a Glue context, then initialise the Glue job.
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args["JOB_NAME"], args)

# Create fake sample rows with the faker library (generator seeded with 0).
fake = Faker()
Faker.seed(0)

schema = StructType([
    StructField("id", StringType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("address", StringType()),
    StructField("ts", StringType()),  # Precombine field required by Hudi
])

# `ts` is declared as a string column, so format the generated datetime explicitly
# instead of handing Spark a raw `datetime` object for a StringType field.
# The fixed-width format also orders chronologically when compared as text.
ts_format = "%Y-%m-%d %H:%M:%S"
data = [
    (
        fake.uuid4(),
        fake.name(),
        fake.email(),
        fake.address(),
        fake.date_time_this_month().strftime(ts_format),
    )
    for _ in range(100)
]

df = spark.createDataFrame(data, schema)
df.show(5)

In [None]:
# Basic Hudi settings; adjust as needed.
# The Glue Catalog names and the S3 target are defined once and reused below.
hudi_database_name = "sample_db"  # Glue Catalog database name
hudi_table_name = "sample_tb"  # Glue Catalog table name
hudi_table_path = "s3://testbucket-juanamaya/sample_db/sample_tb/"  # S3 target path; replace with your own bucket

hudi_common_settings = {
    "className": "org.apache.hudi",
    "hoodie.table.name": hudi_table_name,
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "insert_overwrite_table",
    "hoodie.datasource.write.recordkey.field": "id",  # primary key
    "hoodie.datasource.write.precombine.field": "ts",  # precombine key
    "path": hudi_table_path,
}
hudi_index_settings = {
    "hoodie.index.type": "BLOOM",
}
hudi_hive_sync_settings = {
    "hoodie.parquet.compression.codec": "gzip",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": hudi_database_name,
    "hoodie.datasource.hive_sync.table": hudi_table_name,
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
}
hudi_cleaner_options = {
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": "10",
    # The key must be the bare option name. The previous "hoodie-conf " prefix
    # (command-line flag syntax) made it a different, unrecognised key.
    "hoodie.cleaner.parallelism": "200",
}
unpartition_settings = {
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
}

# Merge every group into the single options dict passed to the writer.
hudi_final_settings = {
    **hudi_common_settings,
    **hudi_index_settings,
    **hudi_hive_sync_settings,
    **hudi_cleaner_options,
    **unpartition_settings,
}

In [None]:
# Full overwrite of the Hudi table and its contents. Adapt this step to use upserts,
# incremental ETL or CDC. No path is passed to save(); the target location is the
# "path" entry in hudi_final_settings.
hudi_writer = df.write.format('hudi').options(**hudi_final_settings)
hudi_writer.mode('Overwrite').save()