In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as pySql


spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [2]:
product_invoices_path = '../notebooks/data/product_invoices/'
product_package_types_path = '../notebooks/data/product_package_types/'
product_shipments_path = '../notebooks/data/product_shipments/'
provider_invoices_path = '../notebooks/data/provider_invoices/'
provider_prices_path = '../notebooks/data/provider_prices/'

In [3]:
def read_json(path):
    """Load the JSON dataset at `path` as a Spark DataFrame."""
    return spark.read.json(path)

In [4]:
product_invoices = read_json(product_invoices_path)
product_package_types = read_json(product_package_types_path)
product_shipments = read_json(product_shipments_path)
provider_invoices = read_json(provider_invoices_path)
provider_prices = read_json(provider_prices_path)

In [5]:
product_invoices.printSchema()

root
 |-- amount: double (nullable = true)
 |-- transaction_id: long (nullable = true)
 |-- user_invoice_date: string (nullable = true)



In [6]:
product_shipments.printSchema()

root
 |-- buyer_id: long (nullable = true)
 |-- from_country: string (nullable = true)
 |-- package_type_id: long (nullable = true)
 |-- seller_id: long (nullable = true)
 |-- shipping_label_created: string (nullable = true)
 |-- to_country: string (nullable = true)
 |-- tracking_code: string (nullable = true)
 |-- transaction_id: long (nullable = true)



In [7]:
product_invoices_full = product_invoices.join(product_shipments, ['transaction_id'])\
                                        .select(product_invoices.amount,
                                                product_shipments.tracking_code)\
                                        .withColumnRenamed("amount","vinted_amount")

In [8]:
provider_invoices_compare = provider_invoices.select(provider_invoices.amount, 
                                                     provider_invoices.tracking_code)\
                                             .withColumnRenamed('amount','provider_amount')

In [9]:
comparison_df = product_invoices_full.join(provider_invoices_compare, ['tracking_code'], 'left')

discrepencies_in_amount = comparison_df.filter(pySql.col('vinted_amount')!=pySql.col('provider_amount'))

In [10]:
discrepencies_in_amount_adv_prv = comparison_df.filter(pySql.col('vinted_amount')<pySql.col('provider_amount'))
discrepencies_in_amount_adv_vnt = comparison_df.filter(pySql.col('vinted_amount')>pySql.col('provider_amount'))
total_amount_discrepency = comparison_df.select(pySql.sum(pySql.col('vinted_amount'))-pySql.sum(pySql.col('provider_amount')))\
                                        .collect()[0][0]

In [11]:
print('product_invoices: ', product_invoices.count())
print('provider_invoices: ', provider_invoices.count())
print('discrepencies_in_amount: ', discrepencies_in_amount.count())
print('provider charges more than Vinted: ', discrepencies_in_amount_adv_prv.count())
print('provider charges less than Vinted: ', discrepencies_in_amount_adv_vnt.count())
print('total amount discrepency: ', total_amount_discrepency)

product_invoices:  554038
provider_invoices:  562943
discrepencies_in_amount:  187602
provider charges more than Vinted:  68266
provider charges less than Vinted:  119336
total amount discrepency:  5717.439252016367
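
Two details of the comparison above are worth keeping in mind when reading these counts. The join is a left join, so shipments with no provider invoice carry a null `provider_amount`. Spark comparison operators return null for such rows, so the filters drop them, while `sum` skips nulls and therefore still counts their `vinted_amount` in the total. The amounts are also doubles, so an exact `!=` can flag differences that are only rounding noise. Below is a minimal plain-Python sketch of a stricter classification. The sample rows, the `classify` helper and the one-cent tolerance are assumptions for illustration, not values taken from the data.

```python
from decimal import Decimal

TOLERANCE = Decimal("0.01")  # assumed: differences under one cent are noise


def classify(vinted_amount, provider_amount, tolerance=TOLERANCE):
    """Label one joined row; None stands in for a Spark null."""
    if provider_amount is None:
        return "unmatched"
    diff = Decimal(str(vinted_amount)) - Decimal(str(provider_amount))
    if abs(diff) < tolerance:
        return "equal"
    return "vinted_higher" if diff > 0 else "provider_higher"


# Hypothetical rows: (tracking_code, vinted_amount, provider_amount)
rows = [
    ("A1", 3.50, 3.50),
    ("A2", 3.50, 4.10),
    ("A3", 5.00, 4.25),
    ("A4", 2.95, None),          # no provider invoice found by the left join
    ("A5", 1.10, 1.1000000001),  # differs only by rounding noise
]

counts = {}
matched_total = Decimal("0")
for _, vinted, provider in rows:
    label = classify(vinted, provider)
    counts[label] = counts.get(label, 0) + 1
    if provider is not None:
        # Only rows with a provider invoice contribute to the total.
        matched_total += Decimal(str(vinted)) - Decimal(str(provider))

print(counts)
print(matched_total)
```

Reporting the unmatched rows as their own bucket keeps missing provider invoices from being mixed into the pricing discrepancy.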


I.
● Are there any discrepancies between the shipping service provider invoices and our invoices to buyers? Can you explain them?
● What are the key trends you observe in the data over the period for which data is available? What is the business impact of these trends?
● If you had more time and could ask for more data, how would you continue your analysis to identify and fix shortcomings in the shipping/billing process?

● After exploring the data, I see that the discrepancy lies in the amount paid for delivery: roughly a third of product invoice amounts (187,602 mismatches against 554,038 product invoices) do not match the provider amount. Most of the mismatches (119,336 versus 68,266) are in Vinted's favour, i.e. Vinted bills the buyer more than the provider charges. In my opinion this stems from delivery amounts being calculated differently on the Vinted and provider sides: the product_package_types dataset is less detailed than provider_prices, which produces the discrepancies in this KPI.
● With additional data and time, I would try to build a more precise package delivery pricing classification that accounts for country, region, provider, etc., so that billing is as close as possible to what the service provider charges Vinted.
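
The finer-grained pricing classification proposed above could be sketched as a lookup that tries the most specific key first and falls back to coarser ones. Everything here is hypothetical: the key fields (provider, origin and destination country, package type), the provider name, the prices and the `expected_price` helper are illustrative, since the `provider_prices` schema is not shown in this notebook.

```python
# Hypothetical price table: more specific keys override coarser ones.
# Key: (provider, from_country, to_country, package_type_id); None is a wildcard.
PRICES = {
    ("acme", "FR", "DE", 1): 4.20,
    ("acme", "FR", None, 1): 3.90,
    ("acme", None, None, 1): 3.50,
}


def expected_price(provider, from_country, to_country, package_type_id):
    """Return the most specific matching price, or None if nothing matches."""
    candidates = [
        (provider, from_country, to_country, package_type_id),
        (provider, from_country, None, package_type_id),
        (provider, None, None, package_type_id),
    ]
    for key in candidates:
        if key in PRICES:
            return PRICES[key]
    return None


print(expected_price("acme", "FR", "DE", 1))  # exact route match
print(expected_price("acme", "FR", "ES", 1))  # falls back to origin country
print(expected_price("acme", "LT", "PL", 1))  # falls back to provider default
```

A shipment for which `expected_price` returns `None` has no price rule at all, which is worth flagging separately from one whose billed amount differs from the rule.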


<img src="../notebooks/arch_example.png">

II. Please think about the production version of your solution and provide a structure of how the ETL pipeline could look like to arrive from the raw input data to the analytical reporting. Think of potential risks regarding the scalability and flexibility of your solution having the rapid growth of Vinted in mind.

The pipeline design depends heavily on how the sources deliver data. Ingestion could be triggered by the file system when new data is dropped (if possible) or run on a schedule, and it should carry no business logic or quality checks, so that getting raw data into place stays fast. The best scenario is to receive only the delta and append it to the existing raw data; avoid full loads where possible. Jobs from the bronze to the silver layer should clean the data, remove or separate corrupted records and perform most of the data quality checks (if Jupyter notebooks are used for this, I would suggest extracting custom libraries tailored to the team's working patterns). The last job should be developed together with a business stakeholder. No data discrepancies should reach it; the only changes at that stage should be simple joins that make the data usable, plus setting number precision, column names, etc.
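
The bronze-to-silver step described above, where corrupted records are separated rather than silently dropped, might look roughly like the following. This is a plain-Python sketch: the required fields are borrowed from the `product_invoices` schema printed earlier, while the `split_clean_and_corrupt` helper and the sample records are assumptions. In Spark itself, the JSON reader's permissive mode with its corrupt-record column serves a similar purpose.

```python
import json

# Expected fields and types, taken from the product_invoices schema.
REQUIRED = {
    "amount": (int, float),
    "transaction_id": int,
    "user_invoice_date": str,
}


def split_clean_and_corrupt(lines):
    """Parse JSON lines; return (clean_records, quarantined_lines)."""
    clean, quarantined = [], []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            quarantined.append(line)  # not valid JSON at all
            continue
        valid = isinstance(record, dict) and all(
            isinstance(record.get(field), types)
            for field, types in REQUIRED.items()
        )
        if valid:
            clean.append(record)
        else:
            quarantined.append(line)  # parsed, but a field is missing or mistyped
    return clean, quarantined


# Hypothetical raw input: one good record, one bad type, one truncated line.
raw = [
    '{"amount": 3.5, "transaction_id": 1, "user_invoice_date": "2017-01-02"}',
    '{"amount": "n/a", "transaction_id": 2, "user_invoice_date": "2017-01-02"}',
    '{"amount": 4.0, "transaction_id": 3',
]
clean, quarantined = split_clean_and_corrupt(raw)
print(len(clean), len(quarantined))
```

Keeping the quarantined lines in their raw form means they can be replayed once the source fixes the problem, without re-ingesting the whole batch.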

Performance issues: growing data volumes can be handled by parallelizing jobs, scaling clusters and adjusting time windows; Spark can scale to match these requirements.
For the data lake I would suggest the Parquet or Delta Lake formats, which scale well enough for most day-to-day performance issues. Regular data offload jobs should also improve the performance of some datasets: moving unused data to “cold” or historical storage can save time and cost.
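
The offload job mentioned above could start from a simple retention rule over date partitions. The sketch below assumes the datasets are partitioned by date and that one year of data stays "hot"; both the `partitions_to_offload` helper and the 365-day window are illustrative choices, not part of the original solution.

```python
from datetime import date, timedelta


def partitions_to_offload(partition_dates, today, keep_days=365):
    """Return partition dates older than the retention window, oldest first."""
    cutoff = today - timedelta(days=keep_days)
    return sorted(d for d in partition_dates if d < cutoff)


# Hypothetical partitions of one dataset.
partitions = [date(2023, 6, 1), date(2022, 12, 31), date(2023, 1, 1)]
to_move = partitions_to_offload(partitions, today=date(2024, 1, 1))
print(to_move)
```

Passing `today` explicitly, rather than reading the clock inside the function, makes the job reproducible when it is re-run for a past date.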

