<a href="https://colab.research.google.com/github/sfranchois/pyspark_exercise/blob/main/pyspark_exercise_rdd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark homework assignment

## Context

The goal of this assignment is to get view on your coding workflow & style.  Your main focus should be creating performant & robust code for data manipulations.  

For a homework assignment, we cannot grant you access to our infrastructure (Cloudera data platform on prem: a spark cluster deployment on Yarn).  Since the focus is on development, we provided a template notebook to get up and running very quickly on Google Colab.  

You have the freedom to perform this assignment on any spark3+ infrastructure.  If want to use a local or cloud setup, go for it!

Some of the tasks are open for interpretation.  This allows us to assess business understanding and relevant field experience.  These tasks are not pass or fail checks.  During the interview we'll ask details about the choice(s) you made.

For the assignment, you'll be working with store location data.  You might be familiar with the phrase "Location, location, location" from the real-estate context.  The same house can have a different selling price based on the location.  In fast moving consumer goods (FMCG), location is one of the most crucial aspects:

* Proximity & accessibility to customers increases convenience
* Proximity to competitors increases market pressure
* It has impact on the supply chain

## Evaluation criteria

1. Software engineering
   1. Clean code (e.g. using meaningful names)
   1. Robust & efficient code
   1. Styling (e.g. PEP8, or Google style guide)
   1. Documentation(e.g. docstrings)
   1. Design (e.g. SOLID principles)
1. Workflow
   1. How you use Git
   1. How you structure your assignment
   1. Owning mistakes
   1. Rationale for design decisions
   1. Making your solution accessible to others
1. Business context
   1. GDPR
   1. Fast moving consumer goods
1.(optional: own infra) System engineering
   1. What setup did you use?
   1. How did you set it up?

## Deliverables we expect

1. Private GitHub repo
   1. Colab allows you to save to GitHub
   1. Invite my username to your private repo as contributor
1. README.md with relevant content
1. Code relevant to the assignment


# **README**

**Platform**

MacOS Sonoma 14.3.1


**Steps**

* clone repo https://github.com/MarkiesFredje/pyspark-exercise
* read instructions in notebook
* brew install openjdk@11
* created virtualenv with pyenv
* pip installed jupyter findspark pyarrow pandas keyring
* start jupyter-notebook
* point env vars to my dirs
* set spark local ip env var
* implement notebook
* uploaded notebook to Google Colab space


Initially, the code was written with Pandas DataFrames.
I'm on a tight schedule.

Setting up Spark inside Colab was done in another way.
Java is already installed there, so skipped.

Migration to use RDD DataFrames ongoing.

## Google Colab spark setup

In [2]:
# commented out: see below
#!apt-get install openjdk-11-jdk-headless -qq > /dev/null
#!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
#!tar xzvf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark pandas pyarrow

In [3]:
from os import environ
import findspark
import requests
import subprocess
import os
import re
import socket
import shutil
import time
import sys

def run(cmd):
    # run a shell command
    try:
        # Run the command and capture stdout and stderr
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        # Access stdout (stderr redirected to stdout)
        stdout_result = subprocess_output.stdout.strip().splitlines()[-1]
        # Process the results as needed
        print(f'✅ {stdout_result}')
        return stdout_result
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)

def is_java_installed():
    return shutil.which("java")

def install_java():
    # Uncomment and modify the desired version
    # java_version= 'openjdk-11-jre-headless'
    # java_version= 'default-jre'
    # java_version= 'openjdk-17-jre-headless'
    # java_version= 'openjdk-18-jre-headless'
    java_version= 'openjdk-19-jre-headless'
    os.environ['JAVA_HOME'] = ' /usr/lib/jvm/java-19-openjdk-amd64'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print(f'✅ Done installing Java {java_version}')
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)

print("\n0️⃣   Install Java if not available")
if is_java_installed():
    print("✅ Java is already installed.")
else:
    install_java()

print("\n1️⃣   Download and install Hadoop and Spark")
# URL for downloading Hadoop and Spark
SPARK_VERSION = "3.5.1"
HADOOP_SPARK_URL = "https://dlcdn.apache.org/spark/spark-" + SPARK_VERSION + \
                   "/spark-" + SPARK_VERSION + "-bin-hadoop3.tgz"
r = requests.head(HADOOP_SPARK_URL)
if r.status_code >= 200 and r.status_code < 400:
    print(f'✅ {HADOOP_SPARK_URL} was found')
