In [57]:
import os
import sys
import pyspark.sql.functions

# Preparing the data
## Ingest dictionary and source file
(use dictionary as reference)

In [3]:
datadict = spark.read.option("sep", ",").option("header","true").csv("gs://119-f19-opioidbucket/data_dictionary.csv")

In [4]:
datadict.show()

+--------------------+--------------------+
|          ColumnName|         Description|
+--------------------+--------------------+
|     REPORTER_DEA_NO|Unique id of enti...|
|    REPORTER_BUS_ACT|Type of business ...|
|       REPORTER_NAME|Name of entity re...|
|REPORTER_ADDL_CO_...|Additional compan...|
|   REPORTER_ADDRESS1|Address of entity...|
|   REPORTER_ADDRESS2|Additional addres...|
|       REPORTER_CITY|City of entity re...|
|      REPORTER_STATE|State of entity r...|
|        REPORTER_ZIP|Zip code of entit...|
|     REPORTER_COUNTY|County of entity ...|
|        BUYER_DEA_NO|Unique id of enti...|
|       BUYER_BUS_ACT|Type of business ...|
|          BUYER_NAME|Name of entity re...|
|  BUYER_ADDL_CO_INFO|Additional compan...|
|      BUYER_ADDRESS1|Address of entity...|
|      BUYER_ADDRESS2|Additional addres...|
|          BUYER_CITY|City of entity re...|
|         BUYER_STATE|State of entity r...|
|           BUYER_ZIP|Zip code of entit...|
|        BUYER_COUNTY|County of 

In [None]:
df = spark.read.option("sep", "\t").option("header", "true").option("inferSchema", "true").csv("gs://119-f19-opioidbucket/arcos_all_washpost.tsv")
# this take a few minutes.
df.printSchema()
## ideas for speeding up:
#co-locate compute and the buckets
#more nodes; specialize?

