### Objective

- To read in the data from multi-table excel worksheet
- Convert the tables to multiple pandas dataframes
- Convert the pandas Dataframe to Pyspark Dataframes
- Create pipeline in Pyspark for Regression Analysis
- Write out the pyspark dataframes to database 

In [1]:
#Starting with import of pyspark and related modules

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import warnings
warnings.filterwarnings("ignore")

from pyspark.ml import *

#lets get some excel files into the system
import pandas as pd
import openpyxl

In [2]:
#Initiating the spark session with postgres driver    
sparkSQL = SparkSession.builder.appName('Spark SQL') \
        .config('spark.jars',"/usr/share/java/postgresql-42.2.26.jar") \
        .getOrCreate()

22/11/30 08:17:47 WARN Utils: Your hostname, codeStation resolves to a loopback address: 127.0.1.1; using 172.17.0.1 instead (on interface docker0)
22/11/30 08:17:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/30 08:17:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
mllibPath = "mllib/"
externalData = "externalData/"
ytDE = "/home/solverbot/Desktop/ytDE/csvfiles"

In [4]:
sparkReader = sparkSQL.read

In [5]:
sparkContext = sparkSQL.sparkContext

In [3]:
#I am trying to import a multi-worksheet XL, 
#the "Datasource" sheet is required sheet in the xl

datasource = pd.read_excel("sales Target Dashboard.xlsx",sheet_name="DataSource",
                          parse_dates=True)

In [4]:
datasource.columns

Index(['S/N', 'Date', 'Branch', 'Pizza Type', 'Quantity', 'Time', 'Time Range',
       'Price', 'Unnamed: 8', 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11',
       'Unnamed: 12', 'Daily Target', 'Unnamed: 14', 'Unnamed: 15',
       'Unnamed: 16', 'Unnamed: 17', 'Sales Target', 'Unnamed: 19',
       'Unnamed: 20', 'Unnamed: 21', 'Unnamed: 22', 'Branch.1', 'Unnamed: 24',
       'Unnamed: 25'],
      dtype='object')

In [5]:
datasource.head()

Unnamed: 0,S/N,Date,Branch,Pizza Type,Quantity,Time,Time Range,Price,Unnamed: 8,Unnamed: 9,...,Unnamed: 16,Unnamed: 17,Sales Target,Unnamed: 19,Unnamed: 20,Unnamed: 21,Unnamed: 22,Branch.1,Unnamed: 24,Unnamed: 25
0,1,42349,Abuja,Meatzaa,5,0.333345,Before 9:00am,8,,,...,,,Pizza,Sales Target,Revenue,,,Branch,Manager,Location
1,2,42352,Ibadan,Extravaganza,4,0.333356,Before 9:00am,8,,,...,,,BBQ Chicken,17280,,,,Abuja,Christy Olson,Nigeria
2,3,42342,Ikoyi,BBQ Chicken,5,0.33338,Before 9:00am,16,,,...,,,BBQ Philly Steak,15232,,,,Ibadan,Dan Peterson,Nigeria
3,4,42352,Ibadan,Extravaganza,1,0.333414,Before 9:00am,8,,,...,,,Beef Suya,11772,,,,Ikoyi,Mable Lindsey,Nigeria
4,5,42345,Lekki,Meatzaa,4,0.333426,Before 9:00am,8,,,...,,,Chicken Bali,6400.8,,,,Lekki,Kyle Carr,Nigeria


In [6]:
datasource.shape

(5000, 26)

In [7]:
columns = datasource.columns

In [8]:
columns

Index(['S/N', 'Date', 'Branch', 'Pizza Type', 'Quantity', 'Time', 'Time Range',
       'Price', 'Unnamed: 8', 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11',
       'Unnamed: 12', 'Daily Target', 'Unnamed: 14', 'Unnamed: 15',
       'Unnamed: 16', 'Unnamed: 17', 'Sales Target', 'Unnamed: 19',
       'Unnamed: 20', 'Unnamed: 21', 'Unnamed: 22', 'Branch.1', 'Unnamed: 24',
       'Unnamed: 25'],
      dtype='object')

In [9]:
cleanedDS = datasource[['S/N', 'Date', 'Branch', 'Pizza Type', 'Quantity', 'Time', 'Time Range',
       'Price', 'Daily Target','Unnamed: 14','Sales Target', 'Unnamed: 19',
       'Branch.1', 'Unnamed: 24','Unnamed: 25']]

In [10]:
#Sale Data is important table
saleData = cleanedDS[['S/N', 'Date', 'Branch', 
                      'Pizza Type', 'Quantity', 'Time', 
                      'Time Range','Price']]
