# Install Packages

In [None]:
!pip install humanfriendly
!pip install datamart-geo
!pip install datamart-profiler
!pip install openclean
!pip install openclean-geo
!pip install fuzzywuzzy

# Import Packages

In [None]:
# General
import os
import re
import gzip
import time
import datetime
import dateutil
import pandas as pd
import humanfriendly

# Data Cleaning
from fuzzywuzzy import fuzz
import datamart_geo
import datamart_profiler
import openclean
from openclean.data.source.socrata import Socrata
from openclean import pipeline
from openclean.profiling.column import DefaultColumnProfiler
from openclean.function.eval.base import Eval, Col
from openclean.function.eval.logic import And, Or
from openclean.function.eval.null import IsEmpty
from openclean.function.value.null import is_empty
from openclean.data.refdata import RefStore
from openclean_geo.address.usstreet import StandardizeUSStreetName
from openclean.cluster.key import KeyCollision
from openclean.function.value.key.fingerprint import Fingerprint



# Mount Google Drive

In [None]:
# Mount Google Drive under /content/drive; force_remount=True asks Colab to
# remount even if the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
# Optional (disabled): switch the working directory to a project folder on Drive.
#os.chdir("/content/drive/MyDrive/Colab Notebooks/Big Data")

Mounted at /content/drive


# Helper Functions

In [None]:
# Upper-case placeholder strings that the cleaning helpers in this notebook
# treat as "no value".
nullVariants = {
    "NA", "N.A", "N.A.", "N/A",
    "PO", "P.O", "P.O.", "P/O",
    "NONE", "NULL",
}

def preprocessing(sample, sampleProfile):
    """Drop the columns of ``sample`` that are largely empty.

    A column is kept when the share of empty values reported by its profile
    entry (``emptyValueCount / totalValueCount``) is strictly below 0.6.

    Args:
        sample: Object exposing ``select(columns)``.
        sampleProfile: Iterable of per-column profile entries, each with a
            ``"column"`` name and a ``"stats"`` mapping.

    Returns:
        A tuple ``(sample, columns)``: the result of ``sample.select`` and the
        list of retained column names in profile order.
    """
    columnCutOffMargin = 0.6
    columns = [
        entry["column"]
        for entry in sampleProfile
        if entry["stats"]["emptyValueCount"] / entry["stats"]["totalValueCount"] < columnCutOffMargin
    ]
    return sample.select(columns), columns

def cleanName(name):
    """Normalise a personal name to upper-case letters and single spaces.

    Steps: trim and upper-case the value, map known null markers to "N/A",
    drop everything up to and including the last period (e.g. an honorific
    such as "MR."), remove every character that is not A-Z or a space, and
    collapse runs of spaces.

    Args:
        name: Raw name string.

    Returns:
        The cleaned name, or "N/A" when nothing usable remains.
    """
    # Trim first so that padded null markers such as " NA " are recognised.
    name = name.strip().upper()
    if name in nullVariants:
        return "N/A"
    # Remove prefixes
    name = re.sub(r".*\.", "", name)
    # Remove all characters except alphabets
    name = re.sub(r"[^A-Z ]", "", name)
    # Remove multiple spaces, then trim BEFORE testing for emptiness so that a
    # whitespace-only remainder becomes "N/A" instead of an empty string.
    name = re.sub(r" +", " ", name).strip()
    return name if name else "N/A"

def cleanPhone(phone):
    """Reduce a phone number to its last ten digits.

    Args:
        phone: Raw phone string.

    Returns:
        A ten-digit string, or "N/A" when the value is a null marker or
        contains fewer than ten digits.
    """
    phone = phone.strip().upper()
    if phone in nullVariants:
        return "N/A"
    # Remove everything except digits (raw string: "\D" in a plain literal is
    # an invalid escape sequence and triggers a warning on recent Pythons).
    digits = re.sub(r"\D", "", phone)
    # Take only the last ten digits; shorter values cannot be a full number.
    return digits[-10:] if len(digits) >= 10 else "N/A"

