In [0]:
%sh
# Install every notebook dependency in ONE pip invocation so the resolver sees all
# requirements together (separate installs can silently up/downgrade a package
# that an earlier install depended on).
# NOTE(review): versions are unpinned; pin them (pkg==x.y.z) for reproducible runs.
# NOTE(review): assumes `python` here is the interpreter used by %python/%pyspark — confirm.
python -m pip install socrata-py openclean openclean-geo geopy

In [1]:
%pyspark
# Load the five NYC Open Data CSV extracts (header row used as column names; no schema
# is supplied, so Spark reads every column as a string) and register each one as a
# Spark SQL temporary view.
PROJECT_DATA_DIR = '/user/CS-GY-6513/project_data'


def read_nyc_dataset(dataset_id):
    """Read one exported CSV from PROJECT_DATA_DIR.

    :param dataset_id: the dataset id embedded in the export file name
        (``data-cityofnewyork-us.<dataset_id>.csv``), e.g. ``'bty7-2jhb'``.
    :return: Spark DataFrame read with ``header='true'``.
    """
    path = '{}/data-cityofnewyork-us.{}.csv'.format(PROJECT_DATA_DIR, dataset_id)
    return spark.read.options(header='true').csv(path)


historical_dob_df = read_nyc_dataset('bty7-2jhb')
dob_cert_occupancy_df = read_nyc_dataset('bs8b-p36w')
housing_litigations_df = read_nyc_dataset('59kj-x8nc')
housing_maintenance_code_complaints_df = read_nyc_dataset('uwyv-629c')
housing_maintenance_code_violations_df = read_nyc_dataset('wvxf-dwi5')

# Expose each DataFrame to spark.sql() under a short view name.
for view_name, frame in [
    ('historical_dob', historical_dob_df),
    ('dob_cert_occupancy', dob_cert_occupancy_df),
    ('housing_litigations', housing_litigations_df),
    ('housing_maintenance_code_complaints', housing_maintenance_code_complaints_df),
    ('housing_maintenance_code_violations', housing_maintenance_code_violations_df),
]:
    frame.createOrReplaceTempView(view_name)

In [2]:
%pyspark
# Address/identifier columns of the historical DOB data.
historical_dob_df.select(['BIN', 'Number', 'Street', 'Postcode', 'BOROUGH']).show()

In [3]:
%pyspark
# Total number of records in the historical DOB dataset.
historical_dob_df.count()

In [4]:
%pyspark
# Historical DOB records that have neither a Postcode nor a Latitude.
historical_dob_df.where(
    historical_dob_df['Postcode'].isNull() & historical_dob_df['Latitude'].isNull()
).count()

In [5]:
%pyspark
# One row per distinct (BIN, Number, Street, Postcode, BOROUGH) combination.
historical_dob_distinct_building_df = (
    historical_dob_df
    .select(['BIN', 'Number', 'Street', 'Postcode', 'BOROUGH'])
    .distinct()
)

In [6]:
%pyspark
# Distinct-address rows per BIN; a count above 1 means the BIN has several address variants.
historical_dob_distinct_building_cnt = historical_dob_distinct_building_df.groupBy('BIN').count()

In [7]:
%pyspark
# BINs that appear with more than one address variant.
historical_dob_distinct_building_cnt.where(
    historical_dob_distinct_building_cnt['count'] > 1
).show()

In [8]:
%pyspark
# All address variants recorded for BIN 1024757
# (presumably one of the multi-address BINs found above).
historical_dob_distinct_building_df.where(
    historical_dob_distinct_building_df['BIN'] == 1024757
).show()

In [9]:
%pyspark
# Seeded ~0.1% sample of litigation address/location fields for eyeballing.
(
    housing_litigations_df
    .select(['LitigationID', 'BuildingID', 'BIN', 'HouseNumber', 'StreetName', 'Zip', 'Boro', 'Latitude', 'Longitude'])
    .sample(withReplacement=False, fraction=0.001, seed=11212)
    .show(n=200)
)


In [10]:
%pyspark
# Litigation addresses with the Boro code translated to a borough name
# (1 Manhattan, 2 Bronx, 3 Brooklyn, 4 Queens, 5 Staten Island); any other Boro
# value is passed through unchanged. The adjacent literals below concatenate into
# a single SQL statement.
housing_litigations_boro_df = spark.sql(
    "select LitigationID, BuildingID, BIN, HouseNumber, StreetName, Zip, "
    "case when Boro = 1 then 'Manhattan' "
    "when Boro = 2 then 'Bronx' "
    "when Boro = 3 then 'Brooklyn' "
    "when Boro = 4 then 'Queens' "
    "when Boro = 5 then 'Staten Island' "
    "else Boro end as Borough, "
    "Latitude, Longitude "
    "from housing_litigations"
)

