In [1]:
#### Setting up Spark for colab space, code provided by Professor Othman

import os
#Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Set JAVA_HOME path variable in Linux
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version


#Install Spark
#download Spark file
!wget -q http://apache.osuosl.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
#extract the file
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
#Set SPARK-HOME path variable in Linux
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"


#install findspark package
!pip install -q findspark

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)


In [2]:
import findspark
findspark.init()

In [3]:
# create entry points to spark
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
conf = SparkConf().setAppName("lecture10").setMaster("local[*]")
sc=SparkContext(conf = conf)
spark = SparkSession(sparkContext=sc)
sqlContext = SQLContext(sc)

In [4]:
spark

In [5]:
# hardcoded variables telling our program which states, years, and tables are needed from NIBRS dataa

# Can make the list longer if running locally, for google colab, better to do one state at a time

state_list = ["alabama", "arizona", "arkansas","colorado", "connecticut"]
state_abbr = ["AL", "AZ", "AR", "CO", "CT"]
csv = ["nibrs_circumstances", "nibrs_victim_circumstances", "nibrs_victim", "ref_race"]
firstyear = 1991
lastyear = 2018

In [6]:
import pandas as pd

df_all = []   # empty list of all read in tables
url_base="https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/"

for j in range(0, len(csv)):
    df_list = []
    for state_i in range(0,len(state_list)):
        for year in range(firstyear,lastyear+1):
            url = url_base+state_list[state_i]+'/'+state_abbr[state_i]+'-'+str(year)
            url_c = url + '/' + csv[j] + '.csv'
            #print("Reading " + url_c)
            #print(url_c)
            try:
                temp_df = pd.read_csv(url_c, error_bad_lines=False)
            except:
                url_c = url + '/' + csv[j].upper() + '.csv'
                try:
                    temp_df = pd.read_csv(url_c, error_bad_lines=False)
                    temp_df.columns = map(str.lower, temp_df.columns)
                except:
                    #break
                    print(url_c + " did not exist")
            
            temp_df['state'] = state_abbr[state_i]
            df_list.append(temp_df)
            #print(df_list)
            #break
    df_all.append(pd.concat(df_list))
        
#AL2018_OFFENSE=pd.read_csv(url,  error_bad_lines=False)
#AL2018_OFFENSE

https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1993/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1994/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1995/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1996/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1997/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1998/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-1999/NIBRS_CIRCUMSTANCES.csv did not exist
https://raw.githubusercontent.com/roched1atwit/CS3800_4050/master/data/alabama/AL-2000/NIBRS_CIRCUMSTANCES.csv did not exist


In [7]:
from pyspark.sql.types import *

circumstances_schema = StructType([ StructField("circumstances_id", IntegerType(), True)\

                       ,StructField("circumstances_type", StringType(), True)\

                       ,StructField("circumstances_code", IntegerType(), True)\

                       ,StructField("circumstances_name", StringType(), True)\

                       ,StructField("state", StringType(), True)])

circumstances_spark = spark.createDataFrame(df_all[0],schema=circumstances_schema)
del df_all[0]

In [8]:
circumstances_spark = circumstances_spark.drop("state")
circumstances_spark = circumstances_spark.drop_duplicates()
circumstances_spark.show()
circumstances_spark.count()

+----------------+------------------+------------------+--------------------+
|circumstances_id|circumstances_type|circumstances_code|  circumstances_name|
+----------------+------------------+------------------+--------------------+
|              13|                 N|                30|Child Playing Wit...|
|               7|                 A|                 7|       Mercy Killing|
|              15|                 N|                32|    Hunting Accident|
|              17|                 N|                34|Other Negligent K...|
|               9|                 A|                 9| Other Circumstances|
|               6|                 A|                 6|      Lovers Quarrel|
|              12|                 J|                21|Criminal Killed b...|
|              10|                 A|                10|Unknown Circumsta...|
|              16|                 N|                33|Other Negligent W...|
|               3|                 A|                 3|        

18

In [9]:
from pyspark.sql.types import *

victim_circumstances_schema = StructType([ StructField("victim_id", IntegerType(), True)\

                       ,StructField("circumstances_id", IntegerType(), True)\

                       ,StructField("justifiable_force_id", DoubleType(), True)\

                       ,StructField("state", StringType(), True)\

                       ,StructField("data_year", DoubleType(), True)])

victim_circumstances_spark = spark.createDataFrame(df_all[0],schema=victim_circumstances_schema)
del df_all[0]

