### Working with Virginia Criminal Expungement Data

Last updated: June 10, 2021

### Source:
https://virginiacourtdata.org/

Rehan Merchant rm2bt

**INSTRUCTIONS**  
In this project, you will upload a criminal expungement dataset, save it as a Delta table, and perform various tasks.  
Some of these tasks reinforce your Spark SQL skills. Be sure to show all code and results.

**TOTAL POINTS: 10**

In [0]:
from delta import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

spark

1) (1 PT) Upload the dataset from Collab and load it into a Delta lake table

In [0]:
# File location and type
file_location = "/FileStore/tables/circuit_criminal_2000_anon_00.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)


In [0]:
df.write.format("delta").save('/tmp/delta-table-1')

In [0]:
df = spark.read.format("delta").load("/tmp/delta-table-1")
df.show(vertical=True)

In [0]:
from delta.tables import *
from pyspark.sql.functions import *

# set the path
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table-1")

# deltaTable.toDF().show()

type(deltaTable)


2) (1 PT) Select the first 5 records from the delta lake table

In [0]:
%sql

SELECT * FROM delta.`/tmp/delta-table-1`
LIMIT 5;

HearingDate,HearingResult,HearingJury,HearingPlea,HearingType,HearingRoom,fips,Filed,Commencedby,Locality,Sex,Race,Address,Charge,CodeSection,ChargeType,Class,OffenseDate,ArrestDate,DispositionCode,DispositionDate,ConcludedBy,AmendedCharge,AmendedCodeSection,AmendedChargeType,JailPenitentiary,ConcurrentConsecutive,LifeDeath,SentenceTime,SentenceSuspended,OperatorLicenseSuspensionTime,FineAmount,Costs,FinesCostPaid,ProgramType,ProbationType,ProbationTime,ProbationStarts,CourtDMVSurrender,DriverImprovementClinic,DrivingRestrictions,RestrictionEffectiveDate,RestrictionEndDate,VAAlcoholSafetyAction,RestitutionPaid,RestitutionAmount,Military,TrafficFatality,AppealedDate,person_id
2000-12-19,Dismissed,,,Under Advisement,,91,2000-02-16,General District Court Appeal,COMMONWEALTH OF VA,Male,White Caucasian (Non-Hispanic),"BLUE GRASS, VA 24413",ELUDE LAW ENFORCEMENT OFFICER,46.2-817,Misdemeanor,,1999-11-19,,Dismissed,2000-12-19,Dismissal,,,,,,,,,,,,,,,,,,,,,,,,,,,,227220000000460
2000-09-19,Dismissed,,,Trial,,91,2000-05-19,J&Dr Appeal,COMMONWEALTH OF VA,Male,White Caucasian (Non-Hispanic),"STAUNTON, VA 24401",OBSCENE PHONE CALL,18.2-427,Misdemeanor,1.0,2000-03-10,,Not Guilty/Acquitted,2000-09-19,Dismissal,,,,,,,,,,,,,,,,,,,,,,,,,,,,352110000000068
2000-09-07,Sent,,,Trial,,91,2000-02-16,General District Court Appeal,COMMONWEALTH OF VA,Male,White Caucasian (Non-Hispanic),"BLUE GRASS, VA 24413",SHOOT FROM A ROAD,18.2-286,Misdemeanor,,1999-11-19,,Guilty,2000-09-07,Guilty Plea,,,,,,,,,,300.0,81.0,t,,,,,,,,,,,,,,,,216180000001247
2000-09-07,Nolle Prosequi,,,Trial,,91,2000-02-16,General District Court Appeal,COMMONWEALTH OF VA,Male,White Caucasian (Non-Hispanic),"BLUE GRASS, VA 24413",SHOOT FROM A VEHICLE,29.1-521(6),Misdemeanor,,1999-11-19,,Nolle Prosequi,2000-09-07,Nolle Prosequi,,,,,,,,,,,,,,,,,,,,,,,,,,,,216180000001247
2000-09-07,Sent,,,Trial,,91,2000-02-16,General District Court Appeal,COMMONWEALTH OF VA,Male,White Caucasian (Non-Hispanic),"BLUE GRASS, VA 24413",HUNT POSTED W/O WRITTEN PERMIT,18.2-134,Misdemeanor,,1999-11-19,,Guilty,2000-09-07,Guilty Plea,,,,,,,,,,50.0,53.0,t,,,,,,,,,,,,,,,,216180000001247


3) (1 PT) Show the number ("count") of each ChargeType in the table, and sort descending by count

In [0]:
temp_table_name = "circuit_criminal_2000_anon_00_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql 
select ChargeType, count(*)
from circuit_criminal_2000_anon_00_csv
group by ChargeType
order by count(*) desc;

ChargeType,count(1)
Felony,92561
Misdemeanor,43921
"Other (Animal Violations, Bond Appeals)",8
Infraction,5
,3
Civil,1


In [0]:
deltaTable.toDF().groupby('ChargeType').count().orderBy(col('count'), ascending=False).show()

4) (2 PTS) Create a small dataframe with these specs:  
- 2 rows of data
- a subset of the columns from the Delta table
- a column that is NOT in the Delta table

Next, try writing the records to the Delta table.
Since the records don't follow the schema of the Delta table, it would be problematic if this data was written.  
This is one of the issues with a *data lake*: it can become a dumping ground for bad data, producing a *data swamp*.

Fortunately, a Delta table prevents the write. **Make sure to show the error message that prints.**

In [0]:
deltaTable.update(
  set = { "fips" : expr("fips + 100")})

deltaTable.toDF().select('ChargeType', 'fips').show(2)


In [0]:
(deltaTable
 .alias("t")
 .merge(df.alias("s"), "t.fips = s.fips")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())

5) (1 PT) Explain the difference between INSERTING records and UPSERTING records.

Inserting adds a column that already exists into your table. Upserting will insert rows that don't exist while also updating rows that do exist (upsert means update and insert).

6) (2 PTS) You realized that all records where ChargeType: 'Infraction' should actually be ChargeType: 'Minor Infraction'  
Make this update to the Delta lake, and then rerun your query from Question 3.  
This should show that the Infractions migrated to Minor Infractions.

In [0]:
from pyspark.sql.functions import regexp_replace
df = deltaTable.toDF().withColumn('ChargeType', regexp_replace('ChargeType', 'Infraction', 'Minor Infraction'))
df.write.format("delta").mode("overwrite").save("/tmp/delta-table-1") 

In [0]:
df.groupby('ChargeType').count().orderBy(col('count'), ascending=False).show()

7) (2 PTS) You realize that actually 'Infraction' was the correct label for the migrated records from Question 6.  
Use the time travel functionality to load the original version of the delta table, and display the records with ChargeType: 'Infraction'  
Show only these columns: HearingDate, HearingResult, ChargeType

Wow, this feature bailed you out!

In [0]:
df0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table-1")
df0.select('HearingDate', 'HearingResult', 'ChargeType').filter(df0.ChargeType == 'Infraction').show()