In [11]:
%pyspark
# Litigation records with neither a Zip nor a Latitude.
housing_litigations_boro_df.where(
    housing_litigations_boro_df['Zip'].isNull() & housing_litigations_boro_df['Latitude'].isNull()
).count()

In [12]:
%pyspark
# Seeded ~0.1% sample of the borough-named litigation records.
(
    housing_litigations_boro_df
    .select(['LitigationID', 'BuildingID', 'BIN', 'HouseNumber', 'StreetName', 'Zip', 'Borough', 'Latitude', 'Longitude'])
    .sample(withReplacement=False, fraction=0.001, seed=21236236)
    .show(n=200)
)

In [13]:
%pyspark
# Litigation records with Zip 10463.
housing_litigations_boro_df.where(housing_litigations_boro_df['Zip'] == 10463).show()

In [14]:
%pyspark
# Violation records with neither a Postcode nor a Latitude.
housing_maintenance_code_violations_df.where(
    housing_maintenance_code_violations_df['Postcode'].isNull()
    & housing_maintenance_code_violations_df['Latitude'].isNull()
).count()

In [15]:
%pyspark
# Seeded 10% sample (without replacement) of the violations; this is what gets
# exported and cleaned with openclean further down.
housing_maintenance_code_violations_sample = housing_maintenance_code_violations_df.sample(
    withReplacement=False, fraction=0.1, seed=11212
)
housing_maintenance_code_violations_sample.count()

In [16]:
%pyspark
# Sampled violations with Postcode 10463 whose Borough is not BRONX.
# Rows with a null Borough are not shown: a != comparison against null is never true.
(
    housing_maintenance_code_violations_sample
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .where(housing_maintenance_code_violations_sample['Postcode'] == 10463)
    .where(housing_maintenance_code_violations_sample['Borough'] != 'BRONX')
    .show()
)

In [17]:
%pyspark
# Sampled violations that have no Postcode.
(
    housing_maintenance_code_violations_sample
    .where(housing_maintenance_code_violations_sample['Postcode'].isNull())
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .show()
)

In [18]:
%pyspark
# Sampled violations whose StreetName field also carries a house number
# ('109-55 VAN WYCK EXPRESSWAY').
# The filter column is taken from the DataFrame being filtered (the sample), not
# from its parent housing_maintenance_code_violations_df.
(
    housing_maintenance_code_violations_sample
    .filter(housing_maintenance_code_violations_sample['StreetName'] == '109-55 VAN WYCK EXPRESSWAY')
    .select('ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude')
    .show()
)

In [19]:
%pyspark
# Number of distinct StreetName values in the sample (a null StreetName counts as one value).
housing_maintenance_code_violations_sample.select('StreetName').distinct().count()

In [20]:
%sh
# Remove any previous sample output directory from HDFS.
# -f turns "path does not exist" (e.g. on a first run) into a no-op instead of an error.
/usr/bin/hadoop fs -rm -f -r housing_maintenance_code_violations_sample.csv

In [21]:
%pyspark
# Write the sample to HDFS as CSV with a header row.
# - repartition(1): produce a single part file. Every part file written with
#   header=True starts with its own header line, and `hadoop fs -getmerge` simply
#   concatenates part files, so several parts would leave header lines as data rows
#   in the merged local CSV. repartition (a shuffle) rather than coalesce keeps the
#   upstream scan/sample running in parallel.
# - mode("overwrite"): the cell can be re-run even if the output already exists.
(
    housing_maintenance_code_violations_sample
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv('./housing_maintenance_code_violations_sample.csv')
)

In [22]:
%sh
# Remove the previous local merged copy; -f avoids an error when it does not exist yet.
rm -f housing_maintenance_code_violations_sample.csv


In [23]:
%sh
# Concatenate the HDFS part files of the sample into one local file with the same name.
# The result is a clean CSV only if the HDFS directory holds a single header-bearing part file.
SAMPLE=housing_maintenance_code_violations_sample.csv
/usr/bin/hadoop fs -getmerge "$SAMPLE" "$SAMPLE"

