In [1]:
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
# NOTE: importing `sum` and `round` from pyspark shadows the Python builtins for the rest
# of the notebook; later cells rely on the PySpark column-function versions.
from pyspark.sql.functions import sum, month, year, countDistinct, avg, round

In [2]:
## create a SparkSession with the MySQL JDBC connector jar on the classpath
## The jar location can be overridden with the MYSQL_JDBC_JAR environment variable instead of
## editing this absolute, machine-specific default.
## NOTE: PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must run before any
## other cell creates a Spark session in this kernel; otherwise getOrCreate() reuses the running
## session and the --jars option has no effect.
MYSQL_JDBC_JAR = os.environ.get(
    "MYSQL_JDBC_JAR", "/Users/HAmin/JDBC/mysql-connector-java-8.0.12.jar"
)
SUBMIT_ARGS = "--jars {} pyspark-shell".format(MYSQL_JDBC_JAR)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

spark = (
    SparkSession.builder
    .appName("join_file_with_mysql")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.executor.cores", "5")
    .getOrCreate()
)

In [3]:
## create a DataFrame for the MySQL `size` table (SIZEID -> SIZENAME lookup)
## Credentials come from the environment instead of being hard-coded in the notebook:
##   MYSQL_USER     - defaults to "root"
##   MYSQL_PASSWORD - prompted for if unset, then cached in os.environ so re-runs do not prompt
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass.getpass("MySQL password: ")

df_size = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/ProjectData?useLegacyDatetimeCode=false&serverTimezone=UTC",
    # Connector/J 8.x driver class; com.mysql.jdbc.Driver is only a deprecated alias in 8.x
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="size",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ["MYSQL_PASSWORD"],
).load()

## expose the size table to Spark SQL as the temporary view `size`
## (createOrReplaceTempView replaces the deprecated registerTempTable)
df_size.createOrReplaceTempView("size")
df_size.show()


+------+--------+
|SIZEID|SIZENAME|
+------+--------+
|     1|   50 mL|
|     2|  100 mL|
|     3|  187 mL|
|     4|  200 mL|
|     5|  375 mL|
|     6|  500 mL|
|     7|  750 mL|
|     8|    1 Lt|
|     9|  1.5 Lt|
|    10| 1.75 Lt|
|    11|    3 Lt|
|    12|    4 Lt|
|    13|    5 Lt|
|    14|    7 Oz|
|    15|   12 Oz|
|    16|   16 Oz|
|    17|   20 Oz|
|    18|   32 Oz|
|    19|   64 Oz|
|    20|   22 Oz|
+------+--------+
only showing top 20 rows



In [4]:
## create a DataFrame for the MySQL `itemlist` table
## Credentials come from the environment instead of being hard-coded in the notebook:
##   MYSQL_USER     - defaults to "root"
##   MYSQL_PASSWORD - prompted for if unset, then cached in os.environ so re-runs do not prompt
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass.getpass("MySQL password: ")

df_itemlist = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/ProjectData?useLegacyDatetimeCode=false&serverTimezone=UTC",
    # Connector/J 8.x driver class; com.mysql.jdbc.Driver is only a deprecated alias in 8.x
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="itemlist",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ["MYSQL_PASSWORD"],
).load()

## expose the item list to Spark SQL as the temporary view `itemlist`
## (createOrReplaceTempView replaces the deprecated registerTempTable)
df_itemlist.createOrReplaceTempView("itemlist")
df_itemlist.printSchema()

root
 |-- SKU: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- ITEMTYPE: integer (nullable = true)
 |-- VINTAGE: string (nullable = true)
 |-- PRICEPERUNIT: double (nullable = true)
 |-- TOTALQTY: integer (nullable = true)
 |-- CATID: integer (nullable = true)
 |-- DEPID: integer (nullable = true)
 |-- PACKID: string (nullable = true)
 |-- SIZEID: string (nullable = true)
 |-- NONTAXABLE: integer (nullable = true)
 |-- TAX1: integer (nullable = true)
 |-- NONACTIVE: integer (nullable = true)
 |-- ITEMLEVEL: integer (nullable = true)
 |-- CONTROLSKU: string (nullable = true)
 |-- KEYWORD: string (nullable = true)



In [5]:
## Materialise every row and column of the item list in driver memory as a pandas DataFrame.
pandaitemlist = df_itemlist.select("*").toPandas()

In [9]:
## create a DataFrame for the MySQL `transaction_info` table (one row per transaction header)
## Credentials come from the environment instead of being hard-coded in the notebook:
##   MYSQL_USER     - defaults to "root"
##   MYSQL_PASSWORD - prompted for if unset, then cached in os.environ so re-runs do not prompt
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass.getpass("MySQL password: ")

