In [1]:
from pyspark.sql import SQLContext
import json
import pymongo
import urllib
from bson.code import Code
import pprint

sqlContext = SQLContext(sc)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
18,application_1498539553950_0011,pyspark,idle,Link,Link,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


### Part A: Loading of JSON data into MongoDB

In [2]:
# Define Custom Function to Remove dots from keynames, MongoDB doesn't allow keys with dot characters
def remove_dot_key(obj):
    for key in obj.keys():
        new_key = key.replace(".", "")
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj

In [3]:
# Read JSON data and parse into an array of JSON items
zipsDF = sqlContext.read.json("/tmp/zips.json")
presDF = sqlContext.read.json("/tmp/roam_prescription_based_prediction.jsonl")

# Remove dots from the keynames in JSON since MongoDB requires key name not to contain any dots characters
zipsJSON = [json.loads(item, object_hook = remove_dot_key) for item in zipsDF.toJSON().collect()]
presJSON = [json.loads(item, object_hook = remove_dot_key) for item in presDF.toJSON().collect()]

In [4]:
# Initiate Mongo client
mongohost = "52.169.71.31"
mongouser = "root"
mongopw = "y64RwQJMjsIY"

mongoURI = 'mongodb://' + mongouser + ':' + urllib.quote_plus(mongopw) + '@' + mongohost
client = pymongo.MongoClient(mongoURI)

In [5]:
# Write the data into Zips and Pres collections, drop before inserting to ensure we create new collections
courseworkDB = client.courseworkdb
zips = courseworkDB.zipscollection
zips.drop()
pres = courseworkDB.prescollection
pres.drop()

zipsResults = zips.insert_many(zipsJSON)
presResults = pres.insert_many(presJSON)

#### Schema of Two JSON Collections

In [6]:
def printSchema(obj, indent = ""):
    for key in obj.keys():
        typeStr = type(obj[key]).__name__
        print(indent + key + ": " + typeStr)
        if typeStr == "dict":
            printSchema(obj[key], indent + "\t")

##### Zips Collection

In [7]:
printSchema(zips.find_one())

city: unicode
state: unicode
_id: unicode
pop: int
loc: list

As seen above, the three fields `city`, `state` and `_id` are unicode strings. They represent the city name, state name and zip code respectively.  

The field `pop` is of integer type and the value is the population in the zip code. The field `loc` is a list, it contains two floating number subfields indicating the geographical location (longitude & latitude) of the zip code.

##### Prescription Collection

In [8]:
printSchema(pres.find_one())

_id: ObjectId
npi: unicode
provider_variables: dict
	brand_name_rx_count: int
	gender: unicode
	region: unicode
	settlement_type: unicode
	specialty: unicode
	years_practicing: int
	generic_rx_count: int
cms_prescription_counts: dict
	LABETALOL HCL: int
	LEVOTHYROXINE SODIUM: int
	IRBESARTAN: int
	PREDNISONE: int
	DIOVAN: int
	ATENOLOL-CHLORTHALIDONE: int
	BENAZEPRIL HCL: int
	FUROSEMIDE: int
	FENOFIBRATE: int
	ALPRAZOLAM: int
	AMLODIPINE BESYLATE-BENAZEPRIL: int
	HYDROCODONE-ACETAMINOPHEN: int
	NEXIUM: int
	HYDROCHLOROTHIAZIDE: int
	LOSARTAN POTASSIUM: int
	CALCITRIOL: int
	CLONIDINE HCL: int
	GLYBURIDE: int
	BYSTOLIC: int
	CALCIUM ACETATE: int
	SIMVASTATIN: int
	LANTUS: int
	MINOXIDIL: int
	METOPROLOL SUCCINATE: int
	CARISOPRODOL: int
	METOLAZONE: int
	LOSARTAN-HYDROCHLOROTHIAZIDE: int
	SENSIPAR: int
	OXYCODONE HCL: int
	POTASSIUM CHLORIDE: int
	SPIRONOLACTONE: int
	RANITIDINE HCL: int
	METOPROLOL TARTRATE: int
	ATENOLOL: int
	DOXAZOSIN MESYLATE: int
	MIDODRINE HCL: int
	GABAPENTIN: 

