Spark Assignment #2 : Dataframes and SparkSQL

Author: Tryambak Kaushik

In [3]:
import os
import findspark
import datetime
findspark.init()
from pyspark import SparkContext, SQLContext, HiveContext
sc = SparkContext("local", "first app")

sqlContext = SQLContext(sc)
hiveContext = HiveContext(sc)

print(sc)

<pyspark.context.SparkContext object at 0x7f9dec36cb50>


In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import DateType, IntegerType, DoubleType
from pyspark.sql.window import Window
import re

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd

In [5]:
MYDIR='/user/hdfs/sparkdata'

In [4]:
%%bash -s "$MYDIRS3"

if hdfs dfs -test -e $1 ; then 
echo "Deleting existing $1 on HDFS"
hadoop fs - rm -R $1
fi

echo "Creating new $1 on HDFS"
hadoop fs -mkdir $1

hadoop fs -find / -name $1

echo "Transfering files from local file system to hadoop file system"
for file in /var/lib/hadoop-hdfs/datauniversity/spark/orders/*.csv
do
 hadoop fs -put $file $1
done

hadoop fs -ls $1

In [6]:
# Read file for HDFS
orders=sc.textFile("hdfs://quickstart.cloudera:8020"+MYDIR+"/orders.csv")
orderdetails=sc.textFile("hdfs://quickstart.cloudera:8020"+MYDIR+"/orderdetails.csv")
products=sc.textFile("hdfs://quickstart.cloudera:8020"+MYDIR+"/products.csv")
customers=sc.textFile("hdfs://quickstart.cloudera:8020"+MYDIR+"/customers.csv")

In [7]:
ordersrdd=orders.map(lambda line: (line.split('|')[0].strip('"'), 
                                   line.split('|')[1].strip('"'),
                                   line.split('|')[2].strip('"'),
                                   line.split('|')[3].strip('"'), 
                                   line.split('|')[4].strip('"'),
                                   line.split('|')[5].strip('"'),
                                   line.split('|')[6].strip('"')))


orderdetailsrdd=orderdetails.map(lambda line: (line.split('|')[0].strip('"'), 
                                               line.split('|')[1].strip('"'),
                                               line.split('|')[2].strip('"'), 
                                               line.split('|')[3].strip('"'),
                                               line.split('|')[4].strip('"')))

productsrdd=products.map(
    lambda line: line if line.startswith('"') and line.endswith('"') else '2' ).filter(
    lambda x: len(x)>3).map(lambda line: (line.split('|')[0].strip('"'), 
                                               line.split('|')[1].strip('"'),
                                               line.split('|')[2].strip('"'), 
                                               line.split('|')[3].strip('"'),
                                               line.split('|')[4].strip('"'),
                                               line.split('|')[5].strip('"'),
                                               line.split('|')[6].strip('"'),
                                               line.split('|')[7].strip('"'),
                                               line.split('|')[8].strip('"')))

customersrdd=customers.map(lambda line: (line.split('|')[0].strip('"'), 
                                   line.split('|')[1].strip('"'),
                                   line.split('|')[2].strip('"'),
                                   line.split('|')[3].strip('"'), 
                                   line.split('|')[4].strip('"'),
                                   line.split('|')[5].strip('"'),
                                   line.split('|')[6].strip('"'),
                                   line.split('|')[7].strip('"'), 
                                   line.split('|')[8].strip('"'),
                                   line.split('|')[9].strip('"'),
                                   line.split('|')[10].strip('"'),
                                   line.split('|')[11].strip('"'),
                                   line.split('|')[12].strip('"')))

In [8]:
df_orders=sqlContext.createDataFrame(ordersrdd, schema=['orderNumber', 'orderDate', 
                                                        'requiredDate', 'shippedDate', 
                                                        'status', 'comments', 
                                                        'customerNumber'])
df_orders.show(2)

df_orderdetails=sqlContext.createDataFrame(orderdetailsrdd, 
                                           schema=['orderNumber', 'productCode', 
                                                   'quantityOrdered', 'priceEach', 
                                                   'orderLineNumber'])
df_orderdetails.show(2)

df_productdetails=sqlContext.createDataFrame(productsrdd, 
                                           schema=['productCode', 'productName', 
                                                   'productLine', 'productScale',
                                                  'productVendor', 'productDescription',
                                                  'quantityInStock', 'buyPrice', 'MSRP'])
df_productdetails.show(2)

df_customers=sqlContext.createDataFrame(customersrdd, schema=['customerNumber', 'customerName', 
                                                        'contactLastName', 'contactFirstName', 
                                                        'phone', 'addressLine1', 'addressLine2',
                                                        'city', 'state', 'postalCode', 'country',
                                                             'salesRepEmployeeNumber', 'creditLimit'])
df_customers.show(2)

+-----------+----------+------------+-----------+-------+--------------------+--------------+
|orderNumber| orderDate|requiredDate|shippedDate| status|            comments|customerNumber|
+-----------+----------+------------+-----------+-------+--------------------+--------------+
|      10100|2003-01-06|  2003-01-13| 2003-01-10|Shipped|                  \N|           363|
|      10101|2003-01-09|  2003-01-18| 2003-01-11|Shipped|Check on availabi...|           128|
+-----------+----------+------------+-----------+-------+--------------------+--------------+
only showing top 2 rows

+-----------+-----------+---------------+---------+---------------+
|orderNumber|productCode|quantityOrdered|priceEach|orderLineNumber|
+-----------+-----------+---------------+---------+---------------+
|      10100|   S18_1749|             30|   136.00|              3|
|      10100|   S18_2248|             50|    55.09|              2|
+-----------+-----------+---------------+---------+---------------+
onl

In [9]:
#1. Find orders that were shipped late to the customer
print("Q1: Find orders that were shipped late to the customer\n")

print("A1.1: Orders where requiredDate < shippedDate\n")
df_orders.filter(df_orders.requiredDate < df_orders.shippedDate).show()

print("A1.2: Orders where requiredDate < shippedDate and status is shipped\n")
df_orders.filter( (df_orders.requiredDate < df_orders.shippedDate) & (df_orders.status.like('Shipped'))).show()

Q1: Find orders that were shipped late to the customer

A1.1: Orders where requiredDate < shippedDate

+-----------+----------+------------+-----------+----------+--------------------+--------------+
|orderNumber| orderDate|requiredDate|shippedDate|    status|            comments|customerNumber|
+-----------+----------+------------+-----------+----------+--------------------+--------------+
|      10165|2003-10-22|  2003-10-31| 2003-12-26|   Shipped|This order was on...|           148|
|      10167|2003-10-23|  2003-10-30|         \N| Cancelled|Customer called t...|           448|
|      10248|2004-05-07|  2004-05-14|         \N| Cancelled|Order was mistake...|           131|
|      10260|2004-06-16|  2004-06-22|         \N| Cancelled|Customer heard co...|           357|
|      10262|2004-06-24|  2004-07-01|         \N| Cancelled|This customer fou...|           141|
|      10334|2004-11-19|  2004-11-28|         \N|   On Hold|The outstaniding ...|           144|
|      10401|2005-04-03|

In [10]:
#2. List orders where total amount exceeds $250
print('Q2: List orders where total amount exceeds $250\n')

print("""A2.1: Total Amount = quantityOrdered * orderdetails
The sorted list (by Total Amount) of orders where total amount exceeds $250:""")
df_orderdetails.withColumn('Total',
    (df_orderdetails.quantityOrdered * df_orderdetails.priceEach)).groupby(
    'orderNumber').agg(round(sum('Total'),2).alias('Total Amount')).filter(
    col('Total Amount')>250).sort('Total Amount').show()

print("""A2.2: Total Amount = quantityOrdered * orderdetails
The sorted list (by ordeNumber) of orders where total amount exceeds $250:""")
df_orderdetails.withColumn('Total',
    (df_orderdetails.quantityOrdered * df_orderdetails.priceEach)).groupby(
    'orderNumber').agg(round(sum('Total'),2).alias('Total Amount')).filter(
    col('Total Amount')>250).sort('orderNumber').show()

Q2: List orders where total amount exceeds $250

A2.1: Total Amount = quantityOrdered * orderdetails
The sorted list (by Total Amount) of orders where total amount exceeds $250:
+-----------+------------+
|orderNumber|Total Amount|
+-----------+------------+
|      10408|      615.45|
|      10144|      1128.2|
|      10158|     1491.38|
|      10116|     1627.56|
|      10345|     1676.14|
|      10242|     1679.92|
|      10364|     1834.56|
|      10286|      1960.8|
|      10409|     2326.18|
|      10317|     2434.25|
|      10277|     2611.84|
|      10132|      2880.0|
|      10118|      3101.4|
|      10376|     3452.75|
|      10303|     3474.66|
|      10387|     3516.04|
|      10189|     3879.96|
|      10294|      4424.4|
|      10154|     4465.85|
|      10385|     4466.71|
+-----------+------------+
only showing top 20 rows

A2.2: Total Amount = quantityOrdered * orderdetails
The sorted list (by ordeNumber) of orders where total amount exceeds $250:
+-----------+--------

In [11]:
#3. Find the productCode per order that is most expensive
print("Q3: Find the productCode per order that is most expensive\n")

print("A3: The desired list sorted by orderNumber is:")
df_orderdetails_hv=hiveContext.createDataFrame(orderdetailsrdd, 
                                           schema=['orderNumber', 'productCode', 
                                                   'quantityOrdered', 'priceEach', 
                                                   'orderLineNumber'])

w = Window().partitionBy("orderNumber").orderBy(col("priceEach").desc())

df_orderdetails_numeric=df_orderdetails_hv.select(
    'orderNumber', 'productCode', df_orderdetails_hv.priceEach.cast(DoubleType()))

df_temp=df_orderdetails_numeric.withColumn('rn', row_number().over(w)).where(
    col('rn')==1).select('orderNumber', 'productCode', 'priceEach')

df_temp.sort('orderNumber').show(5)

Q3: Find the productCode per order that is most expensive

A3: The desired list sorted by orderNumber is:
+-----------+-----------+---------+
|orderNumber|productCode|priceEach|
+-----------+-----------+---------+
|      10100|   S18_1749|    136.0|
|      10101|   S18_2795|   167.06|
|      10102|   S18_1342|    95.55|
|      10103|   S10_1949|    214.3|
|      10104|   S18_3232|   165.95|
+-----------+-----------+---------+
only showing top 5 rows



In [12]:
# 4. Write a list of customers (include demographic information)
# whose order were late (point 1 above). This list should have an
# additional column called Gift Card Amount. Populate Gift Card
# Amount as follows: If total order exceeds $100 then Gift Card
# Amount=$50 else Gift Card Amount=$25, Use Parquet format
# for output.

print("""Q4: Write a list of customers (include demographic information) 
whose order were late (point 1 above). This list should have an
additional column called Gift Card Amount. Populate Gift Card
Amount as follows: If total order exceeds $100 then Gift Card
Amount=$50 else Gift Card Amount=$25, Use Parquet format for output.""")


df_temp1=df_orders.join(df_orderdetails, on='orderNumber').filter(df_orders.requiredDate < df_orders.shippedDate)
df_temp2=df_customers.join(df_temp1, on='customerNumber')

print("\n")
print("""A4.1: The list of customers whose 'total order amount' (quantityOrdered * priceEach)
of one product is greater than 100 and delayed""")
df_temp=df_temp2.withColumn('Total',
    round(df_temp2.quantityOrdered * df_temp2.priceEach, 2)).filter(col('Total')>100).sort('Total')
df_final=df_temp.withColumn('Gift Card Amount', when(col('Total')>100, 50).otherwise(25))
df_final.show(5)
print "The total number of customers for the above 'total order amount' based list = ", df_final.count()

print("\n")
print("""A4.2: The list of customers whose 'priceEach' amount
of one product is greater than 100 and delayed""")
df_temp=df_temp2.filter(col('priceEach')>100).sort('priceEach')
df_final=df_temp.withColumn('GiftCardAmount', when(col('priceEach')>100, 50).otherwise(25))
df_final.show(5)
print "The total number of customers for the above 'priceEach' based list = ", df_final.count()

Q4: Write a list of customers (include demographic information) 
whose order were late (point 1 above). This list should have an
additional column called Gift Card Amount. Populate Gift Card
Amount as follows: If total order exceeds $100 then Gift Card
Amount=$50 else Gift Card Amount=$25, Use Parquet format for output.


A4.1: The list of customers whose 'total order amount' (quantityOrdered * priceEach)
of one product is greater than 100 and delayed
+--------------+--------------------+---------------+----------------+---------------+--------------------+--------------------+---------+-----+----------+---------+----------------------+-----------+-----------+----------+------------+-----------+----------+--------------------+-----------+---------------+---------+---------------+------+----------------+
|customerNumber|        customerName|contactLastName|contactFirstName|          phone|        addressLine1|        addressLine2|     city|state|postalCode|  country|salesRepEmployeeNumb

In [13]:
MYDIRS2='/user/hdfs/sparkdata/soln2'

In [87]:
%%bash -s "$MYDIRS2"

if hdfs dfs -test -e $1 ; then 
echo "Deleting existing $1 on HDFS"
hadoop fs -rm -r $1
fi

echo "Creating new directory $1 on HDFS"
hadoop fs -mkdir $1

Deleting existing /user/hdfs/sparkdata/soln2 on HDFS
Deleted /user/hdfs/sparkdata/soln2
Creating new directory /user/hdfs/sparkdata/soln2 on HDFS


In [88]:
print("A4.3")
print("\nSaving file in parquet format")
df_final.write.parquet("hdfs://quickstart.cloudera:8020"+MYDIRS2+"/parquet_df")
print("Saving Complete")

print("\nReading file from parquet format")
df_parquet=sqlContext.read.parquet("hdfs://quickstart.cloudera:8020"+MYDIRS2+"/parquet_df")
df_parquet.show(2)

A4.3

Saving file in parquet format
Saving Complete

Reading file from parquet format
+--------------+--------------------+---------------+----------------+------------+---------------+------------+--------+-----+----------+-------+----------------------+-----------+-----------+----------+------------+-----------+-------+--------------------+-----------+---------------+---------+---------------+--------------+
|customerNumber|        customerName|contactLastName|contactFirstName|       phone|   addressLine1|addressLine2|    city|state|postalCode|country|salesRepEmployeeNumber|creditLimit|orderNumber| orderDate|requiredDate|shippedDate| status|            comments|productCode|quantityOrdered|priceEach|orderLineNumber|GiftCardAmount|
+--------------+--------------------+---------------+----------------+------------+---------------+------------+--------+-----+----------+-------+----------------------+-----------+-----------+----------+------------+-----------+-------+--------------------+

In [90]:
#5. Can you output the list above in 1 single CSV file?
print("Q5: Can you output the list above in 1 single CSV file?\n")

print("A5: Saving the output list on local file system")

mypwd = os.getcwd()
myfile='final_customers_S2Q5.csv'

print "Saving the dataframe as CSV file"
df_final.coalesce(1).rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020" + MYDIRS2 + "/" + myfile)
print "The saved file name is: ", myfile

print "\nLoading the saved CSV file as dataframe"
final_S2Q5_read = sc.textFile("hdfs://quickstart.cloudera:8020" + MYDIRS2 + "/" + myfile)
         

final_S2Q5_readrdd=final_S2Q5_read.map(lambda line: (line.split(',')[0].strip('"'), 
                                   line.split(',')[1].strip('"'), line.split(',')[2].strip('"'),
                                   line.split(',')[3].strip('"'), line.split(',')[4].strip('"'),
                                   line.split(',')[5].strip('"'), line.split(',')[6].strip('"'),
                                   line.split(',')[7].strip('"'), line.split(',')[8].strip('"'),
                                   line.split(',')[9].strip('"'), line.split(',')[10].strip('"'),
                                   line.split(',')[11].strip('"'), line.split(',')[12].strip('"'),
                                   line.split(',')[13].strip('"'), line.split(',')[14].strip('"'),
                                   line.split(',')[15].strip('"'), line.split(',')[16].strip('"'),
                                   line.split(',')[17].strip('"'), line.split(',')[18].strip('"'),
                                   line.split(',')[19].strip('"'), line.split(',')[20].strip('"'),
                                   line.split(',')[21].strip('"'), line.split(',')[22].strip('"'),
                                   line.split(',')[23].strip('"')
                                                    ))

df_final_read=sqlContext.createDataFrame(final_S2Q5_readrdd, schema=['customerNumber', 'customerName', 
                                                        'contactLastName', 'contactFirstName', 
                                                        'phone', 'addressLine1', 'addressLine2',
                                                        'city', 'state', 'postalCode', 'country',
                                                        'salesRepEmployeeNumber', 'creditLimit',
                                                        'orderNumber', 'orderDate', 'requiredDate',
                                                        'shippedDate', 'status', 'comments',
                                                        'productCode', 'quantityOrdered', 
                                                        'priceEach', 'orderLineNumber', 'GiftCardAmount'])

print "The total number of rows in loaded CSV file is:", df_final_read.count()

Q5: Can you output the list above in 1 single CSV file?

A5: Saving the output list on local file system
Saving the dataframe as CSV file
The saved file name is:  final_customers_S2Q5.csv

Loading the saved CSV file as dataframe
The total number of rows in loaded CSV file is: 50
