# pyspark-demo

In [1]:
%%capture
# Install the project in the current directory in editable mode into the kernel's
# environment (presumably this is what provides `pyspark_utils` -- confirm against
# the project's packaging). %%capture above keeps pip's output out of the notebook.
%pip install -e .

In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, IntegerType, StringType, StructType, StructField
from pyspark_utils import get_all_rows_joins_unsafe

# Create (or reuse, if one already exists) a SparkSession running in local mode.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('PySpark_Demo')
    .getOrCreate()
)

**1. Product DataFrame**

In [3]:
# Explicit schema for the product CSV. `price` and `number_available_in_stock`
# are declared nullable; the displayed rows contain NULLs in both columns.
product_schema = (
    StructType()
    .add('id', IntegerType(), False)
    .add('product_name', StringType(), False)
    .add('price', DecimalType(6, 2), True)
    .add('number_available_in_stock', IntegerType(), True)
)
product_csv = os.path.join("sample_data", "product.csv")

# Read the CSV with the schema above; the first line of the file is a header row.
product_df = (
    spark.read
    .schema(product_schema)
    .option("header", True)
    .csv(product_csv)
)
product_df.show(truncate=False)

+---+------------+-----+-------------------------+
|id |product_name|price|number_available_in_stock|
+---+------------+-----+-------------------------+
|1  |Product_1   |10.09|5                        |
|2  |Product_2   |50.00|NULL                     |
|3  |Product_3   |20.23|0                        |
|4  |Product_4   |NULL |NULL                     |
|5  |Product_5   |12.40|9                        |
|6  |Product_6   |NULL |2                        |
+---+------------+-----+-------------------------+



**2. Category DataFrame**

In [4]:
# Explicit schema for the category CSV. Only `description` is declared nullable;
# the displayed rows contain NULL descriptions.
category_schema = (
    StructType()
    .add('id', IntegerType(), False)
    .add('category_name', StringType(), False)
    .add('description', StringType(), True)
)
category_csv = os.path.join("sample_data", "category.csv")

# Read the CSV with the schema above; the first line of the file is a header row.
category_df = (
    spark.read
    .schema(category_schema)
    .option("header", True)
    .csv(category_csv)
)
category_df.show(truncate=False)

+---+-------------+----------------+
|id |category_name|description     |
+---+-------------+----------------+
|1  |Category_1   |First category. |
|2  |Category_2   |Second category.|
|3  |Category_3   |NULL            |
|4  |Category_4   |NULL            |
+---+-------------+----------------+



**3. ProductCategory (Joins) DataFrame**

In [5]:
# Explicit schema for the link table: each row pairs a `product_id` with a
# `category_id` (a product may appear in several rows, as the output shows).
product_category_schema = (
    StructType()
    .add('product_id', IntegerType(), False)
    .add('category_id', IntegerType(), False)
)
product_category_csv = os.path.join("sample_data", "product_category.csv")

# Read the CSV with the schema above; the first line of the file is a header row.
product_category_df = (
    spark.read
    .schema(product_category_schema)
    .option("header", True)
    .csv(product_category_csv)
)
product_category_df.show(truncate=False)

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|1         |3          |
|1         |4          |
|2         |3          |
|3         |1          |
|5         |4          |
+----------+-----------+



**4. Use the function `get_all_rows_joins_unsafe` to get a new DataFrame with the `product_name` and `category_name` columns. The result is shown below.**

In [6]:
# Show the helper's own docstring first so the reader sees its contract.
print('get_all_rows_joins_unsafe:\n\n', get_all_rows_joins_unsafe.__doc__.strip())

# Combine products and categories through the product/category link table.
# With mode="left" the result shown below keeps every product, including the
# ones that have no category (their `category_name` is NULL).
df = get_all_rows_joins_unsafe(
    product_df,
    category_df,
    product_category_df,
    join_df1_pk="product_id",
    join_df2_pk="category_id",
    columns=("product_name", "category_name"),
    mode="left",
)
df.show()

get_all_rows_joins_unsafe:

 Get all rows according to the given joins between dataframes.

    Return:
        DataFrame

    Note:
        The '_unsafe' suffix means that input arguments are not validated and an exception may be thrown.

        The `mode` argument specifies the dataframe relative to which the rows will be retrieved.
+------------+-------------+
|product_name|category_name|
+------------+-------------+
|   Product_1|   Category_4|
|   Product_1|   Category_3|
|   Product_2|   Category_3|
|   Product_3|   Category_1|
|   Product_4|         NULL|
|   Product_5|   Category_4|
|   Product_6|         NULL|
+------------+-------------+



In [7]:
# Shut down the SparkSession created at the top of the notebook once the demo is done.
spark.stop()