# From data lake to data warehouse

Disclaimer:

- Link to the [AWS Academy login](https://awsacademy.instructure.com/login/canvas)
- At the end of the lab, *remember to Stop/Destroy the created services*. (Why?)
- *Upload the Notebook to SageMaker* (not to Colab!)
- The second part of the lab requires [Tableau Desktop](https://www.tableau.com/products/desktop/download) installed on your machine

Known issues:

- AWS classroom does not work on *Safari* and *Firefox*.
    - Solution: use Google Chrome
- Tableau Desktop had problems on *macOS* when connecting to Postgres.
    - Solution: use the lab's computers

# Data lake vs Data warehouse

**Data warehouse** and **data lake** serve different needs and use cases.

A *data lake* stores both relational data from business applications and non-relational data

- The structure of the data or schema is not defined when data is captured (*schema on read*)
- You can store all of your data without careful design and without knowing in advance which questions you will need to answer

A *data warehouse* is a database optimized to analyze relational data coming from business applications

- The data structure and schema are defined in advance to optimize for fast SQL queries (*schema on write*)
- The results are typically used for operational reporting and analysis
- Data is cleaned, enriched, and transformed so it can act as the single source of truth
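To make *schema on read* versus *schema on write* concrete, here is a minimal sketch on a made-up CSV string (not the lab dataset): the lake side keeps the text as is and lets the reader infer types, while the warehouse side declares a schema and only loads rows that conform to it. The `conform` helper and the declared schema are illustrative assumptions.

```python
import io

import pandas as pd

# Hypothetical raw file as it might land in the lake (toy values, not the lab dataset)
RAW_CSV = "sensor,value\ns1,12.5\ns2,broken\ns3,30\n"

# Schema on read: the bytes are stored unchanged; types are decided only
# when someone reads the file (here pandas infers them at read time)
lake_view = pd.read_csv(io.StringIO(RAW_CSV))

# Schema on write: the warehouse declares its schema up front
WAREHOUSE_SCHEMA = {"sensor": "string", "value": "float64"}


def conform(df: pd.DataFrame) -> pd.DataFrame:
    """Cast to the declared schema, dropping rows whose value is not numeric."""
    out = df.copy()
    out["value"] = pd.to_numeric(out["value"], errors="coerce")  # "broken" becomes NaN
    out = out.dropna(subset=["value"])
    return out.astype(WAREHOUSE_SCHEMA)


warehouse_rows = conform(lake_view)
print(lake_view["value"].dtype)       # a non-numeric (text) dtype, inferred at read time
print(warehouse_rows["value"].dtype)  # float64, enforced before loading
```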

# Roadmap

**Goal**: build a sub-module of a decision support system, namely a data warehouse (DWH) to monitor historical trends of soil moisture

Necessary steps:

1. Create a data lake (AWS S3)
2. Collect and store the sensor data (manually)
   - KISS: we start with a .csv with sensor data
3. Do ETL (AWS SageMaker)
4. Build a (relational) data warehouse (AWS RDS)
5. Query the data warehouse (Tableau)

# 1. Create a data lake

Log in to the [AWS console](https://awsacademy.instructure.com/login/canvas)

:::: {.columns}
::: {.column width=40%}

![Click on `Dashboard`](https://user-images.githubusercontent.com/18005592/200340672-756adacd-bad6-4aca-b7ca-8c03ff20d928.png)

:::
::: {.column width=40%}

![Click on `Courses`](https://user-images.githubusercontent.com/18005592/200342442-edaa7560-95a2-47d1-b6b2-ccc4ae9abe0c.png)

:::
::::

# Start the lab

![Click on `Start Lab`](https://user-images.githubusercontent.com/18005592/200342733-326a05b7-9f6e-438f-9749-1322412d7321.png)

When the AWS indicator turns green, click on it

![Click on `AWS`](https://user-images.githubusercontent.com/18005592/200343321-444d3d2b-3e95-4296-baaa-29cf10616736.png)

# Welcome to the AWS Console!

![AWS Console](https://user-images.githubusercontent.com/18005592/200344531-88e92bbb-c0c0-456e-a532-a116d8d330f4.png)

# Data Lake

AWS Simple Storage Service (S3)

- A serverless object storage service offering industry-leading scalability, data availability, security, and performance. 
- Customers of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes

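Create two buckets (bucket names must be globally unique, so replace the numeric suffixes with your own)

In [None]:
input_bucket = "s3://landing-raw-wateringsensors-1234/"  # landing zone: raw data as collected
output_bucket = "s3://staging-clean-wateringsensors-12345/"  # staging zone: cleaned data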
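Since the bucket names above are placeholders, a quick local check can catch names that S3 would reject before you try to create them. This sketch encodes only a subset of the S3 naming rules for general purpose buckets (3 to 63 characters; lowercase letters, digits, dots and hyphens; start and end with a letter or digit; no adjacent dots; not formatted as an IPv4 address). `bucket_name` and `looks_valid` are hypothetical helpers, and global uniqueness can only be checked by S3 itself.

```python
import re

# Subset of the S3 general purpose bucket naming rules (not exhaustive)
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")  # 3-63 chars, allowed charset, start/end
_IPV4_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def bucket_name(s3_uri: str) -> str:
    """Extract the bucket name from an 's3://bucket/prefix' URI (hypothetical helper)."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {s3_uri}")
    return s3_uri[len("s3://"):].split("/", 1)[0]


def looks_valid(name: str) -> bool:
    """True if `name` passes the subset of naming rules encoded above."""
    return bool(_NAME_RE.match(name)) and ".." not in name and not _IPV4_RE.match(name)


for uri in ["s3://landing-raw-wateringsensors-1234/", "s3://staging-clean-wateringsensors-12345/"]:
    name = bucket_name(uri)
    print(name, looks_valid(name))
```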

:::: {.columns}
::: {.column width=50%}

![](https://user-images.githubusercontent.com/18005592/200345832-d3522fdb-92ee-4457-8a78-5c0449af6bc4.png)

:::
::: {.column width=50%}

![](https://user-images.githubusercontent.com/18005592/200346937-410c49dc-6685-447c-9b18-4970f1954e6c.png)

:::
::::

# 2. Collect and store the sensor data (manually)

KISS: we start with a .csv with sensor data

- [http://big.csr.unibo.it/projects/nosql-datasets/watering-data-1661269649253.csv](http://big.csr.unibo.it/projects/nosql-datasets/watering-data-1661269649253.csv)

# 3. Do ETL

We need a working environment: Amazon SageMaker

- Fully managed service that provides machine learning (ML) capabilities for data scientists and developers to prepare, build, train, and deploy high-quality ML models efficiently

![SageMaker](https://user-images.githubusercontent.com/18005592/200364098-6390f3a4-e1cf-4041-9d88-ab425ed9933b.png)

# Creating a notebook instance

:::: {.columns}
::: {.column width=50%}

![Create a notebook instance](https://user-images.githubusercontent.com/18005592/200364480-53d6fe54-aac6-431c-a664-87712f82d2cf.png)

:::
::: {.column width=50%}

![When ready, Open Jupyter](https://user-images.githubusercontent.com/18005592/200364714-37992354-a047-440e-bfe3-12df1e1264ad.png)

:::
::::



# Upload the notebook

![Upload the notebook](https://user-images.githubusercontent.com/18005592/200364905-e6b7f5b6-ee52-4e8e-9df3-b2e8b9b5afb0.png)

![Set the kernel](https://user-images.githubusercontent.com/18005592/200365269-0b85f661-a0c8-4902-9c93-eb978b5226bd.png)

# Hands on!

In [None]:
!pip install awswrangler==3.4.2
!pip install pandas==2.1.2
!pip install scikit-learn==1.3.0
!pip install seaborn==0.12.2

In [None]:
import awswrangler as wr
import pandas as pd
file_name = "watering-data-1661269649253.csv"  # name of the dataset
df = pd.read_csv("http://big.csr.unibo.it/projects/nosql-datasets/" + file_name)  # import the dataset from unibo's server
wr.s3.to_csv(df, path=input_bucket + file_name)  # write it to the data lake

# 3. Do ETL (AWS SageMaker)

In [None]:
print(input_bucket + file_name)
df = wr.s3.read_csv(input_bucket + file_name)  # import the data
df

Data understanding

- `plantRow`: name of the field in which we have the sensors
- `detectedValueTypeId`: type of the sensor data
- `xx`, `yy`, and `zz`: displacement of the sensor with respect to the dripper
- `value`: measurement
- `unit`: unit of measurement
- `timestamp`: when the measurement was recorded

# Data profiling

In [None]:
df.describe().transpose()

In [None]:
df.info()
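`describe()` and `info()` do not directly show the share of missing values or how many distinct values each column holds, which is what reveals columns such as `zz`. A sketch of a hypothetical `profile` helper, shown on a toy frame with made-up values; on the lab data it would be called as `profile(df)`.

```python
import numpy as np
import pandas as pd


def profile(df: pd.DataFrame) -> pd.DataFrame:
    """Per-column fraction of missing values and number of distinct non-null values."""
    return pd.DataFrame({
        "null_fraction": df.isna().mean(),
        "distinct_values": df.nunique(dropna=True),
    })


# Toy frame with made-up values, only to show the shape of the output
toy = pd.DataFrame({
    "value": [10.0, 12.5, 11.0, 9.5],
    "zz": [np.nan, 0.0, np.nan, np.nan],
    "unit": ["%", "%", "%", "%"],
})
print(profile(toy))
```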

# Data distribution

In [None]:
import math
import matplotlib.pyplot as plt

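def plot(df, cols=5):
    rows = math.ceil(len(df.columns) / cols)
    # squeeze=False keeps axs 2-D even when there is a single row of plots
    fig, axs = plt.subplots(rows, cols, figsize=(3 * cols, 2 * rows), squeeze=False)
    for i, x in enumerate(df.columns):
        ax = axs[i // cols][i % cols]
        df[x].hist(ax=ax)  # one histogram per column
        ax.set_title(x)
    for ax in axs.flat[len(df.columns):]:
        ax.set_visible(False)  # hide the unused subplots
    fig.tight_layout()
plot(df)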

#

The `zz` column contains missing values and a single non-null value (`0`). What should we do?

In [None]:
if "zz" in df.columns:
    df = df.drop(columns=["zz"])
df
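The same reasoning generalizes: a column that is entirely missing or holds a single distinct non-null value carries no information for analysis. A sketch of a hypothetical `drop_uninformative` helper on toy data; note that it would also drop a constant `unit` column, which may still document what `value` means, so inspect what it removes before applying it to `df`.

```python
import numpy as np
import pandas as pd


def drop_uninformative(df: pd.DataFrame) -> pd.DataFrame:
    """Drop columns that are entirely missing or hold a single distinct non-null value."""
    return df.loc[:, df.nunique(dropna=True) > 1]


# Toy frame with made-up values
toy = pd.DataFrame({
    "value": [10.0, 12.5, 11.0],
    "zz": [np.nan, 0.0, np.nan],
    "unit": ["%", "%", "%"],
})
print(drop_uninformative(toy).columns.tolist())  # ['value']
```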

#

- Do we need to store the `plantRow`?
- Do the sensors from the same `detectedValueTypeId` share the same `unit`?
- Do we care about all the sensor types (i.e., `detectedValueTypeId`)?

In [None]:
df.groupby(["detectedValueTypeId"]).nunique()
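The question "do the sensors of the same `detectedValueTypeId` share the same `unit`?" is a functional dependency check, which can be reduced to a single boolean. A sketch with a hypothetical `fd_holds` helper and made-up rows (the `TEMP` type and its units are invented for the example). On the lab data the call would be `fd_holds(df, ["detectedValueTypeId"], "unit")`; the same check applies to any other candidate dependency between columns.

```python
import pandas as pd


def fd_holds(df: pd.DataFrame, lhs: list, rhs: str) -> bool:
    """True if each combination of `lhs` values determines at most one `rhs` value."""
    return bool((df.groupby(lhs)[rhs].nunique(dropna=False) <= 1).all())


# Made-up rows: GRND_WATER_G always uses one unit, the invented TEMP type does not
toy = pd.DataFrame({
    "detectedValueTypeId": ["GRND_WATER_G", "GRND_WATER_G", "TEMP", "TEMP"],
    "unit": ["%", "%", "C", "F"],
})
print(fd_holds(toy, ["detectedValueTypeId"], "unit"))  # False
print(fd_holds(toy[toy["detectedValueTypeId"] == "GRND_WATER_G"], ["detectedValueTypeId"], "unit"))  # True
```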

#

Drop the "useless" columns

In [None]:
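# What should we drop?
# (note: later cells group by `plantRow` and rename it to `plant`, so those steps expect it to be kept)
# df = df.drop(["plantRow", "unit"], axis=1)
df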

# 

Drop the useless rows

In [None]:
df = df[df["detectedValueTypeId"] == "GRND_WATER_G"]
df = df.drop(["detectedValueTypeId"], axis=1)
df

Take a better look at the sensor data over time

In [16]:
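from matplotlib.ticker import FuncFormatter

def plot():
    fig, ax = plt.subplots(figsize=(4,3))
    for key, grp in df.sort_values(by=["timestamp"]).groupby(['xx', 'yy']):
        grp.plot(ax=ax, kind='line', x='timestamp', y='value', label=str(key))  # one line per sensor
    # format the epoch-second ticks as dates (works wherever matplotlib places the ticks)
    ax.xaxis.set_major_formatter(FuncFormatter(lambda tm, _: pd.to_datetime(tm, unit='s').strftime('%Y-%m-%d %H:%M:%S')))
    ax.tick_params(axis='x', labelrotation=45)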

In [None]:
plot()

#

Snapshots of sensor data at different timestamps

In [None]:
def plot():
    fig, ax = plt.subplots(1, 3, figsize=(12, 3), sharey=True)
    for i, timestamp in enumerate([1628950550, 1626309949, 1627101007]):
        df[df["timestamp"] == timestamp].plot.scatter(ax=ax[i], x='xx', y='yy', c='value', cmap='seismic_r', s=100, title=pd.to_datetime(timestamp, unit='s').strftime('%Y-%m-%d %H:%M:%S'))
    fig.tight_layout()
plot()

What if we bin our data hourly?

In [None]:
def plot():
    fig, ax = plt.subplots(1, 3, figsize=(12, 3), sharey=True)
    for i, timestamp in enumerate([1628950550, 1626309949, 1627101007]):
        df[df["timestamp"].apply(lambda x: int(x / (60 * 60))) == int(timestamp / (60 * 60))].plot.scatter(ax=ax[i], x='xx', y='yy', c='value', cmap='seismic_r', s=100, title=pd.to_datetime(int(timestamp / (60 * 60)) * 60 * 60, unit='s').strftime('%Y-%m-%d %H:%M:%S'))
    fig.tight_layout()
plot()
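The hourly binning used in the data preparation step (`int(x / 3600) * 3600`) truncates each epoch timestamp to the start of its hour. A vectorized sketch with integer division, cross-checked against pandas' own flooring to the hour, on the three timestamps used in the snapshots above:

```python
import pandas as pd

# Epoch seconds: the three timestamps used in the snapshot plots
ts = pd.Series([1628950550, 1626309949, 1627101007])

# Integer division drops the seconds elapsed within the hour (no float rounding involved)
binned = (ts // 3600) * 3600

# Cross-check: pandas floors the same instants to the start of their hour
floored = pd.to_datetime(ts, unit="s").dt.floor(pd.offsets.Hour())
assert (pd.to_datetime(binned, unit="s") == floored).all()

print(binned.tolist())
```

Working on the whole column at once avoids calling a Python lambda per row, which matters as the dataset grows.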

# Data preparation

- Bin data hourly, average the soil moisture for each sensor 
- Create derived attributes (e.g., time/sensor hierarchies) useful for later analysis

In [22]:
df["timestamp"] = df["timestamp"].apply(lambda x: int(x / 3600) * 3600)  # bin the time by hours
df1 = df.groupby(["plantRow", "timestamp", "xx", "yy"])["value"].mean().reset_index()  # compute the average humidity
df1["hour"] = [pd.to_datetime(tm, unit='s').strftime('%Y-%m-%d %H') for tm in df1["timestamp"]]  # format the hour
df1["date"] = [pd.to_datetime(tm, unit='s').strftime('%Y-%m-%d') for tm in df1["timestamp"]]  # format the date
df1["month"] = [pd.to_datetime(tm, unit='s').strftime('%Y-%m') for tm in df1["timestamp"]]  # format the month
df1["year"] = [pd.to_datetime(tm, unit='s').strftime('%Y') for tm in df1["timestamp"]]  # format the year
df1["timestamp"] = [pd.to_datetime(tm, unit='s').strftime('%Y-%m-%d %H:%M:%S') for tm in df1["timestamp"]]  # format the timestamp

df2 = df1.copy(deep=True)
df2["sensor"] = df2.apply(lambda x: "(" + str(x["xx"]) + ", " + str(x["yy"]) + ")", axis=1)
df2 = df2.rename({"xx": "dist", "yy": "depth", "plantRow": "plant"}, axis=1)
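The list comprehensions above call `pd.to_datetime` once per row, and the sensor label is built with a row-wise `apply`. A vectorized sketch of the same derived attributes using the `.dt` accessor and string concatenation; `add_time_hierarchy` and `add_sensor_label` are hypothetical helpers, and the toy row is made up.

```python
import pandas as pd


def add_time_hierarchy(df: pd.DataFrame, col: str = "timestamp") -> pd.DataFrame:
    """Add hour/date/month/year strings derived from epoch seconds in `col`."""
    out = df.copy()
    dt = pd.to_datetime(out[col], unit="s").dt  # convert the whole column once
    out["hour"] = dt.strftime("%Y-%m-%d %H")
    out["date"] = dt.strftime("%Y-%m-%d")
    out["month"] = dt.strftime("%Y-%m")
    out["year"] = dt.strftime("%Y")
    out[col] = dt.strftime("%Y-%m-%d %H:%M:%S")  # overwrite the raw timestamp last, as in the lab
    return out


def add_sensor_label(df: pd.DataFrame) -> pd.DataFrame:
    """Build the "(xx, yy)" sensor label with column-wise string concatenation."""
    out = df.copy()
    out["sensor"] = "(" + out["xx"].astype(str) + ", " + out["yy"].astype(str) + ")"
    return out


# One made-up row, already binned to the hour
toy = pd.DataFrame({"timestamp": [1628949600], "xx": [20], "yy": [40], "value": [10.0]})
print(add_sensor_label(add_time_hierarchy(toy)))
```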

#

In [None]:
df1

In [None]:
df2

Finally, save the data back to S3.

In [25]:
# wr.s3.to_csv(df=df1, path=output_bucket + "cleaned-v1-" + file_name, index=False)

In [None]:
wr.s3.to_csv(df=df2, path=output_bucket + "cleaned-v2-" + file_name, index=False)

#

In [None]:
databases = wr.catalog.databases()
databases

In [None]:
df_tables = wr.catalog.tables()
df_tables

In [None]:
pdf = df2[["sensor", "value", "timestamp"]].pivot(index='timestamp', columns='sensor')
pdf.columns = pdf.columns.droplevel(0)
pdf = pdf.reset_index().rename_axis(None, axis=1)
pdf
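`pivot` requires each (`timestamp`, `sensor`) pair to appear at most once and raises a `ValueError` otherwise, which could happen if the same sensor label occurred in more than one `plant`. A sketch on made-up data showing `pivot_table`, which aggregates duplicates instead of failing (the mean is an assumption; pick the aggregation that fits the analysis). Passing `values="value"` also avoids the extra column level that `droplevel(0)` removes above.

```python
import pandas as pd

# Toy long-format measurements (made-up values); the first two rows share (timestamp, sensor)
long_df = pd.DataFrame({
    "timestamp": ["2021-08-14 14:00:00", "2021-08-14 14:00:00", "2021-08-14 14:00:00", "2021-08-14 15:00:00"],
    "sensor": ["(20, 40)", "(20, 40)", "(0, 20)", "(20, 40)"],
    "value": [10.0, 12.0, 30.0, 11.0],
})

try:
    long_df.pivot(index="timestamp", columns="sensor", values="value")
except ValueError as err:
    print("pivot failed:", err)  # duplicate entries cannot be reshaped

# pivot_table averages the duplicates instead
wide = long_df.pivot_table(index="timestamp", columns="sensor", values="value", aggfunc="mean")
wide = wide.reset_index().rename_axis(None, axis=1)
print(wide)
```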

In [30]:
# import seaborn as sns
# sns.pairplot(pdf)
# plt.show()

In [31]:
# wr.s3.to_csv(df=pdf, path=output_bucket + "pivot.csv", index=False)  # output_bucket already ends with "/"

# 4. Build a (relational) data warehouse (AWS RDS)

Amazon Relational Database Service (Amazon RDS)

- A collection of managed services that makes it simple to set up, operate, and scale relational databases in the cloud

![image](https://user-images.githubusercontent.com/18005592/200369753-f6d9e73e-6d7f-4d4b-bad6-311528e51629.png)

# Create a database

![image](https://github.com/w4bo/welaser/assets/18005592/ec3cdca6-548f-4370-95ca-107fcbb215ed)

# Create a database

In [32]:
pwd = "bigdata2023"  # the master password chosen when creating the database

![image](https://user-images.githubusercontent.com/18005592/200370529-ae695d6f-9629-49bb-ae13-a4f5caeb9a0d.png)

# Create a database

![image](https://user-images.githubusercontent.com/18005592/200373378-f257ef56-aea9-4eb3-a922-2a4a3b2a1d11.png)

# Create a database

By default, even if you set "Public accessibility" to "Yes", the security group does not allow external connections yet.

- Virtual private cloud (VPC): a virtual network that closely resembles a traditional network that you would operate in your own data center
- A security group acts as a virtual firewall for your AWS instances that controls incoming and outgoing traffic
    - When you launch an instance, you can specify one or more security groups
    - If you don't specify a security group, Amazon EC2 uses the default security group
- Your AWS account automatically has a default security group for the default VPC in each Region
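The lab adds the inbound rule through the console. As a sketch of the equivalent API call, assuming `boto3` is installed and AWS credentials are configured, the rule can be added with EC2's `authorize_security_group_ingress`. The helper names are hypothetical, the group id and CIDR are placeholders (203.0.113.0/24 is a documentation-only range), and restricting the source to your own IP is safer than opening the port to `0.0.0.0/0`.

```python
def postgres_ingress_rule(cidr: str, port: int = 5432) -> dict:
    """Inbound rule (an EC2 IpPermissions entry) allowing TCP on the PostgreSQL port from `cidr`."""
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": cidr, "Description": "PostgreSQL access for the lab"}],
    }


def open_postgres_port(group_id: str, cidr: str) -> None:
    """Add the rule to a security group; needs boto3 and AWS credentials (not called here)."""
    import boto3  # imported lazily so building the rule does not require boto3

    ec2 = boto3.client("ec2")
    ec2.authorize_security_group_ingress(GroupId=group_id, IpPermissions=[postgres_ingress_rule(cidr)])


# Placeholder source address from the documentation range
rule = postgres_ingress_rule("203.0.113.7/32")
print(rule)
# open_postgres_port("sg-...", "<your-ip>/32")  # hypothetical call with your own values
```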

# Create a database

From [Default security groups (2022-11-15)](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/default-custom-security-groups.html#default-security-group)

![image](https://user-images.githubusercontent.com/18005592/201981163-0bd75761-be85-41ea-aaf9-6af66e3e41ac.png)


# Create a database

You can add or remove inbound and outbound rules for any default security group.

![image](https://user-images.githubusercontent.com/18005592/201690868-abdd53f7-c6b5-48d4-89ce-e9f6f374a8ce.png)


# Create a database

![image](https://user-images.githubusercontent.com/18005592/201691253-db130df8-c31a-4c6f-aa69-34cf4d89f0e2.png)

# Create a database

![image](https://user-images.githubusercontent.com/18005592/201691398-a16f467e-c6cd-4dd9-8fc3-f31b45740ec4.png)

# Create a database

What is the schema of our dataset?

In [None]:
df2.columns

In [None]:
raw_data = wr.s3.read_csv(output_bucket + "cleaned-v2-" + file_name)
raw_data

- Is this schema normalized?
- Is this the schema you are used to when building DWH?

# Creating the DWH

We need to distinguish the fact and the dimension tables:

- FT: Measurement
- DT1: Time
- DT2: Sensor

How?

# DT Time

In [None]:
dt_time = raw_data[["timestamp", "date", "month", "year"]].drop_duplicates()
dt_time

# DT Sensor

In [None]:
dt_sensor = raw_data[["sensor", "dist", "depth", "plant"]].drop_duplicates()
dt_sensor

# FT measurement

In [None]:
ft_mea = raw_data[["sensor", "timestamp", "value"]].drop_duplicates()
ft_mea
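Before loading, it is worth checking that the three tables actually form a star schema: dimension keys must be unique, every fact must reference an existing dimension member, and there should be one fact per (`sensor`, `timestamp`). `drop_duplicates()` on all columns does not guarantee this (for instance, if the same sensor label appeared under two plants). A sketch with a hypothetical `check_star_schema` helper on toy tables; on the lab tables the call would be `check_star_schema(ft_mea, {"sensor": (dt_sensor, "sensor"), "timestamp": (dt_time, "timestamp")})`.

```python
import pandas as pd


def check_star_schema(facts: pd.DataFrame, dims: dict) -> list:
    """Return a list of problems; `dims` maps each fact column to (dimension table, key column)."""
    problems = []
    for fk, (dim, key) in dims.items():
        if dim[key].duplicated().any():
            problems.append(f"duplicate keys in the {fk} dimension")
        unknown = ~facts[fk].isin(dim[key])
        if unknown.any():
            problems.append(f"{int(unknown.sum())} fact rows with unknown {fk}")
    grain = list(dims)  # together, the foreign keys should identify a single fact
    if facts.duplicated(subset=grain).any():
        problems.append(f"more than one fact per {grain}")
    return problems


# Toy tables with made-up values, shaped like dt_sensor, dt_time and ft_mea
sensor_dim = pd.DataFrame({"sensor": ["(0, 20)", "(20, 40)"], "dist": [0, 20], "depth": [20, 40], "plant": ["row1", "row1"]})
time_dim = pd.DataFrame({"timestamp": ["2021-08-14 14:00:00"], "date": ["2021-08-14"], "month": ["2021-08"], "year": ["2021"]})
facts = pd.DataFrame({"sensor": ["(0, 20)", "(20, 40)"], "timestamp": ["2021-08-14 14:00:00"] * 2, "value": [30.0, 10.0]})

dims = {"sensor": (sensor_dim, "sensor"), "timestamp": (time_dim, "timestamp")}
print(check_star_schema(facts, dims))  # []
```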

# 

We need to connect to the database...

![image](https://user-images.githubusercontent.com/18005592/200372868-4881edde-4f7f-4024-8ffb-cc3836d1a367.png)

In [38]:
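host = "sensor-dwh1.crxqb7bplkfq.us-east-1.rds.amazonaws.com"  # replace with your RDS endpoint
port = 5432  # default PostgreSQL port
user = "postgres"  # master username set when creating the database
db = "postgres"  # default database

from sqlalchemy import create_engine
s = 'postgresql://{}:{}@{}:{}/{}'.format(user, pwd, host, str(port), db)  # requires a PostgreSQL driver (psycopg2 by default)
engine = create_engine(s)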
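Formatting the password into the connection string breaks if it contains characters such as `@`, `:` or `/`. A sketch using SQLAlchemy's `URL.create` (SQLAlchemy 1.4 or later), which escapes each component; all values below are placeholders, including the made-up password.

```python
from sqlalchemy.engine import URL

# Placeholder values: use your own RDS endpoint and master password
url = URL.create(
    drivername="postgresql",  # psycopg2 is the default PostgreSQL driver
    username="postgres",
    password="p@ss:word/1",  # made-up password with characters that need escaping
    host="your-endpoint.us-east-1.rds.amazonaws.com",
    port=5432,
    database="postgres",
)
print(repr(url))  # repr masks the password
# engine = create_engine(url)  # create_engine accepts a URL object as well as a string
```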

#

... and just write the tables

In [None]:
dt_sensor.to_sql('sensor', engine, index=False, if_exists='replace')

In [None]:
dt_time.to_sql('date', engine, index=False, if_exists='replace')

In [None]:
ft_mea.to_sql('measurement', engine, index=False, if_exists='replace')

# 5. Query the data warehouse (Tableau)

![image](https://user-images.githubusercontent.com/18005592/200375443-c023c9f7-6df4-4717-91a6-4d9ea230eaea.png)

Tasks

1. Build the hierarchies by following the functional dependencies
2. Plot the average soil moisture (`avg(value)`) by `sensor` and `date`. Is soil moisture behaving as expected for all sensors?
3. Plot the standard deviation of the soil moisture for every sensor. What can you tell from this visualization?
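Tableau generates the SQL against the star schema for you; task 2 corresponds roughly to the join-and-aggregate query below. To try it without the RDS instance, this sketch loads made-up rows into an in-memory SQLite database with the same table names (`sensor`, `date`, `measurement`); the query itself is plain SQL and should also run on the PostgreSQL tables written above. `"date"` and `"timestamp"` are quoted because they are also SQL keywords.

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite as a local stand-in for the RDS PostgreSQL instance
local_engine = create_engine("sqlite://")

# Toy star schema with made-up rows; table and column names follow the lab
pd.DataFrame({
    "sensor": ["(0, 20)", "(20, 40)"], "dist": [0, 20], "depth": [20, 40], "plant": ["row1", "row1"],
}).to_sql("sensor", local_engine, index=False)
pd.DataFrame({
    "timestamp": ["2021-08-14 14:00:00", "2021-08-14 15:00:00", "2021-08-15 09:00:00"],
    "date": ["2021-08-14", "2021-08-14", "2021-08-15"],
    "month": ["2021-08"] * 3,
    "year": ["2021"] * 3,
}).to_sql("date", local_engine, index=False)
pd.DataFrame({
    "sensor": ["(20, 40)", "(20, 40)", "(20, 40)", "(0, 20)"],
    "timestamp": ["2021-08-14 14:00:00", "2021-08-14 15:00:00", "2021-08-15 09:00:00", "2021-08-14 14:00:00"],
    "value": [10.0, 12.0, 20.0, 30.0],
}).to_sql("measurement", local_engine, index=False)

# Average soil moisture by sensor and date: join the fact table with both dimensions
QUERY = """
SELECT s.sensor, t."date", AVG(m.value) AS avg_value
FROM measurement AS m
JOIN sensor AS s ON m.sensor = s.sensor
JOIN "date" AS t ON m."timestamp" = t."timestamp"
GROUP BY s.sensor, t."date"
ORDER BY s.sensor, t."date"
"""
result = pd.read_sql(QUERY, local_engine)
print(result)
```

For task 3, PostgreSQL provides `stddev_samp` as an aggregate in place of `AVG`; SQLite has no built-in standard deviation function, so that variant needs the real database.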