-
Notifications
You must be signed in to change notification settings - Fork 0
/
Olist_script.py
71 lines (61 loc) · 2.67 KB
/
Olist_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#
# Setting up spark
#import findspark
#findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext

# Build the Spark session exactly once, passing the SparkConf through
# builder.config(...) so the master and app name actually take effect.
# (Previously `conf` was created but never used, getOrCreate() was called
# twice, and an unused SQLContext was constructed — SQLContext is deprecated
# and was being handed a SparkSession where a SparkContext is expected.)
conf = SparkConf().setMaster("local").setAppName("Missed_Deadlines_Olist")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark)

# Enable Arrow-based columnar data transfers for faster Spark <-> Pandas
# conversion.
# NOTE(review): this config key is deprecated since Spark 3.0 in favor of
# "spark.sql.execution.arrow.pyspark.enabled" — confirm the cluster's Spark
# version before switching.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Load in csv files into spark dataframes.
# All three datasets live in the same S3 folder and are read with identical
# options, so the repeated read pipeline is factored into one helper.
S3_DATA_PREFIX = "s3a://olist-project/data"

def _read_csv(filename):
    """Read one CSV from the project's S3 data folder.

    Uses the first row as the header and lets Spark infer column types
    (inferSchema triggers an extra pass over the file; acceptable for
    datasets of this size).
    """
    return spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(S3_DATA_PREFIX + "/" + filename)

df_items = _read_csv("olist_order_items_dataset.csv")
df_orders = _read_csv("olist_orders_dataset.csv")
df_products = _read_csv("olist_products_dataset.csv")

# Create SQL Table Views from dfs for SQL querying
df_items.createOrReplaceTempView('items')
df_orders.createOrReplaceTempView('orders')
df_products.createOrReplaceTempView('products')
# Orders where the seller missed the shipping deadline: the item's
# shipping_limit_date is earlier than the date the order actually reached
# the carrier. Joins item rows to their order and product records so the
# output carries seller, product, and delivery-timeline columns.
_MISSED_DEADLINE_SQL = """
SELECT i.order_id, i.seller_id, i.shipping_limit_date, i.price, i.freight_value,
p.product_id, p.product_category_name,
o.customer_id, o.order_status, o.order_purchase_timestamp, o.order_delivered_carrier_date,
o.order_delivered_customer_date, o.order_estimated_delivery_date
FROM items AS i
JOIN orders AS o
ON i.order_id = o.order_id
JOIN products AS p
ON i.product_id = p.product_id
WHERE i.shipping_limit_date < o.order_delivered_carrier_date
"""

late_carrier_deliveries = spark.sql(_MISSED_DEADLINE_SQL)
# Write the results to a single csv file.
# coalesce(1) collapses the result into one partition, so the whole output
# must fit in a single node's memory — only recommended for very small
# datasets like this one. Alternatives are converting the Spark df to a
# Pandas df before writing to disk; otherwise it's best practice to keep
# the partitions to take advantage of HDFS.
# mode("overwrite") makes the script re-runnable: with the default
# ("errorifexists") every run after the first failed with an
# AnalysisException because the output folder already existed.
late_carrier_deliveries.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3a://olist-project/output_data/missed_shipping_limit_orders.csv") # This path is for output folder