def cleanZip(zip):
    """Normalise a ZIP code to exactly five digits.

    Only the part before the first dash is considered (so a ZIP+4 value keeps
    its five-digit prefix); every non-digit character is then removed.

    Args:
        zip: Raw ZIP string. The parameter name is kept for compatibility even
            though it shadows the builtin; the body works on a local copy.

    Returns:
        The five-digit code, or "N/A" when the value is a null marker or does
        not yield exactly five digits.
    """
    code = zip.strip().upper()
    if code in nullVariants or len(code) < 5:
        return "N/A"
    # Consider only the part before dash, then remove everything except digits.
    digits = re.sub(r"\D", "", code.split("-")[0])
    return digits if len(digits) == 5 else "N/A"

def cleanHouseNumber(houseNumber):
    """Keep only digits and dashes of a house number.

    Args:
        houseNumber: Raw house-number string.

    Returns:
        The digits-and-dashes form of the value, or "N/A" when the value is a
        null marker or contains no digit at all.
    """
    houseNumber = houseNumber.strip().upper()
    if houseNumber in nullVariants:
        return "N/A"
    # Replace all characters except digits and dash (raw string avoids the
    # invalid "\-" escape of the plain literal).
    houseNumber = re.sub(r"[^0-9-]", "", houseNumber)
    # A remainder made only of dashes (e.g. from "N-A") carries no number.
    if not houseNumber.strip("-"):
        return "N/A"
    return houseNumber

def cleanNumber(number):
    """Strip every non-digit character from a numeric identifier.

    Args:
        number: Raw string value.

    Returns:
        The digits of the value, or "N/A" when the value is a null marker or
        contains no digits.
    """
    number = number.strip().upper()
    if number in nullVariants:
        return "N/A"
    # Remove everything except digits (raw string: "\D" in a plain literal is
    # an invalid escape sequence).
    digits = re.sub(r"\D", "", number)
    return digits if digits else "N/A"

def cleanStreet(street):
    """Standardise a US street name, or return "N/A" for an empty value.

    The value is passed as a one-element list to ``StandardizeUSStreetName``
    (upper-case, ``alphanum=True``, ``repeated=False``) and the pieces it
    returns are concatenated into a single string.
    """
    if is_empty(street):
        return "N/A"
    # Standardize the street names
    standardizer = StandardizeUSStreetName(characters='upper', alphanum=True, repeated=False)
    standardized = standardizer.apply([street], threads=None)
    return ''.join(standardized)

def cleanCity(city, cityLookup):
    """Map a raw city value to its canonical name via ``cityLookup``.

    The lookup table built by ``cleanCityUtility`` is keyed by stripped,
    upper-cased city names, so the raw value is tried first (backward
    compatible) and then its normalised form.

    Args:
        city: Raw city value.
        cityLookup: Mapping from city name to canonical city name.

    Returns:
        The mapped city name, or "N/A" when no entry matches.
    """
    if city in cityLookup:
        return cityLookup[city]
    if isinstance(city, str):
        # Keys are normalised; " Brooklyn" must find the "BROOKLYN" entry.
        return cityLookup.get(city.strip().upper(), "N/A")
    return "N/A"

