In [0]:
# loading sales file from AWS s3

df_load_sales = spark.read.csv("s3://amazon-l0-landing-prod/landing/sales/sales.csv", header=True, inferSchema=True)

df_load_sales.createOrReplaceTempView("df_load_sales")

spark.sql("select * from df_load_sales limit 10").show()


+-----------+-------+---------+----------+----+----+----+--------+----------+---------------+
|Customer ID|Terr ID|Market ID|Product ID| NRx|NBRx| TRx| Revenue| Sale Date|  Customer Type|
+-----------+-------+---------+----------+----+----+----+--------+----------+---------------+
|   ID135600| 134314|     4001|      1002|1500| 196|2181|$43,833 |2025-02-07|        Clinics|
|   ID135601| 134303|     4001|      1000|1194| 237|1797|$48,717 |2024-08-08|        Clinics|
|   ID135602| 134307|     4004|      1000| 618|1457|2162| $8,744 |2025-06-15|      Hospitals|
|   ID135603| 134293|     4004|      1000|1455| 123|1918|$25,321 |2025-07-15|       Pharmacy|
|   ID135604| 134310|     4002|      1003| 130|1304|1458|$13,669 |2024-08-13|        Clinics|
|   ID135605| 134296|     4001|      1003| 619| 717|1625| $7,552 |2024-12-05|      Hospitals|
|   ID135606| 134320|     4002|      1000|1779| 100|2148|$48,563 |2024-08-08|      Hospitals|
|   ID135607| 134323|     4001|      1000|1727| 287|2245|$13

In [0]:
# Restructuring the file

# Renaming headers
df_restructured_sales = df_load_sales.withColumnRenamed("Customer ID", "cust_id").withColumnRenamed("Terr ID", "terr_id").withColumnRenamed("Market ID", "mkt_id").withColumnRenamed("Product ID", "prod_id").withColumnRenamed("Sales Date", "sls_dt").withColumnRenamed("Customer Type", "cust_typ").withColumnRenamed("NRx", "nrx_ct").withColumnRenamed("NBRx", "nbrx_ct").withColumnRenamed("TRx", "trx_ct").withColumnRenamed("Sale Date", "sls_dt")

df_restructured_sales.createOrReplaceTempView("df_restructured_sales")

# Renaming segment values
# Adding timestamp
df_renamed_sales = spark.sql("""
                        select
                        terr_id,
                        cust_id,
                        mkt_id, 
                        prod_id,
                        cust_typ,
                        nrx_ct,
                        nbrx_ct,
                        trx_ct,
                        sls_dt,
                        now() as load_dt
                        from df_restructured_sales
                        """)

df_renamed_sales.createOrReplaceTempView("df_renamed_sales")

df_renamed_sales.show()


+-------+--------+------+-------+---------------+------+-------+------+----------+--------------------+
|terr_id| cust_id|mkt_id|prod_id|       cust_typ|nrx_ct|nbrx_ct|trx_ct|    sls_dt|             load_dt|
+-------+--------+------+-------+---------------+------+-------+------+----------+--------------------+
| 134314|ID135600|  4001|   1002|        Clinics|  1500|    196|  2181|2025-02-07|2025-11-10 04:08:...|
| 134303|ID135601|  4001|   1000|        Clinics|  1194|    237|  1797|2024-08-08|2025-11-10 04:08:...|
| 134307|ID135602|  4004|   1000|      Hospitals|   618|   1457|  2162|2025-06-15|2025-11-10 04:08:...|
| 134293|ID135603|  4004|   1000|       Pharmacy|  1455|    123|  1918|2025-07-15|2025-11-10 04:08:...|
| 134310|ID135604|  4002|   1003|        Clinics|   130|   1304|  1458|2024-08-13|2025-11-10 04:08:...|
| 134296|ID135605|  4001|   1003|      Hospitals|   619|    717|  1625|2024-12-05|2025-11-10 04:08:...|
| 134320|ID135606|  4002|   1000|      Hospitals|  1779|    100|

In [0]:
# loading dimensions from l1 as parquet files

