In [6]:
!pip install findspark==2.0.1
# !pip install pandas==2.0.0
# !pip install numpy==1.24.2

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import findspark

findspark.init()
findspark.find()

'/usr/lib/spark'

In [20]:
!pip install mlflow

Defaulting to user installation because normal site-packages is not writeable
Collecting mlflow
  Downloading mlflow-2.14.1-py3-none-any.whl (25.8 MB)
[K     |████████████████████████████████| 25.8 MB 1.2 MB/s eta 0:00:01
Collecting graphene<4
  Downloading graphene-3.3-py2.py3-none-any.whl (128 kB)
[K     |████████████████████████████████| 128 kB 86.4 MB/s eta 0:00:01
[?25hCollecting opentelemetry-sdk<3,>=1.9.0
  Downloading opentelemetry_sdk-1.25.0-py3-none-any.whl (107 kB)
[K     |████████████████████████████████| 107 kB 91.1 MB/s eta 0:00:01
[?25hCollecting querystring-parser<2
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting Flask<4
  Downloading flask-3.0.3-py3-none-any.whl (101 kB)
[K     |████████████████████████████████| 101 kB 5.0 MB/s eta 0:00:01
Collecting cloudpickle<4
  Downloading cloudpickle-3.0.0-py3-none-any.whl (20 kB)
Collecting sqlparse<1,>=0.4.0
  Downloading sqlparse-0.5.0-py3-none-any.whl (43 kB)
[K     |████████████████████

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import os


# Initialize Spark session
spark = SparkSession.builder \
    .appName("Fraud Detection") \
    .getOrCreate()

# S3 path
s3_path = 's3a://otus-task-n3/2019-09-21.txt'

# Read data from S3 into Spark DataFrame
df = spark.read.parquet(s3_path, 
                        header=True, 
                        inferSchema=True,
                        **{
                            "key": "YCAJE4JxJM9HxqbDEUlXzJJsX",
                            "secret": "YCNH45SiJbpy_35ywf2KolGlHKfB5lLZtbgmw2xk",
                            "client_kwargs": {"endpoint_url": "https://storage.yandexcloud.net"}
                        })

# Assuming df is your DataFrame
df = df.withColumn('tx_datetime', F.to_timestamp(df['tx_datetime']))

# Time-based features
df = df.withColumn('is_weekend', F.when(F.dayofweek(df['tx_datetime']) >= 5, 1).otherwise(0))

# Sort by customer and transaction datetime
df = df.orderBy(['customer_id', 'tx_datetime'])

# Customer behavior features
window_spec = Window.partitionBy('customer_id').orderBy('tx_datetime')
# Add a lag column as a timestamp
df = df.withColumn('lagged_tx_datetime', F.lag('tx_datetime', 1).over(window_spec).cast('timestamp'))

# Calculate time_since_last_tx in seconds
df = df.withColumn('time_since_last_tx', 
                   (F.col('tx_datetime').cast('long') - F.col('lagged_tx_datetime').cast('long')) / F.lit(1000))

# Drop the intermediate lagged_tx_datetime column if not needed
df = df.drop('lagged_tx_datetime')

df = df.withColumn('avg_tx_amount_customer', F.avg('tx_amount').over(window_spec))
df = df.withColumn('tx_count_customer', F.count('tranaction_id').over(window_spec))
df = df.withColumn('var_tx_amount_customer', F.stddev('tx_amount').over(window_spec))

# Terminal-based features
window_spec_terminal = Window.partitionBy('terminal_id').orderBy('tx_datetime')
df = df.withColumn('avg_tx_amount_terminal', F.avg('tx_amount').over(window_spec_terminal))
df = df.withColumn('tx_count_terminal', F.count('tranaction_id').over(window_spec_terminal))
df = df.withColumn('var_tx_amount_terminal', F.stddev('tx_amount').over(window_spec_terminal))

# Select relevant numeric columns
numeric_columns = ['tx_amount', 'time_since_last_tx', 'avg_tx_amount_customer', 'tx_count_customer',
                   'var_tx_amount_customer', 'avg_tx_amount_terminal', 'tx_count_terminal', 'var_tx_amount_terminal']

# Initialize StandardScaler
scaler = StandardScaler(inputCol="scaled_features", outputCol="features")

# Drop rows with null values
df = df.dropna()

# Fit and transform the numeric columns
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="scaled_features")
pipeline = Pipeline(stages=[assembler, scaler])
pipeline_model = pipeline.fit(df)
df = pipeline_model.transform(df)

# Convert boolean column to binary (1/0)
df = df.withColumn('is_weekend', F.col('is_weekend').cast('integer'))

# Drop rows with null values
df = df.dropna()

# Define feature columns and target variable
feature_columns = ['scaled_features', 'is_weekend']
target_column = 'tx_fraud'

# Split the data into training and testing sets
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

# Initialize the RandomForestClassifier
rf_classifier = RandomForestClassifier(featuresCol='scaled_features', labelCol=target_column, numTrees=100, seed=42)

# Train the model
model = rf_classifier.fit(train_df)

# Make predictions on the test data
predictions = model.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol=target_column, predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)


print("Accuracy:", accuracy)

# Optionally, you can print confusion matrix and classification report using other evaluation metrics supported by PySpark


TypeError: __init__() got an unexpected keyword argument 'predictionCol'

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator


# Evaluate the model with MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol=target_column,predictionCol="prediction")

# Calculate metrics
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-score: {f1}")

Accuracy: 0.9495932235247968
Precision: 0.944067005065678
Recall: 0.9495932235247969
F1-score: 0.92602191718943


In [24]:
spark.stop()

In [4]:
import sys

In [8]:
import os
import logging
import argparse
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

import mlflow
from mlflow.tracking import MlflowClient


logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net"
os.environ["AWS_ACCESS_KEY_ID"] = ""
os.environ["AWS_SECRET_ACCESS_KEY"] = ""


def get_dataframe(spark):
    
    s3_path = 's3a://otus-task-n3/2019-09-21.txt'
    df = spark.read.parquet(s3_path, 
                            header=True, 
                            inferSchema=True,
                            **{
                                "key": "",
                                "secret": "",
                                "client_kwargs": {"endpoint_url": "https://storage.yandexcloud.net"}
                            })
    
    return df

def preproc(df):
    
    # Assuming df is DataFrame
    df = df.withColumn('tx_datetime', F.to_timestamp(df['tx_datetime']))
    # Time-based features
    df = df.withColumn('is_weekend', F.when(F.dayofweek(df['tx_datetime']) >= 5, 1).otherwise(0))
    # Sort by customer and transaction datetime
    df = df.orderBy(['customer_id', 'tx_datetime'])
    # Customer behavior features
    window_spec = Window.partitionBy('customer_id').orderBy('tx_datetime')
    # Add a lag column as a timestamp
    df = df.withColumn('lagged_tx_datetime', F.lag('tx_datetime', 1).over(window_spec).cast('timestamp'))
    # Calculate time_since_last_tx in seconds
    df = df.withColumn('time_since_last_tx', 
                       (F.col('tx_datetime').cast('long') - F.col('lagged_tx_datetime').cast('long')) / F.lit(1000))
    # Drop the intermediate lagged_tx_datetime column if not needed
    df = df.drop('lagged_tx_datetime')
    df = df.withColumn('avg_tx_amount_customer', F.avg('tx_amount').over(window_spec))
    df = df.withColumn('tx_count_customer', F.count('tranaction_id').over(window_spec))
    df = df.withColumn('var_tx_amount_customer', F.stddev('tx_amount').over(window_spec))
    # Terminal-based features
    window_spec_terminal = Window.partitionBy('terminal_id').orderBy('tx_datetime')
    df = df.withColumn('avg_tx_amount_terminal', F.avg('tx_amount').over(window_spec_terminal))
    df = df.withColumn('tx_count_terminal', F.count('tranaction_id').over(window_spec_terminal))
    df = df.withColumn('var_tx_amount_terminal', F.stddev('tx_amount').over(window_spec_terminal))
    
    # Convert boolean column to binary (1/0)
    df = df.withColumn('is_weekend', F.col('is_weekend').cast('integer'))
    # Drop rows with null values
    df = df.dropna()
    
    return df


def scale(df):
    
    numeric_columns = ['tx_amount', 'time_since_last_tx', 'avg_tx_amount_customer', 'tx_count_customer',
                       'var_tx_amount_customer', 'avg_tx_amount_terminal', 'tx_count_terminal', 'var_tx_amount_terminal']
    
    scaler = StandardScaler(inputCol="scaled_features", outputCol="features")
    assembler = VectorAssembler(inputCols=numeric_columns, outputCol="scaled_features")
    pipeline = Pipeline(stages=[assembler, scaler])
    pipeline_model = pipeline.fit(df)
    df = pipeline_model.transform(df)

    df = df.dropna()
    
    return df


def main(args):
    
    TRACKING_SERVER_HOST = "62.84.126.15"
    mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:8000")
    logger.info("tracking URI: %s", {mlflow.get_tracking_uri()})

    logger.info("Creating Spark Session ...")
    spark = SparkSession.builder \
            .appName("Fraud Detection") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.access.key", "") \
            .config("spark.hadoop.fs.s3a.secret.key", "") \
            .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") \
            .getOrCreate()

    logger.info("Loading Data ...")
    df = get_dataframe(spark)

    # Prepare MLFlow experiment for logging
    client = MlflowClient()
    experiment = client.get_experiment_by_name("pyspark_experiment")
    experiment_id = experiment.experiment_id
    
    # Добавьте в название вашего run имя, по которому его можно будет найти в MLFlow
    run_name = 'MyRFmodelRUN' + ' ' + str(datetime.now())

    with mlflow.start_run(run_name=run_name, experiment_id=experiment_id):
    
        df = preproc(df)
        df = scale(df)
        
        logger.info("Splitting the dataset ...")
        train_df, test_df = df.randomSplit([1 - args.val_frac, args.val_frac], seed=42)

        rf_classifier = RandomForestClassifier(featuresCol='scaled_features', labelCol='tx_fraud', numTrees=100, seed=42)
        model = rf_classifier.fit(train_df)
        
        run_id = mlflow.active_run().info.run_id

        logger.info("Scoring the model ...")
        predictions = model.transform(test_df)
        
        evaluator = MulticlassClassificationEvaluator(labelCol='tx_fraud',predictionCol="prediction")

        accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
        precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
        recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
        f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
        
        logger.info(f"Logging metrics to MLflow run {run_id} ...")
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("recall", recall)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("f1", f1)
        logger.info(f"Model accuracy: {accuracy}")
        logger.info(f"Model accuracy: {recall}")
        logger.info(f"Model accuracy: {precision}")
        logger.info(f"Model accuracy: {f1}")

        logger.info("Saving pipeline ...")
        mlflow.spark.save_model(model, args.output_artifact)

        logger.info("Exporting/logging pipline ...")
        mlflow.spark.log_model(model, args.output_artifact)
        logger.info("Done")

    spark.stop()
    

if __name__ == "__main__":

    parser = argparse.ArgumentParser(
        description="Model (Inference Pipeline) Training")

    parser.add_argument(
        "--val_frac",
        type=float,
        default = 0.2,
        help="Size of the validation split. Fraction of the dataset.",
    )

    # При запуске используйте оригинальное имя 'Student_Name_flights_LR_only'
    parser.add_argument(
        "--output_artifact",
        default="default_run_name",
        type=str,
        help="Name for the output serialized model (Inference Artifact folder)",
        required=True,
    )
    
    sys.argv = ['train.ipynb', '--val_frac', '0.2', '--output_artifact', 'run-name']
    args = parser.parse_args(sys.argv[1:])

    # args = parser.parse_args()

    main(args)



2024-06-25 00:18:05,982 tracking URI: {'http://62.84.126.15:8000'}
2024-06-25 00:18:05,983 Creating Spark Session ...
2024-06-25 00:18:05,986 Loading Data ...
2024-06-25 00:18:42,561 Splitting the dataset ...
2024-06-25 00:21:26,115 Scoring the model ...
2024-06-25 00:23:59,462 Logging metrics to MLflow run acfd3c9cd8e74717af9c901c03c002ee ...
2024-06-25 00:23:59,555 Model accuracy: 0.9497801349811982
2024-06-25 00:23:59,556 Model accuracy: 0.9497801349811982
2024-06-25 00:23:59,557 Model accuracy: 0.9466131236796039
2024-06-25 00:23:59,559 Model accuracy: 0.9261529216620966
2024-06-25 00:23:59,560 Saving pipeline ...
 - pandas (current: 2.0.0, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
2024-06-25 00:24:20,244 Exporting/logging pipline ...
 - pandas (current: 2.0.0, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model

In [11]:
!pip install 'urllib3<2'

Defaulting to user installation because normal site-packages is not writeable
Collecting urllib3<2
  Downloading urllib3-1.26.19-py2.py3-none-any.whl (143 kB)
[K     |████████████████████████████████| 143 kB 1.2 MB/s eta 0:00:01
[?25hInstalling collected packages: urllib3
  Attempting uninstall: urllib3
    Found existing installation: urllib3 2.2.2
    Uninstalling urllib3-2.2.2:
      Successfully uninstalled urllib3-2.2.2
[31mERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.

We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.

docker 7.1.0 requires requests>=2.26.0, but you'll have requests 2.24.0 which is incompatible.
requests 2.24.0 requires urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1, but you'll have urllib3 1.26.19 which is incompatible.
botocore 1.19.7 requires urllib3<1.26,>=1.25.4, but y

In [37]:
spark.stop()`