#### DataFrames are built on top of RDDs and provide database-like functionality
#### We will use some sample data from Wegmans to explore DataFrames
#### This notebook creates DataFrames from text files

In [1]:
sc

<pyspark.context.SparkContext at 0x2b1a5d518f10>

In [2]:
from pyspark.sql import Row
from datetime import datetime

# Define functions to parse text files containing items, stores, customers, and transactions
def parseStore(s):
    """Turn one pipe-delimited store record into a Row.

    The line is split on '|'. Field 0 (store number) and field 5 (store
    type) are converted to int; fields 1-4 are kept as strings.

    Raises ValueError if an int field is not numeric and IndexError if the
    line has fewer than six fields.
    """
    fields = s.split('|')
    return Row(
        store_num=int(fields[0]),
        store_name=fields[1],
        store_zone=fields[2],
        store_city=fields[3],
        store_state=fields[4],
        store_type=int(fields[5]),
    )

def parseItem(s):
    """Turn one pipe-delimited item record into a Row.

    The line is split on '|'. Fields 0, 6, 8 and 10 (item, department,
    category and class numbers) are converted to int, field 3 (unit
    quantity) to float; every other field is kept as a string.

    Raises ValueError if a numeric field cannot be converted and IndexError
    if the line has fewer than twelve fields.
    """
    fields = s.split('|')
    return Row(
        item_number=int(fields[0]),
        dept_categ_class=fields[1],
        item_des=fields[2],
        item_unt_qty=float(fields[3]),
        size_unit_desc=fields[4],
        brand_code=fields[5],
        dept_num=int(fields[6]),
        dept_name=fields[7],
        categ_num=int(fields[8]),
        categ_name=fields[9],
        class_num=int(fields[10]),
        class_name=fields[11],
    )

def parseCustomer(s):
    """Turn one pipe-delimited customer (household) record into a Row.

    The line is split on '|'. Field 0 (household account) and fields 11-13
    (wine e-mail counters) are converted to int, field 10 (application
    date) is parsed with the format '%Y-%m-%d', and fields 1-9 are stored
    unchanged as strings.

    Raises ValueError if an int field or the date cannot be parsed and
    IndexError if the line has fewer than fourteen fields.
    """
    fields = s.split('|')
    return Row(
        hshld_acct=int(fields[0]),
        birth_yr_head_hh=fields[1],
        hh_income=fields[2],
        hh_size=fields[3],
        adult_count=fields[4],
        child_count=fields[5],
        birth_yr_oldest=fields[6],
        birth_yr_youngest=fields[7],
        bad_address=fields[8],
        privacy=fields[9],
        application_date=datetime.strptime(fields[10], '%Y-%m-%d'),
        wine_email_sent=int(fields[11]),
        wine_email_open=int(fields[12]),
        wine_email_click=int(fields[13]),
    )

def parsePostrans(s):
    """Turn one pipe-delimited point-of-sale transaction record into a Row.

    The line is split on '|'. Fields 0-2, 4, 5 and 7 are converted to int,
    field 3 (transaction date) is parsed with the format '%Y-%m-%d',
    fields 8-10 (net sales, gross sales, manufacturer coupon) are converted
    to float, and field 6 is kept as a string.

    Raises ValueError if a numeric field or the date cannot be parsed and
    IndexError if the line has fewer than eleven fields.
    """
    fields = s.split('|')
    return Row(
        hshld_acct=int(fields[0]),
        acct_num=int(fields[1]),
        trans_num=int(fields[2]),
        trans_date=datetime.strptime(fields[3], '%Y-%m-%d'),
        store_num=int(fields[4]),
        item_number=int(fields[5]),
        dept_categ_class=fields[6],
        unit_count=int(fields[7]),
        net_sales=float(fields[8]),
        gross_sales=float(fields[9]),
        manuf_coupon=float(fields[10]),
    )

In [3]:
# Directory holding the Wegmans sample text files.
path = '/public/tbiswas2/csc261/spark/wegmans/'

# Read each file as an RDD of lines, parse every line into a Row with the
# matching parser, and let Spark build a DataFrame from the Rows.
storeDF = spark.createDataFrame(
    sc.textFile(path + 'wegmans_store_master.txt').map(parseStore)
)
customerDF = spark.createDataFrame(
    sc.textFile(path + 'wegmans_customer_master.txt').map(parseCustomer)
)
itemDF = spark.createDataFrame(
    sc.textFile(path + 'wegmans_item_master.txt').map(parseItem)
)
postransDF = spark.createDataFrame(
    sc.textFile(path + 'partial_transaction.dat').map(parsePostrans)
)

In [4]:
# Print the first five rows of each DataFrame as a quick sanity check.
storeDF.show(n=5)
customerDF.show(n=5)
itemDF.show(n=5)
postransDF.show(n=5)

