In [1]:
from pyspark.sql.types import *
from pyspark.sql.window import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [2]:
from pyspark.sql import SparkSession

# Attach to the already-running SparkSession if there is one, otherwise build a new one.
# No app name, master or extra config is set here, so the builder's defaults apply.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/15 20:46:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/15 20:46:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Sanity check: print the raw departments files line by line (single "value" column),
# before any schema is applied. The path is spelled out because the path constants
# are only defined in a later cell.
(
    spark.read
    .text(paths="hdfs://namenode:8020/user/vagrant/datasets/departments/")
    .show(n=20)
)

[Stage 0:>                                                          (0 + 1) / 1]

+----------+
|     value|
+----------+
|6,Outdoors|
|7,Fan Shop|
| 2,Fitness|
|3,Footwear|
| 4,Apparel|
|    5,Golf|
+----------+



                                                                                

In [4]:
# Locations of the six tables read by this notebook. They all live under one HDFS
# directory, so the root is factored out: pointing the notebook at another cluster or
# directory is now a one-line change.
DATASETS_ROOT = 'hdfs://namenode:8020/user/vagrant/datasets'

CUSTOMERS_DATA   = DATASETS_ROOT + '/customers'
DEPARTMENTS_DATA = DATASETS_ROOT + '/departments'
CATEGORIES_DATA  = DATASETS_ROOT + '/categories'
PRODUCTS_DATA    = DATASETS_ROOT + '/products'
ORDERS_DATA      = DATASETS_ROOT + '/orders'
ORDER_ITEMS_DATA = DATASETS_ROOT + '/order_items'

In [5]:
# Schema for one line of the customers CSV files, columns listed in file order.
# customer_zipcode stays a string so leading zeros survive (e.g. "00725" in the preview below).
customers_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('customer_id',       IntegerType()),
        ('customer_fname',    StringType()),
        ('customer_lname',    StringType()),
        ('customer_email',    StringType()),
        ('customer_password', StringType()),
        ('customer_street',   StringType()),
        ('customer_city',     StringType()),
        ('customer_state',    StringType()),
        ('customer_zipcode',  StringType()),
    )
])

In [6]:
# Schema for one line of the departments CSV files: "<id>,<name>" (see the raw peek above).
departments_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('department_id',   IntegerType()),
        ('department_name', StringType()),
    )
])

In [7]:
# Schema for one line of the categories CSV files, columns listed in file order.
# category_department_id presumably refers to departments.department_id — TODO confirm.
categories_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('category_id',            IntegerType()),
        ('category_department_id', IntegerType()),
        ('category_name',          StringType()),
    )
])

In [8]:
# Schema for one line of the products CSV files, columns listed in file order.
# NOTE(review): FloatType is 32-bit; for prices consider DoubleType or DecimalType
# to avoid rounding drift when summing or multiplying.
products_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('product_id',          IntegerType()),
        ('product_category_id', IntegerType()),
        ('product_name',        StringType()),
        ('product_description', StringType()),
        ('product_price',       FloatType()),
        ('product_image',       StringType()),
    )
])

In [9]:
# Schema for one line of the orders CSV files, columns listed in file order.
# NOTE(review): order_date is read as a raw string (the preview shows "2013-11-09 00:00:...");
# cast it (e.g. F.to_timestamp) before doing any date arithmetic.
orders_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('order_id',          IntegerType()),
        ('order_date',        StringType()),
        ('order_customer_id', IntegerType()),
        ('order_status',      StringType()),
    )
])

In [10]:
# Schema for one line of the order_items CSV files, columns listed in file order.
# In the preview below, order_item_subtotal appears to equal
# order_item_quantity * order_item_product_price (e.g. 5 * 49.98 = 249.9) — TODO confirm.
# NOTE(review): FloatType is 32-bit; consider DoubleType/DecimalType for the money columns.
order_items_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ('order_item_id',            IntegerType()),
        ('order_item_order_id',      IntegerType()),
        ('order_item_product_id',    IntegerType()),
        ('order_item_quantity',      IntegerType()),
        ('order_item_subtotal',      FloatType()),
        ('order_item_product_price', FloatType()),
    )
])

### Cargamos los datos

In [11]:

def _read_cached_csv(path, schema):
    """Read a CSV dataset with an explicit schema and mark the result for caching.

    Args:
        path: location of the CSV dataset (here one of the HDFS URIs defined above).
        schema: ``StructType`` describing one CSV line, columns in file order.

    Returns:
        The DataFrame returned by ``DataFrame.cache()``.
    """
    return spark.read.csv(path=path, schema=schema).cache()


# One cached DataFrame per table. Every line is an assignment, so the cell no longer
# echoes a stray DataFrame repr as its output.
customers_df   = _read_cached_csv(CUSTOMERS_DATA,   customers_schema)
departments_df = _read_cached_csv(DEPARTMENTS_DATA, departments_schema)
categories_df  = _read_cached_csv(CATEGORIES_DATA,  categories_schema)
products_df    = _read_cached_csv(PRODUCTS_DATA,    products_schema)
orders_df      = _read_cached_csv(ORDERS_DATA,      orders_schema)
order_items_df = _read_cached_csv(ORDER_ITEMS_DATA, order_items_schema)

DataFrame[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float]

In [12]:
# Print the first 20 customer rows with long values truncated (show()'s defaults, spelled out).
# NOTE(review): the next cell previews the same table again with 5 rows, and this output
# includes names and street addresses — consider dropping this cell before sharing.
customers_df.show(n=20, truncate=True)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       9327|         Donna|         Smith|     XXXXXXXXX|        XXXXXXXXX|4114 Clear Nectar...|       Caguas|            PR|           00725|
|       9328|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX|  376 Golden Orchard|Moreno Valley|            CA|           92553|
|       9329|        Eugene|        Powell|     XXXXXXXXX|        XXXXXXXXX|   2161 Burning Maze|     Metairie|            LA|           70003|
|       9330|          Mary|        Conley|     XXXXXXXXX|        XXXXXXXXX| 3046 Broad Sky Dale|       Caguas|            PR|          

### Creamos vistas temporales para trabajar con spark.sql

In [13]:
# Expose customers_df to spark.sql under the view name "customers", then print five rows.
customers_df.createOrReplaceTempView(name="customers")
customers_df.show(n=5)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       9327|         Donna|         Smith|     XXXXXXXXX|        XXXXXXXXX|4114 Clear Nectar...|       Caguas|            PR|           00725|
|       9328|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX|  376 Golden Orchard|Moreno Valley|            CA|           92553|
|       9329|        Eugene|        Powell|     XXXXXXXXX|        XXXXXXXXX|   2161 Burning Maze|     Metairie|            LA|           70003|
|       9330|          Mary|        Conley|     XXXXXXXXX|        XXXXXXXXX| 3046 Broad Sky Dale|       Caguas|            PR|          

In [14]:
# Expose departments_df to spark.sql under the view name "departments", then print five rows.
departments_df.createOrReplaceTempView(name="departments")
departments_df.show(n=5)

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            6|       Outdoors|
|            7|       Fan Shop|
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
+-------------+---------------+
only showing top 5 rows



In [16]:
# Expose orders_df to spark.sql under the view name "orders", then print five rows.
orders_df.createOrReplaceTempView(name="orders")
orders_df.show(n=5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|   17222|2013-11-09 00:00:...|             2373|       COMPLETE|
|   17223|2013-11-09 00:00:...|            12091|PENDING_PAYMENT|
|   17224|2013-11-09 00:00:...|              871|        PENDING|
|   17225|2013-11-09 00:00:...|             6381|PENDING_PAYMENT|
|   17226|2013-11-09 00:00:...|             4456|        PENDING|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [17]:
# Expose order_items_df to spark.sql under the view name "order_items", then print five rows.
order_items_df.createOrReplaceTempView(name="order_items")
order_items_df.show(n=5)

[Stage 8:>                                                          (0 + 1) / 1]

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|       129149|              51686|                  403|                  1|             129.99|                  129.99|
|       129150|              51687|                  403|                  1|             129.99|                  129.99|
|       129151|              51687|                  403|                  1|             129.99|                  129.99|
|       129152|              51687|                 1014|                  5|              249.9|                   49.98|
|       129153|              51687|                  191|                  4|             399.96|                   99.99|
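+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
only showing top 5 rows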

                                                                                

In [18]:
# Expose products_df to spark.sql under the view name "products", then print five rows.
products_df.createOrReplaceTempView(name="products")
products_df.show(n=5)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1009|                 45|Diamond Fear No E...|               null|       599.99|http://images.acm...|
|      1010|                 46|DBX Vector Series...|               null|        19.98|http://images.acm...|
|      1011|                 46|Old Town Canoe Sa...|               null|       499.99|http://images.acm...|
|      1012|                 46|Pelican Trailblaz...|               null|       299.99|http://images.acm...|
|      1013|                 46|Perception Sport ...|               null|       349.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows

In [19]:
# Expose categories_df to spark.sql under the view name "categories", then print five rows.
categories_df.createOrReplaceTempView(name="categories")
categories_df.show(n=5)

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
+-----------+----------------------+-------------------+
only showing top 5 rows

