In [3]:
from pyspark.sql import SparkSession
import os
import time
import datetime
import pyspark.sql.functions as sf
from uuid import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark import SparkConf, SparkContext
from uuid import * 
from uuid import UUID
import time_uuid 
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
# Every Maven package the notebook needs has to be requested when the first
# session (and therefore the JVM) is created: later getOrCreate() calls hand
# back this same session and cannot add jars to the running JVM. The Kafka
# coordinates mirror the ones declared in the Kafka section further down.
spark = (
    SparkSession.builder
    .config(
        'spark.jars.packages',
        ','.join([
            'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0',
            'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3',
            'org.apache.kafka:kafka-clients:3.0.1',
        ]),
    )
    .getOrCreate()
)

<h5>Đọc data từ Cassandra</h5>

In [None]:
# Batch DataFrame over the `tracking` table of the `study_de` keyspace,
# read through the Spark Cassandra connector.
data = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='tracking', keyspace='study_de')
    .load()
)

<b> Xử lý custom_track = click

In [None]:
def calculating_clicks(df):
    """Aggregate 'click' tracking events per job, date, hour and source ids.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Tracking events. The code reads the columns ``custom_track``, ``ts``,
        ``bid``, ``job_id``, ``publisher_id``, ``campaign_id`` and ``group_id``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (job_id, date, hour, publisher_id, campaign_id, group_id)
        with ``bid_set`` (average bid), ``clicks`` (row count) and
        ``spend_hour`` (sum of bids).

    Notes
    -----
    Creates or replaces the temp view ``clicks`` and runs the query through
    the module-level ``spark`` session.
    """
    # Nulls are replaced by 0 before aggregating, so a missing bid counts as 0
    # in avg(bid) / sum(bid) and missing ids are grouped under 0.
    click_events = df.filter(df.custom_track == 'click').na.fill({
        'bid': 0,
        'job_id': 0,
        'publisher_id': 0,
        'group_id': 0,
        'campaign_id': 0,
    })
    # createOrReplaceTempView is the supported replacement for the deprecated
    # registerTempTable.
    click_events.createOrReplaceTempView('clicks')
    clicks_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , avg(bid) as bid_set, count(*) as clicks , sum(bid) as spend_hour from clicks
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return clicks_output

<b> Xử lý custom_track = conversion

In [None]:
def calculating_conversion(df):
    """Count 'conversion' tracking events per job, date, hour and source ids.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Tracking events. The code reads the columns ``custom_track``, ``ts``,
        ``job_id``, ``publisher_id``, ``campaign_id`` and ``group_id``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (job_id, date, hour, publisher_id, campaign_id, group_id)
        with the row count in ``conversions``.

    Notes
    -----
    Creates or replaces the temp view ``conversion`` and runs the query
    through the module-level ``spark`` session.
    """
    # Missing ids are replaced by 0 so those rows are grouped under 0.
    conversion_events = df.filter(df.custom_track == 'conversion').na.fill({
        'job_id': 0,
        'publisher_id': 0,
        'group_id': 0,
        'campaign_id': 0,
    })
    # createOrReplaceTempView is the supported replacement for the deprecated
    # registerTempTable.
    conversion_events.createOrReplaceTempView('conversion')
    conversion_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as conversions  from conversion
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return conversion_output

<b> Xử lý custom_track = qualified

In [None]:
def calculating_qualified(df):
    """Count 'qualified' tracking events per job, date, hour and source ids.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Tracking events. The code reads the columns ``custom_track``, ``ts``,
        ``job_id``, ``publisher_id``, ``campaign_id`` and ``group_id``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (job_id, date, hour, publisher_id, campaign_id, group_id)
        with the row count in ``qualified``.

    Notes
    -----
    Creates or replaces the temp view ``qualified`` and runs the query
    through the module-level ``spark`` session.
    """
    # Missing ids are replaced by 0 so those rows are grouped under 0.
    qualified_events = df.filter(df.custom_track == 'qualified').na.fill({
        'job_id': 0,
        'publisher_id': 0,
        'group_id': 0,
        'campaign_id': 0,
    })
    # createOrReplaceTempView is the supported replacement for the deprecated
    # registerTempTable.
    qualified_events.createOrReplaceTempView('qualified')
    qualified_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as qualified  from qualified
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return qualified_output

<b> Xử lý custom_track = unqualified

