<a href="https://colab.research.google.com/github/its-relative/Kaggle/blob/main/learn_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Libraries


In [3]:
import pyspark
from pyspark.sql import SparkSession

This is an introductory, hands-on PySpark tutorial. I decided to study and summarize some basic concepts of PySpark functions, and I hope this summary can help someone else!


# CONTENT TABLE
----------------
## Concepts
* 1) What is Spark / PySpark
* 2) Characteristics
## Practice
* 1) GETTING STARTED IN NOTEBOOKS (Jupyter/Colab/Kaggle)
* 2) READING DATA FROM DIFFERENT SOURCES
* 3) DISPLAYING
* 4) FILTERING
* 5) FILTERING PART 2
* 6) MANIPULATING DATA
* 7) GROUPING BY
* 8) WINDOW FUNCTION
* 9) JOIN
* 10) UDF
* 11) REFERENCES
## Next
* 102 Pyspark - MLlib (Soon!)


# What is Spark / PySpark


Apache Spark is an open-source, unified computing engine for distributed data processing on computer clusters, which makes development easy and scalable. It supports many programming languages, such as Python, R and Scala, and includes libraries for SQL and machine learning. It is designed to handle huge amounts of data by distributing large datasets among the "computers" of a cluster. The processes that run the computations on those machines are called "executors", and the process that coordinates the whole application and distributes the work among them is called the "driver".
PySpark is the Python library for Apache Spark: it lets you run Python applications on the Spark architecture and therefore process large data in parallel.


# Characteristics
Among the characteristics of Spark, some interesting ones are:
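* Fast Processing: in-memory computation makes Spark much faster than disk-based engines; the Spark project has cited speedups of up to 100x in memory and 10x on disk compared with Hadoop MapReduce for some workloads.
* Lazy Evaluation: transformations are not executed right away. Spark records them and only runs them when an action (such as `show()` or `count()`) is called, which lets it optimize the whole chain of transformations under the hood.
* Support for Multiple High-Level Languages: Python, R, Scala, Java, SQL.
* Native Libraries: Spark ships with libraries for SQL and DataFrames (Spark SQL), machine learning (MLlib) and graph processing (GraphX), all designed for distributed execution; GraphFrames is a separate package that brings graph processing to DataFrames.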


--------------------


# GETTING STARTED WITH PYSPARK IN NOTEBOOKS
## Step 1: Install PySpark
Install the PySpark library using pip. Ensure you use the quiet flag to avoid verbose output.

## Step 2: Import Necessary Libraries
Import `SparkSession` from `pyspark.sql` to create your Spark entry point. Also, import `StructType`, `StructField`, `StringType`, `IntegerType` etc. from `pyspark.sql.types` for defining schemas. You may also need `pyspark.sql.functions` as F for various DataFrame operations.

## Step 3: Initialize SparkSession
Create a SparkSession object. This is the main entry point for DataFrame and SQL functionalities.


In [4]:
!pip install -q pyspark



In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import pyspark.sql.functions as F

In [7]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
kagglehub.login()



Kaggle credentials set.
Kaggle credentials successfully validated.


In [8]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

titanic_path = kagglehub.competition_download('titanic')
yasserh_wine_quality_dataset_path = kagglehub.dataset_download('yasserh/wine-quality-dataset')

print('Data source import complete.')


Downloading from https://www.kaggle.com/api/v1/competitions/data/download-all/titanic...


100%|██████████| 34.1k/34.1k [00:00<00:00, 34.9MB/s]

Extracting files...





Data source import complete.


In [19]:
spark = SparkSession.builder.getOrCreate()

# READING DATA FROM DIFFERENT SOURCES
## Step 1: Create a PySpark DataFrame from example data
Define a schema for your data using `StructType` and `StructField`. Then, create an example dataset as a list of tuples. Finally, use `spark.createDataFrame()` to create a PySpark DataFrame from this data and schema.

## Step 2: Read a CSV file into a PySpark DataFrame
Read a CSV file with `spark.read.csv()`. Remember to specify options like `header=True` to treat the first row as headers and `inferSchema=True` to automatically detect column data types.

## Step 3: Convert a Pandas DataFrame to a PySpark DataFrame
First, create a Pandas DataFrame with some sample data. Then, convert it into a PySpark DataFrame with `spark.createDataFrame()`. The reverse conversion, `toPandas()`, collects every row into the driver's memory, so be cautious using it with large datasets.


In [20]:
example_data = [(100,"Brazil","1000","A100",6),
    (101,"Spain","2000","MA100",2),
    (102,"EUA","3000","A200",10),
    (110,"Mexico","B100","F400",8),
    (200,"Japan","5000","A100",9),
    (880,"EUA","500","Z120",1)
  ]

schema = StructType([StructField("key", IntegerType(), True),
                     StructField("C1", StringType(), True),
                     StructField("C2", StringType(), True),
                     StructField("C3", StringType(), True),
                     StructField("C4", IntegerType(), True)])

df1 = spark.createDataFrame(data=example_data, schema=schema)

In [21]:
# Toy dataset for example
example_data_2 = [(100,"BB",10),
    (101,"XX",99),
    (102,"AN",898),
    (110,"AC",567),
    (200,"AV",344),
    (300,"FV",834),
    (111,"ZW",54)
  ]

schema2 = StructType([StructField("key", IntegerType(), True),
                      StructField("C1", StringType(), True),
                      StructField("C3", IntegerType(), True)])

df2 = spark.createDataFrame(example_data_2, schema2)

# DISPLAYING DATAFRAME INFORMATION
## Step 1: Display the content of your DataFrame
Use the `.show()` method to display the rows of your DataFrame. Experiment with the `n` parameter (for example, `.show(n=5)`) to control how many rows are shown.

## Step 2: Check the schema and data types
Use the `.printSchema()` method to see the column names and their data types in your DataFrame.

## Step 3: Get column names
Access the column names of your DataFrame through the `.columns` attribute.

## Step 4: Analyze descriptive statistics
Use the `.describe()` method to view summary statistics (count, mean, stddev, min, max). Selecting only the numerical columns first keeps the output focused on meaningful statistics.

## Step 5: Count rows and columns
Determine the number of rows with `.count()` and the number of columns with `len(df.columns)`.

## Step 6: Create a cross-tabulation (crosstab)
Generate a frequency table for a pair of columns using `.crosstab(col1, col2)`; for more than two columns, use `groupBy()` followed by `count()`.

## Step 7: Select specific columns
Select a subset of columns from your DataFrame with `.select()`.

## Step 8: Order your DataFrame by a column
Sort the DataFrame rows with `.orderBy()` based on the values in one or more columns, in ascending or descending order.

## Step 9: Drop variables (columns)
Remove unwanted columns from your DataFrame with `.drop()`.


In [23]:
df1.show(5)

+---+------+----+-----+---+
|key|    C1|  C2|   C3| C4|
+---+------+----+-----+---+
|100|Brazil|1000| A100|  6|
|101| Spain|2000|MA100|  2|
|102|   EUA|3000| A200| 10|
|110|Mexico|B100| F400|  8|
|200| Japan|5000| A100|  9|
+---+------+----+-----+---+
only showing top 5 rows



In [24]:
df2.printSchema()

root
 |-- key: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- C3: integer (nullable = true)



In [25]:
df1.columns

['key', 'C1', 'C2', 'C3', 'C4']

In [26]:
df1.printSchema()

root
 |-- key: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: integer (nullable = true)



In [37]:
# Spark reports integer, long, float and double columns as 'int', 'bigint', 'float', 'double'
num_cols = [col for col, dtype in df1.dtypes if dtype in ['int', 'bigint', 'float', 'double']]
df1.select(num_cols).describe().show()

+-------+------------------+------------------+
|summary|               key|                C4|
+-------+------------------+------------------+
|  count|                 6|                 6|
|   mean|248.83333333333334|               6.0|
| stddev|311.63980276381045|3.7416573867739413|
|    min|               100|                 1|
|    max|               880|                10|
+-------+------------------+------------------+



In [40]:
print("Number of rows: \n")
print(df1.count())
print("Number of columns : \n")
print(len(df1.columns))

Number of rows: 

6
Number of columns : 

5


In [42]:
df1.crosstab("key", "C4").show()

+------+---+---+---+---+---+---+
|key_C4|  1| 10|  2|  6|  8|  9|
+------+---+---+---+---+---+---+
|   200|  0|  0|  0|  0|  0|  1|
|   101|  0|  0|  1|  0|  0|  0|
|   880|  1|  0|  0|  0|  0|  0|
|   110|  0|  0|  0|  0|  1|  0|
|   100|  0|  0|  0|  1|  0|  0|
|   102|  0|  1|  0|  0|  0|  0|
+------+---+---+---+---+---+---+



In [45]:
df1.select("key", "C4").show()

+---+---+
|key| C4|
+---+---+
|100|  6|
|101|  2|
|102| 10|
|110|  8|
|200|  9|
|880|  1|
+---+---+



In [50]:
# Sort the crosstab by its first column in descending order
# (key_C4 holds strings, so the sort is lexicographic)
df1.crosstab("key", "C4").orderBy("key_C4", ascending=False).show()

+------+---+---+---+---+---+---+
|key_C4|  1| 10|  2|  6|  8|  9|
+------+---+---+---+---+---+---+
|   880|  1|  0|  0|  0|  0|  0|
|   200|  0|  0|  0|  0|  0|  1|
|   110|  0|  0|  0|  0|  1|  0|
|   102|  0|  1|  0|  0|  0|  0|
|   101|  0|  0|  1|  0|  0|  0|
|   100|  0|  0|  0|  1|  0|  0|
+------+---+---+---+---+---+---+



In [53]:
df2.crosstab("key","C3").drop("344").show()

+------+---+---+---+---+---+---+
|key_C3| 10| 54|567|834|898| 99|
+------+---+---+---+---+---+---+
|   200|  0|  0|  0|  0|  0|  0|
|   101|  0|  0|  0|  0|  0|  1|
|   110|  0|  0|  1|  0|  0|  0|
|   300|  0|  0|  0|  1|  0|  0|
|   100|  1|  0|  0|  0|  0|  0|
|   102|  0|  0|  0|  0|  1|  0|
|   111|  0|  1|  0|  0|  0|  0|
+------+---+---+---+---+---+---+



# FILTERING DATAFRAME ROWS
To filter rows based on conditions, use `.filter()` (or its alias `.where()`) with column expressions, which are usually built with `pyspark.sql.functions`.
Refer to the [SQL Functions in Spark](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.functions.arrays_zip) documentation for a full list of available functions.

## Step 1: Filter using a single numerical condition
Filter the DataFrame where a numerical column is greater than a specific value. For example, filter where 'fixed acidity' > 5.

## Step 2: Filter using another single numerical condition
Filter the DataFrame where a numerical column is less than a specific value. For example, filter where 'fixed acidity' < 5.

## Step 3: Filter using multiple conditions (AND operator)
Combine multiple conditions using the AND (`&`) operator. For example, filter where 'fixed acidity' < 10 AND 'quality' > 5.

## Step 4: Filter using multiple conditions (OR operator)
Combine multiple conditions using the OR (`|`) operator. For example, filter where 'quality' < 5 OR 'quality' > 7.

## Step 5: Filter using a list of values (isin)
Filter rows where a column's value is present in a given list. For example, filter where 'quality' is 5, 6, or 7.


# ADVANCED FILTERING TECHNIQUES
## Step 1: Filter by categorical columns with multiple conditions
Filter the DataFrame based on conditions involving categorical columns. For instance, filter where 'Embarked' is 'S' AND 'Sex' is 'female'.

## Step 2: Filter with exclusion (NOT operator)
Exclude rows that meet a certain condition using the NOT (`~`) operator. For example, filter where 'Embarked' is NOT 'S'.

## Step 3: Filter with exclusion using SQL syntax
Demonstrate exclusion with a SQL expression string. For example, filter where 'Embarked' is neither 'C' nor 'Q' with `"Embarked NOT IN ('C', 'Q')"`.

## Step 4: Filter rows where a string column starts with a specific substring
Filter the DataFrame where a string column (e.g., 'Cabin') starts with 'A'.

## Step 5: Filter rows where a string column ends with a specific substring
Filter the DataFrame where a string column (e.g., 'Cabin') ends with '3'.

## Step 6: Filter rows where a string column contains a specific substring
Filter the DataFrame where a string column (e.g., 'Name') contains 'Miss'.

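## Step 7: Filter using the SQL `LIKE` operator (wildcard patterns)
Filter the DataFrame with `.like()`, which matches SQL wildcard patterns (`%` for any sequence of characters, `_` for a single character) rather than regular expressions; use `.rlike()` when you need a regular expression. For example, filter where 'Name' contains 'John' with the pattern `'%John%'`.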

## Step 8: Filter for null values
Identify and filter rows where a specific column (e.g., 'Cabin') has null values.

## Step 9: Filter for non-null values
Identify and filter rows where a specific column (e.g., 'Cabin') does NOT have null values.


# MANIPULATING DATA IN DATAFRAMES
This section covers creating new columns, modifying existing ones, and changing data types. Key functions include `withColumn()`, `F.round()`, and `F.when()`.

## Step 1: Create a new column by adding two existing columns
Add the values of two numerical columns (e.g., 'volatile acidity' and 'citric acid') to create a new column.

## Step 2: Create a new column with a constant value
Add a new column to your DataFrame where all values are a constant (e.g., a dummy column with ones).

## Step 3: Round a numerical column to a specific decimal place
Round the values in a numerical column (e.g., 'pH') to one decimal place.

## Step 4: Implement conditional logic for a new column (F.when())
Create a new column ('Survived_TEXT') based on a conditional statement on an existing column ('Survived'). For instance, if 'Survived' is 0, set 'Survived_TEXT' to 'NO', otherwise 'YES'.

## Step 5: Implement multiple conditional logic for a new column
Apply multiple `F.when()` conditions to create a new categorical column ('COD_EMBARKED') based on values in 'Embarked'. For example, 'C' maps to 'YES_C', 'S' maps to 'NO_S', and others map to 'MAYBE_OTHER'.

## Step 6: Change the data type of columns
Import `pyspark.sql.types` as `T` to access data types like `DoubleType`, `IntegerType`, `StringType`, etc.
Change the data type of an existing column (e.g., 'Survived' to DoubleType, 'Age' to StringType). Remember to print the schema before and after the change to verify.




# GROUPING AND AGGREGATING DATA
This section covers various grouping operations including counting, aggregating with multiple functions, and pivot tables.

## Step 1: Count occurrences after grouping by multiple columns
Group your DataFrame by two categorical columns (e.g., 'Sex' and 'Embarked') and then count the number of occurrences for each group.

## Step 2: Order grouped counts
Extend the previous step by ordering the grouped counts based on one or more of the grouping columns.

## Step 3: Aggregate multiple statistics with renaming
Group by a column (e.g., 'Survived') and calculate multiple aggregation functions (mean, max, min) on another column (e.g., 'Age'). Alias the resulting aggregated columns for clarity.

## Step 4: Create a Pivot Table
Use the `pivot` function to reshape your DataFrame. Group by one column (e.g., 'Embarked'), pivot by another (e.g., 'Sex' with specified categories 'male', 'female'), and then apply an aggregation function (e.g., `F.sum('Survived')`, `F.count('Survived')`, `F.mean('Survived')`).


# WINDOW FUNCTIONS IN PYSPARK
Window functions allow you to perform calculations across a set of DataFrame rows that are related to the current row. This is similar to `GROUP BY` but returns a value for each row, rather than a single aggregated value for a group.
To use window functions, import `Window` from `pyspark.sql`.
More details: [Pyspark Window Functions](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.Window)

## Step 1: Assign a row number
Use `F.row_number().over(Window.orderBy('column'))` to assign a unique sequential number to each row within a window, ordered by a specified column.

## Step 2: Calculate rank over partitions
Calculate the rank of rows within partitions (groups) defined by one or more columns (e.g., 'Country'), ordered by another column (e.g., 'value'). Use `Window.partitionBy('column').orderBy('column')`.

## Step 3: Get the previous value (LAG)
Retrieve the value from a preceding row using `F.lag('column', offset).over(window_spec)`. Partition and order your window appropriately (e.g., by 'Country' and 'id').

## Step 4: Get the next value (LEAD)
Retrieve the value from a subsequent row using `F.lead('column', offset).over(window_spec)`. Partition and order your window as needed.

## Step 5: Divide data into N tiles (NTILE)
Distribute rows into a specified number of groups (tiles) based on the order of a column. Use `F.ntile(n).over(window_spec)`.

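## Step 6: Calculate sum over a rolling window (rowsBetween)
Compute a sum over a specified range of rows relative to the current row (e.g., the previous row, current row, and next row). Use `Window.orderBy('column').rowsBetween(start, end)`, where -1 is the previous row, 0 is the current row, 1 is the next row, and so on. `rangeBetween()` works similarly, but it defines the frame by values of the ordering column instead of by row positions.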


# JOINING DATAFRAMES
Joining DataFrames is fundamental for combining data from different sources. PySpark offers various join types similar to SQL.
Available join types: Inner (default), Cross, Outer, Left, Right, Left anti, Left semi.
More details: [Pyspark Joins](https://sparkbyexamples.com/pyspark/pyspark-join-types/)

## Step 1: Perform an Inner Join
Join two DataFrames (`df_data` and `df_data_2`) on a common key (e.g., 'id') using an inner join. This will return only the rows where the join key exists in both DataFrames.

## Step 2: Perform a Left Join
Execute a left join. This will return all rows from the left DataFrame (`df_data`) and the matching rows from the right DataFrame (`df_data_2`). If there's no match, the columns from the right DataFrame will be null.

## Step 3: Perform a Right Join
Execute a right join. This will return all rows from the right DataFrame (`df_data_2`) and the matching rows from the left DataFrame (`df_data`). If there's no match, the columns from the left DataFrame will be null.

## Step 4: Perform a Cross Join
Perform a cross join between two DataFrames. This join returns the Cartesian product of rows from both DataFrames (every row from the first DataFrame is combined with every row from the second).


# USER DEFINED FUNCTIONS (UDFs)
UDFs allow you to use custom Python functions directly within PySpark DataFrames. This is useful for operations that are not natively supported by Spark's built-in functions.
To use UDFs, import `udf` from `pyspark.sql.functions` and relevant `DataType` from `pyspark.sql.types`.
More details: [UDF](https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/)

## Step 1: Create a simple UDF
Define a Python function (e.g., `price_to_category`) that takes a value and returns a categorized string ('Low', 'Medium', 'High'). Register this Python function as a PySpark UDF, specifying its return type (e.g., `StringType()`). Then, apply this UDF to a column in your DataFrame (e.g., 'Price') to create a new categorized column.


# REFERENCES
* [Spark: The Definitive Guide](https://www.amazon.com/Spark-Definitive-Guide-Unified-Analytics/dp/1491976707)
* [Spark official documentation](https://spark.apache.org/docs/latest/)
* [PySpark official documentation](https://spark.apache.org/docs/latest/api/python/index.html)
* [IBM tutorial on PySpark](https://www.ibm.com/cloud/blog/apache-spark-for-beginners)
* [Databricks tutorial](https://databricks.com/glossary/what-is-pyspark)
* [Spark by Examples](https://sparkbyexamples.com/pyspark-tutorial/)
