### Practica Spark

Registro de un comercio entre Octubre 2019 y Febrero del 2020, con 20.692.840 observaciones 

In [4]:
import pandas as pd
import findspark
findspark.init()

In [5]:
from pyspark import SparkContext
sc = SparkContext()

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat,col,split
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType

In [7]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [8]:
df1_2020 = spark.read.csv("2020-Jan.csv",inferSchema='True',header="True")
df2_2020 = spark.read.csv("2020-Feb.csv",inferSchema='True',header="True")
df1_2019 = spark.read.csv("2019-Oct.csv",inferSchema='True',header="True")
df2_2019 = spark.read.csv("2019-Nov.csv",inferSchema='True',header="True")
df3_2019 = spark.read.csv("2019-Dec.csv",inferSchema='True',header="True")


In [126]:
# Dimension de la BBDD
print(df1_2020.count(),df2_2020.count(),df1_2019.count(),df2_2019.count(),df3_2019.count())

4264752 4156682 4102283 4635837 3533286


In [9]:
dfs = [df1_2020,df2_2020,df1_2019,df2_2019,df3_2019]
df = reduce(DataFrame.unionAll, dfs)

In [128]:
df.count()

20692840

In [129]:
# Tipo de variables de BBDD
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [130]:
# Mostrar 10 primeros elementos
df_dataset = df.limit(10)
df_dataset.toPandas()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2020-01-01 00:00:00 UTC,view,5809910,1602943681873052386,,grattol,5.24,595414620,4adb70bb-edbd-4981-b60f-a05bfd32683a
1,2020-01-01 00:00:09 UTC,view,5812943,1487580012121948301,,kinetics,3.97,595414640,c8c5205d-be43-4f1d-aa56-4828b8151c8a
2,2020-01-01 00:00:19 UTC,view,5798924,1783999068867920626,,zinger,3.97,595412617,46a5010f-bd69-4fbe-a00d-bb17aa7b46f3
3,2020-01-01 00:00:24 UTC,view,5793052,1487580005754995573,,,4.92,420652863,546f6af3-a517-4752-a98b-80c4c5860711
4,2020-01-01 00:00:25 UTC,view,5899926,2115334439910245200,,,3.92,484071203,cff70ddf-529e-4b0c-a4fc-f43a749c0acb
5,2020-01-01 00:00:30 UTC,view,5837111,1783999068867920626,,staleks,6.35,595412617,46a5010f-bd69-4fbe-a00d-bb17aa7b46f3
6,2020-01-01 00:00:37 UTC,cart,5850281,1487580006300255120,,marathon,137.78,593016733,848f607c-1d14-474a-8869-c40e60783c9d
7,2020-01-01 00:00:46 UTC,view,5802440,2151191070908613477,,,2.16,595411904,74ca1cd5-5381-4ffe-b00b-a258b390db77
8,2020-01-01 00:00:57 UTC,view,5726464,1487580005268456287,,,5.56,420652863,546f6af3-a517-4752-a98b-80c4c5860711
9,2020-01-01 00:01:02 UTC,remove_from_cart,5850281,1487580006300255120,,marathon,137.78,593016733,848f607c-1d14-474a-8869-c40e60783c9d


In [52]:
# Busqueda de NA
df = {col:df.filter(df[col].isNull()).count() for col in df.columns}
df

# se observa que la variable category code tiene en su totalidad NA
# Brand tiene 8757117 con NA

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 20339246,
 'brand': 8757117,
 'price': 0,
 'user_id': 0,
 'user_session': 4598}

In [17]:
# tipos de eventos
df.select("event_type").distinct().show()

# purchase         = compra
# view             = ver un articulo
# cart             = añadir un articulo al carro
# remove from cart = remover un articulo del carro

+----------------+
|      event_type|
+----------------+
|        purchase|
|            view|
|            cart|
|remove_from_cart|
+----------------+



In [33]:
# Contabilizar la cantidad de eventos para la variable event_type
contarPorEvento = df.groupBy("event_type").count()
contarPorEvento.show()

# Tenemos casi 4 millones de registros de productos que fueron removidos del carro de compras