df_txninfo = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/ProjectData?useLegacyDatetimeCode=false&serverTimezone=UTC",
    # Connector/J 8.x driver class; com.mysql.jdbc.Driver is only a deprecated alias in 8.x
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="transaction_info",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ["MYSQL_PASSWORD"],
).load()

## expose transaction_info to Spark SQL as the temporary view `txninfo`
## (createOrReplaceTempView replaces the deprecated registerTempTable)
df_txninfo.createOrReplaceTempView("txninfo")
df_txninfo.printSchema()

root
 |-- TXNID: long (nullable = true)
 |-- TSEQNUM: integer (nullable = true)
 |-- ACCESSLOGNUM: integer (nullable = true)
 |-- CUSTOMERID: string (nullable = true)
 |-- TXNDATE: timestamp (nullable = true)
 |-- TAX1RATE: double (nullable = true)
 |-- TAXABLE1AMT: double (nullable = true)
 |-- SUBTOTAL: double (nullable = true)
 |-- TAX1TOTAL: string (nullable = true)
 |-- SAVINGS: double (nullable = true)
 |-- NETPAYABLE: double (nullable = true)



In [6]:
## create a DataFrame for the MySQL `transaction_items` table (line items of each transaction)
## Credentials come from the environment instead of being hard-coded in the notebook:
##   MYSQL_USER     - defaults to "root"
##   MYSQL_PASSWORD - prompted for if unset, then cached in os.environ so re-runs do not prompt
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass.getpass("MySQL password: ")

df_txnitems = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/ProjectData?useLegacyDatetimeCode=false&serverTimezone=UTC",
    # Connector/J 8.x driver class; com.mysql.jdbc.Driver is only a deprecated alias in 8.x
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="transaction_items",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ["MYSQL_PASSWORD"],
).load()

## expose transaction_items to Spark SQL as the temporary view `txnitems`
## (createOrReplaceTempView replaces the deprecated registerTempTable)
df_txnitems.createOrReplaceTempView("txnitems")
df_txnitems.printSchema()

root
 |-- TXNITEM_ID: integer (nullable = true)
 |-- SKU: integer (nullable = true)
 |-- TXNID: long (nullable = true)
 |-- ITEMPRICE: double (nullable = true)
 |-- SELLPRICE: double (nullable = true)
 |-- QTYBEFORE: string (nullable = true)
 |-- QUANTITY: double (nullable = true)
 |-- COST: double (nullable = true)
 |-- ISTAX1APPLIED: integer (nullable = true)
 |-- LINEAMOUNT: double (nullable = true)
 |-- SIZEID: string (nullable = true)
 |-- ISSTOCKLOG: string (nullable = true)



In [11]:
## Attach each transaction line item to its transaction header to pick up TXNDATE.
## Needs the temp views `txninfo` and `txnitems` registered by the loading cells.
## An explicit INNER JOIN ... ON is equivalent to the comma join with a WHERE predicate,
## but keeps the join condition separate from any future filters.
sql = '''
SELECT tinfo.TXNDATE, titems.TXNID, titems.SKU, titems.quantity
FROM txninfo tinfo
INNER JOIN txnitems titems
    ON tinfo.TXNID = titems.TXNID
'''

apriori_base = spark.sql(sql)
## expose the joined rows to Spark SQL as `apriori_base`
## (createOrReplaceTempView replaces the deprecated registerTempTable)
apriori_base.createOrReplaceTempView("apriori_base")
apriori_base.show()

+-------------------+----------------+-----+--------+
|            TXNDATE|           TXNID|  SKU|quantity|
+-------------------+----------------+-----+--------+
|2017-06-15 07:40:32|1706151139580606|70820|    10.0|
|2017-07-04 06:56:32|1707041055576502|63621|     1.0|
|2017-07-04 06:56:32|1707041055576502|61242|     1.0|
|2017-07-18 10:24:52|1707181424207806|70766|     1.0|
|2017-07-19 09:12:10|1707191311481106|71265|     1.0|
|2017-07-21 11:43:26|1707211542070806|71280|     1.0|
|2017-09-01 11:35:40|1709011535347306|70400|     1.0|
|2017-10-23 16:50:11|1710232049559706|71263|     1.0|
|2017-11-30 12:11:27|1711301711049302|72878|     1.0|
|2017-11-30 12:11:27|1711301711049302|56334|     1.0|
|2017-12-01 10:24:40|1712011524107202|73081|     1.0|
|2017-12-14 13:08:31|1712141806573502|58357|     1.0|
|2017-12-14 13:08:31|1712141806573502|71739|     1.0|
|2017-12-14 13:08:31|1712141806573502|71739|     1.0|
|2017-12-14 13:08:31|1712141806573502|74740|     1.0|
|2017-12-14 13:08:31|1712141