def cleanCityUtility(sample, columnName):
    """Replace the values of a city column with canonical city names.

    Every distinct value of ``columnName`` is stripped and upper-cased, then
    resolved in this order:

    1. empty value -> "N/A";
    2. "NYC", "NY", "N.Y." or a fuzzy match (> 70) with "NY"/"NYC" -> "NEW YORK";
    3. "BKLYN", "BKYN" or a fuzzy match (> 70) with "BKYN" -> "BROOKLYN";
    4. the first reference city whose fuzzy ratio exceeds 70;
    5. otherwise "N/A".

    Args:
        sample: Object exposing ``distinct(column)`` and ``update(columns, func)``.
        columnName: Name of the city column.

    Returns:
        The result of ``sample.update`` for that column.
    """
    # Get reference data (upper-cased once instead of once per city value).
    refData = RefStore()
    referenceCityNames = [
        referenceCityName.upper()
        for referenceCityName in refData
        .load('encyclopaedia_britannica:us_cities', auto_download=True)
        .distinct('city')
    ]
    cityLookup = {}

    # Compare city name to valid city names and add to lookup table if the similarity is high
    for rawCityName in sample.distinct(columnName):
        distinctCityName = rawCityName.strip().upper()
        if distinctCityName in cityLookup:
            continue
        if is_empty(distinctCityName):
            cityLookup[distinctCityName] = "N/A"
        elif (distinctCityName in ("NYC", "NY", "N.Y.")
                or fuzz.ratio("NY", distinctCityName) > 70
                or fuzz.ratio("NYC", distinctCityName) > 70):
            cityLookup[distinctCityName] = "NEW YORK"
        elif (distinctCityName in ("BKLYN", "BKYN")
                or fuzz.ratio("BKYN", distinctCityName) > 70):
            cityLookup[distinctCityName] = "BROOKLYN"
        else:
            # NOTE: the first reference name above the threshold wins, not the
            # best-scoring one.
            cityLookup[distinctCityName] = next(
                (referenceCityName for referenceCityName in referenceCityNames
                 if fuzz.ratio(referenceCityName, distinctCityName) > 70),
                "N/A",
            )
    # The table is keyed by normalised names, so normalise each cell the same
    # way before the lookup; otherwise values such as "Brooklyn" become "N/A".
    return sample.update([columnName], lambda a: cleanCity(a.strip().upper(), cityLookup))

def cleanState(state, stateRefData):
    """Validate a state code against the reference collection.

    An exact match is returned unchanged. Otherwise the value is stripped and
    upper-cased (as the other cleaners do) and checked again, so "ny" or
    " NY" is accepted as "NY".

    Args:
        state: Raw state value.
        stateRefData: Container of valid state codes (supports ``in``).

    Returns:
        The matching state code, or "N/A" when the value is not a known code.
    """
    if state in stateRefData:
        return state
    if isinstance(state, str):
        normalized = state.strip().upper()
        if normalized in stateRefData:
            return normalized
    return "N/A"

def cleanStateUtility(sample, columnName):
    """Replace invalid values of a state column with "N/A".

    Valid codes are the distinct ``code`` values of the
    ``nyc.gov:dof:state_codes`` reference dataset (downloaded on demand).

    Args:
        sample: Object exposing ``update(columns, func)``.
        columnName: Name of the state column.

    Returns:
        The result of ``sample.update`` for that column.
    """
    # Get reference data. The dataset is loaded once; the former extra
    # load().df().head() call only built a preview that was thrown away.
    stateRefData = (
        RefStore()
        .load('nyc.gov:dof:state_codes', auto_download=True)
        .distinct('code')
    )
    return sample.update([columnName], lambda a: cleanState(a, stateRefData))

def cleanOther(other):
    """Trim and upper-case a generic value; null markers and empty strings become "N/A"."""
    normalized = other.strip().upper()
    if normalized in nullVariants or is_empty(normalized):
        return "N/A"
    return normalized