The above is the fields contain in a sample JSON document. The field `npi` represents the National Provider Identifier of the healthcare provider (i.e. doctor).  

There are a number of fields embedded in `provider_variables`:
- `gender`: Unicode string represents the gender of the doctor
- `region`: Unicode string represents the region in the USA where the doctor is practising
- `settlement_type`: Value is either "urban" or "non-urban", indicating the area type where the doctor is located
- `specialty`: Unicode string which indicates the doctor's specialty
- `years_practicing`: An integer represents the doctor's years of practice
- `generic_rx_count`: An integer represents the total number of prescriptions by the doctor

The `cms_prescription_counts` field contains many embedded subfields where the key is named after a drug and the value is the numbef of prescriptions of that drug. The embedded subfields can vary from doctor to doctor since the prescribed medicine varies.

### Part B: Queries for "Zip codes" data

#### 1) Count the total number of cities in Washington state (code: "WA")

Each document in the zips collection represent an area with an unique zipcode. A large city may contain multiple zipcodes hence same city name may exist in multiple documents. We will have to identify the city with distinct city names in Washington.

In [9]:
# Since a large city may contain many zipcodes, distinct() function is used to identify city with distinct names in state "WA"
print "Total number of cities in WA:", len(zips.find( { "state": "WA" } ).distinct("city"))

Total number of cities in WA: 397

#### 2) Find the total population of each state (i.e. sort states by their population in ascending order).

In [10]:
totalPop = zips.aggregate([
        # Sum up population in each state
        { "$group": { "_id": "$state", "total": { "$sum": "$pop" } } }, 
        # Sort by total population in each state in ascending order
        { "$sort": { "total": 1 } }, 
        # Display the field names in easy-to-read fashion
        { "$project": { "State": "$_id", 
                        "Total Population": "$total", 
                        "_id": 0
                      }
         }])

for population in totalPop:
    pprint.pprint(population)

{u'State': u'WY', u'Total Population': 453528}
{u'State': u'AK', u'Total Population': 544698}
{u'State': u'VT', u'Total Population': 562758}
{u'State': u'DC', u'Total Population': 606900}
{u'State': u'ND', u'Total Population': 638272}
{u'State': u'DE', u'Total Population': 666168}
{u'State': u'SD', u'Total Population': 695397}
{u'State': u'MT', u'Total Population': 798948}
{u'State': u'RI', u'Total Population': 1003218}
{u'State': u'ID', u'Total Population': 1006749}
{u'State': u'HI', u'Total Population': 1108229}
{u'State': u'NH', u'Total Population': 1109252}
{u'State': u'NV', u'Total Population': 1201833}
{u'State': u'ME', u'Total Population': 1226648}
{u'State': u'NM', u'Total Population': 1515069}
{u'State': u'NE', u'Total Population': 1578139}
{u'State': u'UT', u'Total Population': 1722850}
{u'State': u'WV', u'Total Population': 1793146}
{u'State': u'AR', u'Total Population': 2350725}
{u'State': u'KS', u'Total Population': 2475285}
{u'State': u'MS', u'Total Population': 2573216}


#### 3) Find the 10 closest cities to WEST BROOKLYN, IL. You might want to use the "$near" operator

In [11]:
# Set the loc field as legacy 2D index in order to use $near operator on it
zips.create_index([ ( "loc", pymongo.GEO2D ) ])

# Store the coordinates of WEST BROOKLYN, IL
wbCoord = zips.find_one( { "city": "WEST BROOKLYN", "state": "IL" } )['loc']

