## PySpark

For this notebook, download the data from the Kaggle dataset eCommerce Events History in Cosmetics Shop: https://www.kaggle.com/datasets/mkechinov/ecommerce-events-history-in-cosmetics-shop?resource=download&select=2020-Jan.csv

References:

- DataScience con PySpark I: Apache Spark, Python, DataFrames y RDDs:
https://www.youtube.com/watch?v=iMOgTbaDJXc

- Pandas vs PySpark DataFrame With Examples:
https://sparkbyexamples.com/pyspark/pandas-vs-pyspark-dataframe-with-examples/#:~:text=Copy-,What%20is%20PySpark%3F,(100x)%20faster%20than%20Pandas

#### What is Pandas?

Pandas is one of the most widely used open-source Python libraries for analyzing structured, tabular data. It is heavily used in data analytics, machine learning, and data science projects. Pandas can load data from CSV, JSON, SQL, and many other formats into a DataFrame, a structured object made of rows and columns (similar to a SQL table).

Pandas does not support distributed processing (the use of more than one processor or machine to carry out a single task), so as your data grows you have to scale up the one machine it runs on. However, there is Modin, an open-source Python library that accelerates Pandas workflows by distributing operations across the cores of the CPU. Unlike other distributed libraries, Modin integrates easily with Pandas and exposes a similar API. For reference, here is a good link for anyone interested: https://towardsdatascience.com/speed-up-your-pandas-workflow-by-changing-a-single-line-of-code-11dfd85efcfb

#### What is PySpark?

PySpark is the Python API for Apache Spark: it lets you run Python applications using Apache Spark's capabilities. With PySpark, applications run in parallel on a distributed cluster (multiple nodes) or even on a single node. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark's features, such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.

Spark itself is written mainly in Scala; as it gained industry adoption, PySpark was released as its Python API, built on Py4J. Py4J is a library bundled with PySpark that lets Python dynamically interface with JVM objects, so to run PySpark you need Java installed along with Python and Apache Spark.
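
Because PySpark talks to a JVM through Py4J, a missing Java installation is a common reason a session fails to start. The sketch below is a small pre-flight check using only the standard library. `java_available` is a hypothetical helper name, and it only looks at `JAVA_HOME` and the `PATH`; it does not check whether the Java version found is one your Spark release supports.

```python
import os
import shutil


def java_available():
    """Return True if a Java launcher can be located.

    Looks for $JAVA_HOME/bin/java first, then falls back to searching PATH.
    """
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        candidate = os.path.join(java_home, "bin", "java")
        # On Windows the launcher is named java.exe
        if os.path.isfile(candidate) or os.path.isfile(candidate + ".exe"):
            return True
    return shutil.which("java") is not None


if __name__ == "__main__":
    if java_available():
        print("Java found: PySpark should be able to start a JVM.")
    else:
        print("Java not found: install a JDK or set JAVA_HOME before starting PySpark.")
```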

#### PySpark Features

- In-memory computation
- Distributed processing (for example, via `parallelize`)
- Can be used with many cluster managers (Spark standalone, YARN, Mesos, etc.)
- Fault-tolerant
- Immutable
- Lazy evaluation
- Cache & persistence
- Built-in optimization when using DataFrames
- Supports ANSI SQL
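
Two items in this list, immutability and lazy evaluation, are easier to see in code. The following is a pure-Python toy, not Spark's implementation: `LazyDataset` is a hypothetical class whose `map` and `filter` only record a step and return a new object (the role transformations play in Spark), while `count` and `collect` run the recorded steps (the role of actions).

```python
class LazyDataset:
    """Toy model of a lazily evaluated, immutable dataset."""

    def __init__(self, source, steps=()):
        self._source = source        # any re-iterable, e.g. a list
        self._steps = tuple(steps)   # recorded transformations, never mutated

    def map(self, fn):
        # Transformation: return a NEW dataset with one more recorded step
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        # Transformation: also recorded only, nothing runs here
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def _run(self):
        # Execute the recorded plan, one element at a time
        for item in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                yield item

    def collect(self):
        # Action: materialize all results
        return list(self._run())

    def count(self):
        # Action: run the plan and count the surviving elements
        return sum(1 for _ in self._run())


events = LazyDataset(["view", "cart", "purchase", "cart"])
calls = []


def is_cart(event_type):
    calls.append(event_type)     # record when the predicate really runs
    return event_type == "cart"


carts = events.filter(is_cart)   # nothing is evaluated yet
print(len(calls))                # 0
print(carts.count())             # 2
print(len(calls))                # 4
print(events.count())            # 4, the original dataset is unchanged
```

In PySpark the same split is why calls such as `filter` or `select` return almost immediately, while `count()` or `show()` are where the time is spent.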

#### PySpark Advantages

- PySpark is a general-purpose, in-memory, distributed processing engine that lets you process data efficiently in a distributed fashion.
- Applications running on PySpark can be up to 100x faster than traditional disk-based systems such as Hadoop MapReduce, although the gain depends heavily on the workload.
- PySpark is well suited to data ingestion pipelines.
- With PySpark you can process data from Hadoop HDFS, AWS S3, and many other file systems.
- PySpark can also process real-time data using Streaming and Kafka.
- With PySpark Streaming you can stream files from the file system as well as data from a socket.
- PySpark includes a machine learning library (MLlib); graph processing is available through GraphFrames.

#### PySpark Modules & Packages

- PySpark RDD (pyspark.RDD)
    1. Processed across multiple machines
    2. Fault-tolerant
    3. Immutable: to modify one, you create a new RDD with the result
- PySpark DataFrame and SQL (pyspark.sql)
    1. Like RDDs, processed across multiple machines, fault-tolerant, and immutable
    2. By contrast, a Pandas DataFrame is always processed on a single machine, is not fault-tolerant, and is mutable
- PySpark Streaming (pyspark.streaming)
- PySpark MLlib (pyspark.ml, pyspark.mllib)
- PySpark GraphFrames (GraphFrames)
- PySpark Resource (pyspark.resource), new in PySpark 3.0

#### What is the difference between Pandas and PySpark?

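In simple terms, Pandas runs operations on a single machine, whereas PySpark can run them across multiple machines. If you are working on a machine learning application that deals with large datasets, PySpark is a better fit and can process operations many times faster than Pandas (the source cited above claims up to 100x). On data that fits in one machine's memory the advantage can disappear: in the timings later in this notebook, Pandas answers each query faster than Spark.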

#### How to Decide Between Pandas vs PySpark

If the data is small enough that you can process it with Pandas, you likely don't need PySpark. Spark is useful when the data is too large to fit into the memory of one machine, since it can distribute the computation. That said, if the computation is complex enough to benefit from a lot of parallelization, you could see an efficiency boost with PySpark. I'm more comfortable with PySpark's APIs than with Pandas', so I might end up using PySpark anyway, but whether you'll see an efficiency boost depends a lot on the problem.

Below are a few considerations for choosing PySpark over Pandas:

- Your data is huge, grows significantly over the years, and you want to improve processing time.
- You want fault tolerance.
- You need ANSI SQL compatibility.
- You want a choice of language (Spark supports Python, Scala, Java, and R).
- You want machine learning capability.
- You would like to read Parquet, Avro, Hive, Cassandra, Snowflake, etc.
- You want to stream data and process it in real time.
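
The first consideration, data size, can be checked before writing any processing code. The sketch below is a rough heuristic, not a rule: `total_csv_bytes` and `suggest_engine` are hypothetical helpers, and the expansion factor of 1.5 and the limit of half the available RAM are assumed rule-of-thumb values. The real in-memory footprint depends on the column types.

```python
import glob
import os


def total_csv_bytes(pattern):
    """Sum the on-disk size of every file matching a glob pattern."""
    return sum(os.path.getsize(path) for path in glob.glob(pattern))


def suggest_engine(csv_bytes, ram_bytes, expansion=1.5, max_ram_fraction=0.5):
    """Suggest 'pandas' or 'pyspark' from file size and available RAM.

    expansion and max_ram_fraction are assumed rule-of-thumb values,
    not measured constants; tune them for your own data.
    """
    estimated = csv_bytes * expansion
    return "pandas" if estimated <= ram_bytes * max_ram_fraction else "pyspark"


if __name__ == "__main__":
    size = total_csv_bytes("data/*.csv")
    # 16 GiB is a placeholder; use the RAM of the machine you will run on
    print(suggest_engine(size, ram_bytes=16 * 1024**3))
```

Size is only one of the considerations listed above; streaming, fault tolerance, or SQL requirements can justify PySpark even when the data would fit in memory.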

#### Code

In [1]:
import glob
import pandas as pd
from pyspark.sql import SparkSession
import time

In [3]:
spark = SparkSession.builder \
               .appName('SparkByExamples.com') \
               .getOrCreate()

Loading the files

In [4]:
# Pandas
start = time.time()

# Read every CSV file and concatenate the results into a single DataFrame
files_s = glob.glob("data/*.csv")
li = []
for filename in files_s:
    df = pd.read_csv(filename, index_col=None, header=0)
    li.append(df)
df_pandas = pd.concat(li, axis=0, ignore_index=True)

end = time.time()
print(f'Time pandas: {(end - start):.2f} sec')

# PySpark
start = time.time()

# spark.read is lazy apart from the pass over the files that inferSchema needs
df_spark = spark.read.options(header='True', inferSchema='True').csv('data/*.csv')

end = time.time()
print(f'Time spark: {(end - start):.2f} sec')

Time pandas: 24.84 sec
Time spark: 23.91 sec
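
`inferSchema` makes Spark read the CSV files an extra time just to work out the column types, and that pass is included in the Spark timing above. A possible alternative, sketched here, is to pass the schema explicitly as a DDL string. The column names and types are copied from the `printSchema()` output in the "Printing info" cell of this notebook; `read_events` is a hypothetical helper, and whether an explicit schema changes the timing on your machine is something to measure rather than assume.

```python
# Column names and Spark SQL types, in file order
EVENT_COLUMNS = [
    ("event_time", "TIMESTAMP"),
    ("event_type", "STRING"),
    ("product_id", "INT"),
    ("category_id", "BIGINT"),
    ("category_code", "STRING"),
    ("brand", "STRING"),
    ("price", "DOUBLE"),
    ("user_id", "INT"),
    ("user_session", "STRING"),
]

# DDL-formatted schema string, e.g. "event_time TIMESTAMP, event_type STRING, ..."
EVENT_SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in EVENT_COLUMNS)


def read_events(spark, path="data/*.csv"):
    """Read the event CSV files with a fixed schema instead of inferSchema.

    `spark` is an existing SparkSession, such as the one created earlier.
    """
    return spark.read.csv(path, header=True, schema=EVENT_SCHEMA_DDL)


print(EVENT_SCHEMA_DDL)
```

With the session from this notebook, `df_spark = read_events(spark)` would replace the `inferSchema` call. If `event_time` comes back as null, the timestamp pattern probably needs to be given through the `timestampFormat` option, or the column can be read as `STRING` and converted afterwards.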


Printing info

In [7]:
# Pandas
print(len(df_pandas))
# info() prints its report itself and returns None, so it is not wrapped in print()
df_pandas.info()

# PySpark
print(df_spark.count())
# printSchema() also prints directly and returns None
df_spark.printSchema()

20692840
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20692840 entries, 0 to 20692839
Data columns (total 9 columns):
 #   Column         Dtype
---  ------         -----
 0   event_time     object
 1   event_type     object
 2   product_id     int64
 3   category_id    int64
 4   category_code  object
 5   brand          object
 6   price          float64
 7   user_id        int64
 8   user_session   object
dtypes: float64(1), int64(3), object(5)
memory usage: 1.4+ GB
20692840
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



Getting unique values

In [10]:
# Pandas
start = time.time()

print(df_pandas['event_type'].unique())

end = time.time()
print(f'Time pandas: {(end - start):.2f} sec')

# PySpark
start = time.time()

# show() prints the table itself and returns None, so it is not wrapped in print()
df_spark.select('event_type').distinct().show()

end = time.time()
print(f'Time spark: {(end - start):.2f} sec')

['view' 'cart' 'remove_from_cart' 'purchase']
Time pandas: 0.72 sec
+----------------+
|      event_type|
+----------------+
|        purchase|
|            view|
|            cart|
|remove_from_cart|
+----------------+

Time spark: 5.82 sec


Counting the product IDs when the event type is cart

In [17]:
# Pandas
start = time.time()

print(len(df_pandas[df_pandas['event_type'] == 'cart']['product_id']))

end = time.time()
print(f'Time pandas: {(end - start):.2f} sec')

# PySpark
start = time.time()

# Filter on event_type first, then keep only the product_id column
print(df_spark.filter("event_type = 'cart'").select('product_id').count())

end = time.time()
print(f'Time spark: {(end - start):.2f} sec')

5768333
Time pandas: 1.48 sec
5768333
Time spark: 4.56 sec


In [19]:
# Pandas
start = time.time()

print(len(df_pandas[df_pandas['event_type'] == 'cart']['product_id']))

end = time.time()
print(f'Time pandas: {(end - start):.2f} sec')

# PySpark
start = time.time()

# Register the DataFrame as a temporary view named dataSpark so it can be queried with SQL
df_spark.createOrReplaceTempView('dataSpark')
spark.sql("select count(*) from dataSpark where event_type = 'cart'").show()

end = time.time()
print(f'Time spark: {(end - start):.2f} sec')

5768333
Time pandas: 1.43 sec
+--------+
|count(1)|
+--------+
| 5768333|
+--------+

Time spark: 4.39 sec
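
Every comparison above repeats the same `start = time.time()` / `end = time.time()` pattern. One way to cut the repetition is a small context manager. `timed` is a hypothetical helper, and it uses `time.perf_counter()`, which is better suited to measuring intervals than `time.time()`. The `sum(range(...))` call is only a stand-in so the block runs without the dataset.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results=None):
    """Print the wall-clock time of the enclosed block.

    If `results` is a dict, the elapsed seconds are also stored under `label`.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f"Time {label}: {elapsed:.2f} sec")


timings = {}
with timed("pandas", timings):
    total = sum(range(1_000_000))   # stand-in for a Pandas operation
```

In this notebook the body of the `with` block would be, for example, `print(df_spark.filter("event_type = 'cart'").count())`. For Spark, only actions such as `count()` or `show()` are worth timing this way, since transformations on their own return before any work is done.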
