# 🔄 Update 5 Records

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Build (or reuse) the local Spark session for this notebook.
# The Elasticsearch host/port are set session-wide, Kryo is selected as the
# serializer, and the elasticsearch-spark connector is requested through
# spark.jars.packages.
session_builder = SparkSession.builder.appName("Update5Records").master("local[*]")

session_settings = {
    "spark.es.nodes": "elasticsearch",
    "spark.es.port": "9200",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.jars.packages": "org.elasticsearch:elasticsearch-spark-30_2.12:8.14.3",
}
for setting_name, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

# Print the application id so the run can be identified.
print(f"Application ID: {spark.sparkContext.applicationId}")

In [None]:
# Replacement values for the five records, one (id, name, age) tuple each.
updated_records_data = [
    (24410114, "Tran Trieu Thuan updated", 30),
    (24410100, "Nguyen Phuong Tan updated", 30),
    (24410109, "Nguyen Thi Thu Thao updated", 28),
    (24410092, "Huynh Duy Quoc updated", 35),
    (24410040, "Ha Huy Hung updated", 22),
]

# The first element of every tuple is the record id; collect the ids so the
# matching records can be selected by id.
update_ids = [record_id for record_id, _name, _age in updated_records_data]
print(f"Updating records with IDs: {update_ids}")

# Load the current contents of the Elasticsearch index into a DataFrame.
es_read_options = {
    "es.nodes": "elasticsearch",
    "es.port": "9200",
    "es.resource": "2_people_data_2k_spark",
}
current_df = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .options(**es_read_options)
    .load()
)

In [None]:
# Display the currently stored versions of the targeted records so they can
# be compared with the new values.
matches_update_id = col("id").isin(update_ids)
records_before_update = current_df.where(matches_update_id)

print("Records before update:")
records_before_update.show()

In [None]:
# Build a DataFrame holding the replacement values
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for the (id, name, age) tuples; every column is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("age", IntegerType()),
    )
])

updated_df = spark.createDataFrame(updated_records_data, schema=schema)

# NOTE: this shows the new values held in updated_df; nothing has been
# written to Elasticsearch at this point.
print("Records after update:")
updated_df.show()

In [None]:
# Write updated records back to Elasticsearch.
#
# - es.mapping.id uses the "id" column as the document _id, so each row
#   targets the existing document with the same id.
# - es.write.operation=upsert merges the supplied fields into an existing
#   document (and inserts the document if it does not exist yet). The
#   connector's default operation, "index", replaces the whole stored
#   document, which would drop any fields not present in updated_df.
print("Writing updated records to Elasticsearch...")
try:
    (
        updated_df.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "elasticsearch")
        .option("es.port", "9200")
        .option("es.resource", "2_people_data_2k_spark")
        .option("es.mapping.id", "id")
        .option("es.write.operation", "upsert")
        .mode("append")
        .save()
    )
except Exception as e:
    # Best-effort notebook cell: report the failure instead of aborting so
    # the remaining cells (e.g. stopping the session) can still be run.
    print(f"✗ Update failed: {e}")
else:
    print("✓ Records updated successfully!")

In [None]:
# Stop Spark session
# Shuts down the session held in `spark`, then prints a confirmation.
spark.stop()
print("Spark session stopped.")