In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf

conf = SparkConf()
conf.setAppName('practice_2')
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
conf.set("spark.driver.maxResultSize", "4G")
conf.set("spark.driver.memory", "4G")
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.allowMultipleContexts", "true")

<pyspark.conf.SparkConf at 0x7faacc753f10>

In [2]:
! hdfs dfs -ls ./all_transactions_table.parquet

Found 4 items
-rw-r--r--   1 ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/_SUCCESS
drwxr-xr-x   - ubuntu hadoop          0 2022-09-06 20:40 all_transactions_table.parquet/date=2022-09-06
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/date=2022-09-08
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/date=2022-09-09


In [3]:
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

table = spark.read.parquet("./all_transactions_table.parquet")

In [4]:
table.cache()
table.count()

29095

In [5]:
table.printSchema()

root
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TX_DATETIME: timestamp (nullable = true)
 |-- CUSTOMER_ID: long (nullable = true)
 |-- TERMINAL_ID: long (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: long (nullable = true)
 |-- TX_TIME_DAYS: long (nullable = true)
 |-- TX_FRAUD: long (nullable = true)
 |-- TX_FRAUD_SCENARIO: long (nullable = true)
 |-- date: date (nullable = true)



In [6]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
import pyspark.sql.functions as F

customer_window = Window.partitionBy("CUSTOMER_ID")
terminal_window = Window.partitionBy("TERMINAL_ID")

prepared_table = (
    table
        .withColumn("f_mean_customer_TX_AMOUNT", F.mean(F.col("TX_AMOUNT")).over(customer_window))
        .withColumn("f_mean_terminal_TX_AMOUNT", F.mean(F.col("TX_AMOUNT")).over(terminal_window))
        .withColumn("f_unq_terminals_per_customer",
                    F.approx_count_distinct(F.col("TERMINAL_ID")).over(terminal_window))
        .withColumn(
        "f_customer_amount_ratio",
        F.col("TX_AMOUNT") / F.col("f_mean_customer_TX_AMOUNT")
    )
        .withColumn(
        "f_terminal_amount_ratio",
        F.col("TX_AMOUNT") / F.col("f_mean_terminal_TX_AMOUNT")
    )
        .withColumnRenamed("TX_AMOUNT", "f_TX_AMOUNT")
        .withColumnRenamed("TX_TIME_SECONDS", "f_TX_TIME_SECONDS")
)

features = list(filter(lambda x: x.startswith("f_"), prepared_table.columns))
vec_assembler = VectorAssembler(inputCols=features, outputCol="features")
prepared_table = vec_assembler.transform(prepared_table)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

scalerModel = scaler.fit(prepared_table)
prepared_table = scalerModel.transform(prepared_table)
prepared_dataset = prepared_table.select("scaledFeatures", "TX_FRAUD", "date")

df = prepared_table.toPandas()

In [7]:
from sklearn.linear_model import LogisticRegression
import pandas as pd
features_df = pd.DataFrame([list(x) for x in df['scaledFeatures'].values], columns=features)
target_column = ["TX_FRAUD"][0]
from sklearn.model_selection import train_test_split
train_df, test_df, train_target, test_target = train_test_split(
    features_df,
    df[target_column],
    stratify=df[target_column],
    test_size=.3,
)
lr = LogisticRegression(solver='liblinear')
lr.fit(train_df, train_target)
test_df["predict"] = lr.predict(test_df)
test_df["target"] = test_target

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df["predict"] = lr.predict(test_df)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df["target"] = test_target


In [8]:
test_df[test_df["target"] == 1]

Unnamed: 0,f_TX_AMOUNT,f_TX_TIME_SECONDS,f_mean_customer_TX_AMOUNT,f_mean_terminal_TX_AMOUNT,f_unq_terminals_per_customer,f_customer_amount_ratio,f_terminal_amount_ratio,predict,target
17435,4.275586,-1.066814,1.923735,4.012968,0.0,2.272645,0.925284,0,1
27504,4.239012,-0.155784,3.479425,2.651104,0.0,0.856353,1.571442,0,1
13601,4.221103,-0.853672,1.74514,2.173512,0.0,2.465966,1.884686,0,1
5571,4.657223,1.623392,2.824694,2.834891,0.0,1.600499,1.689827,0,1
13603,4.221103,-0.853672,1.74514,2.173512,0.0,2.465966,1.884686,0,1
15813,5.719145,-1.196651,2.162305,2.311723,0.0,3.104997,2.689151,1,1
15883,3.47473,-1.458138,3.658532,0.445487,0.0,0.32114,3.200805,0,1
15449,5.419486,1.757571,1.17175,1.477604,0.0,4.680125,3.372671,1,1
6662,4.413308,1.32032,1.63656,1.182105,0.0,2.795775,2.978019,0,1


In [9]:
! hdfs dfs -ls ./all_transactions_table.parquet/

Found 4 items
-rw-r--r--   1 ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/_SUCCESS
drwxr-xr-x   - ubuntu hadoop          0 2022-09-06 20:40 all_transactions_table.parquet/date=2022-09-06
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/date=2022-09-08
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:01 all_transactions_table.parquet/date=2022-09-09


In [10]:
! hdfs dfs -ls ./practice_1/features.parquet/

Found 4 items
-rw-r--r--   1 ubuntu hadoop          0 2022-09-09 19:12 practice_1/features.parquet/_SUCCESS
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:12 practice_1/features.parquet/date=2022-09-06
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:12 practice_1/features.parquet/date=2022-09-08
drwxr-xr-x   - ubuntu hadoop          0 2022-09-09 19:12 practice_1/features.parquet/date=2022-09-09


In [11]:
spark.read.parquet("./practice_1/features.parquet/").limit(10).toPandas()

Unnamed: 0,scaledFeatures,TX_FRAUD,date
0,"[0.18781528127408215, -1.1916602227553825, 1.5...",0,2022-09-08
1,"[-0.7159539003608556, -0.35581083080726184, -0...",0,2022-09-08
2,"[-1.1308856066933906, 0.024210238302047084, -1...",0,2022-09-08
3,"[-0.016497595400296803, -0.10665631397324175, ...",0,2022-09-08
4,"[-0.22257613891013317, -0.5499508127528502, 0....",0,2022-09-08
5,"[-0.36206381891128325, -0.5033518454470265, -0...",0,2022-09-08
6,"[0.08591108105624379, -0.6643443391217576, 0.5...",0,2022-09-08
7,"[-0.6914868027838004, -0.9059337978629014, -0....",0,2022-09-08
8,"[-1.2320330925531757, -1.2125424689111624, -1....",0,2022-09-08
9,"[-1.266085238665572, -0.6363894351039987, -1.4...",0,2022-09-08


In [12]:
spark.stop()