saleData.shape

(5000, 8)

In [11]:
saleData.columns

Index(['S/N', 'Date', 'Branch', 'Pizza Type', 'Quantity', 'Time', 'Time Range',
       'Price'],
      dtype='object')

In [12]:
#Intermediate table
miscellaneousData = cleanedDS[['Daily Target','Unnamed: 14',
                               'Sales Target', 'Unnamed: 19',
                               'Branch.1', 'Unnamed: 24',
                               'Unnamed: 25']]
miscellaneousData.shape

(5000, 7)

In [22]:
miscellaneousData.head()

Unnamed: 0,Daily Target,Unnamed: 14,Sales Target,Unnamed: 19,Branch.1,Unnamed: 24,Unnamed: 25
0,Day,Target,Pizza,Sales Target,Branch,Manager,Location
1,2015-12-03 00:00:00,16552.8,BBQ Chicken,17280,Abuja,Christy Olson,Nigeria
2,2015-12-04 00:00:00,11481.6,BBQ Philly Steak,15232,Ibadan,Dan Peterson,Nigeria
3,2015-12-05 00:00:00,16772.8,Beef Suya,11772,Ikoyi,Mable Lindsey,Nigeria
4,2015-12-06 00:00:00,15488,Chicken Bali,6400.8,Lekki,Kyle Carr,Nigeria


In [13]:
dailyTarget = miscellaneousData[["Daily Target","Unnamed: 14"]]

In [14]:
dailyTarget.columns = ["DailyTarget","SalesTarget"]

In [15]:
dailyTarget.dropna(axis=0,inplace=True)

In [16]:
dailyTarget = dailyTarget.iloc[1:,:]
dailyTarget.columns

Index(['DailyTarget', 'SalesTarget'], dtype='object')

In [23]:
dailyTarget.to_csv('daily_target.csv',index=False,sep=',')

In [17]:
branchTarget = miscellaneousData[["Branch.1","Unnamed: 24","Unnamed: 25"]]

In [18]:
branchTarget.columns = ["Branch","Manager","Location"]

In [19]:
branchTarget.dropna(axis=0,inplace=True)
branctTarget = branchTarget.iloc[1:,:]
branchTarget.columns

Index(['Branch', 'Manager', 'Location'], dtype='object')

In [24]:
branchTarget.to_csv('branch_target.csv',index=False,sep=',')

In [20]:
productTarget = miscellaneousData[['Sales Target', 'Unnamed: 19']]
productTarget.columns = ['PizzaType','SalesTarget']
productTarget = productTarget.iloc[1:]
productTarget.dropna(axis=0,inplace=True)
productTarget.head(1)

Unnamed: 0,PizzaType,SalesTarget
1,BBQ Chicken,17280


In [25]:
productTarget.to_csv("product_target.csv",index=False,sep=',')

# Moving to the Pyspark World

In [24]:
psSaleDataDF = sparkSQL.createDataFrame(saleData)

In [25]:
psSaleDataDF.printSchema()

