# Header

In [None]:
import os

## Spark Setup

In [None]:
!apt-get update -q # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.
!pip install -q pyspark==3.1.1

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [737 kB]
Hit:8 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [1,848 kB]
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 http://archive.ubuntu.com

In [None]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("feature_engineering")
    .config('spark.ui.port', '4050')
    .config('spark.driver.memory', '16g')
    .config('spark.executor.memory', '16g')
    .getOrCreate()
)

## Notebook Configuration

In [None]:
# path to the this notebook
# NOTE: Replace this with your project path if needed
PROJECT_PATH = (
    "/content/drive/My Drive/W210"
    if "google.colab" in str(get_ipython())
    else "."
)

# path to the data folder
# NOTE: Replace this with your data path if needed
DATA_PATH = f"{PROJECT_PATH}/data" if "google.colab" in str(get_ipython()) else PROJECT_PATH
# NOTE: For colab we use content so it doesn"t load on google drive storage
RAW_DATA_PATH = f"{PROJECT_PATH}/data" if "google.colab" in str(get_ipython()) else f"{PROJECT_PATH}/data"

## Colab Setup

In [None]:
if "google.colab" in str(get_ipython()):
    from google.colab import drive
    drive.mount("/content/drive")

    # setup libraries used by notebook
    #os.system("pip install -q kaggle")

os.chdir(PROJECT_PATH)

Mounted at /content/drive


## Library Import

In [None]:
import itertools
import json
import requests
import shutil
import typing
import zipfile

from io import BytesIO
from itertools import chain
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from tqdm import tqdm

---

# Configs

In [None]:
DOSAGE_GROUPS = {
    "AEROSOL": "Inhalable",
    "AUGMENTED": "Topical",
    "BAR": "Oral Solid",
    "BEAD": "Oral Solid",
    "CAPSULE": "Oral Solid",
    "CELLULAR SHEET": "Implantable",
    "CHEWABLE": "Oral Solid",
    "CHEWABLE GEL": "Oral Solid",
    "CHEWING": "Oral Solid",
    "CLOTH": "Topical",
    "COATED": "Oral Solid",
    "COATED PELLETS": "Oral Solid",
    "CONCENTRATE": "Liquid",
    "CREAM": "Topical",
    "CRYSTAL": "Oral Solid",
    "DELAYED RELEASE": "Oral Solid",
    "DELAYED RELEASE PARTICLES": "Oral Solid",
    "DELAYED RELEASE PELLETS": "Oral Solid",
    "DENTIFRICE": "Oral Care",
    "DISC": "Topical",
    "DOUCHE": "Liquid",
    "DRESSING": "Topical",
    "DRUG-ELUTING CONTACT LENS": "Ophthalmic",
    "EFFERVESCENT": "Oral Solid",
    "ELIXIR": "Liquid",
    "EMULSION": "Liquid",
    "ENEMA": "Rectal",
    "EXTENDED RELEASE": "Oral Solid",
    "EXTRACT": "Liquid",
    "FILM": "Topical",
    "FILM COATED": "Oral Solid",
    "FOAM": "Topical",
    "FOR SOLUTION": "Liquid",
    "FOR SUSPENSION": "Liquid",
    "GAS": "Inhalable",
    "GEL": "Topical",
    "GEL FORMING / DROPS": "Ophthalmic",
    "GELATIN COATED": "Oral Solid",
    "GLOBULE": "Oral Solid",
    "GRANULE": "Oral Solid",
    "GUM": "Oral Solid",
    "IMPLANT": "Implantable",
    "IMPLANTABLE": "Implantable",
    "INHALANT": "Inhalable",
    "INJECTABLE": "Injectable",
    "INJECTION": "Injectable",
    "INSERT": "Vaginal",
    "INTRAUTERINE DEVICE": "Implantable",
    "IRRIGANT": "Liquid",
    "JELLY": "Topical",
    "KIT": "Miscellaneous",
    "LINIMENT": "Topical",
    "LIPID COMPLEX": "Injectable",
    "LIPOSOMAL": "Injectable",
    "LIPSTICK": "Oral Care",
    "LIQUID": "Liquid",
    "LIQUID FILLED": "Oral Solid",
    "LOTION": "Topical",
    "LOTION/SHAMPOO": "Topical",
    "LOZENGE": "Oral Solid",
    "LYOPHILIZED": "Injectable",
    "METERED": "Inhalable",
    "MOUTHWASH": "Oral Care",
    "MULTILAYER": "Oral Solid",
    "NOT APPLICABLE": "Miscellaneous",
    "OIL": "Topical",
    "OINTMENT": "Topical",
    "ORALLY DISINTEGRATING": "Oral Solid",
    "PASTE": "Topical",
    "PASTILLE": "Oral Solid",
    "PATCH": "Transdermal",
    "PELLET": "Oral Solid",
    "PELLETS": "Oral Solid",
    "PILL": "Oral Solid",
    "PLASTER": "Topical",
    "POULTICE": "Topical",
    "POWDER": "Oral Solid",
    "RING": "Vaginal",
    "RINSE": "Oral Care",
    "SALVE": "Topical",
    "SHAMPOO": "Topical",
    "SOAP": "Topical",
    "SOLUBLE": "Oral Solid",
    "SOLUTION": "Liquid",
    "SOLUTION/ DROPS": "Ophthalmic",
    "SPONGE": "Topical",
    "SPRAY": "Topical",
    "STICK": "Topical",
    "STRIP": "Oral Solid",
    "SUGAR COATED": "Oral Solid",
    "SUPPOSITORY": "Rectal",
    "SUSPENSION": "Liquid",
    "SUSPENSION/ DROPS": "Ophthalmic",
    "SWAB": "Topical",
    "SYRUP": "Liquid",
    "SYSTEM": "Transdermal",
    "TABLET": "Oral Solid",
    "TABLET WITH SENSOR": "Oral Solid",
    "TAPE": "Topical",
    "TINCTURE": "Liquid",
    "WAFER": "Oral Solid"
}
DOSAGE_GROUPS = {k: "dosage_" + "_".join(v.lower().split()) for k, v in DOSAGE_GROUPS.items()}

