# PySpark

This lecture provides an overview of PySpark.

There are two ways to run the content in this lecture: connect to a remote Spark server or run a local instance. The variable `USE_REMOTE` selects between them.

If you want to run your code directly from a `.py` file, use the following command:

```bash
spark-submit --master spark://your-server-ip:7077 your_script.py
```

In [None]:
# If connecting to a remote server (very lightweight)
# !pip install "pyspark[connect]==4.0.1"

# If connecting to a local development instance (bigger download)
# !pip install pyspark==4.0.1

In [None]:
import getpass
from pyspark.sql import SparkSession
import pandas as pd

In [None]:
# Remote server
MASTER_IP = "51.222.140.217"
REMOTE_SERVER = f"sc://{MASTER_IP}:15002"  # Spark Connect URLs have the form sc://host:port

USE_REMOTE = False 

In [None]:
f"Dashboard available at http://{MASTER_IP}:8080/" if USE_REMOTE else f"Dashboard available at http://localhost:4040/"

In [None]:
# Note that spark.sql.ansi.enabled is set to false so that 'Pandas on Spark' works correctly later in the lecture
app_name = f"job_id_{getpass.getuser()}"
if USE_REMOTE:
    # Connect to the remote Spark Connect server defined above
    spark = SparkSession.builder.remote(REMOTE_SERVER).appName(app_name).config("spark.sql.ansi.enabled", "false").getOrCreate()
else:
    # Start a local instance that uses all available cores
    spark = SparkSession.builder.master("local[*]").appName(app_name).config("spark.sql.ansi.enabled", "false").getOrCreate()
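
The choice between the two connection modes can be isolated in a small helper. The sketch below is only illustrative: the function name `spark_target`, its defaults, and the example IP address are assumptions, not part of the lecture code. It relies on the Spark Connect URL form `sc://host:port`, with 15002 as the usual Connect port.

```python
def spark_target(use_remote: bool, host: str = "localhost", port: int = 15002) -> str:
    """Return the connection string for the chosen mode (hypothetical helper).

    Remote mode yields a Spark Connect URL (sc://host:port);
    local mode yields a master string that uses all local cores.
    """
    if use_remote:
        return f"sc://{host}:{port}"
    return "local[*]"


remote_target = spark_target(True, host="203.0.113.10")  # documentation-only example IP
local_target = spark_target(False)
print(remote_target)
print(local_target)
```

The remote string would be passed to `SparkSession.builder.remote(...)` and the local one to `SparkSession.builder.master(...)`.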

In [None]:
# Test it
df = spark.range(1000)
print(df.count())

Notes:

The remote Spark Connect server can be started with `/opt/spark/sbin/start-connect-server.sh`.

If Spark is not yet installed on the remote machine, the following script installs it:
```bash
wget https://archive.apache.org/dist/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
tar -zxf spark-4.0.1-bin-hadoop3.tgz 
sudo mv spark-4.0.1-bin-hadoop3 /opt/spark
rm spark-4.0.1-bin-hadoop3.tgz
```

In [None]:
df.explain()

## Spark distributes data

First, read the data with pandas for comparison.

In [None]:
%%time
trades_df = pd.read_csv("../../datasets/market_data/trades_2025-09-10_AAPLMSFT_sorted.csv.gz")

In [None]:
type(trades_df)

In [None]:
trades_df.columns

In [None]:
%%time
len(trades_df)

In [None]:
del trades_df

Now read the same file with PySpark.

In [None]:
%%time
trades_df = spark.read.csv("../../datasets/market_data/trades_2025-09-10_AAPLMSFT_sorted.csv.gz", header=True, inferSchema=True)

In [None]:
type(trades_df)

In [None]:
trades_df.columns

In [None]:
%%time
trades_df.count()

In [None]:
%%time
trades_df.show(5)

Note that Spark data is generally distributed, and so are the operations on it: a DataFrame is split into partitions that can be processed in parallel.

In [None]:
trades_df.rdd.getNumPartitions()

#### Pick specific columns

In [None]:
trades_df.select('ticker', 'price', 'size').show(5)

#### Pick rows

In [None]:
trades_df.filter(trades_df.size > 200).show(5)

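#### Many operations in Spark are **lazy**: they don't execute until they are actually needed

This allows Spark to look at several operations together and possibly optimize them.  
Notice the difference in execution speed between the next two cells: the first only builds a plan, while the second calls `show` and therefore triggers the computation.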

In [None]:
%%time
trades_df.groupby('ticker').mean()

In [None]:
%%time
trades_df.groupby('ticker').mean().show(5)

Generally speaking, transformations such as `filter` or `select` don't actually process the data! Only when a result is demanded, by actions such as `show` or `collect`, does PySpark optimize the accumulated operations and execute them.

#### Spark DataFrames are built on top of RDD (Resilient Distributed Datasets)

In [None]:
type(trades_df.rdd)

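An RDD is assumed to be distributed by default. It is a low-level data structure that is rarely used directly nowadays; the DataFrame API is preferred. The RDD API is also not available through Spark Connect, so `.rdd` calls generally only work with a local session.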

### SQL is built into Spark!

In [None]:
trades_df.createOrReplaceTempView("trades_df")

In [None]:
spark.sql("SELECT ticker, mean(price), mean(size) from trades_df group by ticker").show()

### ... actually Pandas is also built into Spark
(although it is not as full-featured as Spark's built-in DataFrames)

In [None]:
import pyspark.pandas as ps

In [None]:
trades_ds = ps.read_csv("../../datasets/market_data/trades_2025-09-10_AAPLMSFT_sorted.csv.gz")
trades_ds.head()

In [None]:
type(trades_ds)

In [None]:
trades_ds.ticker.value_counts()

## Machine learning in PySpark
Think of this as distributed scikit-learn!

Note that PySpark has two sets of libraries, under `pyspark.mllib.*` and `pyspark.ml.*`. The _mllib_ set of packages is in maintenance mode. It was designed to work with RDDs, the predecessor of DataFrames.

In [None]:
import seaborn as sns
titanic_df = sns.load_dataset('titanic')

#only keep numeric columns (for simplicity)
titanic_onlynum_df = titanic_df.drop(['sex', 'embarked', 'class', 'who', 'deck', 'embark_town', 'alive'], axis=1)

#remove na, nan, etc.
titanic_onlynum_noempty_df = titanic_onlynum_df.dropna()
titanic_df = spark.createDataFrame(titanic_onlynum_noempty_df)

In [None]:
titanic_df.show(5)

_pclass_: Ticket class (1st, 2nd, 3rd)  
_sibsp_: Number of siblings/spouses on board  
_parch_: Number of parents/children onboard

Cast booleans to integers  
(Recall that scikit-learn was able to work with `bool` values directly. Could the difference be because PySpark is a layer on top of Spark's Scala engine, which runs on the Java Virtual Machine, where booleans are not implicitly converted to integers?)

In [None]:
from pyspark.sql.functions import col

titanic_df = titanic_df.withColumn('adult_male', col('adult_male').cast('int')).withColumn('alone', col('alone').cast('int'))
titanic_df.show(5)

Recall that in scikit-learn, `.fit()` takes two arguments for supervised learning algorithms: `X` and `y`. PySpark does things differently. It expects a single DataFrame in which the features are packed into one vector column and the target is a separate column.

In [None]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ['pclass', 'age', 'sibsp', 'parch', 'fare', 'adult_male', 'alone']

assembler = VectorAssembler(
    inputCols = feature_cols,
    outputCol = "features"
)

assembler

In [None]:
titanic_ml_df = assembler.transform(titanic_df).drop(*feature_cols)

titanic_ml_df.show(5)

#### Split test/train

Surprisingly, _many_ PySpark examples in the official documentation and in online blogs don't do a test/train split!

In [None]:
(train_data, test_data) = titanic_ml_df.randomSplit([0.8, 0.2], seed=42)

train_data.count(), test_data.count()

#### PySpark also has the concept of pipelines

(although we don't use it here)

```python
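from pyspark.ml import Pipeline

# The three stages are assumed to be defined earlier, as in the linked Spark documentation
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, decision_tree_classifier])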
```

Example taken from https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression

#### Train the model

In [None]:
from pyspark.ml.classification import LogisticRegression

model = LogisticRegression(featuresCol="features", labelCol="survived").fit(train_data)

model

#### Run the model

In [None]:
predictions = model.transform(test_data)

In [None]:
predictions.show(5)

#### Evaluate the model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="survived", metricName="areaUnderPR")

eval_metric = evaluator.evaluate(predictions)
eval_metric
