Create Spark session

In [None]:
import os
from pyspark.sql import SparkSession
import warnings

# Silence every Python warning for the remainder of the notebook session.
warnings.filterwarnings('ignore')

# The SQLite JDBC driver jar is looked up in the current working directory.
# The same path is registered both as a Spark jar and on the driver classpath.
sqlite_jdbc_jar = "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd())

session_builder = SparkSession.builder
for conf_key, conf_value in (
    ("spark.driver.host", "localhost"),
    ("spark.jars", sqlite_jdbc_jar),
    ("spark.driver.extraClassPath", sqlite_jdbc_jar),
):
    session_builder = session_builder.config(conf_key, conf_value)

# `spark` is the session object used by the later cells of this notebook.
spark = session_builder.appName('appname').getOrCreate()

# Switch ANSI SQL mode off for this session.
spark.conf.set("spark.sql.ansi.enabled", False)

print("Spark session created succesfully...")

Save to database function

In [2]:
import pyspark.sql.functions as fn
import numpy as np
import sqlite3

def save_to_database(filepath, data_types, dbpath, table):
    """Load one CSV file and write it to a SQLite table, replacing any existing table.

    The file is read with the notebook-level ``spark`` session (header row,
    no schema inference, double quote used as both quote and escape
    character), converted to a pandas DataFrame, cast with ``data_types``
    and written through ``DataFrame.to_sql``.

    Parameters
    ----------
    filepath : str
        Path of the CSV file to load.
    data_types : dict
        Mapping of column name to dtype, passed unchanged to
        ``DataFrame.astype``.
    dbpath : str
        Path of the SQLite database file, passed to ``sqlite3.connect``.
    table : str
        Name of the destination table; an existing table with that name is
        replaced (``if_exists='replace'``).

    Returns
    -------
    None
        A confirmation line is printed once the table has been written.
    """
    csv_frame = spark.read.csv(
        filepath,
        header=True,
        inferSchema=False,
        quote="\"",
        escape="\""
    )

    # Convert to pandas and turn missing values (None) into NaN before casting.
    table_frame = csv_frame.toPandas().replace({None: np.nan})

    # Apply the caller-supplied column dtypes.
    table_frame = table_frame.astype(data_types)

    conn = sqlite3.connect(dbpath)
    try:
        # Write the frame as `table`, dropping any previous version of it.
        table_frame.to_sql(
            table,
            conn,
            if_exists='replace',
            index=False
        )
    finally:
        # Always release the database handle, even when the write fails;
        # previously a failing to_sql left the connection open.
        conn.close()

    print("Successfully created " + table + " table in " + dbpath + " database.")

Save Cities tables

In [3]:
# The main and archive Cities files are loaded with the same dtype mapping.
cities_dtypes = {
    "CityID": int,
    "StateProvinceID": int,
    "LatestRecordedPopulation": float,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/Cities.csv", "Cities"),
    ("./wwi/Cities_Archive.csv", "Cities_Archive"),
):
    save_to_database(csv_path, cities_dtypes, "wwi.db", table_name)

Successfully created Cities table in wwi.db database.


Successfully created Cities_Archive table in wwi.db database.


Save Countries tables

In [4]:
# The main and archive Countries files are loaded with the same dtype mapping.
countries_dtypes = {
    "CountryID": int,
    "IsoNumericCode": int,
    "LatestRecordedPopulation": float,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/Countries.csv", "Countries"),
    ("./wwi/Countries_Archive.csv", "Countries_Archive"),
):
    save_to_database(csv_path, countries_dtypes, "wwi.db", table_name)

Successfully created Countries table in wwi.db database.


Successfully created Countries_Archive table in wwi.db database.


Save Delivery Methods tables

In [5]:
# The main and archive DeliveryMethods files are loaded with the same dtype mapping.
delivery_methods_dtypes = {
    "DeliveryMethodID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/DeliveryMethods.csv", "DeliveryMethods"),
    ("./wwi/DeliveryMethods_Archive.csv", "DeliveryMethods_Archive"),
):
    save_to_database(csv_path, delivery_methods_dtypes, "wwi.db", table_name)

