In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession, functions as F
import warnings
warnings.filterwarnings('ignore')

In [2]:
# minio giriş bilgilerini tanımlıyorum.
accessKeyId=<user>
secretAccessKey=<password>

# SparkSession oluşturuyorum.
spark = SparkSession.builder \
.appName("Spark Example MinIO") \
.master("spark://spark-master:7077") \
.config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
.getOrCreate()
# master olarak spark master'ı veriyoruz çünkü Standalone cluster da çalışırken yarn veya local diyemeyiz. 
# spark streaming uygulaması yaptığımız için spark ile kafkanın beraber çalışacağı jar dosyasını aldım.

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-878ba8f4-3c6d-41c0-add2-473c98bcb1c3;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 962ms :: artifacts dl 22ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [default]


In [3]:
def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://minio:9000')

In [4]:
load_config(spark.sparkContext)

In [5]:
# MinIO'ya yazmak için gerekli ayarları Spark'ın içerisine tanımladım.

In [6]:
# önce Kafka'nın IP Adresini almamız gerekiyor.
# docker inspect final_project_2-kafka-1 | grep "IPAddress"

In [7]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "172.18.0.7:9092") \
  .option("subscribe", "dbserver1.public.customers") \
  .option("startingOffsets", "earliest") \
  .option("multiline","true") \
  .load()
# Bu bir streaming uygulaması olduğu için formatımız kafka, sonra Kafka'nın server adresini belirttim. 
# dbserver1.public.customers topic'ini dinleyeceğiz. offsetleri en başından itibaren alsın. 
# multiline true diyerek de json formatı olacağı için veri, option olarak belirttim.

In [8]:
# veri payload ve schema olmak üzere 2 ana dala ayrılıyor.
# payload kısmı bizim için önemli yani bu tarafı key olarak düşünebiliriz.
# before ve after ksımlarını ise value olarak düşünüp ilerliyorum. İzlememiz gereken akış bu şekildedir.
# ts_ms: kaçıncı saniyede yazıldığı yani timestamp bilgisi ve op: hangi işlem türü olduğudur. 

In [9]:
df_schema = StructType([
     StructField('schema',  StringType(), True),
     StructField('payload', StringType(), True)
     ])
# df_schema ile Kafka console'dan okuduğumuz veriyi 2 ana kolon altında şemaya giydiriyorum. 
# Her ikisini de string türünde tanımlıyorum. ilki schema, ikincisi payload. Bana lazım olan da payload.

In [10]:
df2 = df.selectExpr("cast (value as string) as json").select(F.from_json("json", schema=df_schema).alias("data")).select("data.payload")
# Console'dan gelen value kısmını string olarak cast ettim yani serialize ettim ismini de json olarak verdim. 
# Sonra bu value'nun içindeki veri json formatında olduğu için from json kullandım. 
# ve yukarıda tanımladığım şemayı giydiriyorum. En sonda da bu veri içerisinden payload kısmını select ile seçiyorum.

In [11]:
df2.printSchema()

root
 |-- payload: string (nullable = true)



In [12]:
# 2. kısım: payload içerisindeki şemaları oluşturuyorum. Value kısmı olarak düşünebiliriz.

In [13]:
message_schema = StructType([
     StructField('before', MapType(StringType(), StringType(), True), True),
     StructField('after', MapType(StringType(), StringType(), True), True),
     StructField('op', StringType(), True),
     StructField('ts_ms', StringType(), True)
     ]
)
# before kısmının kendisi sözlük tipinde içerisi ise string yapıda olduğu için bu şekilde verdim. Aynısı after içinde geçerli.
# op ve ts_ms zaten string olacak.

In [14]:
# after_fields ve before_fieldsları ise döngü yazarken kullanacaz gibi düşünelim. 
# Her ikisindeki kolonların isimlerini yazıyoruz. Bu sayede bu liste içerisindeki değerleri getirebileceğiz.

In [15]:
after_fields = [
    "customerId", "customerFName", "customerLName", "customerEmail",
    "customerPassword", "customerStreet", "customerCity", "customerState", "customerZipcode"]