df_load_cust = spark.read.parquet("s3://amazon-l1-staging-prod/staging/customer/")
df_load_cust.createOrReplaceTempView("df_load_cust")

df_load_seg = spark.read.parquet("s3://amazon-l1-staging-prod/staging/segmentation/")
df_load_seg.createOrReplaceTempView("df_load_seg")

df_load_geo = spark.read.parquet("s3://amazon-l1-staging-prod/staging/geography/")
df_load_geo.createOrReplaceTempView("df_load_geo")

df_load_prod = spark.read.parquet("s3://amazon-l1-staging-prod/staging/product/")
df_load_prod.createOrReplaceTempView("df_load_prod")

df_load_seg.show()



+--------+-------+--------------------+
| cust_id| seg_nm|             load_dt|
+--------+-------+--------------------+
|ID135600|   High|2025-11-10 02:15:...|
|ID135601| Medium|2025-11-10 02:15:...|
|ID135602|    Low|2025-11-10 02:15:...|
|ID135603|    Low|2025-11-10 02:15:...|
|ID135604|    Low|2025-11-10 02:15:...|
|ID135605|    Low|2025-11-10 02:15:...|
|ID135606| Medium|2025-11-10 02:15:...|
|ID135607|   High|2025-11-10 02:15:...|
|ID135608|   High|2025-11-10 02:15:...|
|ID135609|Unknown|2025-11-10 02:15:...|
|ID135610|Unknown|2025-11-10 02:15:...|
|ID135611|Unknown|2025-11-10 02:15:...|
|ID135612| Medium|2025-11-10 02:15:...|
|ID135613|    Low|2025-11-10 02:15:...|
|ID135614|    Low|2025-11-10 02:15:...|
|ID135615|    Low|2025-11-10 02:15:...|
|ID135616|    Low|2025-11-10 02:15:...|
|ID135617| Medium|2025-11-10 02:15:...|
|ID135618|   High|2025-11-10 02:15:...|
|ID135619|   High|2025-11-10 02:15:...|
+--------+-------+--------------------+
only showing top 20 rows


In [0]:
# creating time buckets
df_tb_sales = spark.sql("""
    select
        terr_id,
        cust_id,
        mkt_id, 
        prod_id,
        concat('M', lpad(month(sls_dt), 2, '0')) as tb_nm,
        case 
            when year(sls_dt) = 2025 then 'Curr'
            when year(sls_dt) = 2024 then 'Prev'
            else null
        end as tb_typ,
        cust_typ,
        sum(nrx_ct) as nrx_ct,
        sum(nbrx_ct) as nbrx_ct,
        sum(trx_ct) as trx_ct
    from
        df_restructured_sales
    group by
        terr_id,
        cust_id,
        mkt_id, 
        prod_id,
        tb_nm,
        tb_typ,
        cust_typ

    union all

    select
        terr_id,
        cust_id,
        mkt_id, 
        prod_id,
        "Y" as tb_nm,
        case 
            when year(sls_dt) = 2025 then 'Curr'
            when year(sls_dt) = 2024 then 'Prev'
            else null
        end as tb_typ,
        cust_typ,
        sum(nrx_ct) as nrx_ct,
        sum(nbrx_ct) as nbrx_ct,
        sum(trx_ct) as trx_ct
    from
        df_restructured_sales
    group by
        terr_id,
        cust_id,
        mkt_id, 
        prod_id,
        tb_nm,
        tb_typ,
        cust_typ
""")

df_tb_sales.createOrReplaceTempView("df_tb_sales")
display(spark.sql("select distinct tb_nm, tb_typ from df_tb_sales order by tb_nm, tb_typ"))

tb_nm,tb_typ
M01,Curr
M02,Curr
M03,Curr
M04,Curr
M05,Curr
M06,Curr
M07,Curr
M07,Prev
M08,Prev
M09,Prev


In [0]:
# bringing in product details
# broadcasting smaller table

df_prod_sales = spark.sql("""
                          select /*+ BROADCASTJOIN(df_load_prod) */
                            terr_id,
                            cust_id,
                            sales.mkt_id, 
                            mkt_nm,
                            sales.prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct
                          from df_tb_sales sales
                          join
                          df_load_prod prod
                          on sales.prod_id = prod.prod_id
                          and sales.mkt_id = prod.mkt_id
                          """)