ROUTES_GROUPS = {
    "route_auricular_otic": "AuricularOtic",
    "route_buccal": "Mucosal",
    "route_cutaneous": "Topical",
    "route_dental": "Mucosal",
    "route_endotracheal": "Respiratory",
    "route_enteral": "Enteral",
    "route_epidural": "Intraspinal",
    "route_extracorporeal": "Extracorporeal",
    "route_hemodialysis": "Extracorporeal",
    "route_infiltration": "Infiltration",
    "route_intra-arterial": "Intravascular",
    "route_intra-articular": "Intraarticular",
    "route_intrabronchial": "Respiratory",
    "route_intracameral": "Ophthalmic",
    "route_intracanalicular": "Ophthalmic",
    "route_intracardiac": "Intracardiac",
    "route_intracaudal": "Intracaudal",
    "route_intracavernous": "Intracavernous",
    "route_intracavitary": "Intracavitary",
    "route_intracoronary": "Intracardiac",
    "route_intradermal": "Intradermal",
    "route_intralesional": "Intradermal",
    "route_intraluminal": "Intraluminal",
    "route_intralymphatic": "Intralymphatic",
    "route_intramedullary": "Intramedullary",
    "route_intrameningeal": "Intrameningeal",
    "route_intramuscular": "Parenteral",
    "route_intraocular": "Ophthalmic",
    "route_intraperitoneal": "Intraperitoneal",
    "route_intrapleural": "Intrapleural",
    "route_intrasinal": "Intrasinal",
    "route_intraspinal": "Intraspinal",
    "route_intrasynovial": "Intrasynovial",
    "route_intrathecal": "Intraspinal",
    "route_intratympanic": "Intratympanic",
    "route_intrauterine": "Intrauterine",
    "route_intravascular": "Intravascular",
    "route_intravenous": "Intravascular",
    "route_intraventricular": "Intraventricular",
    "route_intravesical": "Intravesical",
    "route_intravitreal": "Ophthalmic",
    "route_iontophoresis": "Topical",
    "route_irrigation": "Topical",
    "route_nasal": "Respiratory",
    "route_ophthalmic": "Ophthalmic",
    "route_oral": "Enteral",
    "route_oropharyngeal": "Mucosal",
    "route_parenteral": "Parenteral",
    "route_percutaneous": "Topical",
    "route_perineural": "Perineural",
    "route_periodontal": "Mucosal",
    "route_rectal": "Enteral",
    "route_respiratory_inhalation": "Respiratory",
    "route_retrobulbar": "Ophthalmic",
    "route_soft_tissue": "Topical",
    "route_subarachnoid": "Intraspinal",
    "route_subconjunctival": "Ophthalmic",
    "route_subcutaneous": "Parenteral",
    "route_subgingival": "Mucosal",
    "route_sublingual": "Mucosal",
    "route_submucosal": "Mucosal",
    "route_suprachoroidal": "Ophthalmic",
    "route_topical": "Topical",
    "route_transdermal": "Topical",
    "route_transmucosal": "Mucosal",
    "route_transtracheal": "Respiratory",
    "route_ureteral": "Ureteral",
    "route_urethral": "Urethral",
    "route_vaginal": "Vaginal",
}
ROUTES_GROUPS = {k: "route_" + "_".join(v.lower().split()) for k, v in ROUTES_GROUPS.items()}

