In [1]:
import pyspark
print(pyspark.__version__)

!pip install elasticsearch
!pip install python-dotenv

3.3.2
Collecting elasticsearch
  Downloading elasticsearch-8.7.0-py3-none-any.whl (387 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m387.9/387.9 kB[0m [31m626.3 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting elastic-transport<9,>=8
  Downloading elastic_transport-8.4.0-py3-none-any.whl (59 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.5/59.5 kB[0m [31m486.0 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
Installing collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.4.0 elasticsearch-8.7.0
Collecting python-dotenv
  Downloading python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.0


In [2]:
import os

# Arguments handed to spark-submit when PySpark launches: the Kafka source,
# the Elasticsearch connector and the Jackson Scala module are requested as
# extra packages, followed by the mandatory "pyspark-shell" target.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,"
    "org.elasticsearch:elasticsearch-spark-30_2.12:7.16.2,"
    "com.fasterxml.jackson.module:jackson-module-scala_2.12:2.13.0"
    " pyspark-shell"
)

In [3]:
from pyspark.sql import SparkSession
# Get (or lazily create) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("kafka_elastic_test")
    .getOrCreate()
)

In [4]:
# Streaming source: change events of the savings-account *transaction* table.
df_transaction_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.fineract_default.m_savings_account_transaction")
    .load()
)

# Streaming source: change events of the savings-*account* table, read from
# the same broker.
df_account_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.fineract_default.m_savings_account")
    .load()
)

In [5]:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

# Schema of the rows indexed into Elasticsearch (one row per transaction).
# Every column is nullable.
schema_transaction = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("account-id", LongType()),
        ("amount", DoubleType()),
        ("customer-id", LongType()),
        ("datetime", StringType()),
        ("is_fraud", StringType()),
        ("transaction-id", LongType()),
        ("type", StringType()),
        ("@timestamp", StringType()),
    )
])

# Schema of the in-memory account lookup table (customer <-> account pairs).
schema_account = StructType([
    StructField(field_name, LongType(), True)
    for field_name in ("customer-id", "account-id")
])

# Lookup table starts empty; write_to_es_account grows it as events arrive.
account_df = spark.createDataFrame([], schema=schema_account)

# Scratch dict holding the model features of the transaction being processed.
dic_trans = {}

In [6]:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
import os
from dotenv import load_dotenv

# Elasticsearch target index and connection settings
es_index = "transactions_index"
es_port = 9200
es_host = "elasticsearch"


# Load environment variables from the .env file into the process environment
load_dotenv()

# Credentials are read from the environment rather than hard-coded here.
# NOTE(review): load_dotenv() does not override variables that already exist
# in the environment by default, and USERNAME is often pre-set by the OS or
# shell -- confirm the values picked up are the Elasticsearch credentials.
username = os.environ.get("USERNAME")
password = os.environ.get("PASSWORD")

# Elasticsearch client used only for index management in this notebook
es = Elasticsearch(
    [{"host": es_host, "port": es_port, "scheme": "http"}],
    basic_auth=(username, password),
)

# Index mapping: field names mirror schema_transaction
mapping = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "account-id": {"type": "long"},
            "amount": {"type": "double"},
            "customer-id": {"type": "long"},
            "datetime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_fraud": {"type": "keyword"},
            "transaction-id": {"type": "long"},
            "type": {"type": "keyword"},
        }
    }
}

# Create the index on first run; later runs find it and skip this branch
if not es.indices.exists(index=es_index):
    try:
        # The 8.x client takes the mapping through the `mappings` argument;
        # the catch-all `body` argument is deprecated there.
        es.indices.create(index=es_index, mappings=mapping["mappings"])
    except RequestError as e:
        print(f"Index creation failed: {e}")
        # Re-raise instead of calling exit(1): inside a notebook exit() ends
        # the kernel session, whereas an exception simply stops the run and
        # keeps the traceback visible.
        raise
    print(f"Created Elasticsearch index '{es_index}'")
    
# Append a (batch) DataFrame to the Elasticsearch index
def write_to_es(es_df):
    """Print `es_df` and append its rows to the `es_index` index.

    The connection settings (`es_host`, `es_port`, `username`, `password`,
    `es_index`) are read from the notebook's globals.
    """
    # Echo the rows in the cell output before they are shipped
    es_df.show()

    writer = es_df.write.format("org.elasticsearch.spark.sql")
    connector_options = (
        ("es.nodes", es_host),
        ("es.port", es_port),
        ("es.net.http.auth.user", username),
        ("es.net.http.auth.pass", password),
        ("es.resource", es_index),
    )
    for option_name, option_value in connector_options:
        writer = writer.option(option_name, option_value)

    writer.mode("append").save()