Successfully created DeliveryMethods table in wwi.db database.


Successfully created DeliveryMethods_Archive table in wwi.db database.


Save Payment Methods tables

In [6]:
# The main and archive PaymentMethods files are loaded with the same dtype mapping.
payment_methods_dtypes = {
    "PaymentMethodID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/PaymentMethods.csv", "PaymentMethods"),
    ("./wwi/PaymentMethods_Archive.csv", "PaymentMethods_Archive"),
):
    save_to_database(csv_path, payment_methods_dtypes, "wwi.db", table_name)

Successfully created PaymentMethods table in wwi.db database.


Successfully created PaymentMethods_Archive table in wwi.db database.


Save People tables

In [7]:
# Dtype mapping for the main People file.
people_dtypes = {
    "PersonID": int,
    "IsPermittedToLogon": int,
    "IsExternalLogonProvider": int,
    "IsSystemUser": int,
    "IsEmployee": int,
    "IsSalesperson": int,
    "LastEditedBy": int
}

# The archive file uses the same mapping except that LastEditedBy is cast to float.
people_archive_dtypes = dict(people_dtypes, LastEditedBy=float)

# main
save_to_database(
    filepath="./wwi/People.csv",
    data_types=people_dtypes,
    dbpath="wwi.db",
    table="People"
)

# archive
save_to_database(
    filepath="./wwi/People_Archive.csv",
    data_types=people_archive_dtypes,
    dbpath="wwi.db",
    table="People_Archive"
)

Successfully created People table in wwi.db database.


Successfully created People_Archive table in wwi.db database.


Save State Provinces tables

In [8]:
# The main and archive StateProvinces files are loaded with the same dtype mapping.
state_provinces_dtypes = {
    "StateProvinceID": int,
    "CountryID": int,
    "LatestRecordedPopulation": float,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/StateProvinces.csv", "StateProvinces"),
    ("./wwi/StateProvinces_Archive.csv", "StateProvinces_Archive"),
):
    save_to_database(csv_path, state_provinces_dtypes, "wwi.db", table_name)

Successfully created StateProvinces table in wwi.db database.


Successfully created StateProvinces_Archive table in wwi.db database.


Save Transaction Types tables

In [9]:
# The main and archive TransactionTypes files are loaded with the same dtype mapping.
transaction_types_dtypes = {
    "TransactionTypeID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/TransactionTypes.csv", "TransactionTypes"),
    ("./wwi/TransactionTypes_Archive.csv", "TransactionTypes_Archive"),
):
    save_to_database(csv_path, transaction_types_dtypes, "wwi.db", table_name)

Successfully created TransactionTypes table in wwi.db database.


Successfully created TransactionTypes_Archive table in wwi.db database.


Save Purchase tables

In [10]:
# Purchase order lines: every typed column is an int except the expected price.
purchase_order_line_dtypes = dict.fromkeys(
    [
        "PurchaseOrderLineID",
        "PurchaseOrderID",
        "StockItemID",
        "OrderedOuters",
        "ReceivedOuters",
        "PackageTypeID",
        "IsOrderLineFinalized",
        "LastEditedBy",
    ],
    int
)
purchase_order_line_dtypes["ExpectedUnitPricePerOuter"] = float

save_to_database(
    filepath="./wwi/PurchaseOrderLines.csv",
    data_types=purchase_order_line_dtypes,
    dbpath="wwi.db",
    table="PurchaseOrderLines"
)

# Purchase orders: int columns plus two free-text columns kept as object.
purchase_order_dtypes = dict.fromkeys(
    [
        "PurchaseOrderID",
        "SupplierID",
        "DeliveryMethodID",
        "ContactPersonID",
        "IsOrderFinalized",
        "LastEditedBy",
    ],
    int
)
purchase_order_dtypes.update(
    dict.fromkeys(["Comments", "InternalComments"], object)
)

save_to_database(
    filepath="./wwi/PurchaseOrders.csv",
    data_types=purchase_order_dtypes,
    dbpath="wwi.db",
    table="PurchaseOrders"
)

