In [3]:
!pip install pyspark

[0m

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

24/11/03 23:57:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
incident_table = spark.read.csv('gs://6242filteringbucket/incident_2011_2021NEW.csv', header=True, inferSchema=True)
victim_table = spark.read.csv('gs://6242filteringbucket/victim_table_2011_2021.csv', header=True, inferSchema=True)
cde_agencies_table = spark.read.csv('gs://6242filteringbucket/cde_agencies_2011_2021_NEW.csv', header=True, inferSchema=True)
offense_table = spark.read.csv('gs://6242filteringbucket/offense_2011_2021NEW.csv', header=True, inferSchema=True) #get this table with the data_year
offense_type_table = spark.read.csv('gs://6242filteringbucket/offense_type_name_2011_2021NEW.csv', header=True, inferSchema=True)
victim_offense_table = spark.read.csv('gs://6242filteringbucket/victim_offense_2011_2021NEW.csv', header=True, inferSchema=True)

                                                                                

In [9]:
victim_table = victim_table.select(['incident_id', 'victim_id', 'age_num', 'sex_code', 'race_id', 'ethnicity_id', 'resident_status_code'])
offense_table = offense_table.drop('incident_id') #dropping the duplicate incident_id

In [10]:
incident_victim_df = incident_table.join(victim_table, on='incident_id', how='inner')
agencies_incident_victim_df = cde_agencies_table.join(incident_victim_df, on='agency_id', how='inner')
offense_victim_offense_df = offense_table.join(victim_offense_table, on='offense_id', how='inner')
final_df = agencies_incident_victim_df.join(offense_victim_offense_df, on='victim_id', how='inner')

In [11]:


# Define the mapping logic using `when` statements for race_id
final_df = final_df.withColumn(
    'race_id',
    F.when(final_df['ethnicity_id'] == 1, 'Hispanic or Latino')
     .when(final_df['race_id'] == 0, 'Unknown')
     .when(final_df['race_id'] == 1, 'White')
     .when(final_df['race_id'] == 2, 'Black or African American')
     .when(final_df['race_id'] == 3, 'American Indian or Alaska Native')
     .when(final_df['race_id'] == 4, 'Asian')
     .when(final_df['race_id'] == 5, 'Asian, Native Hawaiian, or Other Pacific Islander')
     .when(final_df['race_id'] == 6, 'Chinese')
     .when(final_df['race_id'] == 7, 'Japanese')
     .when(final_df['race_id'] == 8, 'Native Hawaiian or Other Pacific Islander')
     .when(final_df['race_id'] == 9, 'Other')
     .when(final_df['race_id'] == 98, 'Multiple')
     .when(final_df['race_id'] == 99, 'Not Specified')
     .otherwise(None)
)

# Filter out the specified race_id categories
final_df = final_df.filter(~final_df["race_id"].isin(['Unknown', 'Other', 'Multiple', 'Not Specified']))

In [12]:
final_df = final_df.dropDuplicates()

In [13]:
offense_data = [
    (1, 'Justifiable Homicide', 'Homicide Offenses'), 
    (2, 'False Pretenses/Swindle/Confidence Game', 'Fraud Offenses'), 
    (3, 'Statutory Rape', 'Sex Offenses'), 
    (4, 'Sexual Assault With An Object', 'Sex Offenses'), 
    (5, 'Destruction/Damage/Vandalism of Property', 'Destruction/Damage/Vandalism of Property'), 
    (6, 'Family Offenses, Nonviolent', None), 
    (7, 'Theft of Motor Vehicle Parts or Accessories', 'Larceny/Theft Offenses'), 
    (8, 'Pornography/Obscene Material', 'Pornography/Obscene Material'), 
    (9, 'Sports Tampering', 'Gambling Offenses'), 
    (10, 'Driving Under the Influence', None), 
    (11, 'Counterfeiting/Forgery', 'Counterfeiting/Forgery'), 
    (12, 'Welfare Fraud', 'Fraud Offenses'), 
    (13, 'Pocket-picking', 'Larceny/Theft Offenses'), 
    (14, 'Theft From Motor Vehicle', 'Larceny/Theft Offenses'), 
    (15, 'Assisting or Promoting Prostitution', 'Prostitution Offenses'), 
    (16, 'Drug/Narcotic Violations', 'Drug/Narcotic Offenses'), 
    (17, 'Wire Fraud', 'Fraud Offenses'), 
    (18, 'Purse-snatching', 'Larceny/Theft Offenses'), 
    (19, 'Runaway', None), 
    (20, 'Arson', 'Arson'), 
    (21, 'Motor Vehicle Theft', 'Motor Vehicle Theft'), 
    (22, 'Drunkenness', None), 
    (23, 'Shoplifting', 'Larceny/Theft Offenses'), 
    (24, 'Operating/Promoting/Assisting Gambling', None), 
    (25, 'Bad Checks', 'Fraud Offenses'), 
    (26, 'Extortion/Blackmail', 'Extortion/Blackmail'), 
    (27, 'Aggravated Assault', 'Assault Offenses'), 
    (28, 'Stolen Property Offenses', 'Stolen Property Offenses'), 
    (29, 'Kidnapping/Abduction', 'Kidnapping/Abduction'), 
    (30, 'Prostitution', 'Prostitution Offenses'), 
    (31, 'Betting/Wagering', 'Gambling Offenses'), 
    (32, 'Murder and Nonnegligent Manslaughter', 'Homicide Offenses'), 
    (33, 'Peeping Tom', None), 
    (34, 'Trespass of Real Property', None), 
    (35, 'Drug Equipment Violations', 'Drug/Narcotic Offenses'), 
    (36, 'Rape', 'Sex Offenses'), 
    (37, 'Embezzlement', 'Embezzlement'), 
    (38, 'Negligent Manslaughter', 'Homicide Offenses'), 
    (39, 'Weapon Law Violations', 'Weapon Law Violations'), 
    (40, 'Robbery', 'Robbery'), 
    (41, 'Credit Card/Automated Teller Machine Fraud', 'Fraud Offenses'), 
    (42, 'Curfew/Loitering/Vagrancy Violations', None), 
    (43, 'Sodomy', 'Sex Offenses'), 
    (44, 'Intimidation', 'Assault Offenses'), 
    (45, 'All Other Larceny', 'Larceny/Theft Offenses'), 
    (46, 'Impersonation', 'Fraud Offenses'), 
    (47, 'Theft From Building', 'Larceny/Theft Offenses'), 
    (48, 'All Other Offenses', None), 
    (49, 'Burglary/Breaking & Entering', 'Burglary/Breaking & Entering'), 
    (50, 'Theft From Coin-Operated Machine or Device', 'Larceny/Theft Offenses'), 
    (51, 'Simple Assault', 'Assault Offenses'), 
    (52, 'Liquor Law Violations', None), 
    (53, 'Disorderly Conduct', None), 
    (54, 'Gambling Equipment Violation', 'Gambling Offenses'), 
    (55, 'Incest', 'Sex Offenses'), 
    (56, 'Fondling', 'Sex Offenses'), 
    (57, 'Bribery', 'Bribery'), 
    (58, 'Not Specified', None), 
    (59, 'Human Trafficking, Commercial Sex Acts', 'Human Trafficking'), 
    (60, 'Human Trafficking, Involuntary Servitude', 'Human Trafficking'), 
    (61, 'Purchasing Prostitution', 'Prostitution Offenses'), 
    (62, 'Animal Cruelty', 'Animal Cruelty'), 
    (63, 'Identity Theft', 'Fraud Offenses'), 
    (64, 'Hacking/Computer Invasion', 'Fraud Offenses')
]

offense_data_df = spark.createDataFrame(offense_data, ['offense_type_id', 'offense_name', 'offense_category_name'])

# Join final_df with offense_data_df on offense_type_id
final_all_df = final_df.join(offense_data_df, on='offense_type_id', how='left') 

In [14]:
final_all_df = final_all_df.dropDuplicates()
final_all_df.show(10)

[Stage 32:>                                                         (0 + 1) / 1]

+---------------+---------+---------+----------------+--------------+----------+--------+----------+-----------+-------+--------+-------+------------+--------------------+----------+--------------------+---------------------+
|offense_type_id|victim_id|agency_id|       city_name|primary_county|state_abbr|state_id|population|incident_id|age_num|sex_code|race_id|ethnicity_id|resident_status_code|offense_id|        offense_name|offense_category_name|
+---------------+---------+---------+----------------+--------------+----------+--------+----------+-----------+-------+--------+-------+------------+--------------------+----------+--------------------+---------------------+
|             45| 62205773|     8714|Sterling Heights|        Macomb|        MI|      26|    132255|   57347297|     10|       M|  White|           2|                   R|  66872038|   All Other Larceny| Larceny/Theft Off...|
|             45| 62206067|    21679|            null|       Raleigh|        WV|      55|     57

                                                                                

In [15]:
final_all_df.write.csv('gs://6242filteringbucket/final_all.csv', header=True)

                                                                                

In [16]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MergeCSV").getOrCreate()


24/11/04 01:38:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [28]:
import pandas as pd

# List of file names, change the path if files are in a different directory
file_names = [f"gs://6242filteringbucket/final_all.csv/part-{i:05d}-58716585-8ef7-4182-9bc0-ae3052657caf-c000.csv" for i in range(67)]

output_file = 'gs://6242filteringbucket/merged_output.csv'

# Open the output file in write mode
with open(output_file, "w") as outfile:
    x = 1
#     # Write header from the first file
#     with open(file_names[0], "r") as first_file:
#         outfile.write(first_file.readline())  # Write the header line once

#     # Append the rest of the files
#     for file_name in file_names:
#         with open(file_name, "r") as infile:
#             next(infile)  # Skip the header line for each file
#             # Write the rest of the lines
#             outfile.writelines(infile.readlines())


FileNotFoundError: [Errno 2] No such file or directory: 'gs://6242filteringbucket/victim_table_2011_2021.csv'

In [29]:

# Load the two CSV files
df1 = spark.read.csv("gs://6242filteringbucket/city_scout_fbi_nibrs_2011_2021.csv", header=True, inferSchema=True)
df2 = spark.read.csv("gs://6242filteringbucket/offense_dates.csv", header=True, inferSchema=True)


# Perform the join on offense_id
merged_df = df1.join(df2, on="offense_id", how="inner")  # Use "inner" for matching rows only; can be "left" or "outer" as needed


                                                                                

In [32]:
merged_df = merged_df.dropDuplicates()
# Save the merged DataFrame to a new CS
# merged_df.write.csv("gs://6242filteringbucket/final_csv_with_dates.csv", header=True)

In [44]:
# sort merged_df for data year between 2015 and 2020
filtered_df = merged_df.filter((merged_df.data_year >= 2017) & (merged_df.data_year <= 2021))
filtered_df.write.csv("gs://6242filteringbucket/final2017_2021.csv", header=True)
                                                                                


AnalysisException: path gs://6242filteringbucket/final2015_2021.csv already exists.

In [35]:
# merge the files using a separate python file in the analysis cluster
# different strat
new_merged_df = spark.read.option("header", "true").csv("gs://6242filteringbucket/final2017_2021.csv/")
new_merged_single_file_df = new_merged_df.coalesce(1)
new_merged_single_file_df.write.option("header", "true").mode("overwrite").csv("gs://6242filteringbucket/final2017_2021_merged.csv")

                                                                                

In [43]:
# sort merged_df for data year between 2015 and 2020
filtered_df = merged_df.filter((merged_df.data_year >= 2015) & (merged_df.data_year <= 2021))
filtered_df.write.csv("gs://6242filteringbucket/final2015_2021.csv", header=True)
                                                                                


                                                                                

In [45]:
# merge the files using a separate python file in the analysis cluster
# different strat
new_merged_df = spark.read.option("header", "true").csv("gs://6242filteringbucket/final2015_2021.csv/")
new_merged_single_file_df = new_merged_df.coalesce(1)
new_merged_single_file_df.write.option("header", "true").mode("error").csv("gs://6242filteringbucket/final2015_2021_merged.csv")

                                                                                