## Data Transformations

### Libraries import and data ingestions

In [1]:
from common_libraries import * 
import project_function

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.debug.catalog", False) \
    .config("spark.logLevel", "ERROR") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.cores", "2") \
    .getOrCreate()


spark.sparkContext.setLogLevel("WARN")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/19 15:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/19 15:17:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/19 15:17:19 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
df = spark.read.csv("/Users/pasqualesalomone/Desktop/crime_data.csv", header=True, inferSchema=True)


                                                                                

### Using lower case snake notation to standardize column names

In [4]:
column_snake_case_lambda = lambda s: re.sub(r'\W+', ' ', s).lower().replace(' ', '_')


In [5]:
new_columns = [column_snake_case_lambda(col_name) for col_name in df.columns]

# Rename the columns in the DataFrame
df = df.toDF(*new_columns)

### Converting data types

In [6]:
df = (df.withColumn("date_rptd", 
                   to_date(concat_ws("-", split(substring(col('date_rptd'),1,10), "/")[2], 
                              split(substring(col('date_rptd'),1,10), "/")[0], 
                              split(substring(col('date_rptd'),1,10), "/")[1]),'yyyy-mm-dd'))\
      .withColumn("date_occ", 
                   to_date(concat_ws("-", split(substring(col('date_occ'),1,10), "/")[2], 
                              split(substring(col('date_occ'),1,10), "/")[0], 
                              split(substring(col('date_occ'),1,10), "/")[1]),'yyyy-mm-dd'))\

      #.withColumn('time_occ',col('time_occ').cast('string'))
      .withColumn('weapon_used_cd', col('weapon_used_cd').cast('string'))
      .withColumn('premis_cd', col('premis_cd').cast('string'))
      .withColumn('crm_cd', col('crm_cd').cast('string'))
      .withColumn('area', col('area').cast('string'))
      .withColumn('crm_cd_1', col('crm_cd_1').cast('string'))
      .withColumn('crm_cd_2', col('crm_cd_2').cast('string'))
      .withColumn('crm_cd_3', col('crm_cd_3').cast('string'))
      .withColumn('crm_cd_4', col('crm_cd_4').cast('string'))
      .withColumn('rpt_dist_no',col('rpt_dist_no').cast('string')))

24/04/19 15:17:37 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Renaming dr_no column to crime_record_id

In [7]:
df = df.withColumnRenamed('dr_no','crime_record_id')

### Creating a new column days_to_report

In [8]:
df = df.withColumn('column_days_to_report', datediff(
                                                col('date_rptd'),col('date_occ')))
df.show(2,vertical=True)

24/04/19 15:17:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0-------------------------------------
 crime_record_id       | 190326475            
 date_rptd             | 2020-01-01           
 date_occ              | 2020-01-01           
 time_occ              | 2130                 
 area                  | 7                    
 area_name             | Wilshire             
 rpt_dist_no           | 784                  
 part_1_2              | 1                    
 crm_cd                | 510                  
 crm_cd_desc           | VEHICLE - STOLEN     
 mocodes               | NULL                 
 vict_age              | 0                    
 vict_sex              | M                    
 vict_descent          | O                    
 premis_cd             | 101                  
 premis_desc           | STREET               
 weapon_used_cd        | NULL                 
 weapon_desc           | NULL                 
 status                | AA                   
 status_desc           | Adult Arrest         
 crm_cd_1    

### Creating new columns crime_occ_hour,crime_occ_minutes, and drop the time_occ column 

In [9]:
# Extract day, month, and year from date_occ and date_rptd
df = df.withColumn("crime_day_occ", dayofmonth("date_occ")) \
       .withColumn("crime_month_occ", month("date_occ")) \
       .withColumn("crime_year_occ", year("date_occ")) \
       .withColumn("crime_day_rptd", dayofmonth("date_rptd")) \
       .withColumn("crime_month_rptd", month("date_rptd")) \
       .withColumn("crime_year_rptd", year("date_rptd")) \
       .withColumn("crime_occ_hour", (df["time_occ"] / 100).cast("int"))\
       .withColumn("crime_occ_minute", df["time_occ"] % 100)

# Drop unnecessary columns
df = df.drop("time_occ", "date_rptd", "date_occ")


### Dropping columns with a nulls ratio grater than 60%

In [10]:
project_function.nulls_buster(df,spark)

                                                                                

Unnamed: 0,column_name,%null
0,crm_cd_4,99.993086
1,crm_cd_3,99.755866
2,crm_cd_2,92.736789
3,cross_street,84.312103
4,weapon_used_cd,65.464611
5,weapon_desc,65.464611
6,mocodes,13.985006
7,vict_descent,13.305103
8,vict_sex,13.304023
9,premis_desc,0.060385


In [11]:
df = df.drop('weapon_desc','weapon_used_cd','cross_street','crm_cd_2','crm_cd_3','crm_cd_4')

In [14]:
#df.groupBy('vict_descent').count().orderBy('count', ascending=False).first()['vict_descent']
   

In [12]:
# df1 = df.toPandas().copy()

                                                                                

In [17]:
# # Calculate the total number of rows
# total_rows = len(df1)

# # Count the number of null or whitespace values in the 'vict_sex' column
# null_or_whitespace_count = df1['vict_sex'].isnull().sum() + (df1['vict_sex'] == '').sum()

# # Calculate the percentage
# percentage_null_or_whitespace = (null_or_whitespace_count / total_rows) * 100

# print("Percentage of null or whitespace values in 'vict_sex':", percentage_null_or_whitespace)


Percentage of null or whitespace values in 'vict_sex': 13.304022814673983


