# **Transformations Using PySpark**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:

# Configure OAuth (service principal) access to the project1ram ADLS Gen2 account.
# The client secret is fetched from a Databricks secret scope rather than being
# embedded in the notebook, so it no longer appears in source control or exports.
# TODO: the scope and key names below are placeholders - point them at the real
# secret scope, and rotate the secret that was previously stored here in plain text.
spark.conf.set("fs.azure.account.auth.type.project1ram.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.project1ram.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.project1ram.dfs.core.windows.net", "6112cc64-d085-4ac8-b234-4939111cc842")
spark.conf.set(
    "fs.azure.account.oauth2.client.secret.project1ram.dfs.core.windows.net",
    dbutils.secrets.get(scope="project1ram-secrets", key="adls-client-secret"),
)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.project1ram.dfs.core.windows.net", "https://login.microsoftonline.com/4c99ca7b-b160-4af0-9188-28a5941f0cd9/oauth2/token")

## **Customers Dataset Transformations**

In [0]:
# Load the raw customers CSV from the bronze container, using the first row as
# the header and letting Spark infer the column types.
df_customers = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_customers_dataset/")
)

In [0]:
# Preview at most 100 customer rows. The cap is applied with limit() so it is
# explicit, instead of passing a bare positional argument to display().
df_customers.limit(100).display()

In [0]:
# Two-letter Brazilian state code -> state name (written without accents).
brazil_states = dict(
    AC="Acre",
    AL="Alagoas",
    AP="Amapa",
    AM="Amazonas",
    BA="Bahia",
    CE="Ceara",
    DF="Distrito Federal",
    ES="Espirito Santo",
    GO="Goias",
    MA="Maranhao",
    MT="Mato Grosso",
    MS="Mato Grosso do Sul",
    MG="Minas Gerais",
    PA="Para",
    PB="Paraiba",
    PR="Parana",
    PE="Pernambuco",
    PI="Piaui",
    RJ="Rio de Janeiro",
    RN="Rio Grande do Norte",
    RS="Rio Grande do Sul",
    RO="Rondonia",
    RR="Roraima",
    SC="Santa Catarina",
    SP="Sao Paulo",
    SE="Sergipe",
    TO="Tocantins",
)


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
def get_state_fullform(state):
    """Return the full state name for a two-letter Brazilian state code.

    Args:
        state: State abbreviation used as a key of ``brazil_states``
            (e.g. "SP"). May be None when the source row has no state.

    Returns:
        The matching value from ``brazil_states``, or None when the code is
        not present in the mapping (this includes a None input).
    """
    # .get() rather than [] so that a null or unknown code produces a null
    # cell instead of raising KeyError inside the UDF and failing the job.
    return brazil_states.get(state)

In [0]:
# Wrap the Python lookup as a Spark UDF returning strings. The name is rebound
# to the UDF, so after this cell it no longer refers to the plain function.
get_state_fullform = udf(f=get_state_fullform, returnType=StringType())

In [0]:
# Add the full state name by running the UDF over the state abbreviation.
df_customers = df_customers.withColumn(
    "customer_state_fullform",
    get_state_fullform(col("customer_state")),
)
df_customers.display()

In [0]:
from itertools import chain

# Build a Spark map literal from brazil_states: the dict items are flattened
# into key1, value1, key2, value2, ... and each element becomes a literal column.
map_exp = create_map(
    [lit(token) for token in chain.from_iterable(brazil_states.items())]
)

In [0]:
# Recompute the full state name with the map expression instead of the UDF;
# this overwrites the "customer_state_fullform" column.
df_customers = df_customers.withColumn(
    "customer_state_fullform",
    map_exp.getItem(col("customer_state")),
)
df_customers.display()

In [0]:
# Two-letter state code -> region name, built from a region-by-region listing.
state_to_region = {
    state: region
    for region, states in (
        ("North", ("AC", "AP", "AM", "PA", "RO", "RR", "TO")),
        ("Northeast", ("AL", "BA", "CE", "MA", "PB", "PE", "PI", "RN", "SE")),
        ("Central-West", ("DF", "GO", "MT", "MS")),
        ("Southeast", ("ES", "MG", "RJ", "SP")),
        ("South", ("PR", "RS", "SC")),
    )
    for state in states
}


In [0]:
# Spark map literal for state code -> region, flattened the same way as the
# state-name map (key1, value1, key2, value2, ...).
map_region = create_map(
    [lit(token) for token in chain.from_iterable(state_to_region.items())]
)

In [0]:
# Derive the customer's region from the state code and capitalise the first
# letter of each word in the city name.
df_customers = (
    df_customers
    .withColumn("customer_region", map_region.getItem(col("customer_state")))
    .withColumn("customer_city", initcap(col("customer_city")))
)
df_customers.display()

