# Apache Spark 

## Imports

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.ml.feature import Imputer
import pandas as pd

## Task 1: Spark Fundamentals and DataFrames (The pandas Connection)

In [None]:
# Build the session used by every later cell; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Example")
    .getOrCreate()
)

In [None]:
# Bare expression as the cell's last line: the notebook renders the session object.
spark

In [None]:
# Load the CSV into pandas as well (decoded as UTF-16) so pandas and Spark
# results can be compared on the same data.
pd_df = pd.read_csv('champions.csv', encoding="utf-16")

In [None]:
# Read the same UTF-16 file with Spark: the first line supplies the column
# names and column types are inferred from the data instead of all being strings.
spark_df = spark.read.csv(
    'champions.csv',
    header=True,
    inferSchema=True,
    encoding="utf-16",
)

### Data Summaries

In [None]:
# Print each column's name, type and nullability as a tree.
spark_df.printSchema()

In [None]:
# Print the leading rows as a text table (show() defaults to 20 rows).
spark_df.show()

In [None]:
# List of (column name, type string) pairs.
spark_df.dtypes

In [None]:
# Summary statistics per column (count, mean, stddev, min, max), printed as a table.
spark_df.describe().show()

### Missing Values

In [None]:
# Count nulls for every column in ONE aggregation job rather than launching a
# separate filter().count() job per column.
# count() only counts non-null inputs, and when() without otherwise() yields
# null for rows that do not match, so each aggregate is that column's null tally.
null_counts = spark_df.select(
    [func.count(func.when(spark_df[c].isNull(), 1)).alias(c) for c in spark_df.columns]
).first()

# first() returns the single result Row; pair it positionally with the column names.
for c, null_count in zip(spark_df.columns, null_counts):
    print(f"{c}: {null_count}")


Drop every row that contains at least one missing value

In [None]:
# With no arguments na.drop() uses how='any': a row is removed if any of its
# columns is null. The result is only displayed; spark_df itself is not reassigned.
spark_df.na.drop().show()

Drop only the rows in which ALL values are missing

In [None]:
# how='all' removes a row only when every column in it is null.
spark_df.na.drop(how='all').show()

Drop rows using a threshold on the number of non-null values: in the cell below, a row is kept only if it has at least 2 non-null values.

In [None]:
# Count the rows that have at least 2 non-null values.
# `how` is deliberately omitted: when `thresh` is given it overrides `how`,
# so passing how='any' alongside it has no effect and only misleads readers.
spark_df.na.drop(thresh=2).count()

Drop rows with missing values in selected columns

In [None]:
# Only nulls in 'club' are considered: rows without a club are removed,
# nulls in any other column do not cause a row to be dropped.
spark_df.na.drop(how="any", subset=['club']).show()

Fill missing values (the replacement value's data type must match the column's type; columns of a different type are left unchanged)

In [None]:
# Replace nulls in the country and club columns with a placeholder string.
spark_df.na.fill('Missing Values', subset=['country', 'club']).show()

# Replace nulls in the participated column with zero.
spark_df.na.fill(0, subset=["participated"]).show()

Impute values

In [None]:
# Columns whose gaps are to be filled; each one gets a sibling
# "<name>_imputed" output column so the original values stay untouched.
impute_cols = ['played', 'participated', 'titles']

imputer = Imputer(
    strategy="mean",
    inputCols=impute_cols,
    outputCols=[f"{name}_imputed" for name in impute_cols],
)

In [None]:
# fit() learns the fill value for each input column from spark_df;
# transform() then adds the imputer's output columns to the displayed result.
imputer_model = imputer.fit(spark_df)
imputer_model.transform(spark_df).show()

### Column Selection and Manipulation

In [None]:
# Club/titles pairs, most titles first; ties are ordered alphabetically by club.
(
    spark_df
    .select("club", "titles")
    .orderBy(func.desc("titles"), func.asc("club"))
    .show()
)

In [None]:
# Share of matches won: win / (win + draw + loss), rounded to two decimals.
# The value is stored as a ratio; it is not multiplied by 100 despite the
# column name.
wins = spark_df["win"]
matches = wins + spark_df["draw"] + spark_df["loss"]
spark_df = spark_df.withColumn("win_percentage", func.round(wins / matches, 2))

In [None]:
# head(3) returns the first three rows as a list of Row objects, which makes
# the newly added win_percentage field easy to spot.
spark_df.head(3)