In [8]:
## Display the DataFrame object itself: its repr lists only the column names and Spark types.
df_txnitems

DataFrame[TXNITEM_ID: int, SKU: int, TXNID: bigint, ITEMPRICE: double, SELLPRICE: double, QTYBEFORE: string, QUANTITY: double, COST: double, ISTAX1APPLIED: int, LINEAMOUNT: double, SIZEID: string, ISSTOCKLOG: string]

In [12]:
## Preview the earliest 15 joined line items (orderBy is the same operation as sort).
apriori_base.orderBy("TXNDATE").show(n=15)

+-------------------+------------------+-----+--------+
|            TXNDATE|             TXNID|  SKU|quantity|
+-------------------+------------------+-----+--------+
|2017-04-03 13:20:05|170403171807404001|71442|     1.0|
|2017-04-03 13:20:05|170403171807404001|71441|     1.0|
|2017-04-03 13:20:44|170403172012768701|71442|     1.0|
|2017-04-04 04:49:42|170404094858499806|71593|     1.0|
|2017-04-04 04:49:42|170404094858499806|71406|     1.0|
|2017-04-04 04:49:42|170404094858499806|69455|     1.0|
|2017-04-04 09:54:39|170404145408367701|61970|     1.0|
|2017-04-04 10:11:39|170404151120372802|61970|     1.0|
|2017-04-04 10:45:57|170404154553355001|61970|     1.0|
|2017-04-04 11:26:28|170404162621973802|61970|     1.0|
|2017-04-04 11:30:09|170404163002826702|68908|     1.0|
|2017-04-04 11:31:27|170404163110830602|69846|     1.0|
|2017-04-04 11:33:39|170404163332495102|61970|     1.0|
|2017-04-04 11:54:49|170404165409730602|70262|     1.0|
|2017-04-04 11:54:49|170404165409730602|66179|  

In [22]:
## Total quantity of each SKU within each transaction, ordered by transaction time.
## The aggregate column is named `sum(quantity)`.
apriori_sum = (
    apriori_base
    .groupBy("TXNDATE", "TXNID", "SKU")
    .agg(sum("quantity"))
    .orderBy("TXNDATE")
)

In [23]:
## Preview the first 30 per-transaction SKU totals in chronological order.
apriori_sum.show(n=30)

+-------------------+------------------+-----+-------------+
|            TXNDATE|             TXNID|  SKU|sum(quantity)|
+-------------------+------------------+-----+-------------+
|2017-04-03 13:20:05|170403171807404001|71441|          1.0|
|2017-04-03 13:20:05|170403171807404001|71442|          1.0|
|2017-04-03 13:20:44|170403172012768701|71442|          1.0|
|2017-04-04 04:49:42|170404094858499806|71593|          1.0|
|2017-04-04 04:49:42|170404094858499806|69455|          1.0|
|2017-04-04 04:49:42|170404094858499806|71406|          1.0|
|2017-04-04 09:54:39|170404145408367701|61970|          1.0|
|2017-04-04 10:11:39|170404151120372802|61970|          1.0|
|2017-04-04 10:45:57|170404154553355001|61970|          1.0|
|2017-04-04 11:26:28|170404162621973802|61970|          1.0|
|2017-04-04 11:30:09|170404163002826702|68908|          1.0|
|2017-04-04 11:31:27|170404163110830602|69846|          1.0|
|2017-04-04 11:33:39|170404163332495102|61970|          1.0|
|2017-04-04 11:54:49|170

In [27]:
## Split the per-transaction SKU totals by calendar month (1 = January ... 12 = December).
## NOTE: month() ignores the year, so each frame pools that month across every year in the
## data -- e.g. apriori_apr contains both April 2017 and April 2018 rows. Add a year() filter
## if a single month of a single year is wanted.
apriori_by_month = {m: apriori_sum.filter(month("TXNDATE") == m) for m in range(1, 13)}

## Keep the individual per-month names that later cells use.
(apriori_jan, apriori_feb, apriori_mar, apriori_apr, apriori_may, apriori_jun,
 apriori_jul, apriori_aug, apriori_sep, apriori_oct, apriori_nov, apriori_dec) = (
    apriori_by_month[m] for m in range(1, 13)
)