In [24]:
%python
from openclean.pipeline import stream
from openclean.function.eval.base import Col
from openclean.function.eval.logic import And
from openclean.function.eval.null import IsNotEmpty, IsEmpty
from openclean.operator.map.violations import fd_violations
from openclean.cluster.key import key_collision
from openclean_geo.address.usstreet import USStreetNameKey

def lat_long_boro_fd(ds, latCol, longCol, boroCol):
    """Check the functional dependency (latCol, longCol) -> boroCol.

    Rows with an empty latitude, longitude, or borough are dropped, and the
    borough is upper-cased before the check.

    :param ds: openclean data stream containing the three columns.
    :param latCol: name of the latitude column.
    :param longCol: name of the longitude column.
    :param boroCol: name of the borough column.
    :return: the result of ``fd_violations`` (the groups violating the dependency).
    """
    fd_columns = [boroCol, latCol, longCol]
    complete_rows = ds.select(fd_columns).where(
        And(IsNotEmpty(latCol), IsNotEmpty(longCol), IsNotEmpty(boroCol))
    )
    frame = complete_rows.update(boroCol, str.upper).select(fd_columns).to_df()
    return fd_violations(frame, lhs=[latCol, longCol], rhs=boroCol)
    
def cleanstreet(street, ds_full, minsize=2, threads=4):
    """Normalize street-name spellings in a stream by key-collision clustering.

    Distinct values of ``street`` are clustered with ``USStreetNameKey``; every
    value that falls in a returned cluster is replaced by that cluster's
    ``suggestion()``. Values outside every cluster are left unchanged.

    :param street: name of the street column to clean.
    :param ds_full: openclean data stream holding the column.
    :param minsize: passed through to ``key_collision`` (default 2, as before).
    :param threads: passed through to ``key_collision`` (default 4, as before).
    :return: the stream returned by ``ds_full.update`` with the column rewritten.
    """
    streets = ds_full.select(street).distinct()
    clusters = key_collision(func=USStreetNameKey(), values=streets, minsize=minsize, threads=threads)
    # Map every clustered spelling to its cluster's suggested value; suggestion()
    # is computed once per cluster instead of once per member.
    group_mapping = dict()
    for cluster in clusters:
        canonical = cluster.suggestion()
        for value, _count in cluster.items():
            group_mapping[value] = canonical
    return ds_full.update(street, lambda x: group_mapping.get(x, x))

In [25]:
%python
# Open the local merged sample CSV as an openclean data stream.
# NOTE(review): assumes this interpreter's working directory is the one the %sh
# getmerge cell wrote into — confirm.
housing_maintenance_code_violations_sample_ds = stream('./housing_maintenance_code_violations_sample.csv')

In [26]:
%python
# Sampled rows on '27TH STREET' (house number, street, postcode).
(
    housing_maintenance_code_violations_sample_ds
    .where(Col('StreetName') == '27TH STREET')
    .select(['HouseNumber', 'StreetName', 'Postcode'])
    .head()
)

In [27]:
%python
# Normalize StreetName spellings in the sample via key-collision clustering.
cleaned = cleanstreet(street='StreetName', ds_full=housing_maintenance_code_violations_sample_ds)

In [28]:
%python
# Number of street-cleaned rows whose Postcode is empty.
cleaned.where(IsEmpty('Postcode')).count()

In [29]:
%python
# A few of the street-cleaned rows that lack a Postcode.
(
    cleaned
    .where(IsEmpty('Postcode'))
    .select(['ViolationID', 'BIN', 'HouseNumber', 'StreetName', 'Postcode', 'Borough'])
    .head()
)

In [30]:
%python
# Check the functional dependency (Borough, HouseNumber, StreetName) -> Postcode.
# Rows missing any address part are dropped; StreetName and Borough are upper-cased first.
data = (
    cleaned
    .select(['Borough', 'HouseNumber', 'StreetName', 'Postcode'])
    .where(And(IsNotEmpty('Borough'), IsNotEmpty('HouseNumber'), IsNotEmpty('StreetName')))
    .update('StreetName', str.upper)
    .update('Borough', str.upper)
)
df = data.select(['Borough', 'HouseNumber', 'StreetName', 'Postcode']).to_df()

groups = fd_violations(
    df,
    lhs=['Borough', 'HouseNumber', 'StreetName'],
    rhs='Postcode',
)


In [31]:
%python
# Postcode values of the first ten conflicting address groups,
# each followed by two blank lines.
for key in list(groups.keys())[0:10]:
    print(groups.values(key=key, columns='Postcode'), end='\n\n\n')

