# Imports & Configuration

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
from IPython.core.interactiveshell import InteractiveShell

# Display the value of every expression statement in a cell,
# not just the last one.
InteractiveShell.ast_node_interactivity = "all"

In [None]:
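# Uncomment on a fresh environment to install PySpark into this kernel
# (consider pinning a version for reproducibility):
# %pip install pyspark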

In [4]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [5]:
# Local SparkSession for this walkthrough:
#   - master local[*]                      -> run in local mode
#   - spark.serializer                     -> Kryo serializer
#   - spark.sql.files.maxPartitionBytes    -> 268435456 bytes (256 MiB)
#   - spark.driver.memory                  -> 10g
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.files.maxPartitionBytes", "268435456")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext and restrict its logging
# to ERROR-level messages.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

<h1> Topics </h1>

1. Reading Files (parquet)
2. Narrow Operations
   - `filter`
   - `withColumn`: adding/modifying a column
   - `select`: selecting relevant columns
3. Wide Operations
   - Joins
     - Sort Merge Join
     - Broadcast Join
   - GroupBy
     - `count`
     - `sum`
     - `countDistinct`

# Reading File

In [None]:
# Shell escape: print the version of the `java` executable found on PATH.
!java -version

In [8]:
# Location of the transactions dataset (parquet), relative to this notebook.
transactions_file = "../data/data_skew/transactions.parquet"

# Load it as a DataFrame through the generic reader with an explicit format.
df_transactions = spark.read.format("parquet").load(transactions_file)

In [9]:
# Report how many partitions back the transactions DataFrame
# (the value is displayed as this cell's output).
df_transactions.rdd.getNumPartitions()

1

In [11]:
# Preview the first 50 transactions without truncating cell values.
df_transactions.show(n=50, truncate=False)

+-------+------+------+-----------+
|cust_id|txn_id|amt   |city       |
+-------+------+------+-----------+
|1      |1     |798.58|chicago    |
|1      |2     |781.89|miami      |
|1      |3     |164.46|chicago    |
|1      |4     |108.98|chicago    |
|1      |5     |867.51|los angeles|
|1      |6     |151.44|chicago    |
|1      |7     |30.38 |new york   |
|1      |8     |724.78|seattle    |
|1      |9     |220.22|los angeles|
|1      |10    |191.57|los angeles|
|1      |11    |615.54|miami      |
|2      |12    |298.32|chicago    |
|2      |13    |405.86|los angeles|
|2      |14    |974.02|seattle    |
|2      |15    |99.7  |seattle    |
|2      |16    |207.68|los angeles|
|2      |17    |596.49|chicago    |
|2      |18    |861.34|miami      |
|2      |19    |455.99|new york   |
|2      |20    |949.4 |los angeles|
|2      |21    |567.66|new york   |
|2      |22    |311.57|miami      |
|2      |23    |238.58|los angeles|
|2      |24    |445.75|los angeles|
|3      |25    |44.04 |seatt

* A `batch` refers to a group of rows that are processed together.

In [12]:
# Location of the customers dataset (parquet), relative to this notebook.
customers_file = "../data/data_skew/customers.parquet"

# Load it as a DataFrame through the generic reader with an explicit format.
df_customers = spark.read.format("parquet").load(customers_file)

In [13]:
# Preview the first 5 customers without truncating cell values.
df_customers.show(n=5, truncate=False)

+-------+--------------+-----------+---+------+----------+
|cust_id|name          |city       |age|gender|birthday  |
+-------+--------------+-----------+---+------+----------+
|1      |John Doe      |miami      |21 |F     |1968-10-03|
|2      |Jane Smith    |boston     |33 |M     |1949-10-18|
|3      |Bob Johnson   |new york   |23 |F     |1988-02-24|
|4      |Alice Brown   |los angeles|50 |M     |1957-07-02|
|5      |Charlie Wilson|boston     |56 |M     |1964-01-06|
+-------+--------------+-----------+---+------+----------+
only showing top 5 rows



# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an existing column: adding 5 to `age` column
- `select` relevant columns

In [14]:
# Chain of narrow transformations on the customers DataFrame:
#   1. keep only rows whose city is 'boston'
#   2. derive first_name / last_name from the space-separated `name`
#   3. overwrite `age` with age + 5
#   4. keep only the columns of interest
df_narrow_transform = (
    df_customers
    .filter("city = 'boston'")
    .withColumn("first_name", F.split(F.col("name"), " ")[0])
    .withColumn("last_name", F.split(F.col("name"), " ")[1])
    .withColumn("age", F.col("age") + 5)
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

# Run the pipeline by writing to the "noop" format.
(
    df_narrow_transform.write
    .format("noop")
    .mode("overwrite")
    .save("../data/test/df_narrow_transform.parquet")
)

In [15]:
# Preview the first 7 transformed rows without truncating cell values.
df_narrow_transform.show(n=7, truncate=False)

+-------+----------+---------+---+------+----------+
|cust_id|first_name|last_name|age|gender|birthday  |
+-------+----------+---------+---+------+----------+
|2      |Jane      |Smith    |38 |M     |1949-10-18|
|5      |Charlie   |Wilson   |61 |M     |1964-01-06|
|6      |Diana     |Davis    |80 |M     |1956-09-01|
|21     |Steve     |King     |78 |M     |1955-08-05|
|24     |Victoria  |Hill     |81 |M     |1968-10-16|
|38     |Jasmine   |Taylor   |46 |F     |1959-07-01|
|54     |Alice     |Brown    |45 |M     |1987-05-27|
+-------+----------+---------+---+------+----------+
only showing top 7 rows



In [16]:
# Customers whose age, cast to an integer, is strictly greater than 50.
df_customer_gt_50 = df_customers.filter(df_customers["age"].cast("int") > 50)

# Run the filter by writing to the "noop" format.
(
    df_customer_gt_50.write
    .format("noop")
    .mode("overwrite")
    .save("../data/test/df_customer_gt_50.parquet")
)

# Wide Transformations
1. Joins
   - Sort Merge Join
   - Broadcast Join
2. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

## 1. Joins

### Sort Merge Join

In [17]:
# Set the auto-broadcast threshold to -1, which per Spark's configuration docs
# disables automatic broadcast joins, so the join in this section is not
# turned into a broadcast join by the size threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [18]:
# Inner join of transactions with customers on the shared `cust_id` column.
df_joined = df_transactions.join(df_customers, on="cust_id", how="inner")

In [19]:
# Run the join by writing to the "noop" format.
(
    df_joined.write
    .format("noop")
    .mode("overwrite")
    .save("../data/test/df_joined.parquet")
)

### Broadcast Join

In [20]:
# Set the auto-broadcast threshold to 10 MiB (10 * 1024 * 1024 = 10485760 bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

In [21]:
# Inner join on `cust_id`, with the customers DataFrame explicitly marked
# for broadcast via F.broadcast.
df_broadcast_joined = df_transactions.join(
    F.broadcast(df_customers),
    on="cust_id",
    how="inner",
)

In [22]:
# Run the broadcast join by writing to the "noop" format.
(
    df_broadcast_joined.write
    .format("noop")
    .mode("overwrite")
    .save("../data/test/df_broadcast_joined.parquet")
)

## 2. GroupBy

### GroupBy Count

In [24]:
# Number of transaction rows per city.
df_city_counts = df_transactions.groupBy("city").count()

In [25]:
# Preview up to 5 per-city counts without truncating cell values.
df_city_counts.show(n=5, truncate=False)

+--------+-----+
|city    |count|
+--------+-----+
|new york|157  |
|chicago |146  |
|boston  |174  |
|seattle |183  |
|miami   |190  |
+--------+-----+
only showing top 5 rows



In [26]:
# GroupBy Sum: total of `amt` per city, exposed as `txn_amt`.
df_txn_amt_city = df_transactions.groupBy("city").agg(
    F.sum(F.col("amt")).alias("txn_amt")
)

In [27]:
# Preview up to 5 per-city totals without truncating cell values.
df_txn_amt_city.show(n=5, truncate=False)

+--------+-----------------+
|city    |txn_amt          |
+--------+-----------------+
|new york|83904.73000000007|
|chicago |67404.06000000003|
|boston  |77801.67000000004|
|seattle |93322.53999999998|
|miami   |96800.70999999996|
+--------+-----------------+
only showing top 5 rows



### GroupBy Count Distinct 

In [28]:
# Number of distinct `txn_id` values per city, exposed as `txn_count`.
df_txn_per_city = df_transactions.groupBy("city").agg(
    F.countDistinct(F.col("txn_id")).alias("txn_count")
)

In [29]:
# Preview up to 5 per-city distinct-transaction counts without truncating cell values.
df_txn_per_city.show(n=5, truncate=False)

+--------+---------+
|city    |txn_count|
+--------+---------+
|new york|157      |
|chicago |146      |
|boston  |174      |
|seattle |183      |
|miami   |190      |
+--------+---------+
only showing top 5 rows



In [30]:
# Shut down the SparkSession now that the walkthrough is complete.
spark.stop()