In [4]:
%%pyspark
wwi_sales = spark \
    .read \
    .option("sep","|") \
    .load('abfss://dev@<primary_storage>.dfs.core.windows.net/bronze/wwi-factsale.csv', format="csv", header=True)

wwi_sales = wwi_sales \
    .withColumnRenamed('Sale Key', 'SaleKey') \
    .withColumnRenamed('City Key', 'CityKey') \
    .withColumnRenamed('Customer Key', 'CustomerKey') \
    .withColumnRenamed('Bill To Customer Key', 'BillToCustomerKey') \
    .withColumnRenamed('Stock Item Key', 'StockItemKey') \
    .withColumnRenamed('Invoice Date Key', 'InvoiceDateKey') \
    .withColumnRenamed('Delivery Date Key', 'DeliveryDateKey') \
    .withColumnRenamed('Salesperson Key', 'SalespersonKey') \
    .withColumnRenamed('WWI Invoice ID', 'WWIInvoiceID') \
    .withColumnRenamed('Description', 'Description') \
    .withColumnRenamed('Package', 'Package') \
    .withColumnRenamed('Quantity', 'Quantity') \
    .withColumnRenamed('Unit Price', 'UnitPrice') \
    .withColumnRenamed('Tax Rate', 'TaxRate') \
    .withColumnRenamed('Total Excluding Tax', 'TotalExcludingTax') \
    .withColumnRenamed('Tax Amount', 'TaxAmount') \
    .withColumnRenamed('Profit', 'Profit') \
    .withColumnRenamed('Total Including Tax', 'TotalIncludingTax') \
    .withColumnRenamed('Total Dry Items', 'TotalDryItems') \
    .withColumnRenamed('Total Chiller Items', 'TotalChillerItems') \
    .withColumnRenamed('Lineage Key', 'LineageKey')

In [None]:
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta

### Export WWI sales data to Parquet files (one output folder per calendar quarter)


In [28]:
# Export the fact table as Parquet: one output folder per calendar quarter
# (.../factsale-parquet/<year>/Q<n>), partitioned inside each folder by InvoiceDateKey.
quarters = 20                      # number of consecutive quarters to export
start_date = datetime(2012, 1, 1)  # first day of the first exported quarter

# The quarter label is derived from the calendar month, which is only meaningful
# when every 3-month window is a real calendar quarter.
if start_date.day != 1 or start_date.month % 3 != 1:
    raise ValueError(f'start_date must be the first day of a calendar quarter, got {start_date:%Y-%m-%d}')

# NOTE(review): each iteration is a separate Spark write action over wwi_sales; unless
# wwi_sales is cached, its source is re-read once per quarter — consider caching it.
for quarter in range(1, quarters + 1):

    quarter_number = (start_date.month - 1) // 3 + 1
    next_start = start_date + relativedelta(months=3)
    end_date = next_start + relativedelta(days=-1)  # inclusive last day, used only for logging

    print(f'Exporting data for {start_date.year} Q{quarter_number} ({start_date:%Y-%m-%d} : {end_date:%Y-%m-%d}) ...')

    storage_path_parquet = f'abfss://wwi@<primary_storage>.dfs.core.windows.net/factsale-parquet/{start_date.year}/Q{quarter_number}'

    # Half-open window [start_date, next_start). An inclusive `<= end_date` bound would drop
    # last-day rows whose InvoiceDateKey carries a time part: as a string,
    # '2016-03-31 00:00:00' sorts after '2016-03-31', and as a timestamp anything past
    # midnight is after '2016-03-31' cast to a timestamp.
    in_quarter = (
        (wwi_sales['InvoiceDateKey'] >= f'{start_date:%Y-%m-%d}')
        & (wwi_sales['InvoiceDateKey'] < f'{next_start:%Y-%m-%d}')
    )

    (
        wwi_sales
        .where(in_quarter)
        .write
        .partitionBy('InvoiceDateKey')
        .mode("overwrite")  # re-running replaces this quarter's folder
        .parquet(storage_path_parquet)
    )

    start_date = next_start