# The main and archive SupplierCategories files are loaded with the same dtype mapping.
supplier_categories_dtypes = {
    "SupplierCategoryID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/SupplierCategories.csv", "SupplierCategories"),
    ("./wwi/SupplierCategories_Archive.csv", "SupplierCategories_Archive"),
):
    save_to_database(csv_path, supplier_categories_dtypes, "wwi.db", table_name)

# Dtype mapping used for both the main and the archive Suppliers files.
# "PostalCityID" used to be listed twice in each dict literal; the redundant
# duplicate key has been removed (the resulting mapping is unchanged).
# DeliveryMethodID uses the nullable "Int64" dtype, the other IDs plain int.
suppliers_dtypes = {
    "SupplierID": int,
    "SupplierCategoryID": int,
    "PrimaryContactPersonID": int,
    "AlternateContactPersonID": int,
    "DeliveryMethodID": "Int64",
    "DeliveryCityID": int,
    "PostalCityID": int,
    "PaymentDays": int,
    "InternalComments": object,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/Suppliers.csv", "Suppliers"),
    ("./wwi/Suppliers_Archive.csv", "Suppliers_Archive"),
):
    save_to_database(csv_path, suppliers_dtypes, "wwi.db", table_name)

# Supplier transactions: monetary columns are floats, PurchaseOrderID uses the
# nullable "Int64" dtype, and the remaining typed columns are plain ints.
supplier_transactions_dtypes = dict.fromkeys(
    [
        "SupplierTransactionID",
        "SupplierID",
        "TransactionTypeID",
        "PaymentMethodID",
        "IsFinalized",
        "LastEditedBy",
    ],
    int
)
supplier_transactions_dtypes["PurchaseOrderID"] = "Int64"
supplier_transactions_dtypes.update(
    dict.fromkeys(
        [
            "AmountExcludingTax",
            "TaxAmount",
            "TransactionAmount",
            "OutstandingBalance",
        ],
        float
    )
)

save_to_database(
    filepath="./wwi/SupplierTransactions.csv",
    data_types=supplier_transactions_dtypes,
    dbpath="wwi.db",
    table="SupplierTransactions"
)



Successfully created PurchaseOrderLines table in wwi.db database.


Successfully created PurchaseOrders table in wwi.db database.


Successfully created SupplierCategories table in wwi.db database.


Successfully created SupplierCategories_Archive table in wwi.db database.


Successfully created Suppliers table in wwi.db database.


Successfully created Suppliers_Archive table in wwi.db database.


Successfully created SupplierTransactions table in wwi.db database.


Save Sales tables

In [11]:
# The main and archive BuyingGroups files are loaded with the same dtype mapping.
buying_groups_dtypes = {
    "BuyingGroupID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): buying groups first, then buying groups archive.
for csv_path, table_name in (
    ("./wwi/BuyingGroups.csv", "BuyingGroups"),
    ("./wwi/BuyingGroups_Archive.csv", "BuyingGroups_Archive"),
):
    save_to_database(csv_path, buying_groups_dtypes, "wwi.db", table_name)

# The main and archive CustomerCategories files are loaded with the same dtype mapping.
customer_categories_dtypes = {
    "CustomerCategoryID": int,
    "LastEditedBy": int
}

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/CustomerCategories.csv", "CustomerCategories"),
    ("./wwi/CustomerCategories_Archive.csv", "CustomerCategories_Archive"),
):
    save_to_database(csv_path, customer_categories_dtypes, "wwi.db", table_name)

# Customers and Customers_Archive are loaded with the same dtype mapping:
# nullable "Int64" for IDs, flags and PaymentDays, float for the two monetary
# columns, and object for the delivery run fields.
customers_dtypes = dict.fromkeys(
    [
        "CustomerID",
        "BillToCustomerID",
        "CustomerCategoryID",
        "BuyingGroupID",
        "PrimaryContactPersonID",
        "AlternateContactPersonID",
        "DeliveryMethodID",
        "DeliveryCityID",
        "PostalCityID",
        "IsStatementSent",
        "IsOnCreditHold",
        "PaymentDays",
        "LastEditedBy",
    ],
    "Int64"
)
customers_dtypes.update(
    dict.fromkeys(["CreditLimit", "StandardDiscountPercentage"], float)
)
customers_dtypes.update(
    dict.fromkeys(["DeliveryRun", "RunPosition"], object)
)