df_prod_sales.createOrReplaceTempView("df_prod_sales")
df_prod_sales.show()

+-------+--------+------+--------------+-------+---------+-----+------+---------------+------+-------+------+
|terr_id| cust_id|mkt_id|        mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ|nrx_ct|nbrx_ct|trx_ct|
+-------+--------+------+--------------+-------+---------+-----+------+---------------+------+-------+------+
| 134289|ID136310|  4000|         Renal|   1001|Product B|  M04|  Curr|   Supermarkets|   506|    678|  1394|
| 134321|ID136077|  4004|      Oncology|   1003|Product D|  M05|  Curr|   Supermarkets|  1215|     87|  1455|
| 134318|ID136079|  4000|         Renal|   1003|Product D|  M01|  Curr|        Clinics|  1028|    451|  1914|
| 134314|ID135871|  4003|Cardiovascular|   1003|Product D|  M04|  Curr|       Pharmacy|   504|    369|  1346|
| 134298|ID136173|  4001|   Respiratory|   1003|Product D|  M08|  Prev|        Clinics|  1176|    221|  1431|
| 134313|ID135953|  4000|         Renal|   1001|Product B|  M10|  Prev|   Supermarkets|   587|   1083|  1872|
| 134318|I

In [0]:
# bringing in customer details

df_cust_sales = spark.sql("""
                          select
                            terr_id,
                            sales.cust_id,
                            cust_nm,
                            mkt_id, 
                            mkt_nm,
                            prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct
                          from df_prod_sales sales
                          join
                          df_load_cust cust
                          on sales.cust_id = cust.cust_id
                          """)
df_cust_sales.createOrReplaceTempView("df_cust_sales")
df_cust_sales.show()                         
                          

+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+------+-------+------+
|terr_id| cust_id|             cust_nm|mkt_id|        mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ|nrx_ct|nbrx_ct|trx_ct|
+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+------+-------+------+
| 134289|ID136310|      Stephen Becker|  4000|         Renal|   1001|Product B|  M04|  Curr|   Supermarkets|   506|    678|  1394|
| 134321|ID136077|Kathleen Jenkins DVM|  4004|      Oncology|   1003|Product D|  M05|  Curr|   Supermarkets|  1215|     87|  1455|
| 134318|ID136079|      Jennifer Jones|  4000|         Renal|   1003|Product D|  M01|  Curr|        Clinics|  1028|    451|  1914|
| 134314|ID135871|     Heather Simmons|  4003|Cardiovascular|   1003|Product D|  M04|  Curr|       Pharmacy|   504|    369|  1346|
| 134298|ID136173|   Nicholas Caldwell|  4001|   Respiratory|   1003|Product D|  M0

In [0]:
# bringing in segment details

df_seg_sales = spark.sql("""select
                          terr_id,
                            sales.cust_id,
                            cust_nm,
                            mkt_id, 
                            mkt_nm,
                            prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            seg_nm,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct
                          from df_cust_sales sales
                          join
                          df_load_seg seg
                          on sales.cust_id = seg.cust_id
                          """)

df_seg_sales.createOrReplaceTempView("df_seg_sales")
df_seg_sales.show()

+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+------+------+-------+------+
|terr_id| cust_id|             cust_nm|mkt_id|        mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ|seg_nm|nrx_ct|nbrx_ct|trx_ct|
+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+------+------+-------+------+
| 134289|ID136310|      Stephen Becker|  4000|         Renal|   1001|Product B|  M04|  Curr|   Supermarkets|  High|   506|    678|  1394|
| 134321|ID136077|Kathleen Jenkins DVM|  4004|      Oncology|   1003|Product D|  M05|  Curr|   Supermarkets|  High|  1215|     87|  1455|
| 134318|ID136079|      Jennifer Jones|  4000|         Renal|   1003|Product D|  M01|  Curr|        Clinics|Medium|  1028|    451|  1914|
| 134314|ID135871|     Heather Simmons|  4003|Cardiovascular|   1003|Product D|  M04|  Curr|       Pharmacy|  High|   504|    369|  1346|
| 134298|ID136173|   Nicholas Cald

