# Data Preprocessing with PySpark

## The 5 V's in this project

**Veracity:** The dataset was obtained from Kaggle. Its author is Michael Kechinov, CEO of REES46 eCommerce Marketing Platform.

**Variety:** The dataset is structured. It contains datetime, string, numerical, and categorical data.

**Volume:** The dataset is 5.3 GB in size.

**Valence:** The variables are highly interconnected; for example, the brand (*brand*) and the category (*category_code*).

**Value:** Processed correctly, the data can generate considerable value for whoever uses it.
(To do: pin down what added value it generates.)

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=407978a1ff6a0b950cb873acc1cf652d3b73f76f06d1b18d729bf631d8c496aa
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [None]:
import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.types import StringType

# To apply user defined functions
from pyspark.sql.functions import udf 

# To convert to date type
from pyspark.sql.types import DateType

# To select columns easier
from pyspark.sql.functions import col

# To use datasets stored in my personal Drive account
from google.colab import drive
drive.mount('/content/drive/')  # Choose file path

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
# Creating a Spark Session called "spark", with app name "eCommerce Analysis"
spark = SparkSession \
    .builder \
    .appName("eCommerce Analysis") \
    .getOrCreate()

In [None]:
# Read the data from the personal Drive
# No schema is given, so every column is read as a string
data = spark.read.csv("drive/My Drive/2019-Oct.csv", header=True)

In [None]:
# data.show(2)

In [None]:
# Function that removes the trailing " UTC" (last 4 characters) from the event_time data
removerUTC = udf(lambda x: x[0:-4], StringType())

# Apply function to "event_time" column
# Store results in a new column called "date_new"
data = data.withColumn('date_new', removerUTC('event_time'))
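
The slice `x[0:-4]` drops the last four characters, which matches a trailing ` UTC` suffix. Below is a minimal plain-Python sketch of the same logic. The sample timestamp is an assumed format, inferred from the slice indices used in this notebook. The `None` guard is an addition: a lambda that slices its argument would raise a `TypeError` if Spark passed it a null value.

```python
from typing import Optional


def remove_utc(event_time: Optional[str]) -> Optional[str]:
    """Drop the trailing ' UTC' (4 characters); pass nulls through unchanged."""
    if event_time is None:
        return None
    return event_time[0:-4]


# Hypothetical sample value; the real column format is assumed, not confirmed.
sample = "2019-10-01 00:00:02 UTC"
cleaned = remove_utc(sample)
print(cleaned)
```

If null handling matters, a function like this could be wrapped as `udf(remove_utc, StringType())` in place of the lambda.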

In [None]:
# Show distinct brands
# data.select("brand").distinct().show(10)

In [None]:
# Count the number of records per brand
conteo_marcas = data.groupby("brand").count()

# Date Processing

In [None]:
# Function that receives a date string and returns the day of the month (characters 8-9)
# Note: the result is a two-character string, not an integer
convertToDay = udf(lambda x: x[8:10], StringType())

# Apply the function to "date_new" column and save the result to a new column "day_int"
data = data.withColumn('day_int', convertToDay('date_new'))

In [None]:
# Function that receives a date string and returns the hour (characters 11-12)
# Note: the result is a two-character string, not an integer
convertToHour = udf(lambda x: x[11:13], StringType())

# Apply the function to "date_new" column and save the result to a new column "hour_int"
data = data.withColumn('hour_int', convertToHour('date_new'))

# Sold Products

In [None]:
# Keep only purchase events
ventas = data.where(data.event_type == "purchase")
# ventas.show(2)

# Wordcloud data

In [None]:
# Most frequent categories across all event types
# Sort them in descending order of count
conteo_categorias = data.groupby("category_code").count().sort(col("count").desc())
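
To make the group, count, and sort chain concrete, here is the same computation in plain Python on a handful of made-up category codes. The values are hypothetical stand-ins for the `category_code` column.

```python
from collections import Counter

# Hypothetical category codes standing in for the `category_code` column.
category_codes = [
    "electronics.smartphone",
    "electronics.video.tv",
    "electronics.smartphone",
    "appliances.kitchen.washer",
    "electronics.smartphone",
    "electronics.video.tv",
]

# Counter groups equal values and counts them;
# most_common() returns (value, count) pairs sorted by count, descending.
category_counts = Counter(category_codes).most_common()
print(category_counts)
```

One difference worth keeping in mind: in Spark, rows with a null `category_code` form their own group, so the top entry of the sorted result may be a null category.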

In [None]:
# See how many categories there are
# conteo_categorias.count()

In [None]:
# Most purchased category codes (sales only), sorted in descending order of count
df_wordcloud = ventas.groupby("category_code").count().sort(col("count").desc())

# Export data
# Repartition to 1 partition so the results are saved in a single file
# df_wordcloud.repartition(1).write.csv("wasb:///DatosExportados/WordNubeVENTAS.csv")

In [None]:
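# Disable Arrow-based columnar data transfers for toPandas()
# (Spark 3.0 deprecates this key in favor of "spark.sql.execution.arrow.pyspark.enabled")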
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

In [None]:
df_wordcloud.toPandas().to_csv("WordNubeVentasFinal.csv")

In [None]:
!pip install pyarrow



# Products that were added to a cart

In [None]:
# Keep only events where a product was added to the cart
carritos = data.where(data.event_type == "cart")

# Products that were viewed

In [None]:
# Keep only view events
vista = data.where(data.event_type == "view")

# Heatmap

In [None]:
# Group sold records by day and hour
# Count the groups generated
heatmap = ventas.groupBy(["day_int", "hour_int"]).agg({"event_type" : "count"})
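
A grouped count only returns the day and hour combinations that occur at least once, so a slot with no purchases has no row at all. A plotting step may therefore need to fill those slots with zero. Below is a minimal plain-Python sketch of that fill, using made-up `(day, hour, count)` rows; all values are hypothetical.

```python
# Hypothetical (day, hour, count) rows; day and hour are strings,
# like the day_int and hour_int columns produced above.
rows = [("01", "00", 4), ("01", "13", 9), ("02", "13", 7)]

days = ["01", "02"]
hours = [f"{h:02d}" for h in range(24)]  # "00" .. "23"

# Start from an all-zero grid, then fill in the combinations that exist.
grid = {day: {hour: 0 for hour in hours} for day in days}
for day, hour, count in rows:
    grid[day][hour] = count

print(grid["01"]["13"], grid["02"]["00"])
```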

In [None]:
# Export the data
heatmap.toPandas().to_csv("HeatMap.csv")

# Horizontal Bar Plot

In [None]:
# For each event type (purchase, cart, view), group the records by day and count them
horizontal_bar1 = ventas.groupBy("day_int").agg({"event_type" : "count"})
horizontal_bar2 = carritos.groupBy("day_int").agg({"event_type" : "count"})
horizontal_bar3 = vista.groupBy("day_int").agg({"event_type" : "count"})

In [None]:
# Export the data
horizontal_bar1.toPandas().to_csv("HorizontalBarPlot1.csv")
horizontal_bar2.toPandas().to_csv("HorizontalBarPlot2.csv")
horizontal_bar3.toPandas().to_csv("HorizontalBarPlot3.csv")

# Histograms
From df_wordcloud (sales counted per category code), we identified the 5 main category codes, and then we generate a CSV for each one of them.

In [None]:
# Generate one DataFrame for each of the 5 main category codes
hist_cat_1 = ventas.where(ventas.category_code == "electronics.smartphone")
hist_cat_2 = ventas.where(ventas.category_code == "electronics.audio.headphone")
hist_cat_3 = ventas.where(ventas.category_code == "electronics.video.tv")
hist_cat_4 = ventas.where(ventas.category_code == "electronics.clocks")
hist_cat_5 = ventas.where(ventas.category_code == "appliances.kitchen.washer")

In [None]:
# Export data
hist_cat_1.toPandas().to_csv("Hist1.csv")
hist_cat_2.toPandas().to_csv("Hist2.csv")
hist_cat_3.toPandas().to_csv("Hist3.csv")
hist_cat_4.toPandas().to_csv("Hist4.csv")
hist_cat_5.toPandas().to_csv("Hist5.csv")

# Stacked Area Chart

In [None]:
# Group each of the 5 main category codes by day
# Count the number of records generated after grouping
h1grouped = hist_cat_1.groupBy("day_int").count()
h2grouped = hist_cat_2.groupBy("day_int").count()
h3grouped = hist_cat_3.groupBy("day_int").count()
h4grouped = hist_cat_4.groupBy("day_int").count()
h5grouped = hist_cat_5.groupBy("day_int").count()

In [None]:
# Export the data
h1grouped.toPandas().to_csv("stacked1Nuevo.csv")
h2grouped.toPandas().to_csv("stacked2.csv")
h3grouped.toPandas().to_csv("stacked3.csv")
h4grouped.toPandas().to_csv("stacked4.csv")
h5grouped.toPandas().to_csv("stacked5.csv")