In [14]:
# from pyspark.sql.functions import col

# # Calculate the total number of rows
# total_rows = df.count()

# # Count the number of null or whitespace values in the 'vict_sex' column
# null_or_whitespace_count = df.filter((col('vict_sex').isNull()) | (col('vict_sex') == '')).count()

# # Calculate the percentage
# percentage_null_or_whitespace = (null_or_whitespace_count / total_rows) * 100

# print("Percentage of null or whitespace values in 'vict_sex':", percentage_null_or_whitespace)


Percentage of null or whitespace values in 'vict_sex': 13.304022814673983


In [19]:

# # Method 1: Impute missing values with the mode
# mode_value = df1['vict_sex'].mode()[0]
# df_mode = df1.fillna({'vict_sex': mode_value})

# # Method 2: Impute missing values with a probability-based approach
# value_counts = df1['vict_sex'].value_counts(normalize=True,dropna=False)
# missing_indices = df1[df1['vict_sex'].isna() | df1['vict_sex'].str.isspace()].index
# imputed_values = np.random.choice(value_counts.index, size=len(missing_indices), p=value_counts.values)
# df_prob = df1.copy()
# df_prob.loc[missing_indices, 'vict_sex'] = imputed_values
# print(df_prob['vict_sex'].value_counts())

# # Compare the results
# # print("Original DataFrame:")
# # print(df1['vict_sex'])
# # print("\nImputed DataFrame using mode:")
# # print(df_mode['vict_sex'])
# # print("\nImputed DataFrame using probability-based approach:")
# # print(df_prob['vict_sex'])
# #(df_prob['vict_sex'] != df_mode['vict_sex']).sum()

vict_sex
M    429384
F    382973
X     96815
H       121
-         1
Name: count, dtype: int64


In [20]:
# df1 = df.toPandas().copy()

# # Calculate the probability distribution of existing values in the 'vict_sex' column
# value_counts = df1['vict_sex'].value_counts(normalize=True)
# df1['vict_sex'].replace(r'^\s*$', np.nan, regex=True, inplace=True)

# # Get indices of rows with missing values
# missing_indices = df1[df1['vict_sex'].isna()].index


                                                                                

In [21]:
# imputed_values = np.random.choice(value_counts.index, size=len(missing_indices), p=value_counts.values)
# df1.loc[missing_indices, 'vict_sex'] = imputed_values


### Imputing missing values

In [23]:
df = project_function.impute_missing_values(df, [
        "mocodes",
        "vict_descent",
        "vict_sex",
        "premis_desc",
        "premis_cd",
        "crm_cd_1"
    ])

                                                                                

In [26]:
# def jaccard_similarity(list1, list2):
#     set1 = set(list1)
#     set2 = set(list2)
#     intersection = len(set1.intersection(set2))
#     union = len(set1.union(set2))
#     return intersection / union



# similarity = jaccard_similarity(setone, settwo)
# print("Jaccard similarity:", similarity)


Jaccard similarity: 1.0


In [None]:

#df.select(sum(col('mocodes').isNull().cast('int'))).collect()[0][0]
#df.filter((F.col('mocodes').isNull()) | (F.col('mocodes') == '')).count()
#df.select(col('mocodes')).show()

### Ensuring that the nulls_buster function returns no more null values

In [24]:
project_function.nulls_buster(df,spark)

                                                                                

Unnamed: 0,column_name,%null


In [None]:
#df.groupBy('mocodes').agg(F.count('*').alias('count')).orderBy('count',ascending=False).show()


### Dropping an uncategorized column

In [26]:
df = df.drop('part_1_2')


### Data Integrity Check: Ensuring Validity of Column Values

In [27]:
columns_of_interest = ['vict_age', 'lat', 'lon', 'crime_day_occ', 
                       'crime_month_occ', 'crime_year_occ', 
                       'crime_day_rptd', 'crime_month_rptd', 'crime_year_rptd']

min_max_df = df.select(*[min(col).alias(f"min_{col}") for col in columns_of_interest] +
                      [max(col).alias(f"max_{col}") for col in columns_of_interest])

min_max_df.toPandas().transpose()

                                                                                

Unnamed: 0,0
min_vict_age,-4.0
min_lat,0.0
min_lon,-118.6676
min_crime_day_occ,1.0
min_crime_month_occ,1.0
min_crime_year_occ,2020.0
min_crime_day_rptd,1.0
min_crime_month_rptd,1.0
min_crime_year_rptd,2020.0
max_vict_age,120.0


In [28]:
# Filter out rows with negative values in any of the selected columns
negative_values_df = df.filter(
    (col("vict_age") < 0) 
)

print(f"The number of rows with negative values in the field vict_age is {negative_values_df.toPandas().shape[0]} or the {negative_values_df.toPandas().shape[0]/df.count()} %")

                                                                                

The number of rows with negative values in the field vict_age is 112 or the 0.00012098690748822538 %


### Replacing neegative vict_age values with positive ones.

In [29]:
df = df.withColumn("vict_age", when(col("vict_age") < 0, -col("vict_age")).otherwise(col("vict_age")))


In [30]:
print(f"The number of rows with negative values in the field vict_age after the transformation is {df.filter((col('vict_age') < 0)).count()}")


The number of rows with negative values in the field vict_age after the transformation is 0


### Replace "-" with "unknown" in the vict_sex column

In [31]:
df = df.withColumn("vict_sex", when(col("vict_sex") == "-", "unknown").otherwise(col("vict_sex")))

#### Writing the curated df to a parquet file

In [32]:
df.write.mode("overwrite").parquet("curated_data.parquet")

                                                                                