In [None]:
def calculating_unqualified(df):
    """Count 'unqualified' tracking events per job, date, hour and source ids.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Tracking events. The code reads the columns ``custom_track``, ``ts``,
        ``job_id``, ``publisher_id``, ``campaign_id`` and ``group_id``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (job_id, date, hour, publisher_id, campaign_id, group_id)
        with the row count in ``unqualified``.

    Notes
    -----
    Creates or replaces the temp view ``unqualified`` and runs the query
    through the module-level ``spark`` session.
    """
    # Missing ids are replaced by 0 so those rows are grouped under 0.
    unqualified_events = df.filter(df.custom_track == 'unqualified').na.fill({
        'job_id': 0,
        'publisher_id': 0,
        'group_id': 0,
        'campaign_id': 0,
    })
    # createOrReplaceTempView is the supported replacement for the deprecated
    # registerTempTable.
    unqualified_events.createOrReplaceTempView('unqualified')
    unqualified_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as unqualified  from unqualified
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return unqualified_output

<b> Join tất cả các kết quả xử lý để nhận được thông tin full

In [None]:
def process_final_data(clicks_output,conversion_output,qualified_output,unqualified_output):
    """Full-outer-join the four per-event aggregates on their grouping keys.

    Starting from ``clicks_output``, the conversion, qualified and unqualified
    frames are joined in that order on job_id, date, hour, publisher_id,
    campaign_id and group_id with a ``'full'`` join, and the joined frame is
    returned.
    """
    join_keys = ['job_id','date','hour','publisher_id','campaign_id','group_id']
    merged = clicks_output
    for metrics in (conversion_output, qualified_output, unqualified_output):
        merged = merged.join(metrics, join_keys, 'full')
    return merged

<b> Tổng hợp output để lấy kết quả cuối cùng

In [None]:
def process_cassandra_data(df):
    """Build the combined hourly metrics frame from raw tracking events.

    Runs the click, conversion, qualified and unqualified aggregations on
    ``df`` (in that order) and returns the result of joining them with
    ``process_final_data``.
    """
    return process_final_data(
        calculating_clicks(df),
        calculating_conversion(df),
        calculating_qualified(df),
        calculating_unqualified(df),
    )

<b> Lấy id của company merge vào output trên để lấy được id của company cho job đó

In [None]:
def retrieve_company_data(url,driver,user,password):
    """Load the job-to-company mapping over JDBC.

    Reads ``id`` (exposed as ``job_id``), ``company_id``, ``group_id`` and
    ``campaign_id`` from the ``job`` table, using the module-level ``spark``
    session and the given JDBC ``url``, ``driver``, ``user`` and ``password``.
    Returns the resulting DataFrame.
    """
    # The JDBC source takes a parenthesised subquery with an alias as dbtable.
    job_query = """(SELECT id as job_id, company_id, group_id, campaign_id FROM job) test"""
    jdbc_options = dict(url=url, driver=driver, dbtable=job_query, user=user, password=password)
    reader = spark.read.format('jdbc').options(**jdbc_options)
    return reader.load()

In [None]:
def cassandra_output(df, url=None, driver="com.mysql.cj.jdbc.Driver", user=None, password=None):
    """Aggregate tracking events and attach each job's ``company_id``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw tracking events, passed unchanged to ``process_cassandra_data``.
    url, user, password : str, optional
        JDBC connection settings for the job table. When omitted they are
        taken from the environment variables ``MYSQL_URL``, ``MYSQL_USER``
        and ``MYSQL_PASSWORD``, falling back to the values this notebook
        used originally so existing calls keep working.
    driver : str, optional
        JDBC driver class name.

    Returns
    -------
    pyspark.sql.DataFrame
        The aggregated metrics left-joined with the company lookup on
        ``job_id``; the lookup's own ``group_id`` and ``campaign_id`` columns
        are dropped so only the ones from the metrics remain.
    """
    # Credentials should come from the caller or the environment rather than
    # from literals in the notebook; the literals remain only as a fallback.
    if url is None:
        url = os.environ.get('MYSQL_URL', "jdbc:mysql://localhost:3306/data_engineering")
    if user is None:
        user = os.environ.get('MYSQL_USER', 'root')
    if password is None:
        password = os.environ.get('MYSQL_PASSWORD', '1')

    # Named `aggregated` so the local no longer shadows this function's name.
    aggregated = process_cassandra_data(df)
    company = retrieve_company_data(url=url, driver=driver, user=user, password=password)
    final_output = aggregated.join(company,'job_id','left').drop(company.group_id).drop(company.campaign_id)
    return final_output

<b> Bắn final output trên vào Kafka

