In [1]:
import findspark
import datetime
findspark.init()
from pyspark import SparkContext
sc = SparkContext("local", "first app")
#print(sc)

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.types import DateType, IntegerType
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.window import Window
import pyspark.sql.functions as func

sqlContext = HiveContext(sc)

In [3]:
# Read file for HDFS
employees=sc.textFile("hdfs://quickstart.cloudera:8020/data/orders/employees")
offices=sc.textFile("hdfs://quickstart.cloudera:8020/data/orders/offices")
customers=sc.textFile("hdfs://quickstart.cloudera:8020/data/orders/customers")

In [4]:
# Create a Dataframe for employees
employeesrdd=employees.map(lambda line: (line.split('|')[0].strip('"'), 
                                   line.split('|')[1].strip('"'),
                                   line.split('|')[2].strip('"'),
                                   line.split('|')[3].strip('"'), 
                                   line.split('|')[4].strip('"'),
                                   line.split('|')[5].strip('"'),
                                   line.split('|')[6].strip('"'),
                                   line.split('|')[7].strip('"'))).collect()
df_employees=sqlContext.createDataFrame(employeesrdd, schema=['employeeNumber', 'lastName',       
                                                              'firstName', 'extension',      
                                                              'email','officeCode',     
                                                              'reportsTo','jobTitle'])
df_employees.show(2)

+--------------+---------+---------+---------+--------------------+----------+---------+---------+
|employeeNumber| lastName|firstName|extension|               email|officeCode|reportsTo| jobTitle|
+--------------+---------+---------+---------+--------------------+----------+---------+---------+
|          1002|   Murphy|    Diane|    x5800|dmurphy@classicmo...|         1|       \N|President|
|          1056|Patterson|     Mary|    x4611|mpatterso@classic...|         1|     1002| VP Sales|
+--------------+---------+---------+---------+--------------------+----------+---------+---------+
only showing top 2 rows



In [6]:
# Create a Dataframe for customers
customersrdd=customers.map(lambda line: (line.split('|')[0].strip('"'), 
                                   line.split('|')[1].strip('"'),
                                   line.split('|')[2].strip('"'),
                                   line.split('|')[3].strip('"'), 
                                   line.split('|')[4].strip('"'),
                                   line.split('|')[5].strip('"'),
                                   line.split('|')[6].strip('"'),
                                   line.split('|')[7].strip('"'),
                                   line.split('|')[8].strip('"'),
                                   line.split('|')[9].strip('"'), 
                                   line.split('|')[10].strip('"'),
                                   line.split('|')[11].strip('"'),
                                   line.split('|')[12].strip('"'))).collect()
df_customers=sqlContext.createDataFrame(customersrdd, schema=['customerNumber', 'customerName',          
                                                                'contactLastName', 'contactFirstName',      
                                                                'phone', 'addressLine1',          
                                                                'addressLine2', 'city',                  
                                                                'state', 'postalCode',            
                                                                'country', 'salesRepEmployeeNumber',
                                                                'creditLimit' ])

df_customer1 = df_customers.select(df_customers.creditLimit.cast(IntegerType()),
                                   'customerName', 'country')
df_customers.show(2)

+--------------+------------------+---------------+----------------+----------+---------------+------------+---------+-----+----------+-------+----------------------+-----------+
|customerNumber|      customerName|contactLastName|contactFirstName|     phone|   addressLine1|addressLine2|     city|state|postalCode|country|salesRepEmployeeNumber|creditLimit|
+--------------+------------------+---------------+----------------+----------+---------------+------------+---------+-----+----------+-------+----------------------+-----------+
|           103| Atelier graphique|        Schmitt|         Carine |40.32.2555| 54, rue Royale|          \N|   Nantes|   \N|     44000| France|                  1370|   21000.00|
|           112|Signal Gift Stores|           King|            Jean|7025551838|8489 Strong St.|          \N|Las Vegas|   NV|     83030|    USA|                  1166|   71800.00|
+--------------+------------------+---------------+----------------+----------+---------------+----------

In [7]:
# Create a Dataframe for offices
officesrdd=offices.map(lambda line: (line.split('|')[0].strip('"'), 
                                   line.split('|')[1].strip('"'),
                                   line.split('|')[2].strip('"'),
                                   line.split('|')[3].strip('"'), 
                                   line.split('|')[4].strip('"'),
                                   line.split('|')[5].strip('"'),
                                   line.split('|')[6].strip('"'),
                                   line.split('|')[7].strip('"'),
                                   line.split('|')[8].strip('"'))).collect()
df_offices=sqlContext.createDataFrame(officesrdd, schema=['officeCode', 'city',         
                                                            'phone', 'addressLine1', 
                                                            'addressLine2', 'state',        
                                                            'country', 'postalCode',   
                                                            'territory'  ])
df_offices.show(2)

