In [19]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import os

In [2]:
sc = pyspark.SparkContext(appName="Test")

In [3]:
spark = SparkSession.builder.getOrCreate()

In [5]:
spark.catalog.listTables()

[]

In [11]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

In [12]:
pd_temp

Unnamed: 0,0
0,0.37357
1,0.299461
2,0.106243
3,0.324899
4,0.146596
5,0.845526
6,0.397989
7,0.624958
8,0.264176
9,0.504396


In [13]:
# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

In [15]:
spark_temp.show()

+-------------------+
|                  0|
+-------------------+
| 0.3735697605572128|
| 0.2994613481293389|
|0.10624306224626434|
|0.32489926687458326|
|0.14659599897197795|
| 0.8455256376005582|
|0.39798908360638774|
| 0.6249576927341451|
| 0.2641758060590681|
| 0.5043961973892639|
+-------------------+



In [16]:
# Examine the tables in the catalog
print(spark.catalog.listTables())

[]


In [17]:
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')

In [18]:
# Examine the tables in the catalog again
print(spark.catalog.listTables())

[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [20]:
def getPath(*paths):
    workspace_dir = os.path.abspath(os.path.join(os.getcwd()))
    path = os.path.join(workspace_dir, *paths)
    return path

In [26]:
spark_csv1 = spark.read.csv(
    getPath('data', 'BI', 'NW-Outlet-Sales-Transaction-091011-2017.csv'), 
    header=True, 
    sep='\t')

In [27]:
spark_csv1.show()

+---+--------------------+---------+---------------+--------+------------+---------+------------------+------------+-------+---------------+----------------+-------------------+--------------------+--------------------+------------------+---------------------+-----------+
|_c0|              BRANDY|CAL_MONTH|           CITY|DATE_KEY|DAY_OF_MONTH| DIVISION|       GROSS_SALES|LOCAL_REGION| OUTLET|OUTLET_LATITUDE|OUTLET_LONGITUDE|        POSITION_CD|             STD_SKU|        SUBCAT_BRAND|      SUB_DIVISION|TOTAL_SPENT_PROMOTION|URBAN_RURAL|
+---+--------------------+---------+---------------+--------+------------+---------+------------------+------------+-------+---------------+----------------+-------------------+--------------------+--------------------+------------------+---------------------+-----------+
|  0|              Chinsu|        9|   T. Thái Bình|20170904|           4|    Foods| 432000.0000000000|    Miền Bắc| 112745|  20.4336933000|  106.3648159000|  SM-GV-10KF0830.01|Chil

In [30]:
spark.catalog.listTables()

[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [31]:
spark_csv1.createOrReplaceTempView('transaction')

In [32]:
spark.catalog.listTables()

[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='transaction', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [37]:
query = "SELECT OUTLET, STD_SKU, GROSS_SALES FROM transaction"
result = spark.sql(query)

In [42]:
%%time
result.show(truncate=False)

+-------+--------------------------------------------------+------------------+
|OUTLET |STD_SKU                                           |GROSS_SALES       |
+-------+--------------------------------------------------+------------------+
|112745 |Chilli Sauces Chinsu 250gr                        |432000.0000000000 |
|2100552|Instant Noodle Omachi Bò 80gr                     |672000.0000000000 |
|352731 |Chilli Sauces Chinsu 250gr                        |216000.0000000000 |
|167131 |Instant Coffee Wake up 3 in 1 Sài Gòn 16gr        |70500.0000000000  |
|295806 |Chilli Sauces Rồng Việt 200gr                     |120000.0000000000 |
|6237   |Instant Noodle Omachi Spaghetti Bò 91gr           |180000.0000000000 |
|141839 |Sterilized Sausages Heo cao bồi xốt spaghetti 20gr|162000.0000000000 |
|49794  |Soya Sauces Tam Thái Tử Nhị Ca 500ml              |150000.0000000000 |
|88264  |Instant Noodle Kokomi Đại hải sản 75gr            |87000.0000000000  |
|88536  |Instant Noodle Kokomi Đại Tôm 7

In [48]:
%%time
# Create the DataFrame flights
df_transaction = spark.table('transaction')

CPU times: user 2.29 ms, sys: 0 ns, total: 2.29 ms
Wall time: 5.75 ms


In [49]:
type(df_transaction)

pyspark.sql.dataframe.DataFrame

In [50]:
# Add scale_gross_sales
df_transaction = df_transaction.withColumn('scale_gross_sales', df_transaction.GROSS_SALES/1000000)

In [51]:
%%time
df_transaction.show()

+---+--------------------+---------+---------------+--------+------------+---------+------------------+------------+-------+---------------+----------------+-------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
|_c0|              BRANDY|CAL_MONTH|           CITY|DATE_KEY|DAY_OF_MONTH| DIVISION|       GROSS_SALES|LOCAL_REGION| OUTLET|OUTLET_LATITUDE|OUTLET_LONGITUDE|        POSITION_CD|             STD_SKU|        SUBCAT_BRAND|      SUB_DIVISION|TOTAL_SPENT_PROMOTION|URBAN_RURAL|scale_gross_sales|
+---+--------------------+---------+---------------+--------+------------+---------+------------------+------------+-------+---------------+----------------+-------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
|  0|              Chinsu|        9|   T. Thái Bình|20170904|           4|    Foods| 432000.0000000000|    Miền Bắc| 112745|  2

In [53]:
#Filter with a SQL string
df_transaction1 = df_transaction.filter('scale_gross_sales > 1')
df_transaction1.show()

+---+--------------------+---------+---------------+--------+------------+---------+-------------------+------------+-------+---------------+----------------+--------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
|_c0|              BRANDY|CAL_MONTH|           CITY|DATE_KEY|DAY_OF_MONTH| DIVISION|        GROSS_SALES|LOCAL_REGION| OUTLET|OUTLET_LATITUDE|OUTLET_LONGITUDE|         POSITION_CD|             STD_SKU|        SUBCAT_BRAND|      SUB_DIVISION|TOTAL_SPENT_PROMOTION|URBAN_RURAL|scale_gross_sales|
+---+--------------------+---------+---------------+--------+------------+---------+-------------------+------------+-------+---------------+----------------+--------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
| 10|     Wake up SG 3in1|       11|  T. Bình Phước|20171127|          27|Beverages| 1280000.0000000000|   Miền Đông| 171

In [54]:
#Filter with a boolean column
df_transaction2 = df_transaction.filter(df_transaction.scale_gross_sales > 1)
df_transaction2.show()

+---+--------------------+---------+---------------+--------+------------+---------+-------------------+------------+-------+---------------+----------------+--------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
|_c0|              BRANDY|CAL_MONTH|           CITY|DATE_KEY|DAY_OF_MONTH| DIVISION|        GROSS_SALES|LOCAL_REGION| OUTLET|OUTLET_LATITUDE|OUTLET_LONGITUDE|         POSITION_CD|             STD_SKU|        SUBCAT_BRAND|      SUB_DIVISION|TOTAL_SPENT_PROMOTION|URBAN_RURAL|scale_gross_sales|
+---+--------------------+---------+---------------+--------+------------+---------+-------------------+------------+-------+---------------+----------------+--------------------+--------------------+--------------------+------------------+---------------------+-----------+-----------------+
| 10|     Wake up SG 3in1|       11|  T. Bình Phước|20171127|          27|Beverages| 1280000.0000000000|   Miền Đông| 171

In [55]:
df_selected1 = df_transaction.select('OUTLET', 'STD_SKU', 'scale_gross_sales')

In [57]:
df_selected1.show(truncate=False)

+-------+--------------------------------------------------+-----------------+
|OUTLET |STD_SKU                                           |scale_gross_sales|
+-------+--------------------------------------------------+-----------------+
|112745 |Chilli Sauces Chinsu 250gr                        |0.432            |
|2100552|Instant Noodle Omachi Bò 80gr                     |0.672            |
|352731 |Chilli Sauces Chinsu 250gr                        |0.216            |
|167131 |Instant Coffee Wake up 3 in 1 Sài Gòn 16gr        |0.0705           |
|295806 |Chilli Sauces Rồng Việt 200gr                     |0.12             |
|6237   |Instant Noodle Omachi Spaghetti Bò 91gr           |0.18             |
|141839 |Sterilized Sausages Heo cao bồi xốt spaghetti 20gr|0.162            |
|49794  |Soya Sauces Tam Thái Tử Nhị Ca 500ml              |0.15             |
|88264  |Instant Noodle Kokomi Đại hải sản 75gr            |0.087            |
|88536  |Instant Noodle Kokomi Đại Tôm 75gr         

In [58]:
df_selected2 = df_transaction.select(
    df_transaction.OUTLET, 
    df_transaction.STD_SKU, 
    df_transaction.scale_gross_sales)

In [59]:
df_selected2.show(truncate=False)

+-------+--------------------------------------------------+-----------------+
|OUTLET |STD_SKU                                           |scale_gross_sales|
+-------+--------------------------------------------------+-----------------+
|112745 |Chilli Sauces Chinsu 250gr                        |0.432            |
|2100552|Instant Noodle Omachi Bò 80gr                     |0.672            |
|352731 |Chilli Sauces Chinsu 250gr                        |0.216            |
|167131 |Instant Coffee Wake up 3 in 1 Sài Gòn 16gr        |0.0705           |
|295806 |Chilli Sauces Rồng Việt 200gr                     |0.12             |
|6237   |Instant Noodle Omachi Spaghetti Bò 91gr           |0.18             |
|141839 |Sterilized Sausages Heo cao bồi xốt spaghetti 20gr|0.162            |
|49794  |Soya Sauces Tam Thái Tử Nhị Ca 500ml              |0.15             |
|88264  |Instant Noodle Kokomi Đại hải sản 75gr            |0.087            |
|88536  |Instant Noodle Kokomi Đại Tôm 75gr         

In [63]:
filterA = df_selected2.STD_SKU == 'Instant Noodle Omachi Bò 80gr'
filterB = df_selected2.OUTLET == 2100552

In [64]:
df_selected3 = df_selected2.filter(filterA).filter(filterB)

In [66]:
%%time
df_selected3.show(truncate=False)

+-------+-----------------------------+-----------------+
|OUTLET |STD_SKU                      |scale_gross_sales|
+-------+-----------------------------+-----------------+
|2100552|Instant Noodle Omachi Bò 80gr|0.672            |
|2100552|Instant Noodle Omachi Bò 80gr|1.512            |
|2100552|Instant Noodle Omachi Bò 80gr|1.68             |
|2100552|Instant Noodle Omachi Bò 80gr|1.344            |
|2100552|Instant Noodle Omachi Bò 80gr|0.672            |
|2100552|Instant Noodle Omachi Bò 80gr|0.84             |
|2100552|Instant Noodle Omachi Bò 80gr|1.512            |
+-------+-----------------------------+-----------------+

CPU times: user 2.94 ms, sys: 0 ns, total: 2.94 ms
Wall time: 2.33 s


In [68]:
usd_convert = (df_transaction.GROSS_SALES/22000).alias('usd')

In [71]:
df_selected4 = df_transaction.select('OUTLET', 'STD_SKU', usd_convert)

In [73]:
df_selected4.show(truncate=False)

+-------+--------------------------------------------------+------------------+
|OUTLET |STD_SKU                                           |usd               |
+-------+--------------------------------------------------+------------------+
|112745 |Chilli Sauces Chinsu 250gr                        |19.636363636363637|
|2100552|Instant Noodle Omachi Bò 80gr                     |30.545454545454547|
|352731 |Chilli Sauces Chinsu 250gr                        |9.818181818181818 |
|167131 |Instant Coffee Wake up 3 in 1 Sài Gòn 16gr        |3.2045454545454546|
|295806 |Chilli Sauces Rồng Việt 200gr                     |5.454545454545454 |
|6237   |Instant Noodle Omachi Spaghetti Bò 91gr           |8.181818181818182 |
|141839 |Sterilized Sausages Heo cao bồi xốt spaghetti 20gr|7.363636363636363 |
|49794  |Soya Sauces Tam Thái Tử Nhị Ca 500ml              |6.818181818181818 |
|88264  |Instant Noodle Kokomi Đại hải sản 75gr            |3.9545454545454546|
|88536  |Instant Noodle Kokomi Đại Tôm 7

In [76]:
usd_df = df_transaction.selectExpr('OUTLET', 'STD_SKU', 'GROSS_SALES/22000 AS USD')

In [77]:
usd_df.show()

+-------+--------------------+------------------+
| OUTLET|             STD_SKU|               USD|
+-------+--------------------+------------------+
| 112745|Chilli Sauces Chi...|19.636363636363637|
|2100552|Instant Noodle Om...|30.545454545454547|
| 352731|Chilli Sauces Chi...| 9.818181818181818|
| 167131|Instant Coffee Wa...|3.2045454545454546|
| 295806|Chilli Sauces Rồn...| 5.454545454545454|
|   6237|Instant Noodle Om...| 8.181818181818182|
| 141839|Sterilized Sausag...| 7.363636363636363|
|  49794|Soya Sauces Tam T...| 6.818181818181818|
|  88264|Instant Noodle Ko...|3.9545454545454546|
|  88536|Instant Noodle Ko...|3.9545454545454546|
| 171697|Instant Coffee Wa...| 58.18181818181818|
| 276968|Energy Drinks Wak...| 7.727272734545454|
| 240193|Instant Cereals V...|30.227272727272727|
| 157945|Fish Sauces Chins...|20.454545454545453|
| 144069|Instant Noodle Om...| 3.409090909090909|
|2223686|Instant Noodle Ti...| 36.63636363636363|
| 128650|Instant Noodle Ko...| 48.63636363636363|


In [81]:
df_transaction = df_transaction.withColumn('GROSS_SALES', df_transaction.GROSS_SALES.cast('double'))

In [87]:
df_transaction.filter(filterB).groupBy().max('GROSS_SALES').show()

+----------------+
|max(GROSS_SALES)|
+----------------+
|       3097500.0|
+----------------+



In [82]:
df_transaction.filter(filterB).groupBy().min('GROSS_SALES').show()

+----------------+
|min(GROSS_SALES)|
+----------------+
|         54000.0|
+----------------+



In [83]:
df_transaction.groupBy().min('GROSS_SALES').show()

+----------------+
|min(GROSS_SALES)|
+----------------+
|          2140.0|
+----------------+



In [86]:
df_transaction.groupBy().max('GROSS_SALES').show()

+----------------+
|max(GROSS_SALES)|
+----------------+
|          4.92E8|
+----------------+



In [89]:
%%time
group_outlet = df_transaction.groupBy('OUTLET')
group_outlet.count().show()

+-------+-----+
| OUTLET|count|
+-------+-----+
| 253260|   84|
| 227734|   24|
|2012669|    9|
|2032295|   19|
| 161110|   59|
|  43462|    5|
| 266503|   21|
|  98870|   16|
|2174449|   49|
|  90022|  110|
| 180891|   35|
|  62646|   28|
|  23459|    8|
|2212972|  119|
| 164418|   70|
|2094896|   20|
|2081310|   17|
|  48738|   49|
| 234036|   17|
| 121556|   65|
+-------+-----+
only showing top 20 rows

CPU times: user 5.24 ms, sys: 0 ns, total: 5.24 ms
Wall time: 4.57 s


In [91]:
%%time
group_outlet.avg('GROSS_SALES').show()

+-------+------------------+
| OUTLET|  avg(GROSS_SALES)|
+-------+------------------+
| 253260| 617658.3334023809|
| 227734| 410564.5833333333|
|2012669|136411.11111111112|
|2032295| 202434.2105263158|
| 161110| 984762.7118922035|
|  43462|      64200.000008|
| 266503|111723.80953809523|
|  98870|          610500.0|
|2174449|282937.75510693877|
|  90022|2364339.0910436367|
| 180891| 343248.5714285714|
|  62646|      360253.57145|
|  23459|          410687.5|
|2212972| 5236947.846116471|
| 164418|1204714.2858342857|
|2094896|     272880.000005|
|2081310| 203200.0000352941|
|  48738|186831.63265510203|
| 234036|168394.11764705883|
| 121556| 866095.3847212307|
+-------+------------------+
only showing top 20 rows

CPU times: user 3.98 ms, sys: 815 µs, total: 4.8 ms
Wall time: 3.92 s


In [93]:
import pyspark.sql.functions as F

In [94]:
group_outlet.agg(F.stddev('GROSS_SALES')).show()

+-------+------------------------+
| OUTLET|stddev_samp(GROSS_SALES)|
+-------+------------------------+
| 253260|       570644.1032924389|
| 227734|      141416.47594740882|
|2012669|      151899.19720364263|
|2032295|      122523.34930832977|
| 161110|       2238850.679892904|
|  43462|       21273.81017979619|
| 266503|        81935.4008199764|
|  98870|        619127.747722552|
|2174449|       296205.4046101515|
|  90022|       4551694.868388649|
| 180891|       193708.3015795666|
|  62646|       233884.9344902043|
|  23459|       273559.7267639059|
|2212972|       7213991.182220034|
| 164418|       1221865.195495334|
|2094896|       236356.6798386246|
|2081310|       294818.8935450627|
|  48738|      172086.94044550936|
| 234036|       82583.47648431432|
| 121556|       620996.8077937417|
+-------+------------------------+
only showing top 20 rows



In [95]:
spark_csv2 = spark.read.csv(
    getPath('data', 'BI', 'XX_PRODUCT_D.csv'), 
    header=True, 
    sep=',')

In [103]:
spark_csv2.columns

['PRICE_TYPE_CD',
 'PRICING_TEXT',
 'PR_POSTN_ID',
 'VENDR_OU_ID',
 'VENDR_PART_NUM',
 'ROW_ID',
 'ITEM_CODE',
 'ITEM_DESC',
 'INDUSTRY',
 'DIVISION',
 'SUB_DIVISION',
 'CATEGORY',
 'SUB_CATEGORY',
 'INDUSTRY_DESC',
 'DIVISION_DESC',
 'SUB_DIVISION_DESC',
 'CATEGORY_DESC',
 'SUB_CATEGORY_DESC',
 'BRAND',
 'BRANDY',
 'VARIANT',
 'BRAND_DESC',
 'BRANDY_DESC',
 'VARIANT_DESC',
 'PRODUCT_FORM',
 'PACK_TYPE',
 'PACK_SIZE',
 'STANDARD_SKU',
 'PRODUCT_FORM_DESC',
 'PACK_TYPE_DESC',
 'PACK_SIZE_DESC',
 'STANDARD_SKU_DESC',
 'EBS_ITEMID',
 'STATUS',
 'ORDERABLE',
 'PRIMARY_VENDOR_OPERATING_UNIT',
 'UOM1',
 'X_MSC_UOM_2',
 'UOM1_WEIGHT',
 'UOM_2_WEIGHT',
 'UOM2_TO_UOM1_CONV_VAL',
 'MSRP',
 'ROW_WID',
 'EFF_START_DATE',
 'EFF_END_DATE',
 'DIM_ACTIVE_STATUS',
 'ORG_DIV',
 'PR_EQUIV_PROD_ID',
 'X_MSC_PROD_TYPE',
 'ALT_PROD_PRICE',
 'ALT_ITEM_CODE',
 'ALT_ITEM_DESC',
 'ALT_UOM2_TO_UOM1_CONV_VAL',
 'PROD_RETAIL_PRICE',
 'DIVISION_SHORT_DESC',
 'SUB_DIVISION_SHORT_DESC',
 'CATEGORY_SHORT_DESC',
 'SUB_

In [117]:

spark_csv2 = spark_csv2.drop_duplicates(subset=['STANDARD_SKU'])

In [118]:
spark_csv2 = spark_csv2.withColumnRenamed('STANDARD_SKU_DESC', 'STD_SKU')

In [119]:
spark_csv2.select('STANDARD_SKU', 'STD_SKU').show(truncate=False)

+------------+----------------------------------------------+
|STANDARD_SKU|STD_SKU                                       |
+------------+----------------------------------------------+
|853         |Instant Coffee Wake up 3 in 1 Cao Cấp MN 17gr |
|7           |Fish Sauces Nam Ngư Chinsu Nam Ngư 500ml      |
|307         |Instant Coffee Vinacafe 3 in 1 Chất SG 29gr   |
|169         |Sports Drinks Vĩnh Hảo Aktiva Isotonic 500ml  |
|205         |Fish Sauces Chinsu 3 ngon 500ml               |
|334         |Beer Lager Premium Sư Tử Trắng Light 330ml    |
|272         |Sterilized sausages SNF Mini 20gr             |
|15          |Fish Sauces Nam Ngư Siêu tiết kiệm 800ml      |
|232         |Instant Noodle Omachi Chef Cua 85gr           |
|234         |Fish Sauces Nam Ngư Đệ Nhị 4.8L               |
|155         |Mineral Water Vĩnh Hảo Không ga 500ml         |
|862         |Soya Sauces Tam Thái Tử Thượng hạng 300ml     |
|132         |R&G Coffee Vinacafe Mundo 200gr               |
|154    

In [120]:
spark_df1 = spark_csv1.join(spark_csv2, on='STD_SKU', how='inner')

In [109]:
spark_df1.columns

['STD_SKU',
 '_c0',
 'BRANDY',
 'CAL_MONTH',
 'CITY',
 'DATE_KEY',
 'DAY_OF_MONTH',
 'DIVISION',
 'GROSS_SALES',
 'LOCAL_REGION',
 'OUTLET',
 'OUTLET_LATITUDE',
 'OUTLET_LONGITUDE',
 'POSITION_CD',
 'SUBCAT_BRAND',
 'SUB_DIVISION',
 'TOTAL_SPENT_PROMOTION',
 'URBAN_RURAL',
 'PRICE_TYPE_CD',
 'PRICING_TEXT',
 'PR_POSTN_ID',
 'VENDR_OU_ID',
 'VENDR_PART_NUM',
 'ROW_ID',
 'ITEM_CODE',
 'ITEM_DESC',
 'INDUSTRY',
 'DIVISION',
 'SUB_DIVISION',
 'CATEGORY',
 'SUB_CATEGORY',
 'INDUSTRY_DESC',
 'DIVISION_DESC',
 'SUB_DIVISION_DESC',
 'CATEGORY_DESC',
 'SUB_CATEGORY_DESC',
 'BRAND',
 'BRANDY',
 'VARIANT',
 'BRAND_DESC',
 'BRANDY_DESC',
 'VARIANT_DESC',
 'PRODUCT_FORM',
 'PACK_TYPE',
 'PACK_SIZE',
 'STANDARD_SKU',
 'PRODUCT_FORM_DESC',
 'PACK_TYPE_DESC',
 'PACK_SIZE_DESC',
 'EBS_ITEMID',
 'STATUS',
 'ORDERABLE',
 'PRIMARY_VENDOR_OPERATING_UNIT',
 'UOM1',
 'X_MSC_UOM_2',
 'UOM1_WEIGHT',
 'UOM_2_WEIGHT',
 'UOM2_TO_UOM1_CONV_VAL',
 'MSRP',
 'ROW_WID',
 'EFF_START_DATE',
 'EFF_END_DATE',
 'DIM_ACTIVE

In [121]:
spark_df2 = spark_df1.select('CAL_MONTH', 'OUTLET', 'STANDARD_SKU', 'GROSS_SALES')

In [124]:
spark_df2 = spark_df2.withColumn('GROSS_SALES', spark_df2.GROSS_SALES.cast('double'))

In [125]:
spark_df2.show()

+---------+-------+------------+------------+
|CAL_MONTH| OUTLET|STANDARD_SKU| GROSS_SALES|
+---------+-------+------------+------------+
|        9| 112745|          48|    432000.0|
|        9|2100552|         238|    672000.0|
|        9| 352731|          48|    216000.0|
|       10| 167131|         864|     70500.0|
|       11| 295806|          55|    120000.0|
|       11|   6237|         300|    180000.0|
|       11| 141839|         840|    162000.0|
|       10|  49794|          37|    150000.0|
|       10|  88264|         837|     87000.0|
|       11|  88536|         793|     87000.0|
|       11| 171697|          97|   1280000.0|
|        9| 276968|         166|170000.00016|
|        9| 240193|         146|    665000.0|
|        9| 157945|           2|    450000.0|
|       10| 144069|         848|     75000.0|
|       11|2223686|         874|    806000.0|
|       11| 128650|         245|   1070000.0|
|       11|2185025|         874|     62000.0|
|       11| 139397|         153|  

In [145]:
transaction_data1 = spark_df2.groupBy('CAL_MONTH', 'OUTLET', 'STANDARD_SKU').sum('GROSS_SALES')

In [146]:
transaction_data1 = transaction_data1.withColumnRenamed('sum(GROSS_SALES)', 'GROSS_SALES')

In [147]:
transaction_data1.show()

+---------+-------+------------+-----------+
|CAL_MONTH| OUTLET|STANDARD_SKU|GROSS_SALES|
+---------+-------+------------+-----------+
|       11|2298914|         166|    42500.0|
|        9| 223340|          62|1360000.002|
|       11|  30126|         238|   504000.0|
|       11|  62469|         840|   388800.0|
|       10| 109729|          28|   264000.0|
|       10|2196157|         847|    75000.0|
|        9|    545|          17|  1327500.0|
|        9|2280578|          97|   205800.0|
|        9|2149200|          80|   142800.0|
|        9| 266503|         841|   340200.0|
|        9|2048894|          13|   2.5584E7|
|       10| 192633|          13|   492000.0|
|       10|  97277|          62|    68000.0|
|       11| 257927|         300|   900000.0|
|        9|2068897|         340|130000.0002|
|       10| 309661|         155|   891000.0|
|       11| 246331|          62|   340000.0|
|       10|2176841|         321|   690000.0|
|       10| 261786|          14|   531000.0|
|       10

In [148]:
transaction_data2 = transaction_data1.groupBy('OUTLET', 'STANDARD_SKU').avg('GROSS_SALES')

In [149]:
transaction_data2.show()

+-------+------------+------------------+
| OUTLET|STANDARD_SKU|  avg(GROSS_SALES)|
+-------+------------+------------------+
| 112785|          13|          1.2792E7|
|2217921|          62|          816000.0|
| 225803|          17|         2950000.0|
| 333993|          80|         2856000.0|
| 108578|         841|           36450.0|
|2072904|         300|          180000.0|
|2154660|          14|          973500.0|
|  47505|         152|          162000.0|
| 134818|          13|          246000.0|
| 161058|          13|          984000.0|
|2211503|          13|         2952000.0|
|  80942|         146|          665000.0|
|2021492|           2|         1350000.0|
| 243623|         793|         1595000.0|
| 141646|         245|          214000.0|
| 143742|         258|1266666.6668333334|
| 124128|         238|          616000.0|
|  30235|          97|         1477000.0|
| 360077|         166|       42500.00004|
|  58646|         366|185000.00003333334|
+-------+------------+------------

In [152]:
transaction_data2 = transaction_data2.withColumnRenamed('avg(GROSS_SALES)', 'AVG_GROSS_SALES')

In [153]:
transaction_data2 = transaction_data2.withColumn('AVG_GROSS_SALES', transaction_data2.AVG_GROSS_SALES/1000000)

In [157]:
transaction_data2.show()

+-------+------------+-------------------+
| OUTLET|STANDARD_SKU|    AVG_GROSS_SALES|
+-------+------------+-------------------+
| 112785|      112785|             12.792|
|2217921|     2217921|              0.816|
| 225803|      225803|               2.95|
| 333993|      333993|              2.856|
| 108578|      108578|            0.03645|
|2072904|     2072904|               0.18|
|2154660|     2154660|             0.9735|
|  47505|       47505|              0.162|
| 134818|      134818|              0.246|
| 161058|      161058|              0.984|
|2211503|     2211503|              2.952|
|  80942|       80942|              0.665|
|2021492|     2021492|               1.35|
| 243623|      243623|              1.595|
| 141646|      141646|              0.214|
| 143742|      143742| 1.2666666668333335|
| 124128|      124128|              0.616|
|  30235|       30235|              1.477|
| 360077|      360077|      0.04250000004|
|  58646|       58646|0.18500000003333333|
+-------+--

In [156]:
transaction_data2 = transaction_data2.withColumn('OUTLET', transaction_data2.OUTLET.cast('integer'))
transaction_data2 = transaction_data2.withColumn('STANDARD_SKU', transaction_data2.OUTLET.cast('integer'))