In [1]:
from datetime import date, timedelta
from typing import List

from abc import ABC
from abc import abstractmethod

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

from create_data import recreate_databases, create_tables

In [2]:
# Single session shared by every cell below; "DM" is the Spark application name.
spark = (
    SparkSession.builder
    .appName("DM")
    .getOrCreate()
)

# Create sample data

In [3]:
# Keep this order: the databases are (re)created first, then the tables inside them.
# Per the log output below, recreate_databases starts by removing the local
# 'spark-warehouse' directory, so this cell rebuilds the sample data from scratch.
recreate_databases(spark)
create_tables(spark)

2021-11-26 16:18:21.209 | INFO     | create_data:recreate_databases:28 - Removing spark warehouse (path = 'spark-warehouse')
2021-11-26 16:18:21.249 | INFO     | create_data:recreate_databases:32 - Creating database 'standardized_glovo_live'
2021-11-26 16:18:23.557 | INFO     | create_data:recreate_databases:32 - Creating database 'mpcustomer_custom_events'
2021-11-26 16:18:23.602 | INFO     | create_data:recreate_databases:32 - Creating database 'mpcustomer_screen_views'
2021-11-26 16:18:23.637 | INFO     | create_data:recreate_databases:32 - Creating database 'enriched_custom_events'
2021-11-26 16:18:23.669 | INFO     | create_data:recreate_databases:32 - Creating database 'enriched_screen_views'
2021-11-26 16:18:23.710 | INFO     | create_data:create_table:63 - Creating table 'standardized_glovo_live.cities'
2021-11-26 16:18:32.181 | INFO     | create_data:create_table:63 - Creating table 'standardized_glovo_live.devices'
2021-11-26 16:18:39.221 | INFO     | create_data:create_table

# Loaders