In [0]:
# Count distinct customers per state. The alias has to be applied to the
# aggregate column itself: calling .alias() on the grouped result only aliases
# the DataFrame and leaves the column named "count(DISTINCT customer_id)".
(
    df_customers
    .groupBy("customer_state_fullform")
    .agg(countDistinct("customer_id").alias("No Of Customers"))
    .display()
)

In [0]:
# Persist the transformed customers to the silver container as Parquet.
# The format is selected with .format(): an option (here misspelled as
# "fomrat") does not choose the output format, so the session default was used.
# mode("overwrite") matches the other silver writes in this notebook and lets
# the cell be re-run when the target path already exists.
(
    df_customers.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/olist_customers/")
    .save()
)

## **Geolocation Dataset Transformations**

In [0]:
# Load the raw geolocation CSV from the bronze container. Only the header
# option is set here; no schema inference is requested for this dataset.
df_geolocation = (
    spark.read.format("csv")
    .options(header="true")
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_geolocation_dataset/")
)

In [0]:
# Show the geolocation DataFrame as loaded, before any transformation.
df_geolocation.display()

In [0]:
import unicodedata as unicode


def remove_accents(text):
    """Strip combining marks (accents) from a string.

    The text is decomposed with NFKD normalisation and every character that
    ``unicodedata.combining`` reports as a combining mark is dropped.

    Args:
        text: The string to clean, or None.

    Returns:
        The text without combining characters, or None when ``text`` is None.
    """
    if text is None:
        return None
    decomposed = unicode.normalize("NFKD", text)
    base_chars = filter(lambda ch: not unicode.combining(ch), decomposed)
    return "".join(base_chars)

In [0]:
# Wrap the accent-stripping helper as a Spark UDF returning strings. The name
# is rebound to the UDF, so it no longer refers to the plain Python function.
remove_accents = udf(f=remove_accents, returnType=StringType())

In [0]:
# Normalise city names: capitalise each word first, then strip accents.
df_geolocation = (
    df_geolocation
    .withColumn("geolocation_city", initcap(col("geolocation_city")))
    .withColumn("geolocation_city", remove_accents(col("geolocation_city")))
)
df_geolocation.display()

In [0]:
# Persist the cleaned geolocation data to the silver container as Parquet.
# The format is selected with .format(); passing "format" as an option does
# not choose the output format, so the session default was being used.
(
    df_geolocation.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/olist_geolocation/")
    .save()
)

## **Orders Dataset Transformations**

In [0]:
# Load the raw orders CSV from the bronze container with a header row and
# inferred column types, then show it.
df_orders = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_orders_dataset/")
)
df_orders.display()

In [0]:
# Classify each order by comparing the actual delivery date with the estimate:
#   - either date missing                -> "DATE MISSING"
#   - delivered strictly before estimate -> "DELIVERY ON TIME"
#   - anything else                      -> "DELIVERY LATE"
df_orders = df_orders.withColumn(
    "met_delivery_criteria",
    when(
        col("order_delivered_customer_date").isNull()
        | col("order_estimated_delivery_date").isNull(),
        "DATE MISSING",
    )
    .when(
        col("order_delivered_customer_date") < col("order_estimated_delivery_date"),
        "DELIVERY ON TIME",
    )
    .otherwise("DELIVERY LATE"),
)
df_orders.display()


In [0]:
# Days between hand-over to the carrier and delivery to the customer
# (customer delivery date minus carrier delivery date).
df_orders = df_orders.withColumn(
    "shipping days",
    datediff(
        col("order_delivered_customer_date"),
        col("order_delivered_carrier_date"),
    ),
)
df_orders.display()

In [0]:
# Persist the enriched orders to the silver container as Parquet, replacing
# any previous output at that path.
(
    df_orders.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/olist_orders/")
    .save()
)

## **Payment Dataset Transformations**

In [0]:
# Load the raw order-payments CSV from the bronze container with a header row
# and inferred column types, then show it.
df_payments = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_order_payments_dataset/")
)
df_payments.display()


In [0]:
# Bucket payments by number of installments:
#   exactly 1 -> "One-Time", otherwise up to 3 -> "Short-Term", rest -> "Long-Term".
df_payments = df_payments.withColumn(
    "installment_band",
    when(col("payment_installments") == 1, "One-Time")
    .when(col("payment_installments") <= 3, "Short-Term")
    .otherwise("Long-Term"),
)
df_payments.display()


