# PySpark homework assignment

## Context

The goal of this assignment is to get a view of your coding workflow & style. Your main focus should be on writing performant & robust code for data manipulation.

For a homework assignment, we cannot grant you access to our infrastructure (an on-prem Cloudera Data Platform: a Spark cluster deployed on YARN). Since the focus is on development, we provide a template notebook to get you up and running quickly on Google Colab.

You are free to perform this assignment on any Spark 3+ infrastructure. If you want to use a local or cloud setup, go for it!

Some of the tasks are open to interpretation. This allows us to assess business understanding and relevant field experience. These tasks are not pass-or-fail checks. During the interview we'll ask about the choices you made.

For the assignment, you'll be working with store location data.  You might be familiar with the phrase "Location, location, location" from the real-estate context.  The same house can have a different selling price based on the location.  In fast moving consumer goods (FMCG), location is one of the most crucial aspects:

* Proximity & accessibility to customers increase convenience
* Proximity to competitors increases market pressure
* It has an impact on the supply chain

## Evaluation criteria

1. Software engineering
   1. Clean code (e.g. using meaningful names)
   1. Robust & efficient code
   1. Styling (e.g. PEP8, or Google style guide)
   1. Documentation (e.g. docstrings)
   1. Design (e.g. SOLID principles)
1. Workflow
   1. How you use Git
   1. How you structure your assignment
   1. Owning mistakes
   1. Rationale for design decisions
   1. Making your solution accessible to others
1. Business context
   1. GDPR
   1. Fast moving consumer goods
1. (optional: own infra) System engineering
   1. What setup did you use?
   1. How did you set it up?

## Deliverables we expect

1. Private GitHub repo
   1. Colab allows you to save to GitHub
1. README.md with relevant content
1. Code relevant to the assignment


## Google Colab Spark setup

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
from os import environ
import findspark

In [3]:
# Setting environment variables
environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [4]:
# Init spark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("cg-pyspark-assignment")
    .master("local")
    # Eager evaluation renders DataFrames as formatted tables in the notebook
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

spark

## Getting the assignment data

The cell below calls the API once per brand and saves each response in the current working directory as a .json file.

In [6]:
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places > clp-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places > okay-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places > spar-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places > dats-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts > cogo-colpnts.json


## Assignment instructions

1. Download the data from the API
1. Create a logger object that logs to a file "assignment.log"
   1. You can add whatever logging config you want or need
   1. At least one FileHandler, based on the instructions
1. Implement the get_data_by_brand function
   1. Follow instructions in docstring
   1. df_clp code line should work
1. No more handholding ... :-)
1. Create a single object (dataframe) that:
   1. Contains data from **all brands**
      1. Not every brand has the same columns!
   1. Drop placeSearchOpeningHours
   1. You can keep sellingPartners as an array
   1. Extract "postal_code" from address
   1. Create new column "province" derived from postal_code
   1. Transform geoCoordinates into lat and lon column
   1. One-hot-encode the handoverServices
   1. Pretend houseNumber and streetName are GDPR sensitive.
      1. How would you anonymize this data for unauthorized users?
      1. (optional) Implement the above
      1. How would you show the real data to authorized users?
      1. (optional) Implement the above
1. Save the end result as a parquet file
   1. (optional) Partitioning?

**postal_code** logic:
* "Brussel": 1000-1299  
* "Waals-Brabant": 1300-1499  
* "Vlaams-Brabant": 1500-1999, 3000-3499  
* "Antwerpen": 2000-2999  
* "Limburg": 3500-3999  
* "Luik": 4000-4999  
* "Namen": 5000-5999  
* "Henegouwen": 6000-6599, 7000-7999  
* "Luxemburg": 6600-6999  
* "West-Vlaanderen": 8000-8999  
* "Oost-Vlaanderen": 9000-9999
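
The ranges above can also be held as data rather than as a chain of conditions, which makes the mapping easy to review against the list. A minimal sketch in plain Python follows; `PROVINCE_RANGES` and `province_for` are hypothetical names that are not part of the assignment template, and returning `None` for unmatched codes is an assumption.

```python
from typing import Optional

# (low, high, province) with inclusive bounds, copied from the list above.
PROVINCE_RANGES = [
    (1000, 1299, "Brussel"),
    (1300, 1499, "Waals-Brabant"),
    (1500, 1999, "Vlaams-Brabant"),
    (2000, 2999, "Antwerpen"),
    (3000, 3499, "Vlaams-Brabant"),
    (3500, 3999, "Limburg"),
    (4000, 4999, "Luik"),
    (5000, 5999, "Namen"),
    (6000, 6599, "Henegouwen"),
    (6600, 6999, "Luxemburg"),
    (7000, 7999, "Henegouwen"),
    (8000, 8999, "West-Vlaanderen"),
    (9000, 9999, "Oost-Vlaanderen"),
]


def province_for(postal_code) -> Optional[str]:
    """Return the province for a postal code, or None if no range matches."""
    try:
        code = int(postal_code)
    except (TypeError, ValueError):
        return None  # missing or non-numeric postal code
    for low, high, province in PROVINCE_RANGES:
        if low <= code <= high:
            return province
    return None
```

The same table could be folded into a Spark `when` chain, which keeps the work in native column expressions instead of a Python UDF.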

In [7]:
# Import statements should go here
from logging import getLogger, Logger
from pyspark.sql.functions import lit,col
from pyspark.ml.feature import StringIndexer



In [9]:
# Bound to the same named logger that gets its FileHandler in the logging setup cell
LOGGER = getLogger("assignment_logger")


In [8]:
import logging
logger = logging.getLogger('assignment_logger')
logger.setLevel(logging.DEBUG)

# File handler writing to assignment.log, capturing DEBUG and above
file_handler = logging.FileHandler('assignment.log')
file_handler.setLevel(logging.DEBUG)

# Formatter with timestamp, level, process, source location, and message
formatter = logging.Formatter("%(asctime)s %(relativeCreated)d [%(levelname)s] - [%(process)d][%(filename)s][%(funcName)s] - [%(lineno)d] %(message)s")
file_handler.setFormatter(formatter)

# Attach the file handler to the logger
logger.addHandler(file_handler)

logger.info('This is an info message')

INFO:assignment_logger:This is an info message
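
One thing to watch in a notebook: re-running the setup cell attaches a second `FileHandler` to the same named logger, so every message is then written twice. A minimal sketch of a guard against that is shown here; `build_file_logger` is a hypothetical helper and the shortened format string is only for illustration.

```python
import logging


def build_file_logger(name: str = "assignment_logger",
                      path: str = "assignment.log") -> logging.Logger:
    """Return a logger that writes to `path`, attaching the handler only once."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # getLogger returns the same object for the same name, so a re-run cell
    # would otherwise stack another FileHandler and duplicate every line.
    already_attached = any(
        isinstance(handler, logging.FileHandler) for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(path)
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(funcName)s: %(message)s")
        )
        logger.addHandler(file_handler)
    return logger
```

Calling the helper twice with the same name returns the same logger object with a single file handler.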


In [None]:
def get_data_by_brand(brand: str, logger: Logger = LOGGER):
  """Fetch input data based on brand.

  Please add a column to the data indicating the input brand
  Please add minimum one sanity check for loading the data
  Please log things you consider relevant

  Args:
      brand: allowed values are (clp, okay, spar, dats, cogo)
      logger: Logger object for logging

  Returns:
      The relevant dataframe
  """
  raise NotImplementedError()
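
The docstring asks for at least one sanity check when loading the data. One option is to validate the downloaded file before handing it to Spark. The sketch below assumes each file holds a JSON array of place objects and that `placeId` and `address` are present on every record; both assumptions come from the schema printed later in this notebook, not from any API documentation, and `check_payload` is a hypothetical helper.

```python
import json
from pathlib import Path

# Assumed minimum set of keys per record (taken from the printed schema).
REQUIRED_KEYS = {"placeId", "address"}


def check_payload(path: str) -> int:
    """Validate a downloaded file and return its record count."""
    file = Path(path)
    if not file.is_file() or file.stat().st_size == 0:
        raise ValueError(f"{path} is missing or empty")
    records = json.loads(file.read_text(encoding="utf-8"))
    if not isinstance(records, list) or not records:
        raise ValueError(f"{path} does not contain a non-empty JSON array")
    for index, record in enumerate(records):
        if not isinstance(record, dict):
            raise ValueError(f"{path}: record {index} is not a JSON object")
        missing = REQUIRED_KEYS - record.keys()
        if missing:
            raise ValueError(f"{path}: record {index} lacks {sorted(missing)}")
    return len(records)
```

The returned count can be logged and compared with the row count of the resulting dataframe.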

In [9]:
# Implementation of get_data_by_brand()
from pyspark.sql import DataFrame

# NOTE: only these three brands are loaded; 'okay' and 'dats' are not part of the outputs below.
brand_name = ['clp', 'cogo', 'spar']

col_list = ['address', 'branchId', 'commercialName', 'ensign', 'geoCoordinates', 'handoverServices', 'isActive', 'moreInfoUrl', 'placeId', 'placeSearchOpeningHours', 'placeType', 'routeUrl', 'sellingPartners', 'sourceStatus', 'brand_name']
uni_df = None


def get_data_by_brand(brand: str, logger: Logger = logger) -> DataFrame:
    """Load the JSON file(s) for one brand and tag every row with the brand."""
    logger.info(brand)

    # Sanity check: reject brands that are not in the allowed list.
    if brand not in brand_name:
        logger.error('The brand name provided in input *** {} *** is not from CG'.format(brand))
        raise ValueError('Unknown brand: {}'.format(brand))

    try:
        df = spark.read.option("multiline", "true").json('/content/' + brand + '*.json')
    except Exception:
        # Log with traceback and re-raise so the caller never gets an undefined dataframe.
        logger.exception('Error while reading file for {}'.format(brand))
        raise

    # Pad columns this brand lacks with nulls so that all brands share one schema.
    # (unionByName(..., allowMissingColumns=True), available from Spark 3.1, is an alternative.)
    for column in [column for column in col_list if column not in df.columns]:
        df = df.withColumn(column, lit(None))
    df = df.withColumn('brand_name', lit(brand))
    logger.info(df.columns)
    return df


for brand in brand_name:
    logger.info('The iteration for brand name {}'.format(brand))
    df = get_data_by_brand(brand)
    if uni_df is None:
        uni_df = df
    else:
        logger.info(uni_df.columns)
        uni_df = uni_df.unionByName(df)


INFO:assignment_logger:The iteration for brand name clp
INFO:assignment_logger:clp
INFO:assignment_logger:['address', 'branchId', 'commercialName', 'ensign', 'geoCoordinates', 'handoverServices', 'isActive', 'moreInfoUrl', 'placeId', 'placeSearchOpeningHours', 'placeType', 'routeUrl', 'sellingPartners', 'sourceStatus', 'brand_name']
INFO:assignment_logger:The iteration for brand name cogo
INFO:assignment_logger:cogo
INFO:assignment_logger:['address', 'branchId', 'commercialName', 'ensign', 'geoCoordinates', 'handoverServices', 'isActive', 'moreInfoUrl', 'placeId', 'placeSearchOpeningHours', 'placeType', 'routeUrl', 'sellingPartners', 'sourceStatus', 'brand_name']
INFO:assignment_logger:['address', 'branchId', 'commercialName', 'ensign', 'geoCoordinates', 'handoverServices', 'isActive', 'moreInfoUrl', 'placeId', 'placeSearchOpeningHours', 'placeType', 'routeUrl', 'sellingPartners', 'sourceStatus', 'brand_name']
INFO:assignment_logger:The iteration for brand name spar
INFO:assignment_log

In [11]:
# For reference of distinct brand_name
uni_df.select('brand_name').distinct().show()
uni_df = uni_df.dropDuplicates()
# uni_df.count()
uni_df.printSchema()

+----------+
|brand_name|
+----------+
|       clp|
|      cogo|
|      spar|
+----------+

root
 |-- address: struct (nullable = true)
 |    |-- cityName: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- countryName: string (nullable = true)
 |    |-- houseNumber: string (nullable = true)
 |    |-- postalcode: string (nullable = true)
 |    |-- streetName: string (nullable = true)
 |-- branchId: string (nullable = true)
 |-- commercialName: string (nullable = true)
 |-- ensign: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- geoCoordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- handoverServices: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- isActive: boolean (nullable = true)
 |-- moreInfoUrl: string (nullable = true)
 |-- placeId: long (nullable = true)
 |-- placeSearchOpeningHou

In [12]:
uni_df.count()

746

In [13]:
uni_df.columns

['address',
 'branchId',
 'commercialName',
 'ensign',
 'geoCoordinates',
 'handoverServices',
 'isActive',
 'moreInfoUrl',
 'placeId',
 'placeSearchOpeningHours',
 'placeType',
 'routeUrl',
 'sellingPartners',
 'sourceStatus',
 'brand_name']

In [14]:
uni_df_upd = uni_df.drop(col('placeSearchOpeningHours'))
uni_df_upd.columns

['address',
 'branchId',
 'commercialName',
 'ensign',
 'geoCoordinates',
 'handoverServices',
 'isActive',
 'moreInfoUrl',
 'placeId',
 'placeType',
 'routeUrl',
 'sellingPartners',
 'sourceStatus',
 'brand_name']

In [22]:
# Flatten the nested address and coordinate fields, then derive province from the postal code.
from pyspark.sql import functions as F

# Postal codes arrive as strings; cast once so the range checks compare integers.
postal_code = F.col('address.postalcode').cast('int')

uni_df_mod = (
    uni_df_upd
    .withColumn('postalcode', F.col('address.postalcode'))
    .withColumn('latitude', F.col('geoCoordinates.latitude'))
    .withColumn('longitude', F.col('geoCoordinates.longitude'))
    .withColumn('housenumber', F.col('address.houseNumber'))
    .withColumn('streetname', F.col('address.streetName'))
    .withColumn(
        'province',
        F.when(postal_code.between(1000, 1299), 'Brussel')
        .when(postal_code.between(1300, 1499), 'Waals-Brabant')
        .when(postal_code.between(2000, 2999), 'Antwerpen')
        .when(postal_code.between(3500, 3999), 'Limburg')
        .when(postal_code.between(4000, 4999), 'Luik')
        .when(postal_code.between(5000, 5999), 'Namen')
        .when(postal_code.between(6600, 6999), 'Luxemburg')
        .when(postal_code.between(8000, 8999), 'West-Vlaanderen')
        .when(postal_code.between(9000, 9999), 'Oost-Vlaanderen')
        .when(postal_code.between(6000, 6599) | postal_code.between(7000, 7999), 'Henegouwen')
        .when(postal_code.between(1500, 1999) | postal_code.between(3000, 3499), 'Vlaams-Brabant')
        .otherwise('')
    )
)

# Mask the GDPR-sensitive columns (streetname and housenumber).
# streetname keeps its first and last character; housenumber is masked entirely,
# because some house numbers are a single digit.
# NOTE: the nested `address` struct still carries the unmasked values.
uni_df_mod = (
    uni_df_mod
    .withColumn('streetname', F.regexp_replace('streetname', '(?<!^).(?=.+)', '*'))
    .withColumn('housenumber', F.regexp_replace('housenumber', '.', '*'))
)
uni_df_mod.show(5)


+--------------------+--------+--------------------+-----------------+--------------------+--------------------+--------+--------------------+-------+-------------------+--------------------+-------------------+------------+----------+----------+----------+---------+-----------+--------------------+---------------+
|             address|branchId|      commercialName|           ensign|      geoCoordinates|    handoverServices|isActive|         moreInfoUrl|placeId|          placeType|            routeUrl|    sellingPartners|sourceStatus|brand_name|postalcode|  latitude|longitude|housenumber|          streetname|       province|
+--------------------+--------+--------------------+-----------------+--------------------+--------------------+--------+--------------------+-------+-------------------+--------------------+-------------------+------------+----------+----------+----------+---------+-----------+--------------------+---------------+
|{DILBEEK, BE, Bel...|    3084|   DILBEEK (COLRUY
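
Masking with `*` in place discards the original values, so there is nothing left to show to authorized users. One possible alternative is to replace each sensitive value with a keyed token in the shared dataset and keep the token-to-value lookup in a separate, access-controlled location. The sketch below illustrates the idea in plain Python; `pseudonymize`, `build_views`, and `SENSITIVE_FIELDS` are hypothetical names, and key storage and access control are out of scope here.

```python
import hashlib
import hmac

SENSITIVE_FIELDS = ("houseNumber", "streetName")


def pseudonymize(value: str, key: bytes) -> str:
    """Return a deterministic keyed token (HMAC-SHA256) for a sensitive value."""
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()


def build_views(rows, key: bytes):
    """Split rows into a public view and a restricted token-to-value lookup."""
    public, lookup = [], {}
    for row in rows:
        masked = dict(row)  # copy so the input rows stay untouched
        for field in SENSITIVE_FIELDS:
            value = row.get(field)
            if value is None:
                continue  # nothing to protect
            token = pseudonymize(str(value), key)
            lookup[token] = value  # to be stored separately, for authorized users only
            masked[field] = token
        public.append(masked)
    return public, lookup
```

Unauthorized users would read only the public view; authorized users would join it with the lookup to recover the real values. Because the token is deterministic for a given key, equal addresses still group together without being readable.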

In [19]:

# Explode handoverServices to one row per (store, service) pair,
# then index each service label as a number for the one-hot encoder.

# Import only what is needed; a wildcard import would shadow Python builtins such as sum and max.
from pyspark.sql.functions import explode

uni_df_mod_indexed = uni_df_mod.select('branchId', 'handoverServices')

uni_df_mod_indx = uni_df_mod_indexed.withColumn('handoverServices', explode(col('handoverServices')))

indexer = StringIndexer(inputCol='handoverServices', outputCol='handoverServices_numeric')
indexer_fitted = indexer.fit(uni_df_mod_indx)
uni_df_mod_indx = indexer_fitted.transform(uni_df_mod_indx)

uni_df_mod_indx.show()

+--------+----------------+------------------------+
|branchId|handoverServices|handoverServices_numeric|
+--------+----------------+------------------------+
|    3084|  CSOP_ORDERABLE|                     0.0|
|    3084|  PREPAID_PARCEL|                     2.0|
|    4135|  CSOP_ORDERABLE|                     0.0|
|    4135|  PREPAID_PARCEL|                     2.0|
|    3107|  CSOP_ORDERABLE|                     0.0|
|    3107|  PREPAID_PARCEL|                     2.0|
|    3812|  CSOP_ORDERABLE|                     0.0|
|    3812|  PREPAID_PARCEL|                     2.0|
|    3769|  CSOP_ORDERABLE|                     0.0|
|    3769|  PREPAID_PARCEL|                     2.0|
|    3537|  CSOP_ORDERABLE|                     0.0|
|    3537|  PREPAID_PARCEL|                     2.0|
|    3600|  CSOP_ORDERABLE|                     0.0|
|    3600|  PREPAID_PARCEL|                     2.0|
|    3433|  CSOP_ORDERABLE|                     0.0|
|    3433|  PREPAID_PARCEL|                   

In [20]:
# One-hot encode the indexed service labels into a sparse vector column
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['handoverServices_numeric'], outputCols=['handoverServices_onehot'])
uni_df_mod_indx_onehot = encoder.fit(uni_df_mod_indx).transform(uni_df_mod_indx)
uni_df_mod_indx_onehot.show()

+--------+----------------+------------------------+-----------------------+
|branchId|handoverServices|handoverServices_numeric|handoverServices_onehot|
+--------+----------------+------------------------+-----------------------+
|    3084|  CSOP_ORDERABLE|                     0.0|          (5,[0],[1.0])|
|    3084|  PREPAID_PARCEL|                     2.0|          (5,[2],[1.0])|
|    4135|  CSOP_ORDERABLE|                     0.0|          (5,[0],[1.0])|
|    4135|  PREPAID_PARCEL|                     2.0|          (5,[2],[1.0])|
|    3107|  CSOP_ORDERABLE|                     0.0|          (5,[0],[1.0])|
|    3107|  PREPAID_PARCEL|                     2.0|          (5,[2],[1.0])|
|    3812|  CSOP_ORDERABLE|                     0.0|          (5,[0],[1.0])|
|    3812|  PREPAID_PARCEL|                     2.0|          (5,[2],[1.0])|
|    3769|  CSOP_ORDERABLE|                     0.0|          (5,[0],[1.0])|
|    3769|  PREPAID_PARCEL|                     2.0|          (5,[2],[1.0])|
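
The encoder output above has one row per (store, service) pair and a sparse vector column. A layout with one row per store and a 0/1 flag per service is often easier to join back to the main dataframe. The logic is sketched below in plain Python; `multi_hot` is a hypothetical helper. In Spark, a similar per-service flag could likely be derived from the original array column with `array_contains`.

```python
def multi_hot(services_by_store):
    """Map {store_id: [service, ...]} to {store_id: {service: 0 or 1}}."""
    # Collect the vocabulary first so that every store gets the same columns.
    vocabulary = sorted(
        {
            service
            for services in services_by_store.values()
            for service in (services or [])
        }
    )
    return {
        store: {service: int(service in (services or [])) for service in vocabulary}
        for store, services in services_by_store.items()
    }
```

Stores with a missing or empty service list get a 0 in every column, which keeps them in the result instead of dropping them the way `explode` does.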

In [21]:
# Save the end result as parquet, partitioned by brand
uni_df_mod.write.partitionBy("brand_name").mode("overwrite").parquet("/content/result.parquet")

In [None]:
# Note:
# The cells below are backup code and are not meant to be evaluated

In [52]:
# Back Up code
# Ignore this

brand_name = ['clp','cogo']
uni_df = None

def get_data_by_brand(brand: str):
    # brand_list = ['clp','cogo','dats','okay','spar']
    logger.info(brand)
    # uni_df = None

    # full_path = '/content/' + brand + '*'+ '.json'
    # logger.info(full_path)

    if brand not in brand_name:
      logger.info('The brand name provided in input *** {} *** is not from CG'.format(brand))
    else:
      try:
        df = spark.read.option("multiline", "true").json('/content/' + brand + '*'+ '.json')
        df = df.withColumn('brand_name',lit(brand))
        logger.info(df.columns)

      except Exception as e:
        logger.info('Error while reading file for {}'.format(brand))
        logger.info('Error while reading file for {}'.format(e))
      return df


for brand in brand_name:
  logger.info('The iteration for brand name {}'.format(brand))
  df = get_data_by_brand(brand)
  if uni_df is None:
    uni_df = df
  else:
    logger.info(uni_df.columns)
    uni_df = uni_df.unionAll(df)



INFO:assignment_logger:clp
INFO:assignment_logger:/content/clp*.json


In [53]:
df.columns

['address',
 'branchId',
 'commercialName',
 'ensign',
 'geoCoordinates',
 'handoverServices',
 'isActive',
 'moreInfoUrl',
 'placeId',
 'placeSearchOpeningHours',
 'placeType',
 'routeUrl',
 'sellingPartners',
 'sourceStatus',
 'brand_name']

In [35]:
def get_data_by_brand(brand: str, logger: Logger = LOGGER):

  raise NotImplementedError()

In [None]:
# df_clp code snippet, this should work as expected
df_clp = get_data_by_brand(brand="clp", logger=LOGGER)

NotImplementedError: ignored