In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

In [None]:
# Build the Spark session used by the rest of the notebook, or reuse an
# already-active one (getOrCreate), under the application name below.
spark = (
    SparkSession.builder
    .appName("Orders Pipeline")
    .getOrCreate()
)

In [None]:
# Explicit schema applied when loading the orders CSV: two string key columns
# declared non-nullable, a nullable status string, and five nullable
# timestamp columns.
# The Spark SQL type is spelled `TimestampType`; the previous spelling
# `TimeStampType` is not defined in pyspark.sql.types and raised NameError.
OrderSchema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("order_status", StringType(), True),
    StructField("order_purchase_timestamp", TimestampType(), True),
    StructField("order_approved_at", TimestampType(), True),
    StructField("order_delivered_carrier_date", TimestampType(), True),
    StructField("order_delivered_customer_date", TimestampType(), True),
    StructField("order_estimated_delivery_date", TimestampType(), True),
])

# Explicit schema applied when loading the customers CSV. Every column is
# declared non-nullable; the zip-code prefix is the only non-string column.
CustomerSchema = StructType(
    [
        StructField(name="customer_id", dataType=StringType(), nullable=False),
        StructField(name="customer_unique_id", dataType=StringType(), nullable=False),
        StructField(name="customer_zip_code_prefix", dataType=IntegerType(), nullable=False),
        StructField(name="customer_city", dataType=StringType(), nullable=False),
        StructField(name="customer_state", dataType=StringType(), nullable=False),
    ]
)

# Explicit schema applied when loading the order-payments CSV. Built
# field-by-field with StructType.add, which appends a field and returns the
# same StructType so the calls can be chained. Only order_id is declared
# non-nullable.
PaymentSchema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("payment_sequential", IntegerType(), True)
    .add("payment_type", StringType(), True)
    .add("payment_installments", IntegerType(), True)
    .add("payment_value", DoubleType(), True)
)

# Explicit schema applied when loading the products CSV: a non-nullable
# string id, a nullable category name, and nullable integer attributes.
# `product_category_name` previously used `StringName()`, which is not a
# Spark type and raised NameError; the string type is `StringType()`.
# NOTE(review): the "lenght" spelling in two column names is kept as-is,
# presumably to match the CSV header -- confirm before renaming.
ProductSchema = StructType([
    StructField("product_id", StringType(), False),
    StructField("product_category_name", StringType(), True),
    StructField("product_name_lenght", IntegerType(), True),
    StructField("product_description_lenght", IntegerType(), True),
    StructField("product_photos_qty", IntegerType(), True),
    StructField("product_weight_g", IntegerType(), True),
    StructField("product_length_cm", IntegerType(), True),
    StructField("product_height_cm", IntegerType(), True),
    StructField("product_width_cm", IntegerType(), True),
])

# Explicit schema applied when loading the order-reviews CSV: a non-nullable
# review id, then nullable order id, integer score, two comment strings and
# two timestamps.
# The Spark SQL type is spelled `TimestampType`; the previous spelling
# `TimeStampType` is not defined in pyspark.sql.types and raised NameError.
ReviewsSchema = StructType([
    StructField("review_id", StringType(), False),
    StructField("order_id", StringType(), True),
    StructField("review_score", IntegerType(), True),
    StructField("review_comment_title", StringType(), True),
    StructField("review_comment_message", StringType(), True),
    StructField("review_creation_date", TimestampType(), True),
    StructField("review_answer_timestamp", TimestampType(), True),
])

# Locations of the input CSV files.
# NOTE(review): the first three paths are relative ("data/...") while the
# payment and reviews paths are absolute ("/data/..."). They are left
# unchanged here -- confirm which root directory is intended.
order_path = "data/order_dataset.csv"
customer_path = "data/customers_dataset.csv"
product_path = "data/products_dataset.csv"
payment_path = "/data/order_payments_dataset.csv"
reviews_path = "/data/order_reviews_dataset.csv"


def _read_csv(path, schema):
    """Load one headered CSV file into a DataFrame using an explicit schema.

    Args:
        path: Location of the CSV file to read.
        schema: StructType describing the expected columns.

    Returns:
        The DataFrame produced by the Spark CSV reader.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "false")
        # The CSV reader has no "failfast" option key (the old
        # .option('failfast', 'true') was silently ignored). Fail-fast
        # parsing is selected through the "mode" option instead.
        .option("mode", "FAILFAST")
        .schema(schema)
        .load(path)
    )


# One DataFrame per input file, each read with its matching schema.
order_df = _read_csv(order_path, OrderSchema)
customer_df = _read_csv(customer_path, CustomerSchema)
payment_df = _read_csv(payment_path, PaymentSchema)
product_df = _read_csv(product_path, ProductSchema)
reviews_df = _read_csv(reviews_path, ReviewsSchema)

In [None]:
# Quick sanity check of the orders DataFrame: print its column names and
# types, then display the first five rows.
order_df.printSchema()
order_df.show(n=5)

In [None]:
# Attach customer attributes to each order via an inner join on the shared
# customer_id column; rows without a match on either side are dropped.
orders_with_customer = order_df.join(
    customer_df,
    on="customer_id",
    how="inner",
)

In [None]:
# Group the joined orders by customer city. No aggregation is applied on this
# line, so the result is a grouped object rather than a DataFrame.
orders_by_city = orders_with_customer.groupby("customer_city")

In [None]:
# Group the joined orders by customer state. No aggregation is applied on this
# line, so the result is a grouped object rather than a DataFrame.
orders_by_state = orders_with_customer.groupby("customer_state")