In [None]:
# Maven coordinates for the Kafka integration.
scala_version = '2.12'  # TODO: confirm this matches the Scala build of the installed Spark
spark_version = '3.0.3'
packages = [
    'org.apache.spark:spark-sql-kafka-0-10_{}:{}'.format(scala_version, spark_version),
    'org.apache.kafka:kafka-clients:3.0.1',
]

In [None]:
# The session is used for both the Cassandra read and the Kafka write below,
# so the Cassandra connector is requested alongside the Kafka packages.
# NOTE: getOrCreate() returns the already-running session when one exists; in
# that case master/appName/spark.jars.packages set here are not applied to the
# running JVM, because packages are only resolved when the JVM is launched.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-ETL")
    .config(
        "spark.jars.packages",
        ",".join(['com.datastax.spark:spark-cassandra-connector_2.12:3.1.0'] + packages),
    )
    .getOrCreate()
)

<b>Đọc lại data từ Cassandra rồi lấy hàm xử lý output để xử lý data</b>

In [None]:
# Batch read of the tracking table. The cells below push this frame to Kafka
# with DataFrame.write(...).save(), which is a batch-only API and raises on a
# streaming DataFrame, so the source must be read with `read`, not `readStream`.
cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="tracking", keyspace="study_de")
    .load()
)

In [None]:
# Keep only the columns the pipeline uses and drop events without a job id.
cassandra_df = (
    cassandra_df
    .select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
    .where(F.col('job_id').isNotNull())
)

<b> Biến đổi data từ cassandra về dạng key:value

In [None]:
# Kafka records are (key, value) pairs: the key is the job id rendered as text
# and the value is the whole row serialised as JSON.
# The key is built from job_id because `id` is not among the columns kept by
# the select above, so CAST(id ...) could not be resolved; job_id is present
# and already filtered to be non-null.
# NOTE(review): this serialises the raw tracking columns; in the visible cells
# the aggregated output of cassandra_output() is never sent to Kafka - confirm
# which of the two the downstream schema is meant to receive.
cassandra_df = cassandra_df.selectExpr("CAST(job_id AS STRING) AS key", "to_json(struct(*)) AS value")

Load data vào kafka

In [None]:
# Publish the key/value rows to the `project_ETL` topic on the local broker.
(
    cassandra_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "project_ETL")
    .save()
)

<b> Đọc data từ kafka

In [None]:
# Batch read of the `project_ETL` topic from the local broker.
kf_read = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "project_ETL")
    .load()
)

In [None]:
# Preview the rows read back from Kafka. The key/value columns are cast to
# STRING only in the next cell, so here they are shown as delivered by the
# Kafka source.
kf_read.show()

In [None]:
# Keep only key and value from the Kafka rows, both cast to strings.
final_data = kf_read.selectExpr(*[f"CAST({field} AS STRING)" for field in ("key", "value")])

<b> Lọc ra giá trị khác null

In [None]:
# Drop records whose key is null, then keep just the JSON payload column.
result = final_data.where(F.col('key').isNotNull()).select('value')

<b> Biến đổi data từ kafka về lại dataframe để đúng với cấu trúc bảng load vào MySQL

In [None]:
# Schema used to parse the JSON payload: every field is a nullable string.
columns = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        'company_id',
        'job_id',
        'dates',
        'hours',
        'publisher_id',
        'campaign_id',
        'group_id',
        'bid_set',
        'clicks',
        'conversion',
        'qualified_application',
        'disqualified_application',
    )
])

In [None]:
# Parse the JSON payload with the schema above and flatten the parsed struct
# into top-level columns.
parsed_payload = F.from_json('value', columns).alias('c1')
final_result_kafka = result.select(parsed_payload).select('c1.*')

<b> Load Data vào mysql

In [None]:
# Append the parsed rows to the `event_DW` table over JDBC.
# Connection settings can be overridden with the MYSQL_URL / MYSQL_USER /
# MYSQL_PASSWORD environment variables so credentials need not live in the
# notebook; the original literals remain only as fallbacks.
(
    final_result_kafka.write
    .format('jdbc')
    .option('url', os.environ.get('MYSQL_URL', 'jdbc:mysql://localhost:3306/data_engineering'))
    .option('driver','com.mysql.cj.jdbc.Driver')
    .option('dbtable','event_DW')
    .option('user', os.environ.get('MYSQL_USER', 'root'))
    .option('password', os.environ.get('MYSQL_PASSWORD', '1'))
    .mode('append')
    .save()
)