## PySpark on Google Colab

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. 

Therefore, our first task is to install Java.


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we will download Apache Spark 3.2.1, prebuilt for Hadoop 3.2, from here: https://spark.apache.org/downloads.html

In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Note: At the time of writing, 3.2.1 was the latest version of Apache Spark. Spark evolves quickly, so if a newer version is available when you run this code, replace 3.2.1 with that version wherever it appears (the download URL, the tar command and SPARK_HOME). If the download link stops working for an older release, past versions are kept in the Apache archive: https://archive.apache.org/dist/spark/
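
One way to make that replacement less error-prone is to keep the version numbers in a single place and derive the download URL and the install path from them. The sketch below is only an illustration: the helper names (spark_package, spark_download_url, spark_home) are hypothetical, and it assumes the archive keeps the spark-VERSION-bin-hadoopVERSION naming used in this article. Other releases may name their archives differently, so check the downloads page first.

```python
# Versions used in this article; edit these two strings to upgrade.
SPARK_VERSION = "3.2.1"
HADOOP_VERSION = "3.2"


def spark_package(spark_version=SPARK_VERSION, hadoop_version=HADOOP_VERSION):
    """Base name of the archive, e.g. spark-3.2.1-bin-hadoop3.2 (assumed naming)."""
    return f"spark-{spark_version}-bin-hadoop{hadoop_version}"


def spark_download_url(spark_version=SPARK_VERSION, hadoop_version=HADOOP_VERSION,
                       base_url="https://dlcdn.apache.org/spark"):
    """URL of the .tgz archive, following the pattern of the wget cell."""
    package = spark_package(spark_version, hadoop_version)
    return f"{base_url}/spark-{spark_version}/{package}.tgz"


def spark_home(spark_version=SPARK_VERSION, hadoop_version=HADOOP_VERSION,
               install_dir="/content"):
    """Directory created by extracting the archive inside install_dir."""
    return f"{install_dir}/{spark_package(spark_version, hadoop_version)}"


print(spark_download_url())
print(spark_home())
```

With the defaults, the two printed strings match the URL and the SPARK_HOME path used in this notebook.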

Now, we just need to extract that archive.

In [3]:
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

The last thing we need to install is the findspark library. It locates Spark on the system and makes it importable like a regular library.

In [4]:
!pip install -q findspark

Now that we have installed all the necessary dependencies in Colab, it is time to set the environment variables. This will enable us to run PySpark in the Colab environment.

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
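
If either path is wrong, the failure only shows up later, when findspark or the JVM cannot be located. A small check like the following can catch that earlier. It is a sketch rather than part of the original setup: missing_paths is a hypothetical helper that reports which variables are unset or point to a directory that does not exist.

```python
import os


def missing_paths(env_vars=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or do not point to an existing directory."""
    return [name for name in env_vars
            if not os.path.isdir(os.environ.get(name, ""))]


problems = missing_paths()
if problems:
    print("Check these variables before starting Spark:", problems)
else:
    print("JAVA_HOME and SPARK_HOME both point to existing directories.")
```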

Time for the real test!

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

In [9]:
import findspark
findspark.init()

In [8]:
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

Extra material: https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/

In [10]:
# SparkSession: entry point to PySpark for working with RDDs / DataFrames

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [11]:
spark

Import data!

This dataset was obtained from: https://datahack.analyticsvidhya.com/contest/black-friday/#ProblemStatement

In [15]:
!gdown https://drive.google.com/uc?id=1LnlIF2s8deJAgBQ2-wSso_yXM6_yVU9d

Downloading...
From: https://drive.google.com/uc?id=1LnlIF2s8deJAgBQ2-wSso_yXM6_yVU9d
To: /content/train.csv
100% 25.5M/25.5M [00:00<00:00, 193MB/s]


### Business Problem:

A retail company, “ABC Private Limited”, wants to understand customer purchase behaviour (specifically, the purchase amount) across products of different categories. They have shared a purchase summary of various customers for selected high-volume products from last month.
The data set also contains customer demographics (age, gender, marital status, city_type, stay_in_current_city), product details (product_id and product category) and the total purchase_amount from last month.

Now, they want to build a model that predicts the purchase amount of a customer for various products, which will help them create personalized offers for customers across different products.

## Reading csv files

If you are curious, the link below explains how to read different file types with Spark:

https://www.analyticsvidhya.com/blog/2020/10/data-engineering-101-data-sources-apache-spark/?utm_source=blog&utm_medium=working-with-pyspark-on-google-colab-for-data-scientists

In [16]:
df = spark.read.csv("train.csv", header=True)

## Show data

In [17]:
df.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
|100

## Show column details

In [18]:
df.printSchema()
# Shows the schema (name, type and nullability) of each column

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: string (nullable = true)



## Official description
User_ID -> User ID <br>
Product_ID -> Product ID <br>
Gender -> Sex of the user <br>
Age -> Age in bins <br>
Occupation -> Occupation (masked) <br>
City_Category -> Category of the city (A, B, C) <br>
Stay_In_Current_City_Years -> Number of years of stay in the current city <br>
Marital_Status -> Marital status <br>
Product_Category_1 -> Product category (masked) <br>
Product_Category_2 -> The product may also belong to another category (masked) <br>
Product_Category_3 -> The product may also belong to another category (masked) <br>
Purchase -> Purchase amount (target variable) <br>

### Number of rows and columns in DF

In [20]:
df.count(), len(df.columns)

(550068, 12)

## Display specific columns

In [22]:
df.select("User_Id", "Gender").show()

+-------+------+
|User_Id|Gender|
+-------+------+
|1000001|     F|
|1000001|     F|
|1000001|     F|
|1000001|     F|
|1000002|     M|
|1000003|     M|
|1000004|     M|
|1000004|     M|
|1000004|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000006|     F|
|1000006|     F|
|1000006|     F|
|1000006|     F|
|1000007|     M|
|1000008|     M|
+-------+------+
only showing top 20 rows



### Describing the columns

In [24]:
df.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

### Distinct values for Categorical columns

In [32]:
# Distinct combinations of two columns, ordered by Age (similar to a GROUP BY in SQL)
df.select("city_category", "Age").distinct().orderBy("Age").show()

+-------------+-----+
|city_category|  Age|
+-------------+-----+
|            C| 0-17|
|            A| 0-17|
|            B| 0-17|
|            A|18-25|
|            B|18-25|
|            C|18-25|
|            B|26-35|
|            C|26-35|
|            A|26-35|
|            B|36-45|
|            A|36-45|
|            C|36-45|
|            C|46-50|
|            B|46-50|
|            A|46-50|
|            B|51-55|
|            C|51-55|
|            A|51-55|
|            B|  55+|
|            A|  55+|
+-------------+-----+
only showing top 20 rows



In [35]:
# Distinct values of two columns
df.select("city_category", "Age").distinct().show()

+-------------+-----+
|city_category|  Age|
+-------------+-----+
|            B|  55+|
|            A|46-50|
|            A|26-35|
|            C|46-50|
|            B| 0-17|
|            A|18-25|
|            B|46-50|
|            A|  55+|
|            C|18-25|
|            C|36-45|
|            B|26-35|
|            A| 0-17|
|            A|36-45|
|            C|  55+|
|            B|18-25|
|            A|51-55|
|            C|26-35|
|            C| 0-17|
|            B|51-55|
|            C|51-55|
+-------------+-----+
only showing top 20 rows



In [36]:
# Distinct values of two columns, ordered by Age
df.select("city_category", "Age").distinct()\
.orderBy("Age").show()

+-------------+-----+
|city_category|  Age|
+-------------+-----+
|            C| 0-17|
|            A| 0-17|
|            B| 0-17|
|            A|18-25|
|            B|18-25|
|            C|18-25|
|            B|26-35|
|            C|26-35|
|            A|26-35|
|            B|36-45|
|            A|36-45|
|            C|36-45|
|            C|46-50|
|            B|46-50|
|            A|46-50|
|            B|51-55|
|            C|51-55|
|            A|51-55|
|            B|  55+|
|            A|  55+|
+-------------+-----+
only showing top 20 rows



## Group by statement

In [33]:
# Group by one column
df.groupby("city_category").count().show()

+-------------+------+
|city_category| count|
+-------------+------+
|            B|231173|
|            C|171175|
|            A|147720|
+-------------+------+



In [45]:
# Group by two columns
df.groupby("city_category", "age").count().show()

+-------------+-----+-----+
|city_category|  age|count|
+-------------+-----+-----+
|            B|  55+| 5162|
|            A|46-50| 7607|
|            A|26-35|73745|
|            C|46-50|17688|
|            B| 0-17| 5435|
|            A|18-25|27535|
|            B|46-50|20406|
|            A|  55+| 3573|
|            C|18-25|28878|
|            C|36-45|35798|
|            B|26-35|91584|
|            A| 0-17| 2544|
|            A|36-45|26617|
|            C|  55+|12769|
|            B|18-25|43247|
|            A|51-55| 6099|
|            C|26-35|54258|
|            C| 0-17| 7123|
|            B|51-55|17741|
|            C|51-55|14661|
+-------------+-----+-----+
only showing top 20 rows



In [44]:
# Count the rows for each combination of "City_Category" and "Age", ordered by frequency
df.groupby("city_category", "age").count().orderBy("count", ascending=False).show()

+-------------+-----+-----+
|city_category|  age|count|
+-------------+-----+-----+
|            B|26-35|91584|
|            A|26-35|73745|
|            C|26-35|54258|
|            B|36-45|47598|
|            B|18-25|43247|
|            C|36-45|35798|
|            C|18-25|28878|
|            A|18-25|27535|
|            A|36-45|26617|
|            B|46-50|20406|
|            B|51-55|17741|
|            C|46-50|17688|
|            C|51-55|14661|
|            C|  55+|12769|
|            A|46-50| 7607|
|            C| 0-17| 7123|
|            A|51-55| 6099|
|            B| 0-17| 5435|
|            B|  55+| 5162|
|            A|  55+| 3573|
+-------------+-----+-----+
only showing top 20 rows



## Filtering data

More filtering options: https://sparkbyexamples.com/pyspark/pyspark-where-filter/


In [46]:
import pyspark.sql.functions as f

In [49]:
# Filtering by City Category "A"
df.filter(df.City_Category=="A").show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [53]:
# Same filter by City Category "A", written with f.col instead of df.City_Category
df.filter(f.col("City_Category") == "A").show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [56]:
# Filtering by two conditions
df.filter((df.City_Category=="A")&(df.Gender=="M")).show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000003| P00193542|     M|26-35|        15|            A|                         3|             0|                 1|                 2|              null|   15227|
|1000005| P00274942|     M|26-35|        20|            A|                         1|             1|                 8|              null|              null|    7871|
|1000005| P00251242|     M|26-35|        20|            A|                         1|             1|                 5|                11|              null|    5254

In [58]:
# Filtering by two conditions, selecting just a few columns and ordering the result
df.select("city_category", "Age").filter((df.City_Category=="A")&(df.Gender=="M")).orderBy("Gender").show(5)

+-------------+-----+
|city_category|  Age|
+-------------+-----+
|            A|26-35|
|            A|26-35|
|            A|26-35|
|            A|  55+|
|            A|  55+|
+-------------+-----+
only showing top 5 rows



## Updating column type

In [70]:
df=df.withColumn("Purchase", df.Purchase.cast("float"))
df.printSchema()
# withColumn replaces the column when the name already exists; to create a new column, pass the new column name as the first argument, e.g. df = df.withColumn("city_category", ...)

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- city_category: double (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: float (nullable = true)
 |-- ity_category: double (nullable = true)



## Aggregate functions (sum, avg, min, max...)

In [71]:
# Sum purchase per user
df.groupBy("User_Id").agg(f.sum("Purchase")).show()
# agg applies one or more aggregate functions to each group

+-------+-------------+
|User_Id|sum(Purchase)|
+-------+-------------+
|1000240|     854532.0|
|1000280|     189891.0|
|1000795|     559717.0|
|1000839|    4681205.0|
|1000888|     492236.0|
|1001866|     860896.0|
|1002011|    1370857.0|
|1002185|    1540819.0|
|1002442|     100036.0|
|1002783|     335592.0|
|1002883|     535502.0|
|1002887|    3259008.0|
|1003202|     607054.0|
|1003366|     215881.0|
|1003397|     179982.0|
|1003644|     292138.0|
|1003663|     549674.0|
|1003665|    1364886.0|
|1004042|    2333608.0|
|1004266|     347340.0|
+-------+-------------+
only showing top 20 rows



In [81]:
# Sum purchase per user and get top 5
df.groupBy("User_Id")\
.agg(f.sum("Purchase"))\
.orderBy("sum(Purchase)", ascending=False)\
.show(5)
# Option B: same result, giving the aggregated column an alias
df.groupBy("User_Id")\
.agg(f.sum("Purchase").alias("pajaro"))\
.orderBy("pajaro", ascending=False)\
.show(5)

+-------+-------------+
|User_Id|sum(Purchase)|
+-------+-------------+
|1004277|  1.0536909E7|
|1001680|    8699596.0|
|1002909|    7577756.0|
|1001941|    6817493.0|
|1000424|    6573609.0|
+-------+-------------+
only showing top 5 rows

+-------+-----------+
|User_Id|     pajaro|
+-------+-----------+
|1004277|1.0536909E7|
|1001680|  8699596.0|
|1002909|  7577756.0|
|1001941|  6817493.0|
|1000424|  6573609.0|
+-------+-----------+
only showing top 5 rows



In [83]:
# Purchase avg per user and get top 5
df.groupBy("User_Id")\
.agg(f.avg("Purchase").alias("pajaro"))\
.orderBy("pajaro", ascending=False)\
.show(5)

+-------+------------------+
|User_Id|            pajaro|
+-------+------------------+
|1003902|18577.893617021276|
|1005069|18490.166666666668|
|1005999|18345.944444444445|
|1001349|18162.739130434784|
|1003461|           17508.7|
+-------+------------------+
only showing top 5 rows



In [94]:
# Can you round the result and rename the purchase column?
df.groupBy("User_Id")\
.agg(f.round(f.avg("Purchase"),2).alias("dinero"))\
.orderBy("dinero", ascending=False)\
.show(5)

+-------+--------+
|User_Id|  dinero|
+-------+--------+
|1003902|18577.89|
|1005069|18490.17|
|1005999|18345.94|
|1001349|18162.74|
|1003461| 17508.7|
+-------+--------+
only showing top 5 rows



In [117]:
# Per user: purchase frequency, rounded mean, min and max purchase (avg(User_Id) only echoes the ID as a number)
df.groupBy("User_Id").agg(f.count("User_Id"),f.round(f.mean("Purchase")),\
f.min("Purchase"),\
f.max("Purchase"),\
f.avg("User_Id")).show()

+-------+--------------+-----------------------+-------------+-------------+------------+
|User_Id|count(User_Id)|round(avg(Purchase), 0)|min(Purchase)|max(Purchase)|avg(User_Id)|
+-------+--------------+-----------------------+-------------+-------------+------------+
|1000240|            86|                 9936.0|        247.0|      19647.0|   1000240.0|
|1000280|            26|                 7304.0|        773.0|      19352.0|   1000280.0|
|1000795|            80|                 6996.0|       1720.0|      20646.0|   1000795.0|
|1000839|           435|                10761.0|        578.0|      23842.0|   1000839.0|
|1000888|            47|                10473.0|         24.0|      19396.0|   1000888.0|
|1001866|            90|                 9566.0|        932.0|      23589.0|   1001866.0|
|1002011|           141|                 9722.0|        474.0|      20108.0|   1002011.0|
|1002185|           171|                 9011.0|         25.0|      23277.0|   1002185.0|
|1002442| 

## Rename, create and drop columns

In [123]:
# Renaming column
df.withColumnRenamed("User_Id","dinero").show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------------+
| dinero|Product_ID|Gender| Age|Occupation|city_category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ity_category|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------------+
|1000001| P00069042|     F|0-17|        10|         null|                         2|             0|                 3|              null|              null|  8370.0|        null|
|1000001| P00248942|     F|0-17|        10|         null|                         2|             0|                 1|                 6|                14| 15200.0|        null|
|1000001| P00087842|     F|0-17|        10|         null|                         2|             0|      

In [None]:
# Renaming several columns
compra_df.withColumnRenamed("User_ID", "User_Number_ID")\
  .withColumnRenamed("Frecuency", "Frec")\
  .withColumnRenamed("Total_Purchase", "TP")\
  .show(5)

In [None]:
# Creating a new column from existing columns
compra_df.withColumn("Range", f.col("Max_Purchase") - f.col("Min_Purchase")).show(5)


In [None]:
# Creating a new column from conditions
compra_df.withColumn("Label", f.when(f.col("Frecuency") < 100, "Fewer than 100 purchases")\
                              .when(f.col("Frecuency") < 300, "Between 100 and 299 purchases")\
                              .otherwise("300 or more purchases")).show(5)


In [139]:
# Drop one column (drop returns a new DataFrame; df itself is unchanged)
df.drop("Gender")

DataFrame[User_ID: string, Product_ID: string, Age: string, Occupation: string, city_category: double, Stay_In_Current_City_Years: string, Marital_Status: string, Product_Category_1: string, Product_Category_2: string, Product_Category_3: string, Purchase: float, ity_category: double]

In [None]:
# Drop several columns by unpacking a list of names
dropear = ["Gender", "Age"]
df.drop(*dropear)