# Data Load

In [None]:
shortages_ts = spark.read.parquet(f"{DATA_PATH}/preprocessed/shortages_monday.parquet")
shortages_ts = shortages_ts.withColumn("date", F.to_utc_timestamp(F.from_unixtime(F.col("date")/1000000000,'yyyy-MM-dd HH:mm:ss'),'GMT'))

In [None]:
ndc_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/comp_ndc.parquet")
packaging_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/comp_packaging.parquet")
manufacturer_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/comp_manufacturer.parquet")
route_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/comp_route.parquet")
recalls_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/recalls.parquet")
inspections_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/inspections.parquet")
compliance_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/compliance.parquet")

import polars as pl

adverse_events = pl.read_parquet(f"{DATA_PATH}/preprocessed/adverse_events.parquet")
adverse_events = adverse_events.group_by("package_ndc", "event_date").agg(pl.n_unique("event_id").alias("events"))
adverse_events = adverse_events.with_columns(event_date=pl.col("event_date").str.to_datetime("%Y%m%d"))
adverse_events = adverse_events.to_pandas().assign(events=lambda f: f["events"].astype("int64"))
adverse_events.to_parquet(f"{DATA_PATH}/preprocessed/adverse_events_agg.parquet", index=False)

In [None]:
adverse_df = spark.read.parquet(f"{DATA_PATH}/preprocessed/adverse_events_agg.parquet")

# Feature Engineering

## Basic Features

### Spine

In [None]:
spine = packaging_df.select("package_ndc", "product_ndc").distinct().cache()

### Dosage Form

In [None]:
# create a dataframe of dosage forms
mapping = F.create_map([F.lit(x) for x in chain(*DOSAGE_GROUPS.items())])
dosage_form = (
    ndc_df.select("product_ndc", "dosage_form")
    .distinct()
    .fillna("unk", subset=["dosage_form"])
    .withColumn("dosage_form", F.split(F.col("dosage_form"), ","))
    .select("product_ndc", F.explode("dosage_form").alias("dosage_form"))
    .withColumn("dosage_form", mapping[F.trim(F.col("dosage_form"))])
    .withColumn("k", F.lit(1))
    .groupBy("product_ndc")
    .pivot("dosage_form")
    .agg(F.max("k"))
)
dosage_form = spine.join(dosage_form, on="product_ndc", how="left").fillna(0).drop("product_ndc").localCheckpoint()

In [None]:
dosage_form.show(5)

+------------+------------------+----------------+-----------------+-------------+--------------------+-----------------+----------------+-----------------+-------------+--------------+------------------+--------------+
| package_ndc|dosage_implantable|dosage_inhalable|dosage_injectable|dosage_liquid|dosage_miscellaneous|dosage_ophthalmic|dosage_oral_care|dosage_oral_solid|dosage_rectal|dosage_topical|dosage_transdermal|dosage_vaginal|
+------------+------------------+----------------+-----------------+-------------+--------------------+-----------------+----------------+-----------------+-------------+--------------+------------------+--------------+
|0002-2214-01|                 0|               0|                1|            1|                   0|                0|               0|                0|            0|             0|                 0|             0|
|0003-3631-12|                 0|               0|                0|            0|                   0|                0