In [0]:
# ranking hcps by nbrx
df_nbrx_sales = spark.sql("""
                          select
                            terr_id,
                            cust_id,
                            cust_nm,
                            mkt_id, 
                            mkt_nm,
                            prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            seg_nm, 
                            nbrx_ct,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct,
                            dense_rank() over (partition by terr_id order by nbrx_ct desc) as nbrx_rank
                          from df_seg_sales
                          where tb_nm = "Y"
                          and tb_typ = "Curr"
                          and prod_nm = "Product A" 
                          group by 
                           terr_id,
                            cust_id,
                            cust_nm,
                            mkt_id, 
                            mkt_nm,
                            prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            seg_nm, 
                            nbrx_ct,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct
                          """)
df_nbrx_sales.createOrReplaceTempView("df_nbrx_sales")
spark.sql("select * from df_nbrx_sales where nbrx_rank >1").show()
# df_nbrx_sales.show()

+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+-------+-------+------+-------+------+---------+
|terr_id| cust_id|             cust_nm|mkt_id|        mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ| seg_nm|nbrx_ct|nrx_ct|nbrx_ct|trx_ct|nbrx_rank|
+-------+--------+--------------------+------+--------------+-------+---------+-----+------+---------------+-------+-------+------+-------+------+---------+
| 134287|ID135805|       Stephen Smith|  4002|      Diabetes|   1000|Product A|    Y|  Curr|        Clinics|   High|   1245|  1315|   1245|  2865|        2|
| 134287|ID135846|Miss Elizabeth Evans|  4000|         Renal|   1000|Product A|    Y|  Curr|Online Pharmacy|    Low|   1049|  1169|   1049|  2286|        3|
| 134287|ID136341|     Jennifer Tucker|  4004|      Oncology|   1000|Product A|    Y|  Curr|Online Pharmacy| Medium|    555|  1915|    555|  2619|        4|
| 134287|ID136087|         Tami Robles|  4000|         Ren

In [0]:
# bringing in geography details

df_hcp_geo_sales = spark.sql("""select
                            sales.terr_id,
                            terr_nm, 
                            dist_nm, 
                            reg_nm, 
                            area_nm,
                            cust_id,
                            cust_nm,
                            mkt_id, 
                            mkt_nm,
                            prod_id,
                            prod_nm,
                            tb_nm,
                            tb_typ,
                            cust_typ,
                            seg_nm,
                            nbrx_rank,
                            nrx_ct,
                            nbrx_ct,
                            trx_ct
                          from df_nbrx_sales sales
                          join
                          df_load_geo geo
                          on sales.terr_id = geo.terr_id
                          """)

df_hcp_geo_sales.createOrReplaceTempView("df_hcp_geo_sales")
df_hcp_geo_sales.show()

+-------+-------+-------+---------+-------+--------+--------------------+------+-----------+-------+---------+-----+------+---------------+-------+---------+------+-------+------+
|terr_id|terr_nm|dist_nm|   reg_nm|area_nm| cust_id|             cust_nm|mkt_id|     mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ| seg_nm|nbrx_rank|nrx_ct|nbrx_ct|trx_ct|
+-------+-------+-------+---------+-------+--------+--------------------+------+-----------+-------+---------+-----+------+---------------+-------+---------+------+-------+------+
| 134287| Terr 2| Dist 1|  South 1|  South|ID135984|          Erik Silva|  4004|   Oncology|   1000|Product A|    Y|  Curr|   Supermarkets| Medium|        1|  1208|   1320|  2917|
| 134287| Terr 2| Dist 1|  South 1|  South|ID135805|       Stephen Smith|  4002|   Diabetes|   1000|Product A|    Y|  Curr|        Clinics|   High|        2|  1315|   1245|  2865|
| 134287| Terr 2| Dist 1|  South 1|  South|ID135846|Miss Elizabeth Evans|  4000|      Renal|   1000|

In [0]:
#pushing HCP level ranked fact to s3

