In [1]:
sc

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
spark

In [4]:
# Define functions to parse txt files containing items, stores, customers, and transactions
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql import functions as F
def parseStore(s):
    l=s.split('|')
    return Row(store_num=int(l[0]), 
               store_name=l[1],               
               store_zone=l[2],
               store_city=l[3], 
               store_state=l[4], 
               store_type=int(l[5]))
def parseItem(s):
    l=s.split('|')
    return Row(item_number=int(l[0]), 
               dept_categ_class=l[1],               
               item_des=l[2],
               item_unt_qty=float(l[3]), 
               size_unit_desc=l[4], 
               brand_code=l[5], 
               dept_num=int(l[6]), 
               dept_name=l[7], 
               categ_num=int(l[8]), 
               categ_name=l[9], 
               class_num=int(l[10]),
               class_name=l[11])
def parseCustomer(s):
    l=s.split('|')
    return Row(hshld_acct=int(l[0]),
               birth_yr_head_hh=l[1],
               hh_income=l[2],
               hh_size=l[3],
               adult_count=l[4],
               child_count=l[5],
               birth_yr_oldest=l[6],
               birth_yr_youngest=l[7],
               bad_address=l[8],
               privacy=l[9],
               application_date=datetime.strptime(l[10],'%Y-%m-%d'),
               wine_email_sent=int(l[11]),
               wine_email_open=int(l[12]),
               wine_email_click=int(l[13]))
def parsePostrans(s):
    l=s.split('|')
    return Row(hshld_acct=int(l[0]),
               acct_num=int(l[1]),
               trans_num=int(l[2]),
               trans_date=datetime.strptime(l[3],'%Y-%m-%d'),
               store_num=int(l[4]),
               item_number=int(l[5]),
               dept_categ_class=l[6],
               unit_count=int(l[7]),
               net_sales=float(l[8]),
               gross_sales=float(l[9]),
               manuf_coupon=float(l[10]))




In [5]:
path='/public/tbiswas2/csc261/spark/wegmans/'

storeDF=spark.createDataFrame(sc.textFile(path+'wegmans_store_master.txt').map(lambda x: parseStore(x)))
customerDF=spark.createDataFrame(sc.textFile(path+'wegmans_customer_master.txt').map(lambda x: parseCustomer(x)))
itemDF=spark.createDataFrame(sc.textFile(path+'wegmans_item_master.txt').map(lambda x: parseItem(x)))
postransDF=spark.createDataFrame(sc.textFile(path+'partial_transaction.dat').map(lambda x: parsePostrans(x)))

# Q1

In [10]:
storeDF.count()

94

# Q2

In [11]:
itemDF[['dept_name']].drop_duplicates().count()

46

# Q3

In [12]:
storeDF.filter(storeDF.store_state == 'NY').count()

50

# Q4

In [13]:
postransDF.select([F.min('trans_date'), F.max('trans_date')]).show()

+-------------------+-------------------+
|    min(trans_date)|    max(trans_date)|
+-------------------+-------------------+
|2013-04-28 00:00:00|2014-04-26 00:00:00|
+-------------------+-------------------+



# Q5

In [14]:
postransDF.join(storeDF, 'store_num').filter(storeDF.store_name == 'WEGMANS MARKETPLACE').count()

22613

# Q6

In [15]:
postransDF.groupBy(postransDF.trans_num).agg(F.sum('unit_count')).agg(F.mean('sum(unit_count)')).show()

+--------------------+
|avg(sum(unit_count))|
+--------------------+
|  14.368592701342282|
+--------------------+



# Q7

In [16]:
itemDF.filter(itemDF['class_name']=='WHOLE MILK').count()

7

# Q8

In [23]:
itemDF.filter(itemDF.class_name =='WHOLE MILK')\
    .join(postransDF, 'item_number')\
    .groupBy('hshld_acct')\
    .agg(F.sum('unit_count'))\
    .join(customerDF, 'hshld_acct')\
    .sort('sum(unit_count)',ascending=False)\
    .select('hshld_acct',
               'birth_yr_head_hh',
               'hh_income',
               'hh_size',
               'adult_count',
               'child_count',
               'birth_yr_oldest',
               'birth_yr_youngest',
               'bad_address',
               'privacy',
               'application_date',
               'wine_email_sent',
               'wine_email_open',
               'wine_email_click')\
    .show(1)

