<a href="https://colab.research.google.com/github/vishnubablu112/Pyspark_Exercie/blob/main/pyspark_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark homework assignment

## Context

The goal of this assignment is to get a view of your coding workflow & style. Your main focus should be writing performant & robust code for data manipulation.

For a homework assignment, we cannot grant you access to our infrastructure (Cloudera Data Platform on-prem: a Spark cluster deployed on YARN). Since the focus is on development, we provide a template notebook to get you up and running quickly on Google Colab.

You are free to perform this assignment on any Spark 3+ infrastructure. If you want to use a local or cloud setup, go for it!

Some of the tasks are open to interpretation. This allows us to assess business understanding and relevant field experience. These tasks are not pass-or-fail checks. During the interview we'll ask about the choices you made.

For the assignment, you'll be working with store location data. You might be familiar with the phrase "Location, location, location" from the real-estate context. The same house can have a different selling price based on its location. In fast-moving consumer goods (FMCG), location is one of the most crucial aspects:

* Proximity & accessibility to customers increases convenience
* Proximity to competitors increases market pressure
* Location affects the supply chain

## Evaluation criteria

1. Software engineering
   1. Clean code (e.g. using meaningful names)
   1. Robust & efficient code
   1. Styling (e.g. PEP8, or Google style guide)
   1. Documentation (e.g. docstrings)
   1. Design (e.g. SOLID principles)
1. Workflow
   1. How you use Git
   1. How you structure your assignment
   1. Owning mistakes
   1. Rationale for design decisions
   1. Making your solution accessible to others
1. Business context
   1. GDPR
   1. Fast moving consumer goods
1. (optional: own infra) System engineering
   1. What setup did you use?
   1. How did you set it up?

## Deliverables we expect

1. Private GitHub repo
   1. Colab allows you to save to GitHub
   1. Invite my username to your private repo as contributor
1. README.md with relevant content
1. Code relevant to the assignment


## Google Colab Spark setup

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
from os import environ
import findspark

In [3]:
# Setting environment variables
environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [4]:
# Init spark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

# spark.sql.repl.eagerEval.enabled: property used to format output tables better
spark = (
    SparkSession
    .builder
    .appName("cg-pyspark-assignment")
    .master("local")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

spark

## Getting the assignment data

The cell below calls the API and saves the results as .json files in the current working directory.

In [6]:
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places > clp-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places > okay-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places > spar-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places > dats-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts > cogo-colpnts.json


## Assignment instructions

1. Download the data from the API
1. Create a logger object that logs to a file "assignment.log"
   1. You can add whatever logging config you want or need
   1. At least one FileHandler, based on the instructions
1. Implement the get_data_by_brand function
   1. Follow the instructions in the docstring
   1. The df_clp code line should work
1. No more handholding ... :-)
1. Create a single object (dataframe) that:
   1. Contains data from **all brands**
      1. Not every brand has the same columns!
   1. Drop placeSearchOpeningHours
   1. You can keep sellingPartners as an array
   1. Extract "postal_code" from address
   1. Create new column "province" derived from postal_code
   1. Transform geoCoordinates into lat and lon column
   1. One-hot-encode the handoverServices
   1. Pretend houseNumber and streetName are GDPR sensitive.
      1. How would you anonymize this data for unauthorized users?
      1. (optional) Implement the above
      1. How would you show the real data to authorized users?
      1. (optional) Implement the above
1. Save the end result as a parquet file
   1. (optional) Partitioning?
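
One possible direction for the GDPR question above, offered as a sketch rather than the expected answer: replace houseNumber and streetName with a keyed hash, so unauthorized users see a stable token instead of the real value. The helper name `pseudonymize` and the `secret_key` parameter are assumptions made for illustration. Strictly speaking, a keyed hash is pseudonymization rather than full anonymization, and showing real values to authorized users would need a separate, access-controlled copy of the original columns.

```python
import hashlib
import hmac


def pseudonymize(value, secret_key: bytes):
    """Return a keyed SHA-256 hex digest of value, or None for missing values.

    Hypothetical helper: the name and signature are not part of the assignment.
    """
    if value is None:
        return None
    # HMAC ties the digest to a secret, so the token cannot be rebuilt
    # by simply hashing guessed street names without the key.
    return hmac.new(secret_key, str(value).encode("utf-8"), hashlib.sha256).hexdigest()
```

The same input and key always produce the same token, so pseudonymized columns can still be grouped or joined on.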

**postal_code** logic:
* "Brussel": 1000-1299  
* "Waals-Brabant": 1300-1499  
* "Vlaams-Brabant": 1500-1999, 3000-3499  
* "Antwerpen": 2000-2999  
* "Limburg": 3500-3999  
* "Luik": 4000-4999  
* "Namen": 5000-5999  
* "Henegouwen": 6000-6599, 7000-7999  
* "Luxemburg": 6600-6999  
* "West-Vlaanderen": 8000-8999  
* "Oost-Vlaanderen": 9000-9999
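
The ranges above can be captured in a small lookup table. The sketch below is plain Python so the logic is easy to check in isolation. The names `PROVINCE_RANGES` and `province_for` are assumptions made for illustration, and returning `None` for missing or out-of-range codes is one possible choice, not a requirement of the assignment.

```python
from typing import Optional

# (province, inclusive lower bound, inclusive upper bound), copied from the list above.
PROVINCE_RANGES = [
    ("Brussel", 1000, 1299),
    ("Waals-Brabant", 1300, 1499),
    ("Vlaams-Brabant", 1500, 1999),
    ("Antwerpen", 2000, 2999),
    ("Vlaams-Brabant", 3000, 3499),
    ("Limburg", 3500, 3999),
    ("Luik", 4000, 4999),
    ("Namen", 5000, 5999),
    ("Henegouwen", 6000, 6599),
    ("Luxemburg", 6600, 6999),
    ("Henegouwen", 7000, 7999),
    ("West-Vlaanderen", 8000, 8999),
    ("Oost-Vlaanderen", 9000, 9999),
]


def province_for(postal_code) -> Optional[str]:
    """Return the province for a postal code, or None if it is missing or out of range."""
    try:
        code = int(postal_code)
    except (TypeError, ValueError):
        return None
    for province, low, high in PROVINCE_RANGES:
        if low <= code <= high:
            return province
    return None
```

In Spark, the same table could drive a chain of `when(...)` conditions built with `Column.between`, which keeps the mapping in native column expressions instead of a Python UDF.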

In [None]:
# Import statements should go here
from logging import getLogger, Logger

In [None]:
# Modify this based on assignment instructions
LOGGER = getLogger()
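
A minimal sketch of how the logger could be configured to write to "assignment.log", using only the standard `logging` module. The helper name `build_file_logger`, the log format, and the INFO level are assumptions chosen for illustration; the assignment only requires a FileHandler.

```python
import logging
import os
from logging import FileHandler, Formatter, Logger


def build_file_logger(name: str = "assignment", log_path: str = "assignment.log") -> Logger:
    """Return a logger that writes INFO and above to log_path (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    target = os.path.abspath(log_path)
    # Re-running a notebook cell would otherwise attach a second handler
    # and duplicate every log line.
    already_attached = any(
        isinstance(handler, FileHandler) and handler.baseFilename == target
        for handler in logger.handlers
    )
    if not already_attached:
        handler = FileHandler(log_path, encoding="utf-8")
        handler.setFormatter(
            Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

With this helper, the cell above could become `LOGGER = build_file_logger()`.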

In [None]:
def get_data_by_brand(brand: str, logger: Logger = LOGGER):
  """Fetch input data based on brand.

  Please add a column to the data indicating the input brand
  Please add minimum one sanity check for loading the data
  Please log things you consider relevant

  Args:
      brand: allowed values are (clp, okay, spar, dats, cogo)
      logger: Logger object for logging

  Returns:
      The relevant dataframe
  """
  raise NotImplementedError()
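
The docstring asks for at least one sanity check when loading the data. A sketch of one such check, kept free of Spark so it can be tested on its own: validate the brand and confirm that the downloaded file exists and is not empty before reading it. The names `BRAND_FILES` and `resolve_brand_file` are assumptions made for illustration; the file names come from the curl commands in the download cell.

```python
from pathlib import Path

# File names as written by the curl commands in the data-download cell.
BRAND_FILES = {
    "clp": "clp-places.json",
    "okay": "okay-places.json",
    "spar": "spar-places.json",
    "dats": "dats-places.json",
    "cogo": "cogo-colpnts.json",
}


def resolve_brand_file(brand: str, data_dir: str = ".") -> Path:
    """Validate brand and return the path of its downloaded JSON file (hypothetical helper)."""
    if brand not in BRAND_FILES:
        raise ValueError(
            f"Unknown brand {brand!r}; allowed values: {sorted(BRAND_FILES)}"
        )
    path = Path(data_dir) / BRAND_FILES[brand]
    # A failed download can leave a missing or zero-byte file behind.
    if not path.is_file() or path.stat().st_size == 0:
        raise FileNotFoundError(f"Expected a non-empty file at {path}")
    return path
```

Inside `get_data_by_brand`, the returned path could then be passed to Spark's JSON reader, with the brand added as a literal column via `pyspark.sql.functions.lit`.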

In [None]:
# df_clp code snippet, this should work as expected
df_clp = get_data_by_brand(brand="clp", logger=LOGGER)