In [4]:
class Loader(ABC):
    """Base class for ports that read one table through a Spark session.

    Concrete subclasses provide ``database_in`` and ``name_in`` (a plain class
    attribute is enough to satisfy the abstract properties) and implement
    :meth:`select`. The table is looked up and passed through :meth:`select`
    while the instance is being constructed; the result is kept in
    ``self.sdf``.
    """

    def __init__(self, spark: SparkSession):
        """Store ``spark`` and build ``self.sdf`` by calling :meth:`load`.

        Because :meth:`load` runs here, a subclass whose ``select`` reads
        instance attributes must assign them before calling
        ``super().__init__``.
        """
        self.spark = spark
        self.sdf = self.load()

    def load(self) -> DataFrame:
        """Return the table ``<database_in>.<name_in>`` after :meth:`select`."""
        sdf = self.spark.table(f"{self.database_in}.{self.name_in}")
        return self.select(sdf)

    @abstractmethod
    def select(self, sdf: DataFrame) -> DataFrame:
        """Reduce the raw table to what this port exposes.

        Declared abstract, like ``database_in`` and ``name_in``, so that a
        subclass that forgets to implement it is rejected with ``TypeError``
        at instantiation instead of failing with ``NotImplementedError``
        after the table lookup in :meth:`load` has already happened.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def database_in(self):
        """Name of the database the table is read from."""
        raise NotImplementedError

    @property
    @abstractmethod
    def name_in(self):
        """Name of the table to read inside ``database_in``."""
        raise NotImplementedError


class LoaderLiveDB(Loader):
    """Loader bound to the ``standardized_glovo_live`` database.

    Still abstract: subclasses must supply ``name_in`` and ``select``.
    """

    database_in = "standardized_glovo_live"
    
class LoaderCustomEvent(Loader):
    """Loader bound to the ``mpcustomer_custom_events`` database.

    Still abstract: subclasses must supply ``name_in`` and ``select``.
    """

    database_in = "mpcustomer_custom_events"
    
class LoaderScreenView(Loader):
    """Loader bound to the ``mpcustomer_screen_views`` database.

    Still abstract: subclasses must supply ``name_in`` and ``select``.
    """

    database_in = "mpcustomer_screen_views"

# Writers

In [5]:
class Writer(ABC):
    """Mixin that saves ``self.sdf`` as a parquet table.

    The class does not create ``self.sdf`` itself; it has to be set by
    whatever this is combined with before :meth:`write` is called. Concrete
    subclasses provide ``database_out`` and ``name_out`` (a plain class
    attribute is enough).
    """

    @property
    @abstractmethod
    def database_out(self):
        """Name of the database the table is saved into."""
        raise NotImplementedError

    @property
    @abstractmethod
    def name_out(self):
        """Name of the table to create inside ``database_out``."""
        raise NotImplementedError

    def write(self):
        """Save ``self.sdf`` to ``<database_out>.<name_out>`` in parquet format.

        No save mode is set, so Spark's default applies.
        NOTE(review): that default is expected to fail when the table already
        exists - confirm this is what re-runs should do.
        """
        parquet_writer = self.sdf.write.format("parquet")
        destination = f"{self.database_out}.{self.name_out}"
        parquet_writer.saveAsTable(destination)

class WriterCustomEvent(Writer):
    """Writer bound to the ``enriched_custom_events`` database.

    Still abstract: subclasses must supply ``name_out``.
    """

    database_out = "enriched_custom_events"

class WriterScreenView(Writer):
    """Writer bound to the ``enriched_screen_views`` database.

    Still abstract: subclasses must supply ``name_out``.
    """

    database_out = "enriched_screen_views"

# Ports

In [6]:
class CitiesPort(LoaderLiveDB):
    """Port over the ``cities`` table, exposing only the columns named below."""

    name_in = "cities"

    # Column names, held as attributes so other code can refer to them
    # symbolically (e.g. ``cities.code``) instead of repeating the strings.
    code = "code"
    time_zone = "time_zone"
    country_code = "country_code"

    def select(self, sdf) -> DataFrame:
        """Project ``sdf`` onto the three columns declared on this class."""
        kept_columns = (self.code, self.time_zone, self.country_code)
        return sdf.select(*kept_columns)


class DevicesPort(LoaderLiveDB):
    """Port over the ``devices`` table with its two exposed columns renamed."""

    name_in = "devices"

    # Output column names; ``select`` applies them as aliases.
    device_id = "custom_attributes__device_id"
    experiment_score = "device_experiment_score"

    def select(self, sdf) -> DataFrame:
        """Return ``id`` and ``experiment_score`` under the aliases above."""
        id_expr = f"id AS {self.device_id}"
        score_expr = f"experiment_score AS {self.experiment_score}"
        return sdf.selectExpr(id_expr, score_expr)

In [7]:
class CustomEventPort(LoaderCustomEvent, WriterCustomEvent):
    """Custom-event port restricted to a window of creation dates.

    Reads from the custom-events input database and writes to the enriched
    custom-events database; subclasses name the concrete tables.
    """

    # Column names used by this port and by transformations applied to it.
    creation_date = "p_creation_date"
    city = "custom_attributes__city"

    def __init__(self, spark, exec_date: date, n_days: int):
        """Remember the date window, then load the table.

        :param spark: session used to read the table.
        :param exec_date: last date of the window.
        :param n_days: how many days before ``exec_date`` the window starts.
        """
        # Both values must be in place first: the parent constructor loads the
        # table, and loading calls ``select``, which reads them.
        self.n_days = n_days
        self.exec_date = exec_date

        super().__init__(spark)

    def select(self, sdf) -> DataFrame:
        """Keep rows whose creation date lies in the configured window.

        BETWEEN includes both bounds, so the window covers
        ``exec_date - n_days`` through ``exec_date``.
        """
        window_end = self.exec_date
        window_start = window_end - timedelta(days=self.n_days)

        date_predicate = (
            f"{self.creation_date} BETWEEN '{window_start:%Y-%m-%d}' AND '{window_end:%Y-%m-%d}'"
        )
        return sdf.filter(date_predicate)

In [8]:
class OrderCreatedPort(CustomEventPort):
    """Custom-event port for ``order_created``.

    The input and output tables share the same name; they differ only in the
    database they live in, which the parent classes define.
    """

    name_in = "order_created"
    name_out = "order_created"

# Transformations

In [9]:
class Transformation(ABC):
    """Interface for a single step that a job applies to its table."""

    @abstractmethod
    def apply(self, sdf: "Table") -> "Table":
        """Apply this step and return the resulting table object.

        Despite the parameter name, the argument is a table-like object that
        carries the DataFrame in its ``sdf`` attribute, not a bare DataFrame:
        ``TransformLinearlyJob.run`` passes ``self.table`` and stores the
        return value back into it, and ``AddTimezone.apply`` is written
        against ``Table``. The annotations reflect that usage.
        """
        raise NotImplementedError
        
class Table(ABC):
    """Interface for the objects handed to ``Transformation.apply``."""

    @abstractmethod
    def sdf(self):
        # NOTE(review): declared as an abstract *method*, but AddTimezone reads
        # and assigns ``table.sdf`` as a plain attribute holding a DataFrame.
        # No class in the visible part of this notebook subclasses Table, so
        # the mismatch is not exercised here - confirm the intended contract.
        raise NotImplementedError

In [10]:
class AddTimezone(Transformation):
    """Enrich a table with the columns exposed by a ``CitiesPort``.

    The table's city column is matched against the cities' code column with a
    left join, so rows without a matching city are kept. The cities' code
    column is removed afterwards because it duplicates the table's own city
    column.
    """

    def __init__(self, cities: CitiesPort):
        """
        :param cities: already-loaded cities port; its ``sdf`` is joined in.
        """
        self.cities = cities

    def apply(self, table: Table) -> Table:
        """Join the cities onto ``table.sdf`` and return the same ``table``.

        ``table`` is updated in place: its ``sdf`` attribute is replaced by
        the joined DataFrame.

        :param table: object with an ``sdf`` DataFrame and a ``city``
            attribute naming the column to join on.
        :return: the ``table`` object that was passed in.
        """
        cities_sdf = self.cities.sdf
        city_code = cities_sdf[self.cities.code]

        table.sdf = table.sdf.join(
            # The cities side is hinted for broadcast, as in the original join.
            F.broadcast(cities_sdf),
            on=table.sdf[table.city] == city_code,
            how="left",
        # Drop by column reference, not by name: a bare name would also remove
        # a column of the same name coming from the table's own side.
        ).drop(city_code)

        return table

# Jobs

In [11]:
class TransformLinearlyJob:
    """Job that applies its transformations one after another, then writes.

    Expects ``self.table`` and an iterable ``self.transformations`` to be set
    before :meth:`run` is called.
    """

    # NOTE(review): this class does not use ABCMeta, so @abstractmethod is not
    # enforced here; the declaration only documents what subclasses provide.
    # Subclasses in this notebook replace it with an instance attribute.
    @abstractmethod
    def transformations(self):
        raise NotImplementedError

    def run(self):
        """Apply every transformation in order, then write the final table.

        Each step receives the current ``self.table`` and its return value
        becomes the new ``self.table``.
        """
        for step in self.transformations:
            enriched = step.apply(self.table)
            self.table = enriched

        self.table.write()

In [12]:
class EnrichActionJob(TransformLinearlyJob):
    """Job that loads one action's events and enriches them step by step.

    Subclasses set ``action_port`` to the port class to instantiate and may
    extend :meth:`get_transformations`.
    """

    def __init__(self, spark, exec_date, n_days):
        """Build the port for the requested window and collect the steps.

        :param spark: session handed to the port (and available to steps).
        :param exec_date: forwarded to the port.
        :param n_days: forwarded to the port.
        """
        self.spark, self.exec_date, self.n_days = spark, exec_date, n_days

        # The port is the table the job transforms and finally writes.
        self.table = self.action_port(spark, exec_date, n_days)

        # Collected after ``self.spark`` is set, since steps may need it.
        self.transformations = self.get_transformations()

    def get_transformations(self):
        """Return the steps to apply, in order.

        The base job applies none; subclasses extend the returned list.
        """
        return []

    @property
    @abstractmethod
    def action_port(self):
        """Port class to instantiate; called as ``action_port(spark, exec_date, n_days)``."""
        raise NotImplementedError

    
class EnrichCEOrderCreatedJob(EnrichActionJob):
    """Enrichment job for the ``order_created`` custom event."""

    action_port = OrderCreatedPort

    def get_transformations(self):
        """Return the inherited steps followed by the time-zone enrichment.

        Shows how a single event can add its own steps on top of the shared
        ones.
        """
        inherited_steps = super().get_transformations()
        timezone_step = AddTimezone(CitiesPort(self.spark))
        return inherited_steps + [timezone_step]

## Order Created

In [13]:
# exec_date and n_days set the creation-date window that CustomEventPort.select
# filters on: exec_date - n_days through exec_date, i.e. 2021-11-16 to
# 2021-11-19 here (both bounds included).
order_created_job = EnrichCEOrderCreatedJob(
    spark=spark,
    exec_date=date(2021, 11, 19),
    n_days=3
)
# Applies the job's transformations and writes the enriched table.
order_created_job.run()

In [14]:
# Read the written table back by name to inspect the enrichment result.
spark.table(f"{OrderCreatedPort.database_out}.{OrderCreatedPort.name_out}").show()

+-----------------------+---------------+-------------+------------+
|custom_attributes__city|p_creation_date|    time_zone|country_code|
+-----------------------+---------------+-------------+------------+
|                    BCN|     2021-11-19|Europe/Madrid|          ES|
|                    BCN|     2021-11-18|Europe/Madrid|          ES|
|                    CAG|     2021-11-17|  Europe/Rome|          IT|
+-----------------------+---------------+-------------+------------+