# Find 10 closest cities to WEST BROOKLYN, IL (sorted by distance)
closestCities = zips.find( 
    # Automatically orted by distance to WEST BROOKLYN, IL coordinates (from closest to furthest)
    { "loc": { "$near": wbCoord }, 
      # Exclude WEST BROOKLYN, IL from the results (i.e. retain the city whose name is not WEST BROOKLYN or state is not IL)
      "$or": [ { "city": { "$ne": "WEST BROOKLYN" } }, 
               { "state": { "$ne": "IL" } } ] 
    }, 
    # Only show City Name and Coordinates
    { "city": 1, "loc": 1, "_id": 0 }
).limit(10)
    
# Print the results
for city in closestCities:
    pprint.pprint(city)

{u'city': u'SUBLETTE', u'loc': [-89.235409, 41.633144]}
{u'city': u'COMPTON', u'loc': [-89.087708, 41.684976]}
{u'city': u'ASHTON', u'loc': [-89.2086, 41.864327]}
{u'city': u'AMBOY', u'loc': [-89.34716, 41.704181]}
{u'city': u'FRANKLIN GROVE', u'loc': [-89.317112, 41.857968]}
{u'city': u'MENDOTA', u'loc': [-89.10828, 41.544308]}
{u'city': u'STEWARD', u'loc': [-89.015086, 41.847545]}
{u'city': u'LA MOILLE', u'loc': [-89.297024, 41.537557]}
{u'city': u'LEE', u'loc': [-88.971386, 41.786418]}
{u'city': u'PAW PAW', u'loc': [-88.967377, 41.685228]}

#### 4) * Considering the `region` of each US state, according to this source, find the total population in each of the four regions (West, South, Midwest, and Northeast). Use linking to do it.

In [12]:
# Create new Regions Collection for region and states mapping
regions = courseworkDB.regionscollection

# Populate the collection with region to state mapping data
regionsData = [{ "_id": "Northeast", 
                 "states": [ "CT", "ME", "MA", "NH", "RI", "VT", "NJ", "NY", "PA" ] }, 
               { "_id": "Midwest", 
                 "states": [ "IL", "IN", "MI", "OH", "WI", "IA", "KS", "MN", "MO", "NE", "ND", "SD" ] }, 
               { "_id": "South", 
                 "states": [ "DE", "FL", "GA", "MD", "NC", "SC", "VA", "DC", "WV", "AL", "KY", "MS", "TN", "AR", "LA", "OK", 
                             "TX" ] }, 
               { "_id": "West", 
                 "states": [ "AZ", "CO", "ID", "MT", "NV", "NM", "UT", "WY", "AK", "CA", "HI", "OR", "WA" ] }]

result = regions.insert_many(regionsData)

In [13]:
# Linking Regions Collection with Zips Collection
regionPop = regions.aggregate([ 
        { "$unwind": "$states" }, # Unwind deconstructs the array of states
        # Lookup to join the state in regions collection with zips collection
        { "$lookup": { "from": "zipscollection", 
                       "localField": "states", 
                       "foreignField": "state", 
                       "as": "zipCode" } 
        }, 
        { "$unwind": "$zipCode" }, 
        # Group by region
        { "$group": { "_id": "$_id", 
                      "totalPop": { "$sum": "$zipCode.pop" } 
                    } 
        }, 
        { "$sort": { "totalPop": 1 } }, 
        # Only show region and total population in final output
        { "$project": { "Region": "$_id", 
                        "Total Population": "$totalPop", 
                        "_id": 0
                      } 
        } ])

# Show results, sorted in total population in the region
for population in regionPop:
    pprint.pprint(population)

{u'Region': u'Northeast', u'Total Population': 50807650}
{u'Region': u'West', u'Total Population': 52774790}
{u'Region': u'Midwest', u'Total Population': 59652438}
{u'Region': u'South', u'Total Population': 85173522}