# (CSV path, destination table): main table first, then its archive.
# NOTE(review): the archive file name is spelled with a lowercase "archive",
# unlike the other archive CSVs; kept exactly as in the original. Confirm the
# file name on disk if this is run on a case-sensitive file system.
for csv_path, table_name in (
    ("./wwi/Customers.csv", "Customers"),
    ("./wwi/Customers_archive.csv", "Customers_Archive"),
):
    save_to_database(csv_path, customers_dtypes, "wwi.db", table_name)

# Customer transactions: nullable "Int64" IDs, float amounts, and a plain int
# IsFinalized flag.
customer_transactions_dtypes = dict.fromkeys(
    [
        "CustomerTransactionID",
        "CustomerID",
        "TransactionTypeID",
        "InvoiceID",
        "PaymentMethodID",
        "LastEditedBy",
    ],
    "Int64"
)
customer_transactions_dtypes.update(
    dict.fromkeys(
        [
            "AmountExcludingTax",
            "TaxAmount",
            "TransactionAmount",
            "OutstandingBalance",
        ],
        float
    )
)
customer_transactions_dtypes["IsFinalized"] = int

save_to_database(
    filepath="./wwi/CustomerTransactions.csv",
    data_types=customer_transactions_dtypes,
    dbpath="wwi.db",
    table="CustomerTransactions"
)

# Invoice lines: nullable "Int64" IDs and quantity, float price/tax/profit columns.
invoice_lines_dtypes = dict.fromkeys(
    [
        "InvoiceLineID",
        "InvoiceID",
        "StockItemID",
        "PackageTypeID",
        "Quantity",
        "LastEditedBy",
    ],
    "Int64"
)
invoice_lines_dtypes.update(
    dict.fromkeys(
        [
            "UnitPrice",
            "TaxRate",
            "TaxAmount",
            "LineProfit",
            "ExtendedPrice",
        ],
        float
    )
)

save_to_database(
    filepath="./wwi/InvoiceLines.csv",
    data_types=invoice_lines_dtypes,
    dbpath="wwi.db",
    table="InvoiceLines"
)

# Invoices: nullable "Int64" for IDs, the credit-note flag and item totals;
# object for the free-text and delivery fields.
invoices_dtypes = dict.fromkeys(
    [
        "InvoiceID",
        "CustomerID",
        "BillToCustomerID",
        "OrderID",
        "DeliveryMethodID",
        "ContactPersonID",
        "AccountsPersonID",
        "SalespersonPersonID",
        "PackedByPersonID",
        "IsCreditNote",
        "TotalDryItems",
        "TotalChillerItems",
        "LastEditedBy",
    ],
    "Int64"
)
invoices_dtypes.update(
    dict.fromkeys(
        [
            "CreditNoteReason",
            "Comments",
            "DeliveryInstructions",
            "InternalComments",
            "DeliveryRun",
            "RunPosition",
            "ReturnedDeliveryData",
        ],
        object
    )
)

save_to_database(
    filepath="./wwi/Invoices.csv",
    data_types=invoices_dtypes,
    dbpath="wwi.db",
    table="Invoices"
)

# Order lines: nullable "Int64" IDs and quantities, float unit price and tax rate.
order_lines_dtypes = dict.fromkeys(
    [
        "OrderLineID",
        "OrderID",
        "StockItemID",
        "PackageTypeID",
        "Quantity",
        "PickedQuantity",
        "LastEditedBy",
    ],
    "Int64"
)
order_lines_dtypes.update(dict.fromkeys(["UnitPrice", "TaxRate"], float))

save_to_database(
    filepath="./wwi/OrderLines.csv",
    data_types=order_lines_dtypes,
    dbpath="wwi.db",
    table="OrderLines"
)

