Pyspark connect to Cassandra

In [142]:
from pyspark.sql import SparkSession


In [143]:
spark = SparkSession.builder.config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0').getOrCreate()

In [144]:
spark_df = spark.read.format("org.apache.spark.sql.cassandra").options(table='tracking',keyspace='study_data_engineering').load()

ETL data

In [145]:
spark_df.printSchema()

root
 |-- create_time: string (nullable = false)
 |-- bid: double (nullable = true)
 |-- bn: string (nullable = true)
 |-- campaign_id: double (nullable = true)
 |-- cd: double (nullable = true)
 |-- custom_track: string (nullable = true)
 |-- de: string (nullable = true)
 |-- dl: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- ed: string (nullable = true)
 |-- ev: double (nullable = true)
 |-- group_id: double (nullable = true)
 |-- id: string (nullable = true)
 |-- job_id: double (nullable = true)
 |-- md: string (nullable = true)
 |-- publisher_id: double (nullable = true)
 |-- rl: string (nullable = true)
 |-- sr: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- tz: double (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_content: string (nullable = true)
 |-- utm_medium: string (nullable = true)
 |-- utm_source: string (nullable = true)
 |-- utm_term: string (nu

In [146]:
spark_df.show()

+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+------+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id|  cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|    tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+------+--------------------+-------------------+------------+-----------+----------

In [147]:
data = spark_df.select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
data = data.filter(data.job_id.isNotNull())
data.show()

+--------------------+------+------------+---+-----------+--------+------------+
|                  ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+--------------------+------+------------+---+-----------+--------+------------+
|2022-07-27 04:04:...| 336.0|        NULL|1.0|       15.0|    NULL|         1.0|
|2022-07-26 02:56:...|1530.0|        NULL|0.0|      222.0|    NULL|         1.0|
|2022-07-26 03:03:...|1534.0|        NULL|0.0|      222.0|    NULL|         1.0|
|2022-07-25 09:49:...|1534.0|       click|0.0|      222.0|    NULL|         1.0|
|2022-07-24 14:17:...| 188.0|        NULL|1.0|       48.0|    34.0|         1.0|
|2022-07-26 02:47:...| 336.0|       click|1.0|       15.0|    NULL|         1.0|
|2022-07-24 10:06:...| 188.0|        NULL|1.0|       48.0|    34.0|         1.0|
|2022-07-26 02:48:...| 336.0|       click|1.0|       15.0|    NULL|         1.0|
|2022-07-25 09:59:...|1527.0|       click|0.0|      222.0|    NULL|         1.0|
|2022-07-25 09:49:...|1534.0

In [148]:
data.select("custom_track").distinct().show()

+------------+
|custom_track|
+------------+
|  conversion|
|       click|
| unqualified|
|   qualified|
|       alive|
|        NULL|
+------------+



Process click data 

In [149]:
click_data = data.filter(data.custom_track == 'click')
click_data.createOrReplaceTempView('click_data')
click_data = spark.sql(""" 
            SELECT 
                date(ts) as date,
                hour(ts) as hour,
                job_id,
                publisher_id,
                campaign_id,
                group_id,
                round(avg(bid),2) as bid_set, 
                sum(bid) as spend_hour, 
                count(*) as click 
            FROM click_data 
            GROUP BY date(ts), hour(ts), job_id, publisher_id, campaign_id, group_id """
        )

Process conversion data

In [150]:

conversion_data = data.filter(data.custom_track == 'conversion')
conversion_data.createOrReplaceTempView('conversion_data')
conversion_data = spark.sql(""" 
            SELECT 
                date(ts) as date,
                hour(ts) as hour,
                job_id,
                publisher_id,
                campaign_id,
                group_id, 
                count(*) as conversion 
            FROM conversion_data 
            GROUP BY date(ts), hour(ts), job_id, publisher_id, campaign_id, group_id """
        )

Process qualified data

In [151]:

qualified_data = data.filter(data.custom_track == 'qualified')
qualified_data.createOrReplaceTempView('qualified_data')
qualified_data = spark.sql(""" 
            SELECT 
                date(ts) as date,
                hour(ts) as hour,
                job_id,
                publisher_id,
                campaign_id,
                group_id, 
                count(*) as qualified 
            FROM conversion_data 
            GROUP BY date(ts), hour(ts), job_id, publisher_id, campaign_id, group_id """
        )

Process unqualified data 

In [152]:

unqualified_data = data.filter(data.custom_track == 'unqualified')
unqualified_data.createOrReplaceTempView('unqualified_data')
unqualified_data = spark.sql(""" 
            SELECT 
                date(ts) as date,
                hour(ts) as hour,
                job_id,
                publisher_id,
                campaign_id,
                group_id, 
                count(*) as unqualified 
            FROM conversion_data 
            GROUP BY date(ts), hour(ts), job_id, publisher_id, campaign_id, group_id """
        )

Finalize output full join

In [153]:
result = click_data.join(conversion_data, ['date','hour','job_id','publisher_id','campaign_id','group_id'], how='outer')
result = result.join(qualified_data, ['date','hour','job_id','publisher_id','campaign_id','group_id'], how='outer')
result = result.join(unqualified_data, ['date','hour','job_id','publisher_id','campaign_id','group_id'], how='outer')
result.show()


+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+
|      date|hour|job_id|publisher_id|campaign_id|group_id|bid_set|spend_hour|click|conversion|qualified|unqualified|
+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+
|2022-07-07|   3| 187.0|         1.0|       48.0|    NULL|   0.33|       2.0|    6|      NULL|     NULL|       NULL|
|2022-07-08|   4| 187.0|         1.0|       48.0|    NULL|    0.0|       0.0|    2|      NULL|     NULL|       NULL|
|2022-07-08|   6| 187.0|         1.0|       48.0|    NULL|    2.0|      12.0|    6|      NULL|     NULL|       NULL|
|2022-07-08|   9|  98.0|         1.0|        4.0|    NULL|    2.0|     216.0|  108|      NULL|     NULL|       NULL|
|2022-07-08|   9| 258.0|         3.0|       93.0|    NULL|    2.0|       2.0|    1|      NULL|     NULL|       NULL|
|2022-07-13|  15|  98.0|         1.0|        4.0|    NULL|    2.

Connect MySQL get companyid

In [154]:
df_company = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/Data_Warehouse") \
    .option("dbtable", "company") \
    .option("user", "root") \
    .option("password", "1") \
    .load()

df_company.show()

+---+----------+--------------------+----------------+--------------------+---------+-----------------+-----------------+------------------+--------------------+----------+-----------+-------------+------+------------+---------+-------------------+--------------------+
| id|created_by|        created_date|last_modified_by|  last_modified_date|is_active|             name|is_agree_conditon|is_agree_sign_deal|      sign_deal_user|billing_id|manage_type|customer_type|status|publisher_id|flat_rate|percentage_of_click|                logo|
+---+----------+--------------------+----------------+--------------------+---------+-----------------+-----------------+------------------+--------------------+----------+-----------+-------------+------+------------+---------+-------------------+--------------------+
|  1|    myuser| 2022-04-14 02:53:36|          myuser| 2022-07-26 15:58:30|        1|           GOTORO|                1|                 1|trananhtuan2495@g...|      NULL|          2|      

Merge company_id vào output 

In [155]:

data_company = df_company.select('id', 'created_date')
data_company.createOrReplaceTempView('company')
data_company = spark.sql(""" 
                            SELECT 
                            id as company_id, 
                            date(created_date) as date,
                            hour(created_date) as hour
                            FROM company 
                        """)

In [156]:
result = result.join(data_company, ['date','hour'], how='left')
result.show()

+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|      date|hour|job_id|publisher_id|campaign_id|group_id|bid_set|spend_hour|click|conversion|qualified|unqualified|company_id|
+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|2022-07-07|   3| 187.0|         1.0|       48.0|    NULL|   0.33|       2.0|    6|      NULL|     NULL|       NULL|      NULL|
|2022-07-08|   4| 187.0|         1.0|       48.0|    NULL|    0.0|       0.0|    2|      NULL|     NULL|       NULL|      NULL|
|2022-07-08|   6| 187.0|         1.0|       48.0|    NULL|    2.0|      12.0|    6|      NULL|     NULL|       NULL|      NULL|
|2022-07-08|   9|  98.0|         1.0|        4.0|    NULL|    2.0|     216.0|  108|      NULL|     NULL|       NULL|      NULL|
|2022-07-08|   9| 258.0|         3.0|       93.0|    NULL|    2.0|       2.0|    1|      NULL|     NULL|

THỰC HIỆN CDC

In [165]:
from pyspark.sql import functions as F

last_update_time = data.select(F.max("ts")).collect()[0][0]
result = result.withColumn("Last_Updated_At", F.lit(last_update_time))
result.show()

+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+-------------------+
|      date|hour|job_id|publisher_id|campaign_id|group_id|bid_set|spend_hour|click|conversion|qualified|unqualified|company_id|    Last_Updated_At|
+----------+----+------+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+-------------------+
|2022-07-07|   3| 187.0|         1.0|       48.0|    NULL|   0.33|       2.0|    6|      NULL|     NULL|       NULL|      NULL|2023-01-14 21:56:05|
|2022-07-08|   4| 187.0|         1.0|       48.0|    NULL|    0.0|       0.0|    2|      NULL|     NULL|       NULL|      NULL|2023-01-14 21:56:05|
|2022-07-08|   6| 187.0|         1.0|       48.0|    NULL|    2.0|      12.0|    6|      NULL|     NULL|       NULL|      NULL|2023-01-14 21:56:05|
|2022-07-08|   9|  98.0|         1.0|        4.0|    NULL|    2.0|     216.0|  108|      NULL|     NULL|       N

In [169]:
mysql_updated_date = result.select(F.max('Last_Updated_At')).collect()[0][0]
cassandra_updated_date = data.select(F.max('ts')).collect()[0][0]
data = data.filter(data.ts > mysql_updated_date)
data.show()

+--------------------+------+------------+----+-----------+--------+------------+
|                  ts|job_id|custom_track| bid|campaign_id|group_id|publisher_id|
+--------------------+------+------------+----+-----------+--------+------------+
|2024-06-15 04:44:...|9999.0|       click|NULL|     9999.0|  9999.0|      9999.0|
+--------------------+------+------------+----+-----------+--------+------------+



In [157]:
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/Data_Warehouse") \
    .option("dbtable", "events") \
    .option("user", "root") \
    .option("password", "1") \
    .load()

In [158]:
df.show()

+----+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+-----------+----------+---------+
|  id|job_id|     dates|hours|disqualified_application|qualified_application|conversion|company_id|group_id|campaign_id|publisher_id|bid_set|clicks|impressions|spend_hour|  sources|
+----+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+-----------+----------+---------+
|2089|    98|2022-07-08|    9|                       1|                    0|         1|         1|    NULL|          4|           1|    2.0|   108|       NULL|     216.0|Cassandra|
|2090|    98|2022-07-13|   15|                       1|                    0|         1|         1|    NULL|          4|           1|    2.0|     1|       NULL|       2.0|Cassandra|
|2091|   187|2022-07-08|    4|                       1|                    0|         1|  