# Importing ARCOS Data with Dask

Last week, we used dask to play with a few datasets to get a feel for how dask works. In order to help us develop code that would run quickly, however, we worked with very small, safe datasets. 

Today, we will continue to work with dask, but this time using much larger datasets. This means that (a) doing things incorrectly may lead to your computer crashing (so save all your open files before you start!), and (b) many of the commands you are being asked to run will take several minutes each. 

For familiarity, and so you can see what advantages dask can bring to your workflow, today we'll be working with the DEA ARCOS drug shipment database published by the Washington Post! However, to strike a balance between size and speed, we'll be working with a slightly thinned version that has only the last two years of data, instead of all six.

## Exercise 1

Download the thinned ARCOS data [from this link](https://www.dropbox.com/s/o7nc6yvrwog4ozi/arcos_2011_2012.tsv.zip?dl=0). It should be about 2 GB zipped and 25 GB unzipped. 
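If you would rather unzip the download from Python than from your file browser, a minimal sketch using only the standard library might look like the following. The helper name `unzip_archive` and the paths in the usage comment are assumptions for illustration; point them at wherever you saved the file, and make sure you have room on disk for the unzipped data.

```python
import zipfile
from pathlib import Path


def unzip_archive(zip_path, out_dir):
    """Extract every member of zip_path into out_dir and return the extracted paths."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(out_dir)
        return [out_dir / name for name in archive.namelist()]


# Hypothetical usage; adjust both paths to your own machine:
# unzip_archive("arcos_2011_2012.tsv.zip", "data")
```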

In [1]:
pip install dask

Collecting dask
  Downloading dask-2023.12.0-py3-none-any.whl (1.2 MB)
Collecting click>=8.1 (from dask)
  Downloading click-8.1.7-py3-none-any.whl (97 kB)

In [4]:
import dask.dataframe as dd

file_path = r"D:\Duke\Fall 23\Data Python\arcos_2011_2012.tsv"

# Use dask to read the data
df = dd.read_csv(file_path, sep="\t")

In [9]:
import pandas as pd

# Read a small sample of the data
sample_df = pd.read_csv(file_path, sep="\t", nrows=100)

# Inspect the inferred data types
print(sample_df.dtypes)

Unnamed: 0                 int64
REPORTER_DEA_NO           object
REPORTER_BUS_ACT          object
REPORTER_NAME             object
REPORTER_ADDL_CO_INFO     object
REPORTER_ADDRESS1         object
REPORTER_ADDRESS2        float64
REPORTER_CITY             object
REPORTER_STATE            object
REPORTER_ZIP               int64
REPORTER_COUNTY           object
BUYER_DEA_NO              object
BUYER_BUS_ACT             object
BUYER_NAME                object
BUYER_ADDL_CO_INFO        object
BUYER_ADDRESS1            object
BUYER_ADDRESS2            object
BUYER_CITY                object
BUYER_STATE               object
BUYER_ZIP                  int64
BUYER_COUNTY              object
TRANSACTION_CODE          object
DRUG_CODE                  int64
NDC_NO                     int64
DRUG_NAME                 object
QUANTITY                 float64
UNIT                     float64
ACTION_INDICATOR         float64
ORDER_FORM_NO            float64
CORRECTION_NO            float64
STRENGTH  

In [10]:
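# Read a small sample of the data using pandas to infer data types
sample_df = pd.read_csv(file_path, sep="\t", nrows=100)
dtype_dict = sample_df.dtypes.to_dict()

# Columns that are entirely missing in the sample are inferred as float64,
# but may hold text further down the file, so read them as object instead
for col in sample_df.columns[sample_df.isna().all()]:
    dtype_dict[col] = "object"

# Use dask to read the entire dataset with the adjusted data types
df = dd.read_csv(file_path, sep="\t", dtype=dtype_dict)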

## Exercise 2

Our goal today is going to be to find the pharmaceutical company that has shipped the most opioids (`MME_Conversion_Factor * CALC_BASE_WT_IN_GM`) in the US.

When working with large datasets, it is good practice to begin by prototyping your code with a subset of your data. So begin by using `pandas` to read in the first 100,000 lines of the ARCOS data and write pandas code to compute the shipments from each shipper (the group that reported the shipment). 

In [11]:
arcos_subset = pd.read_csv(file_path, sep="\t", nrows=100000)

# Check the structure of the data and inspect the first few rows
print(arcos_subset.head())

   Unnamed: 0 REPORTER_DEA_NO REPORTER_BUS_ACT               REPORTER_NAME  \
0           0       PA0006836      DISTRIBUTOR  ACE SURGICAL SUPPLY CO INC   
1           9       PA0021179      DISTRIBUTOR                APOTHECA INC   
2          10       PA0021179      DISTRIBUTOR                APOTHECA INC   
3          16       PA0021179      DISTRIBUTOR                APOTHECA INC   
4          17       PA0021179      DISTRIBUTOR                APOTHECA INC   

  REPORTER_ADDL_CO_INFO  REPORTER_ADDRESS1 REPORTER_ADDRESS2 REPORTER_CITY  \
0                   NaN  1034 PEARL STREET               NaN      BROCKTON   
1                   NaN     1622 N 16TH ST               NaN       PHOENIX   
2                   NaN     1622 N 16TH ST               NaN       PHOENIX   
3                   NaN     1622 N 16TH ST               NaN       PHOENIX   
4                   NaN     1622 N 16TH ST               NaN       PHOENIX   

  REPORTER_STATE  REPORTER_ZIP  ...                          P



In [12]:
# Morphine-equivalent weight of each shipment, computed row by row
arcos_subset["MME"] = (
    arcos_subset["MME_Conversion_Factor"] * arcos_subset["CALC_BASE_WT_IN_GM"]
)

# Total morphine equivalents shipped by each reporting company, largest first
shipments_by_shipper = (
    arcos_subset.groupby("REPORTER_NAME")["MME"].sum().sort_values(ascending=False)
)

# Display the shipper with the most shipments in this subset
print("Shipper with the most shipments:")
print(shipments_by_shipper.head(1))
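One detail worth checking in a calculation like this: summing each column and then multiplying the two sums is not the same as multiplying row by row and then summing. The toy data below is made up purely for illustration (it is not ARCOS data); for shipper `A` the row-by-row total is 50, while the product of the sums is 90.

```python
import pandas as pd

# Made-up shipments for two hypothetical shippers, A and B
toy = pd.DataFrame(
    {
        "REPORTER_NAME": ["A", "A", "B"],
        "MME_Conversion_Factor": [1.0, 2.0, 1.5],
        "CALC_BASE_WT_IN_GM": [10.0, 20.0, 4.0],
    }
)

# Multiply within each row first, then add up the products per shipper
toy["MME"] = toy["MME_Conversion_Factor"] * toy["CALC_BASE_WT_IN_GM"]
row_wise = toy.groupby("REPORTER_NAME")["MME"].sum()

# Add up each column per shipper first, then multiply the two sums
sums = toy.groupby("REPORTER_NAME")[
    ["MME_Conversion_Factor", "CALC_BASE_WT_IN_GM"]
].sum()
product_of_sums = sums["MME_Conversion_Factor"] * sums["CALC_BASE_WT_IN_GM"]

print(row_wise)
print(product_of_sums)
```

The two only agree when a shipper has a single row, which is why shipper `B` comes out the same both ways.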


## Exercise 3

Now let's turn to dask. Re-write your code for dask, and calculate the total shipments by reporting company. Remember: 

- Activate a conda environment with a clean dask installation.
- Start by spinning up a distributed cluster.
- Dask won't read compressed files, so you have to unzip your ARCOS data. 
- Start your cluster in a cell all by itself since you don't want to keep re-running the "start a cluster" code. 

If you need to review dask basic code, [check here](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html).

As you run your code, make sure to click on the Dashboard link below where you created your cluster:

![dask_dashboard](images/dask_cluster.png)

Among other things, the bar across the bottom should give you a sense of how long your task will take:

![dask_progress](images/dask_progress.png)

(For context, my computer (which has 10 cores) only took a couple seconds. My computer is fast, but most computers should be done within a couple minutes, tops).


## Exercise 4

Now let's calculate, *for each state*, which company shipped the most pills.

Note that sorting in dask is expensive, and some versions don't support it at all -- sorting in parallel is *really* tricky! So you'll want to work around that. Do what you need to do on the big dataset first, then compute it all so you get it as a regular pandas dataframe, then finish the sorting there. 

Does this seem like a situation where a single company is responsible for the opioid epidemic?

## Exercise 5 

Now go ahead and try to re-do the chunking you did by hand for your project (with these two years of data) -- calculate, for each year, the total morphine equivalents sent to each county in the US. 
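For comparison, the by-hand chunking that dask replaces might look something like the pandas sketch below. It rests on two assumptions that you should check against your own file: that there is a `TRANSACTION_DATE` column, and that it is stored as an integer in `MMDDYYYY` form (so the last four digits are the year). The function name is hypothetical.

```python
import pandas as pd


def mme_by_county_year(tsv_path, chunksize=1_000_000):
    """Total morphine equivalents per (year, state, county), read chunk by chunk."""
    cols = [
        "BUYER_STATE",
        "BUYER_COUNTY",
        "TRANSACTION_DATE",  # assumed to be an integer like 12312012 (MMDDYYYY)
        "MME_Conversion_Factor",
        "CALC_BASE_WT_IN_GM",
    ]
    keys = ["YEAR", "BUYER_STATE", "BUYER_COUNTY"]
    partials = []
    for chunk in pd.read_csv(tsv_path, sep="\t", usecols=cols, chunksize=chunksize):
        chunk["YEAR"] = chunk["TRANSACTION_DATE"] % 10000  # keep the last four digits
        chunk["MME"] = chunk["MME_Conversion_Factor"] * chunk["CALC_BASE_WT_IN_GM"]
        partials.append(chunk.groupby(keys)["MME"].sum())
    # A county can appear in many chunks, so add the partial sums together
    return pd.concat(partials).groupby(level=keys).sum()
```

The dask version follows the same two-stage pattern (partial sums per partition, then a final combine), except that dask handles the partitioning and the combine step for you.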

## Exercise 6

Now, re-write your opioid project's initial opioid import using dask. Each person on your team should create a NEW branch to try this. The person who wrote the initial chunking code can help everyone else understand what they did originally and the data, but everyone should write their own code. 

**WARNING:** You will probably run into a lot of type errors (depending on how the ARCOS data has changed since last year). With real-world messy data, one of the biggest problems with dask is that it struggles if, halfway through the dataset, it discovers that a column it *thought* was floats contains text. That's why, in the dask reading, [I specified the column type for so many columns](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html#what-can-dask-do-for-me) as `object` explicitly. Then, because there are occasionally data cleanliness issues, I had to convert some data types by hand. 