+-------------+--------------------+---------+-----------+----------+----------+
|   store_city|          store_name|store_num|store_state|store_type|store_zone|
+-------------+--------------------+---------+-----------+----------+----------+
|        DEPEW|   WEGMANS DICK ROAD|       80|         NY|         1|   BUFFALO|
|      AMHERST|WEGMANS ALBERTA D...|       82|         NY|         1|   BUFFALO|
|WILLIAMSVILLE|WEGMANS SHERIDAN ...|       83|         NY|         1|   BUFFALO|
|      BUFFALO|    WEGMANS MCKINLEY|       84|         NY|         1|   BUFFALO|
|      AMHERST|WEGMANS NIAGARA F...|       86|         NY|         1|   BUFFALO|
+-------------+--------------------+---------+-----------+----------+----------+
only showing top 5 rows

+-----------+--------------------+-----------+----------------+---------------+-----------------+-----------+---------+-------+----------+-------+----------------+---------------+---------------+
|adult_count|    application_date|bad_address|birt

# Query 4

In [5]:
# Imported here because this cell runs before the cell that first imports F.
from pyspark.sql import functions as F

# Earliest and latest transaction dates. A single DataFrame aggregate
# computes both values in one Spark job instead of converting to an RDD of
# Rows and scanning it twice (once for max, once for min). On an empty
# DataFrame both values come back as None instead of raising.
mindate, maxdate = postransDF.agg(
    F.min('trans_date'),
    F.max('trans_date'),
).first()
print(maxdate)
print(mindate)

2014-04-26 00:00:00
2013-04-28 00:00:00


# Query 8

In [6]:
import pyspark.sql.functions as F


# Query 8: show the customer record of the household that bought the most
# units of items whose class_name is 'WHOLE MILK'.

# Item numbers of every whole-milk item.
milktableDF = itemDF.filter(itemDF['class_name'] == 'WHOLE MILK')
milkfilterDF = milktableDF.select('class_name', 'item_number')

# Keep only the transactions for those items, then total units per household.
milkjoinDF = postransDF.join(milkfilterDF, milkfilterDF.item_number == postransDF.item_number)
milkhouse = milkjoinDF.select('hshld_acct', 'unit_count')
milksumDF = milkhouse.groupBy('hshld_acct').agg(F.sum('unit_count').alias('Units_sold'))

# Largest per-household total, computed with a DataFrame aggregate rather
# than a round-trip through an RDD of Rows.
maxmilkval = milksumDF.agg(F.max('Units_sold')).first()[0]

# Per-household totals, largest first (not used below; handy for inspection).
milksumordered = milksumDF.orderBy(milksumDF.Units_sold.desc())

# Household(s) that reached the maximum; when several tie, the highest
# account number is used.
maxhousemilkDF = milksumDF.filter(milksumDF['Units_sold'] == maxmilkval)
maxmilkhshldDF = maxhousemilkDF.select('hshld_acct')
maxmilkhshldnum = maxmilkhshldDF.agg(F.max('hshld_acct')).first()[0]

# show() only prints and returns None, so keep the DataFrame itself in
# milkbuyertuple and display it as a separate step.
milkbuyertuple = customerDF.filter(customerDF['hshld_acct'] == maxmilkhshldnum)
milkbuyertuple.show()



+-----------+--------------------+-----------+----------------+---------------+-----------------+-----------+---------+-------+----------+-------+----------------+---------------+---------------+
|adult_count|    application_date|bad_address|birth_yr_head_hh|birth_yr_oldest|birth_yr_youngest|child_count|hh_income|hh_size|hshld_acct|privacy|wine_email_click|wine_email_open|wine_email_sent|
+-----------+--------------------+-----------+----------------+---------------+-----------------+-----------+---------+-------+----------+-------+----------------+---------------+---------------+
|          6|1982-10-19 00:00:...|          N|            1983|              0|                0|          0|    87500|      6|    301443|      N|               0|              0|              0|
+-----------+--------------------+-----------+----------------+---------------+-----------------+-----------+---------+-------+----------+-------+----------------+---------------+---------------+



# Query 12

In [7]:
##Find the top three (3) stores with the highest number of customers. Display the name of the stores and the
##number of customers.

from pyspark.sql import functions as F

# One row per (household, store) pair, so a household is counted once per
# store no matter how many transactions it made there.
distinctposDF = postransDF.select('hshld_acct', 'store_num').distinct()

# Number of distinct households per store, largest first.
custcountbystoreDF = (
    distinctposDF
    .groupBy('store_num')
    .count()
    .orderBy('count', ascending=False)
)

# Attach store names to the counts and print the three busiest stores.
storecustDF = (
    storeDF
    .join(custcountbystoreDF, storeDF.store_num == custcountbystoreDF.store_num)
    .select('store_name', 'count')
)
storecustDF.orderBy('count', ascending=False).show(3)

+--------------------+-----+
|          store_name|count|
+--------------------+-----+
|CENTURY LIQUOR AN...| 1001|
|   WEGMANS PITTSFORD|  727|
| WEGMANS EAST AVENUE|  560|
+--------------------+-----+
only showing top 3 rows

