In [1]:
# !pip install findspark

In [2]:
import findspark

# Point this kernel at the local Spark installation so the `pyspark` imports below
# resolve against it; the resolved install path is displayed as the cell output.
findspark.init()
spark_home = findspark.find()
spark_home

'/usr/lib/spark'

# MODEL: feature pipeline and random-forest fraud classifier

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType

In [4]:
# Session for this notebook ("hw7"): 10g of memory for the driver and for each executor.
# NOTE(review): if a SparkSession already exists in this kernel, getOrCreate() returns it
# and JVM-level settings such as memory are not re-applied.
_spark_conf = {
    "spark.executor.memory": "10g",
    "spark.driver.memory": "10g",
}
_builder = SparkSession.builder.appName("hw7")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

In [5]:
# Explicit column types for the cleansed transactions dataset (all columns nullable).
# NOTE: "tranaction_id" (sic) is kept as spelled -- values load non-null under this name
# in the outputs below, so it presumably matches the stored column; check before renaming.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("tranaction_id", IntegerType()),
        ("tx_datetime", StringType()),
        ("customer_id", IntegerType()),
        ("terminal_id", IntegerType()),
        ("tx_amount", DoubleType()),
        ("tx_time_seconds", IntegerType()),
        ("tx_time_days", IntegerType()),
        ("tx_fraud", IntegerType()),
        ("tx_fraud_scenario", IntegerType()),
    )
])

In [6]:
# Load the cleansed transactions with the explicit `schema` defined above.
# Parquet files carry their own typed columns, so CSV-only reader options such as
# `header` / `inferSchema` do not apply here and are not passed.
df = (
    spark.read
    .schema(schema)
    .parquet("/user/root/datasets/set02/data_cleansed.parquet")
)

In [7]:
# Total number of rows in the cleansed dataset.
df.count()

45693570

In [8]:
# Reproducible ~10% sample (without replacement) to keep training fast. Spark's sampling
# is approximate, so the row count is close to, but not exactly, 10% of df.
SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 3
df_sample = df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)

In [9]:
# Size of the sample actually drawn (approximate, so not exactly 10% of df).
df_sample.count()

4568988

In [10]:
# Peek at the first sampled rows.
df_sample.show(10)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|   1836822545|2022-11-06 14:21:07|     573306|        844|    28.64|      101312467|        1172|       0|                0|
|   1836826849|2022-11-06 02:58:34|     576031|        328|    31.66|      101271514|        1172|       0|                0|
|   1836827722|2022-11-06 12:25:44|     576623|        688|     9.83|      101305544|        1172|       0|                0|
|   1836827855|2022-11-06 10:44:16|     576731|        877|    40.04|      101299456|        1172|       0|                0|
|   1836828249|2022-11-06 19:09:56|     576977|        191|    65.79|      101329796|        1172|       0|           

In [11]:
from pyspark.ml.feature import VectorAssembler

In [43]:
# Numeric columns packed into the single "Features" vector the classifier reads.
# handleInvalid="skip": rows with null/NaN in any of these columns are dropped instead
# of raising -- also when the fitted pipeline is later applied to new data.
FEATURE_COLUMNS = ["terminal_id", "tx_amount", "tx_time_seconds", "tx_time_days"]

features_assembler = VectorAssembler(
    inputCols=FEATURE_COLUMNS,
    outputCol="Features",
    handleInvalid="skip",
)

In [13]:
# X_train = features_assembler.transform(df_sample)

In [44]:
from pyspark.ml.pipeline import Pipeline

# Single-stage feature pipeline (just the assembler). Fitting it yields a PipelineModel
# that is saved and reloaded as one unit for inference.
_feat_ext_stages = [features_assembler]
_feat_ext_spec = Pipeline(stages=_feat_ext_stages)
feat_ext_pipe = _feat_ext_spec.fit(df_sample)

In [45]:
# Persist the fitted feature pipeline, replacing any previous save at this path.
# Despite the ".joblib" suffix this is Spark ML's own save format, read back with
# PipelineModel.load -- not a joblib pickle.
_pipe_writer = feat_ext_pipe.write()
_pipe_writer.overwrite()
_pipe_writer.save("feat_ext_pipe.joblib")

In [16]:
from pyspark.ml.pipeline import PipelineModel