+----------+-------------+---------------+-----------------+------------+-----+-------+----------+---------+
|officeCode|         city|          phone|     addressLine1|addressLine2|state|country|postalCode|territory|
+----------+-------------+---------------+-----------------+------------+-----+-------+----------+---------+
|         1|San Francisco|+1 650 219 4782|100 Market Street|   Suite 300|   CA|    USA|     94080|       NA|
|         2|       Boston|+1 215 837 0825| 1550 Court Place|   Suite 102|   MA|    USA|     02107|       NA|
+----------+-------------+---------------+-----------------+------------+-----+-------+----------+---------+
only showing top 2 rows



In [8]:
# Register tables for for employees, customers and offices
df_employees.registerTempTable('myemployees')
rowemployees = sqlContext.sql("select count(*) as cnt_employees from myemployees")
rowemployees.show(1)

df_customers.registerTempTable('mycustomers')
rowcustomers = sqlContext.sql("select count(*) as cnt_customers from mycustomers")
rowcustomers.show(1)

df_offices.registerTempTable('myoffices')
rowoffices = sqlContext.sql("select count(*) as cnt_offices from myoffices")
rowoffices.show(1)

+-------------+
|cnt_employees|
+-------------+
|           23|
+-------------+

+-------------+
|cnt_customers|
+-------------+
|          122|
+-------------+

+-----------+
|cnt_offices|
+-----------+
|          7|
+-----------+



In [9]:
# Use Spark SQL to find the city of the office for each employee
emp_office_sql =  " SELECT CONCAT(firstName, ' ', lastName) AS employee_name, myoffices.city AS office_city" \
                  " FROM myemployees " \
                  " JOIN myoffices ON myemployees.officeCode=myoffices.officeCode"
df_office  = sqlContext.sql(emp_office_sql)
df_office.show(5)

+---------------+-------------+
|  employee_name|  office_city|
+---------------+-------------+
|   Diane Murphy|San Francisco|
| Mary Patterson|San Francisco|
|  Jeff Firrelli|San Francisco|
|    Anthony Bow|San Francisco|
|Leslie Jennings|San Francisco|
+---------------+-------------+
only showing top 5 rows



In [10]:
# Use Spark SQL to find the number of employees per city
emp_city_sql =  " SELECT myoffices.city AS city, count(*) AS employees" \
                  " FROM myemployees " \
                  " JOIN myoffices ON myemployees.officeCode=myoffices.officeCode " \
                  " GROUP BY myoffices.city"
df_city = sqlContext.sql(emp_city_sql)
df_city.show(5)

+-------------+---------+
|         city|employees|
+-------------+---------+
|        Paris|        5|
|San Francisco|        6|
|       London|        2|
|       Boston|        2|
|          NYC|        2|
+-------------+---------+
only showing top 5 rows



In [11]:
# Use Spark SQL to find the reporting manager for each employee
emp_report_sql =  " SELECT CONCAT(e.firstName, ' ', e.lastName) AS employee_name, " \
                  " CONCAT(r.firstName, ' ', r.lastName) AS manager " \
                  " FROM myemployees e" \
                  " JOIN myemployees r ON e.reportsTo=r.employeeNumber " 
                  
df_report = sqlContext.sql(emp_report_sql)
df_report.show(5)

+---------------+-----------+
|  employee_name|    manager|
+---------------+-----------+
|Leslie Jennings|Anthony Bow|
|Leslie Thompson|Anthony Bow|
| Julie Firrelli|Anthony Bow|
|Steve Patterson|Anthony Bow|
| Foon Yue Tseng|Anthony Bow|
+---------------+-----------+
only showing top 5 rows



In [12]:
# Use Spark SQL to find the Sales Rep (based in NYC) for each customer 
emp_salesrep_sql =  " SELECT c.customerName AS customer_name, " \
                    " CONCAT(e.firstName, ' ', e.lastName) AS salerep_name" \
                    " FROM mycustomers c" \
                    " JOIN myemployees e ON e.employeeNumber=c.salesRepEmployeeNumber " \
                    " JOIN myoffices o ON e.officeCode=o.officeCode  " \
                    " WHERE o.city='NYC'"
df_salesrep = sqlContext.sql(emp_salesrep_sql)
df_salesrep.show()

+--------------------+--------------+
|       customer_name|  salerep_name|
+--------------------+--------------+
|   Land of Toys Inc.| George Vanauf|
|     Gift Depot Inc.| George Vanauf|
|Canadian Gift Exc...| George Vanauf|
|Royal Canadian Co...| George Vanauf|
|       Mini Classics| George Vanauf|
|Tekni Collectable...| George Vanauf|
|    Gift Ideas Corp.| George Vanauf|
|Motor Mint Distri...| George Vanauf|
|  Muscle Machine Inc|Foon Yue Tseng|
|American Souvenir...|Foon Yue Tseng|
|     Vitachrome Inc.|Foon Yue Tseng|
|Québec Home Shopp...|Foon Yue Tseng|
|Classic Legends Inc.|Foon Yue Tseng|
|    Super Scale Inc.|Foon Yue Tseng|
|     Microscale Inc.|Foon Yue Tseng|
+--------------------+--------------+



In [13]:
df_customers.printSchema()

root
 |-- customerNumber: string (nullable = true)
 |-- customerName: string (nullable = true)
 |-- contactLastName: string (nullable = true)
 |-- contactFirstName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- addressLine1: string (nullable = true)
 |-- addressLine2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salesRepEmployeeNumber: string (nullable = true)
 |-- creditLimit: string (nullable = true)



