In [None]:
# Echo the Spark session as this cell's output. `spark` is not created in the visible
# cells -- presumably the kernel/environment pre-defines it; verify before running elsewhere.
spark

## Convert column name to convention

Raw data can arrive with a variety of column (field) naming styles; however, we should follow one common convention for the data in our datalake so that data users can work with it intuitively and without confusion.

The tables in the raw layer name their columns using the `camel-casing` convention, but we need `snake-casing` in our datalake.

![Data Models](https://www.mysqltutorial.org/wp-content/uploads/2009/12/MySQL-Sample-Database-Schema.png)

For example, the columns of `productlines` need to change as follows:

- `productLine` changed to `product_line`
- `textDescription` changed to `text_description`
- `htmlDescription` changed to `html_description`
- `image` nothing to change

We need to transform the column names and store all transformed models in: `s3a://datalake/exercises/bronze/classicmodels/<table>.parquet`

In [None]:
def camel2snake(name):
    """Convert a camel-case identifier to snake-case.

    An underscore is inserted after a lowercase character whenever the next
    character is uppercase or a digit; every character is then lowercased.
    Runs of uppercase letters are therefore kept together
    (``"MSRP"`` -> ``"msrp"``) and only the first digit of a digit run is
    separated (``"addressLine1"`` -> ``"address_line_1"``).

    Args:
        name: Column/field name in camel-case. May be empty.

    Returns:
        The snake-case version of ``name``; an empty ``name`` is returned as-is.
    """
    # Guard: with no characters there is nothing to convert (and no last
    # character to append below).
    if not name:
        return name

    result = []
    # Walk over adjacent (current, next) pairs; zip stops at the shorter
    # sequence, so the last character is never the "current" one here.
    for x, y in zip(name, name[1:]):
        result.append(x.lower())
        if x.islower() and (y.isupper() or y.isdigit()):
            result.append("_")
    # Append the final character explicitly instead of relying on the loop
    # variable, which is unbound when the name has a single character.
    result.append(name[-1].lower())
    return "".join(result)

## Productlines

In [None]:
# Load the raw `productlines` table (parquet) from the raw area of the datalake.
raw_productlines = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/productlines.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_productlines_columns = raw_productlines.columns
bronze_productlines_columns = [
    camel2snake(column) for column in raw_productlines_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_productlines = raw_productlines.toDF(*bronze_productlines_columns)
bronze_productlines.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_productlines.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/productlines.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/productlines.parquet")
)

assert sorted(df.columns) == [
    "html_description",
    "image",
    "product_line",
    "text_description",
]

## Products

In [None]:
# Load the raw `products` table (parquet) from the raw area of the datalake.
raw_products = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/products.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_products_columns = raw_products.columns
bronze_products_columns = [
    camel2snake(column) for column in raw_products_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_products = raw_products.toDF(*bronze_products_columns)
bronze_products.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_products.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/products.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/products.parquet")
)

assert sorted(df.columns) == [
    "buy_price",
    "msrp",
    "product_code",
    "product_description",
    "product_line",
    "product_name",
    "product_scale",
    "product_vendor",
    "quantity_in_stock",
]

## Employees

In [None]:
# Load the raw `employees` table (parquet) from the raw area of the datalake.
raw_employees = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/employees.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_employees_columns = raw_employees.columns
bronze_employees_columns = [
    camel2snake(column) for column in raw_employees_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_employees = raw_employees.toDF(*bronze_employees_columns)
bronze_employees.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_employees.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/employees.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/employees.parquet")
)

assert sorted(df.columns) == [
    "email",
    "employee_number",
    "extension",
    "first_name",
    "job_title",
    "last_name",
    "office_code",
    "reports_to",
]

## Offices

In [None]:
# Load the raw `offices` table (parquet) from the raw area of the datalake.
raw_offices = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/offices.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_offices_columns = raw_offices.columns
bronze_offices_columns = [
    camel2snake(column) for column in raw_offices_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_offices = raw_offices.toDF(*bronze_offices_columns)
bronze_offices.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_offices.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/offices.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/offices.parquet")
)

assert sorted(df.columns) == [
    "address_line_1",
    "address_line_2",
    "city",
    "country",
    "office_code",
    "phone",
    "postal_code",
    "state",
    "territory",
]

## Customers

In [None]:
# Load the raw `customers` table (parquet) from the raw area of the datalake.
raw_customers = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/customers.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_customers_columns = raw_customers.columns
bronze_customers_columns = [
    camel2snake(column) for column in raw_customers_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_customers = raw_customers.toDF(*bronze_customers_columns)
bronze_customers.printSchema()

In [None]:
# Persist the renamed `customers` table to the bronze area.
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises once the target path exists (i.e. on every run after the first).
# This also matches the write cells of the other tables in this notebook.
(
    bronze_customers
        .write
        .format("parquet")
        .mode("overwrite")
        .save("s3a://datalake/exercises/bronze/classicmodels/customers.parquet")
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/customers.parquet")
)

assert sorted(df.columns) == [
    "address_line_1",
    "address_line_2",
    "city",
    "contact_first_name",
    "contact_last_name",
    "country",
    "credit_limit",
    "customer_name",
    "customer_number",
    "phone",
    "postal_code",
    "sales_rep_employee_number",
    "state",
]

## Payments

In [None]:
# Load the raw `payments` table (parquet) from the raw area of the datalake.
raw_payments = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/payments.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_payments_columns = raw_payments.columns
bronze_payments_columns = [
    camel2snake(column) for column in raw_payments_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_payments = raw_payments.toDF(*bronze_payments_columns)
bronze_payments.printSchema()

In [None]:
# Persist the renamed `payments` table to the bronze area.
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises once the target path exists (i.e. on every run after the first).
# This also matches the write cells of the other tables in this notebook.
(
    bronze_payments
        .write
        .format("parquet")
        .mode("overwrite")
        .save("s3a://datalake/exercises/bronze/classicmodels/payments.parquet")
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/payments.parquet")
)

assert sorted(df.columns) == [
    "amount",
    "check_number",
    "customer_number",
    "payment_date",
]

## Orders

In [None]:
# Load the raw `orders` table (parquet) from the raw area of the datalake.
raw_orders = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/orders.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_orders_columns = raw_orders.columns
bronze_orders_columns = [
    camel2snake(column) for column in raw_orders_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_orders = raw_orders.toDF(*bronze_orders_columns)
bronze_orders.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_orders.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/orders.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/orders.parquet")
)

assert sorted(df.columns) == [
    "comments",
    "customer_number",
    "order_date",
    "order_number",
    "required_date",
    "shipped_date",
    "status",
]

## Orderdetails

In [None]:
# Load the raw `orderdetails` table (parquet) from the raw area of the datalake.
raw_orderdetails = spark.read.format("parquet").load(
    "s3a://datalake/exercises/raw/classicmodels/orderdetails.parquet"
)

In [None]:
# Snake-case counterpart of every raw column name, kept in the original column order.
raw_orderdetails_columns = raw_orderdetails.columns
bronze_orderdetails_columns = [
    camel2snake(column) for column in raw_orderdetails_columns
]

In [None]:
# Rebuild the DataFrame under the snake-case names (toDF assigns the names by position),
# then print the schema so the rename can be checked by eye.
bronze_orderdetails = raw_orderdetails.toDF(*bronze_orderdetails_columns)
bronze_orderdetails.printSchema()

In [None]:
# Persist the renamed table to the bronze area, replacing any previous output.
bronze_orderdetails.write.format("parquet").mode("overwrite").save(
    "s3a://datalake/exercises/bronze/classicmodels/orderdetails.parquet"
)

In [None]:
# Read the bronze output back and check it carries exactly the expected snake-case columns.
df = (
    spark
    .read
    .format("parquet")
    .load("s3a://datalake/exercises/bronze/classicmodels/orderdetails.parquet")
)

assert sorted(df.columns) == [
    "order_line_number",
    "order_number",
    "price_each",
    "product_code",
    "quantity_ordered",
]