Exporting data for 2012 Q1 (2012-01-01 : 2012-03-31) ...
Exporting data for 2012 Q2 (2012-04-01 : 2012-06-30) ...
Exporting data for 2012 Q3 (2012-07-01 : 2012-09-30) ...
Exporting data for 2012 Q4 (2012-10-01 : 2012-12-31) ...
Exporting data for 2013 Q1 (2013-01-01 : 2013-03-31) ...
Exporting data for 2013 Q2 (2013-04-01 : 2013-06-30) ...
Exporting data for 2013 Q3 (2013-07-01 : 2013-09-30) ...
Exporting data for 2013 Q4 (2013-10-01 : 2013-12-31) ...
Exporting data for 2014 Q1 (2014-01-01 : 2014-03-31) ...
Exporting data for 2014 Q2 (2014-04-01 : 2014-06-30) ...
Exporting data for 2014 Q3 (2014-07-01 : 2014-09-30) ...
Exporting data for 2014 Q4 (2014-10-01 : 2014-12-31) ...
Exporting data for 2015 Q1 (2015-01-01 : 2015-03-31) ...
Exporting data for 2015 Q2 (2015-04-01 : 2015-06-30) ...
Exporting data for 2015 Q3 (2015-07-01 : 2015-09-30) ...
Exporting data for 2015 Q4 (2015-10-01 : 2015-12-31) ...
Exporting data for 2016 Q1 (2016-01-01 : 2016-03-31) ...
Exporting data for 2016 Q2 (201

### Export WWI sales data to pipe-delimited CSV files (one output folder per calendar quarter)


In [31]:
# Export the fact table as '|'-separated CSV with a header row: one output folder per
# calendar quarter (.../factsale-csv/<year>/Q<n>), partitioned inside each folder by InvoiceDateKey.
quarters = 20                      # number of consecutive quarters to export
start_date = datetime(2012, 1, 1)  # first day of the first exported quarter

# The quarter label is derived from the calendar month, which is only meaningful
# when every 3-month window is a real calendar quarter.
if start_date.day != 1 or start_date.month % 3 != 1:
    raise ValueError(f'start_date must be the first day of a calendar quarter, got {start_date:%Y-%m-%d}')

# NOTE(review): each iteration is a separate Spark write action over wwi_sales; unless
# wwi_sales is cached, its source is re-read once per quarter — consider caching it.
for quarter in range(1, quarters + 1):

    quarter_number = (start_date.month - 1) // 3 + 1
    next_start = start_date + relativedelta(months=3)
    end_date = next_start + relativedelta(days=-1)  # inclusive last day, used only for logging

    print(f'Exporting data for {start_date.year} Q{quarter_number} ({start_date:%Y-%m-%d} : {end_date:%Y-%m-%d}) ...')

    storage_path_csv = f'abfss://wwi@<primary_storage>.dfs.core.windows.net/factsale-csv/{start_date.year}/Q{quarter_number}'

    # Half-open window [start_date, next_start). An inclusive `<= end_date` bound would drop
    # last-day rows whose InvoiceDateKey carries a time part: as a string,
    # '2016-03-31 00:00:00' sorts after '2016-03-31', and as a timestamp anything past
    # midnight is after '2016-03-31' cast to a timestamp.
    in_quarter = (
        (wwi_sales['InvoiceDateKey'] >= f'{start_date:%Y-%m-%d}')
        & (wwi_sales['InvoiceDateKey'] < f'{next_start:%Y-%m-%d}')
    )

    (
        wwi_sales
        .where(in_quarter)
        .write
        .partitionBy('InvoiceDateKey')
        .mode("overwrite")  # re-running replaces this quarter's folder
        # NOTE(review): a NUL quote char is presumably meant to disable quoting; confirm no
        # value contains '|' or a newline, or the output will be ambiguous to parse.
        .option("quote", "\u0000")
        .option("sep","|")
        .csv(storage_path_csv, header=True)
    )

    start_date = next_start

Exporting data for 2012 Q1 (2012-01-01 : 2012-03-31) ...
Exporting data for 2012 Q2 (2012-04-01 : 2012-06-30) ...
Exporting data for 2012 Q3 (2012-07-01 : 2012-09-30) ...
Exporting data for 2012 Q4 (2012-10-01 : 2012-12-31) ...
Exporting data for 2013 Q1 (2013-01-01 : 2013-03-31) ...
Exporting data for 2013 Q2 (2013-04-01 : 2013-06-30) ...
Exporting data for 2013 Q3 (2013-07-01 : 2013-09-30) ...
Exporting data for 2013 Q4 (2013-10-01 : 2013-12-31) ...
Exporting data for 2014 Q1 (2014-01-01 : 2014-03-31) ...
Exporting data for 2014 Q2 (2014-04-01 : 2014-06-30) ...
Exporting data for 2014 Q3 (2014-07-01 : 2014-09-30) ...
Exporting data for 2014 Q4 (2014-10-01 : 2014-12-31) ...
Exporting data for 2015 Q1 (2015-01-01 : 2015-03-31) ...
Exporting data for 2015 Q2 (2015-04-01 : 2015-06-30) ...
Exporting data for 2015 Q3 (2015-07-01 : 2015-09-30) ...
Exporting data for 2015 Q4 (2015-10-01 : 2015-12-31) ...
Exporting data for 2016 Q1 (2016-01-01 : 2016-03-31) ...
Exporting data for 2016 Q2 (201