In [10]:
victim_circumstances_spark = victim_circumstances_spark.drop("state")
victim_circumstances_spark = victim_circumstances_spark.drop("data_year")
victim_circumstances_spark = victim_circumstances_spark.drop("justifiable_force")
victim_circumstances_spark = victim_circumstances_spark.drop_duplicates()
victim_circumstances_spark.show()
victim_circumstances_spark.count()

+---------+----------------+--------------------+
|victim_id|circumstances_id|justifiable_force_id|
+---------+----------------+--------------------+
|    12877|              10|                 NaN|
|    14590|              10|                 NaN|
|    20679|              10|                 NaN|
|    18592|              10|                 NaN|
|    24124|              10|                 NaN|
|    25785|              10|                 NaN|
|     1647|              10|                 NaN|
|     2964|              10|                 NaN|
|     3621|               1|                 NaN|
|     5138|              10|                 NaN|
|     6876|              10|                 NaN|
|    10417|              10|                 NaN|
|     9078|              10|                 NaN|
|    10073|              10|                 NaN|
|    13360|              10|                 NaN|
|    50767|               1|                 NaN|
|    52720|              10|                 NaN|


428009

In [11]:
# decided to remove unneeded features before converting to Spark to be more efficient memoery-wise
victim_df = df_all[0].drop(columns=['victim_seq_num', 'victim_type_id', 'assignment_type_id','activity_type_id','outside_agency_id', \
                        'age_id', 'age_num', 'sex_code', 'resident_status_code', 'agency_data_year', 'ff_line_number', \
                        'age_range_low_num', 'age_range_high_num', 'data_year'])

In [12]:
victim_schema = StructType([ StructField("victim_id", IntegerType(), True)\

                       ,StructField("incident_id", IntegerType(), True)\

                       #,StructField("victim_seq_num", IntegerType(), True)\

                       #,StructField("victim_type_id", IntegerType(), True)\

                       #,StructField("assignment_type_id", DoubleType(), True)\

                       #,StructField("activity_type_id", DoubleType(), True)\

                       #,StructField("outside_agency_id", DoubleType(), True)\

                       #,StructField("age_id", DoubleType(), True)\

                       #,StructField("age_num", DoubleType(), True)\

                       #,StructField("sex_code", StringType(), True)\
                       
                       ,StructField("race_id", DoubleType(), True)\
                       
                       ,StructField("ethnicity_id", DoubleType(), True)\
                       
                       #,StructField("resident_status_code", StringType(), True)\
                       
                       #,StructField("agency_data_year", DoubleType(), True)\
                       
                       #,StructField("ff_line_number", DoubleType(), True)\
                       
                       #,StructField("age_range_low_num", DoubleType(), True)\
                       
                       #,StructField("age_range_high_num", DoubleType(), True)\
                       
                       ,StructField("state", StringType(), True)\
                       
                       #,StructField("data_year", DoubleType(), True)\
                       
                       ])

victim_spark = spark.createDataFrame(victim_df,schema=victim_schema)
del df_all[0]
del victim_df

In [13]:
victim_spark.show()


KeyboardInterrupt: ignored

In [None]:
df_all[0]

In [14]:
from pyspark.sql.types import *

races_schema = StructType([ StructField("race_id", IntegerType(), True)\

                       #,StructField("race_code", StringType(), True)\

                       ,StructField("race_desc", StringType(), True)\

                       #,StructField("start_year", DoubleType(), True)\

                       #,StructField("end_year", DoubleType(), True)\

                       #,StructField("notes", StringType(), True)\

                       #,StructField("state", StringType(), True)

                        ])

races_df = df_all[0].drop(columns=['race_code', 'sort_order','start_year','end_year','notes','state'])
races_spark = spark.createDataFrame(races_df,schema=races_schema)
del races_df
del df_all[0]

In [None]:
races_spark = races_spark.drop_duplicates()
races_spark.show()
races_spark.count()

In [None]:
# should remove any NaN values in race_id
victim_spark = victim_spark.filter(victim_spark.race_id < 100)

res = victim_spark.join(races_spark, "race_id", "inner")
res = res.join(victim_circumstances_spark, "victim_id", "inner")
res = res.join(circumstances_spark, "circumstances_id", "inner")
res = res.drop("justifiable_force_id")
res = res.drop("circumstances_type")
res = res.filter(res.circumstances_code == 21)
res = res.distinct()
res.show()
res.count()