In [0]:
# Average value per installment, rounded to 2 decimals.
# The division is only evaluated when the installment count is non-zero, so a
# zero count yields null instead of raising a divide-by-zero error when ANSI
# SQL mode is enabled. With ANSI mode off this matches the previous result
# (null for a zero or null installment count).
df_payments = df_payments.withColumn(
    "installment_value",
    when(
        col("payment_installments") != 0,
        round(col("payment_value") / col("payment_installments"), 2),
    ),
)
df_payments.display()

In [0]:
# Persist the enriched payments to the silver container as Parquet, replacing
# any previous output at that path.
(
    df_payments.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/olist_payments/")
    .save()
)


## **Reviews Dataset Transformations**

In [0]:
# Load the raw order-reviews CSV from the bronze container with a header row
# and inferred column types, then show it.
df_reviews = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_order_reviews_dataset/")
)
df_reviews.display()


In [0]:
import datetime


def is_date(val):
    """Tell whether a value parses as a date or a date-time string.

    Two layouts are accepted: ``%Y-%m-%d`` and ``%Y-%m-%d %H:%M:%S``.

    Args:
        val: The value to test. Anything that is not a string (including
            None) is reported as not being a date.

    Returns:
        True if ``val`` matches either layout, otherwise False.
    """
    for layout in ('%Y-%m-%d', '%Y-%m-%d %H:%M:%S'):
        try:
            datetime.datetime.strptime(val, layout)
            return True
        except (ValueError, TypeError):
            # ValueError: text does not match this layout.
            # TypeError: val is not a string (e.g. None or a number).
            # Only these are caught, so unrelated failures are not hidden
            # the way a bare `except:` would hide them.
            continue
    return False


In [0]:
# Wrap is_date as a Spark UDF. No return type was given originally, which
# means the UDF default (StringType) applies; it is spelled out here. The
# cells that use this UDF compare its result with the string 'false'.
is_date = udf(f=is_date, returnType=StringType())

In [0]:
# Clean review_score in two passes:
#   1. values that look like dates are replaced by "Not Given";
#   2. remaining nulls are replaced by "Not Given".
df_reviews = (
    df_reviews
    .withColumn(
        "review_score",
        when(is_date(col("review_score")) == 'false', col("review_score"))
        .otherwise("Not Given"),
    )
    .withColumn(
        "review_score",
        when(col("review_score").isNotNull(), col("review_score"))
        .otherwise("Not Given"),
    )
)

In [0]:
# Show df_reviews after the review_score clean-up.
df_reviews.display()

In [0]:
# Derive a sentiment label from the review score:
#   score >= 4 -> "Positive", score <= 2 -> "Negative", score == 3 -> "Neutral",
#   everything else (including the "Not Given" placeholder) -> "Not Given".
# The former explicit `== "Not Given"` branch was redundant: it produced the
# same value as the fallback, so the fallback alone covers it.
# NOTE(review): review_score can hold the text "Not Given" at this point, so
# these numeric comparisons depend on Spark's implicit string-to-number
# casting - confirm this behaves as intended if ANSI SQL mode is enabled.
df_reviews = df_reviews.withColumn(
    "review_sentiment",
    when(col("review_score") >= 4, lit("Positive"))
    .when(col("review_score") <= 2, lit("Negative"))
    .when(col("review_score") == 3, lit("Neutral"))
    .otherwise(lit("Not Given")),
)


In [0]:
# Clean review_comment_title in two passes:
#   1. values that look like dates are replaced by "Not Given";
#   2. remaining nulls are replaced by "Not Given".
df_reviews = (
    df_reviews
    .withColumn(
        "review_comment_title",
        when(is_date(col("review_comment_title")) == 'false', col("review_comment_title"))
        .otherwise("Not Given"),
    )
    .withColumn(
        "review_comment_title",
        when(col("review_comment_title").isNotNull(), col("review_comment_title"))
        .otherwise("Not Given"),
    )
)


In [0]:
# Show df_reviews after the sentiment and comment-title steps.
df_reviews.display()

In [0]:
# Fill in placeholders for missing or malformed review fields:
#   - null review_comment_message      -> "Not Given"
#   - null review_id / order_id        -> "Missing review id" / "Missing order id"
#   - review_id / order_id that look like dates get the same placeholders
df_reviews = (
    df_reviews
    .withColumn(
        "review_comment_message",
        when(col("review_comment_message").isNotNull(), col("review_comment_message"))
        .otherwise("Not Given"),
    )
    .withColumn(
        "review_id",
        when(col("review_id").isNotNull(), col("review_id"))
        .otherwise("Missing review id"),
    )
    .withColumn(
        "order_id",
        when(col("order_id").isNotNull(), col("order_id"))
        .otherwise("Missing order id"),
    )
    .withColumn(
        "review_id",
        when(is_date(col("review_id")) == 'false', col("review_id"))
        .otherwise("Missing review id"),
    )
    .withColumn(
        "order_id",
        when(is_date(col("order_id")) == 'false', col("order_id"))
        .otherwise("Missing order id"),
    )
)