In [14]:
from datetime import datetime as dt

def is_night(datetime_str):
    """Return 1 if the timestamp's hour is 22:00 or later, or before 06:00; else 0.

    `datetime_str` must be formatted as "YYYY-MM-DD HH:MM:SS".
    """
    parsed = dt.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
    # Daytime is the half-open window [06:00, 22:00); everything else is night
    return 0 if 6 <= parsed.hour < 22 else 1

def is_weekend(datetime_str):
    """Return 1 if the timestamp falls on a Saturday or Sunday, else 0.

    `datetime_str` must be formatted as "YYYY-MM-DD HH:MM:SS".
    """
    parsed = dt.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
    # weekday(): Monday is 0 ... Saturday is 5, Sunday is 6
    return 1 if parsed.weekday() in (5, 6) else 0

In [15]:
# Load the previously saved Spark ML logistic-regression model from the
# "lr_ml" path; predict_isfraude() uses it to label each transaction.

from pyspark.ml.classification import LogisticRegressionModel

lr_model = LogisticRegressionModel.load("lr_ml")

In [16]:
%%time
from pyspark.ml.linalg import Vectors

def predict_isfraude(dic_trans):
    """Label one transaction as "valid" or "fraudulent" using `lr_model`.

    Parameters
    ----------
    dic_trans : dict
        Must contain the keys 'amount', 'in_weekend' and 'at_night'.

    Returns
    -------
    str
        "valid" when the model predicts label 0.0, otherwise "fraudulent".
    """
    # Only the amount and the two time flags come from the transaction; the
    # sixteen values in between are hard-coded.
    # NOTE(review): presumably placeholders for features not yet computed
    # from the stream -- confirm against the model's training columns.
    dense_vector = Vectors.dense([
        dic_trans['amount'],
        1500.0, 5000.0, 3000.0, 2500.0,
        1.0, 2.0, 3.0, 4.0,
        7000.0, 6000.0, 4500.0, 2775.0,
        2.0, 4.0, 5.0, 7.0,
        dic_trans['in_weekend'], dic_trans['at_night']
    ])

    # Score the single vector directly instead of wrapping it in a one-row
    # DataFrame and running transform()/collect(), which launches a Spark job
    # for every transaction.
    is_fraude = lr_model.predict(dense_vector)

    return "valid" if is_fraude == 0.0 else "fraudulent"

CPU times: user 12 µs, sys: 4 µs, total: 16 µs
Wall time: 19.6 µs


In [17]:
from pyspark.sql.functions import from_unixtime, date_format
from pyspark.sql.functions import col

import json
import datetime
import base64
import decimal
from time import sleep

def getDecimalFromKafka(encoded, scale=6):
    """Decode a Base64-encoded unscaled decimal into a `decimal.Decimal`.

    Parameters
    ----------
    encoded : str or bytes
        Base64 text of the big-endian, two's-complement unscaled integer.
    scale : int, optional
        Number of fractional digits; the unscaled integer is divided by
        10 ** scale. Defaults to 6, the divisor previously hard-coded here.

    Returns
    -------
    decimal.Decimal
        The decoded value (unscaled / 10 ** scale).
    """
    # signed=True: the bytes are two's complement, so a leading 1 bit marks a
    # negative number. Decoding as unsigned turned negative amounts into huge
    # positive ones.
    unscaled = int.from_bytes(base64.b64decode(encoded), byteorder='big', signed=True)

    # Shift the decimal point. No quantize step is needed: the unscaled value
    # is an integer, so quantizing it could not change the value -- it could
    # only raise for very large inputs.
    return decimal.Decimal(unscaled) / (decimal.Decimal(10) ** scale)