# Defining s3 write path
s3_path = "s3://amazon-l2-reporting-prod/datalake/"+"hcp_sales/"
#print (s3_path)

df_hcp_geo_sales_final = spark.sql("""select
                            *, now() as load_dt
                          from df_hcp_geo_sales
                          """)
df_hcp_geo_sales_final.createOrReplaceTempView("df_hcp_geo_sales_final")
df_hcp_geo_sales_final.show()

# Write parquet file to s3

df_hcp_geo_sales_final.write.parquet(s3_path)

+-------+-------+-------+---------+-------+--------+--------------------+------+-----------+-------+---------+-----+------+---------------+-------+---------+------+-------+------+--------------------+
|terr_id|terr_nm|dist_nm|   reg_nm|area_nm| cust_id|             cust_nm|mkt_id|     mkt_nm|prod_id|  prod_nm|tb_nm|tb_typ|       cust_typ| seg_nm|nbrx_rank|nrx_ct|nbrx_ct|trx_ct|             load_dt|
+-------+-------+-------+---------+-------+--------+--------------------+------+-----------+-------+---------+-----+------+---------------+-------+---------+------+-------+------+--------------------+
| 134287| Terr 2| Dist 1|  South 1|  South|ID135984|          Erik Silva|  4004|   Oncology|   1000|Product A|    Y|  Curr|   Supermarkets| Medium|        1|  1208|   1320|  2917|2025-11-10 05:12:...|
| 134287| Terr 2| Dist 1|  South 1|  South|ID135805|       Stephen Smith|  4002|   Diabetes|   1000|Product A|    Y|  Curr|        Clinics|   High|        2|  1315|   1245|  2865|2025-11-10 05:12:

In [0]:
# dropping customer grain and aggregating sales
# calculating market volume
# creating a bucket for all customer types

df_mkt_sales = spark.sql("""
    WITH base_sales AS (
        SELECT
            terr_id,
            terr_nm, 
            dist_nm, 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct
        FROM
            df_geo_sales
        GROUP BY 
            terr_id,
            terr_nm, 
            dist_nm, 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm
    ),
    all_sales AS (
        SELECT
            terr_id,
            terr_nm, 
            dist_nm, 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            'ALL' AS cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct
        FROM base_sales
        GROUP BY
            terr_id,
            terr_nm, 
            dist_nm, 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            seg_nm
    )
    select
        *,
        SUM(nrx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS nrx_mkt_vol,
        SUM(nbrx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS nbrx_mkt_vol,
        SUM(trx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS trx_mkt_vol
    from base_sales

    union all

    select
        *,
        SUM(nrx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS nrx_mkt_vol,
        SUM(nbrx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS nbrx_mkt_vol,
        SUM(trx_ct) OVER (
            PARTITION BY terr_id, mkt_id, tb_nm, tb_typ
        ) AS trx_mkt_vol
    FROM all_sales
""")
df_mkt_sales.createOrReplaceTempView("df_mkt_sales")
display(df_mkt_sales)

terr_id,terr_nm,dist_nm,reg_nm,area_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
134287,Terr 2,Dist 1,South 1,South,4000,Renal,1000,Product A,Y,Curr,Clinics,High,110,97,387,1925,1190,3722
134287,Terr 2,Dist 1,South 1,South,4000,Renal,1000,Product A,Y,Curr,Pharmacy,Low,646,44,1049,1925,1190,3722
134287,Terr 2,Dist 1,South 1,South,4000,Renal,1000,Product A,Y,Curr,Online Pharmacy,Low,1169,1049,2286,1925,1190,3722
134287,Terr 2,Dist 1,South 1,South,4002,Diabetes,1000,Product A,Y,Curr,Clinics,High,1315,1245,2865,1315,1245,2865
134287,Terr 2,Dist 1,South 1,South,4004,Oncology,1000,Product A,Y,Curr,Online Pharmacy,Medium,1915,555,2619,3123,1875,5536
134287,Terr 2,Dist 1,South 1,South,4004,Oncology,1000,Product A,Y,Curr,Supermarkets,Medium,1208,1320,2917,3123,1875,5536
134289,Terr 1,Dist 2,East 2,East,4002,Diabetes,1000,Product A,Y,Curr,Clinics,Low,290,36,578,290,36,578
134291,Terr 2,Dist 2,Central 1,Central,4001,Respiratory,1000,Product A,Y,Curr,Supermarkets,Low,1069,1284,2643,1069,1284,2643
134292,Terr 1,Dist 1,North 1,North,4001,Respiratory,1000,Product A,Y,Curr,Hospitals,Medium,600,641,1478,600,641,1478
134292,Terr 1,Dist 1,North 1,North,4002,Diabetes,1000,Product A,Y,Curr,Clinics,High,411,1321,1769,411,1321,1769