### Queries for "Prescription-based prediction" dataset

#### 5) Count the total number of records

In [14]:
print "Total number of records in prescription collection:", pres.count()

Total number of records in prescription collection: 239930

#### 6) Find the specialty/-ies of all doctors who have prescribed "HALOPERIDOL"

In [15]:
# find the distinct doctor specialties where HALOPERIDOL is prescribed
specialties = pres \
    .find( { "cms_prescription_counts.HALOPERIDOL": { "$exists": True } } ) \
    .distinct("provider_variables.specialty")

# Show the specialties in ascending alphabetical order
for specialty in sorted(specialties):
    print(specialty)

Acute Care
Addiction (Substance Use Disorder)
Addiction Medicine
Addiction Psychiatry
Adolescent Medicine
Adolescent and Children Mental Health
Adult Health
Adult Medicine
Adult Mental Health
Allergy & Immunology
Anatomic Pathology
Anatomic Pathology & Clinical Pathology
Behavioral Neurology & Neuropsychiatry
Cardiovascular Disease
Child & Adolescent Psychiatry
Clinical
Clinical Cardiac Electrophysiology
Clinical Child & Adolescent
Clinical Neurophysiology
Cognitive & Behavioral
Community Health
Critical Care Medicine
Developmental  Behavioral Pediatrics
Diagnostic Neuroimaging
Diagnostic Radiology
Emergency Medical Services
Endocrinology, Diabetes & Metabolism
Endodontics
Family
Family Health
Foot & Ankle Surgery
Forensic Psychiatry
Gastroenterology
General Practice
Geriatric Medicine
Geriatric Psychiatry
Gerontology
Gynecologic Oncology
Gynecology
Hematology
Hematology & Oncology
Hospice and Palliative Medicine
Infectious Disease
Interventional Cardiology
Medical
Medical Oncology
Me

#### 7) Find the total number of patients visited, separately for each region (in one query).  

One drug prescription is assumed as one patient visit. The field "generic_rx_count" provides info on the total prescriptions per doctor. Hence we sum up "generic_rx_count" for each region to get the "total number of patient visits".

In [16]:
# Assuming each prescription is a patient visit, "generic_rx_count" provides info on the total prescriptions per doctor
# Group by region and sum the total prescriptions (i.e. patient visits)
patientVisits = pres.aggregate([
        { "$group": { "_id": "$provider_variables.region", 
                      "total": { "$sum": "$provider_variables.generic_rx_count" } 
                    } 
        }, 
        { "$sort": { "total": 1 } }, 
        { "$project": { "Region": "$_id", 
                        "Total Patient Visits": "$total", 
                        "_id": 0
                      } 
        } ])

# Show the results, in ascending order of total patient visits
for visit in patientVisits:
    pprint.pprint(visit)

{u'Region': u'West', u'Total Patient Visits': 34602228}
{u'Region': u'Midwest', u'Total Patient Visits': 39198695}
{u'Region': u'Northeast', u'Total Patient Visits': 41893296}
{u'Region': u'South', u'Total Patient Visits': 75181384}

#### 8) Find the total amount of prescribed "ATORVASTATIN CALCIUM"

In [17]:
# Group by "None" in order to sum every single "ATORVASTATIN CALCIUM"
totalAtor = pres.aggregate([ 
        { "$group": { "_id": None, 
                      "total": {"$sum": "$cms_prescription_counts.ATORVASTATIN CALCIUM" } 
                    }
        }, 
        { "$project": { "Drug": "ATORVASTATIN CALCIUM", 
                        "Total Prescription": "$total", 
                        "_id": 0
                      }
        } ])

for amount in totalAtor:
    pprint.pprint(amount)

{u'Drug': u'ATORVASTATIN CALCIUM', u'Total Prescription': 4534888}

#### 9) Find the drug that is prescribed the most in "non-urban" areas. (in terms of number of prescriptions, not the total amount).  