In [None]:
# Remove the derived win_percentage column again; drop() returns a new
# DataFrame, so the result is assigned back to spark_df.
spark_df = spark_df.drop("win_percentage")

In [None]:
# First three rows again, to confirm win_percentage is gone.
spark_df.head(3)

In [None]:
# Rename 'pts' to 'points'; the filter cells further down select the column
# under this new name.
spark_df = spark_df.withColumnRenamed("pts", "points")

In [None]:
# First three rows once more, to confirm the column now appears as 'points'.
spark_df.head(3)

### Filter Operations

In [None]:
# Clubs with more than 10 titles, with the predicate written as a SQL string.
(
    spark_df
    .filter("titles>10")
    .select("club", "country", "titles", "points")
    .show()
)

In [None]:
# Clubs with more than 10 titles, with the predicate built as a Column expression.
more_than_ten = spark_df["titles"] > 10
(
    spark_df
    .where(more_than_ten)
    .select("club", "country", "titles", "points")
    .show()
)

In [None]:
# Clubs whose title count is above 5 and at most 15. Each comparison needs its
# own parentheses because & binds tighter than the comparison operators.
titles = spark_df["titles"]
(
    spark_df
    .filter((titles > 5) & (titles <= 15))
    .select("club", "country", "titles", "points")
    .show()
)

In [None]:
# Same title range (above 5, at most 15), additionally excluding rows whose
# country is "GER".
titles = spark_df["titles"]
not_ger = spark_df["country"] != "GER"
(
    spark_df
    .filter((titles > 5) & (titles <= 15) & not_ger)
    .select("club", "country", "titles", "points")
    .show()
)

### Group By and Aggregate Functions

In [None]:
# pandas version. A notebook renders only a cell's LAST expression, so the
# pandas result must go through display() to be visible next to the Spark one.
# numeric_only=True mirrors Spark's sum(), which aggregates numeric columns
# only; without it pandas >= 2.0 "sums" text columns by concatenating strings.
display(pd_df[pd_df["participated"] > 25].groupby("country").sum(numeric_only=True))

# PySpark version: the same filter and grouping, summing every numeric column.
spark_df.filter(spark_df["participated"] > 25).groupBy("country").sum().show()

In [None]:
# Total titles per country, counting only clubs with more than 25 participations.
frequent_participants = spark_df.filter(spark_df["participated"] > 25)
(
    frequent_participants
    .groupBy("country")
    .agg(func.sum("titles"))
    .show()
)

In [None]:
# Largest goals_for value across the whole DataFrame (no grouping).
spark_df.agg(func.max("goals_for")).show()

In [None]:
# Largest goals_for per (country, club) pair, highest first. The aggregate
# column is named "max(goals_for)", which is the name orderBy refers to.
(
    spark_df
    .groupBy("country", "club")
    .agg(func.max("goals_for"))
    .orderBy("max(goals_for)", ascending=False)
    .show()
)

## Task 2: Advanced DataFrame Operations and SQL

### Temporary View

Temporary views in Spark SQL are session-scoped: they disappear when the session that created them terminates.

In [None]:
# Register spark_df under the name "champions" so it can be queried with SQL;
# the "OrReplace" variant makes the cell safe to re-run.
spark_df.createOrReplaceTempView("champions")

# spark.sql() returns a DataFrame, here holding only the club column.
sqlDF = spark.sql("SELECT club FROM champions")
sqlDF.show()

In [None]:
# Create temp view for SQL queries (replaces any existing "champions" view)
spark_df.createOrReplaceTempView("champions")

# Use SQL directly (pandas doesn't support this).
# Per country: the average of `win` rounded to 0 decimals and the total of
# `participated`, ordered by the rounded average, highest first.
result = spark.sql("""
    SELECT country,
           ROUND(AVG(win),0) as avg_wins,
           SUM(participated) as participations
    FROM champions
    GROUP BY country
    ORDER BY avg_wins DESC
""")

result.show()

### Global Temporary View

If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view.

In [None]:
# Register the DataFrame as a global temporary view.
# createOrReplaceGlobalTempView keeps this cell re-runnable: plain
# createGlobalTempView raises an error if "champions" is already registered,
# which happens as soon as the cell is executed a second time in the same
# Spark application.
spark_df.createOrReplaceGlobalTempView("champions")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.champions").show()

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.champions").show()

