### Install PySpark

In [None]:
# Install a headless OpenJDK 8 runtime via apt (presumably the JVM that PySpark
# will launch -- confirm JAVA_HOME points at it if several JDKs are present).
!apt-get install openjdk-8-jdk-headless -qq


In [None]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)


### Get Kaggle Dataset

In [None]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os
from getpass import getpass

# The Kaggle CLI reads its credentials from these environment variables.
# Never store the real values in the notebook: prompt for them (input is not
# echoed and is not saved in the cell output), and only when they are not
# already provided by the environment.
for _var in ("KAGGLE_USERNAME", "KAGGLE_KEY"):
    if not os.environ.get(_var):
        os.environ[_var] = getpass(f"{_var}: ")

In [None]:
# Download the `ealaxi/paysim1` dataset archive (paysim1.zip) with the Kaggle CLI.
# Presumably authenticates via the KAGGLE_USERNAME / KAGGLE_KEY variables set above.
!kaggle datasets download -d ealaxi/paysim1

Downloading paysim1.zip to /content
 99% 176M/178M [00:05<00:00, 38.8MB/s]
100% 178M/178M [00:05<00:00, 35.4MB/s]


In [None]:
!unzip paysim1.zip

Archive:  paysim1.zip
  inflating: PS_20174392719_1491204439457_log.csv  


[PaySim dataset on Kaggle](https://www.kaggle.com/datasets/ealaxi/paysim1)

In [None]:
from pyspark.sql import SparkSession
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, Ridge, Lasso
from sklearn.metrics import accuracy_score, mean_squared_error
from pyspark.sql.functions import broadcast
import pandas as pd
import numpy as np


# Reuse the running SparkSession (or start one) for this experiment.
spark = (
    SparkSession.builder
    .appName("BroadcastAndRunModels")
    .getOrCreate()
)

# Set to a small integer (e.g. 500) for a quick smoke test; None loads every row.
N_ROWS = None

# Load the PaySim transaction log and use `target` as the label column name.
df_train = pd.read_csv("PS_20174392719_1491204439457_log.csv", nrows=N_ROWS)
df_train = df_train.rename(columns={'isFraud': 'target'})

# Encode the transaction type as integer category codes so the models
# receive a purely numeric feature matrix.
df_train["type"] = df_train["type"].astype('category').cat.codes

# These columns are excluded from the feature matrix.
columns_to_drop = ["nameDest", "nameOrig", "step"]
df_train = df_train.drop(columns=columns_to_drop)

# Broadcast the training dataset so every Spark task reads the same
# read-only copy instead of receiving it inside each task closure.
broadcast_train = spark.sparkContext.broadcast(df_train)

# Define the models to run
models = [
    LogisticRegression(),
    Ridge(),
    Lasso()
]


def run_model_on_partition(rows):
    """Fit and score every model found in one RDD partition.

    The unit of parallelism is the *model*, not the data: each element of
    ``rows`` is an unfitted scikit-learn estimator, and each one is trained
    on the complete broadcast training frame.  (Previously the partition
    contents were ignored and every partition re-trained all models on the
    full dataset, repeating the same work once per partition.)

    Parameters
    ----------
    rows : iterable
        The elements of the current partition; each is expected to be an
        estimator exposing ``fit`` and ``predict``.

    Returns
    -------
    list of (str, float)
        One ``(model name, score)`` pair per estimator.  The score is the
        accuracy for ``LogisticRegression`` and the mean squared error for
        any other estimator.  Both are computed on the training data itself,
        i.e. they are in-sample figures, not a held-out evaluation.
    """
    # Get the broadcasted training dataset
    train_data = broadcast_train.value

    # Split the data into features and target
    X = train_data.drop(columns='target')
    y = train_data['target']

    results = []
    for model in rows:
        model.fit(X, y)
        y_pred = model.predict(X)

        if isinstance(model, LogisticRegression):
            results.append(('Logistic Regression', accuracy_score(y, y_pred)))
        else:
            results.append((type(model).__name__, mean_squared_error(y, y_pred)))

    return results


# Distribute the models -- one per partition -- so each is trained exactly
# once, in parallel, against the broadcast dataset.
rdd_models = spark.sparkContext.parallelize(models, numSlices=len(models))

# Train/evaluate the model(s) held by each partition
results_rdd = rdd_models.mapPartitions(run_model_on_partition)

# Collect the results from all partitions
results = results_rdd.collect()

# Print the results
for model_name, result in results:
    print(f"Model: {model_name}, Result: {result}")




KeyboardInterrupt: ignored