# Orders: nullable "Int64" IDs and backorder flag, object for the text columns.
orders_dtypes = dict.fromkeys(
    [
        "OrderID",
        "CustomerID",
        "SalespersonPersonID",
        "PickedByPersonID",
        "ContactPersonID",
        "BackorderOrderID",
        "IsUndersupplyBackordered",
        "LastEditedBy",
    ],
    "Int64"
)
orders_dtypes.update(
    dict.fromkeys(
        ["Comments", "DeliveryInstructions", "InternalComments"],
        object
    )
)

save_to_database(
    filepath="./wwi/Orders.csv",
    data_types=orders_dtypes,
    dbpath="wwi.db",
    table="Orders"
)

# Special deals: nullable "Int64" IDs, float discount and price columns.
special_deals_dtypes = dict.fromkeys(
    [
        "SpecialDealID",
        "StockItemID",
        "CustomerID",
        "BuyingGroupID",
        "CustomerCategoryID",
        "StockGroupID",
        "LastEditedBy",
    ],
    "Int64"
)
special_deals_dtypes.update(
    dict.fromkeys(
        ["DiscountAmount", "DiscountPercentage", "UnitPrice"],
        float
    )
)

save_to_database(
    filepath="./wwi/SpecialDeals.csv",
    data_types=special_deals_dtypes,
    dbpath="wwi.db",
    table="SpecialDeals"
)

Successfully created BuyingGroups table in wwi.db database.


Successfully created BuyingGroups_Archive table in wwi.db database.


Successfully created CustomerCategories table in wwi.db database.


Successfully created CustomerCategories_Archive table in wwi.db database.


Successfully created Customers table in wwi.db database.
Successfully created Customers_Archive table in wwi.db database.


Successfully created CustomerTransactions table in wwi.db database.


Successfully created InvoiceLines table in wwi.db database.


Successfully created Invoices table in wwi.db database.


Successfully created OrderLines table in wwi.db database.


Successfully created Orders table in wwi.db database.


Successfully created SpecialDeals table in wwi.db database.


Save Warehouse tables

In [12]:
# Cold room temperatures: the ID and the reading are cast to float, the sensor
# number to nullable "Int64".
cold_room_temperatures_dtypes = {
    "ColdRoomTemperatureID": float,
    "ColdRoomSensorNumber": "Int64",
    "Temperature": float
}

save_to_database(
    filepath="./wwi/ColdRoomTemperatures.csv",
    data_types=cold_room_temperatures_dtypes,
    dbpath="wwi.db",
    table="ColdRoomTemperatures"
)

# NOTE(review): the matching archive load
# ("./wwi/ColdRoomTemperatures_Archive.csv" -> "ColdRoomTemperatures_Archive",
# same dtype mapping) was commented out in the original notebook and stays
# disabled here; the reason is not recorded.

# Colors, package types and stock groups (each with its archive) share one
# layout: an ID column plus LastEditedBy, both cast to nullable "Int64".
# Each entry is (CSV path, ID column, destination table), in load order.
warehouse_lookup_loads = (
    # colors / colors archive
    ("./wwi/Colors.csv", "ColorID", "Colors"),
    ("./wwi/Colors_Archive.csv", "ColorID", "Colors_Archive"),
    # package types / package types archive
    ("./wwi/PackageTypes.csv", "PackageTypeID", "PackageTypes"),
    ("./wwi/PackageTypes_Archive.csv", "PackageTypeID", "PackageTypes_Archive"),
    # stock groups / stock groups archive
    ("./wwi/StockGroups.csv", "StockGroupID", "StockGroups"),
    ("./wwi/StockGroups_Archive.csv", "StockGroupID", "StockGroups_Archive"),
)

for csv_path, id_column, table_name in warehouse_lookup_loads:
    save_to_database(
        csv_path,
        {id_column: "Int64", "LastEditedBy": "Int64"},
        "wwi.db",
        table_name
    )

# Stock item holdings: nullable "Int64" for IDs and quantities, float for the
# last cost price.
stock_item_holdings_dtypes = dict.fromkeys(
    [
        "StockItemID",
        "QuantityOnHand",
        "LastStocktakeQuantity",
        "ReorderLevel",
        "TargetStockLevel",
        "LastEditedBy",
    ],
    "Int64"
)
stock_item_holdings_dtypes["LastCostPrice"] = float