In [16]:
before_fields = [
    "customerId", "customerFName", "customerLName", "customerEmail",
    "customerPassword", "customerStreet", "customerCity", "customerState", "customerZipcode"]

In [17]:
# Önemli kısım: Burda payload içerisinden value kısmını from json ile çekeceğim.
# Şemam ise yukarda tanımladığım message_schema ile de payload kısmının value'ları için şema giydiriyorum.
# payload.before ve payload.after için comprehension yapısını kullanarak bunlara ait kolonları getiriyorum. op ve ts_ms kolonlarını da şeçerek value'larımı getirmiş oluyorum.

In [18]:
df_final = df2.withColumn("payload",F.from_json(F.col("payload"), message_schema))\
.select(*[F.col("payload.before").getItem(f).alias('payload.before.'+f) for f in before_fields], \
*[F.col("payload.after").getItem(f).alias('payload.after.'+f) for f in after_fields], 'payload.op','payload.ts_ms')

In [19]:
df_final.printSchema()

root
 |-- payload.before.customerId: string (nullable = true)
 |-- payload.before.customerFName: string (nullable = true)
 |-- payload.before.customerLName: string (nullable = true)
 |-- payload.before.customerEmail: string (nullable = true)
 |-- payload.before.customerPassword: string (nullable = true)
 |-- payload.before.customerStreet: string (nullable = true)
 |-- payload.before.customerCity: string (nullable = true)
 |-- payload.before.customerState: string (nullable = true)
 |-- payload.before.customerZipcode: string (nullable = true)
 |-- payload.after.customerId: string (nullable = true)
 |-- payload.after.customerFName: string (nullable = true)
 |-- payload.after.customerLName: string (nullable = true)
 |-- payload.after.customerEmail: string (nullable = true)
 |-- payload.after.customerPassword: string (nullable = true)
 |-- payload.after.customerStreet: string (nullable = true)
 |-- payload.after.customerCity: string (nullable = true)
 |-- payload.after.customerState: string

In [20]:
# veriyi console yazıdırarak kontrol ediyorum.

In [21]:
query = df_final.writeStream\
.format("console") \
.option("truncate",False) \
.outputMode("append").start()

query.awaitTermination()

23/02/18 21:33:15 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-edfa7a3c-2fa5-4ab3-aa0d-b14544bde305. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------------+----------------------------+----------------------------+----------------------------+-------------------------------+-----------------------------+---------------------------+----------------------------+------------------------------+------------------------+---------------------------+---------------------------+---------------------------+------------------------------+----------------------------+--------------------------+---------------------------+-----------------------------+---+-------------+
|payload.before.customerId|payload.before.customerFName|payload.before.customerLName|payload.before.customerEmail|payload.before.customerPassword|payload.before.customerStreet|payload.before.customerCity|payload.before.customerState|payload.before.customerZipcode|payload.after.customerId|payload.after.customerFName|payload.after.customerLName|payload.after.custome

KeyboardInterrupt: 

# Write to Minio with Spark Streaming

In [22]:
# minio ya foreach batch şeklinde verimizi yazıyorum. Bunun için fonksiyonumuzu tanımladık.
# Formatı csv, mode'nu append şeklinde ve cdc-data bucket altında customers klasöründe değişiklik meydana gelen verilerimi tutmuş oluyorum.

In [25]:
def write_to_multiple_sinks(df, batchId):
    # write to file
    df.write\
            .format("csv") \
            .mode("append") \
            .save("s3a://cdc-data/datasets/customers")

In [26]:
# Sink
checkpointDir = "s3a://cdc-data/checkpoint/customers"
# start streaming
streamingQuery = (df_final
                  .writeStream
                  .foreachBatch(write_to_multiple_sinks)
                  .option("checkpointLocation", checkpointDir)
                  .start())

#
streamingQuery.awaitTermination()

23/02/18 21:36:00 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

KeyboardInterrupt: 

In [27]:
spark.stop()