### Marketing Category

In [None]:
# create a flag for marketing category
marketing_category = (
    ndc_df.select("product_ndc", "marketing_category")
    .distinct()
    .withColumn("marketing_category", F.lower(F.trim(F.col("marketing_category"))))
    .fillna("unk", subset=["marketing_category"])
    .withColumn(
        "marketing_category",
        F.concat(
            F.lit("mc_"),
            F.concat_ws("_", F.split(F.col("marketing_category"), " "))
        )
    )
    .withColumn("k", F.lit(1))
    .groupBy("product_ndc")
    .pivot("marketing_category")
    .agg(F.max("k"))
)
marketing_category = spine.join(marketing_category, on="product_ndc", how="left").fillna(0).drop("product_ndc").localCheckpoint()

In [None]:
marketing_category.show(5)

+------------+-------+------+------------------+-----------------------------------------------------+-----------+------------------------------+------------------------------+--------------+------+-------------------------+---------------------+----------------------+--------------------------+-------------------------------------------+------------------------+-------------------------+-------------------------+
| package_ndc|mc_anda|mc_bla|mc_bulk_ingredient|mc_bulk_ingredient_for_human_prescription_compounding|mc_cosmetic|mc_drug_for_further_processing|mc_emergency_use_authorization|mc_export_only|mc_nda|mc_nda_authorized_generic|mc_otc_monograph_drug|mc_otc_monograph_final|mc_otc_monograph_not_final|mc_unapproved_drug_for_use_in_drug_shortage|mc_unapproved_drug_other|mc_unapproved_homeopathic|mc_unapproved_medical_gas|
+------------+-------+------+------------------+-----------------------------------------------------+-----------+------------------------------+-------------------

### Route

In [None]:
# create a flag for route
mapping = F.create_map([F.lit(x) for x in chain(*ROUTES_GROUPS.items())])
route = (
    route_df.select("product_ndc", "route")
    .distinct()
    .withColumn("route", F.lower(F.trim(F.col("route"))))
    .fillna("unk", subset=["route"])
    .withColumn(
        "route",
        F.concat(
            F.lit("route_"),
            F.concat_ws(
                "_",
                F.split(
                    F.regexp_replace(
                        F.regexp_replace(F.col("route"), "\(", ""),
                        "\)",
                        "",
                    ),
                    " "
                )
            )
        )
    )
    .withColumn("route", mapping[F.trim(F.col("route"))])
    .withColumn("k", F.lit(1))
    .groupBy("product_ndc")
    .pivot("route")
    .agg(F.max("k"))
)
route = spine.join(route, on="product_ndc", how="left").fillna(0).drop("product_ndc").localCheckpoint()

In [None]:
route.show(5)

+------------+-------------------+-------------+--------------------+------------------+--------------------+------------------+-----------------+--------------------+-------------------+-----------------+------------------+--------------------+--------------------+--------------------+---------------------+------------------+----------------+-----------------+-------------------+-------------------+------------------+-------------------+----------------------+------------------+-------------+----------------+----------------+----------------+-----------------+-------------+--------------+--------------+-------------+
| package_ndc|route_auricularotic|route_enteral|route_extracorporeal|route_infiltration|route_intraarticular|route_intracardiac|route_intracaudal|route_intracavernous|route_intracavitary|route_intradermal|route_intraluminal|route_intralymphatic|route_intramedullary|route_intrameningeal|route_intraperitoneal|route_intrapleural|route_intrasinal|route_intraspinal|route_intra

## Marketing Features

### Age