In [14]:
df_customer1 = df_customers.select(df_customers.creditLimit.cast(IntegerType()),
                                   'customerName', 'country'
                                   )
df_customer1.printSchema()
df_customer1.show(3)

root
 |-- creditLimit: integer (nullable = true)
 |-- customerName: string (nullable = true)
 |-- country: string (nullable = true)

+-----------+--------------------+---------+
|creditLimit|        customerName|  country|
+-----------+--------------------+---------+
|      21000|   Atelier graphique|   France|
|      71800|  Signal Gift Stores|      USA|
|     117300|Australian Collec...|Australia|
+-----------+--------------------+---------+
only showing top 3 rows



In [15]:
#  Windowing functon rank() on Dataframes
windowSpec = Window.partitionBy("country").orderBy("creditLimit")
df_customer1.withColumn("rank", rank().over(windowSpec)).select('customerName', 'country', 'creditLimit', 'rank').show()

+--------------------+-----------+-----------+----+
|        customerName|    country|creditLimit|rank|
+--------------------+-----------+-----------+----+
|Danish Wholesale ...|    Denmark|      83400|   1|
|Heintze Collectables|    Denmark|     120800|   2|
|   Porto Imports Co.|   Portugal|          0|   1|
|Lisboa Souveniers...|   Portugal|          0|   1|
|     GiftsForHim.com|New Zealand|      77700|   1|
|Extreme Desk Deco...|New Zealand|      86800|   2|
|Down Under Souven...|New Zealand|      88000|   3|
|   Kelly's Gift Shop|New Zealand|     110000|   4|
|  Havel & Zbyszek Co|     Poland|          0|   1|
|  Baane Mini Imports|     Norway|      81700|   1|
|Québec Home Shopp...|     Canada|      48700|   1|
|Royal Canadian Co...|     Canada|      89600|   2|
|Canadian Gift Exc...|     Canada|      90300|   3|
|    Schuyler Imports|Netherlands|          0|   1|
|Double Decker Gif...|         UK|      43300|   1|
|Stylish Desk Deco...|         UK|      77000|   2|
|UK Collecta

In [16]:
#  Windowing functon dense_rank() on Dataframes
windowSpec = Window.partitionBy("country").orderBy("creditLimit")

df_customer1.withColumn("drank", dense_rank().over(windowSpec)).select('customerName', 'country', 'creditLimit','drank').show()

+--------------------+-----------+-----------+-----+
|        customerName|    country|creditLimit|drank|
+--------------------+-----------+-----------+-----+
|Danish Wholesale ...|    Denmark|      83400|    1|
|Heintze Collectables|    Denmark|     120800|    2|
|   Porto Imports Co.|   Portugal|          0|    1|
|Lisboa Souveniers...|   Portugal|          0|    1|
|     GiftsForHim.com|New Zealand|      77700|    1|
|Extreme Desk Deco...|New Zealand|      86800|    2|
|Down Under Souven...|New Zealand|      88000|    3|
|   Kelly's Gift Shop|New Zealand|     110000|    4|
|  Havel & Zbyszek Co|     Poland|          0|    1|
|  Baane Mini Imports|     Norway|      81700|    1|
|Québec Home Shopp...|     Canada|      48700|    1|
|Royal Canadian Co...|     Canada|      89600|    2|
|Canadian Gift Exc...|     Canada|      90300|    3|
|    Schuyler Imports|Netherlands|          0|    1|
|Double Decker Gif...|         UK|      43300|    1|
|Stylish Desk Deco...|         UK|      77000|

In [17]:
# Use Spark SQL for Windowing  
emp_credit_sql =  " SELECT customerName, country, CAST(creditLimit AS int), " \
                  " rank() OVER( PARTITION BY country ORDER BY CAST(creditLimit AS int) ) AS rank" \
                  " FROM mycustomers " 

df_credit = sqlContext.sql(emp_credit_sql)
df_credit.show(20,False)

df_credit.printSchema()



+---------------------------------+-----------+-----------+----+
|customerName                     |country    |creditLimit|rank|
+---------------------------------+-----------+-----------+----+
|Danish Wholesale Imports         |Denmark    |83400      |1   |
|Heintze Collectables             |Denmark    |120800     |2   |
|Porto Imports Co.                |Portugal   |0          |1   |
|Lisboa Souveniers, Inc           |Portugal   |0          |1   |
|GiftsForHim.com                  |New Zealand|77700      |1   |
|Extreme Desk Decorations, Ltd    |New Zealand|86800      |2   |
|Down Under Souveniers, Inc       |New Zealand|88000      |3   |
|Kelly's Gift Shop                |New Zealand|110000     |4   |
|Havel & Zbyszek Co               |Poland     |0          |1   |
|Baane Mini Imports               |Norway     |81700      |1   |
|Québec Home Shopping Network     |Canada     |48700      |1   |
|Royal Canadian Collectables, Ltd.|Canada     |89600      |2   |
|Canadian Gift Exchange N