+----------------+-------+
|      event_type|  count|
+----------------+-------+
|        purchase|1287007|
|            view|9657821|
|            cart|5768333|
|remove_from_cart|3979679|
+----------------+-------+



In [10]:
# Podriamos eliminar las compras que fueron removidas del carro de compras
df1 = df.filter(df.event_type != "remove_from_cart")
contarPorEvento = df1.groupBy("event_type").count()
contarPorEvento.show()

+----------+-------+
|event_type|  count|
+----------+-------+
|  purchase|1287007|
|      view|9657821|
|      cart|5768333|
+----------+-------+



In [132]:
# descripcion de la variable price
df1.describe('price').show()

# porque hay precios negativos?

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|          16713161|
|   mean| 9.320747965634618|
| stddev|20.901918046263543|
|    min|            -79.37|
|    max|            327.78|
+-------+------------------+



In [11]:
# lamentablemente la info de la data no abarca este suceso
# se procede a eliminar esos registros

df1 = df1.where(df1.price>=1)


In [36]:
# cuantas marcas disponibles
df1.select("brand").distinct().show()

+------------+
|       brand|
+------------+
|     beautix|
|     farmona|
|  dr.gloderm|
|   profhenna|
|       riche|
|        oniq|
|    lebelage|
|     vilenta|
|       fancy|
|    siberina|
|      jaguar|
|      tertio|
|   koreatida|
|         jas|
|rocknailstar|
|   depilflax|
|protokeratin|
|     relouis|
|       essie|
|      coifin|
+------------+
only showing top 20 rows



In [65]:
# Cantidad de productos por marca
contarPorProduc = df1.groupBy("brand").count()
contarPorProduc.show()

+------------+-----+
|       brand|count|
+------------+-----+
|     beautix|78649|
|     farmona| 7042|
|  dr.gloderm| 1961|
|   profhenna| 6021|
|       riche| 3072|
|        oniq|95869|
|    lebelage| 3531|
|     vilenta| 3774|
|       fancy| 1232|
|    siberina|10814|
|      jaguar| 8451|
|      tertio| 6240|
|   koreatida|  720|
|         jas|24036|
|rocknailstar|  115|
|   depilflax|24130|
|protokeratin| 3135|
|     relouis|11672|
|       essie|   45|
|      coifin| 6602|
+------------+-----+
only showing top 20 rows



In [134]:
#Cantidad de dinero vendida por marca
df1.groupBy("brand").sum("price").show()

+------------+------------------+
|       brand|        sum(price)|
+------------+------------------+
|     beautix|  976885.569999988|
|     farmona|165549.46999999983|
|  dr.gloderm|52620.599999999984|
|   profhenna| 55255.89999999998|
|       riche|          68953.72|
|        oniq| 950167.6099999889|
|    lebelage|          21891.88|
|     vilenta| 8811.619999999995|
|       fancy| 6269.980000000001|
|    siberina| 61655.72999999995|
|      jaguar|415651.24000000063|
|      tertio| 25325.38999999999|
|   koreatida|           16916.7|
|         jas|1233417.8099999982|
|rocknailstar|331.13999999999993|
|   depilflax|175251.97000000018|
|protokeratin|44295.859999999986|
|     relouis| 55611.69000000006|
|       essie|362.34999999999997|
|      coifin| 381471.9599999993|
+------------+------------------+
only showing top 20 rows



In [135]:
# Precio promedio por producto vendido por marca
df1.groupBy("brand").mean("price").show()

+------------+------------------+
|       brand|        avg(price)|
+------------+------------------+
|     beautix|12.420826329641674|
|     farmona| 23.50887105935811|
|  dr.gloderm|   26.833554309026|
|   profhenna| 9.177196478990197|
|       riche|22.445872395833334|
|        oniq| 9.911103797890757|
|    lebelage|6.1999093741149816|
|     vilenta| 2.334822469528351|
|       fancy| 5.089269480519482|
|    siberina| 5.701473090438316|
|      jaguar| 49.18367530469774|
|      tertio| 4.058556089743588|
|   koreatida|23.495416666666667|
|         jas| 51.31543559660502|
|rocknailstar| 2.879478260869565|
|   depilflax|7.2628251139660245|
|protokeratin|14.129460925039869|
|     relouis| 4.764538211103501|
|       essie| 8.052222222222222|
|      coifin|57.781272341714526|
+------------+------------------+
only showing top 20 rows