In [None]:
age = (
    shortages_ts.join(
        packaging_df.select("package_ndc", "marketing_start_date")
        .withColumn("start_date", F.to_date(F.col("marketing_start_date"), "yyyyMMdd"))
        .withColumn("start_date", F.when(F.col("start_date").isNull(), F.to_date(F.col("marketing_start_date"), "dd-MMM-yy")).otherwise(F.col("start_date")))
        .groupBy("package_ndc")
        .agg(F.min("start_date").alias("start_date")),
        on="package_ndc",
        how="left"
    )
    .withColumn("date", F.to_date(F.col("date")))
    .withColumn("age", F.datediff(F.col("date"), F.col("start_date")))
    .drop("ndc", "start_date")
    .localCheckpoint()
)

In [None]:
age.show(5)

+------------+----------+------------------+----+
| package_ndc|      date|shortage_indicator| age|
+------------+----------+------------------+----+
|0002-2129-00|2021-11-22|                 0|2772|
|0002-2129-00|2021-11-29|                 0|2779|
|0002-2129-00|2021-12-06|                 0|2786|
|0002-2129-00|2021-12-13|                 0|2793|
|0002-2129-00|2021-12-20|                 0|2800|
+------------+----------+------------------+----+
only showing top 5 rows



### Days to Expiration

days_to_expiration = (
    shortages_ts.join(
        packaging_df.select("package_ndc", "marketing_end_date")
        .withColumn("end_date", F.to_date(F.col("marketing_end_date"), "yyyyMMdd"))
        .withColumn("end_date", F.when(F.col("end_date").isNull(), F.to_date(F.col("marketing_end_date"), "dd-MMM-yy")).otherwise(F.col("end_date")))
        .groupBy("package_ndc")
        .agg(F.max("end_date").alias("end_date")),
        on="package_ndc",
        how="left"
    )
    .withColumn("date", F.to_date(F.col("date")))
    .withColumn("age", F.datediff(F.col("end_date"), F.col("date")))
    .drop("ndc", "end_date")
    .localCheckpoint()
)

## Event Features

### Recalls

In [None]:
recalls_ts = (
    shortages_ts.alias("a")
    .join(
        recalls_df.withColumn("recall_date", F.to_utc_timestamp(F.from_unixtime(F.col("recall_date")/1000000000,'yyyy-MM-dd HH:mm:ss'),'GMT'))
        .alias("b"),
        on=(F.col("a.package_ndc") == F.col("b.package_ndc"))
        & (F.col("a.date") >= F.col("b.recall_date"))
        & (F.months_between(F.col("a.date"), F.col("b.recall_date")) <= 6)
        & (F.months_between(F.col("a.date"), F.col("b.recall_date")) >= 0),
        how="left"
    )
    .groupBy("a.package_ndc", "a.date")
    .agg(F.count("b.event_id").alias("recalls"))
    .localCheckpoint()
)

In [None]:
recalls_ts.show(5)

+------------+-------------------+-------+
| package_ndc|               date|recalls|
+------------+-------------------+-------+
|0002-0096-00|2021-11-22 00:00:00|      0|
|0002-0096-00|2022-06-27 00:00:00|      0|
|0002-0485-04|2022-03-28 00:00:00|      0|
|0002-1076-00|2022-01-10 00:00:00|      0|
|0002-1200-50|2022-02-14 00:00:00|      0|
+------------+-------------------+-------+
only showing top 5 rows



### Inspections

In [None]:
inspections_ts = (
    shortages_ts.alias("x")
    .join(
        inspections_df.alias("a")
        .withColumn(
            "inspection_date",
            F.to_utc_timestamp(F.from_unixtime(F.col("inspection_date")/1000000000,'yyyy-MM-dd HH:mm:ss'),'GMT')
        )
        .join(
            ndc_df.select("product_ndc", "labeler_name").distinct()
            .alias("b"),
            on=F.col("a.manufacturer") == F.col("b.labeler_name"),
            how="left"
        )
        .join(spine, on="product_ndc", how="inner")
        .alias("y"),
        on=(F.col("x.package_ndc") == F.col("y.package_ndc"))
        & (F.col("x.date") >= F.col("y.inspection_date"))
        & (F.months_between(F.col("x.date"), F.col("y.inspection_date")) <= 6)
        & (F.months_between(F.col("x.date"), F.col("y.inspection_date")) >= 0),
        how="left"
    )
    .groupBy("x.package_ndc", "x.date")
    .agg(F.count("y.inspection_id").alias("inspections"), F.max("y.in_vai").alias("in_vai"), F.max("y.in_oai").alias("in_oai"))
    .fillna(0)
    .localCheckpoint()
)

