## **Real Time Data Engineering Projects**

> โปรเจกต์นี้ออกแบบมาให้เป็นแนวทางแบบครบวงจร (end-to-end) สำหรับการสร้าง data engineering pipeline ตั้งแต่ขั้นตอน ingestion, streaming, processing ไปจนถึงการจัดเก็บข้อมูล โดยใช้โครงสร้างพื้นฐานที่เน้น real-time processing และ containerized deployment

#### **สถาปัตยกรรมระบบ (System Architecture)**

มี component หลักดังนี้:

- **Data Source**: ดึงข้อมูลจาก API เช่น randomuser.me เพื่อใช้เป็นข้อมูลจำลอง (dummy data) 

- **Apache Airflow**: ทำหน้าที่ orchestration ของ pipeline และเก็บข้อมูลที่ fetch เข้าไว้ใน PostgreSQL 

- **Kafka + Zookeeper**: ใช้สำหรับ streaming ข้อมูลจาก PostgreSQL ไปยัง processing engine 

- **Control Center & Schema Registry**: สำหรับดูแล schema และ monitoring Kafka streams 

- **Apache Spark**: ใช้ประมวลผลข้อมูล (stream processing) ผ่าน cluster (master/worker nodes) 

- **Cassandra**: ใช้เก็บข้อมูลหลังผ่านการประมวลผล

#### **เทคโนโลยีที่ใช้**

- Orchestration: Apache Airflow

- Streaming: Apache Kafka & Zookeeper

- Processing: Apache Spark

- Storage: PostgreSQL (ก่อนประมวลผล), Cassandra (หลังประมวลผล)

- Containerization: Docker

#### 📌 **Kafka พื้นฐาน (สำหรับ Data Engineer)**

##### **1. Kafka คืออะไร?**

- Kafka = ระบบ distributed event streaming platform
  → ใช้สำหรับส่ง, เก็บ และประมวลผลข้อมูลแบบ real-time

- มันทำตัวเหมือน message broker (ท่อข้อมูลกลาง)

- ใช้กันเยอะในงาน real-time analytics, log streaming, data pipeline

##### **2. แนวคิดหลัก (Concepts)**

ลองคิดเป็น "ห้อง chat" 👇

- **Producer** → คนพิมพ์ข้อความ (ส่งข้อมูลเข้าไปใน Kafka)

- **Consumer** → คนอ่านข้อความ (ดึงข้อมูลออกมาใช้)

- **Topic** → ห้องสนทนา (channel ที่ใช้ส่ง/รับข้อมูล)

- **Broker** → Server ของ Kafka ที่เก็บ topic

- **Cluster** → กลุ่ม broker หลายๆ ตัว

- **Partition** → การแบ่ง topic ออกเป็นส่วนๆ เพื่อ scale (แต่ละ partition คือท่อแยก)

- **Offset** → เลขบอกลำดับของ message ใน partition

👉 Keyword: Publish–Subscribe system
Producer ยิงข้อมูล → Kafka → Consumer อ่านข้อมูล

##### **3. Kafka Data Flow**

1. Producer สร้าง event → ส่งไปที่ Kafka topic

2. Kafka เก็บ event ไว้ใน log แบบ append-only

3. Consumer subscribe topic → อ่าน event ตามลำดับ offset

4. Data ถูก process หรือส่งต่อไป DB, Data Lake, Dashboard

#### **เริ่มต้นโปรเจ็ค**

ตรวจสอบ version ของ Python และ Spark ที่จาก Docker Container

In [1]:
!python3 --version
!spark-submit --version

Python 3.11.6
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.28
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


##### 1. สร้าง Mock Streaming data จาก API

ดึงค่าจาก API

In [2]:
import requests

def get_data():
    # ดึงข้อมูล Random User
    res = requests.get("https://randomuser.me/api/") # เวลาดึง data มาจะไม่เหมือนกัน
    res = res.json() # แปลงจาก json เป็น dict
    res = res['results'][0]

    return res


In [3]:
import json

print(json.dumps(get_data(), indent=3))

