## PySpark

### Pandas-Friendly Big Data Processing with Spark

In [None]:
!pip install "pyspark[pandas_on_spark]"

Spark lets you scale pandas workloads across multiple nodes. However, learning PySpark syntax can be daunting for pandas users.

The pandas API on Spark lets you tap into Spark's big data capabilities while keeping a familiar pandas-like syntax.

The following code compares the syntax between PySpark and the Pandas API on Spark.

In [None]:
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)


Pandas API on Spark:

In [None]:
import pyspark.pandas as ps


In [None]:
psdf = ps.DataFrame(
    {
        "A": ["foo", "bar", "foo"],
        "B": ["one", "one", "two"],
        "C": [0.1, 0.3, 0.5],
        "D": [0.2, 0.4, 0.6],
    }
)


In [None]:
psdf.sort_values(by='B')


|   | A | B | C | D |
|---|---|---|---|---|
| 0 | foo | one | 0.1 | 0.2 |
| 1 | bar | one | 0.3 | 0.4 |
| 2 | foo | two | 0.5 | 0.6 |


In [None]:
psdf.groupby('A').sum()


| A | C | D |
|---|---|---|
| foo | 0.6 | 0.8 |
| bar | 0.3 | 0.4 |


In [None]:
psdf.query("C > 0.4")

|   | A | B | C | D |
|---|---|---|---|---|
| 2 | foo | two | 0.5 | 0.6 |


In [None]:
psdf[["C", "D"]].abs()

|   | C | D |
|---|---|---|
| 0 | 0.1 | 0.2 |
| 1 | 0.3 | 0.4 |
| 2 | 0.5 | 0.6 |
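To illustrate the "familiar syntax" point, here is a sketch that runs the same calls on a plain pandas DataFrame, with no Spark involved. The groupby selects `C` and `D` explicitly. This is an adjustment for plain pandas: recent pandas versions (2.x) would otherwise also sum the string column `B` by concatenating its values. Plain pandas also sorts group keys, so `bar` comes before `foo`.

```python
import pandas as pd

# Same data as the pandas-on-Spark example, held in plain pandas
pdf = pd.DataFrame(
    {
        "A": ["foo", "bar", "foo"],
        "B": ["one", "one", "two"],
        "C": [0.1, 0.3, 0.5],
        "D": [0.2, 0.4, 0.6],
    }
)

sorted_pdf = pdf.sort_values(by="B")
# Restrict the sum to the numeric columns so B is not concatenated
grouped_pdf = pdf.groupby("A")[["C", "D"]].sum()
filtered_pdf = pdf.query("C > 0.4")
abs_pdf = pdf[["C", "D"]].abs()

print(grouped_pdf)
```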


PySpark:

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import abs
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
spark_data = spark.createDataFrame([
    ("foo", "one", 0.1, 0.2),
    ("bar", "one", 0.3, 0.4),
    ("foo", "two", 0.5, 0.6),
], ["A", "B", "C", "D"])

In [None]:
spark_data.sort(col('B')).show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|one|0.1|0.2|
|bar|one|0.3|0.4|
|foo|two|0.5|0.6|
+---+---+---+---+



In [None]:
spark_data.groupBy('A').sum().show()


+---+------+------+
|  A|sum(C)|sum(D)|
+---+------+------+
|foo|   0.6|   0.8|
|bar|   0.3|   0.4|
+---+------+------+


In [None]:
spark_data.filter(col('C') > 0.4).show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|two|0.5|0.6|
+---+---+---+---+



In [None]:
spark_data.select(abs(spark_data["C"]).alias("C"), abs(spark_data["D"]).alias("D"))


DataFrame[C: double, D: double]

### PySpark SQL: Enhancing Reusability with Parameterized Queries

In [None]:
!pip install "pyspark[sql]"

In PySpark, parameterized queries let you reuse the same query structure with different inputs, without rewriting the SQL.

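Parameterized queries can also guard against SQL injection by treating input data as values rather than as executable code. Note that the `{}` placeholders used below are filled in by PySpark's string formatter before Spark parses the query. For true value binding, Spark 3.4 and later also accept named parameter markers (such as `:id_val`) through the `args` argument of `spark.sql`.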

In [2]:
from pyspark.sql import SparkSession
import pandas as pd 

spark = SparkSession.builder.getOrCreate()

In [4]:
# Create a Spark DataFrame
item_price_pandas = pd.DataFrame({"item_id": [1, 2, 3, 4], "price": [4, 2, 5, 1]})
item_price = spark.createDataFrame(item_price_pandas)
item_price.show()


+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
|      2|    2|
|      3|    5|
|      4|    1|
+-------+-----+




In [16]:
query = """SELECT item_id, price 
FROM {item_price} 
WHERE item_id = {id_val} 
"""

spark.sql(query, id_val=1, item_price=item_price).show()


+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
+-------+-----+



In [17]:
spark.sql(query, id_val=2, item_price=item_price).show()

+-------+-----+
|item_id|price|
+-------+-----+
|      2|    2|
+-------+-----+