Since the value of the "Drug Name" field represents the number of prescriptions for the drug, the following code calculates the sum of these values (for each individual drug prescribed in "non-urban" areas).  

If the intention is to find out the drug that is "prescribed by most doctors" in non-urban areas, simply change the emit into emit(key, 1) in the following code, as displayed in the commented block of codes.

In [18]:
# To find the number of prescriptions for each drug, sum up the values of the respective drug name fields
# Map function to emit all the individual drug prescription as key-value pairs
mapper = Code("""  function () {
                     for (var key in this.cms_prescription_counts) {
                       emit(key, this.cms_prescription_counts[key]);
                     }
                  }
              """)

# If the intention is to get the drug "prescribed by most doctors" in non-urban areas instead, use the following code instead
#mapper = Code("""  function () {
#                     for (var key in this.cms_prescription_counts) {
#                       emit(key, 1);
#                     }
#                  }
#              """)


# Reduce function to sum up all the values with same key (i.e. same drug type)
reducer = Code("""  function (key, values) {
                      return Array.sum(values);
                    }
               """)

# Perform this MapReduce function on prescription collection (in "non-urban" areas), and output to drug collection
pres.map_reduce(mapper, reducer, out = "drugcollection", query = { "provider_variables.settlement_type": "non-urban" } )

# Get number of prescriptions for all drugs
drugs = courseworkDB.drugcollection

# Get the most prescribed drug
mostUsedDrug = drugs.find().sort("value", -1).limit(1)

for drug in mostUsedDrug:
    print "Most prescribed drug in non-urban areas is %s with %d prescriptions" % (drug['_id'], drug['value'])

Most prescribed drug in non-urban areas is LISINOPRIL with 3341340 prescriptions

#### 10) * Considering the region of US states (Query #4) and the region where each prescription is recorded, find the average number of prescriptions per capita in each of the four regions in US. Again, use linking and integrate in the query code.

In [19]:
presPerCapita = regions.aggregate([ 
        # First part of lookup calculates the total population per region (Query #4)
        { "$unwind": "$states" }, 
        { "$lookup": { "from": "zipscollection", 
                       "localField": "states", 
                       "foreignField": "state", 
                       "as": "zipCode" } 
        }, 
        { "$unwind": "$zipCode" }, 
        { "$group": { "_id": "$_id", 
                      "totalPop": { "$sum": "$zipCode.pop" } 
                    } 
        }, 
        # Second lookup calculates the total number of prescriptions (i.e. sum of "generic_rx_count") in each region
        { "$lookup": { "from": "prescollection", 
                       "localField": "_id", 
                       "foreignField": "provider_variables.region", 
                       "as": "prescriptions" }
        }, 
        { "$unwind": "$prescriptions" }, 
        { "$group": { "_id": "$_id", 
                      "totalPop": { "$first": "$totalPop" }, # Use $first operator to retain the calculated region populations
                      "totalPres": { "$sum": "$prescriptions.provider_variables.generic_rx_count" }, 
                    }
        }, 
        # Divide the Total Prescriptions by Total Population to get the average number of prescriptions per capita
        { "$project": { "Region": "$_id", 
                        "Prescriptions Per Capita": { "$divide": [ "$totalPres", "$totalPop" ] }, 
                        "_id": 0
                      }
        }, 
        { "$sort": { "Prescriptions Per Capita": 1 } } ])

# Show the Average Prescriptions per Capita for each region in ascending order (and round to 4 decimal places)
for result in presPerCapita:
    print "Region: %s, Average Prescriptions per Capita: %.4f" % (result['Region'], result['Prescriptions Per Capita'])

Region: West, Average Prescriptions per Capita: 0.6557
Region: Midwest, Average Prescriptions per Capita: 0.6571
Region: Northeast, Average Prescriptions per Capita: 0.8245
Region: South, Average Prescriptions per Capita: 0.8827