In [1]:
import os
import time
import datetime
import pyspark.sql.functions as sf
from uuid import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark import SparkConf, SparkContext
from uuid import * 
from uuid import UUID
import time_uuid 
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W

In [2]:
# Spark session with the DataStax Cassandra connector pulled in via spark.jars.packages.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0')
    .getOrCreate()
)

In [19]:
def get_latest_etl_time(keyspace='netflix_keyspace', table='cdc_tracking'):
    """Return the most recent ``last_etl_time`` recorded in the CDC tracking table.

    Args:
        keyspace: Cassandra keyspace that holds the tracking table.
        table: Name of the tracking table.

    Returns:
        The maximum ``last_etl_time`` value, or ``None`` when the table has no rows
        (a global aggregate over zero rows yields one row containing null).
    """
    tracking_df = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .load()
    )
    # A global agg (no groupBy) always yields exactly one row, so first() is never None.
    return tracking_df.agg(sf.max('last_etl_time')).first()[0]

In [20]:
def get_latest_data_time(keyspace='netflix_keyspace', table='user_search_history', column='createdat'):
    """Return the newest timestamp present in the source table.

    Args:
        keyspace: Cassandra keyspace that holds the source table.
        table: Name of the source table.
        column: Timestamp column whose maximum is returned.

    Returns:
        The maximum value of ``column``, or ``None`` when the table has no rows.

    Note:
        This aggregates over the full table read; NOTE(review): on a large table this
        is a full scan — confirm that is acceptable for the polling frequency.
    """
    source_df = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .load()
    )
    # A global agg always yields exactly one row; its single field is null on an empty table.
    return source_df.agg(sf.max(column)).first()[0]

In [21]:
# Newest source timestamp and the last recorded ETL watermark (source is queried first).
latest_cas_time, latest_etl_time = get_latest_data_time(), get_latest_etl_time()

In [26]:
# Show the newest createdat found in the source table.
print(str(latest_cas_time))

2024-09-21 03:12:45.202000


In [24]:
# Display the watermark read from cdc_tracking. When the tracking table is empty the value
# is None, and Jupyter renders no output for a None result.
latest_etl_time

In [28]:
# Decide whether an ETL run is needed by comparing the source's newest timestamp
# against the last recorded watermark.
if latest_etl_time is None:
    print("CDC tracking is empty. Running main task for the first load.")
elif not (latest_cas_time and latest_etl_time):
    print("One of the timestamps is None.")
elif latest_cas_time > latest_etl_time:
    print("Running main task: new data found.")
else:
    print("No new data found.")

CDC tracking is empty. Running main task for the first load.


In [None]:
def maintask(latest_etl_time):
    """Load the ``user_search_history`` rows that still need to be processed.

    Args:
        latest_etl_time: Watermark from ``cdc_tracking`` (max ``createdat`` already
            processed), or ``None`` when nothing has been loaded yet.

    Returns:
        A Spark DataFrame with every row when ``latest_etl_time`` is ``None``,
        otherwise only rows whose ``createdat`` is strictly newer than the watermark.
    """
    source_df = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table='user_search_history', keyspace='netflix_keyspace')
        .load()
    )
    if latest_etl_time is None:
        # First load: nothing has been processed yet.
        return source_df
    # Strict ">": the watermark equals max(createdat) of rows already written, so ">="
    # would re-read and re-append those boundary rows on every incremental run.
    return source_df.where(col('createdat') > latest_etl_time)


    

In [32]:
# Read the source table once; on incremental runs keep only rows newer than the watermark.
source_df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table='user_search_history', keyspace='netflix_keyspace') \
    .load()

if latest_etl_time is None:
    # First load (cdc_tracking is empty): process every record.
    df = source_df
else:
    # Strict ">": the stored watermark is max(createdat) of rows already written, so ">="
    # would re-read and re-append those boundary rows on every run. This also matches the
    # ">" used by the new-data check.
    df = source_df.where(col('createdat') > latest_etl_time)

In [33]:
# Preview the loaded rows (Spark's default: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+------------------+----------+
|              userid|           createdat| cassandra_timestamp|           keyword|searchtype|
+--------------------+--------------------+--------------------+------------------+----------+
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|       the witcher|        tv|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|         chernobyl|        tv|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|         brad pitt|    person|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|  the walking dead|        tv|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|           mad max|     movie|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|  the walking dead|        tv|
|66ed749093aab795b...|2024-09-21 03:11:...|2024-09-20 20:12:...|        the office|        tv|
|66ea83e5b3b49e715...|2024-09-18 21:40:...|2024-09