def clean(sample, columns, sampleProfile):
    """Remove sparse-gap rows and clean every column with a name-based strategy.

    Row removal: for each profiled column whose share of empty values is
    greater than 0 and below 0.1, rows with an empty value in that column are
    deleted.

    Column cleaning: the upper-cased column name selects the cleaner, first
    match wins, in this order: street, name (except business names), phone,
    zip, house number, number, city, state, everything else.

    Args:
        sample: Object exposing ``delete(predicate)`` and ``update(columns, func)``.
        columns: Names of the columns to clean.
        sampleProfile: Iterable of per-column profile entries, each with a
            ``"column"`` name and a ``"stats"`` mapping.

    Returns:
        The cleaned sample.
    """
    # Remove rows that are redundant
    rowCutOffMargin = 0.1
    for columnProfile in sampleProfile:
        stats = columnProfile["stats"]
        total = stats["totalValueCount"]
        if not total:
            # No values profiled: there is no ratio to compute (avoids a
            # division by zero on an empty sample).
            continue
        emptyRatio = stats["emptyValueCount"] / total
        if 0 < emptyRatio < rowCutOffMargin:
            sample = sample.delete(IsEmpty(columnProfile["column"]))

    # Clean the data according to different strategies
    for columnName in columns:
        upperName = columnName.upper()
        # "NO" only counts as a standalone token ("HOUSE NO", "JOB_NO"). A
        # plain substring test also hits names such as "SIGNOFF_DATE" or
        # "NON-PROFIT" and would strip those columns down to digits.
        nameTokens = re.split(r"[^A-Z0-9]+", upperName)
        isNumberColumn = "#" in upperName or "NUMBER" in upperName or "NO" in nameTokens

        # Clean street related columns
        if "STREET" in upperName:
            sample = sample.update([columnName], cleanStreet)
        # Clean name related columns
        elif "NAME" in upperName and "BUSINESS" not in upperName:
            sample = sample.update([columnName], cleanName)
        # Clean phone related columns
        elif "PHONE" in upperName or "MOBILE" in upperName or "TELEPHONE" in upperName:
            sample = sample.update([columnName], cleanPhone)
        # Clean zip related columns
        elif "PIN" in upperName or "POST" in upperName or "ZIP" in upperName:
            sample = sample.update([columnName], cleanZip)
        # Clean house number related columns
        elif "HOUSE" in upperName and isNumberColumn:
            sample = sample.update([columnName], cleanHouseNumber)
        # Clean number related columns
        elif isNumberColumn:
            sample = sample.update([columnName], cleanNumber)
        # Clean city related columns
        elif "CITY" in upperName:
            sample = cleanCityUtility(sample, columnName)
        # Clean state related columns
        elif "STATE" in upperName or "PROVINCE" in upperName:
            sample = cleanStateUtility(sample, columnName)
        # Clean all other columns
        else:
            sample = sample.update([columnName], cleanOther)
    return sample

# Select Dataset

In [None]:
# Socrata identifier of the dataset to clean and the gzip-compressed TSV file
# it is cached in (path is relative to the current working directory).
dataCode = "ic3t-wcy2"
dataSet = Socrata().dataset(dataCode)
dataFile = "drive/MyDrive/"+dataCode+".tsv.gz"
# Download file only if it does not exist already.
if not os.path.isfile(dataFile):
    print("Downloading... ", end="")
    try:
        with gzip.open(dataFile, "wb") as f:
            dataSet.write(f)
    except BaseException:
        # An interrupted or failed download leaves a truncated archive that the
        # isfile() check above would accept on the next run; remove it and
        # re-raise (BaseException so a notebook interrupt is covered too).
        if os.path.isfile(dataFile):
            os.remove(dataFile)
        raise
    print("Done!")

fSize = humanfriendly.format_size(os.stat(dataFile).st_size)
print("Using '{}' in file {} of size {}".format(dataSet.name, dataFile, fSize))

Using 'DOB Job Application Filings' in file drive/MyDrive/ic3t-wcy2.tsv.gz of size 258.19 MB


# Set Data Stream

In [None]:
# Open the downloaded file as an openclean data stream and report its size.
ds = pipeline.stream(dataFile)
print("The dataset has {} rows".format(ds.count()))
# Uncomment to preview the first rows:
# ds.head()

The dataset has 1775815 rows


# Select a Sample from the Data

In [None]:
# Draw a 96-row sample from the stream (the profile output below reports 96
# values per column). The second argument is presumably the random seed that
# keeps the sample reproducible — confirm against the openclean docs.
sample = ds.sample(96, 0)

# Profile the Columns

In [None]:
# Profile every column of the sample; preprocessing() and clean() read the
# per-column "emptyValueCount" / "totalValueCount" stats from this result.
sampleProfile = sample.profile(default_profiler=DefaultColumnProfiler)
# Last expression of the cell: displays the per-column statistics table.
sampleProfile.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
Job #,96,0,96,1.000000,6.584963
Doc #,96,0,4,0.041667,0.739876
Borough,96,0,5,0.052083,1.960902
House #,96,0,91,0.947917,6.472932
Street Name,96,0,91,0.947917,6.472932
...,...,...,...,...,...
GIS_LONGITUDE,96,1,95,1.000000,6.569856
GIS_COUNCIL_DISTRICT,96,1,40,0.421053,4.924825
GIS_CENSUS_TRACT,96,1,85,0.894737,6.335491
GIS_NTA_NAME,96,1,61,0.642105,5.661638


