# Imports & Configuration

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
spark_path = "E:\\Spark\\spark-3.2.0-bin-hadoop2.7"

In [7]:
import findspark
findspark.init(spark_path)

In [None]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [8]:
spark = (
    SparkSession
    .builder
    .config("spark.driver.memory", "10g")
    .config("spark.sql.files.maxPartitionBytes", "268435456")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

<h1> Topics </h1>

1. Reading Files (parquet)
2. Narrow Operations
   - `filter`
   - `withColumn`: adding/modifying a column
   - `select`: selecting relevant columns
3. Wide Operations
   - Joins
     - Sort-Merge Join
     - Broadcast Join
   - GroupBy
     - `count` & `sum`
     - `countDistinct`

# Reading File

In [10]:
transactions_file = "E:/PySpark/data/transactions.parquet"
df_transactions = spark.read.parquet(transactions_file)

In [11]:
df_transactions.rdd.getNumPartitions()

8

In [12]:
df_transactions.show(5, False)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+-----+------------+
|cust_id   |start_date|end_date  |txn_id         |date      |year|month|day|expense_type |amt  |city        |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+-----+------------+
|CCWG6X0ANH|2010-07-01|2019-10-01|TXCC7I6BMRX3XIT|2015-06-27|2015|6    |27 |Entertainment|35.18|denver      |
|CCWG6X0ANH|2010-07-01|2019-10-01|TGW6THZKM8C6SUF|2016-08-02|2016|8    |2  |Entertainment|58.15|new_york    |
|CCWG6X0ANH|2010-07-01|2019-10-01|TOLMMCXS90H2K31|2014-09-17|2014|9    |17 |Groceries    |43.92|boston      |
|CCWG6X0ANH|2010-07-01|2019-10-01|TXLH0P8JII96Q7U|2012-10-05|2012|10   |5  |Entertainment|35.2 |philadelphia|
|CCWG6X0ANH|2010-07-01|2019-10-01|TP6RDK6P58M2RO4|2015-10-21|2015|10   |21 |Entertainment|47.44|chicago     |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+-----+------------+
only showi

In [13]:
customers_file = "E:/PySpark/data/customers.parquet"
df_customers = spark.read.parquet(customers_file)

In [14]:
df_customers.show(5, False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an exisitng column: adding 5 to `age` column
- `select` relevant columns

In [15]:
df_narrow_transform = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    .withColumn("age", F.col("age") + F.lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

df_narrow_transform.write.format("noop").mode("overwrite").save("E:/PySpark/data/test/df_narrow_transform.parquet")

In [16]:
df_narrow_transform.show(7, False)

+----------+----------+---------+----+------+---------+
|cust_id   |first_name|last_name|age |gender|birthday |
+----------+----------+---------+----+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39.0|Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59.0|Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29.0|Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27.0|Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57.0|Female|7/9/1962 |
|C0R42FPHRH|Abbie     |Reyes    |68.0|Male  |10/8/1995|
|C0RZV4BH7T|Abbie     |Stevenson|41.0|Male  |2/10/1971|
+----------+----------+---------+----+------+---------+
only showing top 7 rows



In [17]:
df_customer_gt_50 = (
    df_customers
    .filter(F.col("age").cast("int") > 50)
)
df_customer_gt_50.write.format("noop").mode("overwrite").save("../data/test/df_customer_gt_50.parquet")

# Wide Transformations
1. Joins
   - Sort Merge Join
   - Broadcast Join
2. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

## 1. Joins

### Sort Merge Join

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

In [None]:
df_joined.write.format("noop").mode("overwrite").save("E:/PySpark/data/test/df_joined.parquet")

In [None]:
df_joined.explain(True)

### Broadcast Join

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)

In [None]:
df_broadcast_joined = (
    df_transactions.join(
        F.broadcast(df_customers),
        how="inner",
        on="cust_id"
    )
)

In [None]:
df_broadcast_joined.write.format("noop").mode("overwrite").save("E:/PySpark/data/test/df_broadcast_joined.parquet")

## 2. GroupBy

### GroupBy Count

In [None]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [None]:
df_city_counts.show(5, False)

In [None]:
df_txn_amt_city = (
    df_transactions
    .groupBy("city")
    .agg(F.sum("amt").alias("txn_amt"))
)

In [None]:
df_txn_amt_city.show(5, False)

### GroupBy Count Distinct 

In [None]:
df_txn_per_city = (
    df_transactions
    .groupBy("city")
    .agg(F.countDistinct("txn_id").alias("txn_count"))
)

In [None]:
df_txn_per_city.show(5, False)

In [None]:
spark.stop()