In [0]:
#initialization cell - user parameters

# The star import stays first so the explicit imports below win on any name clash.
# Note it also shadows Python builtins such as round/sum/min/max/abs/filter with Spark
# column functions; prefer the namespaced `F.` form in new code.
from pyspark.sql.functions import *
from pyspark.sql import functions as F

import json
from datetime import datetime, timedelta

import boto3
import pandas

# S3 client used by the metric cells to upload each JSON result file.
s3 = boto3.client('s3')
bucket = "" #fill in bucket name

# Fail fast: an empty bucket name would otherwise only surface later, as an error from
# spark.read / s3.put_object on a malformed "s3:///..." location.
if not bucket:
    raise ValueError("Set `bucket` to the S3 bucket that holds the FDA NDC file before running this notebook.")

#input - uncompressed json file from the NDC zip retrieved from https://open.fda.gov/apis/drug/ndc/download/
fda_file_location = f"s3://{bucket}/fda/drug-ndc-0001-of-0001.json"




Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [0]:

# Load the openFDA NDC download. multiLine lets Spark parse one JSON document spanning many
# lines as a single record (its `results` array is exploded in the next cell). The JSON
# reader always infers its schema; "inferSchema" is a CSV-only option and is therefore omitted.
fda_df = spark.read.option("multiline","true").json(fda_file_location)




Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [0]:

# One row per element of the top-level `results` array. The element struct is kept in a
# single column named "results" so later cells can address fields as results.<field>.
fda_results_df = fda_df.selectExpr("explode(results) AS results")




Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [0]:
#Top metric output
# For each product type, write the 10 most frequent labelers, active ingredients and
# pharmacologic classes to s3://{bucket}/fda/<product_type>_top_<metric>.json as a list of
# {"product_type", "name", "calc"} records, where calc is the number of matching rows.

#if not using AWS Athena, substitute the last 2 lines of the inner loop with: this_df.toPandas().to_json('path/to/output/file.json',orient='records')

# These exploded frames do not depend on product_type, so build them once. The
# "Most changed metric" cell reuses them (hence marketing_start_date is kept).
ingredients_df = fda_results_df.select("results.product_type","results.marketing_start_date",F.explode("results.active_ingredients").alias("ingredients"))
pharmclass_df = fda_results_df.select("results.product_type","results.marketing_start_date",F.explode("results.pharm_class").alias("pharmclass"))

# metric suffix -> frame with one (product_type, name) row per counted item.
# Insertion order fixes the order in which output files are written.
top_sources = {
    "labelers": fda_results_df.select(F.col("results.product_type").alias("product_type"), F.col("results.labeler_name").alias("name")),
    "ingredients": ingredients_df.select("product_type", F.col("ingredients.name").alias("name")),
    "pharmclass": pharmclass_df.select("product_type", F.col("pharmclass").alias("name")),
}

for product_type in ["HUMAN PRESCRIPTION DRUG","HUMAN OTC DRUG"]:
    for metric, source_df in top_sources.items():
        filename = product_type.replace(" ","_").lower() + f"_top_{metric}.json"
        this_df = (
            source_df
            .where(F.col("product_type") == product_type)
            .groupBy("product_type", "name")
            .count()
            .withColumnRenamed("count", "calc")
            # Sort on the renamed column, breaking ties on name so the top 10 is deterministic.
            .orderBy(F.col("calc").desc(), F.col("name").asc())
            .limit(10)
        )
        df_as_list = this_df.toPandas().to_dict('records')
        s3.put_object(Body = json.dumps(df_as_list), Bucket = bucket, Key = f"fda/{filename}")




Progress:   0%|          |elapsed time = 00:00s



In [0]:
#Most changed metric
# For each product type, rank labelers / active ingredients / pharm classes by the percent
# change in the number of rows whose marketing_start_date falls in the last 365 days versus
# the 365 days before that, and write the top 10 to
# s3://{bucket}/fda/<product_type>_mostchange_<metric>.json as {"product_type", "name", "calc"}.
# Only names present in BOTH windows are ranked, so last year's count is never zero.
# Requires `ingredients_df` and `pharmclass_df` from the "Top metric output" cell.
#
# NOTE(review): marketing_start_date is compared as text, which assumes it is stored as
# 'YYYYMMDD' strings — confirm if the source format changes.

# Anchor both windows on a single timestamp so they line up exactly.
run_dt = datetime.today()
year_ago_dt = run_dt - timedelta(days = 365)
two_years_ago_dt = run_dt - timedelta(days = 730)
year_ago_str = year_ago_dt.strftime('%Y%m%d')
two_years_ago_str = two_years_ago_dt.strftime('%Y%m%d')

# Source frames registered as temp views once (they do not vary per product type),
# for any SQL cells that query them.
fda_results_df.createOrReplaceTempView("view_fda_results_df")
ingredients_df.createOrReplaceTempView("view_ingredients_df")
pharmclass_df.createOrReplaceTempView("view_pharmclass_df")

# metric suffix -> uniformly shaped (product_type, marketing_start_date, name) frame.
# Insertion order fixes the order in which output files are written.
change_sources = {
    "labelers": fda_results_df.select(
        F.col("results.product_type").alias("product_type"),
        F.col("results.marketing_start_date").alias("marketing_start_date"),
        F.col("results.labeler_name").alias("name"),
    ),
    "ingredients": ingredients_df.select("product_type", "marketing_start_date", F.col("ingredients.name").alias("name")),
    "pharmclass": pharmclass_df.select("product_type", "marketing_start_date", F.col("pharmclass").alias("name")),
}

# Window membership; rows with a null start date fall in neither window.
in_last_year = (F.col("marketing_start_date") >= two_years_ago_str) & (F.col("marketing_start_date") < year_ago_str)
in_this_year = F.col("marketing_start_date") >= year_ago_str

for product_type in ["HUMAN PRESCRIPTION DRUG","HUMAN OTC DRUG"]:
    for metric, source_df in change_sources.items():
        filename = product_type.replace(" ","_").lower() + f"_mostchange_{metric}.json"
        counts_df = (
            source_df
            # Null names are excluded, matching an equi-join on name.
            .where((F.col("product_type") == product_type) & F.col("name").isNotNull())
            .groupBy("product_type", "name")
            .agg(
                F.count(F.when(in_last_year, 1)).alias("last_year_count"),
                F.count(F.when(in_this_year, 1)).alias("this_year_count"),
            )
            # Keep only names seen in both windows (inner-join semantics; avoids divide-by-zero).
            .where((F.col("last_year_count") > 0) & (F.col("this_year_count") > 0))
        )
        pct_change = (F.col("this_year_count") - F.col("last_year_count")) / F.col("last_year_count") * 100
        this_df = (
            counts_df
            .select("product_type", "name", F.round(pct_change, 1).alias("calc"))
            # Break ties on name so the top 10 is deterministic.
            .orderBy(F.col("calc").desc(), F.col("name").asc())
            .limit(10)
        )
        df_as_list = this_df.toPandas().to_dict('records')
        s3.put_object(Body = json.dumps(df_as_list), Bucket = bucket, Key = f"fda/{filename}")
        




Progress:   0%|          |elapsed time = 00:00s

