In [25]:
import findspark
findspark.init() 

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, count, isnan, when, col


In [2]:
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))
spark = SparkSession \
    .builder \
    .getOrCreate()

In [3]:
train = spark.read.csv("../data/fraudTrain.csv", header=True, inferSchema=True) 
test = spark.read.csv("../data/fraudTest.csv", header=True) 

In [4]:
print(train.count(), test.count())
print(train.columns)

1296675 555719
['_c0', 'trans_date_trans_time', 'cc_num', 'merchant', 'category', 'amt', 'first', 'last', 'gender', 'street', 'city', 'state', 'zip', 'lat', 'long', 'city_pop', 'job', 'dob', 'trans_num', 'unix_time', 'merch_lat', 'merch_long', 'is_fraud']


In [5]:
train.show(4, vertical=True)

-RECORD 0-------------------------------------
 _c0                   | 0                    
 trans_date_trans_time | 2019-01-01 00:00:18  
 cc_num                | 2703186189652095     
 merchant              | fraud_Rippin, Kub... 
 category              | misc_net             
 amt                   | 4.97                 
 first                 | Jennifer             
 last                  | Banks                
 gender                | F                    
 street                | 561 Perry Cove       
 city                  | Moravian Falls       
 state                 | NC                   
 zip                   | 28654                
 lat                   | 36.0788              
 long                  | -81.1781             
 city_pop              | 3495                 
 job                   | Psychologist, cou... 
 dob                   | 1988-03-09 00:00:00  
 trans_num             | 0b242abb623afc578... 
 unix_time             | 1325376018           
 merch_lat   

In [11]:
train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [7]:
train = train.withColumn("zip", train["zip"].cast(StringType()))

In [8]:
train = train.dropDuplicates(["_c0"])
train.count()

1296675

In [9]:
# Remove whitespaces from String columns
string_cols = [item[0] for item in train.dtypes if item[1]=="string"]
for col in string_cols:
    train = train.withColumn(col, trim(train[col]))

In [29]:
# count of both Null and missing values
# this function does not work when we have timestamps columns

train.select([count(when(isnan(col(c))|col(c).isNull(), c)).alias(c) for c in train.columns if c not in ["trans_date_trans_time", "dob"]]).show()



+---+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---------+---------+---------+----------+--------+
|_c0|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---------+---------+---------+----------+--------+
|  0|     0|       0|       0|  0|    0|   0|     0|     0|   0|    0|  0|  0|   0|       0|  0|        0|        0|        0|         0|       0|
+---+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---------+---------+---------+----------+--------+



In [31]:
# delete columns with null values more than a threshold
agg_row = train.select(
    [
        (count(when(isnan(col(c))|col(c).isNull(), c))/train.count()).alias(c)
        for c in train.columns if c not in ["trans_date_trans_time", "dob"]]
).collect()


In [35]:
agg_dict = [row.asDict() for row in agg_row][0]
col_null = [i for i in agg_dict if agg_dict[i]>0.4]
col_null
train = train.drop(*col_null)

[]

In [55]:
train.groupBy("State").count().orderBy("count", ascending=False).show()

+-----+-----+
|State|count|
+-----+-----+
|   TX|94876|
|   NY|83501|
|   PA|79847|
|   CA|56360|
|   OH|46480|
|   MI|46154|
|   IL|43252|
|   FL|42671|
|   AL|40989|
|   MO|38403|
|   MN|31714|
|   AR|31127|
|   NC|30266|
|   WI|29368|
|   VA|29250|
|   SC|29190|
|   KY|28475|
|   IN|27580|
|   IA|26985|
|   OK|26671|
+-----+-----+
only showing top 20 rows



In [57]:
train.groupBy("State").sum("is_fraud").orderBy("sum(is_fraud)", ascending=False).show()

+-----+-------------+
|State|sum(is_fraud)|
+-----+-------------+
|   NY|          555|
|   TX|          479|
|   PA|          458|
|   CA|          326|
|   OH|          321|
|   FL|          281|
|   IL|          248|
|   MI|          238|
|   AL|          215|
|   MN|          207|
|   VA|          198|
|   SC|          193|
|   MO|          191|
|   NE|          180|
|   WI|          163|
|   AR|          161|
|   MD|          157|
|   KS|          156|
|   KY|          155|
|   NC|          149|
+-----+-------------+
only showing top 20 rows



In [60]:
train.groupBy("State").pivot("is_fraud").count().show()