else:
    SPARK_CDN = "https://dlcdn.apache.org/spark/"
    print(f'⚠️ {HADOOP_SPARK_URL} was NOT found. \nCheck for available Spark versions in {SPARK_CDN}')

# set some environment variables
os.environ['SPARK_HOME'] = os.path.join(os.getcwd(), os.path.splitext(os.path.basename(HADOOP_SPARK_URL))[0])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'bin'), os.environ['PATH']])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'sbin'), os.environ['PATH']])

# download Spark
# using --no-clobber option will prevent wget from downloading file if already present
# shell command: wget --no-clobber $HADOOP_SPARK_URL
cmd = f"wget --no-clobber {HADOOP_SPARK_URL}"
run(cmd)

# uncompress
try:
    # Run the command and capture stdout and stderr
    cmd = "([ -d $(basename {0}|sed 's/\.[^.]*$//') ] && echo -n 'Folder already exists') || (tar xzf $(basename {0}) && echo 'Uncompressed Spark distribution')"
    subprocess_output = subprocess.run(cmd.format(HADOOP_SPARK_URL), shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    # Access stdout (stderr redirected to stdout)
    stdout_result = subprocess_output.stdout
    # Process the results as needed
    print(f'✅ {stdout_result}')

except subprocess.CalledProcessError as e:
    # Handle the error if the command returns a non-zero exit code
    print(f"Command failed with return code {e.returncode}")
    print("stdout:", e.stdout)


print("\n2️⃣   Start Spark engine")
# start master
# shell command: $SPARK_HOME/sbin/start-master.sh
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-master.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-master.sh')
out = run(cmd)

# start one worker (first stop it in case it's already running)
# shell command: $SPARK_HOME/sbin/start-worker.sh spark://${HOSTNAME}:7077
cmd = [os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-worker.sh')]
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-worker.sh') + ' ' + 'spark://'+socket.gethostname()+':7077'
run(cmd)

print("\n3️⃣   Start Master Web UI")
# get master UI's port number
# the subprocess that's starting the master with start-master.sh
# might still not be ready with assigning the port number at this point
# therefore we check the logfile a few times (attempts=5) to see if the port
# has been assigned. This might take 1-2 seconds.

master_log = out.partition("logging to")[2].strip()
print("Search for port number in log file {}".format(master_log))
attempts = 10
search_pattern = "Successfully started service 'MasterUI' on port (\d+)"
found = False
for i in range(attempts):
  if not found:
   with open(master_log) as log:
      found = re.search(search_pattern, log.read())
      if found:
          webUIport = found.group(1)
          print(f"✅ Master UI is available at localhost:{webUIport} (attempt nr. {i})")
          break
      else:
          time.sleep(2) # need to try until port information is found in the logfile
          i+=1
if not found:
  print("Could not find port for Master Web UI\n")

IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    # serve the Web UI on Colab
    print("Click on the link below to open the Spark Web UI 🚀")
    from google.colab import output
    output.serve_kernel_port_as_window(webUIport)

print("\n4️⃣   Start history server")
# start history server
# shell command: mkdir -p /tmp/spark-events
# shell command: $SPARK_HOME/sbin/start-history-server.sh
spark_events_dir = os.path.join('/tmp', 'spark-events')
if not os.path.exists(spark_events_dir):
    os.mkdir(spark_events_dir)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-history-server.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-history-server.sh')
run(cmd)

if IN_COLAB:
    # serve the History Server
    print("Click on the link below to open the Spark History Server Web UI 🚀")
    output.serve_kernel_port_as_window(18080)



0️⃣   Install Java if not available
✅ Java is already installed.

1️⃣   Download and install Hadoop and Spark
✅ https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz was found
✅ 2024-03-14 08:47:20 (150 MB/s) - ‘spark-3.5.1-bin-hadoop3.tgz’ saved [400446614/400446614]
✅ Uncompressed Spark distribution


2️⃣   Start Spark engine
✅ no org.apache.spark.deploy.master.Master to stop
✅ starting org.apache.spark.deploy.master.Master, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-c85129298a8d.out
✅ no org.apache.spark.deploy.worker.Worker to stop
✅ starting org.apache.spark.deploy.worker.Worker, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.worker.Worker-1-c85129298a8d.out

3️⃣   Start Master Web UI
Search for port number in log file /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-c85129298a8d.out
✅ Master UI is available at localhost:8081 (attempt nr. 1)
C

<IPython.core.display.Javascript object>


4️⃣   Start history server
✅ no org.apache.spark.deploy.history.HistoryServer to stop
✅ starting org.apache.spark.deploy.history.HistoryServer, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-c85129298a8d.out
Click on the link below to open the Spark History Server Web UI 🚀


<IPython.core.display.Javascript object>

In [401]:
# Setting environment variables
# These settings are for MacOS and not needed in Colab
#environ["JAVA_HOME"] = "/usr/local/opt/openjdk@11"
#environ["SPARK_HOME"] = "./spark-3.4.1-bin-hadoop3"
#environ["SPARK_LOCAL_IP"] = "10.0.1.10"

environ["SPARK_HOME"]
environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [4]:
# Init spark
findspark.init()

In [217]:
from pyspark.sql import SparkSession, DataFrame

spark = (
    SparkSession
    .builder
    .appName("cg-pyspark-assignment")
    .master("local")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
  )
#spark.sparkContext.setLogLevel("INFO")
spark

In [6]:
os.getcwd()

'/content'

## Getting the assignment data

This will call the api and save the results in current working directory as .json files

In [7]:
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places > clp-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places > okay-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places > spar-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places > dats-places.json
!curl https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts > cogo-colpnts.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  223k    0  223k    0     0   163k      0 --:--:--  0:00:01 --:--:--  163k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  146k    0  146k    0     0   183k      0 --:--:-- --:--:-- --:--:--  183k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  165k    0  165k    0     0   193k      0 --:--:-- --:--:-- --:--:--  193k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 89162    0 89162    0     0   136k      0 --:--:-- --:--:-- --:--:--  136k
  % Total    % Received % Xferd  Average Speed   Tim

## Assignment instructions

1. Download the data from api
1. Create a logger object that logs to a file "assignment.log"
   1. You can add whatever logging config you want or need
   1. At least on Filehandler based on instructions
1. implement get_data_by_brand function
   1. Follow instructions in docstring
   1. df_clp code line should work
1. No more handholding ... :-)
1. Create a single object (dataframe) that:
   1. Contains data from **all brands**
      1. Not every brand has the same columns!
   1. Drop placeSearchOpeningHours
   1. You can keep sellingPartners as an array
   1. Extract "postal_code" from address
   1. Create new column "province" derived from postal_code
   1. Transform geoCoordinates into lat and lon column
   1. One-hot-encode the handoverServices
   1. Pretend houseNumber and streetName are GDPR sensitive.
      1. How would you anonymize this data for unauthorized users?
      1. (optional) Implement the above
      1. How would you show the real data to authorized users?
      1. (optional) Implement the above
1. Save the end result as a parquet file
   1. (optional)partitioning?

**postal_code** logic:
* "Brussel": 1000-1299  
* "Waals-Brabant": 1300-1499  
* "Vlaams-Brabant": 1500-1999, 3000-3499  
* "Antwerpen": 2000-2999  
* "Limburg": 3500-3999  
* "Luik": 4000-4999  
* "Namen": 5000-5999  
* "Henegouwen": 6000-6599,7000-7999  
* "Luxemburg": 6600-6999  
* "West-Vlaanderen": 8000-8999  
* "Oost-Vlaanderen": 9000-9999

# **Notes**

Pandas DataFrames are a different breed than Spark RDD DataFrames.
Could not fit Spark in the Google Colab workspace (Free plan) initially using the provided boilerplate.
Bootstrap used from https://colab.research.google.com/github/groda/big_data/blob/master/Run_Spark_on_Google_Colab.ipynb

Pandas used here FTTB for demonstration purposes. Porting ongoing...

references:
https://sparkbyexamples.com/pyspark/pyspark-read-json-file-into-dataframe/#read-json-file

In [227]:
from logging import basicConfig, getLogger, INFO, Logger, FileHandler, StreamHandler
import dataclasses
import pandas as pd
from typing import List, Dict, Optional
from cryptography.fernet import Fernet
import pyarrow as pa
from abc import ABCMeta
from functools import reduce
from pyspark.sql.types import (StructType, StructField, StringType,
   IntegerType, LongType, DoubleType, ArrayType, BooleanType)
from pyspark.sql.functions import lit, expr, col, concat_ws, when, size, udf
from pyspark.ml.feature import StringIndexer, OneHotEncoder

from google.colab import userdata

In [201]:
class NotebookLogHandler(StreamHandler):
    def emit(self, record):
        print(self.format(record))

In [228]:

# open issue: 3x rogue logs to sys.stdout

log_filename = 'assignment.log'
basicConfig(
     level=INFO,
     format= '[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
     datefmt='%H:%M:%S',
     handlers=[FileHandler(log_filename), ],
 )
LOGGER = getLogger("pyspark_exercise")
LOGGER.setLevel(INFO)

LOGGER.info("LOGGER initialised. Logging to %s", log_filename)
LOGGER.info("working dir: %s", os.getcwd())

INFO:pyspark_exercise:LOGGER initialised. Logging to assignment.log
LOGGER initialised. Logging to assignment.log


LOGGER initialised. Logging to assignment.log
LOGGER initialised. Logging to assignment.log


INFO:pyspark_exercise:working dir: /content
working dir: /content


working dir: /content
working dir: /content


In [11]:
@dataclasses.dataclass
class BrandInfo:
    clp: DataFrame
    okay: DataFrame
    spar: DataFrame
    dats: DataFrame
    cogo: DataFrame

    df: DataFrame = dataclasses.field(init=False)

    AddressType = StructType(
    [ StructField('cityName', StringType(), True),
      StructField('countryCode', StringType(), True),
      StructField('countryName', StringType(), True),
      StructField('houseNumber', StringType(), True),
      StructField('postalcode', StringType(), True),
      StructField('streetName', StringType(), True)])

    schema = StructType([StructField('address', AddressType, True),
                         StructField('branchId', StringType(), True), StructField('commercialName', StringType(), True),
                         StructField('ensign', StructType([StructField('id', LongType(), True),
                                                           StructField('name', StringType(), True)]), True),
                         StructField('geoCoordinates', StructType([StructField('latitude', DoubleType(), True),
                                                                   StructField('longitude', DoubleType(), True)]), True),
                         StructField('handoverServices', ArrayType(StringType(), True), True), StructField('isActive', BooleanType(), True),
                         StructField('moreInfoUrl', StringType(), True), StructField('placeId', LongType(), True),
                         StructField('placeSearchOpeningHours', ArrayType(StructType([StructField('closes', LongType(), True),
                                                                                      StructField('date', StringType(), True),
                                                                                      StructField('isOpenForTheDay', BooleanType(), True),
                                                                                      StructField('isToday', BooleanType(), True),
                                                                                      StructField('opens', LongType(), True)]), True), True),
                         StructField('placeType', StructType([StructField('id', LongType(), True), StructField('longName', StringType(), True),
                                                              StructField('placeTypeDescription', StringType(), True)]), True),
                         StructField('routeUrl', StringType(), True), StructField('sellingPartners', ArrayType(StringType(), True), True),
                         StructField('sourceStatus', StringType(), True),
                         StructField('temporaryClosures', ArrayType(
                             StructType([StructField('from', StringType(), True),
                                         StructField('till', StringType(), True)]), True), True)])

    @classmethod
    def load_from_json(cls) -> "BrandInfo":
        return cls(
            clp = spark.read.json("clp-places.json", schema=cls.schema),
            okay = spark.read.json("okay-places.json", schema=cls.schema),
            spar = spark.read.json("spar-places.json", schema=cls.schema),
            dats = spark.read.json("dats-places.json", schema=cls.schema),
            cogo = spark.read.json("cogo-colpnts.json", schema=cls.schema)
        )

    def __post_init__(self) -> None:
        self.validate()

        self.clp = self.clp.withColumn("brand", lit("clp"))
        self.okay = self.okay.withColumn("brand", lit("okay"))
        self.spar = self.spar.withColumn("brand", lit("spar"))
        self.dats = self.dats.withColumn("brand", lit("dats"))
        self.cogo = self.cogo.withColumn("brand", lit("cogo"))

        self.df = reduce(lambda a,b: DataFrame.unionByName(a, b, allowMissingColumns=True), self.aslist())


    def aslist(self) -> List[DataFrame]:
        return [self.clp, self.okay, self.spar, self.dats, self.cogo]

    def get_all_columns(self) -> List[str]:
        return set().union(
            set(self.clp.columns),
            set(self.okay.columns),
            set(self.spar.columns),
            set(self.dats.columns),
            set(self.cogo.columns)
        )

    def get_common_columns(self) -> List[str]:
            return self.get_all_columns().intersection(
                set(self.clp.columns),
                set(self.okay.columns),
                set(self.spar.columns),
                set(self.dats.columns),
                set(self.cogo.columns)
            )

    def validate(self) -> None:
        if any(map(lambda df: df.isEmpty(), self.aslist())):
            raise ValueError("one or more JSON files have no data")


def load_brand_data() -> BrandInfo:
    return BrandInfo.load_from_json()

In [12]:
def get_data_by_brand(brand: str, logger: Logger = LOGGER) -> DataFrame:
    """Fetch input data based on brand.

    Please add a column to the data indicating the input brand
    Please add minimum one sanity check for loading the data
    Please log things you consider relevant

    :param brand: allowed values are (clp, okay, spar, dats, cogo)
    :param logger: Logger object for logging

    :return: The relevant dataframe
    """
    brands = load_brand_data()

    logger.info("columns found: %s", brands.get_all_columns())
    logger.info("common columns : %s", brands.get_common_columns())

    return brands.df.where(brands.df["brand"] == brand)


# **Load the brand data in memory**

In [209]:

df_clp = get_data_by_brand(brand="clp", logger=LOGGER)


INFO:pyspark_exercise:columns found: {'brand', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'sourceStatus', 'placeId', 'ensign', 'address', 'isActive', 'temporaryClosures', 'geoCoordinates', 'commercialName', 'moreInfoUrl', 'sellingPartners', 'handoverServices', 'branchId'}
columns found: {'brand', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'sourceStatus', 'placeId', 'ensign', 'address', 'isActive', 'temporaryClosures', 'geoCoordinates', 'commercialName', 'moreInfoUrl', 'sellingPartners', 'handoverServices', 'branchId'}


columns found: {'brand', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'sourceStatus', 'placeId', 'ensign', 'address', 'isActive', 'temporaryClosures', 'geoCoordinates', 'commercialName', 'moreInfoUrl', 'sellingPartners', 'handoverServices', 'branchId'}
columns found: {'brand', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'sourceStatus', 'placeId', 'ensign', 'address', 'isActive', 'temporaryClosures', 'geoCoordinates', 'commercialName', 'moreInfoUrl', 'sellingPartners', 'handoverServices', 'branchId'}


INFO:pyspark_exercise:common columns : {'isActive', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'geoCoordinates', 'commercialName', 'temporaryClosures', 'moreInfoUrl', 'sellingPartners', 'sourceStatus', 'handoverServices', 'placeId', 'ensign', 'address', 'branchId', 'brand'}
common columns : {'isActive', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'geoCoordinates', 'commercialName', 'temporaryClosures', 'moreInfoUrl', 'sellingPartners', 'sourceStatus', 'handoverServices', 'placeId', 'ensign', 'address', 'branchId', 'brand'}


common columns : {'isActive', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'geoCoordinates', 'commercialName', 'temporaryClosures', 'moreInfoUrl', 'sellingPartners', 'sourceStatus', 'handoverServices', 'placeId', 'ensign', 'address', 'branchId', 'brand'}
common columns : {'isActive', 'placeType', 'placeSearchOpeningHours', 'routeUrl', 'geoCoordinates', 'commercialName', 'temporaryClosures', 'moreInfoUrl', 'sellingPartners', 'sourceStatus', 'handoverServices', 'placeId', 'ensign', 'address', 'branchId', 'brand'}


In [14]:
df_clp.drop("placeSearchOpeningHours")

address,branchId,commercialName,ensign,geoCoordinates,handoverServices,isActive,moreInfoUrl,placeId,placeType,routeUrl,sellingPartners,sourceStatus,temporaryClosures,brand
"{AALST, BE, Belgi...",4156,AALST (COLRUYT),"{8, COLR_Colruyt}","{50.933074, 4.053...","[CSOP_ORDERABLE, ...",True,https://www.colru...,902,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{AALTER, BE, Belg...",4218,AALTER (COLRUYT),"{8, COLR_Colruyt}","{51.0784761, 3.45...","[CSOP_ORDERABLE, ...",True,https://www.colru...,946,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{AARSCHOT, BE, Be...",4222,AARSCHOT (COLRUYT),"{8, COLR_Colruyt}","{50.9760369, 4.81...","[CSOP_ORDERABLE, ...",True,https://www.colru...,950,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{ALSEMBERG, BE, B...",4138,ALSEMBERG (COLRUYT),"{8, COLR_Colruyt}","{50.7415212, 4.33...","[CSOP_ORDERABLE, ...",True,https://www.colru...,886,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{AMAY, BE, België...",3853,AMAY (COLRUYT),"{8, COLR_Colruyt}","{50.5599284, 5.30...","[CSOP_ORDERABLE, ...",True,https://www.colru...,783,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{ANDENNE, BE, Bel...",3596,ANDENNE (COLRUYT),"{8, COLR_Colruyt}","{50.4917055, 5.09...","[CSOP_ORDERABLE, ...",True,https://www.colru...,650,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{ANDERLECHT, BE, ...",3620,ANDERLECHT (HERBE...,"{8, COLR_Colruyt}","{50.8439965, 4.30...","[CSOP_ORDERABLE, ...",True,https://www.colru...,669,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{ANDERLECHT, BE, ...",3759,ANDERLECHT (VEEWE...,"{8, COLR_Colruyt}","{50.8275378372, 4...","[CSOP_ORDERABLE, ...",True,https://www.colru...,744,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp
"{ANDERLUES, BE, B...",3074,ANDERLUES (COLRUYT),"{8, COLR_Colruyt}","{50.401257, 4.279...",[CSOP_ORDERABLE],False,https://www.colru...,448,"{1, Winkel, Winkel}",https://maps.appl...,[QUALITY],IN,"[{04-03-2024, 17-...",clp
"{ANS, BE, België,...",3644,ANS (COLRUYT),"{8, COLR_Colruyt}","{50.6588119, 5.53...","[CSOP_ORDERABLE, ...",True,https://www.colru...,681,"{1, Winkel, Winkel}",https://maps.appl...,"[QUALITY, 3RDPARTY]",AC,[],clp


In [411]:
df_clp[["sellingPartners"]]

sellingPartners
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
"[QUALITY, 3RDPARTY]"
[QUALITY]
"[QUALITY, 3RDPARTY]"


In [412]:
# extract the postal code in separate field
df_clp = df_clp.withColumn("postalcode", df_clp["address.postalCode"].cast(IntegerType()))

In [413]:
postal_code_mapping = {
    (1000,1299) : "Brussel",
    (1300,1499) : "Waals-Brabant",
    (1500,1999) : "Vlaams-Brabant",
    (3000,3499) : "Vlaams-Brabant",
    (2000,2999) : "Antwerpen",
    (3500,3999) : "Limburg",
    (4000,4999) : "Luik",
    (5000,5999) : "Namen",
    (6000,6599) : "Henegouwen",
    (7000,7999) : "Henegouwen",
    (6600,6999) : "Luxemburg",
    (8000,8999) : "West-Vlaanderen",
    (9000,9999) : "Oost-Vlaanderen",
}
bins = list(postal_code_mapping.keys())
label = lambda bin: postal_code_mapping[bin]

e = "case "
for bin in bins[:-1]:
    e += f"when postalCode >= {bin[0]} and postalCode < {bin[1]} then '{label(bin)}' "
e += f"else '{label(bins[-1])}' end as bin"

# binning the postal codes
df_clp = df_clp.withColumn("province", expr(e))

In [414]:
df_clp[["province", "postalcode"]]

province,postalcode
Oost-Vlaanderen,9300
Oost-Vlaanderen,9880
Vlaams-Brabant,3200
Vlaams-Brabant,1652
Luik,4540
Namen,5300
Brussel,1070
Brussel,1070
Henegouwen,6150
Luik,4430


In [415]:
df_clp = df_clp.withColumn("lat", df_clp["geoCoordinates.latitude"].cast(DoubleType())).withColumn("lon", df_clp["geoCoordinates.longitude"].cast(DoubleType()))

In [416]:
df_clp[['geoCoordinates','lat', 'lon']]

geoCoordinates,lat,lon
"{50.933074, 4.053...",50.933074,4.0538972
"{51.0784761, 3.45...",51.0784761,3.4500133
"{50.9760369, 4.81...",50.9760369,4.8110969
"{50.7415212, 4.33...",50.7415212,4.336719
"{50.5599284, 5.30...",50.5599284,5.3061951
"{50.4917055, 5.09...",50.4917055,5.0930033
"{50.8275378372, 4...",50.8275378372,4.3025743961
"{50.8439965, 4.30...",50.8439965,4.3099483
"{50.401257, 4.279...",50.401257,4.2797751
"{50.6588119, 5.53...",50.6588119,5.5324966


In [417]:
df_clp[["handoverServices"]]

handoverServices
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
"[CSOP_ORDERABLE, ..."
[CSOP_ORDERABLE]
"[CSOP_ORDERABLE, ..."


In [82]:
# todo: sort handoverServices list first
df_clp = df_clp.withColumn("handoverServices_str", when(size(col("handoverServices")) > 0, concat_ws(",", col("handoverServices"))).otherwise("-"))

indexerModel = StringIndexer(inputCol="handoverServices_str", outputCol="handoverServices_indexed").fit(df_clp)
indexed_df = indexerModel.transform(df_clp)
indexed_df.show()

encoder = OneHotEncoder(inputCol="handoverServices_indexed", outputCol="handoverServices_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df.show(truncate=False)


+--------------------+--------+--------------------+-----------------+--------------------+--------------------+--------+--------------------+-------+-----------------------+-------------------+--------------------+-------------------+------------+--------------------+-----+--------------------+------------------------+
|             address|branchId|      commercialName|           ensign|      geoCoordinates|    handoverServices|isActive|         moreInfoUrl|placeId|placeSearchOpeningHours|          placeType|            routeUrl|    sellingPartners|sourceStatus|   temporaryClosures|brand|handoverServices_str|handoverServices_indexed|
+--------------------+--------+--------------------+-----------------+--------------------+--------------------+--------+--------------------+-------+-----------------------+-------------------+--------------------+-------------------+------------+--------------------+-----+--------------------+------------------------+
|{AALST, BE, Belgi...|    4156|   

In [195]:
import abc

class CryptoCodec(abc.ABC):
    @abc.abstractmethod
    def encode(self, msg) -> Optional[bytes]:
        """ encode data using AES """

    @abc.abstractmethod
    def decode(self, encoded_msg) -> Optional[bytes]:
        """ decode data using AES """

class FernetCodec(CryptoCodec):

    def __init__(self, key: Optional[bytes] = None) -> None:
        self._key = key or Fernet.generate_key()
        self._fernet = Fernet(self._key)

    def encode(self, msg) -> Optional[bytes]:
        return self._fernet.encrypt(msg.encode()) if msg else None

    def decode(self, encoded_msg) -> Optional[bytes]:
        return self._fernet.decrypt(encoded_msg).decode() if encoded_msg else None

    def save_key(self, filename: str) -> None:
        """ save the private key """
        open(filename, "wb").write(self._key)

    @staticmethod
    def load_key(filename: str) -> bytes:
        """ load the  key """
        return open(filename, "rb").read()

    @property
    def key(self) -> str:
        """ get obfuscated key """
        return self._key.decode('ascii')[:-8] + "*" * 8

    @classmethod
    def from_key(cls, filename: str) -> "FernetCodec":
        return cls(key=cls.load_key(filename))

In [180]:
class AddressCodec:

    def __init__(self, codec: CryptoCodec) -> None:
        self.codec = codec

    def encode_address_asdict(self, addr: Dict) -> Dict:
        return {
            "houseNumber": self.codec.encode(addr.get("houseNumber")),
            "streetName" : self.codec.encode(addr.get("streetName")),
            "postalcode": addr.get("postalcode") ,
            "cityName": addr.get("cityName"),
            "countryCode": addr.get("countryCode"),
            "countryName": addr.get("countryName")
        }

    def decode_address_asdict(self, addr: Dict) -> Dict:
        return {
            "houseNumber": self.codec.decode(addr.get("houseNumber")),
            "streetName" : self.codec.decode(addr.get("streetName")),
            "postalcode": addr.get("postalcode") ,
            "cityName": addr.get("cityName"),
            "countryCode": addr.get("countryCode"),
            "countryName": addr.get("countryName")
        }

    def encode_address_aslist(self, addr: List) -> List:
        # the codec works in binary mode, but the fields are strings, hence
        # the extra UTF-8 decode
        return [
           addr[0], addr[1], addr[2], self.codec.encode(addr[3]).decode("utf-8"),
           addr[4], self.codec.encode(addr[5]).decode("utf-8")
        ]

    def decode_address_aslist(self, addr: List) -> List:
        return [
           addr[0], addr[1], addr[2], self.codec.decode(bytes(addr[3], "utf-8")),
           addr[4], self.codec.decode(bytes(addr[5], "utf-8"))
        ]

In [220]:
def get_aes_key_from_vault() -> bytes:
  return userdata.get(aes_filename).encode("utf-8")

In [221]:
# Ideally, fetch from a secure vault (KeyChain, 1Password, ..) using the `keyring` package;
# or Google Colab secrets
# Set to None to have one generated for you by the codec

aes_filename = "aes_key"

# get a new key with to store in Colab : FernetCodec()._key.decode("utf-8")
aes_key = get_aes_key_from_vault()

# **Anonymise data with a symm. crypto key**

In [222]:
codec = FernetCodec(aes_key)

# only this AES (symmetrical) key allows you to decrypt addresses again: do not lose uit
LOGGER.info("AES key: %s", codec.key)

# for this exercise, also persist key to file system
codec.save_key(aes_filename)

INFO:pyspark_exercise:AES key: RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********
AES key: RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********


AES key: RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********
AES key: RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********


In [212]:
# the address codec will obfuscate certain fields only
addr_codec = AddressCodec(codec)
encryptUDF = udf(lambda addr: addr_codec.encode_address_aslist(addr), BrandInfo.AddressType)

df_gdpr = df_clp.withColumn("gdpr_address", encryptUDF(col("address"))).drop("address")
df_gdpr[["gdpr_address"]].show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|gdpr_address                                                                                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{AALST, BE, België, gAAAAABl8tiVEOe_P4rfw3AicTvCE9p1VXUn7FQghZxFChV3RqLADmDS85t5_oxEviqeyNDK55KFDtA49xVvqodI5oFUNeK_zA==, 9300, gAAAAABl8tiVkUuyRGiz-Of4e672y0S2zlsveLmLqaHG9jOfeuyzmGvc0WQUcBE6LJ-b7eayaF6ydq_eLbeFqTwnu8pt2QB4W

# **Write to Parquet**

In [213]:
parquet_filename = "brand_data.parquet"

LOGGER.info("writing clp brands parquet file: %s", parquet_filename)

# partition the data by the branch ID

df_gdpr.write.partitionBy("branchid").parquet(parquet_filename, mode="overwrite")

INFO:pyspark_exercise:writing clp brands parquet file: brand_data.parquet
writing clp brands parquet file: brand_data.parquet


writing clp brands parquet file: brand_data.parquet
writing clp brands parquet file: brand_data.parquet



---


At this point, the GDPR-sensitive FMCG-data is persisted and the key is secured

---

# **Read from Parquet**

In [214]:
# read data back
LOGGER.info("reading brands parquet file %s", parquet_filename)
df_gdpr = spark.read.parquet(parquet_filename)

df_gdpr.show(truncate=False)

INFO:pyspark_exercise:reading brands parquet file brand_data.parquet
reading brands parquet file brand_data.parquet


reading brands parquet file brand_data.parquet
reading brands parquet file brand_data.parquet
+------------------------------+-----------------+-----------------------------+--------------------------------+--------+---------------------------------------------------+-------+---------------------------------------------------------------------------+-------------------+-------------------------------------------------------------+-------------------+------------+--------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|commercialName                |ensign           |geoCoordinates               |handoverServices                |isActive|moreInfoUrl                                        |placeId|placeSearchOpeningHours          

In [226]:
# read key back from fs and vault, and re-init codec with the key

codec_fs = FernetCodec.from_key(aes_filename)
codec_colab = FernetCodec(get_aes_key_from_vault())

assert codec_fs.key == codec_colab.key

addr_codec = AddressCodec(codec)

decryptUDF = udf(lambda addr: addr_codec.decode_address_aslist(addr), BrandInfo.AddressType)

LOGGER.info("decrypting address using AES key %s", codec.key)

# finally, decode the GDPR data to make it human-readable
df_clp = df_gdpr.withColumn("address", decryptUDF(col("gdpr_address"))).drop("gdpr_address")
df_clp[["address"]].show(truncate=False)

INFO:pyspark_exercise:decrypting address using AES key RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********
decrypting address using AES key RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********


decrypting address using AES key RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********
decrypting address using AES key RSxNI838DKITN7PpCdl8xXqyou32nI58BUTJ********
+-------------------------------------------------------------------------+
|address                                                                  |
+-------------------------------------------------------------------------+
|{POMMERLOCH, LU, Luxemburg (Groothertogdom), 26, 9638, BASTNICHERSTROOSS}|
|{MARCHIENNE-AU-PONT, BE, België, 43, 6030, RUE LIEUTEN.GEN.GILLAIN}      |
|{SINT-DENIJS-WESTREM, BE, België, 1238, 9051, KORTRIJKSESTEENWEG}        |
|{NIEUWPOORT, BE, België, 33-A, 8620, JOZEF CARDIJNLAAN}                  |
|{STRASSEN, LU, Luxemburg (Groothertogdom), 2, 8008, ROUTE D'ARLON}       |
|{ANDERLUES, BE, België, 4, 6150, RUE DE LA STATION}                      |
|{ST-PIETERS-WOLUWE, BE, België, 5, 1150, PIERRE DE COCKSTRAAT}           |
|{ANDERLECHT, BE, België, 824, 1070, BERGENSESTEENWEG}                    |
|{DEURNE

End of notebook.