In [34]:
from pyspark.sql.functions import col, year, month, dayofmonth

In [35]:
# Derive year/month/day from createdat; these become the parquet partition columns.
for part_col, part_fn in (("year", year), ("month", month), ("day", dayofmonth)):
    df = df.withColumn(part_col, part_fn(col("createdat")))

In [36]:
# Preview rows with the derived partition columns (20 rows, truncated values).
df.show(20, True)

+--------------------+--------------------+--------------------+-------------------+----------+----+-----+---+
|              userid|           createdat| cassandra_timestamp|            keyword|searchtype|year|month|day|
+--------------------+--------------------+--------------------+-------------------+----------+----+-----+---+
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...| the social network|     movie|2024|    9| 18|
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...|        johnny depp|    person|2024|    9| 18|
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...|      the lion king|     movie|2024|    9| 18|
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...|       breaking bad|        tv|2024|    9| 18|
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...|the big bang theory|        tv|2024|    9| 18|
|66ea83e7b3b49e715...|2024-09-18 21:40:...|2024-09-18 14:40:...|    natalie portman|    person|2024|    9| 18|
|

In [37]:
# Drop the cassandra_timestamp column and cache the result. final_df feeds several
# actions (the parquet write, max(createdat) for the new watermark, and count()).
# Without caching, each action re-reads user_search_history. Rows inserted between
# the write and the max() could then advance the watermark past data that was never
# written, and the recorded count could disagree with what was written.
# Note: cache() is best-effort; partitions lost from the cache are recomputed from source.
final_df = df.drop("cassandra_timestamp").cache()

In [38]:
# Preview the rows that will be written to parquet.
final_df.show(n=20, truncate=True)

+--------------------+--------------------+-------------------+----------+----+-----+---+
|              userid|           createdat|            keyword|searchtype|year|month|day|
+--------------------+--------------------+-------------------+----------+----+-----+---+
|66ed748493aab795b...|2024-09-21 03:11:...|    natalie portman|    person|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|      margot robbie|    person|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|         mindhunter|        tv|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|             narcos|        tv|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|          chernobyl|        tv|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|   schindler's list|     movie|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|            mad max|     movie|2024|    9| 21|
|66ed748493aab795b...|2024-09-21 03:11:...|saving private ryan|     movie|2024|    9| 21|
|66ed74849

In [39]:
# Raw string: in a normal literal the Windows backslashes are (invalid) escape sequences.
# That is a SyntaxWarning on Python 3.12+, and a silent bug if a directory name ever
# starts with n, t, r, etc. The value is unchanged.
# NOTE(review): absolute local path — consider making this configurable.
OUTPUT_PATH = r"D:\PROJECT\RealTime-Batch_Datapipeline_using_Kafka_Airflow\Data"

final_df.write.partitionBy('year', 'month', 'day').mode('append').format('parquet').save(OUTPUT_PATH)

In [40]:
# New watermark: the newest createdat among the processed rows (None if there were none).
etl_time = final_df.agg(sf.max('createdat')).first()[0]

In [42]:
# Number of rows processed in this run, recorded alongside the watermark.
count = int(final_df.count())

In [48]:
# Build the single tracking record for this run.
# NOTE(review): id is hard-coded to 1001; if id is cdc_tracking's primary key, each append
# overwrites the previous row instead of keeping run history — confirm intended.
tracking_record = Row(id=1001, last_etl_time=etl_time, num_records_processed=count)
data = [tracking_record]
df_cas = spark.createDataFrame(data)

In [49]:
# Preview the tracking record before it is written to Cassandra.
df_cas.show(n=20, truncate=True)

+----+--------------------+---------------------+
|  id|       last_etl_time|num_records_processed|
+----+--------------------+---------------------+
|1001|2024-09-21 03:12:...|                   50|
+----+--------------------+---------------------+



In [50]:
# Append the run's watermark and row count to netflix_keyspace.cdc_tracking.
(
    df_cas.write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .option("table", "cdc_tracking")
    .option("keyspace", "netflix_keyspace")
    .save()
)

In [51]:
# Re-read both timestamps after the run so the check below reflects the new watermark.
latest_cas_time, latest_etl_time = get_latest_data_time(), get_latest_etl_time()

In [52]:
# Re-run the new-data check; it is expected to report no new data right after a run.
if latest_etl_time is None:
    status_message = "CDC tracking is empty. Running main task for the first load."
elif not (latest_cas_time and latest_etl_time):
    status_message = "One of the timestamps is None."
elif latest_cas_time > latest_etl_time:
    status_message = "Running main task: new data found."
else:
    status_message = "No new data found."
print(status_message)

No new data found.
