Data preprocessing for the global sales data used in the Udemy course below:<br>
https://www.udemy.com/course/the-complete-power-bi-practical-course

In [1]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (or reuse one that is already running)
spark = (
    SparkSession.builder
    .appName("GlobalSalesDataProcessing")
    .getOrCreate()
)

# Load the internet-sales fact table: the first CSV row supplies the column
# names and Spark infers each column's type from the data
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('../input_data/udemy_ms_powerbi/factInternetSales.csv')
)

# Preview the first rows of the raw table
df.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+----+--------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|     29|      6|      98|     19|  36|     100|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+----+--------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43

In [2]:
# The six currency columns arrive with bare numeric headers; give each one a
# "ck_<key>" name before the unpivot step
ck_29, ck_6, ck_98 = 'ck_29', 'ck_6', 'ck_98'
ck_19, ck_36, ck_100 = 'ck_19', 'ck_36', 'ck_100'

# (current header, new name) pairs, applied one after another
for old_name, new_name in (
    ('29', ck_29),
    ('6', ck_6),
    ('98', ck_98),
    ('19', ck_19),
    ('36', ck_36),
    ('100', ck_100),
):
    df = df.withColumnRenamed(old_name, new_name)

df.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+-----+--------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|  ck_29|   ck_6|   ck_98|  ck_19|ck_36|  ck_100|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+-----+--------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         S

In [3]:
# Unpivot to create the 2 columns: Currency Key & Sales Amount
from pyspark.sql import functions as F

# Columns carried through the unpivot unchanged, in output order
passthrough_cols = [
    "ProductKey",
    "OrderDate",
    "DueDate",
    "ShipDate",
    "CustomerKey",
    "PromotionKey",
    "SalesTerritoryKey",
    "SalesOrderNumber",
    "SalesOrderLineNumber",
    "RevisionNumber",
    "OrderQuantity",
    "UnitPrice",
    "ExtendedAmount",
    "UnitPriceDiscountPct",
    "DiscountAmount",
    "ProductStandardCost",
    "TotalProductCost",
]

# One (Currency_Key, Sales_Amount) struct per renamed currency column
currency_amounts = F.expr("array(struct(29 as Currency_Key, ck_29 as Sales_Amount), \
                         struct(6 as Currency_Key, ck_6 as Sales_Amount), \
                         struct(98 as Currency_Key, ck_98 as Sales_Amount), \
                         struct(19 as Currency_Key, ck_19 as Sales_Amount), \
                         struct(36 as Currency_Key, ck_36 as Sales_Amount), \
                         struct(100 as Currency_Key, ck_100 as Sales_Amount))")

# Exploding the six-element array turns every input row into six rows,
# one per currency column
exploded = df.select(
    *passthrough_cols,
    F.explode(currency_amounts).alias("Currency_Amount"),
)

# Flatten the struct back into two top-level columns and give them their
# final, space-separated names
df_unpivoted = (
    exploded
    .select(
        *passthrough_cols,
        "Currency_Amount.Currency_Key",
        "Currency_Amount.Sales_Amount",
    )
    .withColumnRenamed("Currency_Key", "Currency Key")
    .withColumnRenamed("Sales_Amount", "Sales Amount")
)

df_unpivoted.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+------------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Currency Key|Sales Amount|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+------------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43697|                   1|             1|            1|  3578.27

In [4]:
# Keep only the rows whose sales amount is something other than the text 'NULL'
has_sales_amount = F.col("Sales Amount") != F.lit('NULL')
df_fact_internet_sales = df_unpivoted.where(has_sales_amount)

# Number of rows that survive the filter
df_fact_internet_sales.count()

60339

In [5]:
# Read the currency dimension table; the header row names the columns and
# column types are inferred from the data
df_dim_currency = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../input_data/udemy_ms_powerbi/dimCurrency.csv')
)
df_dim_currency.show()

+-----------+--------------------+--------------------+-------------+
|CurrencyKey|CurrencyAlternateKey|        CurrencyName|Exchange Rate|
+-----------+--------------------+--------------------+-------------+
|          6|                 AUD|   Australian Dollar|         1.62|
|         19|                 CAD|     Canadian Dollar|          1.5|
|         29|                 DEM|       Deutsche Mark|          1.0|
|         36|                 EUR|                EURO|          1.0|
|         98|                 GBP|United Kingdom Pound|         0.83|
|        100|                 USD|           US Dollar|         1.08|
+-----------+--------------------+--------------------+-------------+



In [7]:
# Attach the currency attributes to every sales row. A left join keeps each
# sales row even when its currency key has no match in the dimension table.
fact = df_fact_internet_sales.alias("f")
dim = df_dim_currency.alias("d")
same_currency = F.col("f.Currency Key") == F.col("d.CurrencyKey")

df_result = (
    fact.join(dim, same_currency, how="left")
    # Both join keys and the currency name are dropped after the join
    .drop("Currency Key", "CurrencyKey", "CurrencyName")
    .withColumnRenamed("CurrencyAlternateKey", "Currency Code")
)
df_result.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+-------------+-------------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Sales Amount|Currency Code|Exchange Rate|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+-------------+-------------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43697|              

60339

In [8]:
# Divide each sales amount by its exchange rate to get the "Sales Amount EUR"
# column, then drop the two input columns.
# NOTE: this cell rebinds df_result and removes "Sales Amount", so it can only
# be run once per execution of the join cell above.
sales_amount_eur = F.col("Sales Amount") / F.col("Exchange Rate")
df_result = (
    df_result
    .withColumn("Sales Amount EUR", sales_amount_eur)
    .drop("Sales Amount", "Exchange Rate")
)
df_result.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------------+------------------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Currency Code|  Sales Amount EUR|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------------+------------------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43697|                   1|             1|  

In [9]:
# Collapse the result to a single partition and write it out as CSV with a
# header row, replacing any previous output at this path.
# NOTE(review): the recorded run of this cell aborted with
# java.lang.UnsatisfiedLinkError in NativeIO$Windows.access0 -- presumably the
# Hadoop native Windows libraries (hadoop.dll / winutils.exe) were not
# available to Spark; confirm the environment before re-running.
(
    df_result
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv('../input_data/udemy_ms_powerbi/expected_factInternetSales.csv')
)

Py4JJavaError: An error occurred while calling o189.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$26(FileFormatWriter.scala:277)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:277)
	... 42 more