In [32]:
%python
# Imputation table: for every conflicting (Borough, HouseNumber, StreetName) key,
# the most frequent non-empty Postcode ("" when the group has none).
# The running maximum lives in helper locals: a notebook global named `max`
# would shadow the built-in max() for every later cell.
def _most_frequent_postcode(postcode_counts):
    """Return the non-empty value with the highest count.

    Ties keep the value encountered first; returns "" if every value is None/empty.
    Assumes ``postcode_counts`` maps value -> count (e.g. a Counter) — confirm
    against the openclean groups API.
    """
    best_value = ""
    best_count = 0
    for value in postcode_counts:
        if value is None or len(value) == 0:
            continue
        if best_count < postcode_counts[value]:
            best_count = postcode_counts[value]
            best_value = value
    return best_value


group_mapping = {
    key: _most_frequent_postcode(groups.values(key=key, columns='Postcode'))
    for key in groups.keys()
}

In [33]:
%python
# Preview the first ten entries of the imputation table.
for key, postcode in list(group_mapping.items())[:10]:
    print('{} = {}'.format(key, postcode))

In [34]:
%python
from openclean.function.eval.domain import Lookup

# Fill empty Postcodes from group_mapping; keys without a mapping keep their Postcode.
# NOTE(review): the where() keeps only rows whose Postcode was empty, so cleaned2 is
# just those rows (imputed where possible), not the full cleaned dataset.
# NOTE(review): group_mapping keys came from upper-cased Borough/StreetName, while the
# lookup reads them from `cleaned` as-is; confirm the source values are already upper-case.
cleaned2 = (
    cleaned
    .where(IsEmpty('Postcode'))
    .update(
        'Postcode',
        Lookup(
            columns=['Borough', 'HouseNumber', 'StreetName'],
            mapping=group_mapping,
            default=Col('Postcode'),
        ),
    )
)

In [35]:
%python
# Rows that still have no Postcode after the lookup-based imputation.
cleaned2.where(IsEmpty('Postcode')).count()

In [36]:
%python
# Up to 100 rows that are still missing a Postcode.
(
    cleaned2
    .where(IsEmpty('Postcode'))
    .select(['ViolationID', 'BIN', 'HouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .head(100)
)

In [37]:
%python
# Non-empty Postcode values recorded for 532 EAST 142 STREET.
(
    cleaned
    .where(And(Col('HouseNumber') == '532', Col('StreetName') == 'EAST 142 STREET', IsNotEmpty('Postcode')))
    .select(['HouseNumber', 'StreetName', 'Postcode', 'Borough', 'BIN'])
    .head()
)

In [38]:
%python
# Up to 40 street-cleaned rows on VAN WYCK EXPRESSWAY.
(
    cleaned
    .where(Col('StreetName') == 'VAN WYCK EXPRESSWAY')
    .select(['HouseNumber', 'StreetName', 'Postcode', 'Borough'])
    .head(40)
)

In [39]:
%pyspark
# Violations with Postcode 11232 whose Borough is not BROOKLYN.
# Rows with a null Borough are not shown: a != comparison against null is never true.
(
    housing_maintenance_code_violations_df
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .where(housing_maintenance_code_violations_df['Postcode'] == 11232)
    .where(housing_maintenance_code_violations_df['Borough'] != 'BROOKLYN')
    .show()
)

In [40]:
%pyspark

# Violations without a Longitude, excluding BuildingID 1 (rows with a null BuildingID
# are excluded too, since != against null is never true).
# NOTE(review): BuildingID 1 seems to be treated as a special/placeholder id — confirm.
(
    housing_maintenance_code_violations_df
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .where(housing_maintenance_code_violations_df['Longitude'].isNull())
    .where(housing_maintenance_code_violations_df['BuildingID'] != 1)
    .show(n=600)
)

In [41]:
%pyspark
# Violations whose StreetName is exactly '72 STREET'.
(
    housing_maintenance_code_violations_df
    .where(housing_maintenance_code_violations_df['StreetName'] == '72 STREET')
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .show()
)

In [42]:
%pyspark
# Violations recorded under BuildingID 1.
(
    housing_maintenance_code_violations_df
    .where(housing_maintenance_code_violations_df['BuildingID'] == 1)
    .select(['ViolationID', 'BuildingID', 'BIN', 'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName', 'Postcode', 'Borough', 'Latitude', 'Longitude'])
    .show()
)