# Importing ARCOS Data with Dask

Last week, we used dask to play with a few datasets to get a feel for how dask works. In order to help us develop code that would run quickly, however, we worked with very small, safe datasets. 

Today, we will continue to work with dask, but this time using much larger datasets. This means that (a) doing things incorrectly may lead to your computer crashing (So save all your open files before you start!), and (b) many of the commands you are being asked run will take several minutes each. 

For familiarity, and so you can see what advantages dask can bring to your workflow, today we'll be working with the DEA ARCOS drug shipment database published by the Washington Post! However, to strike a balance between size and speed, we'll be working with a slightly thinned version that has only the last two years of data, instead of all six.

In [47]:
import pandas as pd
import numpy as np

pd.set_option("mode.copy_on_write", True)

import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

## Exercise 1

Download the thinned ARCOS data [from this link](https://www.dropbox.com/s/o7nc6yvrwog4ozi/arcos_2011_2012.tsv.zip?dl=0). It should be about 2GB zipped, 25 GB unzipped. 

> I downloaded the file. It's 1.94GB zipped, 25.2GB unzipped.

## Exercise 2

Our goal today is going to be to find the pharmaceutical company that has shipped the most opioids (`MME_Conversion_Factor * CALC_BASE_WT_IN_GM`) in the US.

When working with large datasets, it is good practice to begin by prototyping your code with a subset of your data. So begin by using `pandas` to read in the first 100,000 lines of the ARCOS data and write pandas code to compute the shipments from each shipper (the group that reported the shipment). 

In [48]:
import pandas as pd

# file path to the dataset
file_path = "arcos_2011_2012.tsv"

# Reading in the first 100,000 lines
df = pd.read_csv(file_path, sep="\t", nrows=100000, low_memory=False)

In [49]:
# print the first few rows of the dataframe
df.head()

# Printing every column name in the DataFrame
# for column in df.columns:
#     print(column)

# print the column REPORTER_NAME
# print(df["REPORTER_NAME"])

Unnamed: 0.1,Unnamed: 0,REPORTER_DEA_NO,REPORTER_BUS_ACT,REPORTER_NAME,REPORTER_ADDL_CO_INFO,REPORTER_ADDRESS1,REPORTER_ADDRESS2,REPORTER_CITY,REPORTER_STATE,REPORTER_ZIP,...,Product_Name,Ingredient_Name,Measure,MME_Conversion_Factor,Combined_Labeler_Name,Revised_Company_Name,Reporter_family,dos_str,date,year
0,0,PA0006836,DISTRIBUTOR,ACE SURGICAL SUPPLY CO INC,,1034 PEARL STREET,,BROCKTON,MA,2301,...,HYDROCODONE BIT/ACETA 10MG/500MG USP,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,SpecGx LLC,Mallinckrodt,ACE Surgical Supply Co Inc,10.0,2012-12-26,2012
1,9,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-12-05,2012
2,10,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-07-24,2012
3,16,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONEBITARTRATE & ACETA 7.5MG,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,7.5,2012-02-04,2012
4,17,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2011-11-07,2011


In [50]:
# caclulate the total shipment
df["Total_Shipment"] = df["MME_Conversion_Factor"] * df["CALC_BASE_WT_IN_GM"]

df["Total_Shipment"]

0        0.60540
1        0.45405
2        0.60540
3        1.36215
4        0.60540
          ...   
99995    1.51350
99996    4.03425
99997    0.60540
99998    1.34475
99999    0.60540
Name: Total_Shipment, Length: 100000, dtype: float64

In [51]:
# calculate the total shipment for each shipper
shipper_totals = df.groupby("REPORTER_NAME")["Total_Shipment"].sum()

print("The total shipment for each shipper are:")
shipper_totals

The total shipment for each shipper are:


REPORTER_NAME
ACE SURGICAL SUPPLY CO INC              0.605400
AMERICAN SALES COMPANY               3432.058005
AMERISOURCEBERGEN DRUG CORP         34561.394892
APOTHECA INC                           23.913300
BLOODWORTH WHOLESALE DRUGS           1782.827325
BURLINGTON DRUG COMPANY              3889.490325
CAPITAL WHOLESALE DRUG & CO           188.318175
CARDINAL HEALTH 110, LLC            54352.323711
CESAR CASTILLO INC                      3.027000
DIK DRUG CO                          3278.514405
DISCOUNT DRUG MART                   1272.596205
FRANK W KERR INC                     8730.016283
H D SMITH WHOLESALE DRUG CO          6399.324050
H J HARKINS COMPANY INC               352.393956
HALS MED DENT SUPPLY CO., INC.         18.162000
KAISER FNDTN HEALTH PLAN NW          1159.892040
KAISER FOUNDATION HOSPITALS          3891.329580
KINRAY INC                          28620.315246
KPH HEALTHCARE SERVICES, INC.        1988.890350
LOUISIANA WHOLESALE DRUG CO         14787.765559
MCKESS

## Exercise 3

Now let's turn to dask. Re-write your code for dask, and calculate the total shipments by reporting company. Remember: 

- Activate a conda environment with a clean dask installation.
- Start by spinning up a distributed cluster.
- Dask won't read compressed files, so you have to unzip your ARCOS data. 
- Start your cluster in a cell all by itself since you don't want to keep re-running the "start a cluster" code. 

If you need to review dask basic code, [check here](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html).

As you run your code, make sure to click on the Dashboard link below where you created your cluster:

![dask_dashboard](images/dask_cluster.png)

Among other things, the bar across the bottom should give you a sense of how long your task will take:

![dask_progress](images/dask_progress.png)

(For context, my computer (which has 10 cores) only took a couple seconds. My computer is fast, but most computers should be done within a couple minutes, tops).


In [52]:
import os

print(f"I have {os.cpu_count()} logical cores.")

I have 8 logical cores.


In [53]:
from dask.distributed import Client

client = Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 56388 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:56388/status,

0,1
Dashboard: http://127.0.0.1:56388/status,Workers: 4
Total threads: 8,Total memory: 8.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:56389,Workers: 4
Dashboard: http://127.0.0.1:56388/status,Total threads: 8
Started: Just now,Total memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:56403,Total threads: 2
Dashboard: http://127.0.0.1:56404/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:56392,
Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-log0e1ac,Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-log0e1ac

0,1
Comm: tcp://127.0.0.1:56408,Total threads: 2
Dashboard: http://127.0.0.1:56411/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:56394,
Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-pcvc6hhg,Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-pcvc6hhg

0,1
Comm: tcp://127.0.0.1:56401,Total threads: 2
Dashboard: http://127.0.0.1:56406/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:56396,
Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-sd13lz67,Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-sd13lz67

0,1
Comm: tcp://127.0.0.1:56402,Total threads: 2
Dashboard: http://127.0.0.1:56405/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:56398,
Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-sicb7rwi,Local directory: /var/folders/jd/jy6s5ctx4rs6qsw5t359vzpr0000gn/T/dask-scratch-space/worker-sicb7rwi


In [54]:
# Load Dask
import dask.dataframe as dd

# Read the data using Dask, specifying only the needed columns
ddf = dd.read_csv(
    file_path,
    sep="\t",
    usecols=["MME_Conversion_Factor", "CALC_BASE_WT_IN_GM", "REPORTER_NAME"],
)

# caclulate the total shipment
ddf["Total_Shipment"] = ddf["MME_Conversion_Factor"] * ddf["CALC_BASE_WT_IN_GM"]

# calculate the total shipment for each shipper
total_shipments_by_company = ddf.groupby("REPORTER_NAME")["Total_Shipment"].sum()

# Compute the result
result = total_shipments_by_company.compute()

print("The first few rows of the total shipment for each shipper are:")
# Display the head of the result
result.head()

The first few rows of the total shipment for each shipper are:


REPORTER_NAME
ACE SURGICAL SUPPLY CO INC     2.784840e+01
AMERICAN SALES COMPANY         1.666851e+05
AMERISOURCEBERGEN DRUG CORP    2.553364e+07
APOTHECA INC                   1.659324e+03
BLOODWORTH WHOLESALE DRUGS     8.370920e+04
Name: Total_Shipment, dtype: float64

## Exercise 4

Now let's calculate, *for each state*, what company shipped the most pills?

Note you will quickly find that you can't sort in dask -- sorting in parallel is *really* tricky! So you'll have to work around that. Do what you need to do on the big dataset first, then compute it all so you get it as a regular pandas dataframe, then finish. 

Does this seem like a situation where a single company is responsible for the opioid epidemic?

In [55]:
# Read the selected columns
ddf = dd.read_csv(
    file_path,
    sep="\t",
    usecols=[
        "MME_Conversion_Factor",
        "CALC_BASE_WT_IN_GM",
        "REPORTER_NAME",
        "BUYER_STATE",
    ],
)

# Calculate total pills shipped
ddf["Total_Shipment"] = ddf["MME_Conversion_Factor"] * ddf["CALC_BASE_WT_IN_GM"]

# Group by State and Reporter, then sum the Total_Pills
grouped = ddf.groupby(["BUYER_STATE", "REPORTER_NAME"]).Total_Shipment.sum()

# Compute to get a Pandas DataFrame
grouped_df = grouped.compute()

# sort to find the company shipped the most for each state
result_df = grouped_df.reset_index().sort_values(
    by=["BUYER_STATE", "Total_Shipment"], ascending=[True, False]
)
max_shipments_per_state = result_df.groupby("BUYER_STATE").first()

# Display the result
max_shipments_per_state

Unnamed: 0_level_0,REPORTER_NAME,Total_Shipment
BUYER_STATE,Unnamed: 1_level_1,Unnamed: 2_level_1
AK,CARDINAL HEALTH,154913.7
AL,MCKESSON CORPORATION,2143844.0
AR,MCKESSON CORPORATION,605374.4
AZ,WALGREEN CO,2312494.0
CA,MCKESSON CORPORATION,5152963.0
CO,WALGREEN CO,1044110.0
CT,CARDINAL HEALTH,1016512.0
DC,CARDINAL HEALTH,132933.4
DE,WALGREEN CO,633937.5
FL,WALGREEN CO,6566340.0


In [56]:
# Count the number of occurrences for each shipper name
reporter_counts = max_shipments_per_state["REPORTER_NAME"].value_counts()

# Display the counts
reporter_counts

REPORTER_NAME
MCKESSON CORPORATION              23
CARDINAL HEALTH                   13
WALGREEN CO                        8
AMERISOURCEBERGEN DRUG CORP        4
MCKESSON DRUG COMPANY              2
AMERISOURCEBERGEN DRUG             1
MORRIS & DICKSON CO                1
CARDINAL HEALTH 110, LLC           1
DROGUERIA BETANCES, LLC            1
CARDINAL HEALTH P.R. 120, INC.     1
Name: count, dtype: int64[pyarrow]

> From the result, we can see that MCKESSON CORPORATION shipped the most pills in 23 US states,territories and districts. It is reasonable to infer that MCKESSON CORPORATION is most responsible for the opioid epidemic.

## Exercise 5 

Now go ahead and try and re-do the chunking you did by hand for your project (with this 2 years of data) -- calculate, for each year, the total morphine equivalents sent to each county in the US. 

In [57]:
# Define required columns
required_columns = [
    "BUYER_COUNTY",
    "BUYER_STATE",
    "TRANSACTION_DATE",
    "MME_Conversion_Factor",
    "CALC_BASE_WT_IN_GM",
]

chunk_size = 10**6
total_morphine_equivalents = []

# Process each chunk
for chunk in pd.read_csv(
    file_path,
    sep="\t",
    usecols=required_columns,
    chunksize=chunk_size,
    low_memory=False,
):
    # Convert TRANSACTION_DATE to year
    chunk["Year"] = pd.to_datetime(chunk["TRANSACTION_DATE"], format="%m%d%Y").dt.year

    # Calculate the morphine equivalent for each record
    chunk["Morphine_Equivalent"] = (
        chunk["MME_Conversion_Factor"] * chunk["CALC_BASE_WT_IN_GM"]
    )

    # Aggregate the total morphine equivalents by county and year
    aggregated = (
        chunk.groupby(["BUYER_COUNTY", "BUYER_STATE", "Year"])["Morphine_Equivalent"]
        .sum()
        .reset_index()
    )
    total_morphine_equivalents.append(aggregated)

# Concatenate the results from all chunks
final_result = pd.concat(total_morphine_equivalents)

# display the first few rows of the result
final_result.head()

Unnamed: 0,BUYER_COUNTY,BUYER_STATE,Year,Morphine_Equivalent
0,ABBEVILLE,SC,2011,81.990675
1,ABBEVILLE,SC,2012,95.01465
2,ACADIA,LA,2011,780.06741
3,ACADIA,LA,2012,618.513375
4,ACCOMACK,VA,2011,222.192124


In [58]:
# Group by county, state, and year to ensure unique entries and sum if appearing multiple times
final_result = (
    final_result.groupby(["BUYER_COUNTY", "BUYER_STATE", "Year"])["Morphine_Equivalent"]
    .sum()
    .reset_index()
)

# Display the first few rows of the result
final_result.head()

Unnamed: 0,BUYER_COUNTY,BUYER_STATE,Year,Morphine_Equivalent
0,ABBEVILLE,SC,2011,4833.266018
1,ABBEVILLE,SC,2012,4835.180167
2,ACADIA,LA,2011,32544.69986
3,ACADIA,LA,2012,27021.224292
4,ACCOMACK,VA,2011,8488.628364


In [60]:
print(
    "For each year, the total morphine equivalents sent to each county in the US are:"
)
# display the final dataset
final_result

For each year, the total morphine equivalents sent to each county in the US are:


Unnamed: 0,BUYER_COUNTY,BUYER_STATE,Year,Morphine_Equivalent
0,ABBEVILLE,SC,2011,4833.266018
1,ABBEVILLE,SC,2012,4835.180167
2,ACADIA,LA,2011,32544.699860
3,ACADIA,LA,2012,27021.224292
4,ACCOMACK,VA,2011,8488.628364
...,...,...,...,...
6168,YUMA,CO,2012,2340.620790
6169,ZAPATA,TX,2011,871.717762
6170,ZAPATA,TX,2012,958.798605
6171,ZAVALA,TX,2011,1178.827725


## Exercise 6

Now, re-write your opioid project's initial opioid import using dask. Each person on your team should create a NEW branch to try this. The person who wrote the initial chunking code can help everyone else understand what they did originally and the data, but everyone should write their own code. 

**WARNING:** You will probably run into a lot of type errors (depending on how the ARCOS data has changed since last year). With real world messy data one of the biggest problems with dask is that it struggles if halfway through dataset it discovers that the column it *thought* was floats contains text. That's why, in the dask reading, [I specified the column type for so many columns](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html#what-can-dask-do-for-me) as `objects` explicitly. Then, because occasionally there data cleanliness issues, I had to do some converting data types by hand. 

In [61]:
# select certain columns to keep
selected_columns = [
    "BUYER_STATE",
    "BUYER_COUNTY",
    "TRANSACTION_DATE",
    "DOSAGE_UNIT",
    "CALC_BASE_WT_IN_GM",
    "MME",
]

opiod_df = dd.read_csv(
    "arcos_all_washpost.tsv",
    sep="\t",
    usecols=selected_columns,
)

opiod_df["Year"] = dd.to_datetime(
    opiod_df["TRANSACTION_DATE"], format="%Y-%m-%d"
).dt.year

opiod_df = (
    opiod_df.groupby(["BUYER_COUNTY", "BUYER_STATE", "Year"])["MME"].sum().reset_index()
)

result_df = opiod_df.compute()

print("The head of the opioids shipment dataset is: ")
result_df.head()

FileNotFoundError: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: [Errno 2] No such file or directory: '/Users/tinayiluo/Downloads/arcos_all_washpost.tsv'