# Remove Unnecessary Columns

In [None]:
# Keep only the columns whose share of empty values is below the 0.6 cut-off;
# `columns` holds the names of the retained columns.
sample, columns = preprocessing(sample, sampleProfile)

# Clean the Sample

In [None]:
# Apply row removal and the per-column cleaning strategies; `sample` itself is
# kept as the uncleaned counterpart for comparison.
cleanedSample = clean(sample, columns, sampleProfile)

# Creating Dataframe

In [None]:
# Convert both versions to data frames: dirtyDF is the column-filtered but
# uncleaned sample, cleanDF the cleaned one.
dirtyDF = sample.to_df()
cleanDF = cleanedSample.to_df()


# Check Functional Dependency (Street, NTA -> BOROUGH)

In [None]:
from openclean.operator.map.violations import fd_violations
from openclean.operator.collector.count import distinct

# Column names taking part in the dependency; "" means "not found yet".
street_fd = ""
nta_fd = ""
borough_fd = ""

# Pick the first matching column for each role.
for colName in columns:
    upperColName = colName.upper()
    if street_fd == "" and "STREET" in upperColName:
        street_fd = colName
    # "NTA" must be a standalone token (e.g. GIS_NTA_NAME). The previous
    # `a and b or c` condition ignored the `nta_fd == ""` guard, and a plain
    # substring test also matches names such as "HORIZONTAL ...".
    elif nta_fd == "" and "NTA" in re.split(r"[^A-Z0-9]+", upperColName):
        nta_fd = colName
    elif borough_fd == "" and "BOROUGH" in upperColName:
        borough_fd = colName

if street_fd != "" and nta_fd != "" and borough_fd != "":
    fd1_violations = fd_violations(cleanDF, [street_fd, nta_fd], [borough_fd])

    print('# of violations for FD(Street, NTA -> BOROUGH) is {}\n'.format(len(fd1_violations)))
    for key, gr in fd1_violations.items():
        print(gr[[street_fd, nta_fd, borough_fd]])
else:
    # Make the skipped check visible instead of producing no output at all.
    print('Skipping FD(Street, NTA -> BOROUGH): required columns not found')

# Clean manually according to the result.
 

# of violations for FD(Street, NTA -> BOROUGH) is 0



# Checking Statistical Outliers in NTA

In [None]:
from collections import Counter

from openclean.embedding.feature.default import UniqueSetEmbedding
from openclean.profiling.anomalies.sklearn import (
    dbscan,
    isolation_forest,
    local_outlier_factor,
    one_class_svm,
    robust_covariance
)

# Tally, per value, how many of the five detectors flag it.
ensemble = Counter()

if nta_fd != "":
    for f in (dbscan, isolation_forest, local_outlier_factor, one_class_svm, robust_covariance):
        ensemble.update(f(cleanDF, nta_fd, features=UniqueSetEmbedding()))

    # Print the values flagged by at least three detectors, grouped by count:
    # the count is printed once per group and groups are separated by a blank line.
    prev = 0
    for value, count in ensemble.most_common():
        if count < 3:
            break
        if count == prev:
            print('\t{}'.format(value))
        else:
            if count < prev:
                print()
            print('{}\t{}'.format(count, value))
        prev = count

# Clean manually according to the result.

3	0


  % (self.n_neighbors, n_samples))


In [None]:
# Second mount call, this time without force_remount; the recorded output of
# this cell shows Colab reporting the drive as already mounted.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Save the clean Output

In [None]:
# Write both samples into a folder named after the dataset.
# NOTE: the folder is relative to the current working directory.
outputDir = dataSet.name
# exist_ok=True replaces the separate exists() check and its check-then-create race.
os.makedirs(outputDir, exist_ok=True)
dirtyDF.to_csv(os.path.join(outputDir, "dirtySample.csv"))
cleanDF.to_csv(os.path.join(outputDir, "cleanSample.csv"))