In [0]:
# Days between review creation and the answer timestamp; rows where either
# value is null get the text "Missing date" instead.
df_reviews = df_reviews.withColumn(
    "review_response_time in days",
    when(
        col("review_creation_date").isNull() | col("review_answer_timestamp").isNull(),
        "Missing date",
    ).otherwise(
        date_diff(col("review_answer_timestamp"), col("review_creation_date"))
    ),
)


In [0]:
# Persist the cleaned reviews to the silver container as Parquet, replacing
# any previous output at that path.
(
    df_reviews.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/reviews")
    .save()
)

## **Order Items Dataset Transformations**

In [0]:
# Load the raw order-items CSV from the bronze container with a header row and
# inferred column types, then show it.
# Note: this rebinds df_orders, which earlier in the notebook held the orders
# dataset; from here on it refers to order items.
df_orders = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_order_items_dataset/")
)
df_orders.display()

In [0]:
# Persist the DataFrame currently bound to df_orders to the "orders" folder of
# the silver container as Parquet, replacing any previous output there.
(
    df_orders.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/orders")
    .save()
)

In [0]:
# Load the raw products CSV from the bronze container with a header row and
# inferred column types, then show it.
df_products = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_products_dataset/")
)
df_products.display()

In [0]:
# Fix the misspelled source column names, then replace nulls with
# "No Information".
# The fill uses the scalar form with `subset`: a string replacement is applied
# only to string columns and non-string columns in the subset are skipped.
# The previous dict form tried to push the text into every listed column,
# which for numeric columns means casting "No Information" to a number -
# leaving the nulls in place at best and raising under ANSI SQL mode.
df_products = (
    df_products
    .withColumnRenamed("product_name_lenght", "product_name_length")
    .withColumnRenamed("product_description_lenght", "product_description_length")
    .withColumnRenamed("product_photos_qty", "product_photos_quantity")
    .na.fill(
        "No Information",
        subset=[
            "product_category_name",
            "product_name_length",
            "product_description_length",
            "product_photos_quantity",
        ],
    )
)


In [0]:
# Derive shape and size features for each product:
#   shape_category     - "tall" / "long" / "wide" when one dimension is strictly
#                        the largest, otherwise "box"
#   product_volume_cm3 - height * length * width
#   is_heavy           - weight above 10000 g
#   is_large_volume    - volume above 100000 cm3
df_products = (
    df_products
    .withColumn(
        "shape_category",
        when(
            (col("product_length_cm") < col("product_height_cm"))
            & (col("product_width_cm") < col("product_height_cm")),
            "tall",
        )
        .when(
            (col("product_height_cm") < col("product_length_cm"))
            & (col("product_width_cm") < col("product_length_cm")),
            "long",
        )
        .when(
            (col("product_height_cm") < col("product_width_cm"))
            & (col("product_length_cm") < col("product_width_cm")),
            "wide",
        )
        .otherwise("box"),
    )
    .withColumn(
        "product_volume_cm3",
        col("product_height_cm") * col("product_length_cm") * col("product_width_cm"),
    )
    .withColumn("is_heavy", col("product_weight_g") > 10000)
    .withColumn("is_large_volume", col("product_volume_cm3") > 100000)
)

In [0]:
# Persist the enriched products to the silver container as Parquet, replacing
# any previous output at that path.
(
    df_products.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/products")
    .save()
)

## **Sellers Dataset Transformations**

In [0]:
# Load the raw sellers CSV from the bronze container with a header row and
# inferred column types, then show it.
df_sellers = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@project1ram.dfs.core.windows.net/olist_sellers_dataset/")
)
df_sellers.display()


In [0]:
# Add the seller's full state name, reusing the state-name map expression
# built for the customers dataset.
df_sellers = df_sellers.withColumn(
    "sellers_state_fullform",
    map_exp.getItem(col("seller_state")),
)

In [0]:
# Persist the enriched sellers to the silver container as Parquet, replacing
# any previous output at that path.
(
    df_sellers.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/sellers")
    .save()
)


In [0]:
# Load the product-category translation CSV from the bronze container with a
# header row and inferred column types.
df_product_category_name_translation = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@project1ram.dfs.core.windows.net/product_category_name_translation/")
)


In [0]:
# Show the category translation table as loaded (no transformation is applied to it).
df_product_category_name_translation.display()

In [0]:
# Persist the category translation table to the silver container as Parquet,
# replacing any previous output at that path.
(
    df_product_category_name_translation.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "abfss://silver@project1ram.dfs.core.windows.net/product_category_name_translation")
    .save()
)