In [34]:
## Number of distinct SKUs sold in April, broken out by year.
(
    apriori_apr
    .groupBy(year("TXNDATE"))
    .agg(countDistinct("SKU"))
    .show()
)

+-------------+-------------------+
|year(TXNDATE)|count(DISTINCT SKU)|
+-------------+-------------------+
|         2018|               5871|
|         2017|                913|
+-------------+-------------------+



In [35]:
## Number of (TXNDATE, TXNID, SKU) rows falling in May, pooled across all years.
may_row_count = apriori_may.count()
may_row_count

83439

In [None]:
# sql='''
# SELECT ab.TXNID, ab.SKU, sum(ab.quantity)
# FROM apriori_base ab
# WHERE ab.TXNDATE BETWEEN '' AND ''
# '''
 
# clean_output = spark.sql(sql)
# clean_output.printSchema()

In [None]:
# ## join the DataSets
# sql='''
# SELECT a.first_name, a.last_name, b.order_number, b.total
# FROM customers a, orders b
# WHERE a.customer_number = b.customer_number
# '''
 
# output = spark.sql(sql)
 
# ## save the data into an ORC file
# output.write.format("orc").save("/tmp/customer_orders")

In [36]:
## Peek at the first five raw transaction line items.
df_txnitems.show(n=5)

+----------+-----+------------------+---------+---------+---------+--------+-----+-------------+----------+------+----------+
|TXNITEM_ID|  SKU|             TXNID|ITEMPRICE|SELLPRICE|QTYBEFORE|QUANTITY| COST|ISTAX1APPLIED|LINEAMOUNT|SIZEID|ISSTOCKLOG|
+----------+-----+------------------+---------+---------+---------+--------+-----+-------------+----------+------+----------+
|         1|71441|170403171807404001|    32.99|    27.99|      .00|     1.0|27.25|            1|     27.99|    15|          |
|         2|71442|170403171807404001|    16.99|    13.99|      .00|     1.0|13.63|            1|     13.99|    15|          |
|         3|71442|170403172012768701|    16.99|    13.99|      .00|     1.0|13.63|            1|     13.99|    15|          |
|         4|71593|170404094858499806|     1.17|     1.17|      .00|     1.0|  .72|            1|      1.17|      |          |
|         5|71406|170404094858499806|     2.49|     2.49|      .00|     1.0| 1.10|            1|      2.49|      |    

In [54]:
## Per-SKU summary over all transaction line items:
##   Cost     - mean COST, rounded to 3 decimals
##   Margin   - (mean SELLPRICE - mean COST) / mean SELLPRICE, rounded to 3 decimals
##              (plain means of the line-item columns; QUANTITY is not used as a weight)
##   Quantity - total QUANTITY across the SKU's line items
mean_cost_col = avg("COST")
mean_sell_col = avg("SELLPRICE")
ClassFilter = df_txnitems.groupBy("SKU").agg(
    round(mean_cost_col, 3).alias("Cost"),
    round((mean_sell_col - mean_cost_col) / mean_sell_col, 3).alias("Margin"),
    sum("QUANTITY").alias("Quantity"),
)
ClassFilter.show(n=15)

+-----+-----+------+--------+
|  SKU| Cost|Margin|Quantity|
+-----+-----+------+--------+
|64423| 8.82| 0.096|   913.0|
|58305| 19.5| 0.239|     8.0|
|64628| 6.53| 0.266|    56.0|
|69352| 7.87| 0.183|   138.0|
|71527| 9.59| 0.197|    23.0|
|60769|19.25|  0.23|     5.0|
|66010| 3.01| 0.298|    71.0|
|69478|7.004| 0.269|   207.0|
|66166|9.182| 0.077|    53.0|
|69637|6.047| 0.327|    32.0|
|69454|17.09|  0.05|    34.0|
|67089| 9.33| 0.282|     6.0|
|64822| 5.33| 0.198|     3.0|
|64121| 2.25| 0.355|    79.0|
|70097|19.09| 0.045|    38.0|
+-----+-----+------+--------+
only showing top 15 rows



In [53]:
## Overall margin across every line item: (mean SELLPRICE - mean COST) / mean SELLPRICE, unrounded.
overall_sell_col = avg("SELLPRICE")
overall_cost_col = avg("COST")
df_txnitems.agg((overall_sell_col - overall_cost_col) / overall_sell_col).show()

+-----------------------------------------------+
|((avg(SELLPRICE) - avg(COST)) / avg(SELLPRICE))|
+-----------------------------------------------+
|                            0.13216022853298534|
+-----------------------------------------------+

