#### Load Spark

In [1]:
import pyspark
pyspark.__file__

'/opt/spark-3.2.1-bin-hadoop3.2/python/pyspark/__init__.py'

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('SSStores_ETL') \
    .getOrCreate()

22/11/19 07:05:22 WARN Utils: Your hostname, vvioo resolves to a loopback address: 127.0.1.1; using 192.168.100.9 instead (on interface wlp2s0)
22/11/19 07:05:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/19 07:05:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Load Data

In [4]:
SSExcelData = spark.read \
        .option("header", "true") \
        .csv('../data/SSExcelSource.csv')

In [5]:
SSExcelData.printSchema()

root
 |-- SalesUnits: string (nullable = true)
 |-- SalesDollar: string (nullable = true)
 |-- SalesCost: string (nullable = true)
 |-- CustID: string (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Month : string (nullable = true)
 |-- Year: string (nullable = true)



In [6]:
SSExcelData.show(5)

+----------+-----------+---------+--------+--------+--------+---+------+------+
|SalesUnits|SalesDollar|SalesCost|  CustID| StoreID|  ItemID|Day|Month |  Year|
+----------+-----------+---------+--------+--------+--------+---+------+------+
|     111.0|     1111.0|   1111.0|C0954327|S1010398|I0036577|1.0|   2.0|2014.0|
|     222.0|     2222.0|   2222.0|C8654390|S9432910|I0036566|3.0|   7.0|2017.0|
|     333.0|     3333.0|   3333.0|C9128574|S0954327|I0036566|1.0|   5.0|2017.0|
|     444.0|     4444.0|   4444.0|C9403348|S0954327|I0036577|1.0|   2.0|2017.0|
|     101.0|     1001.0|   1001.0|C0954327|S9432910|I0036577|3.0|   7.0|2017.0|
+----------+-----------+---------+--------+--------+--------+---+------+------+
only showing top 5 rows



In [7]:
SSExcelData = SSExcelData.withColumnRenamed("Month ", "Month")

In [8]:
SSExcelData = SSExcelData.withColumn("SalesUnits",SSExcelData.SalesUnits.cast('int')) \
                         .withColumn("SalesDollar",SSExcelData.SalesDollar.cast('int')) \
                         .withColumn("SalesCost",SSExcelData.SalesCost.cast('int')) \
                         .withColumn("Day",SSExcelData.Day.cast('int')) \
                         .withColumn("Month",SSExcelData.Month.cast('int')) \
                         .withColumn("Year",SSExcelData.Year.cast('int'))

In [9]:
SSExcelData.printSchema()

root
 |-- SalesUnits: integer (nullable = true)
 |-- SalesDollar: integer (nullable = true)
 |-- SalesCost: integer (nullable = true)
 |-- CustID: string (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [10]:
SSExcelData.show(5)

+----------+-----------+---------+--------+--------+--------+---+-----+----+
|SalesUnits|SalesDollar|SalesCost|  CustID| StoreID|  ItemID|Day|Month|Year|
+----------+-----------+---------+--------+--------+--------+---+-----+----+
|       111|       1111|     1111|C0954327|S1010398|I0036577|  1|    2|2014|
|       222|       2222|     2222|C8654390|S9432910|I0036566|  3|    7|2017|
|       333|       3333|     3333|C9128574|S0954327|I0036566|  1|    5|2017|
|       444|       4444|     4444|C9403348|S0954327|I0036577|  1|    2|2017|
|       101|       1001|     1001|C0954327|S9432910|I0036577|  3|    7|2017|
+----------+-----------+---------+--------+--------+--------+---+-----+----+
only showing top 5 rows



In [11]:
# Filter Rows Step
SSExcelData = SSExcelData.na.drop("any")

In [12]:
# Sort Rows Step
SSExcelData = SSExcelData.sort("Year","Month","Day")
SSExcelData.show(5)

+----------+-----------+---------+--------+--------+--------+---+-----+----+
|SalesUnits|SalesDollar|SalesCost|  CustID| StoreID|  ItemID|Day|Month|Year|
+----------+-----------+---------+--------+--------+--------+---+-----+----+
|       303|       3003|     3003|C9128574|S0954327|I0036566|  1|    2|2014|
|       111|       1111|     1111|C0954327|S1010398|I0036577|  1|    2|2014|
|       121|       4224|     4224|C0954327|S0954327|I0036566|  1|    5|2015|
|       444|       4444|     4444|C9403348|S0954327|I0036577|  1|    2|2017|
|       333|       3333|     3333|C9128574|S0954327|I0036566|  1|    5|2017|
+----------+-----------+---------+--------+--------+--------+---+-----+----+
only showing top 5 rows



#### Load Postgres Data

##### Time Dimension

In [13]:
# JDBC connect: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
# JDBC URI: https://stackoverflow.com/a/3582627
# SSTimeDim Step
SSTimeDim = spark.read \
                 .format("jdbc") \
                 .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh") \
                 .option("dbtable", "ssstores.sstimedim") \
                 .option("user", "postgres") \
                 .option("password", "postgres") \
                 .load()

In [14]:
SSTimeDim.printSchema()

root
 |-- timeno: integer (nullable = true)
 |-- timeday: integer (nullable = true)
 |-- timemonth: integer (nullable = true)
 |-- timequarter: integer (nullable = true)
 |-- timeyear: integer (nullable = true)
 |-- timedayofweek: integer (nullable = true)
 |-- timefiscalyear: integer (nullable = true)



In [15]:
# Sort Rows 2
SSTimeDim = SSTimeDim.sort("timeyear","timemonth","timeday")

In [16]:
SSTimeDim.show(5)

+------+-------+---------+-----------+--------+-------------+--------------+
|timeno|timeday|timemonth|timequarter|timeyear|timedayofweek|timefiscalyear|
+------+-------+---------+-----------+--------+-------------+--------------+
|     1|      1|        2|          1|    2014|            2|          2014|
|     2|      1|        5|          2|    2014|            4|          2014|
|     3|      3|        7|          3|    2014|            3|          2014|
|     4|      4|       10|          4|    2014|            2|          2014|
|     5|      1|        2|          1|    2015|            2|          2015|
+------+-------+---------+-----------+--------+-------------+--------------+
only showing top 5 rows



#### Join

In [17]:
joined_tmp = SSExcelData.join(SSTimeDim,
                              (SSExcelData.Day == SSTimeDim.timeday) &
                              (SSExcelData.Month == SSTimeDim.timemonth) &
                              (SSExcelData.Year == SSTimeDim.timeyear),
                              'inner'
                             )

In [18]:
print((joined_tmp.count(), len(joined_tmp.columns)))

(9, 16)


In [19]:
joined_tmp.printSchema()

root
 |-- SalesUnits: integer (nullable = true)
 |-- SalesDollar: integer (nullable = true)
 |-- SalesCost: integer (nullable = true)
 |-- CustID: string (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- timeno: integer (nullable = true)
 |-- timeday: integer (nullable = true)
 |-- timemonth: integer (nullable = true)
 |-- timequarter: integer (nullable = true)
 |-- timeyear: integer (nullable = true)
 |-- timedayofweek: integer (nullable = true)
 |-- timefiscalyear: integer (nullable = true)



##### Item Dimension

In [20]:
SSItem = (spark.read
          .format("jdbc")
          .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh")
          .option("dbtable", "ssstores.ssitem")
          .option("user", "postgres")
          .option("password", "postgres")
          .load()
         )

In [21]:
SSItem.printSchema()

root
 |-- itemid: string (nullable = true)
 |-- itemname: string (nullable = true)
 |-- itembrand: string (nullable = true)
 |-- itemcategory: string (nullable = true)
 |-- itemunitprice: decimal(12,2) (nullable = true)



In [22]:
SSItem.show(5)

+--------+--------------------+--------------+--------------------+-------------+
|  itemid|            itemname|     itembrand|        itemcategory|itemunitprice|
+--------+--------------------+--------------+--------------------+-------------+
|I0036566|17 inch Color Mon...|ColorMeg, Inc.|         Electronics|       169.00|
|I0036577|19 inch Color Mon...|ColorMeg, Inc.|         Electronics|       319.00|
|I1114590|R3000 Color Laser...|        Connex|            Printing|       699.00|
|I1412138|10 Foot Printer C...|       Ethlite|Computer Accessories|        12.00|
|I1445671|8-Outlet Surge Pr...|     Intersafe|Computer Accessories|        14.99|
+--------+--------------------+--------------+--------------------+-------------+
only showing top 5 rows



##### Join SSItem

In [23]:
joined_tmp = joined_tmp.sort("ItemID")

In [24]:
SSItem = SSItem.sort("itemid")

In [25]:
joined_tmp2 = joined_tmp.join(SSItem, ['ItemID'])

In [26]:
print((joined_tmp2.count(), len(joined_tmp2.columns)))

(8, 20)


In [27]:
joined_tmp2.printSchema()

root
 |-- ItemID: string (nullable = true)
 |-- SalesUnits: integer (nullable = true)
 |-- SalesDollar: integer (nullable = true)
 |-- SalesCost: integer (nullable = true)
 |-- CustID: string (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- timeno: integer (nullable = true)
 |-- timeday: integer (nullable = true)
 |-- timemonth: integer (nullable = true)
 |-- timequarter: integer (nullable = true)
 |-- timeyear: integer (nullable = true)
 |-- timedayofweek: integer (nullable = true)
 |-- timefiscalyear: integer (nullable = true)
 |-- itemname: string (nullable = true)
 |-- itembrand: string (nullable = true)
 |-- itemcategory: string (nullable = true)
 |-- itemunitprice: decimal(12,2) (nullable = true)



##### Customer Dim

In [28]:
SSCustomer = (spark.read
              .format("jdbc")
              .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh")
              .option("dbtable", "ssstores.sscustomer")
              .option("user", "postgres")
              .option("password", "postgres")
              .load()
             )

In [29]:
SSCustomer.show(5)

+--------+------------+-------------+---------------+---------+---------+----------+----------+
|  custid|    custname|    custphone|     custstreet| custcity|custstate|   custzip|custnation|
+--------+------------+-------------+---------------+---------+---------+----------+----------+
|C0954327|Sheri Gordon|(303)123-1234|   336 Hill St.|Littleton|       CO|80129-5543|       USA|
|C1010398|Jim Glussman|(303)321-9876|1432 E. Ravenna|   Denver|       CO|80111-0033|       USA|
|C2388597| Beth Taylor|(206)124-9876| 2396 Rafter Rd|  Seattle|       WA|98103-1121|       USA|
|C3340959|  Betty Wise|(206)421-1276|  4334 153rd NW|  Seattle|       WA|98178-3311|       USA|
|C8543321|Ron Thompson|(206)891-7664|  789 122nd St.|   Renton|       WA|98666-1289|       USA|
+--------+------------+-------------+---------------+---------+---------+----------+----------+
only showing top 5 rows



In [30]:
SSCustomer.printSchema()

root
 |-- custid: string (nullable = true)
 |-- custname: string (nullable = true)
 |-- custphone: string (nullable = true)
 |-- custstreet: string (nullable = true)
 |-- custcity: string (nullable = true)
 |-- custstate: string (nullable = true)
 |-- custzip: string (nullable = true)
 |-- custnation: string (nullable = true)



In [31]:
joined_tmp2 = joined_tmp2.sort('CustID')

In [32]:
SSCustomer = SSCustomer.sort('custid')

In [33]:
joined_tmp3 = joined_tmp2.join(SSCustomer, ['CustID'], 'inner')

In [34]:
print((joined_tmp3.count(), len(joined_tmp3.columns)))

(8, 27)


In [35]:
joined_tmp3.printSchema()

root
 |-- CustID: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- SalesUnits: integer (nullable = true)
 |-- SalesDollar: integer (nullable = true)
 |-- SalesCost: integer (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- timeno: integer (nullable = true)
 |-- timeday: integer (nullable = true)
 |-- timemonth: integer (nullable = true)
 |-- timequarter: integer (nullable = true)
 |-- timeyear: integer (nullable = true)
 |-- timedayofweek: integer (nullable = true)
 |-- timefiscalyear: integer (nullable = true)
 |-- itemname: string (nullable = true)
 |-- itembrand: string (nullable = true)
 |-- itemcategory: string (nullable = true)
 |-- itemunitprice: decimal(12,2) (nullable = true)
 |-- custname: string (nullable = true)
 |-- custphone: string (nullable = true)
 |-- custstreet: string (nullable = true)
 |-- custcity: string (nullable = true)
 

##### Store Dim

In [36]:
SSStore = (spark.read
           .format("jdbc")
           .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh")
           .option("dbtable", "ssstores.ssstore")
           .option("user", "postgres")
           .option("password", "postgres")
           .load()
          )

In [37]:
SSStore.show(5)

+--------+--------+-------------+---------------+---------+----------+----------+-----------+
| storeid|   divid| storemanager|    storestreet|storecity|storestate|  storezip|storenation|
+--------+--------+-------------+---------------+---------+----------+----------+-----------+
|S0954327|D0104030|    Jim Smith|   436 Hill St.|Littleton|        CO|80129-5543|        USA|
|S1010398|D0104030|Mary Glussman|1832 E. Ravenna|   Denver|        CO|80111-0033|        USA|
|S2388597|D0225030|     Beth Woo| 5496 Rafter Rd|  Seattle|        WA|98103-1121|        USA|
|S8543321|D3134030| Joe Thompson|  989 122nd St.|   Renton|        WA|98666-1289|        USA|
|S9403348|D0225030|   Mary Boren|1242 Crest Ave.|Englewood|        CO|80113-5431|        USA|
+--------+--------+-------------+---------------+---------+----------+----------+-----------+
only showing top 5 rows



In [38]:
SSStore.printSchema()

root
 |-- storeid: string (nullable = true)
 |-- divid: string (nullable = true)
 |-- storemanager: string (nullable = true)
 |-- storestreet: string (nullable = true)
 |-- storecity: string (nullable = true)
 |-- storestate: string (nullable = true)
 |-- storezip: string (nullable = true)
 |-- storenation: string (nullable = true)



In [39]:
joined_tmp3 = joined_tmp3.sort('StoreID')

In [40]:
SSStore = SSStore.sort('storeid')

In [88]:
joined_tmp4 = joined_tmp3.join(SSStore, ['StoreID'], 'inner')

In [42]:
print((joined_tmp4.count(), len(joined_tmp4.columns)))

(8, 34)


In [43]:
joined_tmp4.printSchema()

root
 |-- StoreID: string (nullable = true)
 |-- CustID: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- SalesUnits: integer (nullable = true)
 |-- SalesDollar: integer (nullable = true)
 |-- SalesCost: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- timeno: integer (nullable = true)
 |-- timeday: integer (nullable = true)
 |-- timemonth: integer (nullable = true)
 |-- timequarter: integer (nullable = true)
 |-- timeyear: integer (nullable = true)
 |-- timedayofweek: integer (nullable = true)
 |-- timefiscalyear: integer (nullable = true)
 |-- itemname: string (nullable = true)
 |-- itembrand: string (nullable = true)
 |-- itemcategory: string (nullable = true)
 |-- itemunitprice: decimal(12,2) (nullable = true)
 |-- custname: string (nullable = true)
 |-- custphone: string (nullable = true)
 |-- custstreet: string (nullable = true)
 |-- custcity: string (nullable = true)
 

##### Store to Sales table

In [49]:
SSSales = (spark.read
           .format("jdbc")
           .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh")
           .option("dbtable", "ssstores.sssales")
           .option("user", "postgres")
           .option("password", "postgres")
           .load())

In [50]:
SSSales.printSchema()

root
 |-- salesno: integer (nullable = true)
 |-- salesunits: integer (nullable = true)
 |-- salesdollar: decimal(12,2) (nullable = true)
 |-- salescost: decimal(12,2) (nullable = true)
 |-- custid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- storeid: string (nullable = true)
 |-- timeno: integer (nullable = true)



In [90]:
table_cols = [col.name for col in SSSales.schema]
table_cols

['salesno',
 'salesunits',
 'salesdollar',
 'salescost',
 'custid',
 'itemid',
 'storeid',
 'timeno']

In [93]:
final_dump_df = joined_tmp4.select(table_cols)
final_dump_df.printSchema()

root
 |-- salesno: integer (nullable = false)
 |-- salesunits: integer (nullable = true)
 |-- salesdollar: integer (nullable = true)
 |-- salescost: integer (nullable = true)
 |-- custid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- storeid: string (nullable = true)
 |-- timeno: integer (nullable = true)



##### Add Sequence

In [97]:
sequence_start = SSSales.count()
sequence_start

192

In [98]:
# https://stackoverflow.com/a/52318817
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

In [99]:
final_dump_df = final_dump_df.withColumn("salesno",
                                         row_number().over(Window.orderBy(monotonically_increasing_id())) + sequence_start
                                        )

In [100]:
final_dump_df \
            .write \
            .format("jdbc") \
            .mode('append') \
            .option("url", "jdbc:postgresql://localhost:5432/coursera_dwh") \
            .option("dbtable", "ssstores.sssales") \
            .option("user", "postgres") \
            .option("password", "postgres") \
            .save()

22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 08:02:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/19 0