In [None]:
inspections_ts.show(5)

+------------+-------------------+-----------+------+------+
| package_ndc|               date|inspections|in_vai|in_oai|
+------------+-------------------+-----------+------+------+
|0002-2129-00|2021-11-22 00:00:00|          0|     0|     0|
|0002-2129-00|2021-11-29 00:00:00|          0|     0|     0|
|0002-2129-00|2021-12-06 00:00:00|          0|     0|     0|
|0002-2129-00|2021-12-13 00:00:00|          0|     0|     0|
|0002-2129-00|2021-12-20 00:00:00|          0|     0|     0|
+------------+-------------------+-----------+------+------+
only showing top 5 rows



### Compliance

In [None]:
compliance_ts = (
    shortages_ts.alias("x")
    .join(
        compliance_df.alias("a")
        .withColumn(
            "case_date",
            F.to_utc_timestamp(F.from_unixtime(F.col("case_date")/1000000000,'yyyy-MM-dd HH:mm:ss'),'GMT')
        )
        .join(
            ndc_df.select("product_ndc", "labeler_name").distinct()
            .alias("b"),
            on=F.col("a.manufacturer") == F.col("b.labeler_name"),
            how="left"
        )
        .join(spine, on="product_ndc", how="inner")
        .alias("y"),
        on=(F.col("x.package_ndc") == F.col("y.package_ndc"))
        & (F.col("x.date") >= F.col("y.case_date"))
        & (F.months_between(F.col("x.date"), F.col("y.case_date")) <= 6)
        & (F.months_between(F.col("x.date"), F.col("y.case_date")) >= 0),
        how="left"
    )
    .groupBy("x.package_ndc", "x.date")
    .agg(F.count("y.case_id").alias("compliances"), F.max("y.in_injunction").alias("in_injunction"), F.max("y.in_seizure").alias("in_seizure"))
    .fillna(0)
    .localCheckpoint()
)

In [None]:
compliance_ts.show(5)

+------------+-------------------+-----------+-------------+----------+
| package_ndc|               date|compliances|in_injunction|in_seizure|
+------------+-------------------+-----------+-------------+----------+
|0002-2129-00|2021-11-22 00:00:00|          0|            0|         0|
|0002-2129-00|2021-11-29 00:00:00|          0|            0|         0|
|0002-2129-00|2021-12-06 00:00:00|          0|            0|         0|
|0002-2129-00|2021-12-13 00:00:00|          0|            0|         0|
|0002-2129-00|2021-12-20 00:00:00|          0|            0|         0|
+------------+-------------------+-----------+-------------+----------+
only showing top 5 rows



### Adverse Events

In [None]:
adverse_ts = (
    shortages_ts.alias("a")
    .join(
        adverse_df.withColumn("event_date", F.to_utc_timestamp(F.from_unixtime(F.col("event_date")/1000000000,'yyyy-MM-dd HH:mm:ss'),'GMT'))
        .alias("b"),
        on=(F.col("a.package_ndc") == F.col("b.package_ndc"))
        & (F.col("a.date") >= F.col("b.event_date"))
        & (F.months_between(F.col("a.date"), F.col("b.event_date")) <= 6)
        & (F.months_between(F.col("a.date"), F.col("b.event_date")) >= 0),
        how="left"
    )
    .groupBy("a.package_ndc", "a.date")
    .agg(F.count("b.event_date").alias("adverse_events"))
    .localCheckpoint()
)

In [None]:
adverse_ts.show(5)

+------------+-------------------+--------------+
| package_ndc|               date|adverse_events|
+------------+-------------------+--------------+
|0002-2129-00|2021-11-22 00:00:00|             0|
|0002-2129-00|2021-11-29 00:00:00|             0|
|0002-2129-00|2021-12-06 00:00:00|             0|
|0002-2129-00|2021-12-13 00:00:00|             0|
|0002-2129-00|2021-12-20 00:00:00|             0|
+------------+-------------------+--------------+
only showing top 5 rows



## Labeler Features

