
# **Initiate and Configure Spark**

In [1]:
# Using ! to execute a command in the command line or terminal
# Using pip3 to interact with the Python package manager for Python 3.x
# Using install to specify that we want to install a package
# Install the PySpark library, which is the Python API for Apache Spark


!pip3 install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=b2704c5806718d28ec97ac682ac0347b603e381b6670c8dba907b7ee283413fe
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1



# **Connect Google Drive**

In [2]:
# Importing the 'drive' module from the 'google.colab' library to mount Google Drive
from google.colab import drive

# Mounting the Google Drive at the '/content/drive' directory
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Importing the SparkSession class from the pyspark.sql module
from pyspark.sql import SparkSession

# Creating a SparkSession named 'spark' to interact with Spark
# The 'master' parameter is set to "local[*]", which means Spark will run in local mode using all available cores
# The 'appName' parameter is set to 'Fraud Detection' to give a name to the Spark application
# The 'getOrCreate()' method ensures that if an existing SparkSession is available, it will be reused; otherwise, a new one will be created


spark = SparkSession.builder \
        .master("local[*]") \
        .appName('Fraud Detection') \
        .getOrCreate()
        # .config("spark.driver.memory", "8g") \
        # .config("spark.kryoserializer.buffer.max", "16g") \

---
# **Data Loading and Preprocessing**
---

In [4]:
#Identify the student who made a contribution and mention their name in the appropriate section of the code.

## The students' names who made contributions

# load spark_df using spark
spark_df = spark.read.csv('/content/drive/MyDrive/big_data/transactions_train.csv',inferSchema=True, header =True)

# available columns in this spark_df
spark_df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrig',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud']

In [5]:
spark_df.describe().show()

+-------+------------------+--------+-----------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+
|summary|              step|    type|           amount|   nameOrig|   oldbalanceOrig|    newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|
+-------+------------------+--------+-----------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+
|  count|           6351193| 6351193|          6351193|    6351193|          6351193|           6351193|    6351193|           6351193|           6351193|             6351193|
|   mean|242.55529819358347|    NULL|179815.5359635669|       NULL|834795.6840371998|  856169.582831443|       NULL|1101042.5969942801|1225371.9736932502|0.001215047314732...|
| stddev|141.06763627792867|    NULL|603630.9774416926|       NULL|2889959.094210148|2926073.0596211716|       NULL|3398

In [6]:
from pyspark.sql.functions import isnan, when, count, col

# Check for missing values in each column
missing_counts = spark_df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in spark_df.columns])
missing_counts.show()

# Check for missing values in any row
total_missing_count = spark_df.rdd.map(lambda row: sum([1 for x in row if x == None])).sum()
print("Total missing values in DataFrame: {}".format(total_missing_count))

+----+----+------+--------+--------------+--------------+--------+--------------+--------------+-------+
|step|type|amount|nameOrig|oldbalanceOrig|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|
+----+----+------+--------+--------------+--------------+--------+--------------+--------------+-------+
|   0|   0|     0|       0|             0|             0|       0|             0|             0|      0|
+----+----+------+--------+--------------+--------------+--------+--------------+--------------+-------+

Total missing values in DataFrame: 0


In [7]:
from pyspark.sql.functions import when
# convert column type into numerical value
"""
  CASH-IN : 0,
  CASH-OUT : 1,
  DEBIT: 2,
  PAYMENT : 3,
  TRANSFER: 4,

"""
spark_df=spark_df.withColumn("type",
                             when(spark_df.type=="CASH_IN", 0)
                            .when(spark_df.type=="CASH_OUT", 1)
                            .when(spark_df.type=="DEBIT", 2)
                            .when(spark_df.type=="PAYMENT", 3)
                            .when(spark_df.type=="TRANSFER", 4)
                            .otherwise(-1))

spark_df.show()

+----+----+---------+-----------+--------------+--------------+-----------+--------------+--------------+-------+
|step|type|   amount|   nameOrig|oldbalanceOrig|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|
+----+----+---------+-----------+--------------+--------------+-----------+--------------+--------------+-------+
|   1|   3|  9839.64|C1231006815|      170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|
|   1|   3|  1864.28|C1666544295|       21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|
|   1|   4|    181.0|C1305486145|         181.0|           0.0| C553264065|           0.0|           0.0|      1|
|   1|   1|    181.0| C840083671|         181.0|           0.0|  C38997010|       21182.0|           0.0|      1|
|   1|   3| 11668.14|C2048537720|       41554.0|      29885.86|M1230701703|           0.0|           0.0|      0|
|   1|   3|  7817.71|  C90045638|       53860.0|      46042.29| M573487274|           0.

In [8]:
spark_df.count()

6351193

In [11]:
from pyspark.ml.feature import HashingTF
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.ml import Pipeline

# Define the schema
schema = StructType([
    StructField("nameOrig", StringType(), True),  # Original column
    StructField("nameDest", StringType(), True),  # Original column
    StructField("nameOrig_hashed", ArrayType(FloatType()), True),  # Hashed column for nameOrig
    StructField("nameDest_hashed", ArrayType(FloatType()), True),  # Hashed column for nameDest
])


# Define a function to apply the pipeline to each partition
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def hashingFeatures(pdf):
    # Define stages for the pipeline within the function
    hashingTF = HashingTF(inputCol="nameOrig", outputCol="nameOrig_hashed", numFeatures=10)
    hashingTF_dest = HashingTF(inputCol="nameDest", outputCol="nameDest_hashed", numFeatures=10)

    # Create the pipeline
    pipeline = Pipeline(stages=[hashingTF, hashingTF_dest])

    # Fit the pipeline to the data
    pipeline_model = pipeline.fit(pdf)

    # Transform the data
    df_hashed = pipeline_model.transform(pdf)

    return df_hashed

# Apply hashing to the DataFrame
df_hashed = spark_df.groupby("nameOrig").apply(hashingFeatures)

df_hashed.show()




PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-11-8f1e8861d288>", line 19, in hashingFeatures
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/__init__.py", line 139, in wrapper
    return func(self, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/feature.py", line 1677, in __init__
    self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.HashingTF", self.uid)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 80, in _new_java_obj
    assert sc is not None
AssertionError
