In [1]:
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, udf,to_json,from_json
from pyspark.ml.linalg import Vectors,DenseVector,SparseVector
from pyspark.ml.feature import SQLTransformer,VectorAssembler
import joblib as jb
import numpy as np
import xgboost
import fastavro
import json
import io
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro,to_avro

In [2]:
# Local Spark session with the Avro, Kafka and Cassandra connector packages.
# NOTE(review): the avro/kafka packages are pinned to 3.5.5 while
# spark-cassandra-connector is 3.3.0 (a Spark 3.3 line release) — consider a 3.5.x
# connector; verify compatibility.
spark = (
    SparkSession.builder
    .appName("test")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-avro_2.12:3.5.5,"
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5,"
        "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0",
    )
    .config("spark.cassandra.connection.host", "host.docker.internal")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)
spark

:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d6705973-ed7a-406d-9e7c-56538ba02d39;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.5.5 in central
	found org.tukaani#xz;1.9 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.5 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.5 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#co

# Importing training feature stats ("mode", "mean", "deviation") and categories

- **Credit data feature stats**

- mean and deviation

In [3]:
# Per-feature mean and scale of the numeric credit features; the first column
# ('info') labels each row as 'mean' or 'scale'.
df_credit_data_mean_deviation = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../csv/credit_data_mean_scale.csv")
)
df_credit_data_mean_deviation.show()
df_credit_data_mean_deviation.printSchema()

                                                                                

+-----+------------------+------------------+------------------+
| info|            amount|     oldbalanceOrg|    newbalanceOrig|
+-----+------------------+------------------+------------------+
| mean| 627408.4007645844|1097893.0927459989| 610851.3635214248|
|scale|1658502.6666142726| 3132228.697963737|2628933.6624411293|
+-----+------------------+------------------+------------------+

root
 |-- info: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)



- mode

In [4]:
# Most frequent value of each categorical credit feature (single 'mode' row).
df_credit_data_mode = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../csv/credit_data_mode.csv")
)
df_credit_data_mode.show()
df_credit_data_mode.printSchema()

+----+--------+
|info|    type|
+----+--------+
|mode|CASH_OUT|
+----+--------+

root
 |-- info: string (nullable = true)
 |-- type: string (nullable = true)



- Categories

In [5]:
# Category lists for the one-hot encoder: {column: [category, ...]}.
with open("../json/credit_data_categorical_values.json", "r") as categories_file:
    credit_data_categorical_encoder_dict = json.load(categories_file)
credit_data_categorical_encoder_dict

{'type': ['CASH_OUT', 'DEBIT', 'PAYMENT', 'TRANSFER']}

- **Insurance Data**

- mean and deviation

In [6]:
# Per-feature mean and scale of the numeric insurance features; the first column
# ('info') labels each row as 'mean' or 'scale'.
df_insurance_data_mean_deviation = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../csv/insurance_data_mean_scale.csv")
)
df_insurance_data_mean_deviation.show()
df_insurance_data_mean_deviation.printSchema()

+-----+-------------------+-------------------+-------------------+------------------+-------------------+
| info|     marital_status|witness_present_ind| high_education_ind|past_num_of_claims| address_change_ind|
+-----+-------------------+-------------------+-------------------+------------------+-------------------+
| mean| 0.7152631578947368|0.23467105263157895| 0.6966447368421053|0.5023684210526316| 0.5731578947368421|
|scale|0.45128901255535303|0.42379305054279226|0.45970734981322536|0.9544189034241184|0.49461896692067236|
+-----+-------------------+-------------------+-------------------+------------------+-------------------+

root
 |-- info: string (nullable = true)
 |-- marital_status: double (nullable = true)
 |-- witness_present_ind: double (nullable = true)
 |-- high_education_ind: double (nullable = true)
 |-- past_num_of_claims: double (nullable = true)
 |-- address_change_ind: double (nullable = true)



In [7]:
# Most frequent value of each categorical insurance feature (single 'mode' row).
df_insurance_data_mode = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../csv/insurance_data_mode.csv")
)
df_insurance_data_mode.show()
df_insurance_data_mode.printSchema()

+----+------+-------+-------------+-------------+----------------+
|info|gender|channel|accident_site|living_status|vehicle_category|
+----+------+-------+-------------+-------------+----------------+
|mode|     M| Broker|        Local|          Own|         Compact|
+----+------+-------+-------------+-------------+----------------+

root
 |-- info: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- accident_site: string (nullable = true)
 |-- living_status: string (nullable = true)
 |-- vehicle_category: string (nullable = true)



- Categories

In [8]:
# Category lists for the one-hot encoder: {column: [category, ...]}.
with open("../json/insurance_data_categorical_values.json", "r") as categories_file:
    insurance_data_categorical_encoder_dict = json.load(categories_file)
insurance_data_categorical_encoder_dict

{'gender': ['M'],
 'channel': ['Phone'],
 'accident_site': ['Parking_Lot'],
 'living_status': ['Rent'],
 'vehicle_category': ['Medium']}

# Loading the Models

In [9]:
# Locations of the serialized fraud-detection models loaded with joblib.
credit_model_path, insurance_model_path = (
    "../models/credit_fraud_detection_model.pkl",
    "../models/insurance_fraud_detection_model.pkl",
)

In [10]:
# joblib deserializes with pickle, which can execute code: only load trusted model files.
credit_model = jb.load(credit_model_path)
credit_model

In [11]:
# joblib deserializes with pickle, which can execute code: only load trusted model files.
insurance_model = jb.load(insurance_model_path)
insurance_model

# Creating data pipelines and preprocessors for each topic

- Creating custom functions to impute, scale and one-hot encode the data

In [12]:
def features_infos_to_map(df_mean_deviation=None,df_mode=None):
    """Collect feature statistics from the stats DataFrames into one nested dict.

    Args:
        df_mean_deviation: DataFrame whose first column is ``info`` with rows
            labelled 'mean' and 'scale'; every other column is a numeric feature.
        df_mode: DataFrame whose first column is ``info`` with a row labelled
            'mode'; every other column is a categorical feature.

    Returns:
        dict mapping each numeric feature to ``{'mean': ..., 'deviation': ...}``
        and each categorical feature to ``{'mode': ...}``. If a name appears in
        both frames, the categorical entry wins (same merge order as before).

    Raises:
        ValueError: if either DataFrame is None, or a required 'mean'/'scale'/'mode'
            row is missing while the frame has feature columns.
    """
    if df_mean_deviation is None or df_mode is None:
        raise ValueError("Null dataFrames cant be accepted !")

    def _rows_by_info(df):
        # Collect once (the stats tables are tiny) instead of one Spark job per
        # column and statistic; keep the first row per label, like `.collect()[0]`.
        rows = {}
        for row in df.collect():
            rows.setdefault(row["info"], row)
        return rows

    def _require(rows, label):
        # Explicit error instead of an IndexError on an empty filter result
        if label not in rows:
            raise ValueError(f"Missing '{label}' row in the 'info' column")
        return rows[label]

    features_stats = {}

    numeric_cols = df_mean_deviation.columns[1:]
    if numeric_cols:
        mean_scale_rows = _rows_by_info(df_mean_deviation)
        mean_row = _require(mean_scale_rows, "mean")
        scale_row = _require(mean_scale_rows, "scale")
        for name in numeric_cols:
            features_stats[name] = {"mean": mean_row[name], "deviation": scale_row[name]}

    categorical_cols = df_mode.columns[1:]
    if categorical_cols:
        mode_row = _require(_rows_by_info(df_mode), "mode")
        for name in categorical_cols:
            features_stats[name] = {"mode": mode_row[name]}

    return features_stats


def map_credit_features_infos(features_stats_dict=None,info=None,column=None):
    """Return one statistic of one feature from a feature-stats dict.

    Despite its name this is dataset-agnostic: the imputer/scaler builders also
    call it for the insurance features. ``features_stats_dict`` is the mapping
    produced by ``features_infos_to_map``.

    Args:
        features_stats_dict: mapping of feature name to its statistics dict.
        info: statistic to read, one of 'mean', 'deviation', 'mode'.
        column: feature name to look up.

    Returns:
        The requested statistic value.

    Raises:
        ValueError: if the dict is None, the column is unknown, the statistic is
            not one of the valid names, or it is not available for that column.
    """
    # Fail with a clear message instead of an AttributeError on None.keys()
    if features_stats_dict is None:
        raise ValueError("features_stats_dict is required (got None)")
    valid_columns=list(features_stats_dict.keys())
    valid_infos=['mean', 'deviation',"mode"]
    if column not in valid_columns:
        raise ValueError(f"Invalid column name. Expected one of {valid_columns}, got '{column}'")
    if info not in valid_infos:
        raise ValueError(f"Invalid info name. Expected one of {valid_infos}, got '{info}'")
    # Numeric features only carry mean/deviation, categorical ones only mode
    if info not in features_stats_dict[column].keys():
        raise ValueError(f"Invalid info name. Expected one of {list(features_stats_dict[column].keys())}, got '{info}'")
    return features_stats_dict[column][info]

# Custom imputer function
def custom_imputer(num_cols=None,cat_cols=None,features_stats_dict=None):
    """Build a SQLTransformer that adds an ``imputed_<col>`` column per input column.

    Numeric columns are filled with their training mean and categorical columns
    with their mode (both read from ``features_stats_dict``) via ``COALESCE``.
    The original columns are kept (``SELECT *``).

    Args:
        num_cols: numeric column names to impute with the mean (None = none).
        cat_cols: categorical column names to impute with the mode (None = none).
        features_stats_dict: stats mapping produced by ``features_infos_to_map``.

    Returns:
        SQLTransformer running the generated statement (also printed for debugging).
    """
    # None defaults avoid the shared-mutable-default pitfall of `=[]`
    num_cols = list(num_cols or [])
    cat_cols = list(cat_cols or [])

    def _sql_string(value):
        # Spark SQL string literal; escape backslashes first, then single quotes
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    imputer_sql_expressions = []
    for name in num_cols:
        mean = map_credit_features_infos(column=name,info='mean',features_stats_dict=features_stats_dict)
        imputer_sql_expressions.append(f"COALESCE({name},{mean}) as imputed_{name}")
    for name in cat_cols:
        mode = map_credit_features_infos(column=name,info='mode',features_stats_dict=features_stats_dict)
        imputer_sql_expressions.append(f"COALESCE({name},{_sql_string(mode)}) as imputed_{name}")

    # Joining "*" with the expressions avoids "SELECT *, FROM ..." when no columns are given
    select_list = ",".join(["*"] + imputer_sql_expressions)
    imputer_sql_expression = f"SELECT {select_list} FROM __THIS__"
    print(imputer_sql_expression)
    return SQLTransformer(statement=imputer_sql_expression)

# Custom scaler
def custom_scaler(scale_cols=None,features_stats_dict=None):
    """Build a SQLTransformer that standardizes imputed numeric columns.

    Adds ``scaled_<col> = (imputed_<col> - mean) / deviation`` for each column,
    so it must run after the imputer that creates ``imputed_<col>``.

    Args:
        scale_cols: numeric column names to scale (None = none).
        features_stats_dict: stats mapping produced by ``features_infos_to_map``.

    Returns:
        SQLTransformer running the generated statement (also printed for debugging).
    """
    scale_cols = list(scale_cols or [])
    scaler_sql_expressions = []
    for name in scale_cols:
        mean = map_credit_features_infos(column=name,info='mean',features_stats_dict=features_stats_dict)
        deviation = map_credit_features_infos(column=name,info='deviation',features_stats_dict=features_stats_dict)
        # Parenthesize the literals: with a negative mean, "imputed_x--0.5" would
        # start a Spark SQL "--" comment and swallow the rest of the statement.
        scaler_sql_expressions.append(f"(imputed_{name}-({mean}))/({deviation}) as scaled_{name}")

    # Joining "*" with the expressions avoids "SELECT *, FROM ..." when no columns are given
    select_list = ",".join(["*"] + scaler_sql_expressions)
    scaler_sql_expression = f"SELECT {select_list} FROM __THIS__"
    print(scaler_sql_expression)
    return SQLTransformer(statement=scaler_sql_expression)

# Custom one-hot encoder
def custom_hot_encoder(encoder_categories_dict=None):
    """Build one SQLTransformer per categorical column adding 0/1 indicator columns.

    For every ``col -> [cat, ...]`` entry the transformer adds ``<col>_<cat>``,
    equal to 1 when ``imputed_<col>`` equals ``cat`` and 0 otherwise, so it must
    run after the imputer. Values not in the list get 0 in every indicator.

    Args:
        encoder_categories_dict: mapping of column name to the categories to
            encode. None or {} yields no encoders; columns with an empty list
            are skipped.

    Returns:
        list of SQLTransformer (statements are also printed for debugging).
    """
    def _sql_string(value):
        # Spark SQL string literal; escape backslashes first, then single quotes
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _identifier(name):
        # Back-quoted alias so categories containing spaces/punctuation stay valid SQL
        return "`" + name.replace("`", "``") + "`"

    encoders=[]
    for name, categories in (encoder_categories_dict or {}).items():
        if not categories:
            # An empty list would render "SELECT *, FROM __THIS__" (invalid SQL)
            continue
        encoder_sql_expressions = []
        for cat in categories:
            alias = _identifier(f"{name}_{cat}")
            encoder_sql_expressions.append(
                f"CASE WHEN imputed_{name}={_sql_string(cat)} THEN 1 ELSE 0 END as {alias}"
            )
        encoder_sql_expression=f"SELECT *,{','.join(encoder_sql_expressions)} FROM __THIS__"
        print(encoder_sql_expression)
        encoders.append(SQLTransformer(statement=encoder_sql_expression))
    return encoders

- Creating the data pipeline

In [13]:
def create_data_pipline(df=None,df_mean_deviation=None,df_mode=None,encoder_categories_dict=None,output_features=None):
    """Assemble the preprocessing Pipeline: impute -> scale -> one-hot -> assemble.

    Args:
        df: unused; kept for backward compatibility with existing callers.
        df_mean_deviation: stats DataFrame ('info' + numeric feature columns).
        df_mode: stats DataFrame ('info' + categorical feature columns).
        encoder_categories_dict: {column: [category, ...]} for one-hot encoding.
        output_features: column names packed, in order, into the ``features`` vector.

    Returns:
        An unfitted pyspark.ml Pipeline whose stages are all transformers.
    """
    stats = features_infos_to_map(df_mode=df_mode, df_mean_deviation=df_mean_deviation)

    # Every numeric column is both imputed and scaled; categorical columns are imputed only
    numeric_cols = df_mean_deviation.columns[1:]
    categorical_cols = df_mode.columns[1:]

    stages = [
        custom_imputer(num_cols=numeric_cols, cat_cols=categorical_cols, features_stats_dict=stats),
        custom_scaler(scale_cols=df_mean_deviation.columns[1:], features_stats_dict=stats),
    ]
    stages.extend(custom_hot_encoder(encoder_categories_dict=encoder_categories_dict))
    stages.append(VectorAssembler(inputCols=output_features, outputCol="features"))
    return Pipeline(stages=stages)

- Testing the pipeline on credit data

In [14]:
# Hand-made credit transactions, including missing values, to exercise the
# imputer, scaler and one-hot encoder end to end.
credit_data_schema = StructType([
    StructField(name, data_type, True)
    for name, data_type in (
        ("type", StringType()),
        ("amount", DoubleType()),
        ("oldbalanceOrg", DoubleType()),
        ("newbalanceOrig", DoubleType()),
    )
])
credit_data = [
    ("DEBIT", 100., 5000., 4000.),
    ("TRANSFER", 2000., 6000., 4000.),
    ("CASH_OUT", 70000., 2000., 1500.),
    ("TRANSFER", None, 7000., 5000.),  # missing amount -> mean
    (None, 1500., None, 3000.),        # missing type / old balance -> mode / mean
]
df_test_credit_data = spark.createDataFrame(credit_data, schema=credit_data_schema)

print("--------------Raw Credit Data")
df_test_credit_data.show()

# Columns packed (in this order) into the `features` vector fed to the model
credit_data_output_features = [
    "scaled_amount", "scaled_oldbalanceOrg", "scaled_newbalanceOrig",
    "type_CASH_OUT", "type_DEBIT", "type_PAYMENT", "type_TRANSFER",
]

credit_data_pipline = create_data_pipline(
    df=df_test_credit_data,
    df_mean_deviation=df_credit_data_mean_deviation,
    df_mode=df_credit_data_mode,
    encoder_categories_dict=credit_data_categorical_encoder_dict,
    output_features=credit_data_output_features,
)
transformed_credit_data = credit_data_pipline.fit(df_test_credit_data).transform(df_test_credit_data)

print("--------------Transformed Credit Data")
transformed_credit_data.drop("amount", "oldbalanceOrg", "newbalanceOrig").show()

print("--------------Credit Data features'Vectors'")
transformed_credit_data.select("features").collect()

--------------Raw Credit Data


                                                                                

+--------+-------+-------------+--------------+
|    type| amount|oldbalanceOrg|newbalanceOrig|
+--------+-------+-------------+--------------+
|   DEBIT|  100.0|       5000.0|        4000.0|
|TRANSFER| 2000.0|       6000.0|        4000.0|
|CASH_OUT|70000.0|       2000.0|        1500.0|
|TRANSFER|   NULL|       7000.0|        5000.0|
|    NULL| 1500.0|         NULL|        3000.0|
+--------+-------+-------------+--------------+

SELECT *,COALESCE(amount,627408.4007645844) as imputed_amount,COALESCE(oldbalanceOrg,1097893.0927459989) as imputed_oldbalanceOrg,COALESCE(newbalanceOrig,610851.3635214248) as imputed_newbalanceOrig,COALESCE(type,'CASH_OUT') as imputed_type FROM __THIS__
SELECT *,(imputed_amount-627408.4007645844)/1658502.6666142726 as scaled_amount,(imputed_oldbalanceOrg-1097893.0927459989)/3132228.697963737 as scaled_oldbalanceOrg,(imputed_newbalanceOrig-610851.3635214248)/2628933.6624411293 as scaled_newbalanceOrig FROM __THIS__
SELECT *,CASE WHEN imputed_type='CASH_OUT' THE

[Row(features=DenseVector([-0.3782, -0.3489, -0.2308, 0.0, 1.0, 0.0, 0.0])),
 Row(features=DenseVector([-0.3771, -0.3486, -0.2308, 0.0, 0.0, 0.0, 1.0])),
 Row(features=DenseVector([-0.3361, -0.3499, -0.2318, 1.0, 0.0, 0.0, 0.0])),
 Row(features=SparseVector(7, {1: -0.3483, 2: -0.2305, 6: 1.0})),
 Row(features=SparseVector(7, {0: -0.3774, 2: -0.2312, 3: 1.0}))]

In [15]:
# Hand-made insurance claims with missing values in both numeric and categorical
# columns, to exercise the preprocessing pipeline end to end.
insurance_schema=StructType(
    [
        StructField('high_education_ind',IntegerType(),True),
        StructField('past_num_of_claims',IntegerType(),True),
        StructField('gender',StringType(),True),
        StructField('address_change_ind',IntegerType(),True),
        StructField('witness_present_ind',IntegerType(),True),
        StructField('marital_status',IntegerType(),True),
        StructField('channel',StringType(),True),
        StructField('accident_site',StringType(),True),
        StructField('living_status',StringType(),True),
        StructField('vehicle_category',StringType(),True),
    ]
)

# NOTE(review): these rows use "Parking Lot" (with a space) while the encoder
# category is 'Parking_Lot', so accident_site_Parking_Lot is 0 for every row
# here — confirm which spelling the live raw_insurance_data topic carries.
insurance_data = [
    (None, 2, "M", 0, 1, 1, "Broker", None, "Own", "Compact"),
    (1, 0, "F", 1, 0, 0, None, "Local", "Rent", None),
    ( 0, 1, "M", 0, 1, 1, "Online", "Parking Lot", "Own", "Large"),
    ( 1, 0, "F", 1, 0, 0, "Phone", "Local", "Rent", "Medium"),
    (0, None, "M", 1, 1, 1, "Broker", "Highway", None, "Compact"),
    ( 1, 1, "F", None, 0, 0, "Online", "Parking Lot", "Own", "Medium"),
    (0, 2, "M", 1, None, 1, None, "Local", "Rent", "Large"),
    ( 1, 0, "F", 0, 0, None, "Broker", "Highway", "Own", "Compact"),
]

print("--------------Raw insurance Data")
df_insurance_test = spark.createDataFrame(insurance_data, insurance_schema)
df_insurance_test.show()

# Columns packed (in this order) into the `features` vector fed to the model
output_features=["scaled_high_education_ind","scaled_past_num_of_claims","gender_M","scaled_address_change_ind",
                   "scaled_witness_present_ind","scaled_marital_status","channel_Phone","accident_site_Parking_Lot","living_status_Rent",
                   "vehicle_category_Medium"
                  ]
insurance_data_pipeline=create_data_pipline(
    df=df_insurance_test, 
    df_mean_deviation=df_insurance_data_mean_deviation,
    df_mode=df_insurance_data_mode,
    encoder_categories_dict=insurance_data_categorical_encoder_dict,
    output_features=output_features
)

insurance_pipeline_model=insurance_data_pipeline.fit(df_insurance_test)
transformed_insurance_df=insurance_pipeline_model.transform(df_insurance_test)

# Each label is printed right before the output it describes
print("--------------Transformed insurance Data")
transformed_insurance_df.drop(
    "high_education_ind",
    "past_num_of_claims",
    "gender",
    "address_change_ind",
    "witness_present_ind",
    "marital_status",
    "channel",
    "accident_site",
    "living_status",
    "vehicle_category",
    "features"
).show()

print("--------------Insurance Data features'Vectors'")
transformed_insurance_df.select("features").collect()

--------------Raw insurance Data
+------------------+------------------+------+------------------+-------------------+--------------+-------+-------------+-------------+----------------+
|high_education_ind|past_num_of_claims|gender|address_change_ind|witness_present_ind|marital_status|channel|accident_site|living_status|vehicle_category|
+------------------+------------------+------+------------------+-------------------+--------------+-------+-------------+-------------+----------------+
|              NULL|                 2|     M|                 0|                  1|             1| Broker|         NULL|          Own|         Compact|
|                 1|                 0|     F|                 1|                  0|             0|   NULL|        Local|         Rent|            NULL|
|                 0|                 1|     M|                 0|                  1|             1| Online|  Parking Lot|          Own|           Large|
|                 1|                 0|    

25/04/07 13:07:14 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


--------------Insurance Data features'Vectors'
+----------------------+---------------------------+--------------------------+--------------------------+--------------------------+--------------+---------------+---------------------+---------------------+------------------------+---------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------+-------------+-------------------------+------------------+-----------------------+
|imputed_marital_status|imputed_witness_present_ind|imputed_high_education_ind|imputed_past_num_of_claims|imputed_address_change_ind|imputed_gender|imputed_channel|imputed_accident_site|imputed_living_status|imputed_vehicle_category|scaled_marital_status|scaled_witness_present_ind|scaled_high_education_ind|scaled_past_num_of_claims|scaled_address_change_ind|gender_M|channel_Phone|accident_site_Parking_Lot|living_status_Rent|vehicle_category_Medium|
+----------------------+-----------------------

[Row(features=SparseVector(10, {1: 1.5692, 2: 1.0, 3: -1.1588, 4: 1.8059, 5: 0.6309})),
 Row(features=DenseVector([0.6599, -0.5264, 0.0, 0.863, -0.5537, -1.5849, 0.0, 0.0, 1.0, 0.0])),
 Row(features=DenseVector([-1.5154, 0.5214, 1.0, -1.1588, 1.8059, 0.6309, 0.0, 0.0, 0.0, 0.0])),
 Row(features=DenseVector([0.6599, -0.5264, 0.0, 0.863, -0.5537, -1.5849, 1.0, 0.0, 1.0, 1.0])),
 Row(features=SparseVector(10, {0: -1.5154, 2: 1.0, 3: 0.863, 4: 1.8059, 5: 0.6309})),
 Row(features=SparseVector(10, {0: 0.6599, 1: 0.5214, 4: -0.5537, 5: -1.5849, 9: 1.0})),
 Row(features=DenseVector([-1.5154, 1.5692, 1.0, 0.863, 0.0, 0.6309, 0.0, 0.0, 1.0, 0.0])),
 Row(features=SparseVector(10, {0: 0.6599, 1: -0.5264, 3: -1.1588, 4: -0.5537}))]

# Kafka Configuration

In [16]:
# Broker and Schema Registry endpoints (host.docker.internal is Docker's alias for the host).
# `shcema_registry` keeps its original (misspelled) name because other cells use it.
kafka_broker = "host.docker.internal:9092"
shcema_registry = "http://host.docker.internal:8081"

# (input, output) topic names per dataset
insurance_input_topic, insurance_output_topic = "raw_insurance_data", "processed_insurance"
credit_input_topic, credit_output_topic = "raw_credit_data", "processed_credit_data"

# Defining schemas for Kafka topics

- Raw credit data schema

In [17]:
# Spark schema used to parse the JSON decoded from raw credit messages:
# two non-nullable ids followed by the model input columns.
raw_credit_data_schema = StructType([
    StructField(name, data_type, nullable)
    for name, data_type, nullable in (
        ("client_id", StringType(), False),
        ("transaction_id", StringType(), False),
        ("type", StringType(), True),
        ("amount", DoubleType(), True),
        ("oldbalanceOrg", DoubleType(), True),
        ("newbalanceOrig", DoubleType(), True),
    )
])

- Raw insurance data schema

In [18]:
# Spark schema used to parse the JSON decoded from raw insurance messages:
# two non-nullable ids followed by the model input columns.
raw_insurance_data_schema = StructType([
    StructField(name, data_type, nullable)
    for name, data_type, nullable in (
        ("client_id", StringType(), False),
        ("transaction_id", StringType(), False),
        ("high_education_ind", IntegerType(), True),
        ("past_num_of_claims", IntegerType(), True),
        ("gender", StringType(), True),
        ("address_change_ind", IntegerType(), True),
        ("witness_present_ind", IntegerType(), True),
        ("marital_status", IntegerType(), True),
        ("channel", StringType(), True),
        ("accident_site", StringType(), True),
        ("living_status", StringType(), True),
        ("vehicle_category", StringType(), True),
    )
])

# Stream processing

In [19]:
# Fetch a topic's latest value schema from the Schema Registry; if the registry
# cannot be used, fall back to the last copy cached under ../schemas.
def fetch_schema(schema_regitry_url=None,topic=None):
    """Return the schema registered for subject ``<topic>-value`` as parsed JSON.

    On success the schema is also written to
    ``../schemas/hard_coded_<topic>_schema.avsc`` so later runs can work offline.

    Args:
        schema_regitry_url: Schema Registry base URL (name kept for existing callers).
        topic: Kafka topic name; the subject looked up is ``<topic>-value``.

    Returns:
        The JSON-decoded schema.

    Raises:
        FileNotFoundError: if the registry lookup fails and no cached copy exists.
    """
    subject = f"{topic}-value"
    cache_path = f"../schemas/hard_coded_{topic}_schema.avsc"
    try:
        # Client construction is inside the try so a bad URL also falls back to the cache
        schema_registry_client = SchemaRegistryClient({"url": schema_regitry_url})
        latest_schema = schema_registry_client.get_latest_version(subject)
        schema_json = json.loads(latest_schema.schema.schema_str)
    except Exception as fetch_error:
        # Best-effort fallback, but report it instead of failing silently
        print(f"Could not fetch schema for {subject} ({fetch_error!r}); loading cached copy {cache_path}")
        try:
            with open(cache_path, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Schema Registry lookup for {subject} failed and no cached schema exists at {cache_path}"
            ) from fetch_error

    # Refreshing the cache is secondary: a write failure must not discard the fresh schema
    try:
        with open(cache_path, "w") as f:
            json.dump(schema_json, f)
    except OSError as cache_error:
        print(f"Fetched schema for {subject} but could not cache it to {cache_path}: {cache_error!r}")
    else:
        print(f"Successfully fetched and stored schema for {topic}-value")
    return schema_json

# Decode one Avro-encoded Kafka message with fastavro
def deserialize_avro(avro_bytes, schema):
    """Return the first record of an Avro container payload, or None.

    ``schema`` is passed to ``fastavro.reader`` as its reader schema. Any decoding
    error is printed and turned into None, so one bad message does not raise.
    """
    try:
        records = fastavro.reader(io.BytesIO(avro_bytes), schema)
        # Only the first record matters; an empty payload yields None
        return next(iter(records), None)
    except Exception as e:
        print(f"Error deserializing Avro message: {e}")
        return None

# Build a Spark UDF turning Avro message bytes into a JSON string
def create_udf_for_deserialization(avro_schema):
    """Return a StringType UDF mapping Avro bytes to ``json.dumps(record)``.

    Null input, undecodable payloads and falsy (e.g. empty) records all map to null.
    NOTE(review): json.dumps raises on non-JSON types (bytes, Decimal, datetime);
    confirm the topic schemas only carry primitive fields.
    """
    def deserialize_udf(avro_bytes):
        if avro_bytes is None:
            return None
        record = deserialize_avro(avro_bytes, avro_schema)
        if not record:
            return None
        return json.dumps(record)
    return udf(deserialize_udf, StringType())

def create_udf_for_prediction_serialization(avro_schema):
    """Build a BinaryType UDF that encodes a prediction as Avro bytes.

    Each value becomes an Avro object-container payload (``fastavro.writer``)
    holding a single ``{"prediction": value}`` record.

    Args:
        avro_schema: Avro schema (dict) with a ``prediction`` field.

    Returns:
        Spark UDF producing the encoded bytes.
    """
    # Parse once on the driver instead of on every row; an invalid schema now
    # fails here at setup time rather than inside the running stream.
    parsed_schema = fastavro.parse_schema(avro_schema)

    def serialize_avro(fraud):
        buff = io.BytesIO()
        fastavro.writer(buff, parsed_schema, [{"prediction": fraud}])
        return buff.getvalue()
    return udf(serialize_avro, BinaryType())

def create_udf_for_prediction(ml_model):
    """Build an IntegerType UDF that runs ``ml_model.predict`` on one feature vector.

    Accepts pyspark.ml DenseVector/SparseVector, list or numpy array inputs; a
    1-D vector is reshaped to a single-row matrix before prediction.

    Args:
        ml_model: object exposing ``predict(2-D array)``.

    Returns:
        Spark UDF returning the predicted label as int (null for a null vector).

    Raises (inside the UDF):
        ValueError: for unsupported input types.
    """
    def predict(vector):
        if vector is None:
            # Null features give a null prediction instead of failing the task
            return None
        if isinstance(vector, (DenseVector, SparseVector)):
            features = vector.toArray()
        elif isinstance(vector, (list, np.ndarray)):
            features = np.asarray(vector)
        else:
            raise ValueError(
                f"Expected one of these [DenseVector, SparseVector, np.ndarray, list], but got {type(vector)}"
            )

        if features.ndim == 1:
            features = features.reshape(1, -1)

        # predict() returns one label per row; take the element explicitly, since
        # int() on a size-1 array is deprecated in recent NumPy versions.
        return int(np.asarray(ml_model.predict(features)).ravel()[0])
    return udf(predict, IntegerType())
    
# Open a streaming DataFrame over a Kafka topic
def read_kafka_stream(broker, topic):
    """Return ``spark.readStream`` on ``topic`` from the earliest offsets.

    ``failOnDataLoss`` is disabled so deleted/expired offsets do not stop the query.
    """
    kafka_options = {
        "kafka.bootstrap.servers": broker,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    }
    return spark.readStream.format("kafka").options(**kafka_options).load()
    
def write_to_cassandra(df_batch, batch_id,cassandra_infos):
    """foreachBatch sink: append one micro-batch to a Cassandra table.

    Column names are lower-cased first (Cassandra folds unquoted identifiers to
    lower case; presumably the target table uses lower-case columns).
    ``batch_id`` is accepted for the foreachBatch signature but not used.
    ``cassandra_infos`` must provide 'keyspace' and 'table'.
    """
    lowered_batch = df_batch.toDF(*(name.lower() for name in df_batch.columns))
    (
        lowered_batch.write
        .format("org.apache.spark.sql.cassandra")
        .options(keyspace=cassandra_infos["keyspace"], table=cassandra_infos["table"])
        .mode("append")
        .save()
    )


# Wire one Kafka -> preprocess -> predict -> (Kafka, Cassandra) streaming flow
def process_stream(kafka_infos, cassandra_infos,pipeline, ml_model, pyspark_schema):
    """Start the Kafka and Cassandra output queries for one input topic.

    Args:
        kafka_infos: {"input": {"broker", "topic", "schema"},
                      "output": {"broker", "topic", "schema",
                                 optional "checkpoint_location"}}.
            The output checkpoint defaults to "/tmp/kafka_checkpoint"; give each
            concurrently running stream its own location.
        cassandra_infos: {"keyspace", "table", optional "checkpoint_location"};
            without a checkpoint Spark uses a temporary one.
        pipeline: unfitted preprocessing Pipeline producing a "features" column.
        ml_model: model used by the prediction UDF.
        pyspark_schema: StructType for the JSON decoded from input messages.

    Returns:
        (kafka_output_query, cassandra_output_query) started StreamingQuery objects.
    """
    input_infos = kafka_infos["input"]
    output_infos = kafka_infos["output"]

    # Keep only the Kafka message payload
    raw_df = read_kafka_stream(input_infos["broker"], input_infos["topic"])\
        .selectExpr("CAST(value AS BINARY) as value")

    # Avro bytes -> JSON string -> typed columns.
    # NOTE(review): undecodable messages become all-null rows and still reach
    # both sinks; consider filtering them.
    deserialize_avro_udf = create_udf_for_deserialization(input_infos["schema"])
    deserialized_df = raw_df.withColumn("deserialized_json", deserialize_avro_udf(raw_df.value))
    structured_df = deserialized_df\
        .select(from_json(col("deserialized_json"), pyspark_schema).alias("data"))\
        .select("data.*")

    cassandra_output_cols = structured_df.columns + ["fraud"]

    # Scale/encode features and assemble them into the "features" vector
    transformed_df = pipeline.fit(structured_df).transform(structured_df)

    # Predict the fraud label per row
    prediction_udf = create_udf_for_prediction(ml_model)
    prediction_df = transformed_df.withColumn("fraud", prediction_udf(transformed_df.features))

    # Raw input columns + prediction, persisted in Cassandra
    cassandra_output_df = prediction_df.select(cassandra_output_cols)

    # transaction_id as key, Avro-encoded prediction as value, for client consumers
    serialize_avro_udf = create_udf_for_prediction_serialization(output_infos["schema"])
    kafka_output_df = prediction_df\
        .withColumnRenamed("transaction_id", "key")\
        .withColumn("value", serialize_avro_udf(prediction_df.fraud))

    # The two queries below are independent streaming queries: each one reads
    # the topic and runs the model on its own.
    kafka_checkpoint = output_infos.get("checkpoint_location", "/tmp/kafka_checkpoint")
    kafka_output_query = kafka_output_df.select(["key", "value"])\
        .writeStream\
        .outputMode("append")\
        .format("kafka")\
        .option("kafka.bootstrap.servers", output_infos["broker"])\
        .option("topic", output_infos["topic"])\
        .option("checkpointLocation", kafka_checkpoint)\
        .start()

    cassandra_writer = cassandra_output_df.writeStream\
        .outputMode("append")\
        .foreachBatch(lambda df_batch, batch_id: write_to_cassandra(df_batch, batch_id, cassandra_infos))
    cassandra_checkpoint = cassandra_infos.get("checkpoint_location")
    if cassandra_checkpoint is not None:
        cassandra_writer = cassandra_writer.option("checkpointLocation", cassandra_checkpoint)
    cassandra_output_query = cassandra_writer.start()

    return kafka_output_query, cassandra_output_query

# Running the streams

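- Testing the process_stream function: Cassandra connectivity check and credit data stream (both cells are currently disabled by wrapping their code in strings)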

In [20]:
# Disabled Cassandra connectivity check: the code is wrapped in a triple-quoted
# string, so running this cell only echoes the string. Remove the quotes to run it.
# When enabled it reads fraud_detection.credit_data through the Spark Cassandra
# connector. The French messages mean "Connection successful! Here is the list of
# keyspaces:" and "Cassandra connection error:" (df.show() actually prints table rows).
'''try:
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="credit_data", keyspace="fraud_detection") \
        .load()
    
    print("Connexion réussie ! Voici la liste des keyspaces :")
    df.show()
except Exception as e:
    print("Erreur de connexion à Cassandra :", e)
    '''

'try:\n    df = spark.read         .format("org.apache.spark.sql.cassandra")         .options(table="credit_data", keyspace="fraud_detection")         .load()\n    \n    print("Connexion réussie ! Voici la liste des keyspaces :")\n    df.show()\nexcept Exception as e:\n    print("Erreur de connexion à Cassandra :", e)\n    '

In [21]:
# Disabled credit-data streaming run: the code is wrapped in a triple-quoted string,
# so running this cell only echoes it. Remove the quotes to start the credit stream.
# NOTE(review): if enabled, the first awaitTermination() blocks until the Kafka query
# stops, so a Cassandra-query failure would not surface here. Running it alongside the
# insurance stream also requires distinct Kafka checkpoint locations — confirm first.
'''fetched_fraud_prediction_schema=fetch_schema(
    schema_regitry_url=shcema_registry,
    topic="fraud_prediction"
)

fetched_credit_data_schema=fetch_schema(
    schema_regitry_url="http://host.docker.internal:8081",
    topic="raw_credit_data"
)

kafka_infos_for_credit_data={
    "input":{
        "broker":"host.docker.internal:9092",
        "topic":"raw_credit_data",
        "schema":fetched_credit_data_schema
    },
    "output":{
        "broker":"host.docker.internal:9092",
        "topic":"fraud_prediction",
        "schema":fetched_fraud_prediction_schema
    },
}

cassandra_infos_for_credit_data={
    "keyspace":"fraud_detection",
    "table":"credit_data"
}

credit_fraud_kafka_stream_query,credit_fraud_cassandra_stream_query=process_stream(
    cassandra_infos=cassandra_infos_for_credit_data,
    kafka_infos=kafka_infos_for_credit_data,
    pipeline=credit_data_pipline,
    ml_model=credit_model,
    pyspark_schema=raw_credit_data_schema
)
credit_fraud_kafka_stream_query.awaitTermination()
credit_fraud_cassandra_stream_query.awaitTermination()'''

'fetched_fraud_prediction_schema=fetch_schema(\n    schema_regitry_url=shcema_registry,\n    topic="fraud_prediction"\n)\n\nfetched_credit_data_schema=fetch_schema(\n    schema_regitry_url="http://host.docker.internal:8081",\n    topic="raw_credit_data"\n)\n\nkafka_infos_for_credit_data={\n    "input":{\n        "broker":"host.docker.internal:9092",\n        "topic":"raw_credit_data",\n        "schema":fetched_credit_data_schema\n    },\n    "output":{\n        "broker":"host.docker.internal:9092",\n        "topic":"fraud_prediction",\n        "schema":fetched_fraud_prediction_schema\n    },\n}\n\ncassandra_infos_for_credit_data={\n    "keyspace":"fraud_detection",\n    "table":"credit_data"\n}\n\ncredit_fraud_kafka_stream_query,credit_fraud_cassandra_stream_query=process_stream(\n    cassandra_infos=cassandra_infos_for_credit_data,\n    kafka_infos=kafka_infos_for_credit_data,\n    pipeline=credit_data_pipline,\n    ml_model=credit_model,\n    pyspark_schema=raw_credit_data_schema\n)\

- Running the process_stream function on insurance data

In [None]:
# Fetch the output (prediction) and input (raw insurance) Avro schemas;
# fetch_schema falls back to the copy cached under ../schemas if the registry fails.
fetched_fraud_prediction_schema=fetch_schema(
    schema_regitry_url=shcema_registry,
    topic="fraud_prediction"
)

fetched_insurance_data_schema=fetch_schema(
    schema_regitry_url=shcema_registry,
    topic=insurance_input_topic
)

kafka_infos_for_insurance_data={
    "input":{
        "broker":kafka_broker,
        "topic":insurance_input_topic,
        "schema":fetched_insurance_data_schema
    },
    "output":{
        "broker":kafka_broker,
        "topic":"fraud_prediction",
        "schema":fetched_fraud_prediction_schema
    },
}

cassandra_infos_for_insurance_data={
    "keyspace":"fraud_detection",
    "table":"insurance_data"
}

insurance_fraud_kafka_stream_query,insurance_fraud_cassandra_stream_query=process_stream(
    cassandra_infos=cassandra_infos_for_insurance_data,
    kafka_infos=kafka_infos_for_insurance_data,
    pipeline=insurance_data_pipeline,
    ml_model=insurance_model,
    pyspark_schema=raw_insurance_data_schema
)
# Block until either query stops. Awaiting the Kafka query first would hang on it
# even if the Cassandra query had already failed; this re-raises that failure.
spark.streams.awaitAnyTermination()

Successfully fetched and stored schema for fraud_prediction-value
Successfully fetched and stored schema for raw_insurance_data-value


25/04/07 13:07:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/07 13:07:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4815d6db-b490-4c1f-8839-9df6c5f639e9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/07 13:07:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/07 13:07:18 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/07 13:07:18 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.pol