In [0]:
# aggregating sales at different geo levels
df_agg_sales = spark.sql("""
        SELECT
            "" as terr_id,
            "" as terr_nm, 
            dist_nm, 
            reg_nm, 
            area_nm,
            "Nation" as nat_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct,
            SUM(nrx_mkt_vol) AS nrx_mkt_vol,
            SUM(nbrx_mkt_vol) AS nbrx_mkt_vol,
            SUM(trx_mkt_vol) AS trx_mkt_vol
        FROM
            df_mkt_sales
            group BY
            dist_nm, 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm
           
            """)

df_agg_sales.createOrReplaceTempView("df_agg_sales")
display(df_agg_sales)

df_agg_sales_reg = spark.sql("""
        SELECT
            "" as terr_id,
            "" as terr_nm, 
            "" as dist_nm, 
            reg_nm, 
            area_nm,
            "Nation" as nat_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct,
            SUM(nrx_mkt_vol) AS nrx_mkt_vol,
            SUM(nbrx_mkt_vol) AS nbrx_mkt_vol,
            SUM(trx_mkt_vol) AS trx_mkt_vol
        FROM
            df_mkt_sales
            group BY 
            reg_nm, 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm
           
            """)

df_agg_sales_reg.createOrReplaceTempView("df_agg_sales_reg")
display(df_agg_sales_reg)

df_agg_sales_area = spark.sql("""
        SELECT
            "" as terr_id,
            "" as terr_nm, 
            "" as dist_nm, 
            "" as reg_nm, 
            area_nm,
            "Nation" as nat_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct,
            SUM(nrx_mkt_vol) AS nrx_mkt_vol,
            SUM(nbrx_mkt_vol) AS nbrx_mkt_vol,
            SUM(trx_mkt_vol) AS trx_mkt_vol
        FROM
            df_mkt_sales
            group BY 
            area_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm
           
            """)

df_agg_sales_area.createOrReplaceTempView("df_agg_sales_area")
display(df_agg_sales_area)


df_agg_sales_nat = spark.sql("""
        SELECT
            "" as terr_id,
            "" as terr_nm, 
            "" as dist_nm, 
            "" as reg_nm, 
            "" as area_nm,
            "Nation" as nat_nm,
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm,
            SUM(nrx_ct) AS nrx_ct,
            SUM(nbrx_ct) AS nbrx_ct,
            SUM(trx_ct) AS trx_ct,
            SUM(nrx_mkt_vol) AS nrx_mkt_vol,
            SUM(nbrx_mkt_vol) AS nbrx_mkt_vol,
            SUM(trx_mkt_vol) AS trx_mkt_vol
        FROM
            df_mkt_sales
            group BY 
            
            mkt_id, 
            mkt_nm,
            prod_id,
            prod_nm,
            tb_nm,
            tb_typ,
            cust_typ,
            seg_nm
           
            """)

df_agg_sales_nat.createOrReplaceTempView("df_agg_sales_nat")
display(df_agg_sales_nat)


# union all the dfs at end
df_agg_sales_all = df_agg_sales.unionByName(df_agg_sales_reg).unionByName(df_agg_sales_area).unionByName(df_agg_sales_nat)
display(df_agg_sales_all)

