# DATA PREPARATION: OVERSAMPLING AND UNDERSAMPLING 

Source:https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, array, lit
from pyspark.sql import SQLContext

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("AppName") \
    .getOrCreate()

# importing the pre-cleaned dataset

df = spark.read.csv('Homicide_clean_Discr.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- Agency_Code: string (nullable = true)
 |-- Agency_Type: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Victim_Sex: string (nullable = true)
 |-- Victim_Race: string (nullable = true)
 |-- Victim_Ethnicity: string (nullable = true)
 |-- Perpetrator_Sex: string (nullable = true)
 |-- Perpetrator_Race: string (nullable = true)
 |-- Perpetrator_Ethnicity: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- City_State: string (nullable = true)
 |-- Crime_Solved: string (nullable = true)
 |-- Victim_Age: string (nullable = true)
 |-- Perpetrator_Age: string (nullable = true)



In [3]:
# counting the number of records with CrimeSolved = Yes and with CrimeSolved= No and the ratio between them

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = float(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 2.3786490016998156
minor count: 182961
major count: 435200


## UNDERSAMPLING OF THE TOTAL DATASET

In [4]:
sampled_majority_df = major_df.sample(False, 1/ratio)
df = sampled_majority_df.unionAll(minor_df)

In [5]:
# we recompute the ratio in order to check that the undersampling technique worked as intended

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 1
minor count: 182961
major count: 183386


In [6]:
df.toPandas().to_csv("Homicide_Undersampled.csv", index = False)

## OVERSAMPLING OF THE STATE'S DATASETS
### Texas

In [7]:
# creation of a subsample including only homicides committed in Texas

df = spark.read.csv('Homicide_clean_Discr.csv', header = True, inferSchema = True)
sqlCtx = SQLContext(spark)
df.createOrReplaceGlobalTempView("df");
df = sqlCtx.sql("SELECT * FROM global_temp.df WHERE State='Texas'")

In [8]:
# counting the number of records with CrimeSolved = Yes and with CrimeSolved= No and the ratio between them

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = float(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 3.3456127628716463
minor count: 13790
major count: 46136


In [9]:
a = range(round(ratio))
a

range(0, 3)

In [10]:
# duplicating records belonging to the minority class
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

# combining the oversampled minority class and the majority class 
df = major_df.unionAll(oversampled_df)

In [11]:
# we recompute the ratio in order to check that the oversampling technique worked as intended

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 1
minor count: 41370
major count: 46136


In [12]:
df.toPandas().to_csv("Homicide_Texas.csv", index = False)

### California

In [13]:
# creation of a subsample including only homicides committed in California

df = spark.read.csv('Homicide_clean_Discr.csv', header = True, inferSchema = True)
sqlCtx = SQLContext(spark)
df.createOrReplaceGlobalTempView("df");
df = sqlCtx.sql("SELECT * FROM global_temp.df WHERE State='California'")

In [14]:
# counting the number of records with CrimeSolved = Yes and with CrimeSolved= No and the ratio between them

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = float(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 1.745169877207545
minor count: 34937
major count: 60971


In [15]:
a = range(round(ratio))
a

range(0, 2)

In [16]:
# duplicating records belonging to the minority class
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

# combining the oversampled minority class and the majority class 
df = major_df.unionAll(oversampled_df)

In [17]:
# we recompute the ratio in order to check that the oversampling technique worked as intended

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 0
minor count: 69874
major count: 60971


In [18]:
df.toPandas().to_csv("Homicide_California.csv", index = False)

### New York


In [19]:
# creation of a subsample including only homicides committed in New York

df = spark.read.csv('Homicide_clean_Discr.csv', header = True, inferSchema = True)
sqlCtx = SQLContext(spark)
df.createOrReplaceGlobalTempView("df");
df = sqlCtx.sql("SELECT * FROM global_temp.df WHERE State='New York'")

In [20]:
# counting the number of records with CrimeSolved = Yes and with CrimeSolved= No and the ratio between them

major_df = df.filter(col("Crime_Solved") == "Yes")
minor_df = df.filter(col("Crime_Solved") == "No")
ratio = float(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
print("minor count: {}".format(minor_df.count()))
print("major count: {}".format(major_df.count()))

ratio: 1.1596588011682352
minor count: 21571
major count: 25015


In [21]:
# this dataset revealed to be already balanced and therefore didn't need to be oversampled
df.toPandas().to_csv("Homicide_NY.csv", index = False)