In [None]:
pip install pyspark

In [1]:
from pyspark.sql.types import *
from pyspark.sql.window import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/16 02:43:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/16 02:43:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark.read.text("hdfs://namenode:8020/user/datapath/datasets/customers/").show()

+--------------------+
|               value|
+--------------------+
|9327,Donna,Smith,...|
|9328,Mary,Perez,X...|
|9329,Eugene,Powel...|
|9330,Mary,Conley,...|
|9331,Donna,Smith,...|
|9332,Mary,Jordan,...|
|9333,Angela,Mills...|
|9334,Mary,Johnsto...|
|9335,Joseph,Smith...|
|9336,Janice,Guzma...|
|9337,Mary,Smith,X...|
|9338,James,Davis,...|
|9339,Ann,Moyer,XX...|
|9340,Mary,Smith,X...|
|9341,Karen,Collin...|
|9342,Teresa,Grant...|
|9343,Mary,Knapp,X...|
|9344,Kelly,Smith,...|
|9345,Mary,Branch,...|
|9346,Jack,Smith,X...|
+--------------------+
only showing top 20 rows



In [4]:
CUSTOMERS_DATA =   'hdfs://namenode:8020/user/datapath/datasets/customers'
DEPARTMENTS_DATA = 'hdfs://namenode:8020/user/datapath/datasets/departments'
CATEGORIES_DATA =  'hdfs://namenode:8020/user/datapath/datasets/categories'
PRODUCTS_DATA =    'hdfs://namenode:8020/user/datapath/datasets/products'
ORDERS_DATA =      'hdfs://namenode:8020/user/datapath/datasets/orders'
ORDER_ITEMS_DATA = 'hdfs://namenode:8020/user/datapath/datasets/order_items'

In [5]:
# define the schema, corresponding to a line in the csv data file for Customer
customers_schema = StructType([
    StructField('customer_id',       IntegerType(), nullable=True),
    StructField('customer_fname',    StringType(), nullable=True),
    StructField('customer_lname',    StringType(), nullable=True),
    StructField('customer_email',    StringType(), nullable=True),
    StructField('customer_password', StringType(), nullable=True),
    StructField('customer_street',   StringType(), nullable=True),
    StructField('customer_city',     StringType(), nullable=True),
    StructField('customer_state',    StringType(), nullable=True),
    StructField('customer_zipcode',  StringType(), nullable=True)])

In [6]:
departments_schema = StructType([
    StructField('department_id',   IntegerType(), nullable=True),
    StructField('department_name', StringType(), nullable=True)])

In [7]:
categories_schema = StructType([
    StructField('category_id',            IntegerType(), nullable=True),
    StructField('category_department_id', IntegerType(), nullable=True),
    StructField('category_name',          StringType(), nullable=True)])

In [8]:
products_schema = StructType([
    StructField('product_id',          IntegerType(), nullable=True),
    StructField('product_category_id', IntegerType(), nullable=True),
    StructField('product_name',        StringType(), nullable=True),
    StructField('product_description', StringType(), nullable=True),
    StructField('product_price',       FloatType(), nullable=True),
    StructField('product_image',       StringType(), nullable=True)])

In [9]:
orders_schema = StructType([
    StructField('order_id',          IntegerType(), nullable=True),
    StructField('order_date',        StringType(), nullable=True),
    StructField('order_customer_id', IntegerType(), nullable=True),
    StructField('order_status',      StringType(), nullable=True)])

In [10]:
order_items_schema = StructType([
    StructField('order_item_id',            IntegerType(), nullable=True),
    StructField('order_item_order_id',      IntegerType(), nullable=True),
    StructField('order_item_product_id',    IntegerType(), nullable=True),
    StructField('order_item_quantity',      IntegerType(), nullable=True),
    StructField('order_item_subtotal',      FloatType(), nullable=True),
    StructField('order_item_product_price', FloatType(), nullable=True)])

### Cargamos los datos

In [11]:

customers_df = spark.read.csv(path=CUSTOMERS_DATA, schema=customers_schema)
customers_df.cache()

departments_df = spark.read.csv(path=DEPARTMENTS_DATA, schema=departments_schema)
departments_df.cache()

categories_df = spark.read.csv(path=CATEGORIES_DATA, schema=categories_schema)
categories_df.cache()

products_df = spark.read.csv(path=PRODUCTS_DATA, schema=products_schema)
products_df.cache()

orders_df = spark.read.csv(path=ORDERS_DATA, schema=orders_schema)
orders_df.cache()

order_items_df = spark.read.csv(path=ORDER_ITEMS_DATA, schema=order_items_schema)
order_items_df.cache()

DataFrame[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float]

In [13]:
categories_df.show()

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
|          6|                     2|   Tennis & Racquet|
|          7|                     2|             Hockey|
|          8|                     2|        More Sports|
|          9|                     3|   Cardio Equipment|
|         10|                     3|  Strength Training|
|         11|                     3|Fitness Accessories|
|         12|                     3|       Boxing & MMA|
|         13|                     3|        Electronics|
|         14|                     3|     Yoga & Pilates|
|         15|                  

