# From data lake to data warehouse

A data warehouse and a data lake serve different needs and use cases.

A data lake stores relational data from business applications as well as non-relational data
- The structure of the data or schema is not defined when data is captured
- You can store all of your data without careful design or the need to know what questions you might need answers to

A data warehouse is a database optimized to analyze relational data coming from business applications
- The data structure and schema are defined in advance to optimize for fast SQL queries
- The results are typically used for operational reporting and analysis
- Data is cleaned, enriched, and transformed so it can act as the single source of truth
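
The difference between the two approaches can be tried locally. The following is a minimal sketch using only the Python standard library: the record fields and the table name `soil_moisture` are hypothetical, and SQLite merely stands in for a real warehouse engine.

```python
import json
import sqlite3

# Data lake style (schema-on-read): raw records are stored as-is,
# even when their fields differ from one record to the next.
raw_lake = [
    '{"sensor": "s1", "value": 0.31, "unit": "m3/m3"}',
    '{"sensor": "s2", "value": 0.28}',            # no unit
    '{"device": "cam-7", "note": "free text"}',   # different shape
]

# The structure is imposed only when the data is read.
readings = [r for r in map(json.loads, raw_lake) if "value" in r]

# Data warehouse style (schema-on-write): the table is defined up front,
# and only rows that fit the schema are loaded.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE soil_moisture (sensor TEXT NOT NULL, value REAL NOT NULL)"
)
conn.executemany(
    "INSERT INTO soil_moisture (sensor, value) VALUES (?, ?)",
    [(r["sensor"], r["value"]) for r in readings],
)

avg_value = conn.execute("SELECT AVG(value) FROM soil_moisture").fetchone()[0]
print(len(raw_lake), len(readings), round(avg_value, 3))
```

The lake keeps all three records, while the warehouse table only accepts the two that match its schema.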

Organizations are evolving their warehouses to include data lakes and enable diverse query capabilities.

**Goal**: build a sub-module of the expert system, namely a data warehouse (DWH) to monitor historical trends in soil moisture

Necessary steps:

1. Create a data lake (AWS S3)
2. Collect and store the sensor data (manually)
   - KISS: we start with a .csv with sensor data
3. Do ETL (AWS SageMaker and/or AWS Glue)
4. Build a (relational) data warehouse (AWS RDS)
5. Query the data warehouse (Tableau)

## 1. Create a data lake

See the AWS console
- https://awsacademy.instructure.com/login/canvas

![image](https://user-images.githubusercontent.com/18005592/200340672-756adacd-bad6-4aca-b7ca-8c03ff20d928.png)


![image](https://user-images.githubusercontent.com/18005592/200342442-edaa7560-95a2-47d1-b6b2-ccc4ae9abe0c.png)

![image](https://user-images.githubusercontent.com/18005592/200342733-326a05b7-9f6e-438f-9749-1322412d7321.png)

![image](https://user-images.githubusercontent.com/18005592/200343321-444d3d2b-3e95-4296-baaa-29cf10616736.png)

![image](https://user-images.githubusercontent.com/18005592/200344531-88e92bbb-c0c0-456e-a532-a116d8d330f4.png)

Create two buckets:
- `landing-raw-wateringsensors-123`
- `staging-clean-wateringsensors-123`
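
As an alternative to the console, the buckets could also be created programmatically. This is a minimal sketch, assuming `boto3` is installed and AWS credentials are configured; the helper `create_buckets` and the default region are illustrative assumptions, not part of the original walkthrough. Bucket names are globally unique, so the `123` suffix may need to be changed.

```python
def create_buckets(s3_client, bucket_names, region="us-east-1"):
    """Create each bucket through the given S3 client and return the names."""
    created = []
    for name in bucket_names:
        if region == "us-east-1":
            # us-east-1 is the default and must not be passed as a constraint
            s3_client.create_bucket(Bucket=name)
        else:
            s3_client.create_bucket(
                Bucket=name,
                CreateBucketConfiguration={"LocationConstraint": region},
            )
        created.append(name)
    return created


def main():
    # Imported here so the helper above can be reused without AWS access
    import boto3

    names = [
        "landing-raw-wateringsensors-123",
        "staging-clean-wateringsensors-123",
    ]
    print(create_buckets(boto3.client("s3"), names))

# Call main() from an environment with AWS credentials configured.
```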

![image](https://user-images.githubusercontent.com/18005592/200345832-d3522fdb-92ee-4457-8a78-5c0449af6bc4.png)

![image](https://user-images.githubusercontent.com/18005592/200346937-410c49dc-6685-447c-9b18-4970f1954e6c.png)

In [22]:
input_bucket = "s3://landing-raw-wateringsensors-123/"
output_bucket = "s3://staging-clean-wateringsensors-123/"

## 2. Collect and store the sensor data (manually)

KISS: we start with a .csv with sensor data
- http://big.csr.unibo.it/projects/nosql-datasets/watering-data-1661269649253.csv

In [None]:
!pip install awswrangler
!pip install pandas
!pip install scikit-learn
!pip install seaborn

In [19]:
import awswrangler as wr
import pandas as pd
file_name = "watering-data-1661269649253.csv"
df = pd.read_csv("http://big.csr.unibo.it/projects/nosql-datasets/" + file_name)
# wr.s3.to_csv(df, path=input_bucket + file_name)

## 3. Do ETL (AWS SageMaker)

In [24]:
# import the data
print(input_bucket + file_name)
df = wr.s3.read_csv(input_bucket + file_name)
df

s3://landing-raw-wateringsensors-123/watering-data-1661269649253.csv


NoCredentialsError: Unable to locate credentials

Show some statistics

In [None]:
df.describe().transpose()

In [None]:
df.info()

Let's plot the distribution of the data

In [None]:
import math
import matplotlib.pyplot as plt

cols = 3
rows = math.ceil(len(df.columns) / cols)

# one subplot per column, laid out on a rows x cols grid
fig, axs = plt.subplots(rows, cols, figsize=(4 * cols, 3 * rows), squeeze=False)

for i, x in enumerate(df.columns):
    ax = axs[i // cols][i % cols]
    # df[x].value_counts().plot(kind="bar", ax=ax)
    df[x].hist(ax=ax)
    ax.set_title(x)

fig.tight_layout()

The `zz` column contains only missing values and a single distinct non-null value (`0`). What should we do?

In [None]:
if "zz" in df.columns:
    df = df.drop(columns=["zz"])
df
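
The same check can be generalized to any column that carries no information. The sketch below runs on a tiny synthetic frame (the values are made up) and flags every column with at most one distinct non-null value.

```python
import pandas as pd

# Tiny synthetic frame; the values are made up for illustration.
sample = pd.DataFrame({
    "xx": [0, 25, 50],
    "zz": [None, 0, None],   # mostly missing, one distinct value
    "value": [-30.1, -55.4, -80.2],
})

# nunique() ignores missing values by default, so a column with
# at most one distinct value cannot help distinguish rows.
distinct = sample.nunique()
constant_cols = distinct[distinct <= 1].index.tolist()

cleaned = sample.drop(columns=constant_cols)
print(constant_cols, list(cleaned.columns))
```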

What else can we do?

- Do we need to store the `plantRow`?
- Do the sensors from the same `detectedValueTypeId` share the same `unit`?
- Do we care about all the sensor types (i.e., `detectedValueTypeId`)?

In [None]:
df.groupby(["detectedValueTypeId"]).nunique()
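
To answer the question about units more directly, one option is to count the distinct units per sensor type. A minimal sketch on synthetic data follows; the type id `AIR_TEMP` and the unit strings are invented for illustration.

```python
import pandas as pd

# Synthetic example; the type ids and units are invented for illustration.
sample = pd.DataFrame({
    "detectedValueTypeId": ["GRND_WATER_G", "GRND_WATER_G", "AIR_TEMP"],
    "unit": ["cbar", "cbar", "C"],
    "value": [-30.1, -55.4, 21.0],
})

# Number of distinct units per sensor type
units_per_type = sample.groupby("detectedValueTypeId")["unit"].nunique()

# True when every sensor type is reported in exactly one unit,
# i.e. the unit column is redundant once the type is known
unit_is_redundant = bool((units_per_type == 1).all())
print(units_per_type.to_dict(), unit_is_redundant)
```

If the flag is true on the real data, `unit` can be dropped without losing information.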

Drop the "useless" columns

In [None]:
df = df.drop(["plantRow", "unit"], axis=1)
df

Drop the useless rows

In [None]:
df = df[df["detectedValueTypeId"] == "GRND_WATER_G"]
df = df.drop(["detectedValueTypeId"], axis=1)
df

Take a better look at the sensor data over time

In [None]:
fig, ax = plt.subplots()
for key, grp in df.sort_values(by=["timestamp"]).groupby(['xx', 'yy']):
    ax = grp.plot(ax=ax, kind='line', x='timestamp', y='value', label=key)

ax.set_xticklabels([pd.to_datetime(tm, unit='s').strftime('%Y-%m-%d %H:%M:%S') for tm in ax.get_xticks()], rotation = 45)

Snapshots of sensor data: is everything normal?

In [None]:
for i, timestamp in enumerate([1628950550, 1627101007]):
    df[df["timestamp"] == timestamp].plot.scatter(x='xx', y='yy', c='value', cmap='seismic_r', s=100, title=timestamp)

What if we bin our data hourly?

In [None]:
for timestamp in [1628950550, 1627101007]:
    same_hour = df["timestamp"] // 3600 == timestamp // 3600  # rows in the same hour as the snapshot
    df[same_hour].plot.scatter(x='xx', y='yy', c='value', cmap='seismic_r', s=100, title=timestamp)

So we can bin our data hourly and create derived attributes that are useful for subsequent analysis.
Finally, save the data frame back to S3.

In [None]:
df["timestamp"] = df["timestamp"].apply(lambda x: int(x / 3600) * 3600)  # bin the time by hours
df = df.groupby(["timestamp", "xx", "yy"])["value"].mean().reset_index()

# derived time attributes (UTC), useful for grouping at different granularities
dt = pd.to_datetime(df["timestamp"], unit='s')
df["hour"] = dt.dt.strftime('%Y-%m-%d %H')
df["date"] = dt.dt.strftime('%Y-%m-%d')
df["month"] = dt.dt.strftime('%Y-%m')
df["year"] = dt.dt.strftime('%Y')

# output_bucket already ends with "/", so the file name is appended directly
wr.s3.to_csv(df=df, path=output_bucket + "v1.csv", index=False)

# label each sensor by its position and rename the coordinates
df["sensor"] = df.apply(lambda x: "(" + str(x["xx"]) + ", " + str(x["yy"]) + ")", axis=1)
df = df.rename({"xx": "dist", "yy": "depth"}, axis=1)

wr.s3.to_csv(df=df, path=output_bucket + "v2.csv", index=False)
df
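
The hourly binning is plain integer arithmetic on Unix timestamps. The sketch below applies it to the two snapshot timestamps used earlier, with only the standard library; the helper name `bin_to_hour` is an assumption for illustration. The labels are in UTC, which matches how `pd.to_datetime(..., unit='s')` interprets the values.

```python
from datetime import datetime, timezone


def bin_to_hour(ts):
    """Round a Unix timestamp (seconds) down to the start of its hour."""
    return ts // 3600 * 3600


for ts in [1628950550, 1627101007]:
    binned = bin_to_hour(ts)
    label = datetime.fromtimestamp(binned, tz=timezone.utc).strftime("%Y-%m-%d %H")
    print(ts, "->", binned, label)
```

All readings that fall within the same hour end up with the same `timestamp`, so the subsequent `groupby` averages them into one row per sensor and hour.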

In [None]:
databases = wr.catalog.databases()
databases

In [None]:
df_tables = wr.catalog.tables()
df_tables

In [None]:
pdf = df[["sensor", "value", "timestamp"]].pivot(index='timestamp', columns='sensor')
pdf.columns = pdf.columns.droplevel(0)
pdf = pdf.reset_index().rename_axis(None, axis=1)
pdf = pdf.drop(["timestamp"], axis=1)
pdf

In [None]:
import seaborn as sns

sns.pairplot(pdf)

In [None]:
wr.s3.to_csv(df=pdf, path=output_bucket + "pivot.csv", index=False)

## 4. Build a (relational) data warehouse (AWS RDS)
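
One possible shape for the warehouse table is sketched below. This is only an illustration under stated assumptions: SQLite stands in for the RDS database so the code runs locally, the table name `soil_moisture` and the chosen subset of columns are hypothetical, and the rows are invented values with the same layout as `v2.csv`.

```python
import sqlite3

# SQLite stands in for the RDS database so the sketch runs locally.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE soil_moisture (
        timestamp INTEGER NOT NULL,   -- start of the hour, Unix seconds
        dist      REAL NOT NULL,
        depth     REAL NOT NULL,
        value     REAL NOT NULL,
        month     TEXT NOT NULL,
        PRIMARY KEY (timestamp, dist, depth)
    )
""")

# Invented rows with the same layout as v2.csv
rows = [
    (1627099200, 0, -20, -40.0, "2021-07"),
    (1627099200, 25, -20, -60.0, "2021-07"),
    (1628949600, 0, -20, -80.0, "2021-08"),
]
conn.executemany("INSERT INTO soil_moisture VALUES (?, ?, ?, ?, ?)", rows)

# Historical trend: average value per month
monthly = conn.execute(
    "SELECT month, AVG(value) FROM soil_moisture GROUP BY month ORDER BY month"
).fetchall()
print(monthly)
```

Because the schema is fixed in advance, aggregations such as the monthly average reduce to a single SQL query.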

## 5. Query the data warehouse (Tableau)