In [23]:
# Let's get our dependencies
import sys
!{sys.executable} -m pip install pandas
import pandas as pd
import shutil
from datetime import datetime
import random



In [99]:
# ----------------------------------------------------------------------------------------------------------------
# IGNORE THE CODE BLOCK BELOW IT MOCKS STUFF AND YOU'RE NOT SUPPOSED TO KNOW THAT :P
# ...
### THIS CODE IS are a replacement for EXTERNAL API'S. Assume this is actually an REST API or something like that...
### that you might call to get data from...

def get_source_A(day: str):
    """
    I am a "mostly" stateless event driven API. I return events with an "event_time"...
    
    I return the "sales" data for a given day
    
    """
    return pd.read_csv(f"source_data/sales_day_{day}.csv")
    
def get_source_B():
    """
    I am a mostly stateful API. I return the "state" as it is.
    
    I return the "user" data.
    """
    return pd.read_csv(f"source_data/users_v{random.randint(1,2)}.csv")


print("===\n Here we can sneek a peak at the sales data for day 01")
print(get_source_A("01"))
print("===\n And here's the user data")
print(get_source_B())

===
 Here we can sneek a peak at the sales data for day 01
   sales_date   quantity   revenue   user_id
0  2022-01-01          1         5         1
1  2022-01-01          2        10         2
2  2022-01-01          4        20         3
===
 And here's the user data
   user_id   name     status
0        1   sven    premium
1        2   john   standard
2        4    eve    premium


**Step 1**: As in every data warehouse, the step 1 is to get the data into the system, into some kind of "staging area". Since we're building a fully functional data warehouse here, our "staging area" is going to be immutable, or persitent. 

To do so we:

1. Call the source_A once a day, and put the results into a file called "sales_A.csv". We add a column for the "event processing time" to make sure we are able to replay the ingestion (which is just a simple select on that column). 
2. Call the source_B once a day, and snapshot the whole raw data into files "users_dayXYZ.csv".

In [146]:
# ----------------------------------------------------------------------------------------------------------------
# Data Ingestion Code
# API Data => Immutable Staging Area
def stage_source_A(today: str) -> (str, str):
    data = get_source_A(today)
    dt = datetime.now()
    event_processing_time = datetime.timestamp(dt)
    data["event_proc_time"] = event_processing_time
    target_file = f'staging_source_A/{today}_{event_processing_time}_data.csv'
    data.to_csv(target_file, index=False)
    
    return event_processing_time, target_file

### this runs daily! 
stage_source_A("01")    
### But if we run it twice, the time at which we run it is an implicit input and recorded as well, so this:...
ep_time_sales, sales_partition = stage_source_A("01")   
print(ep_time_sales, sales_partition)
### Is still going to produce an immutable staging area!

def stage_source_B(today: str)-> (str, str):
    data = get_source_B()
    dt = datetime.now()
    event_processing_time = datetime.timestamp(dt)
    data["event_proc_time"] = event_processing_time
    target_file = f'staging_source_B/{event_processing_time}_data.csv'
    data.to_csv(target_file, index=False)
    
    return event_processing_time, target_file

### this runs daily! But really, we don't care at all how often it runs, because it doesn't carry any timestamp 
### anyways...
stage_source_B("01")    
stage_source_B("01") 
ep_time_users_1, users_partition_1 = stage_source_B("01") 
ep_time_users_2, users_partition_2 = stage_source_B("02")    

print(ep_time_users, users_partition)
### Is still going to produce an immutable staging area!

1652269546.612016 staging_source_A/01_1652269546.612016_data.csv
1652269481.905827 staging_source_B/1652269481.905827_data.csv



**Step 2**: As in every data warehouse, the next step is to build up a modelling area where we process our 
    data a bit. This is going to be minor here, we're ...
    
1. Going to remove the "names" from the raw user input, we don't need them.
2. We're going to keep the partitioning based on event_time
3. For the "sales data" we add a column article filled with "article_1". Right now we just have one article but 
we plan on adding more so that seems nice to have
   


In [149]:
def model_sales(today: str, proc_time: str, partition: str) -> str:
    # Load the right partition
    data = pd.read_csv(partition)
    
    # Do some modelling
    data["article"] = "article_1"
    target_file = f'model_sales/{today}_{proc_time}.csv'
    
    # Store new data
    data.to_csv(target_file)
    return proc_time, target_file

# Let's do our modelling run for today on the partition we've loaded before
model_sales("01", ep_time_sales, sales_partition)

# And yes, doing this a dozen times is fine because it's idempotent!
model_sales("01", ep_time_sales, sales_partition)
model_ep_time_sales, model_sales_partition  = model_sales("01", ep_time_sales, sales_partition)
print(model_sales("01", ep_time_sales, sales_partition))


# And yes again, the "today" part is just a bit of convenience, no need to calculate with time stamps... 

def model_users(proc_time: str, partition: str) -> str:
    # Load the right partition
    data = pd.read_csv(partition, header=0)

    # Do some modelling
    data.drop(columns=["name"], axis=0, inplace=True)
    target_file = f'model_users/{proc_time}.csv'

    # Store new data
    data.to_csv(target_file)
    return proc_time, target_file

model_users(ep_time_users_1, users_partition_1)
model_users(ep_time_users_2, users_partition_2)

# Again, this is idempotent so we can do this again and again!!! No worries about dropping something here,
# We can always restore it from the raw data and the partition....
print(model_users(ep_time_users_2, users_partition_2))


(1652269546.612016, 'model_sales/01_1652269546.612016.csv')
(1652269546.649704, 'model_users/1652269546.649704.csv')


**Step 3**: This is going to be a fun step, now we want to provide the data to the external world, so we decide to aggregate the data into a new table agg_sales containing a join of the two models on the user_id. Here's what 
we're going to do:

1. Join the two models on the user_id
2. BUT we will write a small "macro" to get the latest version of our user data
3. (We're not going to write one for the sales data, we're assuming we're just doing one daily run, be we could do multiple ones!)


In [181]:
# Sales data
today = "01"

def latest_user_partition():
    """
    Returns the filepath of the latest user partition...
    
    """
    from os import listdir
    user_partitions = listdir("./model_users")
    user_partitions.sort()
    user_partitions.pop(0)
    return f"model_users/{user_partitions[-1]}"

latest_user_partition = latest_user_partition()

sales_data = pd.read_csv(model_sales_partition, header=0)
users_data = pd.read_csv(latest_user_partition, header=0)

agg_sales = pd.merge(sales_data, users_data[["user_id","status"]], left_on=' user_id', right_on='user_id')

target_file = f'agg_sales/{today}_{proc_time}.csv'

agg_sales.to_csv(target_file)

print(agg_sales)

### Again, this is idempotent! Run it again, and only one file is there...

   Unnamed: 0  sales_date   quantity   revenue   user_id  event_proc_time  \
0           0  2022-01-01          1         5         1     1.652270e+09   
1           1  2022-01-01          2        10         2     1.652270e+09   
2           2  2022-01-01          4        20         3     1.652270e+09   

     article  user_id     status  
0  article_1        1   standard  
1  article_1        2   standard  
2  article_1        3    premium  