def write_to_es_transaction(df_t, epoch_id):
    """foreachBatch sink: score each new transaction and index it.

    For every record of the micro-batch whose payload has no "before" image,
    the function resolves the owning customer from `account_df`, derives the
    model features, asks `predict_isfraude` for a label and appends one row
    to Elasticsearch through `write_to_es`.

    Parameters
    ----------
    df_t : DataFrame
        Micro-batch with a string `value` column holding the JSON event.
    epoch_id : int
        Micro-batch id supplied by Spark (unused).
    """
    for row_transaction in df_t.collect():

        # A record may carry a null value (nothing to parse) -- skip it
        # instead of letting json.loads() raise.
        if row_transaction.value is None:
            continue

        payload = json.loads(row_transaction.value)['payload']

        # Only events without a previous row image are indexed
        if payload['before'] is not None:
            continue

        after = payload['after']

        # created_date is divided by 1000 before conversion
        # (presumably epoch milliseconds -- TODO confirm against the source).
        created_at = datetime.datetime.fromtimestamp(after['created_date'] / 1000)
        formatted_date = created_at.strftime("%Y-%m-%d %H:%M:%S")
        formatted_date_es = created_at.strftime("%Y-%m-%dT%H:%M:%S.%f%z")

        account_id = after['savings_account_id']

        # Poll until the account stream has registered this account, then
        # reuse the rows fetched by the successful poll (one Spark action per
        # attempt instead of a count followed by a second filter + collect).
        # NOTE(review): this waits forever if the account event never arrives.
        matching_accounts = []
        while not matching_accounts:
            matching_accounts = (
                account_df
                .filter(col("account-id") == account_id)
                .select("customer-id")
                .collect()
            )
            if not matching_accounts:
                # Wait for 0.1 second before checking again
                sleep(0.1)

        customer_id = matching_accounts[0][0]
        op_type = "DEBIT" if after['transaction_type_enum'] == 2 else "CREDIT"

        dic_trans['amount'] = float(getDecimalFromKafka(after['amount']))
        # Each flag is fed by its matching helper (they were previously swapped)
        dic_trans['in_weekend'] = is_weekend(formatted_date)
        dic_trans['at_night'] = is_night(formatted_date)

        # Column order follows schema_transaction
        new_row_transaction = spark.createDataFrame([(
            account_id,
            dic_trans['amount'],
            customer_id,
            formatted_date,
            predict_isfraude(dic_trans),
            after['id'],
            op_type,
            formatted_date_es,
        )], schema=schema_transaction)

        write_to_es(new_row_transaction)
                    
        
def write_to_es_account(df_a, epoch_id):
    """foreachBatch sink: keep the global `account_df` lookup table up to date.

    Each record's (client_id, id) pair from the payload's "after" image is
    appended to `account_df` unless an identical row is already present.

    Parameters
    ----------
    df_a : DataFrame
        Micro-batch with a string `value` column holding the JSON event.
    epoch_id : int
        Micro-batch id supplied by Spark (unused).
    """
    global account_df

    for row_account in df_a.collect():

        # A record may carry a null value (nothing to parse) -- skip it
        # instead of letting json.loads() raise.
        if row_account.value is None:
            continue

        after = json.loads(row_account.value)['payload']['after']

        # Events without an "after" image (e.g. row deletions) have no
        # account to register; indexing into None would raise TypeError.
        if after is None:
            continue

        new_row_account = spark.createDataFrame(
            [(after['client_id'], after['id'])],
            schema=schema_account,
        )

        # Removing the new row from account_df leaves the count unchanged
        # only when the row was not there yet -> append it.
        if account_df.subtract(new_row_account).count() == account_df.count():
            account_df = account_df.union(new_row_account)


In [None]:
# Account events: decode the Kafka value as text and hand every micro-batch
# to write_to_es_account. Started first so the lookup table begins filling.
value_df_account = df_account_stream.selectExpr("CAST(value AS STRING)")
query_account = (
    value_df_account.writeStream
    .foreachBatch(write_to_es_account)
    .start()
)

# Transaction events: same decoding, processed by write_to_es_transaction.
value_df_transaction = df_transaction_stream.selectExpr("CAST(value AS STRING)")
query_transaction = (
    value_df_transaction.writeStream
    .foreachBatch(write_to_es_transaction)
    .start()
)

# Block this cell on the transaction query only; query_account has been
# started above but is not awaited here.
query_transaction.awaitTermination()

+----------+------+-----------+-------------------+--------+--------------+-----+--------------------+
|account-id|amount|customer-id|           datetime|is_fraud|transaction-id| type|          @timestamp|
+----------+------+-----------+-------------------+--------+--------------+-----+--------------------+
|         4| 310.0|          2|2023-05-22 13:42:42|   valid|           115|DEBIT|2023-05-22T13:42:...|
+----------+------+-----------+-------------------+--------+--------------+-----+--------------------+
+----------+------+-----------+-------------------+--------+--------------+-----+--------------------+
|account-id|amount|customer-id|           datetime|is_fraud|transaction-id| type|          @timestamp|
+----------+------+-----------+-------------------+--------+--------------+-----+--------------------+
|         4| 310.0|          2|2023-05-22 13:42:42|   valid|           115|DEBIT|2023-05-22T13:42:...|
+----------+------+-----------+-------------------+--------+-------------