{
   "gender": "female",
   "name": {
      "title": "Mrs",
      "first": "Joelma",
      "last": "Carvalho"
   },
   "location": {
      "street": {
         "number": 273,
         "name": "Rua Para\u00edba "
      },
      "city": "Paulista",
      "state": "Esp\u00edrito Santo",
      "country": "Brazil",
      "postcode": 84055,
      "coordinates": {
         "latitude": "-75.3848",
         "longitude": "135.3140"
      },
      "timezone": {
         "offset": "-3:30",
         "description": "Newfoundland"
      }
   },
   "email": "joelma.carvalho@example.com",
   "login": {
      "uuid": "729407a4-d45a-4518-8981-922ee498a995",
      "username": "bigmeercat213",
      "password": "keywest",
      "salt": "rtPueIVL",
      "md5": "1228c6e9945b11b9af65cc28194a311e",
      "sha1": "4c3a15a09c1ed12010c7ebffae10e9275aea64c4",
      "sha256": "897de8f0e17b825aac6637456af02458d25639734e0341147b52b369e2af40dc"
   },
   "dob": {
      "date": "1952-11-11T17:30:51.966Z",
      "age": 

จัดรูปแบบตามที่ต้องการ

In [4]:
import uuid

def format_data(res):
    # จัดรูปแบบข้อมูล
    data = {}
    location = res['location']
    data['id'] = str(uuid.uuid4())
    data['first_name'] = res['name']['first']
    data['last_name'] = res['name']['last']
    data['gender'] = res['gender']
    data['address'] = f"{str(location['street']['number'])} {location['street']['name']}, " \
                      f"{location['city']}, {location['state']}, {location['country']}"
    data['post_code'] = location['postcode']
    data['email'] = res['email']
    data['username'] = res['login']['username']
    data['dob'] = res['dob']['date']
    data['registered_date'] = res['registered']['date']
    data['phone'] = res['phone']
    data['picture'] = res['picture']['medium']

    print(data)

    return data

In [5]:
format_data(get_data())

{'id': '3be8f393-e386-417c-8ebf-3cdccfaae80b', 'first_name': 'Hugo', 'last_name': 'Moreno', 'gender': 'male', 'address': '6139 Calle del Pez, Santa Cruz de Tenerife, Aragón, Spain', 'post_code': 48348, 'email': 'hugo.moreno@example.com', 'username': 'lazydog173', 'dob': '1980-12-06T16:12:22.549Z', 'registered_date': '2012-01-16T04:28:50.357Z', 'phone': '954-268-334', 'picture': 'https://randomuser.me/api/portraits/med/men/12.jpg'}


{'id': '3be8f393-e386-417c-8ebf-3cdccfaae80b',
 'first_name': 'Hugo',
 'last_name': 'Moreno',
 'gender': 'male',
 'address': '6139 Calle del Pez, Santa Cruz de Tenerife, Aragón, Spain',
 'post_code': 48348,
 'email': 'hugo.moreno@example.com',
 'username': 'lazydog173',
 'dob': '1980-12-06T16:12:22.549Z',
 'registered_date': '2012-01-16T04:28:50.357Z',
 'phone': '954-268-334',
 'picture': 'https://randomuser.me/api/portraits/med/men/12.jpg'}

Mock Api Streaming Data

In [6]:
def stream_data():
    import json
    from kafka import KafkaProducer
    import time
    import logging

    producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000) # สร้าง KafkaProducer เพื่อส่งช้อมูลไปใน Kafka Boker
    curr_time = time.time() # real time

    while True:
        if time.time() > curr_time + 10: # 1 minute
            break
        try:
            res = get_data()
            res = format_data(res)

            print(res)
            producer.send('users_created', json.dumps(res).encode('utf-8')) # ส่ง res ที่เป็น json ไปให้ boker
        except Exception as e:
            logging.error(f'An error occured: {e}')
            continue

In [7]:
# Function Streaming Data from API
stream_data()

{'id': 'a1ec5ca8-7d47-4d47-bed4-96ff198a9a65', 'first_name': 'Katharine', 'last_name': 'Pott', 'gender': 'female', 'address': '3259 Tannenweg, Freyung, Baden-Württemberg, Germany', 'post_code': 35617, 'email': 'katharine.pott@example.com', 'username': 'heavymouse564', 'dob': '1990-04-01T04:00:24.010Z', 'registered_date': '2022-03-02T22:58:10.484Z', 'phone': '0875-8014851', 'picture': 'https://randomuser.me/api/portraits/med/women/63.jpg'}
{'id': 'a1ec5ca8-7d47-4d47-bed4-96ff198a9a65', 'first_name': 'Katharine', 'last_name': 'Pott', 'gender': 'female', 'address': '3259 Tannenweg, Freyung, Baden-Württemberg, Germany', 'post_code': 35617, 'email': 'katharine.pott@example.com', 'username': 'heavymouse564', 'dob': '1990-04-01T04:00:24.010Z', 'registered_date': '2022-03-02T22:58:10.484Z', 'phone': '0875-8014851', 'picture': 'https://randomuser.me/api/portraits/med/women/63.jpg'}
{'id': '9b4bd941-d9d1-4487-8386-193e6d2fd478', 'first_name': 'Vilma', 'last_name': 'Makela', 'gender': 'female', '

#### **ETL (Extract, Transform, Load) แบบ Real-time** 

โดยมีหน้าที่หลักคือ ดึงข้อมูลผู้ใช้ (user) ที่ถูกส่งเข้ามาใน Kafka อย่างต่อเนื่อง, แปลงข้อมูลให้อยู่ในรูปแบบที่ถูกต้อง, แล้วบันทึกลงในฐานข้อมูล Cassandra

In [8]:
import logging
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# ตั้งค่า logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


In [9]:
# ฟังก์ชันสร้างการเชื่อมต่อกับ Spark
def create_spark_connection():
    s_conn = None

    try:
        s_conn = (SparkSession.builder 
            .appName('SparkDataStreaming') 
            .config('spark.jars.packages', # โหลดแพคเกจสำหรับ Spark
                    "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0," # โหลดแพคเกจสำหรับเชื่อมต่อกับ Cassandra
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") # โหลดแพคเกจสำหรับเชื่อมต่อกับ Kafka 
            .config('spark.cassandra.connection.host', 'localhost') # กำหนดที่อยู่การเชื่อมต่อกับ Cassandra
            .getOrCreate())
        
        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn

In [10]:
# ฟังก์ชันการเชื่อมต่อกับ Kafka
def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = (spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'broker:29092') # กำหนดที่อยู่การเชื่อมต่อกับ Kafka Broker
            .option('subscribe', 'users_created') # กำหนดชื่อของ Kafka Topic ที่จะดึงข้อมูลมา
            .option('startingOffsets', 'earliest') # กำหนดว่าจะดึงข้อมูลจากเริ่มต้นหรือไม่ earliest (อ่านจากที่เริ่มต้น) หรือ latest (อ่านจากข้อมูลที่เข้ามาล่าสุด)
            .option("maxOffsetsPerTrigger", 100)  # อ่านทีละ 50 records
            .load())
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df

In [11]:
# ฟังก์ชันสร้างการเชื่อมต่อกับ Cassandra
def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(contact_points=['cassandra'], port=9042) # กำหนดที่อยู่ hostname การเชื่อมต่อกับ Cassandra
        cas_session = cluster.connect() # สร้างการเชื่อมต่อ

        logging.info("Connected to Cassandra!")
        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None

In [12]:
def create_keyspace(session):
    # สร้าง Database / Schema บน Cassandra เรียกว่า Keyspace
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    print("Keyspace created successfully!")

In [13]:
def create_table(session):
    # สร้าง Table
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT) WITH default_time_to_live = 1200;
    """)

    print("Table created successfully!")

In [14]:
# ฟังก์ชันประมวลผลข้อมูลด้วย Spark และ Data Quality Checks
def create_selection_df_from_kafka(spark_df):
    """
    แปลงข้อมูลจาก Kafka เป็น Spark DataFrame และทำ Data Quality Checks
    - Schema Enforcement
    - Missing Value Check
    - Business Rule Validation (regex, allowed values)
    - Deduplication
    - ส่งข้อมูลไม่ผ่าน validation ไป Dead-letter topic (optional)
    
    Args:
        spark_df: input streaming DataFrame จาก Kafka
        kafka_producer_invalid: KafkaProducer สำหรับส่งข้อมูล invalid (optional)
    
    Returns:
        validated_df: Spark DataFrame พร้อมส่งต่อ (validated)
    """
    # -------------------- 1. Define Schema --------------------
    # กำหนดโครงสร้างข้อมูลที่จะนำเข้า
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    # -------------------- 2. Parse JSON and Enforce Schema --------------------
    # ดึงและแปลงข้อมูลจาก kafka เป็น Spark DataFrame 
    df = (spark_df.selectExpr("CAST(value AS STRING)") # เลือกเฉพาะคอลัมน์ value แปลงเป็น String 
        .select(from_json(col('value'), schema).alias('data')).select("data.*")) # อ่านข้อมูลจาก json ที่เป็น value และจัดข้อมูลตาม schema
    logging.info("Schema applied and JSON parsed.")

    # -------------------- 3. Missing Value Check --------------------
    df_non_null = df.dropna(subset=["id", "email", "registered_date"]) # ลบตัวที่มีค่าเป็น null ใน col พวกนี้
    logging.info("Dropped records with null id, email, or registered_date.")

    # -------------------- 4. Business Rule Validation --------------------
    df_valid = df_non_null.filter( # filter แค่ค่าที่ตรงตาม regex ตามแบบนี้
        col("email").rlike(r".+@.+\..+") & # รูปแบบของ email pattern
        col("gender").isin("male", "female") & # จะมีแค่ 2 ค่านี้เท่านั้น
        col("id").rlike("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
    )
    logging.info("Applied business rule validation.")

    # -------------------- 5. Deduplicate --------------------
    df_deduped = df_valid.dropDuplicates(["id"]) # ลบแถวที่มี id ซ้ำกัน
    logging.info("Deduplicated based on id.")

    return df_deduped

In [15]:
# create spark connection
spark_conn = create_spark_connection() # สร้างการเชื่อมต่อกับ Spark

if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn) # สร้างการเชื่อมต่อกับ Kafka
        selection_df = create_selection_df_from_kafka(spark_df) # ดึงข้อมูลจาก Kafka
        session = create_cassandra_connection() # สร้างการเชื่อมต่อกับ Cassandra

        # Testing Data Flow on Console
        # query = (selection_df
        #  .writeStream
        #  .format("console")
        #  .outputMode("append")
        #  .start())

        # query.awaitTermination(30) # หน่วงเวลา 30s ให้ query data
        # query.stop() # stop streaming query หลัง 30s

        if session is not None:
            create_keyspace(session) # สร้าง Database / Schema บน Cassandra เรียกว่า Keyspace
            create_table(session) # สร้าง Table

            logging.info("Streaming is being started...")

            # เขียนข้อมูลลง Database
            streaming_query = (selection_df.writeStream
                               .format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .option("spark.cassandra.connection.host", "cassandra")
                               .option("spark.cassandra.connection.port", "9042")
                               .option("spark.cassandra.connection.local_dc", "datacenter1")
                               .trigger(processingTime="10 seconds") # เขียนค่าทุกๆ 10 วินาที
                               .start())

            streaming_query.awaitTermination(30) # ให้หยุดแล้ว query data
            streaming_query.stop()

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5bf06d28-90e7-4ab5-9e78-d5080ba9b157;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.0 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.

Keyspace created successfully!
Table created successfully!


25/09/11 04:12:50 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 2, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@e5220bc,com.datastax.spark.connector.cql.CassandraConnector@7e2a89fa,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,UUIDType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(BytesInBatch(1024),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(Struc