# Data Cleaning
Sonia Castelo Quispe (scq202@nyu.edu)

## Cleaning Parking Violations

Start Spark for Python (pyspark):

In [1]:
from pyspark import SparkContext, SparkConf
cf = SparkConf()
cf.set("spark.submit.deployMode","client")
sc = SparkContext.getOrCreate(cf)
from pyspark.sql import SparkSession
spark = SparkSession \
	    .builder \
	    .appName("Python Spark SQL basic example") \
	    .config("spark.some.config.option", "some-value") \
	    .getOrCreate()
                            

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/20 03:50:17 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/20 03:50:17 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/20 03:50:17 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/20 03:50:17 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


Load the CSV file: 

In [2]:
parking_df = spark.read.csv(path='/shared/CS-GY-6513/parking-violations/parking-violations-header.csv',header=True)


### Check data

In [3]:
# Check the columns by looking at the schema
parking_df.printSchema()

root
 |-- summons_number: string (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- violation_code: string (nullable = true)
 |-- violation_county: string (nullable = true)
 |-- violation_description: string (nullable = true)
 |-- violation_location: string (nullable = true)
 |-- violation_precinct: string (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- time_first_observed: string (nullable = true)
 |-- meter_number: string (nullable = true)
 |-- issuer_code: string (nullable = true)
 |-- issuer_command: string (nullable = true)
 |-- issuer_precinct: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- plate_id: string (nullable = true)
 |-- plate_type: string (nullable = true)
 |-- registration_state: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- vehicle_body_type: string (nullable = true)
 |-- vehicle_color: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_year: strin

In [4]:
# Display 2 rows, with the columns listed vertically, to see what the data looks like.
parking_df.show(2, vertical=True)

-RECORD 0------------------------------
 summons_number        | 1307964308    
 issue_date            | 2016-03-07    
 violation_code        | 14            
 violation_county      | NY            
 violation_description | null          
 violation_location    | 1             
 violation_precinct    | 1             
 violation_time        | 1040P         
 time_first_observed   | null          
 meter_number          | -             
 issuer_code           | 160307        
 issuer_command        | 0001          
 issuer_precinct       | 1             
 issuing_agency        | K             
 plate_id              | GBH2444       
 plate_type            | PAS           
 registration_state    | NY            
 street_name           | N/S WARREN ST 
 vehicle_body_type     | SDN           
 vehicle_color         | BLACK         
 vehicle_make          | HONDA         
 vehicle_year          | 2008          
-RECORD 1------------------------------
 summons_number        | 1362655727    


In [5]:
# Show the statistics
parking_df.describe().show(vertical=True)



-RECORD 0-------------------------------------
 summary               | count                
 summons_number        | 1014017              
 issue_date            | 1014017              
 violation_code        | 1014017              
 violation_county      | 821043               
 violation_description | 912310               
 violation_location    | 1014017              
 violation_precinct    | 1014017              
 violation_time        | 1014003              
 time_first_observed   | 113101               
 meter_number          | 179551               
 issuer_code           | 1014017              
 issuer_command        | 824678               
 issuer_precinct       | 1014017              
 issuing_agency        | 1014017              
 plate_id              | 1014017              
 plate_type            | 1014017              
 registration_state    | 1014017              
 street_name           | 1013615              
 vehicle_body_type     | 1010433              
 vehicle_colo


### 1. Uniqueness Constraints 

Uniqueness constraints (also called key constraints) require that key values be unique across all tuples and that no key value be null. Here we treat ``summons_number`` as the key.
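Both parts of this constraint can be sketched in plain Python on a small in-memory list of keys, which makes the logic easy to check before running the Spark version. This is a minimal sketch, not the notebook's method: the helper name `check_key_column` is hypothetical, treating `None` and empty strings as missing is an assumption, and the repeated key and the `None` in the sample are made up.

```python
from collections import Counter


def check_key_column(keys):
    """Report violations of a key constraint (keys must be unique and non-null).

    Hypothetical helper; None and empty strings are treated as missing.
    """
    present = [k for k in keys if k not in (None, '')]
    counts = Counter(present)
    return {
        'total': len(keys),
        'distinct': len(counts),
        'missing': len(keys) - len(present),
        # Keys that occur more than once, with how often they occur
        'duplicates': {k: n for k, n in counts.items() if n > 1},
    }


# Sample summons numbers; the repeated key and the None are made up
sample = ['1307964308', '1362655727', '1307964308', None]
report = check_key_column(sample)
print(report)
# {'total': 4, 'distinct': 2, 'missing': 1, 'duplicates': {'1307964308': 2}}
```

Unlike comparing `count()` with `distinct().count()`, this version also reports *which* keys are duplicated.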

#### 1.1. Checking for Duplicates
- Use ``parking_df.count()`` to check how many instances we have, then check how many distinct/unique values we have by using ``parking_df.select('summons_number').distinct().count()``. If there are more instances than distinct values, this would mean that we have duplicate keys. 

In [6]:
# Check how many instances we have
total_instances = parking_df.count()
print('Count total number of instances      : ', total_instances)

# Check how many distinct/unique values we have
distinct_values = parking_df.select('summons_number').distinct().count()
print('Count total number of distinct values: ', distinct_values)

Count total number of instances      :  1014017
Count total number of distinct values:  1014017

Since both counts are equal, the keys are unique: ``summons_number`` contains no duplicates.

#### 1.2. Checking for Null values
- In the snippet below, ``isnan()`` is a SQL function (from ``pyspark.sql.functions``) that checks for NaN values, and ``isNull()`` is a ``Column`` method that checks for null values.

In [7]:
from pyspark.sql.functions import col, isnan, when, count

# Count the null or NaN entries in each key column
df_columns = ["summons_number"]
parking_df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_columns]
).show()

+--------------+
|summons_number|
+--------------+
|             0|
+--------------+



Based on the result above, ``summons_number`` contains no null (or NaN) values.

### Create a Temporary View


In [8]:
# Register the DataFrame as a SQL temporary view
parking_df.createOrReplaceTempView("parking")

# Quick test: count the distinct plate types
spark.sql("SELECT count(DISTINCT plate_type) FROM parking").show()




+--------------------------+
|count(DISTINCT plate_type)|
+--------------------------+
|                        75|
+--------------------------+




### 2. Outliers and Anomalies in Dataset Columns
One common profiling task is the detection of outliers (anomalies) in datasets. Here, we focus on outlier values in individual columns.

#### Plate Type
This example looks at the distinct values in the ``plate_type`` column.

In [9]:
# List the distinct plate types in the table. Display 20 rows 
spark.sql("SELECT DISTINCT plate_type FROM parking").show(20)

+----------+
|plate_type|
+----------+
|       APP|
|       RGL|
|       CHC|
|       CCK|
|       CLG|
|       NYA|
|       FAR|
|       BOT|
|       SOS|
|       SPC|
|       SUP|
|       OMO|
|       LMB|
|       STA|
|       COM|
|       HAM|
|       NYS|
|       RGC|
|       TRC|
|       BOB|
+----------+
only showing top 20 rows



#### Plate Type Frequencies
List each distinct ``plate_type`` value and its frequency in the table.

In [10]:
# List each distinct plate type and its frequency in the table, e.g.
#    plate_type  count
#    PAS         740554
#    COM         190147
distinct_plate_type = spark.sql("SELECT plate_type, count(*) AS COUNT FROM parking GROUP BY plate_type")
distinct_plate_type.show()

+----------+------+
|plate_type| COUNT|
+----------+------+
|       APP|  1952|
|       RGL|   524|
|       CHC|    35|
|       CCK|     2|
|       CLG|     2|
|       NYA|     3|
|       FAR|     2|
|       BOT|     2|
|       SOS|    15|
|       SPC|    28|
|       SUP|     8|
|       OMO|     2|
|       LMB|     2|
|       STA|    33|
|       COM|190147|
|       HAM|    19|
|       NYS|     5|
|       RGC|    25|
|       TRC|  2784|
|       BOB|     6|
+----------+------+
only showing top 20 rows



Order the results of the previous query by count, so that the most frequent plate type appears first.

In [11]:
# Order the results of the previous query by count, so that the most frequent plate type appears first
order_distinct_plate_type = spark.sql("SELECT plate_type, count(*) AS COUNT FROM parking GROUP BY plate_type ORDER BY COUNT DESC")
order_distinct_plate_type.show()

+----------+------+
|plate_type| COUNT|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|       999|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
+----------+------+
only showing top 20 rows
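In the ordered output above, ``999`` stands out even though it is frequent, so a frequency threshold alone would not flag it; what gives it away is that it does not look like the other codes. Below is a minimal plain-Python sketch of such a pattern-based check. It assumes (from the values listed here, not from an official specification) that a well-formed plate type is exactly three uppercase letters. The counts are copied from the output above, and the helper name `pattern_outliers` is hypothetical.

```python
import re

# Plate type counts copied from the ordered output above (top rows only)
plate_type_counts = {
    'PAS': 740554, 'COM': 190147, 'OMT': 35480, 'OMS': 9032,
    'SRF': 8341, 'IRP': 5291, '999': 4467, 'TRC': 2784,
}

# Assumption: a well-formed plate type is exactly three uppercase letters
PLATE_TYPE_PATTERN = re.compile(r'[A-Z]{3}')


def pattern_outliers(counts, pattern):
    """Return {value: count} for values that do not fully match `pattern`."""
    return {value: n for value, n in counts.items()
            if not pattern.fullmatch(value)}


outliers = pattern_outliers(plate_type_counts, PLATE_TYPE_PATTERN)
print(outliers)  # {'999': 4467}
```

Plate types outside the top 20 may not follow this pattern either, so any value the check flags should be reviewed before it is treated as an error.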



Replace the placeholder value ``999`` in ``plate_type`` with null:

In [12]:
# Replace the placeholder value '999' in plate_type with a real null
# (F.lit(None), not the string 'null', which would still be a non-null value)
from pyspark.sql import functions as F
parking_df2 = parking_df.withColumn(
    'plate_type',
    F.when(F.col('plate_type') == '999', F.lit(None).cast('string'))
     .otherwise(F.col('plate_type')))
parking_df2.groupBy('plate_type').count().orderBy('count', ascending=False).show(26)

+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|      null|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
|       VAS|   427|
|       SRN|   348|
|       DLR|   333|
|       TRA|   318|
|       ITP|   283|
|       TRL|   223|
+----------+------+
only showing top 26 rows



Remove all rows where ``plate_type=999``

In [13]:
# remove all rows where plate_type=999
parking_df3 = parking_df.filter(parking_df["plate_type"] != '999')

print('Count total number of rows before filtering:               ', parking_df.count())
print('Count total number of rows after removing  plate_type=999: ', parking_df3.count())

Count total number of rows before filtering:                1014017
Count total number of rows after removing  plate_type=999:  1009550


The results above show that ``4467`` rows (1014017 - 1009550) contained ``plate_type=999``; they were removed with the ``filter`` function.

#### Violation County

Suppose we want to analyze violations by the county in which they occur. We might then want to exclude rows that have a blank entry in the ``violation_county`` column.

In [14]:
# How many rows have Blank Entries in violation_county?
blank_entries_viocounty = parking_df.filter( parking_df["violation_county"].isNull()).count()

print('Count total number of rows with blank entries in violation_county: ', blank_entries_viocounty)

Count total number of rows with blank entries in violation_county:  192974


List each distinct ``violation_county`` value and its frequency, ordered by count so that the most frequent value appears first:

In [15]:
spark.sql("SELECT violation_county, count(*) AS COUNT FROM parking GROUP BY violation_county ORDER BY COUNT DESC").show()

+----------------+------+
|violation_county| COUNT|
+----------------+------+
|              NY|341628|
|               K|207630|
|            null|192974|
|               Q|164615|
|              BX| 98643|
|               R|  8527|
+----------------+------+



To clean this data, we can create a new dataframe without any blank entries in ``violation_county``.

In [16]:
# Create a new dataframe without any blank (null) entries in violation_county
parking_df4 = parking_df.filter(parking_df["violation_county"].isNotNull())
print('Count total number of rows before filtering:                                  ', parking_df.count())
print('Count total number of rows after removing  blank entries in violation_county: ', parking_df4.count())
print(parking_df.count()-parking_df4.count(), 'rows with blank entries in violation_county were removed.')

Count total number of rows before filtering:                                   1014017
Count total number of rows after removing  blank entries in violation_county:  821043
192974 rows with blank entries in violation_county were removed.


### 3. Clustering
Clustering is one of the similarity-based methods for data cleaning. It helps detect entries in a column that are close to each other (and thus likely represent the same value).

Let's start by cleaning the ``plate_id`` values.

In [17]:
# Let's clean the plate_ids
plate_id_rdd = parking_df.select('plate_id').rdd.flatMap(list)
plate_id_rdd.take(50)


['GBH2444',
 'GKZ2313',
 'N346594',
 'GDP2624',
 '42555JU',
 '62636MD',
 'DPE3045',
 'FMW7832',
 'DSD2130',
 '65111MB',
 'GMZ3750',
 '44884',
 'XZ876G',
 'PIKINE',
 'GMU4296',
 'GEJ8235',
 '74452JW',
 '42972JW',
 '66951',
 '63400JM',
 'GGS5172',
 '51329A',
 '49216KA',
 '31695JZ',
 '79638KA',
 '88720MB',
 'ERP5344',
 'FWM1758',
 '14307LV',
 'EWT1353',
 '65566PA',
 'FPF5158',
 '24393MC',
 '24393MG',
 'FXR1798',
 'FWH9893',
 '88629JH',
 '1510332',
 'DJE1615',
 '2208656',
 'GXC7520',
 'GRC4443',
 'T639084C',
 'GARFR5',
 'GTJ6780',
 '401ZGU',
 'ZWF21Z',
 'S274036',
 'PF090W',
 'GLR6718']

"Key collision" methods create, for each string, a key that keeps only its most valuable or meaningful part, and then group together different strings whose keys are identical (hence the name "key collision").

Fingerprinting method:
1. Remove leading and trailing whitespace.
2. Change all characters to their lowercase representation.
3. Remove all punctuation and control characters.
4. Normalize extended western characters to their ASCII representation (for example "gödel" → "godel").
5. Split the string into whitespace-separated tokens.
6. Sort the tokens and remove duplicates.
7. Join the tokens back together.

Note that the order of these operations is significant, especially for the last three steps.
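The steps above can be sketched in plain Python, applied in the listed order. This is a minimal sketch, not a reference implementation. Unlike the notebook's `fingerprint` function in the next cell, it also drops control characters (Unicode category `Cc`), keeping tabs and newlines so they still separate tokens, and it folds to ASCII after removing punctuation. The helper names `fingerprint_key` and `key_collision_clusters` are hypothetical. The sample values are taken from the clustering outputs in this notebook.

```python
import string
import unicodedata
from collections import defaultdict

# Translation table that deletes ASCII punctuation (string.punctuation)
PUNCTUATION = str.maketrans('', '', string.punctuation)


def fingerprint_key(value):
    """Fingerprint key following the seven steps listed above (sketch)."""
    key = value.strip().lower()                   # steps 1-2: trim, lowercase
    key = key.translate(PUNCTUATION)              # step 3: drop punctuation...
    # ...and control characters, keeping whitespace ones as token separators
    key = ''.join(ch for ch in key
                  if unicodedata.category(ch) != 'Cc' or ch.isspace())
    key = (unicodedata.normalize('NFKD', key)     # step 4: fold accents to ASCII
           .encode('ascii', 'ignore').decode())
    tokens = sorted(set(key.split()))             # steps 5-6: split, dedupe, sort
    return ' '.join(tokens)                       # step 7: join


def key_collision_clusters(values):
    """Group values by fingerprint; keep keys shared by 2+ distinct values."""
    groups = defaultdict(set)
    for value in values:
        groups[fingerprint_key(value)].add(value)
    return {key: sorted(vals) for key, vals in groups.items() if len(vals) > 1}


# Sample values taken from the clustering outputs in this notebook
sample = ['2970.CP', '2970CP', 'NEPTUNE  AVE', 'Neptune Ave', 'NEPTUNE AVE',
          'N/A', 'NA', 'GBH2444']
print(key_collision_clusters(sample))
# {'2970cp': ['2970.CP', '2970CP'], 'ave neptune': ['NEPTUNE  AVE', 'NEPTUNE AVE', 'Neptune Ave'], 'na': ['N/A', 'NA']}
```

The `na` cluster shows why clusters still need human review: ``N/A`` and ``NA`` collide, but both are placeholders rather than real plate IDs.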


In [18]:
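import string, unicodedata

def fingerprint(value):
    # Normalize extended western characters to ASCII (e.g. "gödel" -> "godel")
    key = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode()
    # Trim, lowercase, remove punctuation, split into tokens and dedupe via a set
    key = set(key.strip().lower().translate(str.maketrans('', '', string.punctuation)).split())
    # Sort the unique tokens and join them back together
    key = ' '.join(sorted(key))
    # Return (key, original value) so that values can be grouped by key
    return (key, value)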

In [19]:
plate_id_rdd.distinct().map(fingerprint).first()


('ghn4591', 'GHN4591')

Apply the fingerprint function to all distinct ``plate_id`` values, group the values that share the same key, and keep only the keys shared by more than one value:

In [20]:
# mapValues(list) - makes the grouped values into a list
plate_id_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1). \
	mapValues(list). \
	collect()



[('2970cp', ['2970.CP', '2970CP']),
 ('41690ja', ['41690JA', '41690JA+']),
 ('jnp981', ['JNP981', 'JNP981&']),
 ('xt549k', ['XT.549K', 'XT549K']),
 ('ap717y', ['AP717Y', 'AP7!17Y']),
 ('xbgv20', ['XBGV20', 'XBG.V20']),
 ('zgk7779', ['ZGK7779', 'ZGK.7779']),
 ('12224mg', ['12224MG', '1222]4MG']),
 ('47879mg', ['47879MG.', '47879MG']),
 ('jhd0328', ['JHD0328', 'JHD0328.']),
 ('849rzb', ['849RZB', '849RZB.']),
 ('l08275', ['L08(275', 'L08275']),
 ('hkv4504', ['HKV4504', 'HKV!4504']),
 ('7', ['7', '7!']),
 ('ab73725', ['AB.73725', 'AB73725']),
 ('ete3059', ['ETE3059+', 'ETE3059']),
 ('88cs02', ['88CS02', '88C.S02']),
 ('l21741', ['L.21741', 'L21741']),
 ('l21687', ['L21687', 'L.21687']),
 ('na', ['N/A', 'NA']),
 ('jcw0303', ['JCW0303`', 'JCW0303']),
 ('64582md', ['64582MD.', '64582MD']),
 ('6786cx', ['6786CX', '6786.CX']),
 ('zxf293', ['ZXF293+', 'ZXF293']),
 ('k90404', ['K.90404', 'K90404']),
 ('aj511c', ['AJ511C', 'AJ.511C']),
 ('u57afu', ['U57AFU', 'U57.AFU']),
 ('hcv1327', ['HCV1327', 

In [21]:
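# Same pipeline without mapValues(list): the grouped values are shown as
# opaque ResultIterable objects instead of readable lists
plate_id_rdd.distinct(). \
    map(fingerprint). \
    groupByKey(). \
    filter(lambda x: len(x[1])>1).take(5)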



[('xbgv20', <pyspark.resultiterable.ResultIterable at 0x7f9fd0859190>),
 ('ap717y', <pyspark.resultiterable.ResultIterable at 0x7f9fd0859700>),
 ('zgk7779', <pyspark.resultiterable.ResultIterable at 0x7f9fd0859220>),
 ('xt549k', <pyspark.resultiterable.ResultIterable at 0x7f9fd08595b0>),
 ('41690ja', <pyspark.resultiterable.ResultIterable at 0x7f9fd08591f0>)]

To decide whether a cluster should be merged, we can look more closely at the data. In the example below, ``2970CP`` and ``2970.CP`` appear to represent the same plate ID, so we may want to replace both with a single value, for example ``2970CP``.
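Once a cluster has been judged to represent a single entity, its variants need to be mapped to one canonical value. One common heuristic, used here as an assumption rather than as the notebook's rule, is to keep the most frequent variant. Below is a plain-Python sketch. The cluster keys and variants come from the plate ID clusters above, but the frequencies are made up, and `canonical_mapping` is a hypothetical helper.

```python
def canonical_mapping(cluster_counts):
    """Map each non-canonical variant to the most frequent variant of its cluster.

    cluster_counts: {cluster_key: {variant: frequency}}.
    Ties are broken by picking the lexicographically smallest variant (an assumption).
    """
    mapping = {}
    for variants in cluster_counts.values():
        # Highest frequency wins; on a tie, the smallest string wins
        canonical = min(variants, key=lambda v: (-variants[v], v))
        for variant in variants:
            if variant != canonical:
                mapping[variant] = canonical
    return mapping


# Cluster keys and variants from the plate ID clusters; frequencies are made up
clusters = {
    '2970cp': {'2970CP': 3, '2970.CP': 1},
    'xt549k': {'XT.549K': 1, 'XT549K': 5},
}
mapping = canonical_mapping(clusters)
print(mapping)  # {'2970.CP': '2970CP', 'XT.549K': 'XT549K'}
```

In PySpark, such a mapping could then be applied with ``DataFrame.replace``, for example ``parking_df.replace(mapping, subset=['plate_id'])``.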

In [22]:
# check the other attributes to determine if the two ids correspond to the same entity
parking_df.where((parking_df.plate_id == '2970CP') | \
	(parking_df.plate_id == '2970.CP')). \
	show()


+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|    street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+----------

We can perform the same analysis on other columns, such as ``street_name``:

In [23]:
# Clustering on the street_name column (null street names are skipped)
street_name_rdd = parking_df.select('street_name').distinct().rdd.flatMap(list).filter(lambda x: x is not None)

stname_clusters = street_name_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1). \
	mapValues(list). \
	collect()

stname_clusters


[('plains rd white', ['White Plains Rd', 'WHITE PLAINS RD']),
 ('ave riverdale', ['Riverdale Ave', 'RIVERDALE AVE']),
 ('eldert ln', ['ELDERT LN', 'Eldert Ln']),
 ('ave chittenden', ['CHITTENDEN AVE', 'Chittenden Ave']),
 ('5 avenue', ['5 AVENUE', 'AVENUE 5']),
 ('184th st', ['184th St', '184TH ST']),
 ('ave ryder', ['Ryder Ave', 'RYDER AVE']),
 ('ave foothill', ['Foothill Ave', 'FOOTHILL AVE']),
 ('ave lott', ['Lott Ave', 'LOTT AVE']),
 ('marine way', ['Marine Way', 'MARINE WAY']),
 ('astor pl', ['Astor Pl', 'ASTOR PL']),
 ('37th ave', ['37TH AVE', '37th Ave']),
 ('clawson st', ['Clawson St', 'CLAWSON ST']),
 ('ave shepard', ['SHEPARD AVE', 'Shepard Ave']),
 ('gramercy park', ['Gramercy Park', 'GRAMERCY PARK']),
 ('degraw st', ['Degraw St', 'DEGRAW ST']),
 ('ave glenmore', ['Glenmore Ave', 'GLENMORE AVE']),
 ('ave neptune', ['Neptune Ave', 'NEPTUNE  AVE', 'NEPTUNE AVE']),
 ('hartman ln', ['HARTMAN LN', 'Hartman Ln']),
 ('7 arr jfkia terminal', ['TERMINAL 7 JFKIA ARR', 'JFKIA TERMINAL 

In [24]:
street_name_rdd.take(3)

['Mount Hope Pl', 'Carver Loop', 'Rombouts Ave']

In [25]:
# apply the fingerprint to all street addresses
#[('court square', 'COURT SQUARE'),
# ('31 e st', 'E 31 ST'),
# ('island randalls', 'RANDALLS ISLAND'),
# ('45th e street', 'E 45TH STREET'),
# ('ave sheridan', 'SHERIDAN AVE')]
street_name_rdd.distinct(). \
	map(fingerprint). \
	collect()

[('ave sheridan', 'SHERIDAN AVE'),
 ('a jerome street west', 'WEST STREET JEROME A'),
 ('ave monticello', 'Monticello Ave'),
 ('richmond terale', 'RICHMOND TERALE'),
 ('dr riverside', 'RIVERSIDE  DR'),
 ('ns remser st', 'N/S REMSER ST'),
 ('gorsline st', 'GORSLINE ST'),
 ('fame hall of terrace', 'HALL OF FAME TERRACE'),
 ('ave kingsland', 'Kingsland Ave'),
 ('6 ave co sw', 'SW C/O 6 AVE'),
 ('ave schenck', 'SCHENCK AVE'),
 ('avenue givan', 'GIVAN AVENUE'),
 ('31st ave', '31st Ave'),
 ('ave bolton', 'Bolton Ave'),
 ('st waldron', 'Waldron St'),
 ('pl regent', 'Regent Pl'),
 ('118 st w', 'W 118 ST'),
 ('52 e str', 'E 52 STR'),
 ('100 st', '100 ST'),
 ('autumn ave', 'AUTUMN AVE'),
 ('172 co e', 'C/O E 172'),
 ('av jamaica', 'JAMAICA AV'),
 ('cedar', 'CEDAR'),
 ('ave porter', 'Porter Ave'),
 ('gordon st', 'Gordon St'),
 ('ashland pl', 'ASHLAND PL'),
 ('227 east', 'EAST 227'),
 ('avenue manor', 'MANOR AVENUE'),
 ('ave claflin co', 'C/O CLAFLIN AVE'),
 ('58 beach street', 'BEACH 58 STREET'),

### 4. Value Constraints
Value constraints restrict the values an attribute may take beyond its basic type, for example to a fixed set of codes, a given pattern, or a valid range. They are a simple and effective way to catch invalid entries.

Suppose we want to analyze ``Registration_State`` based on the values it can take. At a minimum, each entry should be an alphabetic state code, so we may want to exclude rows that have a numerical entry in the ``Registration_State`` column.

In [26]:
# Check the ticket counts per registration state
registered_state_wise_tickets = spark.sql("select Registration_State as registration_state, count(*) as ticket_count \
                               from parking \
                               group by registration_state \
                               order by ticket_count desc")

registered_state_wise_tickets.show()

+------------------+------------+
|registration_state|ticket_count|
+------------------+------------+
|                NY|      794106|
|                NJ|       93049|
|                PA|       25434|
|                CT|       13168|
|                FL|       12440|
|                MA|        8720|
|                IN|        8092|
|                VA|        6628|
|                MD|        5232|
|                NC|        5082|
|                99|        3326|
|                IL|        3265|
|                GA|        3188|
|                TX|        2907|
|                OH|        2233|
|                ME|        2092|
|                AZ|        1992|
|                CA|        1991|
|                SC|        1985|
|                OK|        1857|
+------------------+------------+
only showing top 20 rows



The results above show that ``3326`` rows contain ``Registration_State=99``. This is clearly invalid, since a registration state should be an alphabetic state code, not a number. To solve this, we can remove these entries from the data.

The cell below counts these rows and removes them with the ``filter`` function.

In [27]:
# How many rows have 99 in Registration State?
re_count = parking_df.filter( parking_df["Registration_State"] == "99").count()

print('Count total number of rows with 99 state in Registration_State: ', re_count)

# Remove rows containing Registration_State=99.
filter_99_re = parking_df.filter(~(parking_df["Registration_State"] == "99"))
filter_99_re.groupBy('Registration_State').count().orderBy('count', ascending=False).show(26)

Count total number of rows with 99 state in Registration_State:  3326
+------------------+------+
|Registration_State| count|
+------------------+------+
|                NY|794106|
|                NJ| 93049|
|                PA| 25434|
|                CT| 13168|
|                FL| 12440|
|                MA|  8720|
|                IN|  8092|
|                VA|  6628|
|                MD|  5232|
|                NC|  5082|
|                IL|  3265|
|                GA|  3188|
|                TX|  2907|
|                OH|  2233|
|                ME|  2092|
|                AZ|  1992|
|                CA|  1991|
|                SC|  1985|
|                OK|  1857|
|                TN|  1727|
|                MI|  1557|
|                MN|  1466|
|                DE|  1367|
|                RI|  1084|
|                NH|   904|
|                AL|   688|
+------------------+------+
only showing top 26 rows



Other kinds of value constraints depend on the column type. For example, when analyzing the issue date, no value should lie in the future.
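Both kinds of constraints can be expressed as simple predicates. Below is a minimal plain-Python sketch with three assumptions. First, a well-formed state code is two uppercase letters; this is a format check only, and master data (next section) is needed to tell real codes from made-up ones. Second, issue dates use the ``YYYY-MM-DD`` format seen in ``issue_date``. Third, the function names and the reference date are hypothetical.

```python
import re
from datetime import date

# Assumption: a well-formed registration state code is two uppercase letters
STATE_CODE = re.compile(r'[A-Z]{2}')


def valid_state_code(code):
    """Format check only: 'ZZ' would pass, '99' or None would not."""
    return code is not None and STATE_CODE.fullmatch(code) is not None


def issue_date_not_in_future(iso_date, today):
    """True if a 'YYYY-MM-DD' issue date is not later than `today`."""
    return date.fromisoformat(iso_date) <= today


# Hypothetical reference date for the check (normally date.today())
reference_day = date(2023, 4, 20)
print(valid_state_code('NY'), valid_state_code('99'))        # True False
print(issue_date_not_in_future('2016-03-07', reference_day))  # True
print(issue_date_not_in_future('2026-03-07', reference_day))  # False
```

In PySpark, the date check could be written, for example, as ``parking_df.filter(F.to_date('issue_date') > F.current_date())`` to list future-dated tickets.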

### 5. Use of master data

The column ``Registration State`` contains 64 distinct values, which is more than the 50 U.S. states. To decide which of them are valid, we can compare them against the list of state codes published by the NYC Department of Finance.

In [28]:
# Count of distinct Registration states
spark.sql("select count(distinct Registration_State) as count from parking").show()

+-----+
|count|
+-----+
|   64|
+-----+



In [29]:
# Check the ticket counts per registration state
registered_state_wise_tickets = spark.sql("select Registration_State as registration_state, count(*) as ticket_count \
                               from parking \
                               group by registration_state \
                               order by ticket_count desc")

registered_state_wise_tickets.show(65)

+------------------+------------+
|registration_state|ticket_count|
+------------------+------------+
|                NY|      794106|
|                NJ|       93049|
|                PA|       25434|
|                CT|       13168|
|                FL|       12440|
|                MA|        8720|
|                IN|        8092|
|                VA|        6628|
|                MD|        5232|
|                NC|        5082|
|                99|        3326|
|                IL|        3265|
|                GA|        3188|
|                TX|        2907|
|                OH|        2233|
|                ME|        2092|
|                AZ|        1992|
|                CA|        1991|
|                SC|        1985|
|                OK|        1857|
|                TN|        1727|
|                MI|        1557|
|                MN|        1466|
|                DE|        1367|
|                RI|        1084|
|                NH|         904|
|             

One solution is to use master data to identify invalid values. In this case, the master data is a curated list of license plate state codes. The official list published by the NYC Department of Finance has 67 values, including the 50 U.S. states, provinces and territories of Canada, Mexico, and U.S. government vehicles. To access it, we use ``refdata`` through the open-source ``openclean`` library, which provides access to master datasets available on the Web, such as ``NYC Finance - State Codes``. To install ``openclean``, run ``pip install openclean-core`` in your terminal. More information about installing openclean is available here: https://openclean.readthedocs.io/source/installation.html

In [30]:
from openclean.data.refdata import RefStore
refdata = RefStore()
for entry in refdata.repository().find():
    print(f'{entry.identifier:<35}:  {entry.name}')

company_suffixes                   :  Company Suffixes
encyclopaedia_britannica:us_cities :  Cities in the U.S.
nyc.gov:dof:state_codes            :  NYC Finance - State Codes
restcountries.eu                   :  REST Countries
usps:street_abbrev                 :  C1 Street Suffix Abbreviations
usps:secondary_unit_designators    :  C2 Secondary Unit Designators
wikipedia:us_states                :  States and territories of the U.S.
admins                             :  Geo Administrative Levels


In [31]:
# Download the license plate state codes dataset.
refdata\
    .load('nyc.gov:dof:state_codes', auto_download=True)\
    .df()\
    .head()

Unnamed: 0,code,name,type
0,AL,Alabama,US
1,AK,Alaska,US
2,AZ,Arizona,US
3,AR,Arkansas,US
4,CA,California,US


NOTE: If you cannot install ``openclean`` to obtain the NYC Finance - State Codes master data, uncomment the following code, which defines the same list of state codes:


In [32]:
# states_ref = {'AB',
#  'AK',
#  'AL',
#  'AR',
#  'AZ',
#  'BC',
#  'CA',
#  'CO',
#  'CT',
#  'DC',
#  'DE',
#  'DP',
#  'FL',
#  'FO',
#  'GA',
#  'GV',
#  'HI',
#  'IA',
#  'ID',
#  'IL',
#  'IN',
#  'KS',
#  'KY',
#  'LA',
#  'MA',
#  'MB',
#  'MD',
#  'ME',
#  'MI',
#  'MN',
#  'MO',
#  'MS',
#  'MT',
#  'MX',
#  'NB',
#  'NC',
#  'ND',
#  'NE',
#  'NF',
#  'NH',
#  'NJ',
#  'NM',
#  'NS',
#  'NT',
#  'NV',
#  'NY',
#  'OH',
#  'OK',
#  'ON',
#  'OR',
#  'PA',
#  'PE',
#  'QB',
#  'RI',
#  'SC',
#  'SD',
#  'SK',
#  'TN',
#  'TX',
#  'UT',
#  'VA',
#  'VT',
#  'WA',
#  'WI',
#  'WV',
#  'WY',
#  'YT'}

In [33]:
# Get set of distinct state codes from the master data called NYC Finance - State Codes from openclean.
states_ref = refdata.load('nyc.gov:dof:state_codes', auto_download=True).distinct('code')
states_ref

{'AB',
 'AK',
 'AL',
 'AR',
 'AZ',
 'BC',
 'CA',
 'CO',
 'CT',
 'DC',
 'DE',
 'DP',
 'FL',
 'FO',
 'GA',
 'GV',
 'HI',
 'IA',
 'ID',
 'IL',
 'IN',
 'KS',
 'KY',
 'LA',
 'MA',
 'MB',
 'MD',
 'ME',
 'MI',
 'MN',
 'MO',
 'MS',
 'MT',
 'MX',
 'NB',
 'NC',
 'ND',
 'NE',
 'NF',
 'NH',
 'NJ',
 'NM',
 'NS',
 'NT',
 'NV',
 'NY',
 'OH',
 'OK',
 'ON',
 'OR',
 'PA',
 'PE',
 'QB',
 'RI',
 'SC',
 'SD',
 'SK',
 'TN',
 'TX',
 'UT',
 'VA',
 'VT',
 'WA',
 'WI',
 'WV',
 'WY',
 'YT'}

In [34]:
# Get set of distinct state codes from Registration_State column of our Parking dataset.
registration_state_rdd = parking_df.select('Registration_State').distinct().rdd.flatMap(list).collect()
registration_state_rdd


['SC',
 'NS',
 'LA',
 'AZ',
 'MN',
 'NJ',
 'DC',
 'OR',
 'VA',
 '99',
 'RI',
 'KY',
 'WY',
 'BC',
 'NH',
 'MI',
 'NV',
 'GV',
 'QB',
 'WI',
 'ID',
 'CA',
 'CT',
 'NE',
 'MT',
 'NC',
 'VT',
 'MD',
 'DE',
 'MO',
 'IL',
 'ME',
 'ND',
 'WA',
 'MB',
 'MS',
 'AL',
 'IN',
 'OH',
 'TN',
 'NM',
 'IA',
 'PA',
 'SD',
 'NY',
 'ON',
 'AB',
 'TX',
 'PE',
 'WV',
 'GA',
 'MA',
 'FL',
 'CO',
 'AK',
 'KS',
 'YT',
 'OK',
 'PR',
 'NB',
 'AR',
 'HI',
 'UT',
 'DP']

Print the entries of the ``Registration State`` column that do not occur in the master dataset.

In [35]:
for state in registration_state_rdd:
    if state not in states_ref:
        print(state)

99
PR


The result shows that 99 and PR are the two values that do not occur in the master dataset.
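The comparison above is a set difference between the observed values and the master data. Below is a plain-Python sketch that also reports how many rows each invalid value affects. It uses counts copied from the outputs above and, for brevity, only a hypothetical subset of the reference codes; `not_in_master` is a hypothetical helper. Values outside the master list are candidates for review rather than automatic errors: ``PR`` is the usual postal code for Puerto Rico, so it may be a legitimate value that the list omits.

```python
# Tickets per registration state (a few rows copied from the outputs above)
state_counts = {'NY': 794106, 'NJ': 93049, 'PA': 25434, '99': 3326, 'PR': 24}

# Hypothetical subset of the NYC Finance state codes, for brevity
states_ref_subset = {'NY', 'NJ', 'PA', 'CT', 'FL'}


def not_in_master(counts, reference):
    """Return {value: row_count} for values missing from `reference`, plus the total."""
    invalid = {value: n for value, n in counts.items() if value not in reference}
    return invalid, sum(invalid.values())


invalid, affected_rows = not_in_master(state_counts, states_ref_subset)
print(invalid, affected_rows)  # {'99': 3326, 'PR': 24} 3350
```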

In [36]:
# How many rows have 99 and PR in Registration State?
_99_count = parking_df.filter( parking_df["Registration_State"] == "99").count()
_PR_count = parking_df.filter( parking_df["Registration_State"] == "PR").count()


print('Count total number of rows with 99 state in Registration_State: ', _99_count)
print('Count total number of rows with PR state in Registration_State: ', _PR_count)

Count total number of rows with 99 state in Registration_State:  3326
Count total number of rows with PR state in Registration_State:  24


### 6. Functional Dependencies
A functional dependency X → Y is a constraint between two sets of attributes: any two tuples that agree on the attributes in X must also agree on the attributes in Y, that is, X determines Y.
For example, the combination of Plate ID and Registration State should uniquely identify a vehicle. Assuming that a vehicle's color does not change within one financial year, the functional dependency [Plate ID, Registration State] → Vehicle Color should hold. Violations of this dependency may point to different representations of the same color value.
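A functional dependency X → Y is violated whenever two rows agree on X but differ on Y. Below is a minimal plain-Python sketch of that check for [plate_id, registration_state] → vehicle_color. The helper name `fd_violations` and the ``'WH'`` row are hypothetical; the other two rows come from the ``df_vc`` output later in this section.

```python
from collections import defaultdict


def fd_violations(rows, lhs, rhs):
    """Return LHS value tuples that map to more than one RHS value (X -> Y violated)."""
    rhs_values = defaultdict(set)
    for row in rows:
        rhs_values[tuple(row[c] for c in lhs)].add(row[rhs])
    return {key: sorted(vals) for key, vals in rhs_values.items() if len(vals) > 1}


rows = [
    {'plate_id': 'GBH2444', 'registration_state': 'NY', 'vehicle_color': 'BLACK'},
    {'plate_id': 'GKZ2313', 'registration_state': 'NY', 'vehicle_color': 'WHITE'},
    # Hypothetical second ticket for the same vehicle with an abbreviated color
    {'plate_id': 'GKZ2313', 'registration_state': 'NY', 'vehicle_color': 'WH'},
]
violations = fd_violations(rows, ['plate_id', 'registration_state'], 'vehicle_color')
print(violations)  # {('GKZ2313', 'NY'): ['WH', 'WHITE']}
```

On the Spark DataFrame, a similar check could group by the two key columns and keep the groups with ``F.countDistinct('vehicle_color') > 1``.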

To test this assumption, we first check whether the data falls within one financial year. The query below shows 31 distinct issue dates, all in March 2016, so the data lies within fiscal year ``2016`` and we can proceed with the example.

In [37]:
spark.sql("SELECT count(DISTINCT issue_date) FROM parking").show()
spark.sql("""
    SELECT issue_date, count(*) AS date_count
    FROM parking
    GROUP BY issue_date
    ORDER BY date_count DESC
""").show(31)


+--------------------------+
|count(DISTINCT issue_date)|
+--------------------------+
|                        31|
+--------------------------+

+----------+----------+
|issue_date|date_count|
+----------+----------+
|2016-03-01|     45090|
|2016-03-08|     44166|
|2016-03-10|     42878|
|2016-03-15|     42118|
|2016-03-03|     41751|
|2016-03-31|     41558|
|2016-03-22|     41292|
|2016-03-02|     40291|
|2016-03-09|     40289|
|2016-03-18|     40269|
|2016-03-17|     40131|
|2016-03-07|     40062|
|2016-03-11|     39502|
|2016-03-23|     39262|
|2016-03-29|     39179|
|2016-03-16|     39134|
|2016-03-30|     38070|
|2016-03-24|     37037|
|2016-03-21|     33361|
|2016-03-28|     33136|
|2016-03-04|     32235|
|2016-03-14|     31733|
|2016-03-25|     27807|
|2016-03-05|     23431|
|2016-03-12|     23365|
|2016-03-19|     21560|
|2016-03-26|     20909|
|2016-03-06|     10132|
|2016-03-13|      8800|
|2016-03-20|      8301|
|2016-03-27|      7168|
+----------+----------+
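Whether a set of dates fits in one fiscal year depends on the fiscal-year convention. Below is a minimal sketch that assumes a fiscal year running from July 1 to June 30 and named after the calendar year in which it ends. This is the convention New York City uses, so FY2016 spans July 1, 2015 to June 30, 2016. The helper `nyc_fiscal_year` is hypothetical.

```python
from datetime import date


def nyc_fiscal_year(d: date) -> int:
    """Fiscal year of a date, assuming a July 1 - June 30 year named after the year it ends in."""
    return d.year + 1 if d.month >= 7 else d.year


# All 31 days of March 2016 map to the same fiscal year
march_2016 = [date(2016, 3, day) for day in range(1, 32)]
print({nyc_fiscal_year(d) for d in march_2016})
```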



In [38]:
import pyspark.sql.functions as F

# Keep rows with a known color, a valid registration state, and a valid plate type,
# then upper-case the color so that, e.g., "Black" and "BLACK" compare equal
df_vc = parking_df \
    .filter(F.col('Vehicle_Color').isNotNull()
            & (F.col('Registration_State') != '99')
            & (F.col('plate_type') != '999')) \
    .select('Plate_ID', 'Registration_State', 'Vehicle_Color') \
    .withColumn('vehicle_color', F.upper(F.col('Vehicle_Color')))

In [39]:
df_vc.show()

+--------+------------------+-------------+
|Plate_ID|Registration_State|vehicle_color|
+--------+------------------+-------------+
| GBH2444|                NY|        BLACK|
| GKZ2313|                NY|        WHITE|
| GDP2624|                NY|        WHITE|
| 42555JU|                NY|        WHITE|
| 62636MD|                NY|          WHT|
| DPE3045|                NY|        GREEN|
| FMW7832|                NY|        GREEN|
| DSD2130|                NY|         GOLD|
| 65111MB|                NY|           WH|
| GMZ3750|                NY|          BLK|
|   44884|                RI|        WHITE|
|  XZ876G|                NJ|        WHITE|
|  PIKINE|                MA|          GLD|
| GMU4296|                NY|        WHITE|
| GEJ8235|                NY|          WHT|
| 74452JW|                NY|          WHT|
| 42972JW|                NY|           WT|
|   66951|                NY|          WHT|
| 63400JM|                NY|           BR|
| GGS5172|                NY|   

In [40]:

print( parking_df.distinct().count())
print(df_vc.distinct().count())

                                                                                

1014017
660738


In [41]:
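# Collect the three columns in a single pass so that the i-th entries of the lists always
# come from the same row (separate collect() calls are not guaranteed to use the same row order)
rows = df_vc.select('Plate_ID', 'Registration_State', 'Vehicle_Color').collect()
var_pid = [row[0] for row in rows]
var_rs = [row[1] for row in rows]
var_vc = [row[2] for row in rows]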

In [42]:
# Map each plate+state key (e.g. "GBH2444NY") to the distinct colors seen for it, in first-seen order
obj_fd = {}
for pid, rs, vc in zip(var_pid, var_rs, var_vc):
    colors = obj_fd.setdefault(pid + rs, [])
    if vc not in colors:
        colors.append(vc)


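Below are the (Plate ID, Registration State) keys that violate the functional dependency, i.e., keys that map to more than one color. For example, (BLACK, BK), (WHITE, WH), and (GRY, SILVE, GY) appear to be representations of the same color, which suggests a possible mapping to standardize vehicle colors. Other groups, such as (GOLD, GY) or (BLUE, BK), are less clearly synonyms and may instead reflect data-entry errors.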

In [43]:
for x in obj_fd.keys():
    if len(obj_fd[x]) >1:
        print(x, ": " , obj_fd[x])

GBH2444NY :  ['BLACK', 'BK']
42555JUNY :  ['WHITE', 'WH']
DSD2130NY :  ['GOLD', 'GY']
GMZ3750NY :  ['BLK', 'BK']
GEJ8235NY :  ['WHT', 'WH']
74452JWNY :  ['WHT', 'WH', 'WHITE']
49216KANY :  ['WH', 'WHT', 'BRN', 'WHITE']
79638KANY :  ['GRY', 'SILVE', 'GY']
24393MCNY :  ['WH', 'WHITE']
GXC7520NY :  ['GREY', 'GY']
GRC4443NY :  ['GREY', 'GY']
T639084CNY :  ['BLACK', 'BK']
GTJ6780NY :  ['BLACK', 'BK']
PF090WFL :  ['SLVR', 'GREY']
FTT2816NY :  ['BLUE', 'BL']
56248MGNY :  ['BR', 'BROWN', 'GOLD']
12996JUNY :  ['WH', 'WHITE']
80963MBNY :  ['GREY', 'GRAY', 'GRY', 'GY']
HAK2246NY :  ['BK', 'BLACK']
HFJ2092NY :  ['GY', 'LTG']
GVG3196NY :  ['GY', 'SLVR']
GXB2024NY :  ['BLACK', 'BK']
HCT2268NY :  ['RED', 'RD']
T687197CNY :  ['BLACK', 'BK']
GBS7839NY :  ['BLUE', 'BK']
EAB5723NY :  ['TAN', 'OTHER']
2PS558MA :  ['SILVE', 'GREY']
VLL2419VA :  ['GRAY', 'GREEN', 'BLUE', 'GREY']
FPJ4878NY :  ['SILVE', 'GREY', 'GY']
35415MBNY :  ['BROWN', 'BR', 'BRWN']
FFN5218NY :  ['WHT', 'WH']
GMR1833NY :  ['GREY', 'GY']

To clean the data using these functional-dependency violations, we can standardize Vehicle_Color by choosing one canonical name for each color and mapping the other variants to it (for example, BK and BLK to BLACK).
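Below is a minimal sketch of such a standardization step. `COLOR_MAP` is a hypothetical mapping, inferred by eye from the violating groups printed above. It is not an official code list and would need review; `GY` maps to GREY here, for instance, so a group like (GOLD, GY) would still violate the dependency. The helper `standardize_color` is also hypothetical.

```python
from typing import Optional

# Hypothetical mapping inferred from the violating groups; not an official code list
COLOR_MAP = {
    "BK": "BLACK", "BLK": "BLACK",
    "WH": "WHITE", "WHT": "WHITE", "WT": "WHITE",
    "GY": "GREY", "GRY": "GREY", "GRAY": "GREY",
    "SILVE": "SILVER", "SLVR": "SILVER",
    "BL": "BLUE",
    "BR": "BROWN", "BRWN": "BROWN",
    "RD": "RED",
    "GLD": "GOLD",
}


def standardize_color(color: Optional[str]) -> Optional[str]:
    """Map a raw color code to its canonical name; unknown codes are upper-cased and kept."""
    if color is None:
        return None
    code = color.strip().upper()
    return COLOR_MAP.get(code, code)


raw = ["BK", "black", "WHT", "GY", "SILVE", "OTHER"]
print([standardize_color(c) for c in raw])
```

On the Spark side, one way to apply a dictionary like this is `DataFrame.replace(COLOR_MAP, subset=['vehicle_color'])` on the upper-cased column. After that, the functional-dependency check can be rerun to see which violations remain.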

### 7. Entity Resolution
Identify and merge different representations of the same entity.

In [44]:
# Use PySpark's concat_ws() to concatenate several violation attributes into a single space-separated string column (features_parking), which is then compared across rows.

In [45]:
# Check the columns by looking at the schema
parking_df.printSchema()

root
 |-- summons_number: string (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- violation_code: string (nullable = true)
 |-- violation_county: string (nullable = true)
 |-- violation_description: string (nullable = true)
 |-- violation_location: string (nullable = true)
 |-- violation_precinct: string (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- time_first_observed: string (nullable = true)
 |-- meter_number: string (nullable = true)
 |-- issuer_code: string (nullable = true)
 |-- issuer_command: string (nullable = true)
 |-- issuer_precinct: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- plate_id: string (nullable = true)
 |-- plate_type: string (nullable = true)
 |-- registration_state: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- vehicle_body_type: string (nullable = true)
 |-- vehicle_color: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_year: strin

In [47]:
from pyspark.sql.functions import concat_ws,col
parking_df_er=parking_df.select(concat_ws(' ',
                                          parking_df.issue_date,
                                          parking_df.violation_code,
                                          parking_df.violation_county,
                                          parking_df.violation_time,
                                          parking_df.issuer_code,
                                          parking_df.issuer_command,
                                          parking_df.plate_id,
                                          parking_df.plate_type,
                                          parking_df.registration_state,
                                          parking_df.street_name,
                                         )
              .alias("features_parking"),"summons_number")
parking_df_er.show()

+--------------------+--------------+
|    features_parking|summons_number|
+--------------------+--------------+
|2016-03-07 14 NY ...|    1307964308|
|2016-03-02 98 BX ...|    1362655727|
|2016-03-01 21 NY ...|    1363178234|
|2016-03-02 74 K 1...|    1365797030|
|2016-03-03 38 NY ...|    1366529595|
|2016-03-07 20 NY ...|    1366571757|
|2016-03-01 21 NY ...|    1363178192|
|2016-03-12 21 BX ...|    1362906062|
|2016-03-09 40 K 0...|    1367591351|
|2016-03-19 20 NY ...|    1354042244|
|2016-03-17 40 BX ...|    1359423576|
|2016-03-19 85 BX ...|    1358746333|
|2016-03-13 40 K 0...|    1361067974|
|2016-03-29 40 NY ...|    1362335939|
|2016-03-31 21 BX ...|    1362902056|
|2016-03-16 46 K 0...|    1362963860|
|2016-03-08 78 NY ...|    1365462523|
|2016-03-14 19 NY ...|    1366567511|
|2016-03-26 46 BX ...|    1366720556|
|2016-03-20 45 NY ...|    1367171477|
+--------------------+--------------+
only showing top 20 rows
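One subtlety of `concat_ws` matters here: Spark's `concat_ws` skips null inputs rather than leaving an empty field. A missing value therefore shifts the remaining fields left inside `features_parking`, which inflates the edit distance between rows that differ only by a missing field. The pure-Python sketch below mimics that behavior to illustrate the effect. `concat_ws_like` is a hypothetical helper, not part of PySpark, and the field values are made up.

```python
from typing import Optional


def concat_ws_like(sep: str, *values: Optional[str]) -> str:
    """Join values with sep, skipping None, the way Spark's concat_ws skips nulls."""
    return sep.join(v for v in values if v is not None)


full = concat_ws_like(" ", "2016-03-11", "71", "K", "NY")
missing = concat_ws_like(" ", "2016-03-11", "71", None, "NY")
print(full)     # 2016-03-11 71 K NY
print(missing)  # 2016-03-11 71 NY
```

To keep a fixed slot per column, one option is to wrap each column as `F.coalesce(column, F.lit(''))` before concatenating. The empty string is not null, so it still occupies a position.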



In [48]:
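# Work on a 400-row sample, since the pairwise comparison below grows quadratically.
# Note: limit() without orderBy() is not guaranteed to return the same rows on every evaluation.
parking_df_sample = parking_df_er.limit(400)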

In [49]:
parking_df_sample.show()

+--------------------+--------------+
|    features_parking|summons_number|
+--------------------+--------------+
|2016-03-07 14 NY ...|    1307964308|
|2016-03-02 98 BX ...|    1362655727|
|2016-03-01 21 NY ...|    1363178234|
|2016-03-02 74 K 1...|    1365797030|
|2016-03-03 38 NY ...|    1366529595|
|2016-03-07 20 NY ...|    1366571757|
|2016-03-01 21 NY ...|    1363178192|
|2016-03-12 21 BX ...|    1362906062|
|2016-03-09 40 K 0...|    1367591351|
|2016-03-19 20 NY ...|    1354042244|
|2016-03-17 40 BX ...|    1359423576|
|2016-03-19 85 BX ...|    1358746333|
|2016-03-13 40 K 0...|    1361067974|
|2016-03-29 40 NY ...|    1362335939|
|2016-03-31 21 BX ...|    1362902056|
|2016-03-16 46 K 0...|    1362963860|
|2016-03-08 78 NY ...|    1365462523|
|2016-03-14 19 NY ...|    1366567511|
|2016-03-26 46 BX ...|    1366720556|
|2016-03-20 45 NY ...|    1367171477|
+--------------------+--------------+
only showing top 20 rows



In [50]:
import pyspark.sql.functions as F
matrix = parking_df_sample.select(F.col('summons_number').alias('summons_number1'), F.col('features_parking').alias('features_parking1')) \
    .crossJoin(parking_df_sample.select(F.col('summons_number').alias('summons_number2'), F.col('features_parking').alias('features_parking2'))) \
    .filter(F.col('summons_number1') > F.col('summons_number2'))

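We can use the Levenshtein (edit) distance to find similar entities. This is the minimum number of single-character insertions, deletions, and substitutions needed to turn one string into the other. Note that the resulting ``similarity`` column is really a distance: smaller values mean more similar rows.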
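For intuition, here is a plain-Python sketch of the standard dynamic-programming (Wagner-Fischer) formulation of the Levenshtein distance. It is not Spark's implementation of `F.levenshtein`, but it computes the same quantity and keeps only two rows of the table in memory.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions, and substitutions turning a into b."""
    # prev[j] = distance between the first i-1 chars of a and the first j chars of b
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]  # distance from a[:i] to the empty string
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,               # delete ca
                curr[j - 1] + 1,           # insert cb
                prev[j - 1] + (ca != cb),  # substitute (free if the characters match)
            ))
        prev = curr
    return prev[-1]


print(levenshtein("kitten", "sitting"))                   # 3
print(levenshtein("2016-03-11 71 K", "2016-03-11 70 K"))  # 1
```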

In [51]:
import pyspark.sql.functions as F

# 'similarity' holds the Levenshtein (edit) distance between the two feature strings:
# lower values mean more similar rows
res = matrix.withColumn('similarity', F.levenshtein(F.col('features_parking1'), F.col('features_parking2')))


In [81]:
res.show(45)

+---------------+--------------------+---------------+--------------------+----------+
|summons_number1|   features_parking1|summons_number2|   features_parking2|similarity|
+---------------+--------------------+---------------+--------------------+----------+
|     7363675041|2016-03-11 17 NY ...|     7363675028|2016-03-11 20 NY ...|        24|
|     7363675119|2016-03-15 19 NY ...|     7363675028|2016-03-11 20 NY ...|        34|
|     7363675119|2016-03-15 19 NY ...|     7363675041|2016-03-11 17 NY ...|        32|
|     7363675132|2016-03-16 14 NY ...|     7363675028|2016-03-11 20 NY ...|        21|
|     7363675132|2016-03-16 14 NY ...|     7363675041|2016-03-11 17 NY ...|        19|
|     7363675132|2016-03-16 14 NY ...|     7363675119|2016-03-15 19 NY ...|        33|
|     7363675156|2016-03-16 19 NY ...|     7363675028|2016-03-11 20 NY ...|        25|
|     7363675156|2016-03-16 19 NY ...|     7363675041|2016-03-11 17 NY ...|        25|
|     7363675156|2016-03-16 19 NY ...|     

In [83]:
res.count()

79800
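The count of 79800 follows from the cross join. Keeping only pairs with `summons_number1 > summons_number2` leaves each unordered pair exactly once, which gives n(n-1)/2 pairs for n rows, assuming the 400 summons numbers in the sample are distinct. The sketch below checks the arithmetic. For roughly a million rows, the same formula gives on the order of 5 × 10^11 pairs, which is why the comparison is run on a sample.

```python
def unordered_pairs(n: int) -> int:
    """Number of unordered pairs of distinct items among n: n * (n - 1) / 2."""
    return n * (n - 1) // 2


print(unordered_pairs(400))  # 79800

# Brute-force check on a small case, mirroring the summons_number1 > summons_number2 filter
ids = range(10)
assert sum(1 for a in ids for b in ids if a > b) == unordered_pairs(10)
```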

In [92]:
res.orderBy('similarity', ascending=True).show(40)


+---------------+--------------------+---------------+--------------------+----------+
|summons_number1|   features_parking1|summons_number2|   features_parking2|similarity|
+---------------+--------------------+---------------+--------------------+----------+
|     7372179002|2016-03-11 71 K 0...|     7372178990|2016-03-11 70 K 0...|         2|
|     7372180715|2016-03-17 71 K 0...|     7372180703|2016-03-17 74 K 0...|         2|
|     7372177303|2016-03-07 71 K 0...|     7372177297|2016-03-07 38 K 0...|         3|
|     7372178459|2016-03-10 38 K 0...|     7372177807|2016-03-08 38 K 0...|         3|
|     7372179464|2016-03-11 74 K 0...|     7372179452|2016-03-11 46 K 0...|         3|
|     7372180065|2016-03-15 71 K 0...|     7372180053|2016-03-15 20 K 0...|         3|
|     7367298227|2016-03-04 70 NY ...|     7367298215|2016-03-04 31 NY ...|         3|
|     7372180624|2016-03-16 70 K 0...|     7372180612|2016-03-16 38 K 0...|         3|
|     7372178903|2016-03-10 71 K 0...|     

                                                                                

After computing the similarity between entities, we can check whether they really are the same entity by inspecting the individual rows.
For example, consider these two entities, which were found to be very similar (similarity equal to 2): ``1388230409`` and ``1388230392``. Looking at the rows, we can see that apart from the summons number, the only field that differs is violation_code. This may be a human error made while entering the data. In this case, we should remove one of the entries.
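Removing one entry from each near-duplicate pair gets awkward when pairs chain together (A close to B, B close to C). Below is a minimal sketch that uses a union-find structure to group all summons numbers linked by a distance at or below a threshold. It keeps the smallest number in each group as the representative and drops the rest. The threshold, the choice of representative, and the sample pairs are assumptions for illustration. The `(summons_number1, summons_number2, similarity)` triples from `res` could be collected and passed in. The sketch also assumes equal-length summons numbers, so that string order matches numeric order.

```python
from typing import Dict, Iterable, List, Tuple


def cluster_duplicates(
    pairs: Iterable[Tuple[str, str, int]], threshold: int
) -> Dict[str, List[str]]:
    """Group ids connected by pairs whose distance is <= threshold, keyed by the smallest id."""
    parent: Dict[str, str] = {}

    def find(x: str) -> str:
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b, dist in pairs:
        ra, rb = find(a), find(b)  # registers both ids even if they are not merged
        if dist <= threshold and ra != rb:
            parent[max(ra, rb)] = min(ra, rb)  # the smaller id stays the root

    groups: Dict[str, List[str]] = {}
    for x in parent:
        groups.setdefault(find(x), []).append(x)
    return groups


# Hypothetical (id1, id2, distance) triples
pairs = [("1002", "1001", 2), ("1003", "1002", 3), ("1005", "1004", 30)]
groups = cluster_duplicates(pairs, threshold=3)
to_drop = [x for root, members in groups.items() for x in members if x != root]
print(groups)
print(sorted(to_drop))
```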

In [86]:
parking_df.where(parking_df['summons_number'].isin('1388230409', '1388230392')).show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

Analysing two more cases, the two most similar pairs (similarity 2) in the sorted output above:

In [93]:
parking_df.where(parking_df['summons_number'].isin('7372179002', '7372178990')).show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

In [94]:
parking_df.where(parking_df['summons_number'].isin('7372180715', '7372180703')).show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------



To address these near-duplicates, we may want to remove entities whose mean similarity value (Levenshtein distance) is below 25. This threshold was chosen by looking at the statistics below, where the mean distance over all pairs is about 25.6.

In [91]:
# Show the statistics
res.select(res['similarity']).describe().show()


+-------+------------------+
|summary|        similarity|
+-------+------------------+
|  count|             79800|
|   mean|25.613032581453634|
| stddev| 6.093391357837535|
|    min|                 2|
|    max|                45|
+-------+------------------+
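One reading of the rule "remove entities whose mean similarity is below 25" is a per-entity mean: average each summons number's distance over every pair it appears in, then flag the ones below the threshold. Below is a minimal pure-Python sketch of that reading. The helper name and the sample triples are hypothetical. In Spark, the same idea could be expressed by stacking both id columns of `res` with their distance and aggregating with `groupBy(...).agg(F.avg('similarity'))`. Because the column is a distance, a low mean means an entity is, on average, close to the other rows.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def low_mean_distance_ids(
    pairs: Iterable[Tuple[str, str, int]], threshold: float = 25.0
) -> Set[str]:
    """Return ids whose mean distance over all pairs they appear in is below threshold."""
    total: Dict[str, float] = defaultdict(float)
    count: Dict[str, int] = defaultdict(int)
    for a, b, dist in pairs:
        for x in (a, b):  # each pair contributes to both endpoints
            total[x] += dist
            count[x] += 1
    return {x for x in total if total[x] / count[x] < threshold}


# Hypothetical (id1, id2, distance) triples; means are 25.0, 20.0, and 35.0
pairs = [("1", "2", 10), ("1", "3", 40), ("2", "3", 30)]
print(low_mean_distance_ids(pairs))
```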



                                                                                