In [None]:
labeler_df = (
    shortages_ts.withColumn("labeler_code", F.split(F.col("package_ndc"), "-", 3).getItem(0))
    .withColumn("in_labeler_shortage", F.max("shortage_indicator").over(Window.partitionBy("package_ndc", "date")))
    .withColumn("drugs_shortage", F.sum("shortage_indicator").over(Window.partitionBy("package_ndc", "date")))
    .withColumn("drugs_total", F.count("shortage_indicator").over(Window.partitionBy("package_ndc", "date")))
    .withColumn("pct_shortage", F.col("drugs_shortage") / F.col("drugs_total"))
    .select("package_ndc", "date", "in_labeler_shortage", "pct_shortage")
    .localCheckpoint()
)

In [None]:
labeler_df.show(5)

+------------+-------------------+-------------------+------------+
| package_ndc|               date|in_labeler_shortage|pct_shortage|
+------------+-------------------+-------------------+------------+
|0002-0013-03|2023-10-23 00:00:00|                  0|         0.0|
|0002-0095-00|2023-08-21 00:00:00|                  0|         0.0|
|0002-0096-00|2021-11-22 00:00:00|                  0|         0.0|
|0002-0096-00|2022-06-27 00:00:00|                  0|         0.0|
|0002-0113-03|2023-09-18 00:00:00|                  0|         0.0|
+------------+-------------------+-------------------+------------+
only showing top 5 rows



## Combine Data

In [None]:
basic_df = (
    age.join(dosage_form, on="package_ndc", how="left")
    .join(marketing_category, on="package_ndc", how="left")
    .join(route, on="package_ndc", how="left")
    .join(recalls_ts, on=["package_ndc", "date"], how="left")
    .join(inspections_ts, on=["package_ndc", "date"], how="left")
    .join(compliance_ts, on=["package_ndc", "date"], how="left")
    .join(labeler_df, on=["package_ndc", "date"], how="left")
    .join(adverse_ts, on=["package_ndc", "date"], how="left")
    .withColumn("quarter", F.quarter(F.col("date")))
    .withColumn("cd_day", F.date_format(F.col("date"), "yyyyMMdd").cast("bigint"))
    .repartition("cd_day")
    .localCheckpoint()
)

In [None]:
basic_df.show(5)

+------------+----------+------------------+-----+------------------+----------------+-----------------+-------------+--------------------+-----------------+----------------+-----------------+-------------+--------------+------------------+--------------+-------+------+------------------+-----------------------------------------------------+-----------+------------------------------+------------------------------+--------------+------+-------------------------+---------------------+----------------------+--------------------------+-------------------------------------------+------------------------+-------------------------+-------------------------+-------------------+-------------+--------------------+------------------+--------------------+------------------+-----------------+--------------------+-------------------+-----------------+------------------+--------------------+--------------------+--------------------+---------------------+------------------+----------------+-----------

# Export

In [None]:
basic_df.groupBy("cd_day").agg(F.countDistinct("package_ndc")).show(150)

+--------+------------------+
|  cd_day|count(package_ndc)|
+--------+------------------+
|20220926|            259056|
|20230821|            227354|
|20230731|            225270|
|20230626|            222580|
|20220124|            244699|
|20230911|            229160|
|20231120|            234542|
|20230313|            213613|
|20220815|            255768|
|20220808|            254961|
|20231016|            232332|
|20220725|            253346|
|20220919|            258312|
|20230116|            208203|
|20221128|            238737|
|20230508|            218558|
|20231113|            234220|
|20220711|            251762|
|20220509|            254931|
|20220822|            256274|
|20221031|            236653|
|20230522|            219560|
|20230612|            221460|
|20230220|            211724|
|20220905|            257287|
|20220411|            252179|
|20231023|            232886|
|20230529|            220161|
|20230410|            216176|
|20231218|            236215|
|20230501|

In [None]:
basic_df.write.mode("overwrite").parquet(f"{DATA_PATH}/preprocessed/feng_spark.parquet")

basic_df.write.mode("overwrite").partitionBy("cd_day").parquet(f"{DATA_PATH}/preprocessed/feng_spark_2.parquet")

---