+----------+----------------+---------+-------+-----------+-----------+---------------+-----------------+-----------+-------+-------------------+---------------+---------------+----------------+
|hshld_acct|birth_yr_head_hh|hh_income|hh_size|adult_count|child_count|birth_yr_oldest|birth_yr_youngest|bad_address|privacy|   application_date|wine_email_sent|wine_email_open|wine_email_click|
+----------+----------------+---------+-------+-----------+-----------+---------------+-----------------+-----------+-------+-------------------+---------------+---------------+----------------+
|    301443|            1983|    87500|      6|          6|          0|              0|                0|          N|      N|1982-10-19 00:00:00|              0|              0|               0|
+----------+----------------+---------+-------+-----------+-----------+---------------+-----------------+-----------+-------+-------------------+---------------+---------------+----------------+
only showing top 1 row



# Q9

In [104]:
itemDF.filter(itemDF.size_unit_desc =='LB').sort('item_unt_qty',ascending=False).show(1)

+----------+----------+---------+----------+---------+----------------+---------+--------+--------------------+-----------+------------+--------------+
|brand_code|categ_name|categ_num|class_name|class_num|dept_categ_class|dept_name|dept_num|            item_des|item_number|item_unt_qty|size_unit_desc|
+----------+----------+---------+----------+---------+----------------+---------+--------+--------------------+-----------+------------+--------------+
|      MORT|  SEASONAL|       83| ROCK SALT|       40|          048340|  GROCERY|       4|MORTON SAFE-T-SAL...|     158570|        50.0|            LB|
+----------+----------+---------+----------+---------+----------------+---------+--------+--------------------+-----------+------------+--------------+
only showing top 1 row



In [19]:
a = itemDF.filter(itemDF.size_unit_desc =='LB').sort('item_unt_qty',ascending=False)

In [20]:
a.filter(a.item_unt_qty == 50).show()

+----------+----------+---------+------------+---------+----------------+---------+--------+--------------------+-----------+------------+--------------+
|brand_code|categ_name|categ_num|  class_name|class_num|dept_categ_class|dept_name|dept_num|            item_des|item_number|item_unt_qty|size_unit_desc|
+----------+----------+---------+------------+---------+----------------+---------+--------+--------------------+-----------+------------+--------------+
|      PURI|  DOG FOOD|       67|DRY DOG FOOD|       40|          046740|  GROCERY|       4|PURINA DOG CHOW B...|     559137|        50.0|            LB|
|      MORT|  SEASONAL|       83|   ROCK SALT|       40|          048340|  GROCERY|       4|MORTON SAFE-T-SAL...|     158570|        50.0|            LB|
|      DCRS|  SEASONAL|       83|   ROCK SALT|       40|          048340|  GROCERY|       4|DC EARLY MELT ANT...|     536983|        50.0|            LB|
|      DCRS|  SEASONAL|       83|   ROCK SALT|       40|          048340|  G

# Q10

In [21]:
postransDF.groupBy(postransDF.trans_num).agg(F.sum('unit_count')).sort('sum(unit_count)',ascending = False).show(3)

+----------+---------------+
| trans_num|sum(unit_count)|
+----------+---------------+
|2140152751|            160|
|  62935858|            154|
|2135179433|            137|
+----------+---------------+
only showing top 3 rows



# Q11

In [37]:
a = storeDF.join(postransDF,'store_num').select('hshld_acct', 'store_num','store_name')\
.distinct().join(customerDF,'hshld_acct')

In [38]:
a.groupBy(storeDF.store_name).agg(F.mean('hh_size')).show()

+--------------------+------------------+
|          store_name|      avg(hh_size)|
+--------------------+------------------+
|    WEGMANS PERINTON|2.8173374613003097|
|   WEGMANS HOLT ROAD| 3.304109589041096|
|     WEGMANS GENESEO| 3.130434782608696|
|   WEGMANS FAIRMOUNT|               3.0|
|WEGMANS JOHNSON CITY|3.0714285714285716|
|    WEGMANS ONONDAGA|               4.0|
|      WEGMANS AUBURN|2.8181818181818183|
|WEGMANS GREAT NOR...|2.2222222222222223|
|    WEGMANS PENFIELD|3.2285714285714286|
|    WEGMANS MT. READ| 3.059322033898305|
|  WEGMANS AMHERST ST|3.6666666666666665|
| WEGMANS IRONDEQUOIT| 3.088235294117647|
| WEGMANS TRANSIT RD.| 4.444444444444445|
|     WEGMANS HORNELL|2.3333333333333335|
|      WEGMANS ELMIRA|               2.0|
|         LIQUOR CITY|               3.5|
| WEGMANS EAST AVENUE| 2.780357142857143|
| WEGMANS CANANDAIGUA|3.0974025974025974|
|      WEGMANS DEWITT|2.9411764705882355|
|WEGMANS MILITARY ...| 3.857142857142857|
+--------------------+------------