In [17]:
# Reload the saved feature pipeline, as an inference job would.
feat_ext_pipe2 = PipelineModel.read().load("feat_ext_pipe.joblib")

In [19]:
# Display the fitted pipeline model object.
feat_ext_pipe

PipelineModel_0c4d6279d902

In [20]:
# Add the assembled "Features" vector to every sampled row; the original columns,
# including the tx_fraud label, are kept alongside it.
X_train = feat_ext_pipe.transform(df_sample)

In [21]:
# Check the assembled "Features" column.
X_train.show(10)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+--------------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|            Features|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+--------------------+
|   1836822545|2022-11-06 14:21:07|     573306|        844|    28.64|      101312467|        1172|       0|                0|[844.0,28.64,1.01...|
|   1836826849|2022-11-06 02:58:34|     576031|        328|    31.66|      101271514|        1172|       0|                0|[328.0,31.66,1.01...|
|   1836827722|2022-11-06 12:25:44|     576623|        688|     9.83|      101305544|        1172|       0|                0|[688.0,9.83,1.013...|
|   1836827855|2022-11-06 10:44:16|     576731|        877|    40.04|      101299456|        1172|       0|           

In [22]:
# One raw sample row, for comparison with the assembled output above.
df_sample.show(1)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|   1836822545|2022-11-06 14:21:07|     573306|        844|    28.64|      101312467|        1172|       0|                0|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
only showing top 1 row



In [23]:
# The sample itself has no "Features" column: transform() returned a new DataFrame.
df_sample.columns

['tranaction_id',
 'tx_datetime',
 'customer_id',
 'terminal_id',
 'tx_amount',
 'tx_time_seconds',
 'tx_time_days',
 'tx_fraud',
 'tx_fraud_scenario']

In [24]:
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel

# Deliberately tiny forest (3 trees, depth 2). Predicts tx_fraud from the assembled
# "Features" vector and also emits per-tree leaf indices in a "leafId" column.
rf = RandomForestClassifier(
    featuresCol='Features',
    labelCol="tx_fraud",
    leafCol="leafId",
    numTrees=3,
    maxDepth=2,
    seed=42,
)

In [25]:
# Train the random forest on the assembled sample.
# NOTE(review): there is no held-out split or evaluation metric in this notebook, so the
# model's quality is not measured here.
model = rf.fit(X_train)

In [26]:
# Persist the trained forest, replacing any previous save at this path. Like the feature
# pipeline, this is Spark ML's own save format despite the ".joblib" suffix.
_model_writer = model.write()
_model_writer.overwrite()
_model_writer.save("spark_model.joblib")

In [27]:
# Reload the saved forest, as an inference job would.
model2 = RandomForestClassificationModel.read().load("spark_model.joblib")

In [None]:
# Inspect the training schema (the inference schema below omits its label columns).
schema

In [28]:
# Inference-time schema: the training `schema` without the label columns, which are
# presumably unknown for incoming transactions. Derived from `schema` (same field
# objects, same order) so the two definitions cannot drift apart.
schema_inf = StructType(
    [field for field in schema.fields if field.name not in ("tx_fraud", "tx_fraud_scenario")]
)

In [29]:
# One hand-built transaction to smoke-test the reloaded pipeline + model. The values
# are the first df_sample row shown above, minus the label columns.
_sample_tx = (1836822545, "2022-11-06 14:21:07", 573306, 844, 28.64, 101312467, 1172)
test0 = spark.createDataFrame([_sample_tx], schema_inf)

In [None]:
# Display the hand-built inference DataFrame.
test0

In [35]:
# Score a single assembled feature vector directly with the reloaded model.
# The feature pipeline was built with VectorAssembler(handleInvalid="skip"), so a row
# with a null/NaN feature is silently dropped and head() returns None. Raise a clear
# error in that case instead of an AttributeError on None.
_assembled_row = feat_ext_pipe2.transform(test0).head()
if _assembled_row is None:
    raise ValueError(
        "feature pipeline dropped the input row (null/NaN in a feature column?)"
    )
model2.predict(_assembled_row.Features)

0.0

In [39]:
# Assemble features for the hand-built row with the reloaded pipeline.
inf = feat_ext_pipe2.transform(test0)

In [41]:
# Full scoring output: rawPrediction, probability, prediction and leafId columns.
model2.transform(inf).show()

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------------------+--------------------+--------------------+----------+-------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|            Features|       rawPrediction|         probability|prediction|       leafId|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------------------+--------------------+--------------------+----------+-------------+
|   1836822545|2022-11-06 14:21:07|     573306|        844|    28.64|      101312467|        1172|[844.0,28.64,1.01...|[2.91377163899818...|[0.97125721299939...|       0.0|[0.0,0.0,0.0]|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------------------+--------------------+--------------------+----------+-------------+



In [42]:
# Columns of the assembled inference frame (no label columns, plus "Features").
inf.columns

['tranaction_id',
 'tx_datetime',
 'customer_id',
 'terminal_id',
 'tx_amount',
 'tx_time_seconds',
 'tx_time_days',
 'Features']

# KAFKA: streaming setup drafts (all cells below are commented out)

In [None]:
# !sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !sudo pip install kafka-python

In [None]:
# import os

In [None]:
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
# !wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.4/spark-sql-kafka-0-10_2.12-3.2.4.jar
# !sudo mkdir /usr/local/lib/python3.8/dist-packages/pyspark
# !sudo mkdir /usr/local/lib/python3.8/dist-packages/pyspark/jars
# !sudo mv spark-sql-kafka-0-10_2.12-3.2.4.jar /usr/local/lib/python3.8/dist-packages/pyspark/jars/

In [None]:
# !wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.3/spark-sql-kafka-0-10_2.12-3.0.3.jar
# # !sudo mkdir /usr/local/lib/python3.8/dist-packages/pyspark
# # !sudo mkdir /usr/local/lib/python3.8/dist-packages/pyspark/jars
# !sudo mv spark-sql-kafka-0-10_2.12-3.0.3.jar /usr/local/lib/python3.8/dist-packages/pyspark/jars/

In [None]:
# NOTE(review): `--jars` takes comma-separated jar paths/URLs; Maven coordinates are only
# resolved by `--packages`. PYSPARK_SUBMIT_ARGS is read when the JVM is launched, so it
# must be set before the first SparkSession is created in this kernel.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4 pyspark-shell'

In [None]:
# NOTE(review): `spark.jars` expects jar paths/URLs (not Maven coordinates), and
# `spark.packages` is not a Spark setting -- only `spark.jars.packages` pulls in the
# Kafka connector. Because `spark` already exists in this kernel, getOrCreate() would
# return that session and ignore JVM-level settings like spark.jars.packages; set it on
# the first builder (or restart the kernel) instead. The connector version must match the
# cluster's Spark version -- confirm 3.2.4.
# spark_stream = SparkSession.builder \
#     .appName("stream") \
#     .master("local[*]") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4") \
#     .getOrCreate()


In [None]:
# Cluster variant (no local master). Same caveats: only `spark.jars.packages` resolves
# Maven coordinates (`spark.jars` wants jar paths, `spark.packages` is not a Spark
# setting), and it only takes effect if no SparkSession exists yet in this kernel.
# spark_stream = SparkSession.builder \
#     .appName("stream") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4") \
#     .getOrCreate()


In [None]:
# spark_stream.conf.set("spark.sql.shuffle.partitions", 1)

In [None]:
# Kafka source for the "clicks" topic on the managed cluster (SASL_SSL + SCRAM-SHA-256).
#  * readStream, not read: Spark rejects startingOffsets="latest" for batch queries.
#  * Kafka's SCRAM login is configured through kafka.sasl.jaas.config; there are no
#    sasl.username / sasl.password client settings.
#  * Credentials come from the environment (requires `import os`); never commit them --
#    if a password was ever saved in this notebook, rotate it.
# NOTE(review): the yandexcloud.net managed cluster over SASL_SSL likely also needs its CA
# certificate via kafka.ssl.truststore.* options -- confirm.
# kafka_jaas = (
#     "org.apache.kafka.common.security.scram.ScramLoginModule required "
#     f'username="{os.environ["KAFKA_USER"]}" password="{os.environ["KAFKA_PASSWORD"]}";'
# )
# stream = spark_stream.readStream.format("kafka") \
#     .option("kafka.bootstrap.servers", "rc1a-e9s90k61kn45nunj.mdb.yandexcloud.net:9091") \
#     .option("kafka.security.protocol", "SASL_SSL") \
#     .option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
#     .option("kafka.sasl.jaas.config", kafka_jaas) \
#     .option("subscribe", "clicks") \
#     .option("startingOffsets", "latest") \
#     .load()