# Deduplication
For this exercise, you are provided with two datasets. Use your expertise to analyze them and determine how to deduplicate the data provided.


## Provided Data

### MySQL
Included is a MySQL container that you can connect to from within Jupyter as well as externally. The provided database is called `doximity`.

The database contains four tables: `cities`, `states`, `countries` and `institutions`. The first three tables are accurate and require no further manipulation. The `institutions` table is empty; you will load the provided CSV data from `data/institutions.csv.gz` into this table.

#### Connecting to MySQL Externally
Connect using the IP and port reported by running the following command from a terminal: `docker port infrastructureanalysts_database_1 3306`.

Example:
```shell
mysql -uroot \
    --host $(docker port infrastructureanalyst_database_1 3306 | cut -d ":" -f 1) \
    --port $(docker port infrastructureanalyst_database_1 3306 | cut -d ":" -f 2) \
    doximity
```

### CSV
There is a CSV file under `data` called `institutions.csv.gz`. Import it into the `institutions` table within this notebook. Below are some insights about this dataset:

1. The `source` column indicates whether the data was "Imported" through us or "User" entered. "User" entered data can't be trusted; consider treating it separately when doing analysis.
2. The `type` column indicates the service provided by the institution. Values include "Residency", "MedicalSchool", and "OtherTraining". Analysis will reveal duplications.
3. Each institution has a location, which can be used to find duplicate institutions.
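
For a quick look at the file before loading it into MySQL, the following is a minimal sketch that reads the gzipped CSV with the standard library. It assumes the file has a header row and that missing values are written as the literal text `NULL`; the helper name `load_institutions` is hypothetical and not part of the provided tooling.

```python
import csv
import gzip


def load_institutions(path):
    """Read a gzipped CSV into a list of dicts, mapping the text 'NULL' to None.

    Assumption: the file has a header row and uses the literal string
    'NULL' for missing values.
    """
    rows = []
    with gzip.open(path, mode="rt", encoding="utf-8", newline="") as handle:
        for record in csv.DictReader(handle):
            # Replace the 'NULL' placeholder so missing values are explicit.
            rows.append({
                column: (None if value == "NULL" else value)
                for column, value in record.items()
            })
    return rows
```

It could be called as `load_institutions("data/institutions.csv.gz")`. Converting `NULL` to `None` up front keeps placeholder text from being treated as a real address or city during matching.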


## Expected Results

### Task A
Perform analysis on institutions whose source is "Imported" and provide a deduplicated set of institution name, address, city and country. Each deduplicated institution should show the original institutions that are being merged into the new institution.

> Note: `type` values aren't important for the end result because we care about the institutions themselves, not what services were provided by them.
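
One possible shape for the Task A result is sketched below: group records on a normalized (name, address, city, country) key and keep the ids of the originals that are merged. This is an illustration on plain dictionaries, not the expected solution; the `normalize` rule (trim, collapse whitespace, casefold) and both function names are assumptions.

```python
from collections import defaultdict

KEY_FIELDS = ("name", "address", "city_id", "country_id")


def normalize(value):
    """Trim, collapse runs of whitespace, and casefold; None stays None."""
    if value is None:
        return None
    return " ".join(value.split()).casefold()


def deduplicate(records):
    """Group records by their normalized key fields.

    Returns a list of merged institutions, each listing the ids of the
    original records folded into it.
    """
    groups = defaultdict(list)
    for record in records:
        key = tuple(normalize(record[field]) for field in KEY_FIELDS)
        groups[key].append(record)

    merged = []
    for members in groups.values():
        first = members[0]  # keep the first spelling seen as the display value
        entry = {field: first[field] for field in KEY_FIELDS}
        entry["merged_ids"] = [member["id"] for member in members]
        merged.append(entry)
    return merged
```

Keeping `merged_ids` alongside each merged record makes it straightforward to show which original institutions were combined.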

### Task B
Perform analysis on institutions whose source is "User" and find common reasons why users enter duplicate institutions. Compare them to the existing data where source is "Imported", as duplicates between the two sources are very likely. Questions to consider:

1. Are there common misspellings?
2. Are there extraneous characters causing duplicated records from user error?
3. Are there abbreviations leading to duplicated records?
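
The three questions above can be explored with a small amount of string processing. The sketch below is one possible approach, not a prescribed one: it strips punctuation and extra whitespace, expands a few abbreviations, and uses `difflib.SequenceMatcher` from the standard library to score near-matches that may be misspellings. The abbreviation table and the 0.9 threshold are illustrative assumptions.

```python
import re
from difflib import SequenceMatcher

# Hypothetical abbreviation table; a real one would be derived from the data.
ABBREVIATIONS = {"gen": "general", "hosp": "hospital", "univ": "university"}


def clean_name(name):
    """Lowercase, drop punctuation, collapse whitespace, expand abbreviations."""
    text = re.sub(r"[^\w\s]", " ", name.lower())
    words = [ABBREVIATIONS.get(word, word) for word in text.split()]
    return " ".join(words)


def similarity(left, right):
    """Return a similarity ratio between 0 and 1 for two cleaned names."""
    return SequenceMatcher(None, clean_name(left), clean_name(right)).ratio()


def likely_duplicates(left, right, threshold=0.9):
    """Flag two names as probable duplicates (the threshold is an assumption)."""
    return similarity(left, right) >= threshold
```

For example, `clean_name("Massachusetts Gen Hosp")` and `clean_name("Massachusetts General Hospital.")` produce the same string, so the pair would be flagged. Pairwise comparison is quadratic in the number of names, so in practice it would likely be applied only within candidate groups (for instance, names sharing a country).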


## Guidelines

1. Break down the problem into bite-sized pieces and illustrate your analysis in individual cells within the notebook.
2. Help the reviewer better understand your reasoning.
3. Help the reviewer visualize your approach. Show output where possible.
4. If the end result set is too large to display within the notebook, trim the output.
5. If you are struggling with Docker, you may do this problem outside of Docker containers, but we will be reviewing and testing within Docker.

Ensure the MySQL connector is available to connect to the database.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = """
    --jars /home/jovyan/work/mysql-connector-java-5.1.43-bin.jar
    --py-files /home/jovyan/work/mysql-connector-java-5.1.43-bin.jar
    pyspark-shell
"""

Initialize Spark Context.

In [2]:
import pyspark
sparkContext = pyspark.SparkContext("local[*]")

Initialize SQL Context.

In [3]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sparkContext)

Read the first 10 countries from the database.

In [4]:
sqlContext.read.format("jdbc").options(
    url=os.environ["DATABASE_URL"],
    driver="com.mysql.jdbc.Driver",
    dbtable="countries",
    user="root"
).load().take(10)

[Row(id=3, abbreviation='AF', name='Afghanistan'),
 Row(id=4, abbreviation='AX', name='Aland Islands'),
 Row(id=5, abbreviation='AL', name='Albania'),
 Row(id=6, abbreviation='DZ', name='Algeria'),
 Row(id=7, abbreviation='AS', name='American Samoa'),
 Row(id=8, abbreviation='AD', name='Andorra'),
 Row(id=9, abbreviation='AO', name='Angola'),
 Row(id=10, abbreviation='AI', name='Anguilla'),
 Row(id=11, abbreviation='AQ', name='Antarctica'),
 Row(id=12, abbreviation='AG', name='Antigua and Barbuda')]

In [14]:
# Read CSV into pyspark dataframe

print('spark version: ', sparkContext.version)

institutions = sqlContext.read.csv('institutions.csv', header=True)

print('\ncolumn types: ', institutions.dtypes)

print('\nnumber of institutions: ', institutions.count())

institutions.show()

spark version:  2.2.0

column types:  [('id', 'string'), ('type', 'string'), ('name', 'string'), ('long_name', 'string'), ('address', 'string'), ('city_id', 'string'), ('country_id', 'string'), ('source', 'string')]

number of institutions:  97802
+---+---------+--------------------+--------------------+--------------------+-------+----------+--------+
| id|     type|                name|           long_name|             address|city_id|country_id|  source|
+---+---------+--------------------+--------------------+--------------------+-------+----------+--------+
|  1|Residency|St Anthony Hospit...|St Anthony Hospit...|                NULL|   NULL|        42|Imported|
|  2|Residency|Janeway Children'...|Janeway Children'...|300 Prince Philli...|   NULL|        42|Imported|
|  3|Residency|Memorial Universi...|Memorial Universi...|    Prince Philip Dr|   NULL|        42|Imported|
|  4|Residency|St Claires Mercy ...|St Claires Mercy ...|  154 Le Merchant Rd|   NULL|        42|Imported|
|  

In [6]:
# Write data frame of institutions to MySQL table

institutions.write.format('jdbc').options(
    url=os.environ["DATABASE_URL"],
    driver='com.mysql.jdbc.Driver',
    dbtable='institutions',
    user='root').mode('overwrite').save()

sqlContext.read.format("jdbc").options(
    url=os.environ["DATABASE_URL"],
    driver="com.mysql.jdbc.Driver",
    dbtable="institutions",
    user="root"
).load().take(10)

[Row(id='1', type='Residency', name='St Anthony Hospital (St Anthony NL)', long_name='St Anthony Hospital (St Anthony NL)', address='NULL', city_id='NULL', country_id='42', source='Imported'),
 Row(id='2', type='Residency', name="Janeway Children's Health and Rehabilitation Centre", long_name="Janeway Children's Health and Rehabilitation Centre", address='300 Prince Phillip Dr', city_id='NULL', country_id='42', source='Imported'),
 Row(id='3', type='Residency', name='Memorial University, Faculty of Medicine', long_name='Memorial University, Faculty of Medicine', address='Prince Philip Dr', city_id='NULL', country_id='42', source='Imported'),
 Row(id='4', type='Residency', name='St Claires Mercy Hospital', long_name='St Claires Mercy Hospital', address='154 Le Merchant Rd', city_id='NULL', country_id='42', source='Imported'),
 Row(id='5', type='Residency', name='St Johns General Hospital', long_name='St Johns General Hospital', address='Waterloo St Box 2100', city_id='NULL', country_id=

In [7]:
# Filter out user-sourced entries
imported_institutions = institutions.filter(institutions.source == "Imported")
                         
print('number of imported institutions: ', imported_institutions.count())

number of imported institutions:  19022


In [15]:
# TASK A - Deduplicate imported records.
# Convert the data frame to a pair RDD with key = (name, address, city_id, country_id) and value = [ids].
import itertools

unique_imported_institutions = (imported_institutions
                       .rdd
                       .map(lambda row: ((row['name'], row['address'], row['city_id'], row['country_id']), row['id']))
                       .groupByKey()
                       .mapValues(list))

print("number of unique imported institutions: ", unique_imported_institutions.count(), '\n')

unique_imported_institutions_map = unique_imported_institutions.collectAsMap()

# Show only the first 100 groups to keep the output readable.
sample = itertools.islice(unique_imported_institutions_map.items(), 0, 100)

for key, value in sample:
    print(key, ': ', value)

number of unique imported institutions:  10250 

("Janeway Children's Health and Rehabilitation Centre", '300 Prince Phillip Dr', 'NULL', '42') :  ['2', '10240', '12287']
('Memorial University, Faculty of Medicine', 'Prince Philip Dr', 'NULL', '42') :  ['3', '3846', '12288']
('St Claires Mercy Hospital', '154 Le Merchant Rd', 'NULL', '42') :  ['4', '10241', '12289']
('Centre hospitalier Côte-des-Neiges', '4565 chemin de la Reine Marie', 'NULL', '42') :  ['8', '10242', '12293']
('Champlain Hospital of Verdun', '1350 Av Leclair', 'NULL', '42') :  ['9', '10243', '12294']
('Hopital Saint-Luc du CHUM (Montréal QC)', '1058 rue Saint-Denis', 'NULL', '42') :  ['10', '3853', '12295', '55587']
('Hotel Dieu de Montreal du CHUM', '3840 Rue Saint-Urbain', 'NULL', '42') :  ['13', '10244', '12298']
('Hôtel-Dieu de Québec', '11, côte du Palais', 'NULL', '42') :  ['14', '10245', '12299']
('McGill University Faculty Medicine Hospital', '3655 Drummond St', 'NULL', '42') :  ['15', '2595', '3857', '12300', 

In [9]:
# Filter out imported entries
user_institutions = institutions.filter(institutions.source == "User")
                         
print('number of user-entered institutions: ', user_institutions.count())

number of user-entered institutions:  78734


In [18]:
# TASK B - Deduplicate user-entered records.
# Convert the data frame to a pair RDD with key = (name, address, city_id, country_id) and value = [id1, id2, ... idn].
import itertools

unique_user_institutions = (user_institutions
                       .rdd
                       .map(lambda row: ((row['name'], row['address'], row['city_id'], row['country_id']), row['id']))
                       .groupByKey()
                       .mapValues(list))

print("number of unique user institutions: ", unique_user_institutions.count(), '\n')

unique_user_institutions_map = unique_user_institutions.collectAsMap()

# Show only the first 100 groups to keep the output readable.
sample = itertools.islice(unique_user_institutions_map.items(), 0, 100)

for key, value in sample:
    print(key, ': ', value)

number of unique user institutions:  33292 

('University of North Carolina School of Public Health', 'Chapel Hill, North Carolina', '108351', '237') :  ['1362']
('Massachusetts Gen Hosp', 'NULL', 'NULL', 'NULL') :  ['18428', '20840', '21272', '21309', '21326', '21338', '21339', '21362', '21464', '21491', '21532', '21582', '21631', '21715', '21719', '21760', '21833', '21849', '21891', '21917', '21993', '22036', '22118', '22124', '22189', '22215', '22226', '22282', '22283', '22382', '22524', '22525', '22595', '22600', '22601', '22638', '22678', '22679', '22796', '22801', '22814', '22866', '22872', '23049', '23057', '23187', '23189', '23311', '23441']
('Central WA Family Med', 'NULL', 'NULL', 'NULL') :  ['18429', '18430', '18431']
('Medical University of SC', 'NULL', 'NULL', 'NULL') :  ['18472', '18473', '18474', '18475', '18476', '18477', '18478', '18479', '18481', '18482', '18484', '18485', '18486', '19024', '19338', '19416', '19569', '19592', '19594', '19611', '19643', '19754', '19811'

In [19]:
# TASK B (cont)
# The Youngstown State University entry is an example of duplicates in the user-entered data.  The duplicates are
# due to misspellings, variations in the name and address fields, and NULL country ids.  I found this duplicated
# institution, and many others like it, by quickly scanning sorted views in an Excel spreadsheet.
from pyspark.sql.functions import col

youngstown_institutions = user_institutions.filter(col('address').isin(['Youngstown, Ohio', 'Youngstown', 'Youngstown, OH']))
                         
print('number of youngstown institutions: ', youngstown_institutions.count(), '\n')

youngstown_institutions.show(24)

number of youngstown institutions:  24 

+------+-------------+--------------------+--------------------+----------------+-------+----------+------+
|    id|         type|                name|           long_name|         address|city_id|country_id|source|
+------+-------------+--------------------+--------------------+----------------+-------+----------+------+
| 34936|OtherTraining|Youngstown State ...|Youngstown State ...|Youngstown, Ohio|   NULL|       237|  User|
| 39617|OtherTraining|Youngstown State ...|Youngstown State ...|Youngstown, Ohio|   NULL|       237|  User|
| 46179|OtherTraining|Youngstown State ...|Youngstown State ...|  Youngstown, OH|   NULL|       237|  User|
| 56543|OtherTraining|Youngstown State ...|Youngstown State ...|  Youngstown, OH|   NULL|       237|  User|
| 68267|OtherTraining|Youngstown State ...|Youngstown State ...|  Youngstown, OH|   NULL|      NULL|  User|
| 71203|OtherTraining|Youngstown State ...|Youngstown State ...|  Youngstown, OH|   NULL|      

In [20]:
# TASK B (cont)
# Find the intersection of the de-duplicated imported and user-entered institutions.  Print the key for each
# resulting institution along with its duplicate count.  The output below shows that all but two of the
# common unique institutions have NULL address and city_id fields.

common_unique_institutions = (unique_user_institutions
                              .subtractByKey(unique_user_institutions
                              .subtractByKey(unique_imported_institutions)))

print('number of common unique institutions: ', common_unique_institutions.count(), '\n')

common_unique_institutions_map = common_unique_institutions.collectAsMap()

for key, value in common_unique_institutions_map.items():
    # NOTE: `value` comes from the user-entered RDD, so both counts printed here are user-entered counts.
    # The imported count for the same key would be len(unique_imported_institutions_map[key]).
    print(key, ': ', len(value), '/', len(unique_user_institutions_map[key]))


number of common unique institutions:  158 

('New York Eye Ear Infirmary', '310 E 14th St', '102570', '237') :  1 / 1
('Western Michigan University', 'NULL', 'NULL', '237') :  4 / 4
('Lenox Hill Hospital', 'NULL', 'NULL', '237') :  2 / 2
('Cleveland Clinic', 'NULL', 'NULL', '237') :  3 / 3
('Tucson Medical Center', 'NULL', 'NULL', '237') :  1 / 1
("Brigham and Women's Hospital", 'NULL', 'NULL', '237') :  3 / 3
('John H. Stroger, Jr. Hospital of Cook County', 'NULL', 'NULL', '237') :  1 / 1
('Johns Hopkins University School of Medicine', 'NULL', 'NULL', '237') :  9 / 9
('Long Island Jewish Medical Center', 'NULL', 'NULL', '237') :  3 / 3
('Baylor University Medical Center', 'NULL', 'NULL', '237') :  1 / 1
('Henry Ford Hospital', 'NULL', 'NULL', '237') :  7 / 7
('Jackson Memorial Hospital', 'NULL', 'NULL', '237') :  3 / 3
('Albany Medical Center', 'NULL', 'NULL', '237') :  3 / 3
('Yale New Haven Hospital', 'NULL', 'NULL', '237') :  2 / 2
('Pace University', 'NULL', 'NULL', '237') :  12 
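
To report a separate duplicate count for each source, the two collected maps can be compared directly. The following is a minimal sketch on plain dictionaries shaped like the results of `collectAsMap()` (a key tuple mapped to a list of ids); the function name `common_counts` is hypothetical.

```python
def common_counts(imported_map, user_map):
    """Return {key: (imported_count, user_count)} for keys present in both maps.

    Each map is assumed to go from a (name, address, city_id, country_id)
    tuple to the list of original ids that share that key.
    """
    shared_keys = imported_map.keys() & user_map.keys()
    return {
        key: (len(imported_map[key]), len(user_map[key]))
        for key in shared_keys
    }
```

With the notebook's variables this would be called as `common_counts(unique_imported_institutions_map, unique_user_institutions_map)`. Because it only matches identical keys, it finds exact duplicates across sources; near-duplicates caused by spelling or abbreviation differences would need a normalization step first.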