### Creamos vistas temporales para trabajar con spark.sql

In [14]:
customers_df.createOrReplaceTempView("customers")
customers_df.show(5)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       9327|         Donna|         Smith|     XXXXXXXXX|        XXXXXXXXX|4114 Clear Nectar...|       Caguas|            PR|           00725|
|       9328|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX|  376 Golden Orchard|Moreno Valley|            CA|           92553|
|       9329|        Eugene|        Powell|     XXXXXXXXX|        XXXXXXXXX|   2161 Burning Maze|     Metairie|            LA|           70003|
|       9330|          Mary|        Conley|     XXXXXXXXX|        XXXXXXXXX| 3046 Broad Sky Dale|       Caguas|            PR|          

In [15]:
departments_df.createOrReplaceTempView("departments")
departments_df.show(5)

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            6|       Outdoors|
|            7|       Fan Shop|
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
+-------------+---------------+
only showing top 5 rows



In [16]:
orders_df.createOrReplaceTempView("orders")
orders_df.show(5)

[Stage 8:>                                                          (0 + 2) / 2]

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|   17222|2013-11-09 00:00:...|             2373|       COMPLETE|
|   17223|2013-11-09 00:00:...|            12091|PENDING_PAYMENT|
|   17224|2013-11-09 00:00:...|              871|        PENDING|
|   17225|2013-11-09 00:00:...|             6381|PENDING_PAYMENT|
|   17226|2013-11-09 00:00:...|             4456|        PENDING|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



                                                                                

In [17]:
order_items_df.createOrReplaceTempView("order_items")
order_items_df.show(5)

[Stage 10:>                                                         (0 + 2) / 2]

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|       129149|              51686|                  403|                  1|             129.99|                  129.99|
|       129150|              51687|                  403|                  1|             129.99|                  129.99|
|       129151|              51687|                  403|                  1|             129.99|                  129.99|
|       129152|              51687|                 1014|                  5|              249.9|                   49.98|
|       129153|              51687|                  191|                  4|             399.96|                   99.99|
+-------------+-

                                                                                

In [18]:
products_df.createOrReplaceTempView("products")
products_df.show(5)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1009|                 45|Diamond Fear No E...|               NULL|       599.99|http://images.acm...|
|      1010|                 46|DBX Vector Series...|               NULL|        19.98|http://images.acm...|
|      1011|                 46|Old Town Canoe Sa...|               NULL|       499.99|http://images.acm...|
|      1012|                 46|Pelican Trailblaz...|               NULL|       299.99|http://images.acm...|
|      1013|                 46|Perception Sport ...|               NULL|       349.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 

In [19]:
categories_df.createOrReplaceTempView("categories")
categories_df.show(5)

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
+-----------+----------------------+-------------------+
only showing top 5 rows



In [20]:
spark.sql("select * from categories").show()

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
|          6|                     2|   Tennis & Racquet|
|          7|                     2|             Hockey|
|          8|                     2|        More Sports|
|          9|                     3|   Cardio Equipment|
|         10|                     3|  Strength Training|
|         11|                     3|Fitness Accessories|
|         12|                     3|       Boxing & MMA|
|         13|                     3|        Electronics|
|         14|                     3|     Yoga & Pilates|
|         15|                  

In [23]:
query = '''
SELECT
    customer_id, customer_fname, customer_lname, customer_email, sum(order_item_quantity) as quantity_item_total, sum(order_item_subtotal)as total
FROM
    customers as c
INNER JOIN
    orders as o
    ON c.customer_id = o.order_customer_id
INNER JOIN
    order_items as oi
    ON o.order_id = oi.order_item_order_id
WHERE order_status <> 'CANCELED'
GROUP BY customer_id, customer_fname, customer_lname, customer_email
ORDER BY  total DESC
LIMIT 20;
'''
spark.sql(query).show()

+-----------+--------------+--------------+--------------+-------------------+------------------+
|customer_id|customer_fname|customer_lname|customer_email|quantity_item_total|             total|
+-----------+--------------+--------------+--------------+-------------------+------------------+
|        791|          Mary|         Smith|     XXXXXXXXX|                 82|10524.170177459717|
|       8766|          Mary|        Duncan|     XXXXXXXXX|                 87| 9296.140186309814|
|       1657|         Betty|      Phillips|     XXXXXXXXX|                111| 9223.710151672363|
|       2641|         Betty|        Spears|     XXXXXXXXX|                 90| 9130.920223236084|
|       1288|        Evelyn|      Thompson|     XXXXXXXXX|                 89|  9019.11019897461|
|       3710|        Ashley|         Smith|     XXXXXXXXX|                 89|  9019.10020828247|
|       5654|         Jerry|         Smith|     XXXXXXXXX|                103| 8904.950210571289|
|       5624|       