In [68]:
# cuales son los productos que se venden acompañados?

df1.select(["product_id"]).filter("event_type = 'cart'").show()

+----------+
|product_id|
+----------+
|   5850281|
|   5802440|
|   5810716|
|   5809803|
|   5591314|
|   5869577|
|   5800788|
|      6730|
|   5712801|
|   5878923|
|   5622680|
|   5775981|
|   5814046|
|   5775982|
|   5775985|
|   5706782|
|   5828286|
|   5809910|
|      5938|
|   5828284|
+----------+
only showing top 20 rows



In [69]:
# sesiones donde el producto 5850281 se ha metido en el carro

sesions = df1.select(["user_session"]).filter("event_type = 'cart' AND product_id = 5850281")

In [70]:
# Ahora se accederan a todos los productos que se han metido al carro en conjunto con el producto 5850281
products = df1.select(["product_id"]).filter("event_type = 'cart' AND product_id <> 5850281").filter(df["user_session"].isin(sesions["user_session"]))


In [71]:
# Productos que fueron vendidos en conjunto con 5850281

products.select("product_id").show()

+----------+
|product_id|
+----------+
|   5802440|
|   5810716|
|   5809803|
|   5591314|
|   5869577|
|   5800788|
|      6730|
|   5712801|
|   5878923|
|   5622680|
|   5775981|
|   5814046|
|   5775982|
|   5775985|
|   5706782|
|   5828286|
|   5809910|
|      5938|
|   5828284|
|   5628025|
+----------+
only showing top 20 rows



In [72]:
# Cuantos son?
products.select("product_id").count()


# Son 4.794.780 productos, pero hay copias entre si, ya que una dupla de productos fue realizada varias veces.
# Por tanto, buscamos los distintos

4794780

In [73]:
products = products.select("product_id").distinct()

In [74]:
# Ahora si, se obtiene una lista de 42.734 productos distintos que fueron vendidos en conjunto con el producto 5850281

products.select("product_id").count()

42734

In [12]:
# Podriamos separar la data por año y meses

df2 = df1.withColumn('year', split(df1['event_time'], '-').getItem(0)) \
       .withColumn('month', split(df1['event_time'], '-').getItem(1)) \
       .withColumn('day', split(df1['event_time'], '-').getItem(2))


In [13]:
df2=df2.drop('event_time','day')
df2.show()

+----------+----------+-------------------+-------------+---------+------+---------+--------------------+----+-----+
|event_type|product_id|        category_id|category_code|    brand| price|  user_id|        user_session|year|month|
+----------+----------+-------------------+-------------+---------+------+---------+--------------------+----+-----+
|      view|   5809910|1602943681873052386|         null|  grattol|  5.24|595414620|4adb70bb-edbd-498...|2020|   01|
|      view|   5812943|1487580012121948301|         null| kinetics|  3.97|595414640|c8c5205d-be43-4f1...|2020|   01|
|      view|   5798924|1783999068867920626|         null|   zinger|  3.97|595412617|46a5010f-bd69-4fb...|2020|   01|
|      view|   5793052|1487580005754995573|         null|     null|  4.92|420652863|546f6af3-a517-475...|2020|   01|
|      view|   5899926|2115334439910245200|         null|     null|  3.92|484071203|cff70ddf-529e-4b0...|2020|   01|
|      view|   5837111|1783999068867920626|         null|  stale

In [14]:
# Cantidad de productos vendidos por mes
contarPorMes = df2.groupBy("month").count()
contarPorMes.show()

+-----+-------+
|month|  count|
+-----+-------+
|   01|3107111|
|   02|2965472|
|   10|2990352|
|   11|3290461|
|   12|2580619|
+-----+-------+



In [150]:
exit()