root
 |-- S/N: long (nullable = true)
 |-- Date: long (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Pizza Type: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Time: double (nullable = true)
 |-- Time Range: string (nullable = true)
 |-- Price: long (nullable = true)



In [26]:
psSaleDataDF.show(2)

[Stage 0:>                                                          (0 + 1) / 1]

+---+-----+------+------------+--------+-----------------+-------------+-----+
|S/N| Date|Branch|  Pizza Type|Quantity|             Time|   Time Range|Price|
+---+-----+------+------------+--------+-----------------+-------------+-----+
|  1|42349| Abuja|     Meatzaa|       5|0.333344907407407|Before 9:00am|    8|
|  2|42352|Ibadan|Extravaganza|       4|0.333356481481482|Before 9:00am|    8|
+---+-----+------+------------+--------+-----------------+-------------+-----+
only showing top 2 rows



                                                                                

In [27]:
def writingSparkDFtoDatabase(sparkDF,dbName,dbTableName):
    
    try:
        sparkDF.write \
                    .format('jdbc') \
                    .option("url", f"jdbc:postgresql://localhost:5432/{dbName}") \
                    .option('dbtable', dbTableName) \
                    .option('user','postgres') \
                    .option('password', 1234) \
                    .option('driver','org.postgresql.Driver') \
                    .save(mode='overwrite')
        print('Write Complete')
    except Exception as e:
        print(f'Write errored out due to {e}')
    

In [28]:
writingSparkDFtoDatabase(psSaleDataDF,'dashboards','sales_data')
#Data is written and verified

[Stage 1:>                                                          (0 + 4) / 4]

Write Complete




It is better to write out the data to database tables, so that it will be easier to pick the data from there for future. Also, if others or collaborating, then having the data on the server will help them to pull a copy and analyse it. It will save work.

### Predicting the quantity of Pizza 

We will try and predict the quantity of pizza, given the branch, pizza name, time. The columns needs to be selected, the stringindexer encoding needs to be applied for the pizza type, and branch to convert it to numbers. Then apply the regression model on the features, with quantity as "label/target" column

In [29]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

In [34]:
saleStringer = StringIndexer(inputCols=["Branch","Pizza Type"],
                            outputCols=["Branch_idx","Pizza_idx"])

vectors = VectorAssembler(inputCols=["Branch_idx","Pizza_idx","Time"],
                         outputCol="features")

linReg = LinearRegression(featuresCol='features',labelCol="Quantity")

#Pipeline is created with indexer, assembler and regressor
pipeSales = Pipeline(stages=[saleStringer,vectors,linReg])

In [35]:
#Data available is split between train and test data

train, test = psSaleDataDF.randomSplit([0.8,0.2])
print(train.count())
print(test.count())

4000
1000


In [36]:
salesModel = pipeSales.fit(train)

22/11/30 08:38:29 WARN Instrumentation: [df64a9a6] regParam is zero, which might cause numerical instability and overfitting.


[Stage 20:>                                                         (0 + 4) / 4]

22/11/30 08:38:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/30 08:38:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

22/11/30 08:38:30 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [37]:
quantityPredict = salesModel.transform(test)
quantityPredict.select("Quantity","Prediction","Branch","Pizza Type").show()

+--------+------------------+--------+----------------+
|Quantity|        Prediction|  Branch|      Pizza Type|
+--------+------------------+--------+----------------+
|       5| 3.050774961385897|   Ikoyi|     BBQ Chicken|
|       4|2.9801379485860604|   Lekki|         Meatzaa|
|       5|3.0190306653855288|   Abuja|BBQ Philly Steak|
|       5|3.0223243453975925|   Ikoyi|  Chicken Legend|
|       5| 3.014884106037318|   Abuja|   Chicken Feast|
|       5|3.0384103223998515|   Ikoyi|       Beef Suya|
|       5| 2.983951001007215|   Lekki|   Chicken Feast|
|       5| 3.054554663114346|   Ikoyi|      Hot Veggie|
|       4| 3.055032347157555|   Abuja|  Pepperoni Suya|
|       2|2.9829813942465977|  Ibadan|    Extravaganza|
|       5| 3.002411077299979|   Abuja| Pepperoni Feast|
|       4|3.0420899720501833|   Ikoyi|    Chicken Bali|
|       2| 3.070348821555431|   Ikoyi|  Pepperoni Suya|
|       5| 2.987613975311194|   Lekki|BBQ Philly Steak|
|       3| 3.057514836199353|Surulere|    Chicke

In [45]:
writingSparkDFtoDatabase(quantityPredict,'dashboards','quantityprediction')

Write errored out due to Can't get JDBC type for struct<type:tinyint,size:int,indices:array<int>,values:array<double>>


In [49]:
predictionDF = quantityPredict.select("Quantity","Prediction",
                                      "Branch","Time","Pizza Type","Branch_idx",
                                     "Pizza_idx")
predictionDF.show(2)

+--------+------------------+------+-----------------+-----------+----------+---------+
|Quantity|        Prediction|Branch|             Time| Pizza Type|Branch_idx|Pizza_idx|
+--------+------------------+------+-----------------+-----------+----------+---------+
|       5| 3.050774961385897| Ikoyi| 0.33337962962963|BBQ Chicken|       3.0|      5.0|
|       4|2.9801379485860604| Lekki|0.333425925925926|    Meatzaa|       0.0|     11.0|
+--------+------------------+------+-----------------+-----------+----------+---------+
only showing top 2 rows



In [50]:
writingSparkDFtoDatabase(predictionDF,'dashboards','quantityprediction')

Write Complete


In [44]:
from pyspark.ml.evaluation import RegressionEvaluator

salEval = RegressionEvaluator(labelCol="Quantity")
salEval.evaluate(quantityPredict)

print(salEval.getThroughOrigin())
print(salEval.evaluate(quantityPredict, {salEval.metricName: "mae"}))
print(salEval.evaluate(quantityPredict, {salEval.metricName: "r2"}))

False
1.2239403716241768
-0.001368947741357207