# Q12

In [47]:
storeDF.join(postransDF,'store_num').select('hshld_acct', 'store_num','store_name')\
.distinct().join(customerDF,'hshld_acct').groupBy(storeDF.store_name).count().sort('count',ascending=False).show(3)

+--------------------+-----+
|          store_name|count|
+--------------------+-----+
|CENTURY LIQUOR AN...| 1001|
|   WEGMANS PITTSFORD|  727|
| WEGMANS EAST AVENUE|  560|
+--------------------+-----+
only showing top 3 rows



# Q13

In [65]:
a = postransDF.join(storeDF,'store_num').groupBy(postransDF.store_num,postransDF.trans_date).agg(F.sum('net_sales'))

In [71]:
b = a.groupBy('store_num').agg(F.max('sum(net_sales)'))

In [72]:
b.join(storeDF,'store_num').select('store_name','max(sum(net_sales))')\
.sort('max(sum(net_sales))',ascending = False).show(3)

+-----------------+-------------------+
|       store_name|max(sum(net_sales))|
+-----------------+-------------------+
|WEGMANS PITTSFORD|  7624.229999999884|
|WEGMANS HOLT ROAD|  4240.679999999956|
| WEGMANS PERINTON|  4144.779999999963|
+-----------------+-------------------+
only showing top 3 rows



# Q14

In [80]:
postransDF.groupBy('trans_num').agg(F.sum('net_sales')).sort('sum(net_sales)',ascending = False).show(3)

+---------+------------------+
|trans_num|    sum(net_sales)|
+---------+------------------+
| 23315450|           1740.08|
| 92756442|           1373.76|
|133258672|1202.9399999999998|
+---------+------------------+
only showing top 3 rows



In [97]:
rdd = sc.textFile(path+'partial_transaction.dat')

In [98]:
tran_item = rdd.map(lambda x: (int(x[2]), int(x[5])))

In [99]:
trans = set(['23315450','92756442','133258672'])

In [100]:
result = tran_item.filter(lambda x: x[0] in trans)

In [101]:
a = result.groupByKey().mapValues(list)

# Below this section are not answers for the project
# Try

In [73]:
a = storeDF.join(postransDF,'store_num').distinct().select('hshld_acct', 'store_num','store_name')\
.join(customerDF,'hshld_acct')

In [74]:
a.groupBy(storeDF.store_name).agg(F.mean('hh_size')).show()

+--------------------+------------------+
|          store_name|      avg(hh_size)|
+--------------------+------------------+
|    WEGMANS PERINTON|3.0208901961823496|
|   WEGMANS HOLT ROAD|3.8538420913651965|
|     WEGMANS GENESEO|3.2712167232445952|
|   WEGMANS FAIRMOUNT|3.0344827586206895|
|WEGMANS JOHNSON CITY| 2.310344827586207|
|    WEGMANS ONONDAGA|               4.0|
|      WEGMANS AUBURN| 3.235294117647059|
|WEGMANS GREAT NOR...|2.0063291139240507|
|    WEGMANS PENFIELD| 4.068088933564352|
|    WEGMANS MT. READ|3.0286485744359526|
|  WEGMANS AMHERST ST| 5.173878205128205|
| WEGMANS IRONDEQUOIT| 2.720486056029696|
| WEGMANS TRANSIT RD.| 4.270557029177719|
|     WEGMANS HORNELL| 1.810483870967742|
|      WEGMANS ELMIRA|0.5217391304347826|
|         LIQUOR CITY|2.6666666666666665|
| WEGMANS EAST AVENUE|2.8224429817560397|
| WEGMANS CANANDAIGUA|3.0959485530546624|
|      WEGMANS DEWITT| 3.913114754098361|
|WEGMANS MILITARY ...| 3.650485436893204|
+--------------------+------------