root
 |-- REPORTER_DEA_NO: string (nullable = true)
 |-- REPORTER_BUS_ACT: string (nullable = true)
 |-- REPORTER_NAME: string (nullable = true)
 |-- REPORTER_ADDL_CO_INFO: string (nullable = true)
 |-- REPORTER_ADDRESS1: string (nullable = true)
 |-- REPORTER_ADDRESS2: string (nullable = true)
 |-- REPORTER_CITY: string (nullable = true)
 |-- REPORTER_STATE: string (nullable = true)
 |-- REPORTER_ZIP: integer (nullable = true)
 |-- REPORTER_COUNTY: string (nullable = true)
 |-- BUYER_DEA_NO: string (nullable = true)
 |-- BUYER_BUS_ACT: string (nullable = true)
 |-- BUYER_NAME: string (nullable = true)
 |-- BUYER_ADDL_CO_INFO: string (nullable = true)
 |-- BUYER_ADDRESS1: string (nullable = true)
 |-- BUYER_ADDRESS2: string (nullable = true)
 |-- BUYER_CITY: string (nullable = true)
 |-- BUYER_STATE: string (nullable = true)
 |-- BUYER_ZIP: integer (nullable = true)
 |-- BUYER_COUNTY: string (nullable = true)
 |-- TRANSACTION_CODE: string (nullable = true)
 |-- DRUG_CODE: integer (null

In [6]:
# df.count() # => 178,598,026 records!

## Paring down to State of interest

In [58]:
#parameterized
_state = 'NH'
df1 = df.filter(df.BUYER_STATE == _state)

In [9]:
# df1.count() #757944. This took a WHILE!

In [10]:
df1.head()
df1 = df1.cache()

Row(REPORTER_DEA_NO=u'PB0020139', REPORTER_BUS_ACT=u'DISTRIBUTOR', REPORTER_NAME=u'BURLINGTON DRUG COMPANY', REPORTER_ADDL_CO_INFO=u'null', REPORTER_ADDRESS1=u'91 CATAMOUNT DR', REPORTER_ADDRESS2=u'null', REPORTER_CITY=u'MILTON', REPORTER_STATE=u'VT', REPORTER_ZIP=5468, REPORTER_COUNTY=u'CHITTENDEN', BUYER_DEA_NO=u'AB3017212', BUYER_BUS_ACT=u'RETAIL PHARMACY', BUYER_NAME=u'BANNON PHARMACY INC', BUYER_ADDL_CO_INFO=u'null', BUYER_ADDRESS1=u'109 PLEASANT ST', BUYER_ADDRESS2=u'null', BUYER_CITY=u'CLAREMONT', BUYER_STATE=u'NH', BUYER_ZIP=3743, BUYER_COUNTY=u'SULLIVAN', TRANSACTION_CODE=u'S', DRUG_CODE=9193, NDC_NO=u'53746011805', DRUG_NAME=u'HYDROCODONE', QUANTITY=1.0, UNIT=u'null', ACTION_INDICATOR=u'null', ORDER_FORM_NO=u'null', CORRECTION_NO=u'null', STRENGTH=u'null', TRANSACTION_DATE=9082008, CALC_BASE_WT_IN_GM=2.27025, DOSAGE_UNIT=u'500.0', TRANSACTION_ID=803008893, Product_Name=u'HYDROCODONE.BITARTRATE 7.5MG/APAP 75', Ingredient_Name=u'HYDROCODONE BITARTRATE HEMIPENTAHYDRATE', Measure

In [60]:
### lets get sql with it###
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

df1.createOrReplaceTempView("StateOpioids")

#specifically, we want to look at Coos County, from our initial analysis:
results = sqlContext.sql("SELECT BUYER_NAME, BUYER_ADDL_CO_INFO, BUYER_CITY,\
                         COUNT(TRANSACTION_id) AS REPORT_COUNT,\
                         SUM(DOSAGE_UNIT) AS PILL_SUM \
                         FROM StateOpioids \
                         WHERE 1=1 \
                         AND BUYER_COUNTY = 'COOS'\
                         GROUP BY BUYER_NAME,BUYER_ADDL_CO_INFO, BUYER_CITY") #instantaneous

results.printSchema()
# what I actually want to do: sort and get max value, by column name.

root
 |-- BUYER_NAME: string (nullable = true)
 |-- BUYER_ADDL_CO_INFO: string (nullable = true)
 |-- BUYER_CITY: string (nullable = true)
 |-- REPORT_COUNT: long (nullable = false)
 |-- PILL_SUM: double (nullable = true)



## Results: Pharmacy detail in Coos County, NH
There are 9 pharmacies in the dataset for Coos County.

The top hits might be likely targets for pill diversion investigation.

In [13]:
# this collect step may take a few minutes as well
# this took ~8 minutes with following settings: central, 1000GB master, 4 500GB helper nodes
results.sort(results.PILL_SUM.desc()).show(20,False)

+-------------------------------+-------------------------------+----------+------------+---------+
|BUYER_NAME                     |BUYER_ADDL_CO_INFO             |BUYER_CITY|REPORT_COUNT|PILL_SUM |
+-------------------------------+-------------------------------+----------+------------+---------+
|RITE AID OF NEW HAMPSHIRE, INC.|RITE AID #4138                 |COLEBROOK |3831        |2383380.0|
|RITE AID OF NEW HAMPSHIRE, INC.|RITE AID #4127                 |LANCASTER |3246        |2356640.0|
|WAL-MART PHARMACY 10-2634      |null                           |GORHAM    |5323        |1555000.0|
|MAXI DRUG NORTH, INC.          |RITE AID #10287                |BERLIN    |2492        |997700.0 |
|LAPERLE'S IGA PHARMACY         |null                           |COLEBROOK |1936        |394600.0 |
|RITE AID OF NEW HAMPSHIRE INC  |RITE AID PHARMACY #4157        |GORHAM    |858         |202600.0 |
|RITE AID OF NEW HAMPSHIRE INC  |null                           |BERLIN    |410         |199700.0 |


## Digging in: Coos County
### Potential tool: change in strength and volume by pharmacy over time?
We want to see if the strength and volume by pharmacy over time shows any interesting results:
  in COOS COUNTY

In [29]:
strength_vals = sqlContext.sql(
                      "SELECT \
                          BUYER_DEA_NO, BUYER_NAME, \
                          RIGHT(TRANSACTION_DATE,4) AS YEAR, \
                          SUM(DOSAGE_UNIT) AS TOTAL_PILLS, \
                          FORMAT_NUMBER(AVG(dos_str),2) AS AVG_DOSE \
                       FROM StateOpioids \
                       WHERE BUYER_COUNTY = 'COOS' \
                       GROUP BY BUYER_DEA_NO, BUYER_NAME, YEAR\
                       ORDER BY BUYER_DEA_NO, YEAR DESC") #instantaneous"
    
strength_vals.show(50,False)

+------------+-------------------------------+----+-----------+--------+
|BUYER_DEA_NO|BUYER_NAME                     |YEAR|TOTAL_PILLS|AVG_DOSE|
+------------+-------------------------------+----+-----------+--------+
|BM5180601   |MAXI DRUG NORTH, INC.          |2007|35900.0    |11.91   |
|BM5180601   |MAXI DRUG NORTH, INC.          |2006|81300.0    |12.52   |
|BR3822978   |RITE AID OF NEW HAMPSHIRE INC  |2007|73800.0    |14.56   |
|BR3822978   |RITE AID OF NEW HAMPSHIRE INC  |2006|125900.0   |12.88   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2012|474950.0   |14.61   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2011|494680.0   |13.84   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2010|342250.0   |15.56   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2009|313100.0   |13.32   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2008|259800.0   |11.98   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2007|264300.0   |11.82   |
|BR4157738   |RITE AID OF NEW HAMPSHIRE, INC.|2006|

## Conclusion:
- No clear trends on dosage strength. 
- The increase in pills over time in each pharmacy is obvious.

## Next:
### Deep-dive into who the biggest offenders sourced Opioids from.
There is plenty more to be done here, but we have demonstrated Spark can help with finding the needles in the haystack.

This sort of query could help **journalists or researches determine where to follow up**.

In [25]:
### We've identified DEA NO. BR4157738 and BR4157841 as our high offenders.

### Who did they buy from? What were the changes? These would be questions for research.
sellers = sqlContext.sql(
                        "SELECT \
                            BUYER_NAME, BUYER_DEA_NO, REPORTER_DEA_NO, REPORTER_NAME,\
                            RIGHT(TRANSACTION_DATE,4) AS YEAR, \
                            SUM(DOSAGE_UNIT) AS TOTAL_PILLS \
                         FROM StateOpioids \
                         WHERE BUYER_DEA_NO = 'BR4157738' \
                         GROUP BY BUYER_NAME, BUYER_DEA_NO, REPORTER_DEA_NO, REPORTER_NAME, YEAR\
                         ORDER BY YEAR DESC, TOTAL_PILLS DESC")

sellers.show(50,False)
## we see that RITE AID bought most of its pills from McKesson. Could be useful information.

+-------------------------------+------------+---------------+------------------------+----+-----------+
|BUYER_NAME                     |BUYER_DEA_NO|REPORTER_DEA_NO|REPORTER_NAME           |YEAR|TOTAL_PILLS|
+-------------------------------+------------+---------------+------------------------+----+-----------+
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |PM0020850      |MCKESSON CORPORATION    |2012|331540.0   |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |RE0356003      |ECKERD CORPORATION      |2012|119510.0   |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |RA0287020      |ANDA PHARMACEUTICALS INC|2012|23800.0    |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |RA0180733      |ANDA, INC               |2012|100.0      |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |PM0020850      |MCKESSON CORPORATION    |2011|384860.0   |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |RE0356003      |ECKERD CORPORATION      |2011|109820.0   |
|RITE AID OF NEW HAMPSHIRE, INC.|BR4157738   |PM0020850

## Observations so far:
There is no obvious change over time in the average dose strength of the pills; just in overall volume of pills.

Market changes It looks like somes pharmacies either stopped operating or selling opioids after '07-'08, which may have driven up the numbers at the remaining large chain locations (Rite Aids, Walmart). This could have been due to rule changes or regulations. Raw volume still increased statewide, despite fewer pharmacy buyers.



# Next Steps: 
## utilize pyspark/sparkSQL to join large pharmacy and opioid datasets
Note that the pharmacy datasets won't be cut down before SparkSQL gets to them - performance is still very good.

In [45]:
# ingest Arcos pharmacy national dataset - latlon level
dfLatLon = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("gs://119-f19-opioidbucket/pharmacies_latlon.csv")
# dfLatLon.printSchema()

dfLatLon.createOrReplaceTempView("PharmLatLon")

# ingest Arcos pharmacy national dataset - tract-level
dfTract = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("gs://119-f19-opioidbucket/pharmacies_tracts.csv")
# dfTract.printSchema()

dfTract.createOrReplaceTempView("PharmTract")

In [180]:
## next step: write query to aggregate X at pharmacy level, join in tract and latlon data on DEA ID.
#specifically, we want to look at Coos County, from our initial analysis:
coos_results = sqlContext.sql("SELECT so.BUYER_DEA_NO, pt.GEOID, ll.lat, ll.lon,\
                         SUM(DOSAGE_UNIT) AS PILL_SUM \
                         FROM StateOpioids so\
                         LEFT JOIN PharmTract pt on pt.BUYER_DEA_NO = so.BUYER_DEA_NO \
                         LEFT JOIN PharmLatLon ll on ll.BUYER_DEA_NO = so.BUYER_DEA_NO \
                         WHERE 1=1 \
                         AND so.BUYER_COUNTY = 'COOS'\
                         GROUP BY so.BUYER_DEA_NO, pt.GEOID, pt.TRACTCE, pt.LSAD, ll.lat, ll.lon")

# coos_results.printSchema()

In [None]:
coos_results.show(20,False)

In [42]:
state_results = sqlContext.sql("SELECT so.BUYER_DEA_NO, pt.GEOID, ll.lat, ll.lon,\
                         SUM(DOSAGE_UNIT) AS PILL_SUM \
                         FROM StateOpioids so\
                         INNER JOIN PharmTract pt on pt.BUYER_DEA_NO = so.BUYER_DEA_NO \
                         LEFT JOIN PharmLatLon ll on ll.BUYER_DEA_NO = so.BUYER_DEA_NO \
                         WHERE 1=1 \
                         GROUP BY so.BUYER_DEA_NO, pt.GEOID, pt.TRACTCE, ll.lat, ll.lon")
##NOTE that some pharmacies lack tract record; these tended to be very small pharmacies/included sales of <8k pills over 7 years.

In [55]:
state_results.show(5,False)
state_results.count()

+------------+-----------+----+----------+-----------+---------+
|BUYER_DEA_NO|GEOID      |LSAD|lat       |lon        |PILL_SUM |
+------------+-----------+----+----------+-----------+---------+
|BR1058937   |33011011102|CT  |42.7139059|-71.4418565|1368420.0|
|BT8680438   |33013032500|CT  |43.198989 |-71.562263 |1959660.0|
|BR4157841   |33007950500|CT  |44.4942167|-71.5720511|2356640.0|
|FW3514735   |33009960200|CT  |44.2274551|-71.7481351|4100.0   |
|BS3106502   |33011002500|CT  |42.9577343|-71.4416246|678840.0 |
+------------+-----------+----+----------+-----------+---------+
only showing top 5 rows



354

## Write CSV to bucket for separate (geospatial, etc.) analysis:


In [49]:
# type(state_results)
pDF = state_results.toPandas().to_csv("pharmacyAgg.csv", encoding='utf-8', index=False)

# Question: which county experienced the greatest volume change in pills?
Using PySpark to recreate, verify, and expand on the WaPo Investigative team's API work.

In [181]:
# ingest population data:
popdata = spark.read.option("sep", ",").option("header","true").option("inferSchema", "true").csv("gs://119-f19-opioidbucket/pop_counties_20062012.csv")
# popdata.printSchema()
# popdata.head()
popdata1 = popdata.withColumn("population", popdata.population.cast('int'))
# popdata1.head()

In [182]:
popdata1.createOrReplaceTempView("popdata")
# test = sqlContext.sql("SELECT * FROM popdata")
# test.show(20,False)

In [183]:
county_diffs = sqlContext.sql("with cte1 AS ( \
                               SELECT so.BUYER_COUNTY,\
                                 so.BUYER_STATE,\
                                 SUM(DOSAGE_UNIT) AS TOTAL_PILLS, \
                                 RIGHT(so.TRANSACTION_DATE,4) AS YEAR\
                                 FROM StateOpioids so\
                                 WHERE 1=1 \
                                 GROUP BY so.BUYER_COUNTY, so.BUYER_STATE, YEAR)\
                               SELECT  \
                                 cte1.BUYER_COUNTY,\
                                 MIN(TOTAL_PILLS/population) AS MIN_PER_CAPITA,\
                                 MAX(TOTAL_PILLS/population) AS MAX_PER_CAPITA\
                               FROM cte1\
                               LEFT JOIN popdata pop ON upper(pop.BUYER_COUNTY) = upper(cte1.BUYER_COUNTY) \
                                                        and CAST(pop.year as int) = CAST(cte1.YEAR as int)\
                                                        and upper(pop.BUYER_STATE) = upper(cte1.BUYER_STATE)\
                               GROUP BY cte1.BUYER_COUNTY")

# county_diffs.show(20,False)
pDF2 = county_diffs.toPandas()

## Reproducing results:

In [165]:
pDF2["MAXDIFF"] = pDF2["MAX_PER_CAPITA"] - pDF2["MIN_PER_CAPITA"]
print(pDF2.sort_values(by=['MAXDIFF'], ascending=False))

   BUYER_COUNTY  MIN_PER_CAPITA  MAX_PER_CAPITA    MAXDIFF
0          COOS       25.519016       42.175712  16.656696
2       CARROLL       24.620105       40.703084  16.082979
6       GRAFTON       29.983722       43.882564  13.898842
3     STRAFFORD       28.062278       41.103297  13.041019
8       BELKNAP       25.655436       38.280403  12.624967
5    ROCKINGHAM       24.090553       34.604759  10.514206
4      CHESHIRE       20.036571       30.473733  10.437162
1     MERRIMACK       29.474862       39.737589  10.262727
9  HILLSBOROUGH       19.864957       27.545441   7.680484
7      SULLIVAN       23.403079       28.395483   4.992404


## Answer: Coos County, NH

We have demonstrated that we can compute the maximum change over that seven year period by county very quickly from (mostly) raw data.

Compare against the overall values:


In [194]:
state_diff = sqlContext.sql("with cte1 AS ( \
                               SELECT so.BUYER_STATE,\
                                 SUM(DOSAGE_UNIT) AS TOTAL_PILLS, \
                                 RIGHT(so.TRANSACTION_DATE,4) AS YEAR\
                                 FROM StateOpioids so\
                                 WHERE 1=1 \
                                 GROUP BY so.BUYER_STATE, YEAR)\
                             , cte2 AS (\
                                 SELECT SUM(population) AS POPSUM \
                                       , YEAR\
                                       , BUYER_STATE\
                                    FROM popdata\
                                    GROUP BY YEAR, BUYER_STATE)\
                               SELECT  \
                                 cte1.BUYER_STATE,\
                                 MIN(TOTAL_PILLS/POPSUM) AS MIN_PER_CAPITA,\
                                 MAX(TOTAL_PILLS/POPSUM) AS MAX_PER_CAPITA\
                               FROM cte1\
                               LEFT JOIN cte2 pop ON CAST(pop.year as int) = CAST(cte1.YEAR as int)\
                                                        and upper(pop.BUYER_STATE) = upper(cte1.BUYER_STATE)\
                               GROUP BY cte1.BUYER_STATE")

# state_diff.show(5,False)
pDF2A = state_diff.toPandas()
pDF2A["MAXDIFF"] = pDF2A["MAX_PER_CAPITA"] - pDF2A["MIN_PER_CAPITA"]

pDF2A.head()

Unnamed: 0,BUYER_STATE,MIN_PER_CAPITA,MAX_PER_CAPITA,MAXDIFF
0,NH,24.03082,34.205114,10.174295


## Results:
### Coos County prescription opioid volume increased at a rate ~60% more than statewide!
We have shown a way to quickly compute the county and overall state per capita changes in pill volume 2006-2012 (7 years). Researchers or journalists could use this approach to look deeper beyond the WaPo Investigative team's API.

As our out-of-Spark Social Vulnerability Index analysis will show, this is a public health crisis that intersects with other social factors. Coos County's struggles with the opioid crisis and other public health issues likely were exacerbated and potentially contributed to the relative flood of prescription opioids, in some way.





and now... How powerful is Spark?
## BONUS!
This can be altered to look at the national results! With minimal code change:

In [177]:
df.createOrReplaceTempView("NationalOpioids")

nat_county_diffs = sqlContext.sql("with cte1 AS ( \
                               SELECT so.BUYER_COUNTY,\
                                 so.BUYER_STATE,\
                                 SUM(DOSAGE_UNIT) AS TOTAL_PILLS, \
                                 RIGHT(so.TRANSACTION_DATE,4) AS YEAR\
                                 FROM NationalOpioids so\
                                 WHERE 1=1 \
                                 GROUP BY so.BUYER_COUNTY, so.BUYER_STATE, YEAR)\
                               SELECT  \
                                 cte1.BUYER_COUNTY,\
                                 cte1.BUYER_STATE,\
                                 MIN(TOTAL_PILLS/population) AS MIN_PER_CAPITA,\
                                 MAX(TOTAL_PILLS/population) AS MAX_PER_CAPITA\
                               FROM cte1\
                               LEFT JOIN popdata pop ON upper(pop.BUYER_COUNTY) = upper(cte1.BUYER_COUNTY) \
                                                        and CAST(pop.year as int) = CAST(cte1.YEAR as int)\
                                                        and upper(pop.BUYER_STATE) = upper(cte1.BUYER_STATE)\
                               GROUP BY cte1.BUYER_COUNTY, cte1.BUYER_STATE")

pDF3 = nat_county_diffs.toPandas() #collect step

In [179]:
pDF3["MAXDIFF"] = pDF3["MAX_PER_CAPITA"] - pDF3["MIN_PER_CAPITA"]
print(pDF3.sort_values(by=['MAXDIFF'], ascending=False))

           BUYER_COUNTY BUYER_STATE  MIN_PER_CAPITA  MAX_PER_CAPITA  \
3110        LEAVENWORTH          KS       68.473825      501.605074   
1066         CHARLESTON          SC       83.294254      381.957373   
2439              MINGO          WV      104.245686      364.808194   
1277            KIMBALL          NE       20.425308      204.717711   
142               FLOYD          KY       90.967226      246.946173   
322           TROUSDALE          TN       49.220555      170.108350   
174         NORTON CITY          VA      238.666667      347.527071   
1978  MARTINSVILLE CITY          VA      191.296128      296.840513   
1831         GALAX CITY          VA       95.478994      191.777554   
71                BACON          GA       57.549148      153.588294   
3119            PICKETT          TN       39.822815      133.967413   
2071              PERRY          KY      123.557876      215.552190   
1771             OWSLEY          KY       64.979970      156.717045   
861   

In [None]:
# this is why it's important to dig deeper - looks like the Leavenworth example is due to an anomaly in how the shipping data is calculated:
# https://www.kcur.org/post/leavenworth-county-kansas-may-not-be-catastrophic-opioid-hotspot-new-data-appear-show#stream/0
# /end bonus round!