In [None]:
# Largest `win` value within each country, highest first, queried through the
# global temporary view.
spark.sql("SELECT country, MAX(win) FROM global_temp.champions GROUP BY country ORDER BY max(win) DESC").show()

### Data Sources Integration

In [None]:
# pandas: read_csv takes one file and loads it fully into local memory
pandas_df = pd.read_csv('champions.csv', encoding='utf-16')  # Single file only

# PySpark: the same reader API is meant for much larger inputs (everything
# read in this cell is a small local path). The commented-out lines show the
# same calls with HDFS / S3 / GCS URIs.
# NOTE(review): 'champions_split' is presumably a directory of CSV part files - confirm.
multi_csv_spark_df = spark.read.csv('champions_split', encoding='utf-16', header=True, inferSchema=True)  # Multiple files
# df = spark.read.csv('hdfs://path/to/files/*.csv')
parquet_spark_df = spark.read.parquet('users.parquet') # Parquet: columnar format
# df = spark.read.parquet('s3a://bucket/data/')
json_spark_df = spark.read.json('employees.json')  # JSON file; a cloud-storage variant is shown below
# df = spark.read.json('gs://bucket/streaming/data')

In [None]:
# First rows of the pandas DataFrame (head() defaults to 5 rows).
pandas_df.head()

In [None]:
# Look up a single club in the DataFrame read from 'champions_split'.
multi_csv_spark_df.filter("club == 'FC Vaslui'").show()

In [None]:
# Print the leading rows of the DataFrame read from 'users.parquet'.
parquet_spark_df.show()

In [None]:
# Print the leading rows of the DataFrame read from 'employees.json'.
json_spark_df.show()

## Task 3: Data Processing and ETL Pipelines

### ETL Pipeline Design - Scaling Beyond pandas: pandas Limitations vs Spark Solutions:

In [None]:
# pandas - single machine, memory limitations
def process_data_pandas(csv_data: str, output_path: str = 'pd_output.csv') -> None:
    """Aggregate a UTF-16 CSV by country with pandas and write the result as CSV.

    Rows containing any missing value are dropped, the remaining rows are
    grouped by ``country`` and every numeric column is summed.

    Args:
        csv_data: Path of the UTF-16 encoded input CSV. The whole file is
            loaded into memory at once.
        output_path: Destination CSV path. Defaults to ``'pd_output.csv'``.
    """
    df = pd.read_csv(csv_data, encoding='utf-16')  # Might not fit in memory
    # numeric_only=True keeps text columns (e.g. club names) out of the sum;
    # without it pandas >= 2.0 concatenates their strings per group.
    df_clean = df.dropna().groupby('country').sum(numeric_only=True)
    df_clean.to_csv(output_path)

# PySpark - distributed, handles TB+ data
def process_data_spark(csv_data: str, output_path: str = 'spark_output') -> None:
    """Aggregate a UTF-16 CSV by country with Spark and write the result as Parquet.

    Rows containing any null are dropped, the remaining rows are grouped by
    ``country`` and every numeric column is summed. Uses the notebook-level
    ``spark`` session.

    Args:
        csv_data: Path of the UTF-16 encoded input CSV (first line is the header).
        output_path: Destination Parquet directory; existing contents are
            overwritten. Defaults to ``'spark_output'``.
    """
    # inferSchema=True is required here: without it every CSV column is read
    # as a string, so sum() has no numeric column to aggregate and the output
    # would contain nothing but the country column.
    df = spark.read.csv(csv_data, header=True, inferSchema=True, encoding='utf-16')
    # Aggregate columns are named "sum(<column>)".
    # NOTE(review): older Spark releases may reject Parquet column names that
    # contain parentheses - alias the aggregates if this must run there.
    df_clean = df.dropna().groupBy('country').sum()
    # df_clean.write.mode('overwrite').parquet('hdfs://output/')
    df_clean.write.mode('overwrite').parquet(output_path)

In [67]:
# Run the pandas pipeline on the champions file; it writes 'pd_output.csv'.
process_data_pandas("champions.csv")

25/10/23 11:44:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1080332 ms exceeds timeout 120000 ms
25/10/23 11:44:52 WARN SparkContext: Killing executors is not supported by current scheduler.
25/10/23 12:01:00 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [66]:
# Run the Spark pipeline on the champions file; it overwrites the
# 'spark_output' Parquet directory.
process_data_spark("champions.csv")