terr_id,terr_nm,dist_nm,reg_nm,area_nm,nat_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
,,Dist 1,North 2,North,Nation,4002,Diabetes,1000,Product A,Y,Curr,Pharmacy,Medium,996,259,1750,996,259,1750
,,Dist 1,Central 2,Central,Nation,4004,Oncology,1000,Product A,Y,Curr,Supermarkets,Low,200,231,482,200,231,482
,,Dist 1,Central 2,Central,Nation,4004,Oncology,1000,Product A,Y,Curr,Clinics,Medium,1161,654,2314,1161,654,2314
,,Dist 1,West 1,West,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Supermarkets,Low,994,511,1882,2493,1225,4174
,,Dist 2,East 1,East,Nation,4000,Renal,1000,Product A,Y,Curr,Pharmacy,Medium,1082,1237,2648,1953,2067,4371
,,Dist 2,West 2,West,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Hospitals,High,415,155,1033,415,155,1033
,,Dist 2,North 1,North,Nation,4000,Renal,1000,Product A,Y,Curr,Hospitals,Low,355,1300,1791,355,1300,1791
,,Dist 2,South 1,South,Nation,4001,Respiratory,1000,Product A,Y,Curr,Pharmacy,Medium,354,92,541,354,92,541
,,Dist 2,Central 1,Central,Nation,4001,Respiratory,1000,Product A,Y,Curr,Supermarkets,Low,1069,1284,2643,1069,1284,2643
,,Dist 2,North 2,North,Nation,4001,Respiratory,1000,Product A,Y,Curr,Pharmacy,Low,451,1196,2006,451,1196,2006


terr_id,terr_nm,dist_nm,reg_nm,area_nm,nat_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
,,,West 2,West,Nation,4004,Oncology,1000,Product A,Y,Curr,Hospitals,High,1423,103,1562,3664,2329,6565
,,,South 1,South,Nation,4000,Renal,1000,Product A,Y,Curr,Online Pharmacy,Low,2833,2170,5217,3589,2311,6653
,,,Central 1,Central,Nation,4001,Respiratory,1000,Product A,Y,Curr,Hospitals,Low,399,847,1740,399,847,1740
,,,South 1,South,Nation,4000,Renal,1000,Product A,Y,Curr,Pharmacy,Low,646,44,1049,1925,1190,3722
,,,West 2,West,Nation,4004,Oncology,1000,Product A,Y,Curr,Online Pharmacy,High,1135,876,2331,2096,1170,4009
,,,Central 1,Central,Nation,4002,Diabetes,1000,Product A,Y,Curr,Pharmacy,Low,1291,1379,3004,2705,2659,5748
,,,West 1,West,Nation,4002,Diabetes,1000,Product A,Y,Curr,Supermarkets,High,1432,428,2229,1432,428,2229
,,,Central 2,Central,Nation,4000,Renal,1000,Product A,Y,Curr,Clinics,Medium,153,1458,1630,153,1458,1630
,,,East 1,East,Nation,4000,Renal,1000,Product A,Y,Curr,Hospitals,Low,871,830,1723,1953,2067,4371
,,,North 1,North,Nation,4001,Respiratory,1000,Product A,Y,Curr,Online Pharmacy,High,1503,602,2603,2255,1433,4510


terr_id,terr_nm,dist_nm,reg_nm,area_nm,nat_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
,,,,West,Nation,4000,Renal,1000,Product A,Y,Curr,Hospitals,Medium,1820,44,2362,1820,44,2362
,,,,East,Nation,4004,Oncology,1000,Product A,Y,Curr,Hospitals,High,776,161,1050,776,161,1050
,,,,East,Nation,4002,Diabetes,1000,Product A,Y,Curr,Online Pharmacy,Medium,677,954,1865,1683,1498,3633
,,,,East,Nation,4004,Oncology,1000,Product A,Y,Curr,Clinics,Medium,1454,275,2179,2909,398,4097
,,,,West,Nation,4000,Renal,1000,Product A,Y,Curr,Online Pharmacy,Medium,698,64,834,698,64,834
,,,,Central,Nation,4001,Respiratory,1000,Product A,Y,Curr,Hospitals,Low,399,847,1740,399,847,1740
,,,,North,Nation,4001,Respiratory,1000,Product A,Y,Curr,Hospitals,Medium,600,641,1478,600,641,1478
,,,,Central,Nation,4000,Renal,1000,Product A,Y,Curr,Online Pharmacy,High,684,664,1669,684,664,1669
,,,,Central,Nation,4002,Diabetes,1000,Product A,Y,Curr,Clinics,Medium,1414,1280,2744,2705,2659,5748
,,,,West,Nation,4002,Diabetes,1000,Product A,Y,Curr,Pharmacy,Medium,1589,768,2632,1589,768,2632


