# Project Title
### Data Engineering Capstone Project

#### Project Summary
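This project gathers several datasets on police-related violence in the United States from raw files stored in S3. These cover police killings, police officer deaths, city budgets and populations, Black Lives Matter protests, and crime and arrest data for Los Angeles and New York. It explores and cleans them with pandas and Spark, then models and loads them with an ETL pipeline.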

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [6]:
pip install s3fs

Collecting s3fs
  Downloading s3fs-0.4.2-py3-none-any.whl (19 kB)
Installing collected packages: s3fs
Successfully installed s3fs-0.4.2
Note: you may need to restart the kernel to use updated packages.


In [8]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofweek, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import *
import logging
import s3fs

In [10]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
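The cell above expects a `dl.cfg` file next to the notebook with an `[AWS]` section. Below is a minimal sketch of that layout, parsed from a string so it runs anywhere. The section and key names come from the code above, and the values are placeholders. Because the file holds secrets, keeping it out of version control (for example via `.gitignore`) is advisable.

```python
import configparser

# Placeholder contents for dl.cfg. Only the [AWS] section and the two keys the
# notebook reads are shown; the values are dummies, not real credentials.
SAMPLE_DL_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_DL_CFG)

# Same lookups as the notebook cell; configparser matches option names
# case-insensitively, so the upper-case keys resolve correctly.
access_key = config["AWS"]["AWS_ACCESS_KEY_ID"]
secret_key = config["AWS"]["AWS_SECRET_ACCESS_KEY"]
```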


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools do you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included? 


This dataset pulls together data from several sources on police-related violence in the United States. The dataset currently includes these types of data:

- Police shootings
- Citizen fatalities involving police
- Police officer deaths suffered in the line of duty
- Summary statistics by city on people killed, arrests, population
- City population
- Police department headcounts
- Detailed crimes and arrests for the prime city in the four largest metro areas.
- Locations for all BLM protests since May 25, 2020



### 1. Names of states in the US
- The dataset below shows the name and abbreviation of each US state

In [11]:
# Read in the data here
us_state = "s3://datalake-blm/raw/us_state.csv"

In [12]:
us_state_df = pd.read_csv(us_state)

In [13]:
us_state_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 2 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   state         50 non-null     object
 1   state_abbrev  50 non-null     object
dtypes: object(2)
memory usage: 928.0+ bytes


In [14]:
us_state_df.head(5)

|   | state | state_abbrev |
|---|---|---|
| 0 | Alabama | AL |
| 1 | Alaska | AK |
| 2 | Arizona | AZ |
| 3 | Arkansas | AR |
| 4 | California | CA |


In [15]:
us_state_df.count()

state           50
state_abbrev    50
dtype: int64

### 2. Budgets in different states in the US 


In [16]:
budgets = "s3://datalake-blm/raw/budgets.csv"

In [17]:
budgets_df = pd.read_csv(budgets)

In [18]:
budgets_df.head(5)

|   | year | city_name | id_city | city_population | cpi | rev_total_city | rev_general_city | intergovt_rev_city | igr_federal_city | igr_state_city | ... | cash_other_offsets | cash_other_bonds | cash_other_other | county_name | id_county | county_population | relationship_city_school | enrollment | districts_in_city | consolidated_govt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1977 | AK: Anchorage | 22002001.0 | 174500 | 4.044885 | 5342.24 | 4956.92 | 2148.77 | 279.32 | 1869.46 | ... | 178.51 | 787.93 | 691.32 |  |  |  | 4.0 | 36855.0 |  | 1.0 |
| 1 | 1978 | AK: Anchorage | 22002001.0 | 177000 | 3.759509 | 5948.99 | 5490.05 | 2468.11 | 403.24 | 2064.86 | ... | 187.53 | 1395.82 | 1158.01 |  |  |  | 4.0 | 36804.0 |  | 1.0 |
| 2 | 1979 | AK: Anchorage | 22002001.0 | 179600 | 3.376308 | 6158.68 | 5746.64 | 2573.34 | 496.97 | 2076.37 | ... | 249.03 | 1812.04 | 1314.62 |  |  |  | 4.0 | 36757.0 |  | 1.0 |
| 3 | 1980 | AK: Anchorage | 22002001.0 | 178800 | 2.974757 | 5654.93 | 5210.77 | 2313.62 | 371.46 | 1942.16 | ... | 986.73 | 1330.72 | 1572.66 |  |  |  | 4.0 | 36008.0 |  | 1.0 |
| 4 | 1981 | AK: Anchorage | 22002001.0 | 174431 | 2.69659 | 6192.83 | 5736.81 | 2771.43 | 338.76 | 2432.67 | ... | 977.32 | 1549.51 | 1669.55 |  |  |  | 4.0 | 34557.0 |  | 1.0 |


In [19]:
budgets_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6232 entries, 0 to 6231
Columns: 662 entries, year to consolidated_govt
dtypes: float64(658), int64(2), object(2)
memory usage: 31.5+ MB


- This dataset has more than 600 columns (662 in total), so it is not practical to inspect them all.
- I will probably extract only the population of each city so I can join it with the other datasets.
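Since only a handful of the 662 columns are needed, `pandas.read_csv` can load just those through its `usecols` parameter, which keeps memory use down. This small sketch runs on an in-memory CSV. The two data rows are copied from the preview above, and the extra `cpi` column is included deliberately to show it being skipped.

```python
import io

import pandas as pd

# Tiny stand-in for budgets.csv: two rows from the preview, plus one column
# (cpi) that should NOT be loaded.
sample_csv = io.StringIO(
    "year,city_name,id_city,city_population,cpi\n"
    "1977,AK: Anchorage,22002001.0,174500,4.044885\n"
    "1978,AK: Anchorage,22002001.0,177000,3.759509\n"
)

# Parse only the columns needed for the city population table.
wanted = ["year", "city_name", "id_city", "city_population"]
population_df = pd.read_csv(sample_csv, usecols=wanted)
print(population_df)
```

In the notebook, the same call would be `pd.read_csv(budgets, usecols=wanted)`. Note that `usecols` keeps the columns in file order, not in list order.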

In [20]:
budgets_df.count()

year                        6232
city_name                   6232
id_city                     6191
city_population             6232
cpi                         6150
                            ... 
county_population           5418
relationship_city_school    6150
enrollment                  6196
districts_in_city           4741
consolidated_govt           6150
Length: 662, dtype: int64

### 3. Police Killings 
- The dataset below shows information on civilians killed by police in the US

In [21]:
police_killings = 's3://datalake-blm/raw/police_killings.csv'

In [22]:
police_killings_df = pd.read_csv(police_killings)

In [23]:
police_killings_df.head(5)

|   | Victim's name | Victim's age | Victim's gender | Victim's race | URL of image of victim | Date of Incident (month/day/year) | Street Address of Incident | City | State | Zipcode | ... | Unnamed: 56 | Unnamed: 57 | Unnamed: 58 | Unnamed: 59 | Unnamed: 60 | Unnamed: 61 | Unnamed: 62 | Unnamed: 63 | Unnamed: 64 | Unnamed: 65 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Eric M. Tellez | 28.0 | Male | White | https://fatalencounters.org/wp-content/uploads... | 31/12/2019 | Broad St. | Globe | AZ | 85501.0 | ... |  |  |  |  |  |  |  |  |  |  |
| 1 | Name withheld by police |  | Male | Unknown race |  | 31/12/2019 | 7239-7411 I-40 | Memphis | AR | 38103.0 | ... |  |  |  |  |  |  |  |  |  |  |
| 2 | Terry Hudson | 57.0 | Male | Black |  | 31/12/2019 | 3600 N 24th St | Omaha | NE | 68110.0 | ... |  |  |  |  |  |  |  |  |  |  |
| 3 | Malik Williams | 23.0 | Male | Black |  | 31/12/2019 | 30800 14th Avenue South | Federal Way | WA | 98003.0 | ... |  |  |  |  |  |  |  |  |  |  |
| 4 | Frederick Perkins | 37.0 | Male | Black |  | 31/12/2019 | 17057 N Outer 40 Rd | Chesterfield | MO | 63005.0 | ... |  |  |  |  |  |  |  |  |  |  |


In [24]:
police_killings_df.nunique()

Victim's name             7411
Victim's age                87
Victim's gender              4
Victim's race                8
URL of image of victim    4191
                          ... 
Unnamed: 61                  0
Unnamed: 62                  0
Unnamed: 63                  0
Unnamed: 64                  0
Unnamed: 65                  0
Length: 66, dtype: int64

In [25]:
police_killings_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7907 entries, 0 to 7906
Data columns (total 66 columns):
 #   Column                                                                                                                                                                      Non-Null Count  Dtype  
---  ------                                                                                                                                                                      --------------  -----  
 0   Victim's name                                                                                                                                                               7663 non-null   object 
 1   Victim's age                                                                                                                                                                7596 non-null   object 
 2   Victim's gender                                                                     

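- This dataset has many unnamed columns, several of which contain no values at all (`Unnamed: 61` to `Unnamed: 65` have zero unique values), so I will drop them.
- Some columns have null values, for example Victim's name (7,663 non-null of 7,907 rows) and Victim's age (7,596 non-null). Victim's age is also stored as text (dtype object), so it likely contains non-numeric entries. These columns need further exploration.
- In the preview rows, the `Date of Incident (month/day/year)` column actually holds day/month/year values (e.g. 31/12/2019), so the dates should be parsed with an explicit format.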
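Here is a minimal sketch of both clean-up steps on a made-up frame. `dropna(axis=1, how="all")` removes only the columns in which every value is missing, such as the empty `Unnamed: …` columns. `pd.to_datetime` with an explicit format handles the incident date. The `%d/%m/%Y` format is an assumption based on the five preview rows only. The victim names and second date are invented.

```python
import numpy as np
import pandas as pd

# Hypothetical rows mimicking the police killings layout.
df = pd.DataFrame({
    "Victim's name": ["Victim A", "Victim B"],
    "Date of Incident (month/day/year)": ["31/12/2019", "01/11/2019"],
    "Unnamed: 65": [np.nan, np.nan],  # an entirely empty trailing column
})

# Drop only the columns in which every value is missing.
df = df.dropna(axis=1, how="all")

# The preview shows day/month/year despite the header, so parse explicitly.
# errors="coerce" turns unparseable strings into NaT for later inspection.
df["incident_date"] = pd.to_datetime(
    df["Date of Incident (month/day/year)"], format="%d/%m/%Y", errors="coerce"
)
print(df.dtypes)
```

Using `errors="coerce"` rather than the default means rows in a different format surface as `NaT` instead of stopping the whole conversion, which makes them easy to count with `isna()`.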

### 4. Police Deaths
- The dataset below shows information on police officers who died in the line of duty

In [26]:
police_deaths = 's3://datalake-blm/raw/police_deaths_538.csv'

In [27]:
police_deaths_df = pd.read_csv(police_deaths)

In [28]:
police_deaths_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22800 entries, 0 to 22799
Data columns (total 10 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   person       22800 non-null  object
 1   dept         22800 non-null  object
 2   eow          22800 non-null  object
 3   cause        22800 non-null  object
 4   cause_short  22800 non-null  object
 5   date         22800 non-null  object
 6   year         22800 non-null  int64 
 7   canine       22800 non-null  bool  
 8   dept_name    22800 non-null  object
 9   state        22800 non-null  object
dtypes: bool(1), int64(1), object(8)
memory usage: 1.6+ MB


- This dataset looks cleaner: all 10 columns are fully populated (22,800 non-null values each), so it needs little further curation.

In [29]:
police_deaths_df.head()

|   | person | dept | eow | cause | cause_short | date | year | canine | dept_name | state |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Constable Darius Quimby | Albany County Constable's Office, NY | EOW: Monday, January 3, 1791 | Cause of Death: Gunfire | Gunfire | 1791-01-03 | 1791 | False | Albany County Constable's Office | NY |
| 1 | Sheriff Cornelius Hogeboom | Columbia County Sheriff's Office, NY | EOW: Saturday, October 22, 1791 | Cause of Death: Gunfire | Gunfire | 1791-10-22 | 1791 | False | Columbia County Sheriff's Office | NY |
| 2 | Deputy Sheriff Isaac Smith | Westchester County Sheriff's Department, NY | EOW: Thursday, May 17, 1792 | Cause of Death: Gunfire | Gunfire | 1792-05-17 | 1792 | False | Westchester County Sheriff's Department | NY |
| 3 | Marshal Robert Forsyth | United States Department of Justice - United S... | EOW: Saturday, January 11, 1794 | Cause of Death: Gunfire | Gunfire | 1794-01-11 | 1794 | False | United States Department of Justice - United S... | US |
| 4 | Sheriff Robert Maxwell | Greenville County Sheriff's Office, SC | EOW: Sunday, November 12, 1797 | Cause of Death: Gunfire | Gunfire | 1797-11-12 | 1797 | False | Greenville County Sheriff's Office | SC |


In [30]:
police_deaths_df.nunique()
# Some names appear more than once (22,742 unique values across 22,800 rows).
# Check these before de-duplicating, so distinct officers who share a name are not dropped.

person         22742
dept            6528
eow            17158
cause             36
cause_short       36
date           17158
year             202
canine             2
dept_name       5525
state             60
dtype: int64
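`person` has 22,742 distinct values across 22,800 rows, so a few names repeat. The sketch below separates true duplicate records from different officers who happen to share a name. `duplicated(subset=["person"], keep=False)` flags every row whose name repeats, while `duplicated()` with no subset flags only rows identical in every column. The sample data is made up.

```python
import pandas as pd

# Hypothetical sample: one exact duplicate row (Officer A) and one name shared
# by two different records (Officer B, different departments and dates).
deaths = pd.DataFrame({
    "person": ["Officer A", "Officer A", "Officer B", "Officer B"],
    "dept_name": ["Dept 1", "Dept 1", "Dept 2", "Dept 3"],
    "date": ["1900-01-01", "1900-01-01", "1901-05-02", "1950-07-09"],
})

# Every row whose name appears more than once.
repeated_names = deaths[deaths.duplicated(subset=["person"], keep=False)]

# Rows identical in every column: these are safe to drop as true duplicates.
exact_dupes = deaths[deaths.duplicated(keep="first")]

# Dropping only exact duplicates keeps distinct officers who share a name.
deduped = deaths.drop_duplicates()
print(len(repeated_names), len(exact_dupes), len(deduped))
```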

### 5. Protest data
- The dataset below shows all BLM protests since 25th May 2020

In [31]:
protest = 's3a://datalake-blm/raw/protests.json'

Because this is a JSON file, I will use Spark to explore it and print its schema.

In [32]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [33]:
protests_df = spark.read.json(protest)

In [34]:
protests_df.printSchema()

root
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geometry: struct (nullable = true)
 |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- properties: struct (nullable = true)
 |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |-- Batch: string (nullable = true)
 |    |    |    |-- City: string (nullable = true)
 |    |    |    |-- Comments: string (nullable = true)
 |    |    |    |-- Country: string (nullable = true)
 |    |    |    |-- Date_Added: long (nullable = true)
 |    |    |    |-- Mailing_Abbreviation: string (nullable = true)
 |    |    |    |-- OBJECTID: long (nullable = true)
 |    |    |    |-- Region: string (nullable = true)
 |    |    |    |-- Search_Label: string (nullable = true)
 |    |    |    |-- X_Longitude: double (n

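- The file is a GeoJSON-style `FeatureCollection`: every protest is nested inside the `features` array, so I will need to explode that array and flatten its nested structs to turn the data into a tabular format.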

In [35]:
protests_df.show(5)

+--------------------+-----------------+
|            features|             type|
+--------------------+-----------------+
|[[[[-134.40678999...|FeatureCollection|
+--------------------+-----------------+
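The single row above confirms that the whole file is one `FeatureCollection`, with every protest nested inside `features`. Here is a sketch of the flattening on a tiny hypothetical record using `pandas.json_normalize` (pandas 1.0+). It is chosen here only because it runs without a Spark cluster. In Spark, the rough equivalent is `protests_df.select(F.explode("features").alias("feature")).select("feature.id", "feature.properties.*")`. The field names follow the schema printed above, but the values are invented.

```python
import pandas as pd

# Hypothetical FeatureCollection with the same nesting as protests.json.
collection = {
    "type": "FeatureCollection",
    "features": [
        {
            "id": 1,
            "geometry": {"type": "Point", "coordinates": [-100.0, 40.0]},
            "properties": {"City": "City A", "Country": "United States"},
        },
        {
            "id": 2,
            "geometry": {"type": "Point", "coordinates": [-90.0, 35.0]},
            "properties": {"City": "City B", "Country": "United States"},
        },
    ],
}

# One row per feature; nested dicts become dotted column names
# such as "geometry.coordinates" and "properties.City".
flat = pd.json_normalize(collection, record_path="features")

# GeoJSON stores points as [longitude, latitude]; split them into columns.
flat["longitude"] = flat["geometry.coordinates"].str[0]
flat["latitude"] = flat["geometry.coordinates"].str[1]
print(flat.columns.tolist())
```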



### 6. Los Angeles crime data from 2010 to 2019

In [36]:
la = 's3a://datalake-blm/raw/LA Crime_Data_from_2010_to_2019.csv'

In [38]:
la_df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(la)

Py4JJavaError: An error occurred while calling o92.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 20, localhost, executor driver): com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool


In [None]:
la_df.show(5)

In [None]:
la_df.printSchema()

In [234]:
la_df.count()

2114699
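The `ConnectionPoolTimeoutException` above was raised while Spark was inferring the CSV schema (`inferschema='true'`). Schema inference requires an extra pass over the roughly 2.1 million rows. One commonly suggested mitigation is to enlarge the S3A connection pool through the Hadoop option `fs.s3a.connection.maximum`, forwarded to Spark with the `spark.hadoop.` prefix. This is an assumption rather than a confirmed fix for this notebook. The pool size of 100 and the helper name `build_spark_session` are illustrative. The `pyspark` import sits inside the function so the options can be inspected without Spark installed.

```python
# Config options for the SparkSession. The "spark.hadoop." prefix forwards a
# setting to the Hadoop configuration used by the s3a:// filesystem.
# The pool size of 100 is an illustrative value, not a tuned recommendation.
S3A_OPTIONS = {
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.0",
    "spark.hadoop.fs.s3a.connection.maximum": "100",
}


def build_spark_session(options=S3A_OPTIONS):
    """Create (or reuse) a SparkSession with the given config options."""
    # Imported lazily so this cell can be loaded without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Because `getOrCreate()` returns an already-running session, the kernel may need a restart before these settings take effect. Alternatively, leaving out `inferschema='true'` skips the extra pass, at the cost of every column being read as a string.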

### 7. New York Crime Data

In [35]:
#ny = 's3a://datalake-blm/raw/NYPD_Arrests_Data__Historic_.json'
ny = 's3a://datalake-blm/raw/NYPD_Arrests_Data__Historic_.csv'

In [36]:
#ny_df = spark.read.json(ny)
ny_df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(ny)

In [37]:
ny_df.show(2)

+----------+-----------+-----+--------------------+-----+--------------+----------+----------+-----------+---------------+-----------------+---------+--------+--------------+----------+----------+------------------+------------------+--------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|             PD_DESC|KY_CD|     OFNS_DESC|  LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|     PERP_RACE|X_COORD_CD|Y_COORD_CD|          Latitude|         Longitude|             Lon_Lat|
+----------+-----------+-----+--------------------+-----+--------------+----------+----------+-----------+---------------+-----------------+---------+--------+--------------+----------+----------+------------------+------------------+--------------------+
| 190294601| 11/23/2018|  109|ASSAULT 2,1,UNCLA...|  106|FELONY ASSAULT|PL 1200512|         F|          B|             40|                0|    25-44|       M|         BLACK| 1008096.0|  233595.0|40.807816227000046|-73.9138626609999

In [34]:
ny_df.printSchema()

root
 |-- {: string (nullable = true)
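
- Note: this `printSchema()` output comes from an earlier execution (In [34], before the CSV was loaded in In [36]) and does not match the columns shown by `ny_df.show(2)` above, so the cell should be re-run against the CSV.
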
In [241]:
ny_df.count()

550765

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### 2. Budgets 
- I want to extract the city_population column from this table and add a new column with the state of each record, taken from the prefix of city_name (e.g. `AK` in `AK: Anchorage`).

In [39]:
budgets_df.head(2)

|   | year | city_name | id_city | city_population | cpi | rev_total_city | rev_general_city | intergovt_rev_city | igr_federal_city | igr_state_city | ... | cash_other_offsets | cash_other_bonds | cash_other_other | county_name | id_county | county_population | relationship_city_school | enrollment | districts_in_city | consolidated_govt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1977 | AK: Anchorage | 22002001.0 | 174500 | 4.044885 | 5342.24 | 4956.92 | 2148.77 | 279.32 | 1869.46 | ... | 178.51 | 787.93 | 691.32 |  |  |  | 4.0 | 36855.0 |  | 1.0 |
| 1 | 1978 | AK: Anchorage | 22002001.0 | 177000 | 3.759509 | 5948.99 | 5490.05 | 2468.11 | 403.24 | 2064.86 | ... | 187.53 | 1395.82 | 1158.01 |  |  |  | 4.0 | 36804.0 |  | 1.0 |


In [40]:
budgets_df.count()

year                        6232
city_name                   6232
id_city                     6191
city_population             6232
cpi                         6150
                            ... 
county_population           5418
relationship_city_school    6150
enrollment                  6196
districts_in_city           4741
consolidated_govt           6150
Length: 662, dtype: int64

- year, city_name and city_population have no missing values (6,232 non-null values each, equal to the number of rows), so for these columns the main thing to check is duplicate records.

In [41]:
len(budgets_df)

6232

In [42]:
budgets_df.nunique()

year                          41
city_name                    152
id_city                      151
city_population             6145
cpi                           41
                            ... 
county_population           4995
relationship_city_school       5
enrollment                  5108
districts_in_city             22
consolidated_govt              2
Length: 662, dtype: int64

In [43]:
budgets_df = budgets_df.drop_duplicates()

In [44]:
len(budgets_df)

6232
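The row count is unchanged after `drop_duplicates()`, so no two rows are identical across all 662 columns. That is a strict test. A more targeted check, assuming each `(year, city_name)` pair should appear only once, looks for repeated key pairs instead. Here is a sketch of that check; the sample frame and the `XX: City A` name are made up.

```python
import pandas as pd

# Made-up sample: rows 1 and 2 share (year, city_name) but differ in
# city_population, so drop_duplicates() on full rows would keep both.
budgets = pd.DataFrame({
    "year": [1977, 1978, 1978],
    "city_name": ["XX: City A", "XX: City A", "XX: City A"],
    "city_population": [1000, 1100, 1150],
})

# Number of rows identical in every column (the strict test).
full_row_dupes = int(budgets.duplicated().sum())

# All rows whose (year, city_name) key occurs more than once.
key_dupes = budgets[budgets.duplicated(subset=["year", "city_name"], keep=False)]
print(full_row_dupes, len(key_dupes))
```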

In [45]:
# Some records have no id_city (6,191 non-null of 6,232). Check what kind of information these records hold.
budgets_df[budgets_df.id_city.isnull()].head(3)

|   | year | city_name | id_city | city_population | cpi | rev_total_city | rev_general_city | intergovt_rev_city | igr_federal_city | igr_state_city | ... | cash_other_offsets | cash_other_bonds | cash_other_other | county_name | id_county | county_population | relationship_city_school | enrollment | districts_in_city | consolidated_govt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 410 | 1977 | Average for Cities |  | 350083 |  | 2312.35 | 1889.98 | 760.76 | 338.25 | 422.51 | ... | 366.03 | 387.54 | 839.45 |  |  | 690459.0 |  | 59844.0 |  |  |
| 411 | 1978 | Average for Cities |  | 349355 |  | 2290.69 | 1846.08 | 696.37 | 360.9 | 335.47 | ... | 495.97 | 403.98 | 869.54 |  |  | 696845.0 |  | 64642.0 |  |  |
| 412 | 1979 | Average for Cities |  | 348967 |  | 2256.35 | 1816.53 | 693.98 | 352.31 | 341.68 | ... | 302.54 | 461.02 | 893.65 |  |  | 704711.0 |  | 56383.0 |  |  |


- These records hold averages across cities ("Average for Cities") rather than data for a single city. I don't need them, so I will drop them.

In [46]:
budgets_df_cleaned = budgets_df[budgets_df.id_city.notnull()]

In [47]:
budgets_df_cleaned.count()

year                        6191
city_name                   6191
id_city                     6191
city_population             6191
cpi                         6150
                            ... 
county_population           5377
relationship_city_school    6150
enrollment                  6155
districts_in_city           4741
consolidated_govt           6150
Length: 662, dtype: int64

- The data looks much cleaner now. I will not call `dropna()` on the whole table. The remaining missing values are mostly in columns such as county_name, id_county, enrollment, consolidated_govt and districts_in_city, which I won't use anyway, so dropping every row that contains a null would discard valid city records.
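If rows with missing values in the columns that *are* used need removing later, `dropna(subset=...)` restricts the check to those columns. That way, nulls in unused columns such as `districts_in_city` do not discard otherwise valid rows. Here is a sketch on a made-up frame:

```python
import numpy as np
import pandas as pd

# Made-up sample: districts_in_city is mostly empty but unused;
# city_population is required.
df = pd.DataFrame({
    "year": [1977, 1978, 1979],
    "city_name": ["XX: City A", "XX: City A", "XX: City A"],
    "city_population": [1000, np.nan, 1200],
    "districts_in_city": [np.nan, np.nan, 3.0],
})

# A plain dropna() keeps only fully populated rows (here, just the last one);
# restricting the check to the needed columns keeps every row with a population.
all_columns = df.dropna()
needed_only = df.dropna(subset=["year", "city_name", "city_population"])
print(len(all_columns), len(needed_only))
```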

In [48]:
budgets_df_cleaned.head(1)

|   | year | city_name | id_city | city_population | cpi | rev_total_city | rev_general_city | intergovt_rev_city | igr_federal_city | igr_state_city | ... | cash_other_offsets | cash_other_bonds | cash_other_other | county_name | id_county | county_population | relationship_city_school | enrollment | districts_in_city | consolidated_govt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1977 | AK: Anchorage | 22002001.0 | 174500 | 4.044885 | 5342.24 | 4956.92 | 2148.77 | 279.32 | 1869.46 | ... | 178.51 | 787.93 | 691.32 |  |  |  | 4.0 | 36855.0 |  | 1.0 |


In [49]:
len(budgets_df_cleaned)

6191

- After the filter, the index has gaps where the dropped rows used to be, so I reset it and discard the old labels

In [50]:
budgets_df_cleaned = budgets_df_cleaned.reset_index(drop=True)

In [51]:
budgets_df_cleaned.head(2)

Unnamed: 0,year,city_name,id_city,city_population,cpi,rev_total_city,rev_general_city,intergovt_rev_city,igr_federal_city,igr_state_city,...,cash_other_offsets,cash_other_bonds,cash_other_other,county_name,id_county,county_population,relationship_city_school,enrollment,districts_in_city,consolidated_govt
0,1977,AK: Anchorage,22002001.0,174500,4.044885,5342.24,4956.92,2148.77,279.32,1869.46,...,178.51,787.93,691.32,,,,4.0,36855.0,,1.0
1,1978,AK: Anchorage,22002001.0,177000,3.759509,5948.99,5490.05,2468.11,403.24,2064.86,...,187.53,1395.82,1158.01,,,,4.0,36804.0,,1.0
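A toy illustration of what reset_index(drop=True) does after a row filter (invented values): the old labels are discarded and the rows are renumbered from 0, whereas the default drop=False keeps the old labels as a new 'index' column.

```python
import pandas as pd

df = pd.DataFrame({'year': [1977, 1978, 1979]})
filtered = df[df.year != 1978]                 # index is now [0, 2]

renumbered = filtered.reset_index(drop=True)   # index [0, 1], columns unchanged
with_old_labels = filtered.reset_index()       # old labels become an 'index' column
```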


- Now I am going to create a dataset with the population information only

In [52]:
# .copy() makes this an independent DataFrame, so adding a state column below
# does not trigger pandas' SettingWithCopyWarning
city_population = budgets_df_cleaned[['year','city_name','id_city','city_population']].copy()

In [53]:
city_population.head(2)

Unnamed: 0,year,city_name,id_city,city_population
0,1977,AK: Anchorage,22002001.0,174500
1,1978,AK: Anchorage,22002001.0,177000


In [54]:
city_population["state"] = city_population["city_name"].str.split(":")


In [55]:
city_population.head(2)

Unnamed: 0,year,city_name,id_city,city_population,state
0,1977,AK: Anchorage,22002001.0,174500,"[AK, Anchorage]"
1,1978,AK: Anchorage,22002001.0,177000,"[AK, Anchorage]"


In [56]:
# Split on the first ':' only and keep the part before it (the state code)
city_population["state"] = city_population["city_name"].str.split(":", n=1, expand=True)[0]


In [57]:
city_population.head(2)

Unnamed: 0,year,city_name,id_city,city_population,state
0,1977,AK: Anchorage,22002001.0,174500,AK
1,1978,AK: Anchorage,22002001.0,177000,AK
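A minimal sketch of the split on two labels that appear in the data above. n=1 splits on the first ':' only, expand=True returns one column per part, and [0] selects the state code explicitly rather than relying on how pandas assigns a multi-column result to a single column (recent pandas versions raise a ValueError for that assignment). The city part is kept here too, with its leading space stripped. 'Median for Cities' has no ':', so the whole label ends up in the state part, which is why those rows can be filtered on state afterwards.

```python
import pandas as pd

city_names = pd.Series(['AK: Anchorage', 'Median for Cities'])

# n=1: split on the first ':' only; expand=True: return a DataFrame with columns 0 and 1
parts = city_names.str.split(':', n=1, expand=True)

state = parts[0].str.strip()
city = parts[1].str.strip()  # missing where the label had no ':'
```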


In [58]:
len(city_population)

6191

In [59]:
city_population.groupby('state').nunique()

Unnamed: 0_level_0,year,city_name,id_city,city_population,state
state,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
AK,41,2,2,82,1
AL,41,3,3,123,1
AR,41,2,2,82,1
AZ,41,3,3,123,1
CA,41,16,16,656,1
CO,41,3,3,122,1
CT,41,3,3,123,1
DC,41,1,1,41,1
DE,41,2,2,79,1
FL,41,8,8,328,1


- I noticed that this dataset also contains 'Median for Cities' rows, which I don't want, so I am going to delete them

In [60]:
city_population.query('state == "Median for Cities"').count()

year               41
city_name          41
id_city            41
city_population    41
state              41
dtype: int64

- After deleting these 41 records, I expect 6150 records (6191 - 41) in the dataset

In [61]:
city_population = city_population.query('state != "Median for Cities"')

In [62]:
len(city_population)

6150
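The 6191 - 41 = 6150 check above can be built into the filter itself. A sketch of a hypothetical drop_label_rows helper that removes rows by label and raises if the number of matching rows differs from what was expected, so a renamed or missing summary label fails loudly instead of silently leaving rows behind.

```python
import pandas as pd

def drop_label_rows(df, column, label, expected_removed=None):
    """Drop rows where `column` equals `label`; optionally check how many were removed."""
    mask = df[column] == label
    removed = int(mask.sum())
    if expected_removed is not None and removed != expected_removed:
        raise ValueError(f'expected to remove {expected_removed} rows, found {removed}')
    # ~mask keeps every other row, including rows where `column` is NaN
    return df[~mask]

toy = pd.DataFrame({'state': ['AK', 'AK', 'Median for Cities'], 'year': [1977, 1978, 1977]})
trimmed = drop_label_rows(toy, 'state', 'Median for Cities', expected_removed=1)
```

On the real data this would be called as drop_label_rows(city_population, 'state', 'Median for Cities', expected_removed=41).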

In [63]:
city_population_file = 'city_population.csv'

In [64]:
s3path = 's3://datalake-blm/stage/'

In [65]:
city_population.to_csv(os.path.join(s3path, city_population_file))
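Both CSV exports in this section write a pandas DataFrame under s3path. A sketch of a hypothetical write_csv helper that writes through an explicit file handle from a filesystem object instead of handing the s3:// string to pandas, since pandas' own support for s3:// output paths depends on its version (0.23.3 here). Any object with an fsspec-style open(path, mode) method should work, for example s3fs.S3FileSystem(), which reads the AWS credentials exported as environment variables at the top of the notebook. Passing the filesystem in also makes the helper easy to try without S3.

```python
import pandas as pd

def write_csv(df, base_path, file_name, fs):
    """Write `df` as CSV to base_path/file_name using filesystem object `fs`.

    `fs` can be any object with an fsspec-style open(path, mode) method, such as
    s3fs.S3FileSystem(). Writing to the handle means pandas never has to resolve
    the s3:// URL itself.
    """
    target = base_path.rstrip('/') + '/' + file_name
    with fs.open(target, 'w') as handle:
        # The row index is not meaningful after filtering, so leave it out
        df.to_csv(handle, index=False)
    return target
```

Usage, assuming the names defined above: write_csv(city_population, s3path, city_population_file, s3fs.S3FileSystem()).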

In [276]:
pd.__version__

'0.23.3'

#### 3. Police Killings

In [277]:
police_killings_df.head(2)

Unnamed: 0,Victim's name,Victim's age,Victim's gender,Victim's race,URL of image of victim,Date of Incident (month/day/year),Street Address of Incident,City,State,Zipcode,...,Unnamed: 56,Unnamed: 57,Unnamed: 58,Unnamed: 59,Unnamed: 60,Unnamed: 61,Unnamed: 62,Unnamed: 63,Unnamed: 64,Unnamed: 65
0,Eric M. Tellez,28.0,Male,White,https://fatalencounters.org/wp-content/uploads...,31/12/2019,Broad St.,Globe,AZ,85501.0,...,,,,,,,,,,
1,Name withheld by police,,Male,Unknown race,,31/12/2019,7239-7411 I-40,Memphis,AR,38103.0,...,,,,,,,,,,


In [278]:
len(police_killings_df)

7907

In [421]:
police_killings_cleaned = police_killings_df[[
    "Victim's name", "Victim's age", "Victim's gender", "Victim's race", "URL of image of victim",
    "Date of Incident (month/day/year)", "Street Address of Incident", "City", "State", "Zipcode",
    "County", "Agency responsible for death", "Cause of death",
    "A brief description of the circumstances surrounding the death",
    "Official disposition of death (justified or other)", "Criminal Charges?",
    "Link to news article or photo of official document", "Symptoms of mental illness?",
    "Unarmed", "Alleged Weapon (Source: WaPo)", "Alleged Threat Level (Source: WaPo)",
    "Fleeing (Source: WaPo)", "Body Camera (Source: WaPo)", "WaPo ID (If included in WaPo database)",
    "Off-Duty Killing?",
    "Geography (via Trulia methodology based on zipcode population density: http://jedkolko.com/wp-content/uploads/2015/05/full-ZCTA-urban-suburban-rural-classification.xlsx )",
    "ID"]]

In [422]:
police_killings_cleaned.count()

Victim's name                                                                                                                                                                 7663
Victim's age                                                                                                                                                                  7596
Victim's gender                                                                                                                                                               7655
Victim's race                                                                                                                                                                 7663
URL of image of victim                                                                                                                                                        4200
Date of Incident (month/day/year)                                                                        

In [423]:
police_killings_cleaned.head(3)

Unnamed: 0,Victim's name,Victim's age,Victim's gender,Victim's race,URL of image of victim,Date of Incident (month/day/year),Street Address of Incident,City,State,Zipcode,...,Symptoms of mental illness?,Unarmed,Alleged Weapon (Source: WaPo),Alleged Threat Level (Source: WaPo),Fleeing (Source: WaPo),Body Camera (Source: WaPo),WaPo ID (If included in WaPo database),Off-Duty Killing?,Geography (via Trulia methodology based on zipcode population density: http://jedkolko.com/wp-content/uploads/2015/05/full-ZCTA-urban-suburban-rural-classification.xlsx ),ID
0,Eric M. Tellez,28.0,Male,White,https://fatalencounters.org/wp-content/uploads...,31/12/2019,Broad St.,Globe,AZ,85501.0,...,No,Allegedly Armed,knife,other,not fleeing,no,5332.0,,Rural,7664.0
1,Name withheld by police,,Male,Unknown race,,31/12/2019,7239-7411 I-40,Memphis,AR,38103.0,...,No,Unclear,unclear,other,,,,,Urban,7665.0
2,Terry Hudson,57.0,Male,Black,,31/12/2019,3600 N 24th St,Omaha,NE,68110.0,...,No,Allegedly Armed,gun,attack,not fleeing,no,5359.0,,Urban,7661.0


- The column names are long and messy, so I am going to tidy them up

In [424]:
new_columns = {
   "Victim's name":"victim_name",
   "Victim's age":"victim_age",
   "Victim's race":"victim_race",
   "Victim's gender":"victim_gender",
   "URL of image of victim":"victim_image",
   "Date of Incident (month/day/year)":"date_of_incident",
   "Street Address of Incident":"street_address_of_incident",
   "City":"city",
   "State":"state",
   "Zipcode":"zipcode",
   "Symptoms of mental illness?": "has_symtoms_of_mental_ilness",
   "Unarmed":'unarmed',
   "Alleged Weapon (Source: WaPo)":"alleged_weapon",
   "Alleged Threat Level (Source: WaPo)":"alleged_threat_level",
   "Fleeing (Source: WaPo)":"fleeing",
   "Body Camera (Source: WaPo)":"body_camera",
   "WaPo ID (If included in WaPo database)":"wapo_id",
   "Off-Duty Killing?":"is_off_duty_killing",
   "Geography (via Trulia methodology based on zipcode population density: http://jedkolko.com/wp-content/uploads/2015/05/full-ZCTA-urban-suburban-rural-classification.xlsx )":"geography",
   "ID":"id"
}

In [425]:
police_killings_cleaned = police_killings_cleaned.rename(columns=new_columns)

In [426]:
police_killings_cleaned.head(2)

Unnamed: 0,victim_name,victim_age,victim_gender,victim_race,victim_image,date_of_incident,street_address_of_incident,city,state,zipcode,...,has_symtoms_of_mental_ilness,unarmed,alleged_weapon,alleged_threat_level,fleeing,body_camera,wapo_id,is_off_duty_killing,geography,id
0,Eric M. Tellez,28.0,Male,White,https://fatalencounters.org/wp-content/uploads...,31/12/2019,Broad St.,Globe,AZ,85501.0,...,No,Allegedly Armed,knife,other,not fleeing,no,5332.0,,Rural,7664.0
1,Name withheld by police,,Male,Unknown race,,31/12/2019,7239-7411 I-40,Memphis,AR,38103.0,...,No,Unclear,unclear,other,,,,,Urban,7665.0
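The mapping above renames only some columns; County, Agency responsible for death, Cause of death and the other kept columns still have their original names, as the query output further down shows. A sketch of a hypothetical to_snake_case function that derives a name for any column not listed in new_columns: the name is lower-cased and each run of non-alphanumeric characters becomes a single underscore.

```python
import re

def to_snake_case(name):
    """Lower-case a column name and replace each run of non-alphanumeric characters with '_'."""
    snake = re.sub(r'[^0-9a-z]+', '_', name.strip().lower())
    # Drop underscores left over from leading/trailing punctuation such as '?' or ')'
    return snake.strip('_')
```

Explicit entries in new_columns can still take priority, for example with police_killings_cleaned.rename(columns=lambda c: new_columns.get(c, to_snake_case(c))).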


In [427]:
police_killings_cleaned.query("victim_name == 'Name withheld by police'").count()

victim_name                                                       210
victim_age                                                        154
victim_gender                                                     205
victim_race                                                       210
victim_image                                                        0
date_of_incident                                                  210
street_address_of_incident                                        198
city                                                              209
state                                                             210
zipcode                                                           204
County                                                            205
Agency responsible for death                                      205
Cause of death                                                    210
A brief description of the circumstances surrounding the death    205
Official disposition

In [429]:
police_killings_cleaned.state.unique()

array(['AZ', 'AR', 'NE', 'WA', 'MO', 'WV', 'MS', 'KS', 'TX', 'CA', 'SD',
       'IN', 'AK', 'OH', 'TN', 'MD', 'VA', 'UT', 'ME', 'FL', 'GA', 'OK',
       'PA', 'NY', 'NC', 'CO', 'MT', 'HI', 'WI', 'MN', 'MI', 'AL', 'SC',
       'NJ', 'IA', 'LA', 'OR', 'DE', 'NV', 'IL', 'ID', 'MA', 'KY', 'NM',
       'VT', 'WY', 'DC', 'CT', 'NH', 'RI', 'ND', nan], dtype=object)

In [430]:
police_killings_cleaned.state.nunique()

51

In [431]:
police_killings_cleaned[(police_killings_cleaned.state.isnull())].head(20)

Unnamed: 0,victim_name,victim_age,victim_gender,victim_race,victim_image,date_of_incident,street_address_of_incident,city,state,zipcode,...,has_symtoms_of_mental_ilness,unarmed,alleged_weapon,alleged_threat_level,fleeing,body_camera,wapo_id,is_off_duty_killing,geography,id
7663,,,,,,,,,,,...,,,,,,,,,,
7664,,,,,,,,,,,...,,,,,,,,,,
7665,,,,,,,,,,,...,,,,,,,,,,
7666,,,,,,,,,,,...,,,,,,,,,,
7667,,,,,,,,,,,...,,,,,,,,,,
7668,,,,,,,,,,,...,,,,,,,,,,
7669,,,,,,,,,,,...,,,,,,,,,,
7670,,,,,,,,,,,...,,,,,,,,,,
7671,,,,,,,,,,,...,,,,,,,,,,
7672,,,,,,,,,,,...,,,,,,,,,,


- All the records with no state are also empty in every other column, so I will delete them too

In [433]:
# .copy() so that later column assignments modify an independent DataFrame
police_killings_cleaned = police_killings_cleaned[police_killings_cleaned.state.notnull()].copy()

In [451]:
len(police_killings_cleaned)

7663
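The observation that stateless rows are empty in every column can be expressed directly with dropna(how='all'), which removes a row only when all of its values are NaN. A toy example with invented rows shows how it differs from filtering on state: a partly filled row without a state survives how='all' but not the state filter, so comparing the two row counts is a quick way to confirm the observation still holds on a new extract.

```python
import numpy as np
import pandas as pd

rows = pd.DataFrame({
    'victim_name': ['Eric M. Tellez', np.nan, 'Name withheld by police'],
    'state': ['AZ', np.nan, np.nan],
    'city': ['Globe', np.nan, 'Memphis'],
})

# how='all': drop only rows where every column is NaN (row 1)
non_empty = rows.dropna(how='all')

# Filtering on state also drops row 2, which has data but no state
with_state = rows[rows.state.notnull()]
```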

In [452]:
police_killings_cleaned.state.unique()

array(['AZ', 'AR', 'NE', 'WA', 'MO', 'WV', 'MS', 'KS', 'TX', 'CA', 'SD',
       'IN', 'AK', 'OH', 'TN', 'MD', 'VA', 'UT', 'ME', 'FL', 'GA', 'OK',
       'PA', 'NY', 'NC', 'CO', 'MT', 'HI', 'WI', 'MN', 'MI', 'AL', 'SC',
       'NJ', 'IA', 'LA', 'OR', 'DE', 'NV', 'IL', 'ID', 'MA', 'KY', 'NM',
       'VT', 'WY', 'DC', 'CT', 'NH', 'RI', 'ND'], dtype=object)

In [453]:
police_killings_cleaned.state.nunique()

51

- At first it looked odd that there are still 51 states after deleting the null rows, but nunique() ignores NaN by default, so the count was 51 (50 states plus DC) both before and after. I will still map this dataset against the state_name dataset later to make sure there are no duplicated states
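The count staying at 51 follows from how nunique() treats NaN. A toy Series shows it: NaN is excluded from nunique() unless dropna=False is passed, while unique() does list it, which is why nan showed up in the earlier unique() output without changing the count.

```python
import numpy as np
import pandas as pd

states = pd.Series(['AZ', 'AR', 'AZ', np.nan])

distinct = states.nunique()                    # NaN not counted
distinct_with_nan = states.nunique(dropna=False)
values = states.unique()                       # includes nan
```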

In [454]:
police_killings_cleaned.victim_gender.unique()

array(['Male', 'Female', nan, 'Transgender', 'Unknown'], dtype=object)

In [455]:
police_killings_cleaned.victim_gender.value_counts()

Male           7253
Female          391
Transgender       7
Unknown           4
Name: victim_gender, dtype: int64

In [456]:
police_killings_cleaned.unarmed.unique()

array(['Allegedly Armed', 'Unclear', 'Vehicle', 'Unarmed'], dtype=object)

In [457]:
police_killings_cleaned.unarmed.value_counts()

Allegedly Armed    5428
Unarmed            1073
Unclear             649
Vehicle             513
Name: unarmed, dtype: int64

In [458]:
police_killings_cleaned.unarmed.unique()

array(['Allegedly Armed', 'Unclear', 'Vehicle', 'Unarmed'], dtype=object)

In [459]:
police_killings_cleaned.victim_race.unique()

array(['White', 'Unknown Race', 'Black', 'Hispanic', 'Pacific Islander',
       'Asian', 'Native American'], dtype=object)

- I will group 'Unknown race' and 'Unknown Race' together

In [460]:
police_killings_cleaned.victim_race = police_killings_cleaned.victim_race.replace('Unknown race', 'Unknown Race')


In [461]:
police_killings_cleaned.victim_race.unique()

array(['White', 'Unknown Race', 'Black', 'Hispanic', 'Pacific Islander',
       'Asian', 'Native American'], dtype=object)
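Values that differ only in capitalization, such as 'Unknown race' and 'Unknown Race', can be found automatically instead of by eye. A sketch of a hypothetical find_case_variants helper that groups a column's distinct values by their lower-cased form and reports the groups with more than one spelling; it could be run on victim_race, fleeing or alleged_weapon before deciding what to merge.

```python
import pandas as pd

def find_case_variants(series):
    """Map each lower-cased value to its spellings, keeping only values with 2+ spellings."""
    groups = {}
    for value in series.dropna().unique():
        groups.setdefault(str(value).strip().lower(), set()).add(value)
    # Sorted lists keep the output stable between runs
    return {key: sorted(spellings, key=str)
            for key, spellings in groups.items() if len(spellings) > 1}
```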

In [462]:
police_killings_cleaned.alleged_threat_level.unique()

array(['other', 'attack', nan, 'undetermined'], dtype=object)

In [463]:
police_killings_cleaned.alleged_threat_level.value_counts()

attack          3395
other           1595
undetermined     291
Name: alleged_threat_level, dtype: int64

In [464]:
police_killings_cleaned.fleeing.unique()

array(['not fleeing', nan, 'car', 'Foot', 'Not fleeing', '0', 'foot',
       'Car', 'Other'], dtype=object)

In [465]:
police_killings_cleaned.fleeing.value_counts()

Not fleeing    3365
Car             808
Foot            632
Other           144
0                83
not fleeing       8
car               5
foot              2
Name: fleeing, dtype: int64

In [466]:
police_killings_cleaned[(police_killings_cleaned.fleeing == '0')].head(5)

Unnamed: 0,victim_name,victim_age,victim_gender,victim_race,victim_image,date_of_incident,street_address_of_incident,city,state,zipcode,...,has_symtoms_of_mental_ilness,unarmed,alleged_weapon,alleged_threat_level,fleeing,body_camera,wapo_id,is_off_duty_killing,geography,id
11,John Bott,76,Male,Unknown Race,,29/12/2019,US-78,Byhalia,MS,38611.0,...,Unknown,Allegedly Armed,gun,undetermined,0,No,5310.0,,Rural,7654.0
23,Christopher Camacho,16,Male,Unknown Race,,27/12/2019,138 Washington St,Limerick,ME,4048.0,...,No,Unclear,undetermined,undetermined,0,No,5322.0,,Rural,7647.0
24,Antonio Smith,35,Male,Black,https://fatalencounters.org/wp-content/uploads...,26/12/2019,3600 Hallbrook St,Memphis,TN,38127.0,...,No,Unclear,unknown weapon,other,0,No,5314.0,,Suburban,7643.0
43,Gary Wayne Madewell,38,Male,White,https://fatalencounters.org/wp-content/uploads...,19/12/2019,5 Boyd Ln,Carthage,TN,37030.0,...,No,Allegedly Armed,knife,other,0,No,5294.0,,Rural,7620.0
54,Dana Brown,27,Male,Pacific Islander,https://fatalencounters.org/wp-content/uploads...,17/12/2019,Malakole Street,Kapolei,HI,96707.0,...,No,Allegedly Armed,knife,attack,0,Yes,5290.0,,Suburban,7616.0


- Some values are duplicated because of inconsistent capitalization, so I will standardize them. I will also group '0' under 'Other'

In [467]:
# Merge spellings that differ only in capitalization, and fold '0' into 'Other'
police_killings_cleaned['fleeing'] = police_killings_cleaned['fleeing'].replace({
    'not fleeing': 'Not fleeing',
    'car': 'Car',
    'foot': 'Foot',
    '0': 'Other',
})

In [468]:
police_killings_cleaned.fleeing.value_counts()

Not fleeing    3373
Car             813
Foot            634
Other           227
Name: fleeing, dtype: int64
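For a column like fleeing, where the only differences are in capitalization, str.capitalize() (first character upper case, the rest lower case) is an alternative to listing every spelling; NaN passes through unchanged. The toy values mirror the spellings seen above.

```python
import numpy as np
import pandas as pd

fleeing = pd.Series(['not fleeing', 'Not fleeing', 'car', 'Car', 'foot', 'Foot', '0', 'Other', np.nan])

# Collapse case variants, then fold the '0' placeholder into 'Other'
normalized = fleeing.str.capitalize().replace({'0': 'Other'})
```

This only suits short labels like these: capitalize() would also rewrite a value such as 'BB gun' to 'Bb gun', so it is not a good fit for alleged_weapon.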

In [469]:
police_killings_cleaned.is_off_duty_killing.unique()

array([nan, 'Off-Duty'], dtype=object)

In [470]:
police_killings_cleaned.is_off_duty_killing.nunique()

1
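is_off_duty_killing only ever holds 'Off-Duty' or NaN, so it could be stored as a boolean that matches its is_ name. A minimal sketch, assuming a missing value means the killing was not off-duty (the dataset does not state this explicitly):

```python
import numpy as np
import pandas as pd

off_duty = pd.Series([np.nan, 'Off-Duty', np.nan])

# eq() compares element-wise and returns False for NaN, giving a plain bool Series
is_off_duty = off_duty.eq('Off-Duty')
```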

In [471]:
police_killings_cleaned.alleged_weapon.unique()

array(['knife', 'unclear', 'gun', 'vehicle', 'chain', 'unknown weapon',
       'Taser', 'undetermined', 'gun and vehicle', 'shovel', 'gun and car',
       'toy weapon', 'unarmed', 'baseball bat', 'hatchet',
       'car, knife and mace', 'sword', 'vehicle and machete',
       'screwdriver', 'vehicle and gun', 'BB gun', 'sharp object',
       'box cutter', 'ax', 'hammer', 'crowbar', 'chair', 'scissors', 'toy',
       'baseball bat and knife', 'straight edge razor', 'machete', 'baton',
       'Airsoft pistol', 'air pistol', 'wasp spray', 'BB gun and vehicle',
       'piece of wood', 'gun and knife', 'garden tool', 'barstool',
       'walking stick', 'wrench', 'beer bottle', 'meat cleaver',
       'metal pipe', 'flag pole', 'rock', 'lawn mower blade', 'crossbow',
       'metal object', 'bow and arrow', 'pick-axe', 'lamp', 'glass shard',
       'incendiary device', 'unknown', 'pipe', 'pole and knife',
       'Unknown weapon', 'lighter fluid', 'pitchfork', 'gun and sword',
       'taser', 'b

- The weapon column has too many distinct free-text values to standardize by hand, so I will leave it as it is

In [472]:
police_killings_cleaned.geography.unique()

array(['Rural', 'Urban', 'Suburban', nan], dtype=object)

In [473]:
police_killings_cleaned.geography.nunique()

3

In [474]:
victims_list = 'victims_list.csv'

In [475]:
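# Passing the s3:// path straight to to_csv raised FileNotFoundError with pandas 0.23.3,
# so open the target through s3fs (imported at the top) and write to that file handle
fs = s3fs.S3FileSystem()
with fs.open(os.path.join(s3path, victims_list), 'w') as f:
    police_killings_cleaned.to_csv(f)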

#### 4. Police Deaths

In [None]:
police_deaths_df.info()

In [None]:
police_deaths_df.nunique()

In [None]:
police_deaths_df

#### 5. Protest

In [476]:
protests_df.printSchema()

root
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geometry: struct (nullable = true)
 |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- properties: struct (nullable = true)
 |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |-- Batch: string (nullable = true)
 |    |    |    |-- City: string (nullable = true)
 |    |    |    |-- Comments: string (nullable = true)
 |    |    |    |-- Country: string (nullable = true)
 |    |    |    |-- Date_Added: long (nullable = true)
 |    |    |    |-- Mailing_Abbreviation: string (nullable = true)
 |    |    |    |-- OBJECTID: long (nullable = true)
 |    |    |    |-- Region: string (nullable = true)
 |    |    |    |-- Search_Label: string (nullable = true)
 |    |    |    |-- X_Longitude: double (n

In [478]:
# I am going to use Spark's explode function to flatten the nested features array
from pyspark.sql.functions import explode

In [479]:
protests_df_exploded = protests_df.select('type',explode('features').alias('features'))

In [480]:
protests_cleaned = protests_df_exploded.select(
    col('type').alias('featuretype'), 'features.geometry.type', 'features.id',
    'features.properties.Alias', 'features.properties.Batch', 'features.properties.City',
    'features.properties.Comments', 'features.properties.Country', 'features.properties.Date_Added',
    'features.properties.Mailing_Abbreviation', 'features.properties.OBJECTID',
    'features.properties.Region', 'features.properties.Search_Label',
    'features.properties.X_Longitude', 'features.properties.Y_Latitude')
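For reference, the same flattening in plain Python on a single GeoJSON-style feature, assuming the standard FeatureCollection layout (a top-level 'features' list whose items carry 'geometry', 'id' and 'properties'). The sample values come from the first row of the output below; the hypothetical flatten_features helper copies every property to the top level, as the select does column by column.

```python
def flatten_features(feature_collection):
    """Turn a GeoJSON FeatureCollection dict into one flat dict per feature."""
    rows = []
    for feature in feature_collection.get('features', []):
        row = {
            'featuretype': feature_collection.get('type'),
            # GeoJSON allows a null geometry, so fall back to an empty dict
            'type': (feature.get('geometry') or {}).get('type'),
            'id': feature.get('id'),
        }
        # Copy every property (City, Region, Date_Added, ...) to the top level
        row.update(feature.get('properties') or {})
        rows.append(row)
    return rows

sample = {
    'type': 'FeatureCollection',
    'features': [{
        'type': 'Feature',
        'id': 1,
        'geometry': {'type': 'Point', 'coordinates': [-134.40679, 58.29973]},
        'properties': {'City': 'Juneau', 'Region': 'Alaska', 'Date_Added': 1591056000000},
    }],
}
rows = flatten_features(sample)
```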

In [481]:
protests_cleaned.show(20)

+-----------------+-----+---+-----+-----------+------------+--------+-------+-------------+--------------------+--------+--------+--------------------+-------------------+------------------+
|      featuretype| type| id|Alias|      Batch|        City|Comments|Country|   Date_Added|Mailing_Abbreviation|OBJECTID|  Region|        Search_Label|        X_Longitude|        Y_Latitude|
+-----------------+-----+---+-----+-----------+------------+--------+-------+-------------+--------------------+--------+--------+--------------------+-------------------+------------------+
|FeatureCollection|Point|  1| null|2020_0602_1|      Juneau|    null|    USA|1591056000000|                  AK|       1|  Alaska| Juneau, Alaska, USA|-134.40678999999994| 58.29973000000007|
|FeatureCollection|Point|  2| null|2020_0602_1|   Anchorage|    null|    USA|1591056000000|                  AK|       2|  Alaska|Anchorage, Alaska...|-149.85824999999997| 61.21753000000007|
|FeatureCollection|Point|  3| null|2020_0602_

In [482]:
protests_cleaned.printSchema()

root
 |-- featuretype: string (nullable = true)
 |-- type: string (nullable = true)
 |-- id: long (nullable = true)
 |-- Alias: string (nullable = true)
 |-- Batch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Comments: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Date_Added: long (nullable = true)
 |-- Mailing_Abbreviation: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- Region: string (nullable = true)
 |-- Search_Label: string (nullable = true)
 |-- X_Longitude: double (nullable = true)
 |-- Y_Latitude: double (nullable = true)
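Date_Added is a long holding epoch milliseconds: the first row's 1591056000000 divided by 1000 is 1591056000 seconds, which is 2020-06-02 00:00 UTC and agrees with that row's Batch label 2020_0602_1. A small plain-Python check of the conversion; the helper name is hypothetical.

```python
from datetime import datetime, timezone

def epoch_ms_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # fromtimestamp expects seconds, so scale milliseconds down first
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

first_added = epoch_ms_to_utc(1591056000000)
```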



In [487]:
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month,dayofweek, dayofmonth, hour, weekofyear, date_format
from datetime import datetime

In [496]:
# Date_Added holds epoch milliseconds: divide by 1000 and cast to a timestamp with Spark's
# built-in cast rather than a Python UDF. The result goes into a new DataFrame so that
# re-running this cell cannot stack another conversion onto an already converted column.
protests_dated = protests_cleaned.withColumn('Date_Added', (col('Date_Added') / 1000).cast('timestamp'))

In [497]:
protests_dated.show(5)

- My first attempt used a Python UDF (datetime.utcfromtimestamp) and failed on show() with "ValueError: year 52388 is out of range". That is the year you get when the millisecond value is read as seconds, and the traceback pointed to a lambda defined in an earlier cell (input 490) rather than this one, which suggests an earlier, unscaled conversion was still part of the plan because protests_cleaned had been overwritten in place.

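The `ValueError` above comes from Python's `datetime`, which can only represent years 1 through 9999. The failing lambda is not shown, so its cause is an assumption. One plausible explanation is that an epoch timestamp stored in milliseconds was passed to a function such as `datetime.fromtimestamp`, which expects seconds. A value of about 1.59 × 10^12 read as seconds lands around the year 52,000, while the same value read as milliseconds falls in mid-2020. The sketch below uses a hypothetical `epoch_to_datetime` helper. It scales the value by an explicit unit and returns `None` for values it cannot represent, so one bad row becomes a null instead of failing the whole job.

```python
from datetime import datetime, timezone
from typing import Optional

# Divisors that turn an epoch value in the given unit into seconds.
_UNIT_DIVISORS = {"s": 1, "ms": 1_000, "us": 1_000_000}


def epoch_to_datetime(value: Optional[float], unit: str = "ms") -> Optional[datetime]:
    """Convert an epoch number to a UTC datetime (hypothetical helper).

    `unit` is an assumption about how the source column is stored; choose the
    one that matches the data. Returns None for nulls and for values outside
    the range Python's datetime can represent (years 1..9999).
    """
    if value is None:
        return None
    seconds = value / _UNIT_DIVISORS[unit]
    try:
        return datetime.fromtimestamp(seconds, tz=timezone.utc)
    except (ValueError, OverflowError, OSError):
        # Out-of-range or platform-unsupported timestamps become nulls.
        return None


print(epoch_to_datetime(1591000000000))       # 2020-06-01 08:26:40+00:00
print(epoch_to_datetime(1591000000000, "s"))  # None
```

In the notebook the helper could be registered with `udf(epoch_to_datetime, TimestampType())`, since both names are already imported above. If the column does hold milliseconds, a built-in alternative avoids the Python UDF overhead: `(col("ts") / 1000).cast("timestamp")`. This works because Spark treats a numeric value cast to a timestamp as seconds since the epoch. Here `ts` is a placeholder column name.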

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose it.
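This dataset combines incident-level records (shootings, fatalities, protests) with city-level attributes (population, department headcount). For data like this, a star schema is one common choice: one fact table per event type, with each fact table referencing shared dimensions such as city and date. The sketch below is hypothetical, and every table and column name in it is an assumption rather than the author's model. It describes the schema as plain Python data, then checks that each fact table's foreign keys point at a declared dimension key. That property is what makes a star schema joinable.

```python
from typing import Dict, List

# Hypothetical star schema: every table and column name here is illustrative only.
DIMENSIONS: Dict[str, dict] = {
    "dim_city": {"key": "city_id", "columns": ["city_name", "state", "population"]},
    "dim_date": {"key": "date_id", "columns": ["date", "year", "month", "day_of_week"]},
}

FACTS: Dict[str, dict] = {
    "fact_police_shooting": {
        "foreign_keys": {"city_id": "dim_city", "date_id": "dim_date"},
        "measures": ["victim_count"],
    },
    "fact_protest": {
        "foreign_keys": {"city_id": "dim_city", "date_id": "dim_date"},
        "measures": ["protest_count"],
    },
}


def dangling_foreign_keys(facts: Dict[str, dict], dimensions: Dict[str, dict]) -> List[str]:
    """List fact columns whose dimension is undeclared or whose name differs from its key.

    Assumes the convention that a foreign key column has the same name as the
    dimension's key column.
    """
    problems = []
    for fact_name, spec in facts.items():
        for column, dim_name in spec["foreign_keys"].items():
            dim = dimensions.get(dim_name)
            if dim is None or dim["key"] != column:
                problems.append(f"{fact_name}.{column} -> {dim_name}")
    return problems


print(dangling_foreign_keys(FACTS, DIMENSIONS))  # []
```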

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model.
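The pipeline steps can be written down as a dependency map, where each step lists the steps that must finish before it. A topological sort of that map then gives a valid run order, which is the same ordering an orchestrator such as Airflow enforces for a DAG. The sketch below implements Kahn's algorithm by hand, so it does not need `graphlib.TopologicalSorter`, which only exists in Python 3.9+. Every step name is a hypothetical placeholder.

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical pipeline: each step maps to the steps that must finish before it.
PIPELINE: Dict[str, Set[str]] = {
    "load_raw_to_s3": set(),
    "clean_shootings": {"load_raw_to_s3"},
    "clean_protests": {"load_raw_to_s3"},
    "build_dim_city": {"load_raw_to_s3"},
    "build_dim_date": {"clean_shootings", "clean_protests"},
    "build_fact_tables": {"build_dim_city", "build_dim_date"},
    "run_quality_checks": {"build_fact_tables"},
}


def run_order(steps: Dict[str, Set[str]]) -> List[str]:
    """Kahn's algorithm: return the steps in an order that respects every dependency."""
    remaining = {step: set(deps) for step, deps in steps.items()}
    ready = deque(sorted(step for step, deps in remaining.items() if not deps))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        # Finishing this step may unblock steps that were only waiting on it.
        for other, deps in remaining.items():
            if step in deps:
                deps.discard(step)
                if not deps:
                    ready.append(other)
    if len(order) != len(steps):
        raise ValueError("pipeline has a cycle or depends on an undeclared step")
    return order


print(run_order(PIPELINE))
```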

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness
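The kinds of checks listed above can be written as small functions that each return a list of problems, with one final step that raises if any problem was found. That way a pipeline run stops instead of silently publishing bad tables. The sketch below is plain Python over lists of row dictionaries so it runs without a cluster, and the function names and sample rows are hypothetical. With Spark DataFrames, the inputs would come from other calls:

* `df.count()` for the completeness and count checks.
* `df.groupBy("city_id").count().filter(F.col("count") > 1)` for duplicate keys.
* `df.dtypes` for column types.

```python
from typing import Dict, Iterable, List


class DataQualityError(Exception):
    """Raised when one or more data quality checks fail."""


def check_not_empty(table: str, row_count: int) -> List[str]:
    # Completeness: an empty output usually means an upstream read or join failed.
    return [] if row_count > 0 else [f"{table}: no rows loaded"]


def check_counts_match(table: str, source_count: int, target_count: int) -> List[str]:
    # Source/count check: every source record should land in the target table.
    if source_count == target_count:
        return []
    return [f"{table}: source has {source_count} rows, target has {target_count}"]


def check_unique_key(table: str, rows: Iterable[dict], key: str) -> List[str]:
    # Integrity constraint: the primary key must be non-null and unique.
    problems, seen = [], set()
    for row in rows:
        value = row.get(key)
        if value is None:
            problems.append(f"{table}: null {key}")
        elif value in seen:
            problems.append(f"{table}: duplicate {key} {value!r}")
        else:
            seen.add(value)
    return problems


def check_types(table: str, rows: Iterable[dict], expected: Dict[str, type]) -> List[str]:
    # Data type check: non-null values must be instances of the expected type.
    problems = []
    for row in rows:
        for column, expected_type in expected.items():
            value = row.get(column)
            if value is not None and not isinstance(value, expected_type):
                problems.append(
                    f"{table}.{column}: expected {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )
    return problems


def raise_on_failures(problems: List[str]) -> None:
    # Stop the pipeline if any check reported a problem.
    if problems:
        raise DataQualityError("; ".join(problems))


# Hypothetical rows standing in for a loaded dimension table.
dim_city = [
    {"city_id": 1, "city_name": "City A", "population": 100000},
    {"city_id": 2, "city_name": "City B", "population": 65000},
]
problems = (
    check_not_empty("dim_city", len(dim_city))
    + check_counts_match("dim_city", source_count=2, target_count=len(dim_city))
    + check_unique_key("dim_city", dim_city, "city_id")
    + check_types("dim_city", dim_city, {"city_id": int, "population": int})
)
raise_on_failures(problems)
print("all checks passed")
```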
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
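If the data dictionary is kept as structured data next to the pipeline code, the markdown table can be regenerated whenever the model changes. The sketch below is minimal. The `FieldDoc` record and the sample entries are hypothetical placeholders, not the project's real fields.

```python
from typing import List, NamedTuple


class FieldDoc(NamedTuple):
    table: str
    field: str
    dtype: str
    description: str
    source: str


def _cell(text: str) -> str:
    # Escape pipes so free-text descriptions cannot break the table layout.
    return text.replace("|", "\\|")


def to_markdown(fields: List[FieldDoc]) -> str:
    """Render data dictionary entries as a markdown table."""
    header = "| Table | Field | Type | Description | Source |"
    divider = "| --- | --- | --- | --- | --- |"
    rows = ["| " + " | ".join(_cell(value) for value in f) + " |" for f in fields]
    return "\n".join([header, divider] + rows)


# Placeholder entries; replace with the real tables, fields, and source datasets.
DATA_DICTIONARY = [
    FieldDoc("dim_city", "city_id", "integer", "Surrogate key for a city", "Generated in ETL"),
    FieldDoc("dim_city", "population", "integer", "City population", "City population dataset"),
]

print(to_markdown(DATA_DICTIONARY))
```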

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data increases by 100x.
    * The data populates a dashboard that must be updated by 7am every day.
    * The database needs to be accessed by 100+ people.
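For the daily 7am dashboard scenario, the start time can be derived from the deadline instead of being chosen by hand. The run should start early enough that its expected duration plus a safety margin still ends before 7am. The sketch below assumes a hypothetical two-hour runtime and a 30-minute buffer. Its output is a standard five-field cron expression, the format accepted by cron and by an Airflow DAG's `schedule_interval`.

```python
from datetime import date, datetime, time, timedelta


def latest_start(deadline: time, expected_runtime: timedelta, buffer: timedelta) -> time:
    """Latest daily start time that still finishes `buffer` before `deadline`.

    Assumes expected_runtime + buffer is less than 24 hours.
    """
    # Anchor on an arbitrary date so the subtraction can wrap past midnight.
    anchor = datetime.combine(date(2000, 1, 2), deadline)
    return (anchor - expected_runtime - buffer).time()


def daily_cron(start: time) -> str:
    """Cron expression (minute hour * * *) that fires once a day at `start`."""
    return f"{start.minute} {start.hour} * * *"


start = latest_start(time(7, 0), timedelta(hours=2), timedelta(minutes=30))
print(start, daily_cron(start))  # 04:30:00 30 4 * * *
```

The runtime estimate should come from observed pipeline runs. If the data grows, as in the 100x scenario, the estimate and therefore the start time need revisiting.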