save_to_database(
    filepath="./wwi/StockItemHoldings.csv",
    data_types=stock_item_holdings_dtypes,
    dbpath="wwi.db",
    table="StockItemHoldings"
)

# StockItems and StockItems_Archive are loaded with the same dtype mapping:
# nullable "Int64" IDs/counts/flag, float prices and weight, object for the
# text and blob-like columns.
stock_items_dtypes = dict.fromkeys(
    [
        "StockItemID",
        "SupplierID",
        "ColorID",
        "UnitPackageID",
        "OuterPackageID",
        "LeadTimeDays",
        "QuantityPerOuter",
        "IsChillerStock",
        "LastEditedBy",
    ],
    "Int64"
)
stock_items_dtypes.update(
    dict.fromkeys(
        [
            "TaxRate",
            "UnitPrice",
            "RecommendedRetailPrice",
            "TypicalWeightPerUnit",
        ],
        float
    )
)
stock_items_dtypes.update(
    dict.fromkeys(
        [
            "MarketingComments",
            "InternalComments",
            "Photo",
            "CustomFields",
            "Tags",
            "SearchDetails",
        ],
        object
    )
)

# (CSV path, destination table): main table first, then its archive.
for csv_path, table_name in (
    ("./wwi/StockItems.csv", "StockItems"),
    ("./wwi/StockItems_Archive.csv", "StockItems_Archive"),
):
    save_to_database(csv_path, stock_items_dtypes, "wwi.db", table_name)

# Stock item stock groups: every typed column is a nullable "Int64".
stock_item_stock_groups_dtypes = dict.fromkeys(
    [
        "StockItemStockGroupID",
        "StockItemID",
        "StockGroupID",
        "LastEditedBy",
    ],
    "Int64"
)

save_to_database(
    filepath="./wwi/StockItemStockGroups.csv",
    data_types=stock_item_stock_groups_dtypes,
    dbpath="wwi.db",
    table="StockItemStockGroups"
)

# Stock item transactions: nullable "Int64" IDs, float quantity.
stock_item_transactions_dtypes = dict.fromkeys(
    [
        "StockItemTransactionID",
        "StockItemID",
        "TransactionTypeID",
        "CustomerID",
        "InvoiceID",
        "SupplierID",
        "PurchaseOrderID",
        "LastEditedBy",
    ],
    "Int64"
)
stock_item_transactions_dtypes["Quantity"] = float

save_to_database(
    filepath="./wwi/StockItemTransactions.csv",
    data_types=stock_item_transactions_dtypes,
    dbpath="wwi.db",
    table="StockItemTransactions"
)

# Vehicle temperatures: nullable "Int64" for the ID, sensor number and
# compression flag; the compressed sensor payload stays as object.
vehicle_temperatures_dtypes = dict.fromkeys(
    [
        "VehicleTemperatureID",
        "ChillerSensorNumber",
        "IsCompressed",
    ],
    "Int64"
)
vehicle_temperatures_dtypes["CompressedSensorData"] = object

save_to_database(
    filepath="./wwi/VehicleTemperatures.csv",
    data_types=vehicle_temperatures_dtypes,
    dbpath="wwi.db",
    table="VehicleTemperatures"
)


Successfully created ColdRoomTemperatures table in wwi.db database.
Successfully created Colors table in wwi.db database.


Successfully created Colors_Archive table in wwi.db database.


Successfully created PackageTypes table in wwi.db database.
Successfully created PackageTypes_Archive table in wwi.db database.


Successfully created StockGroups table in wwi.db database.
Successfully created StockGroups_Archive table in wwi.db database.


Successfully created StockItemHoldings table in wwi.db database.
Successfully created StockItems table in wwi.db database.


Successfully created StockItems_Archive table in wwi.db database.


Successfully created StockItemStockGroups table in wwi.db database.


Successfully created StockItemTransactions table in wwi.db database.


Successfully created VehicleTemperatures table in wwi.db database.