terr_id,terr_nm,dist_nm,reg_nm,area_nm,nat_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
,,,,,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Pharmacy,High,1367,1005,2723,2967,1514,5161
,,,,,Nation,4001,Respiratory,1000,Product A,Y,Curr,Supermarkets,Medium,751,2310,3432,2728,4042,7514
,,,,,Nation,4004,Oncology,1000,Product A,Y,Curr,Clinics,Low,3519,1741,6190,6388,3409,11328
,,,,,Nation,4002,Diabetes,1000,Product A,Y,Curr,Online Pharmacy,Low,862,328,1361,862,328,1361
,,,,,Nation,4001,Respiratory,1000,Product A,Y,Curr,Online Pharmacy,Medium,2032,1484,4176,3806,3087,8419
,,,,,Nation,4000,Renal,1000,Product A,Y,Curr,Pharmacy,High,1696,96,1996,1696,96,1996
,,,,,Nation,4000,Renal,1000,Product A,Y,Curr,Online Pharmacy,Low,4451,2262,6946,5207,2403,8382
,,,,,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Hospitals,Low,3688,2226,6874,5055,3231,9597
,,,,,Nation,4004,Oncology,1000,Product A,Y,Curr,Supermarkets,Medium,1208,1320,2917,3123,1875,5536
,,,,,Nation,4000,Renal,1000,Product A,Y,Curr,Supermarkets,High,2046,881,3529,5309,1401,7723


terr_id,terr_nm,dist_nm,reg_nm,area_nm,nat_nm,mkt_id,mkt_nm,prod_id,prod_nm,tb_nm,tb_typ,cust_typ,seg_nm,nrx_ct,nbrx_ct,trx_ct,nrx_mkt_vol,nbrx_mkt_vol,trx_mkt_vol
,,Dist 1,North 2,North,Nation,4002,Diabetes,1000,Product A,Y,Curr,Pharmacy,Medium,996,259,1750,996,259,1750
,,Dist 1,Central 2,Central,Nation,4004,Oncology,1000,Product A,Y,Curr,Supermarkets,Low,200,231,482,200,231,482
,,Dist 1,Central 2,Central,Nation,4004,Oncology,1000,Product A,Y,Curr,Clinics,Medium,1161,654,2314,1161,654,2314
,,Dist 1,West 1,West,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Supermarkets,Low,994,511,1882,2493,1225,4174
,,Dist 2,East 1,East,Nation,4000,Renal,1000,Product A,Y,Curr,Pharmacy,Medium,1082,1237,2648,1953,2067,4371
,,Dist 2,West 2,West,Nation,4003,Cardiovascular,1000,Product A,Y,Curr,Hospitals,High,415,155,1033,415,155,1033
,,Dist 2,North 1,North,Nation,4000,Renal,1000,Product A,Y,Curr,Hospitals,Low,355,1300,1791,355,1300,1791
,,Dist 2,South 1,South,Nation,4001,Respiratory,1000,Product A,Y,Curr,Pharmacy,Medium,354,92,541,354,92,541
,,Dist 2,Central 1,Central,Nation,4001,Respiratory,1000,Product A,Y,Curr,Supermarkets,Low,1069,1284,2643,1069,1284,2643
,,Dist 2,North 2,North,Nation,4001,Respiratory,1000,Product A,Y,Curr,Pharmacy,Low,451,1196,2006,451,1196,2006


In [0]:
#pushing geo level sales to s3

# Defining s3 write path
s3_path = "s3://amazon-l2-reporting-prod/datalake/"+"geo_sales/"
#print (s3_path)

# Write parquet file to s3

df_agg_sales_all.write.parquet(s3_path)