# Instalar el software

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Crear la sesión

In [3]:

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Leer los datos

In [4]:
customers = spark.read.option("inferSchema","true").option("header","true").csv("customers.csv")
products = spark.read.option("inferSchema","true").option("header","true").csv("products.csv")
stock = spark.read.option("inferSchema","true").option("header","true").csv("stock.csv")
customers.printSchema()
products.printSchema()
stock.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- customer: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- COLOR: string (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- PRICE: integer (nullable = true)
 |-- STOCKNUM: integer (nullable = true)



In [5]:
customers.printSchema()
customers.show(5)
customers.dtypes
customers.summary()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- customer: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|
|04/08/2018|11:38 PM|     100|      2|       8|   79|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|
+----------+--------+--------+-------+--------+-----+
only showing top 5 rows



summary,date,time,customer,product,quantity,price
count,1002,1002,1002.0,1002.0,1002.0,1002.0
mean,,,114.625748502994,5.491017964071856,4.481037924151696,50.9500998003992
stddev,,,8.769136669632404,2.877870223733518,2.946431445375335,37.7535187341949
min,01/01/2018,10:00 AM,100.0,1.0,0.0,0.0
25%,,,107.0,3.0,2.0,25.0
50%,,,115.0,5.0,4.0,51.0
75%,,,122.0,8.0,7.0,75.0
max,31/12/2017,9:59 AM,130.0,10.0,10.0,816.0


#  Selección y expresiones

In [6]:
from pyspark.sql.functions import expr, desc, asc
customers.select("customer", "product").show(10)

+--------+-------+
|customer|product|
+--------+-------+
|     100|      1|
|     100|      1|
|     100|      1|
|     100|      2|
|     100|      3|
|     100|      4|
|     100|      5|
|     100|      6|
|     100|      7|
|     100|      8|
+--------+-------+
only showing top 10 rows



In [7]:
customers.select(expr("customer"),
expr("product"),
expr("quantity as q")).show()

+--------+-------+---+
|customer|product|  q|
+--------+-------+---+
|     100|      1| 10|
|     100|      1| 10|
|     100|      1| 10|
|     100|      2|  8|
|     100|      3|  1|
|     100|      4|  3|
|     100|      5|  8|
|     100|      6|  8|
|     100|      7|  4|
|     100|      8|  5|
|     100|      9|  9|
|     100|     10|  9|
|     100|      1|  3|
|     100|      2|  6|
|     100|      3|  1|
|     100|      4|  7|
|     100|      5|  3|
|     100|      6|  5|
|     100|      7|  7|
|     100|      8|  0|
+--------+-------+---+
only showing top 20 rows



In [8]:
customers.select("product", "quantity").show()

+-------+--------+
|product|quantity|
+-------+--------+
|      1|      10|
|      1|      10|
|      1|      10|
|      2|       8|
|      3|       1|
|      4|       3|
|      5|       8|
|      6|       8|
|      7|       4|
|      8|       5|
|      9|       9|
|     10|       9|
|      1|       3|
|      2|       6|
|      3|       1|
|      4|       7|
|      5|       3|
|      6|       5|
|      7|       7|
|      8|       0|
+-------+--------+
only showing top 20 rows



In [9]:
customers.select(expr("*")).show()

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|
|04/08/2018|11:38 PM|     100|      2|       8|   79|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|
|24/07/2018|11:37 AM|     100|      4|       3|   59|
|10/01/2018| 9:17 PM|     100|      5|       8|   74|
|04/10/2018| 5:05 PM|     100|      6|       8|   33|
|10/06/2018| 8:39 PM|     100|      7|       4|   45|
|25/12/2017|10:10 AM|     100|      8|       5|   33|
|21/05/2018| 6:39 PM|     100|      9|       9|    8|
|08/04/2018| 5:02 AM|     100|     10|       9|   98|
|13/04/2018|11:55 AM|     100|      1|       3|   55|
|19/06/2018| 1:38 PM|     100|      2|       6|   45|
|02/07/2018| 4:30 AM|     100|      3|       1|   54|
|21/10/2018| 2:49 PM|     10

In [10]:
customers.select(expr("quantity > 1")).show(5)

+--------------+
|(quantity > 1)|
+--------------+
|          true|
|          true|
|          true|
|          true|
|         false|
+--------------+
only showing top 5 rows



In [11]:
customers.select(expr("product"), expr("quantity > 1")).show(5)

+-------+--------------+
|product|(quantity > 1)|
+-------+--------------+
|      1|          true|
|      1|          true|
|      1|          true|
|      2|          true|
|      3|         false|
+-------+--------------+
only showing top 5 rows



In [12]:
customers.select(expr("*"), expr("quantity >1")).show(5)

+----------+--------+--------+-------+--------+-----+--------------+
|      date|    time|customer|product|quantity|price|(quantity > 1)|
+----------+--------+--------+-------+--------+-----+--------------+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|          true|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|          true|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|          true|
|04/08/2018|11:38 PM|     100|      2|       8|   79|          true|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|         false|
+----------+--------+--------+-------+--------+-----+--------------+
only showing top 5 rows



In [13]:
customers.selectExpr("*", "customer = 100", "price > 10").show(5)

+----------+--------+--------+-------+--------+-----+----------------+------------+
|      date|    time|customer|product|quantity|price|(customer = 100)|(price > 10)|
+----------+--------+--------+-------+--------+-----+----------------+------------+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|            true|        true|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|            true|       false|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|            true|       false|
|04/08/2018|11:38 PM|     100|      2|       8|   79|            true|        true|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|            true|        true|
+----------+--------+--------+-------+--------+-----+----------------+------------+
only showing top 5 rows



# Agrupaciones

In [14]:
customers.groupBy("product").agg({"quantity": "sum"}).show()

+-------+-------------+
|product|sum(quantity)|
+-------+-------------+
|      1|          444|
|      6|          437|
|      3|          481|
|      5|          469|
|      9|          431|
|      4|          431|
|      8|          483|
|      7|          430|
|     10|          453|
|      2|          431|
+-------+-------------+



In [15]:
customers.selectExpr("sum(price)").show()

+----------+
|sum(price)|
+----------+
|     51052|
+----------+



In [16]:
customers.selectExpr("avg(price)").show()

+----------------+
|      avg(price)|
+----------------+
|50.9500998003992|
+----------------+



In [17]:
customers.selectExpr("count(customer)").show()
customers.selectExpr("count(distinct(customer))").show()

+---------------+
|count(customer)|
+---------------+
|           1002|
+---------------+

+------------------------+
|count(DISTINCT customer)|
+------------------------+
|                      31|
+------------------------+



In [18]:
customers.selectExpr("avg(price)", "count(customer)").show()

+----------------+---------------+
|      avg(price)|count(customer)|
+----------------+---------------+
|50.9500998003992|           1002|
+----------------+---------------+



# Filtros

In [19]:
customers.where(expr("quantity < 9")).show(5)
customers.where(expr("customer != 100")).show(5)
customers.where(expr("quantity < 9")).where(expr("customer != 100")).show(5)

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|04/08/2018|11:38 PM|     100|      2|       8|   79|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|
|24/07/2018|11:37 AM|     100|      4|       3|   59|
|10/01/2018| 9:17 PM|     100|      5|       8|   74|
|04/10/2018| 5:05 PM|     100|      6|       8|   33|
+----------+--------+--------+-------+--------+-----+
only showing top 5 rows

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|13/06/2018| 3:02 PM|     101|      4|       8|   63|
|06/07/2018|10:35 AM|     101|      5|       8|   42|
|13/04/2018| 2:24 AM|     101|      6|       6|   73|
|21/12/2017| 3:09 AM|     101|      7|       6|    8|
|02/07/2018| 6:23 PM|     101|      8|       9|   74|
+----------+--------+--------+-------+--------+-----+
onl

In [20]:
customers.selectExpr("customer").where(expr("product = 8")).where(expr("quantity > 7")).show(5)

+--------+
|customer|
+--------+
|     100|
|     101|
|     101|
|     102|
|     102|
+--------+
only showing top 5 rows



# Ordenación

In [21]:
customers.orderBy("price").show(5)

from pyspark.sql.functions import expr, desc, asc
customers.orderBy(desc("customer"), asc("price")).show(5)
customers.where(expr("date > '16/09/2018'")).orderBy(desc("customer"), desc("price")).show(5)

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|23/06/2018|12:49 AM|     110|      8|       8|    0|
|22/08/2018| 3:38 PM|     124|      5|       4|    0|
|07/07/2018| 6:23 PM|     112|      6|       4|    0|
|16/03/2018|12:21 PM|     107|      3|       7|    0|
|03/12/2017| 2:38 PM|     113|      9|       3|    0|
+----------+--------+--------+-------+--------+-----+
only showing top 5 rows

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|16/08/2018|12:03 PM|     130|      1|       1|    8|
|19/09/2018| 6:31 PM|     130|      5|       7|   14|
|25/09/2018| 9:02 PM|     130|      4|       0|   29|
|23/05/2018| 2:04 PM|     130|      8|       9|   55|
|29/03/2018| 6:13 AM|     130|      3|       6|   69|
+----------+--------+--------+-------+--------+-----+
onl

In [22]:
customers.groupBy("customer").count().show(3)

+--------+-----+
|customer|count|
+--------+-----+
|     108|   33|
|     101|   33|
|     115|   33|
+--------+-----+
only showing top 3 rows



In [23]:
customers.groupBy("customer").agg(expr("sum(quantity)")).show(3)
customers.groupBy("customer").avg().show(3)

+--------+-------------+
|customer|sum(quantity)|
+--------+-------------+
|     108|          129|
|     101|          196|
|     115|          143|
+--------+-------------+
only showing top 3 rows

+--------+-------------+-----------------+------------------+------------------+
|customer|avg(customer)|     avg(product)|     avg(quantity)|        avg(price)|
+--------+-------------+-----------------+------------------+------------------+
|     108|        108.0|5.545454545454546| 3.909090909090909| 49.93939393939394|
|     101|        101.0|5.454545454545454|5.9393939393939394| 49.27272727272727|
|     115|        115.0|5.636363636363637| 4.333333333333333|49.666666666666664|
+--------+-------------+-----------------+------------------+------------------+
only showing top 3 rows



In [24]:
customers.groupBy("customer").agg(expr("avg(quantity)"),expr("stddev_pop(quantity)")).show(3)
customers.groupBy("customer").agg(expr("avg(quantity)"),expr("max(price)")).show(3)

+--------+------------------+--------------------+
|customer|     avg(quantity)|stddev_pop(quantity)|
+--------+------------------+--------------------+
|     108| 3.909090909090909|    2.83232035043586|
|     101|5.9393939393939394|  2.8170411507132362|
|     115| 4.333333333333333|  3.1107503398324456|
+--------+------------------+--------------------+
only showing top 3 rows

+--------+------------------+----------+
|customer|     avg(quantity)|max(price)|
+--------+------------------+----------+
|     108| 3.909090909090909|        99|
|     101|5.9393939393939394|        93|
|     115| 4.333333333333333|        98|
+--------+------------------+----------+
only showing top 3 rows



# Unión

In [25]:
products.printSchema()
stock.printSchema()
joined = products.join(stock, products.ID == stock.ID, "inner")
joined.printSchema()
joined.show(5)

root
 |-- ID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- COLOR: string (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- PRICE: integer (nullable = true)
 |-- STOCKNUM: integer (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- COLOR: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- PRICE: integer (nullable = true)
 |-- STOCKNUM: integer (nullable = true)

+----+-----+-----+----+-----+--------+
|  ID| NAME|COLOR|  ID|PRICE|STOCKNUM|
+----+-----+-----+----+-----+--------+
|1234|chair| blue|1234|  125|    1000|
|   1|table|black|   1|  816|     100|
|   2|  jar|white|   2|   46|       1|
|   3|  pan|  red|   3|   54|      22|
+----+-----+-----+----+-----+--------+



Preguntas sobre customers:

1. ¿Cuántos elementos podemos encontrar (en nuestro DataFrame)?
2. ¿Cuántos clientes únicos?
3. ¿Cuántos productos compró cada cliente?
4. Clasificar clientes por cantidad
5. ¿Cuántas veces el cliente número 100 ha comprado más de 5 artículos?
6. ¿Cuáles fueron los productos comprados por el cliente con mayor número de transacciones? Nos interesa el cliente que ha realizado más compras. No es necesario considerar cantidades de productos, sólo cuántas veces un cliente ha realizado una transacción.
